Coverage for nexusLIMS/utils/network.py: 100%
45 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
1"""Network and HTTP utilities for NexusLIMS."""
3import logging
4import tempfile
5import time
6from pathlib import Path
8import certifi
9from requests import Session
10from requests.adapters import HTTPAdapter
12from nexusLIMS.config import settings
13from nexusLIMS.harvesters import get_ca_bundle_content
15_logger = logging.getLogger(__name__)
16_ssl_warning_logged = False
19def nexus_req(
20 url: str,
21 function: str,
22 *,
23 retries: int = 5,
24 token_auth: str | None = None,
25 **kwargs: dict | None,
26):
27 """
28 Make a request from NexusLIMS.
30 A helper method that wraps a function from :py:mod:`requests`, but adds a
31 local certificate authority chain to validate any custom certificates.
32 Will automatically retry on transient server errors (502, 503, 504) with
33 exponential backoff.
35 Parameters
36 ----------
37 url
38 The URL to fetch
39 function
40 The function from the ``requests`` library to use (e.g.
41 ``'GET'``, ``'POST'``, ``'PATCH'``, etc.)
42 retries
43 The maximum number of retry attempts (total attempts = retries + 1)
44 token_auth
45 If a value is provided, it will be used as a token for authentication
46 **kwargs :
47 Other keyword arguments are passed along to the ``fn``
49 Returns
50 -------
51 r : :py:class:`requests.Response`
52 A requests response object
53 """
54 # if token_auth is desired, add it to any existing headers passed along
55 # with the request
56 if token_auth:
57 if "headers" in kwargs:
58 kwargs["headers"]["Authorization"] = f"Token {token_auth}"
59 else:
60 kwargs["headers"] = {"Authorization": f"Token {token_auth}"}
62 # Status codes that should trigger a retry (transient server errors)
63 retry_status_codes = {502, 503, 504}
65 # Set up a session (without urllib3 retry logic - we'll handle it ourselves)
66 s = Session()
67 s.mount("https://", HTTPAdapter())
68 s.mount("http://", HTTPAdapter())
70 verify_arg = True
71 response = None
73 # honour NX_DISABLE_SSL_VERIFY (warn once per process)
74 global _ssl_warning_logged # noqa: PLW0603
75 if settings.NX_DISABLE_SSL_VERIFY:
76 verify_arg = False
77 if not _ssl_warning_logged:
78 _logger.warning(
79 "NX_DISABLE_SSL_VERIFY is enabled — SSL certificate "
80 "verification is disabled for all requests. This should "
81 "only be used during local development or testing."
82 )
83 _ssl_warning_logged = True
85 with tempfile.NamedTemporaryFile() as tmp:
86 if verify_arg is not False and (ca_bundle_content := get_ca_bundle_content()):
87 with Path(certifi.where()).open(mode="rb") as sys_cert:
88 lines = sys_cert.readlines()
89 tmp.writelines(lines)
90 tmp.writelines(ca_bundle_content)
91 tmp.seek(0)
92 verify_arg = tmp.name
94 # Retry loop with exponential backoff
95 for attempt in range(retries + 1):
96 response = s.request(function, url, verify=verify_arg, **kwargs)
98 # If we got a successful response or non-retryable error, return it
99 if response.status_code not in retry_status_codes:
100 return response
102 # If this is our last attempt, return the failed response
103 if attempt == retries:
104 _logger.warning(
105 "Request to %s failed with %s after %s attempts",
106 url,
107 response.status_code,
108 retries + 1,
109 )
110 return response
112 # Calculate backoff delay: 1s, 2s, 4s, 8s, etc.
113 delay = 2**attempt
114 _logger.debug(
115 "Request to %s returned %s, retrying in %ss (attempt %s/%s)",
116 url,
117 response.status_code,
118 delay,
119 attempt + 1,
120 retries + 1,
121 )
122 time.sleep(delay)
124 # This should never be reached in normal execution, but provides a fallback
125 # if the retry loop somehow doesn't execute (e.g., invalid retries parameter)
126 return response # pragma: no cover