"""Low-level API client for LabArchives electronic lab notebook.
This module provides a reusable client for interacting with the LabArchives API.
Authentication uses HMAC-SHA-512 signed requests (akid + method + expires, keyed
with the access password).
LabArchives API documentation:
https://api.labarchives.com/api_docs/
Example usage:
>>> from nexusLIMS.utils.labarchives import get_labarchives_client
>>> client = get_labarchives_client()
>>> nodes = client.get_tree_level("12345", "0")
>>> for node in nodes:
...     print(f"{node['tree_id']}: {node['display_text']}")
>>> folder_id = client.insert_folder("12345", "0", "My Folder")
>>> page_id = client.insert_page("12345", folder_id, "My Page")
"""
from __future__ import annotations
import base64
import hashlib
import hmac
import logging
import time
import xml.etree.ElementTree as ET
from http import HTTPStatus
from pathlib import Path # noqa: TC003
from typing import Any
import requests
from nexusLIMS.config import settings
# Module-level logger; request URLs, HTTP statuses and error bodies are logged
# at DEBUG level, retry notices at WARNING level.
_logger = logging.getLogger(__name__)
# Minimum interval between API calls (seconds) per LabArchives ToS
_MIN_CALL_INTERVAL = 1.0
# Exponential backoff settings for 5xx errors.
# _MAX_RETRIES is the total number of attempts per request (the request loops
# iterate ``range(_MAX_RETRIES)``, so the first try is included).
_MAX_RETRIES = 3
# Backoff base in seconds: the wait after failed attempt k (0-based) is
# ``_RETRY_BASE_DELAY ** (k + 1)``, i.e. 2 s and then 4 s with these values.
_RETRY_BASE_DELAY = 2.0
[docs]
class LabArchivesError(Exception):
    """Root of the LabArchives exception hierarchy.

    Raised directly for API failures that have no more specific subclass.
    """
[docs]
class LabArchivesAuthenticationError(LabArchivesError):
    """Raised when authentication fails (bad credentials or expired session)."""
[docs]
class LabArchivesPermissionError(LabArchivesError):
    """Raised when the caller lacks permission for the requested operation."""
[docs]
class LabArchivesNotFoundError(LabArchivesError):
    """Raised when the requested resource does not exist."""
[docs]
class LabArchivesRateLimitError(LabArchivesError):
    """Raised on a server error or exceeded rate limit (HTTP 5xx response)."""
