Source code for nexusLIMS.utils.network

"""Network and HTTP utilities for NexusLIMS."""

import logging
import tempfile
import time
from pathlib import Path

import certifi
from requests import Session
from requests.adapters import HTTPAdapter

from nexusLIMS.config import settings
from nexusLIMS.harvesters import get_ca_bundle_content

_logger = logging.getLogger(__name__)
_ssl_warning_logged = False


[docs] def nexus_req( url: str, function: str, *, retries: int = 5, token_auth: str | None = None, **kwargs: dict | None, ): """ Make a request from NexusLIMS. A helper method that wraps a function from :py:mod:`requests`, but adds a local certificate authority chain to validate any custom certificates. Will automatically retry on transient server errors (502, 503, 504) with exponential backoff. Parameters ---------- url The URL to fetch function The function from the ``requests`` library to use (e.g. ``'GET'``, ``'POST'``, ``'PATCH'``, etc.) retries The maximum number of retry attempts (total attempts = retries + 1) token_auth If a value is provided, it will be used as a token for authentication **kwargs : Other keyword arguments are passed along to the ``fn`` Returns ------- r : :py:class:`requests.Response` A requests response object """ # if token_auth is desired, add it to any existing headers passed along # with the request if token_auth: if "headers" in kwargs: kwargs["headers"]["Authorization"] = f"Token {token_auth}" else: kwargs["headers"] = {"Authorization": f"Token {token_auth}"} # Status codes that should trigger a retry (transient server errors) retry_status_codes = {502, 503, 504} # Set up a session (without urllib3 retry logic - we'll handle it ourselves) s = Session() s.mount("https://", HTTPAdapter()) s.mount("http://", HTTPAdapter()) verify_arg = True response = None # honour NX_DISABLE_SSL_VERIFY (warn once per process) global _ssl_warning_logged # noqa: PLW0603 if settings.NX_DISABLE_SSL_VERIFY: verify_arg = False if not _ssl_warning_logged: _logger.warning( "NX_DISABLE_SSL_VERIFY is enabled — SSL certificate " "verification is disabled for all requests. This should " "only be used during local development or testing." ) _ssl_warning_logged = True with tempfile.NamedTemporaryFile() as tmp: if verify_arg is not False and (ca_bundle_content := get_ca_bundle_content()): with Path(certifi.where()).open(mode="rb") as sys_cert: lines = sys_cert.readlines() tmp.writelines(lines) tmp.writelines(ca_bundle_content) tmp.seek(0) verify_arg = tmp.name # Retry loop with exponential backoff for attempt in range(retries + 1): response = s.request(function, url, verify=verify_arg, **kwargs) # If we got a successful response or non-retryable error, return it if response.status_code not in retry_status_codes: return response # If this is our last attempt, return the failed response if attempt == retries: _logger.warning( "Request to %s failed with %s after %s attempts", url, response.status_code, retries + 1, ) return response # Calculate backoff delay: 1s, 2s, 4s, 8s, etc. delay = 2**attempt _logger.debug( "Request to %s returned %s, retrying in %ss (attempt %s/%s)", url, response.status_code, delay, attempt + 1, retries + 1, ) time.sleep(delay) # This should never be reached in normal execution, but provides a fallback # if the retry loop somehow doesn't execute (e.g., invalid retries parameter) return response # pragma: no cover