# LabArchives error codes that indicate authentication failures; a response
# carrying one of these is raised as LabArchivesAuthenticationError.
# 4514 = "Login and/or password information is incorrect" — returned on 404 for
# user_access_info when the supplied user credentials are wrong.
# Codes are kept as strings because they are compared with XML element text.
_AUTH_ERROR_CODES = {"4504", "4506", "4507", "4514", "4520", "4533"}
# LabArchives error codes that indicate permission failures; a response
# carrying one of these is raised as LabArchivesPermissionError.
_PERM_ERROR_CODES = {"4501", "4502"}
[docs]
class LabArchivesClient:
    """Low-level client for the LabArchives API.

    Every request is authenticated with an HMAC-SHA-512 signature and carries
    these query parameters:

    - ``akid``: Access Key ID
    - ``expires``: Unix timestamp in milliseconds
    - ``sig``: base64 HMAC-SHA-512 signature of ``akid + method + expires``
      (``requests`` URL-encodes it when it builds the query string)

    Requests are spaced at least ``_MIN_CALL_INTERVAL`` seconds apart. HTTP 5xx
    responses are retried with exponential backoff, up to ``_MAX_RETRIES``
    attempts in total. Exceptions raised by ``requests`` itself (connection
    errors, timeouts) are not caught here and propagate to the caller.

    Parameters
    ----------
    base_url : str
        API base URL including the ``/api`` path segment
        (e.g., ``"https://api.labarchives.com/api"``).
    akid : str
        Access Key ID for API authentication.
    password : str
        Access password (HMAC signing secret).
    uid : str
        LabArchives user ID for the account that owns records.

    Examples
    --------
    >>> client = LabArchivesClient(
    ...     base_url="https://api.labarchives.com/api",
    ...     akid="your-akid",
    ...     password="your-password",
    ...     uid="your-uid",
    ... )
    >>> nodes = client.get_tree_level("12345", "0")
    """

    def __init__(self, base_url: str, akid: str, password: str, uid: str) -> None:
        """Initialize LabArchives API client.

        Parameters
        ----------
        base_url : str
            API base URL including the ``/api`` path segment; any trailing
            slash is removed
        akid : str
            Access Key ID
        password : str
            Access password (HMAC signing secret)
        uid : str
            LabArchives user ID
        """
        self.base_url: str = base_url.rstrip("/")
        self.akid: str = akid
        self.password: str = password
        self.uid: str = uid
        # Monotonic-clock reading taken at the most recent request. ``-inf``
        # means "no request yet", so the very first call is never delayed.
        self._last_call_time: float = float("-inf")

    # ------------------------------------------------------------------ #
    # Request plumbing                                                    #
    # ------------------------------------------------------------------ #
    def _sign(self, method: str) -> tuple[str, str]:
        """Generate HMAC-SHA-512 signature for an API method call.

        Parameters
        ----------
        method : str
            API method name only, without class prefix (e.g., "tree_level")

        Returns
        -------
        tuple[str, str]
            ``(expires_ms, sig)`` where ``expires_ms`` is the Unix timestamp
            in milliseconds as a string and ``sig`` is the raw base64-encoded
            HMAC-SHA-512 signature. Do **not** pre-encode the signature —
            ``requests`` URL-encodes query parameter values automatically, so
            pre-encoding with ``urllib.parse.quote`` would double-encode it
            and invalidate the signature on the wire.
        """
        # Wall-clock time on purpose: the server interprets this timestamp.
        expires = str(int(time.time() * 1000))
        msg = self.akid + method + expires
        raw_sig = hmac.new(
            self.password.encode("utf-8"),
            msg.encode("utf-8"),
            hashlib.sha512,
        ).digest()
        return expires, base64.b64encode(raw_sig).decode("utf-8")

    def _auth_params(self, method: str) -> dict[str, str]:
        """Build authentication parameters for a request.

        Parameters
        ----------
        method : str
            API method name

        Returns
        -------
        dict[str, str]
            Dict with ``akid``, ``expires``, and ``sig`` keys
        """
        expires, sig = self._sign(method)
        return {"akid": self.akid, "expires": expires, "sig": sig}

    def _request_params(
        self,
        method: str,
        params: dict[str, Any] | None,
    ) -> dict[str, Any]:
        """Assemble the full query string for one request attempt.

        Parameters
        ----------
        method : str
            API method name (used for signing)
        params : dict or None
            Caller-supplied query parameters; these are applied last and so
            take precedence over the defaults

        Returns
        -------
        dict
            Freshly signed auth parameters, the client's ``uid`` (when set),
            and *params*
        """
        merged: dict[str, Any] = dict(self._auth_params(method))
        if self.uid:
            merged["uid"] = self.uid
        if params:
            merged.update(params)
        return merged

    def _throttle(self) -> None:
        """Enforce minimum interval between API calls per LabArchives ToS."""
        # The monotonic clock is immune to wall-clock adjustments, which could
        # otherwise yield a negative "elapsed" and an arbitrarily long sleep.
        elapsed = time.monotonic() - self._last_call_time
        if elapsed < _MIN_CALL_INTERVAL:
            time.sleep(_MIN_CALL_INTERVAL - elapsed)  # pragma: no cover
        self._last_call_time = time.monotonic()

    def _send_with_retries(
        self,
        verb: str,
        api_class: str,
        method: str,
        params: dict[str, Any] | None,
        *,
        timeout: float,
        label: str | None = None,
        **request_kwargs: Any,
    ) -> requests.Response:
        """Send one throttled, signed request, retrying on HTTP 5xx.

        Parameters
        ----------
        verb : str
            ``"GET"`` or ``"POST"``
        api_class : str
            API class path (e.g., "entries")
        method : str
            API method name (e.g., "add_entry")
        params : dict or None
            Additional query parameters
        timeout : float
            Per-attempt timeout in seconds, passed to ``requests``
        label : str, optional
            Text used in log messages in place of *verb*
        **request_kwargs
            Extra keyword arguments forwarded to ``requests.get``/``post``

        Returns
        -------
        requests.Response
            The first response whose status is below 500

        Raises
        ------
        LabArchivesRateLimitError
            If every attempt returned an HTTP 5xx status

        Notes
        -----
        Each attempt is signed afresh so that ``expires`` reflects the moment
        the request is actually sent rather than the time before throttling
        and backoff sleeps.

        NOTE(review): a retry re-issues the same call, including mutating ones
        such as ``insert_node`` or ``add_entry``; whether the server may have
        applied a request it answered with 5xx is not visible from here.
        """
        url = f"{self.base_url}/{api_class}/{method}"
        label = label or verb
        # Looked up at call time so that patched ``requests`` functions apply.
        send = requests.post if verb == "POST" else requests.get
        self._throttle()
        _logger.debug("LabArchives %s %s", label, url)
        for attempt in range(_MAX_RETRIES):
            if attempt:
                # The backoff sleep already spaced this attempt out; record it
                # so the *next* call is throttled relative to the latest
                # request rather than the first one.
                self._last_call_time = time.monotonic()
            resp = send(
                url,
                params=self._request_params(method, params),
                timeout=timeout,
                **request_kwargs,
            )
            _logger.debug(
                "LabArchives %s %s → HTTP %s", label, url, resp.status_code
            )
            if resp.status_code < HTTPStatus.INTERNAL_SERVER_ERROR:
                return resp
            if attempt == _MAX_RETRIES - 1:
                msg = (
                    f"LabArchives API server error after {_MAX_RETRIES} retries: "
                    f"{resp.status_code}"
                )
                raise LabArchivesRateLimitError(msg)
            delay = _RETRY_BASE_DELAY ** (attempt + 1)
            _logger.warning(
                "LabArchives %s %s returned %s, retrying in %.1fs",
                label,
                url,
                resp.status_code,
                delay,
            )
            time.sleep(delay)
        # Only reachable when _MAX_RETRIES is configured below 1.
        msg = f"LabArchives request not sent: _MAX_RETRIES={_MAX_RETRIES}"
        raise LabArchivesError(msg)  # pragma: no cover

    def _get(
        self,
        api_class: str,
        method: str,
        params: dict[str, Any] | None = None,
    ) -> ET.Element:
        """Make an authenticated GET request and return parsed XML root.

        Parameters
        ----------
        api_class : str
            API class path (e.g., "notebooks")
        method : str
            API method name (e.g., "tree_level")
        params : dict, optional
            Additional query parameters

        Returns
        -------
        ET.Element
            Parsed XML root element from the response

        Raises
        ------
        LabArchivesError
            For general API errors
        LabArchivesAuthenticationError
            For authentication failures
        LabArchivesPermissionError
            For permission failures
        LabArchivesNotFoundError
            For 404 responses
        LabArchivesRateLimitError
            For 5xx responses
        """
        resp = self._send_with_retries("GET", api_class, method, params, timeout=30)
        return self._parse_response(resp, f"{api_class}/{method}")

    def _get_raw(  # pragma: no cover
        self,
        api_class: str,
        method: str,
        params: dict[str, Any] | None = None,
    ) -> bytes:
        """Make an authenticated GET request and return the raw response body.

        Used for endpoints that return binary data rather than XML (e.g.,
        ``entries/entry_attachment``).

        Parameters
        ----------
        api_class : str
            API class path (e.g., "entries")
        method : str
            API method name (e.g., "entry_attachment")
        params : dict, optional
            Additional query parameters

        Returns
        -------
        bytes
            Raw response body

        Raises
        ------
        LabArchivesNotFoundError
            For HTTP 404 responses
        LabArchivesRateLimitError
            For HTTP 5xx responses after retries
        LabArchivesError
            For other non-2xx responses
        """
        full_method = f"{api_class}/{method}"
        resp = self._send_with_retries(
            "GET", api_class, method, params, timeout=30, label="GET (raw)"
        )
        if resp.status_code == HTTPStatus.NOT_FOUND:
            msg = f"LabArchives resource not found (HTTP 404): {full_method}"
            raise LabArchivesNotFoundError(msg)
        if not resp.ok:
            msg = (
                f"LabArchives request failed (HTTP {resp.status_code}): "
                f"{full_method}"
            )
            raise LabArchivesError(msg)
        return resp.content

    def _post(
        self,
        api_class: str,
        method: str,
        params: dict[str, Any] | None = None,
        data: bytes | None = None,
        form: dict[str, Any] | None = None,
    ) -> ET.Element:
        """Make an authenticated POST request and return parsed XML root.

        Parameters
        ----------
        api_class : str
            API class path (e.g., "entries")
        method : str
            API method name (e.g., "add_entry_to_page")
        params : dict, optional
            Additional query parameters (appended to URL)
        data : bytes, optional
            Raw binary data for the request body; when given it is sent as
            ``application/octet-stream`` and *form* is ignored
        form : dict, optional
            Form data (``application/x-www-form-urlencoded``)

        Returns
        -------
        ET.Element
            Parsed XML root element from the response

        Raises
        ------
        LabArchivesRateLimitError
            For 5xx responses after retries
        LabArchivesError
            For other API errors (see :meth:`_parse_response`)
        """
        headers: dict[str, str] = {}
        if data is not None:
            headers["Content-Type"] = "application/octet-stream"
        body = data if data is not None else (form or {})
        resp = self._send_with_retries(
            "POST",
            api_class,
            method,
            params,
            timeout=60,
            data=body,
            headers=headers,
        )
        return self._parse_response(resp, f"{api_class}/{method}")

    @staticmethod
    def _stripped_text(element: ET.Element | None) -> str:
        """Return the whitespace-stripped text of *element*, or ``""``."""
        return element.text.strip() if element is not None and element.text else ""

    def _parse_response(self, resp: requests.Response, method: str) -> ET.Element:
        """Parse XML response and raise typed exceptions for errors.

        Parameters
        ----------
        resp : requests.Response
            HTTP response from the LabArchives API
        method : str
            API method that was called (for error messages)

        Returns
        -------
        ET.Element
            Parsed XML root element

        Raises
        ------
        LabArchivesNotFoundError
            For HTTP 404 responses
        LabArchivesRateLimitError
            For HTTP 5xx responses
        LabArchivesAuthenticationError
            For LA error codes indicating auth failures
        LabArchivesPermissionError
            For LA error codes indicating permission failures
        LabArchivesError
            For other API errors
        """
        if resp.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR:
            _logger.debug("LabArchives %s body: %s", resp.status_code, resp.text[:1000])
            msg = f"LabArchives server error HTTP {resp.status_code}: {method}"
            raise LabArchivesRateLimitError(msg)
        try:
            root = ET.fromstring(resp.text)  # noqa: S314
        except ET.ParseError as e:
            _logger.debug("LabArchives unparseable body: %s", resp.text[:1000])
            # Fall back to HTTP-status-based errors when XML is unparseable.
            if resp.status_code == HTTPStatus.NOT_FOUND:
                msg = f"LabArchives resource not found (HTTP 404): {method}"
                raise LabArchivesNotFoundError(msg) from e
            msg = (
                f"Failed to parse LabArchives XML response for {method} "
                f"(HTTP {resp.status_code}): {e}\n"
                f"Response: {resp.text[:500]}"
            )
            raise LabArchivesError(msg) from e
        # Check for LabArchives error responses.
        # The API uses two different element naming schemes:
        #   legacy:  <error><code>N</code><msg>…</msg></error>
        #   current: <error><error-code>N</error-code>
        #            <error-description>…</error-description></error>
        # ``find(".//error")`` only searches descendants, so the root element
        # must be tested separately or a document rooted at <error> would be
        # mistaken for a successful response.
        error_el = root if root.tag == "error" else root.find(".//error")
        if error_el is not None:
            code_el = error_el.find("error-code")
            if code_el is None:
                code_el = error_el.find("code")
            msg_el = error_el.find("error-description")
            if msg_el is None:
                msg_el = error_el.find("msg")
            code = self._stripped_text(code_el)
            msg = (
                msg_el.text.strip()
                if msg_el is not None and msg_el.text
                else "Unknown error"
            )
            _logger.debug(
                "LabArchives error response (HTTP %s, code %s): %s\nBody: %s",
                resp.status_code,
                code,
                msg,
                resp.text[:1000],
            )
            full_msg = f"HTTP {resp.status_code}, error code {code!r}: {msg}"
            if code in _AUTH_ERROR_CODES:
                raise LabArchivesAuthenticationError(full_msg)
            if code in _PERM_ERROR_CODES:
                raise LabArchivesPermissionError(full_msg)
            raise LabArchivesError(full_msg)
        # No XML error element — fall back to HTTP status for 404.
        if resp.status_code == HTTPStatus.NOT_FOUND:  # pragma: no cover
            msg = f"LabArchives resource not found (HTTP 404): {method}"
            raise LabArchivesNotFoundError(msg)
        return root

    @staticmethod
    def _pretty_print_response(root: ET.Element) -> None:  # pragma: no cover
        """Print a nicely formatted XML representation of an API response.

        Note that :func:`xml.etree.ElementTree.indent` modifies *root* in
        place.

        Parameters
        ----------
        root : ET.Element
            Parsed XML root element (as returned by :meth:`_parse_response`)
        """
        ET.indent(root)
        print(ET.tostring(root, encoding="unicode"))  # noqa: T201

    @staticmethod
    def _parse_user_info(root: ET.Element) -> dict[str, Any]:
        """Convert a ``<users>`` response into a plain dict.

        Expected structure::

            <users>
              <id>…</id>
              <fullname>…</fullname>
              <email>…</email>
              <notebooks type="array">
                <notebook><id>…</id><name>…</name></notebook>
              </notebooks>
            </users>

        Parameters
        ----------
        root : ET.Element
            Parsed response; ``<users>`` may be the root itself or nested
            anywhere below it (e.g. under a wrapper element)

        Returns
        -------
        dict
            ``uid`` plus whichever of ``fullname``, ``first-name``,
            ``last-name`` and ``email`` are present, and a ``notebooks`` list
            of dicts with optional ``id`` and ``name`` keys
        """
        users_el = root if root.tag == "users" else root.find(".//users")
        if users_el is None:  # pragma: no cover
            users_el = root
        result: dict[str, Any] = {}
        id_el = users_el.find("id")
        if id_el is not None and id_el.text:
            result["uid"] = id_el.text.strip()
        for tag in ("fullname", "first-name", "last-name", "email"):
            el = users_el.find(tag)
            if el is not None and el.text:
                result[tag] = el.text.strip()
        notebooks = []
        for nb in users_el.findall("notebooks/notebook"):
            nb_data: dict[str, str] = {}
            for key in ("id", "name"):
                field_el = nb.find(key)
                if field_el is not None and field_el.text:
                    nb_data[key] = field_el.text.strip()
            notebooks.append(nb_data)
        result["notebooks"] = notebooks
        return result

    # ------------------------------------------------------------------ #
    # Public API methods                                                  #
    # ------------------------------------------------------------------ #
    def get_tree_level(
        self,
        nbid: str,
        parent_tree_id: str = "0",
    ) -> list[dict[str, Any]]:
        """Get child nodes at a tree level in a notebook.

        Parameters
        ----------
        nbid : str
            Notebook ID
        parent_tree_id : str, optional
            Parent tree node ID; "0" for the root level

        Returns
        -------
        list of dict
            Each dict has ``tree_id``, ``display_text``, and ``is_page`` keys.
            Nodes lacking a ``tree-id`` or ``display-text`` element are skipped.

        Raises
        ------
        LabArchivesError
            If the API call fails
        """
        root = self._get(
            "tree_tools",
            "get_tree_level",
            params={"nbid": nbid, "parent_tree_id": parent_tree_id},
        )
        nodes = []
        for item in root.findall(".//level-node"):
            tree_id_el = item.find("tree-id")
            text_el = item.find("display-text")
            if tree_id_el is None or text_el is None:  # pragma: no cover
                continue
            nodes.append(
                {
                    "tree_id": tree_id_el.text or "",
                    "display_text": text_el.text or "",
                    # Strip first so surrounding whitespace in the element
                    # text cannot turn a page into a non-page.
                    "is_page": self._stripped_text(item.find("is-page")) == "true",
                }
            )
        return nodes

    def get_page_entries(  # pragma: no cover
        self,
        nbid: str,
        page_tree_id: str,
        *,
        include_content: bool = False,
    ) -> list[dict[str, Any]]:
        """Get all entries on a notebook page.

        Parameters
        ----------
        nbid : str
            Notebook ID
        page_tree_id : str
            Tree ID of the page
        include_content : bool, optional
            When True, each entry dict will include an ``entry_data`` key
            containing the raw HTML/text content of the entry.

        Returns
        -------
        list of dict
            Each dict has ``eid`` and ``part_type`` keys, plus ``entry_data``
            when *include_content* is True. Missing values are ``""``.

        Raises
        ------
        LabArchivesError
            If the API call fails
        """
        root = self._get(
            "tree_tools",
            "get_entries_for_page",
            params={
                "nbid": nbid,
                "page_tree_id": page_tree_id,
                "entry_data": "true" if include_content else "false",
            },
        )
        entries = []
        for entry_el in root.findall(".//entry"):
            entry: dict[str, Any] = {
                "eid": self._stripped_text(entry_el.find("eid")),
                "part_type": self._stripped_text(entry_el.find("part-type")),
            }
            if include_content:
                # The API uses <entry_data> (underscore) per the docs; the
                # hyphenated spelling is accepted as a fallback.
                data_el = entry_el.find("entry_data")
                if data_el is None:
                    data_el = entry_el.find("entry-data")
                entry["entry_data"] = self._stripped_text(data_el)
            entries.append(entry)
        return entries

    def _insert_node(
        self,
        nbid: str,
        parent_tree_id: str,
        display_text: str,
        *,
        is_folder: bool,
    ) -> str:
        """Create a folder or page node in a notebook tree.

        Parameters
        ----------
        nbid : str
            Notebook ID
        parent_tree_id : str
            Parent tree node ID
        display_text : str
            Display name for the new node
        is_folder : bool
            True to create a folder; False to create a page

        Returns
        -------
        str
            The ``tree_id`` of the newly created node

        Raises
        ------
        LabArchivesError
            If the API call fails or the response carries no ``tree-id``
        """
        root = self._get(
            "tree_tools",
            "insert_node",
            params={
                "nbid": nbid,
                "parent_tree_id": parent_tree_id,
                "display_text": display_text,
                "is_folder": "true" if is_folder else "false",
            },
        )
        tree_id_el = root.find(".//tree-id")
        if tree_id_el is None or not tree_id_el.text:
            msg = f"insert_node response missing tree-id for '{display_text}'"
            raise LabArchivesError(msg)
        return tree_id_el.text.strip()

    def insert_folder(self, nbid: str, parent_tree_id: str, name: str) -> str:
        """Create a folder in a notebook tree.

        Parameters
        ----------
        nbid : str
            Notebook ID
        parent_tree_id : str
            Parent tree node ID; ``"0"`` for the root level
        name : str
            Display name for the new folder

        Returns
        -------
        str
            The ``tree_id`` of the newly created folder

        Raises
        ------
        LabArchivesError
            If the API call fails
        """
        return self._insert_node(nbid, parent_tree_id, name, is_folder=True)

    def insert_page(self, nbid: str, parent_tree_id: str, name: str) -> str:
        """Create a page in a notebook tree.

        Parameters
        ----------
        nbid : str
            Notebook ID
        parent_tree_id : str
            Parent tree node ID (folder ``tree_id`` or ``"0"`` for root)
        name : str
            Display name for the new page

        Returns
        -------
        str
            The ``tree_id`` of the newly created page

        Raises
        ------
        LabArchivesError
            If the API call fails
        """
        return self._insert_node(nbid, parent_tree_id, name, is_folder=False)

    def add_entry(
        self,
        nbid: str,
        page_tree_id: str,
        entry_data: str,
        part_type: str = "text entry",
    ) -> str:
        """Add a text/HTML entry to a notebook page.

        Parameters
        ----------
        nbid : str
            Notebook ID
        page_tree_id : str
            Tree ID of the target page
        entry_data : str
            HTML or plain-text content for the entry
        part_type : str, optional
            Entry part type (default: "text entry")

        Returns
        -------
        str
            The ``eid`` (entry ID) of the newly created entry

        Raises
        ------
        LabArchivesError
            If the API call fails or the response carries no ``eid``
        """
        root = self._post(
            "entries",
            "add_entry",
            form={
                "nbid": nbid,
                "pid": page_tree_id,
                "entry_data": entry_data,
                "part_type": part_type,
            },
        )
        eid_el = root.find(".//eid")
        if eid_el is None or not eid_el.text:
            msg = f"add_entry response missing eid for page {page_tree_id}"
            raise LabArchivesError(msg)
        return eid_el.text.strip()

    def add_attachment(
        self,
        nbid: str,
        page_tree_id: str,
        filename: str,
        data: bytes,
        caption: str = "",
    ) -> str:
        """Upload a file attachment to a notebook page.

        Parameters
        ----------
        nbid : str
            Notebook ID
        page_tree_id : str
            Tree ID of the target page
        filename : str
            Name to use for the uploaded file
        data : bytes
            Raw file content
        caption : str, optional
            Caption for the attachment

        Returns
        -------
        str
            The ``eid`` (entry ID) of the attachment entry

        Raises
        ------
        LabArchivesError
            If the API call fails or the response carries no ``eid``
        """
        root = self._post(
            "entries",
            "add_attachment",
            params={
                "nbid": nbid,
                "pid": page_tree_id,
                "filename": filename,
                "caption": caption,
            },
            data=data,
        )
        eid_el = root.find(".//eid")
        if eid_el is None or not eid_el.text:  # pragma: no cover
            msg = f"add_attachment response missing eid for page {page_tree_id}"
            raise LabArchivesError(msg)
        return eid_el.text.strip()

    def attach_file(  # pragma: no cover
        self,
        nbid: str,
        page_tree_id: str,
        path: Path,
        caption: str = "",
    ) -> str:
        """Upload a file from disk as an attachment to a notebook page.

        Convenience wrapper around :meth:`add_attachment` that reads the file
        and derives the filename from the path automatically. The whole file
        is read into memory before it is uploaded.

        Parameters
        ----------
        nbid : str
            Notebook ID
        page_tree_id : str
            Tree ID of the target page
        path : Path
            Local file to upload
        caption : str, optional
            Caption for the attachment

        Returns
        -------
        str
            The ``eid`` (entry ID) of the attachment entry

        Raises
        ------
        LabArchivesError
            If the API call fails
        """
        return self.add_attachment(
            nbid, page_tree_id, path.name, path.read_bytes(), caption
        )

    def get_attachment_content(self, eid: str) -> bytes:  # pragma: no cover
        """Download the raw content of an attachment entry.

        Parameters
        ----------
        eid : str
            Entry ID of the attachment (as returned by :meth:`add_attachment`)

        Returns
        -------
        bytes
            Raw file content of the attachment

        Raises
        ------
        LabArchivesNotFoundError
            If the entry does not exist
        LabArchivesError
            If the API call fails
        """
        return self._get_raw("entries", "entry_attachment", params={"eid": eid})

    def get_user_info(self, login: str, password: str) -> dict[str, Any]:
        """Exchange login credentials for user info and notebook list.

        This method is used to obtain the ``uid`` for a LabArchives account.
        The returned ``uid`` should be stored in ``NX_LABARCHIVES_USER_ID``
        for subsequent API calls.

        Parameters
        ----------
        login : str
            LabArchives login (email address)
        password : str
            LabArchives account password

        Returns
        -------
        dict
            User info including ``uid``, ``email``, and ``notebooks`` list.

        Raises
        ------
        LabArchivesAuthenticationError
            If login credentials are invalid
        LabArchivesError
            If the API call fails
        """
        root = self._get(
            "users",
            "user_access_info",
            params={"login_or_email": login, "password": password},
        )
        return self._parse_user_info(root)

    def get_user_info_by_uid(
        self, uid: str | None = None
    ) -> dict[str, Any]:  # pragma: no cover
        """Fetch user info for a LabArchives account using a uid.

        Calls the ``users/user_info_via_id`` endpoint, which requires only the
        uid (no login password). When *uid* is omitted the client's own
        ``uid`` is used.

        Parameters
        ----------
        uid : str, optional
            LabArchives user ID. Defaults to the client's configured uid.

        Returns
        -------
        dict
            User info including ``uid``, ``email``, and ``notebooks`` list.

        Raises
        ------
        LabArchivesError
            If the API call fails
        """
        lookup_uid = uid if uid is not None else self.uid
        root = self._get(
            "users",
            "user_info_via_id",
            params={"uid": lookup_uid},
        )
        return self._parse_user_info(root)
[docs]
def get_labarchives_client() -> LabArchivesClient:
    """Build a :class:`LabArchivesClient` from the NexusLIMS settings.

    The four ``NX_LABARCHIVES_*`` settings are checked in a fixed order
    (access key ID, access password, user ID, URL); the first one that is
    unset or empty aborts construction.

    Returns
    -------
    LabArchivesClient
        Configured client instance

    Raises
    ------
    LabArchivesError
        If required settings are not configured

    Examples
    --------
    >>> from nexusLIMS.utils.labarchives import get_labarchives_client
    >>> client = get_labarchives_client()
    >>> nodes = client.get_tree_level("12345", "0")
    >>> folder_id = client.insert_folder("12345", "0", "My Folder")
    >>> page_id = client.insert_page("12345", folder_id, "My Page")
    """
    required = (
        "NX_LABARCHIVES_ACCESS_KEY_ID",
        "NX_LABARCHIVES_ACCESS_PASSWORD",
        "NX_LABARCHIVES_USER_ID",
        "NX_LABARCHIVES_URL",
    )
    # Validate every setting before any of them is used.
    for setting_name in required:
        if not getattr(settings, setting_name):
            msg = f"{setting_name} not configured"
            raise LabArchivesError(msg)
    return LabArchivesClient(
        # The URL setting may be a non-str URL object, hence the str() coercion.
        base_url=str(settings.NX_LABARCHIVES_URL),
        akid=settings.NX_LABARCHIVES_ACCESS_KEY_ID,
        password=settings.NX_LABARCHIVES_ACCESS_PASSWORD,
        uid=settings.NX_LABARCHIVES_USER_ID,
    )