Coverage for nexusLIMS/utils/labarchives.py: 100%
180 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
1"""Low-level API client for LabArchives electronic lab notebook.
3This module provides a reusable client for interacting with the LabArchives API.
4Authentication uses HMAC-SHA-512 signed requests (akid + method + expires, keyed
5with the access password).
7LabArchives API documentation:
8 https://api.labarchives.com/api_docs/
10Example usage:
11 >>> from nexusLIMS.utils.labarchives import get_labarchives_client
12 >>> client = get_labarchives_client()
13 >>> nodes = client.get_tree_level("12345", "0")
14 >>> for node in nodes:
15 ... print(f"{node['tree_id']}: {node['display_text']}")
16 >>> folder_id = client.insert_folder("12345", "0", "My Folder")
17 >>> page_id = client.insert_page("12345", folder_id, "My Page")
18"""
20from __future__ import annotations
22import base64
23import hashlib
24import hmac
25import logging
26import time
27import xml.etree.ElementTree as ET
28from http import HTTPStatus
29from pathlib import Path # noqa: TC003
30from typing import Any
32import requests
34from nexusLIMS.config import settings
36_logger = logging.getLogger(__name__)
38# Minimum interval between API calls (seconds) per LabArchives ToS
39_MIN_CALL_INTERVAL = 1.0
40# Exponential backoff settings for 5xx errors
41_MAX_RETRIES = 3
42_RETRY_BASE_DELAY = 2.0
45class LabArchivesError(Exception):
46 """Base exception for LabArchives API errors."""
49class LabArchivesAuthenticationError(LabArchivesError):
50 """Authentication failed (invalid credentials or session expired)."""
53class LabArchivesPermissionError(LabArchivesError):
54 """Insufficient permissions for the requested operation."""
57class LabArchivesNotFoundError(LabArchivesError):
58 """Requested resource not found."""
61class LabArchivesRateLimitError(LabArchivesError):
62 """Server error or rate limit exceeded (5xx response)."""
65# LabArchives error codes that indicate authentication failures.
66# 4514 = "Login and/or password information is incorrect" — returned on 404 for
67# user_access_info when the supplied user credentials are wrong.
68_AUTH_ERROR_CODES = {"4504", "4506", "4507", "4514", "4520", "4533"}
69# LabArchives error codes that indicate permission failures
70_PERM_ERROR_CODES = {"4501", "4502"}
73class LabArchivesClient:
74 """Low-level client for the LabArchives API.
76 Authentication uses HMAC-SHA-512 signed requests. Each request includes:
77 - ``akid``: Access Key ID
78 - ``expires``: Unix timestamp in milliseconds
79 - ``sig``: URL-encoded base64 HMAC-SHA-512 signature of ``akid + method + expires``
81 Parameters
82 ----------
83 base_url : str
84 API base URL including the ``/api`` path segment
85 (e.g., ``"https://api.labarchives.com/api"``).
86 akid : str
87 Access Key ID for API authentication.
88 password : str
89 Access password (HMAC signing secret).
90 uid : str
91 LabArchives user ID for the account that owns records.
93 Examples
94 --------
95 >>> client = LabArchivesClient(
96 ... base_url="https://api.labarchives.com/api",
97 ... akid="your-akid",
98 ... password="your-password",
99 ... uid="your-uid",
100 ... )
101 >>> nodes = client.get_tree_level("12345", "0")
102 """
104 def __init__(self, base_url: str, akid: str, password: str, uid: str) -> None:
105 """Initialize LabArchives API client.
107 Parameters
108 ----------
109 base_url : str
110 Root URL of the LabArchives instance
111 akid : str
112 Access Key ID
113 password : str
114 Access password (HMAC signing secret)
115 uid : str
116 LabArchives user ID
117 """
118 self.base_url: str = base_url.rstrip("/")
119 self.akid: str = akid
120 self.password: str = password
121 self.uid: str = uid
122 self._last_call_time: float = 0.0
124 def _sign(self, method: str) -> tuple[str, str]:
125 """Generate HMAC-SHA-512 signature for an API method call.
127 Parameters
128 ----------
129 method : str
130 API method name only, without class prefix (e.g., "tree_level")
132 Returns
133 -------
134 tuple[str, str]
135 ``(expires_ms, sig)`` where ``expires_ms`` is the Unix timestamp
136 in milliseconds as a string and ``sig`` is the raw base64-encoded
137 HMAC-SHA-512 signature. Do **not** pre-encode the signature —
138 ``requests`` URL-encodes query parameter values automatically, so
139 pre-encoding with ``urllib.parse.quote`` would double-encode it
140 and invalidate the signature on the wire.
141 """
142 expires = str(int(time.time() * 1000))
143 msg = self.akid + method + expires
144 raw_sig = hmac.new(
145 self.password.encode("utf-8"),
146 msg.encode("utf-8"),
147 hashlib.sha512,
148 ).digest()
149 return expires, base64.b64encode(raw_sig).decode("utf-8")
151 def _auth_params(self, method: str) -> dict[str, str]:
152 """Build authentication parameters for a request.
154 Parameters
155 ----------
156 method : str
157 API method name
159 Returns
160 -------
161 dict[str, str]
162 Dict with ``akid``, ``expires``, and ``sig`` keys
163 """
164 expires, sig = self._sign(method)
165 return {"akid": self.akid, "expires": expires, "sig": sig}
167 def _throttle(self) -> None:
168 """Enforce minimum interval between API calls per LabArchives ToS."""
169 elapsed = time.time() - self._last_call_time
170 if elapsed < _MIN_CALL_INTERVAL:
171 time.sleep(_MIN_CALL_INTERVAL - elapsed) # pragma: no cover
172 self._last_call_time = time.time()
174 def _get(
175 self,
176 api_class: str,
177 method: str,
178 params: dict[str, Any] | None = None,
179 ) -> ET.Element:
180 """Make an authenticated GET request and return parsed XML root.
182 Parameters
183 ----------
184 api_class : str
185 API class path (e.g., "notebooks")
186 method : str
187 API method name (e.g., "tree_level")
188 params : dict, optional
189 Additional query parameters
191 Returns
192 -------
193 ET.Element
194 Parsed XML root element from the response
196 Raises
197 ------
198 LabArchivesError
199 For general API errors
200 LabArchivesAuthenticationError
201 For authentication failures
202 LabArchivesPermissionError
203 For permission failures
204 LabArchivesNotFoundError
205 For 404 responses
206 LabArchivesRateLimitError
207 For 5xx responses
208 """
209 full_method = f"{api_class}/{method}"
210 auth = self._auth_params(method)
211 all_params = {**auth, **({"uid": self.uid} if self.uid else {})}
212 if params:
213 all_params.update(params)
215 url = f"{self.base_url}/{full_method}"
216 self._throttle()
217 _logger.debug("LabArchives GET %s", url)
219 for attempt in range(_MAX_RETRIES):
220 resp = requests.get(url, params=all_params, timeout=30)
221 _logger.debug("LabArchives GET %s → HTTP %s", url, resp.status_code)
222 if resp.status_code < HTTPStatus.INTERNAL_SERVER_ERROR:
223 break
224 if attempt < _MAX_RETRIES - 1:
225 delay = _RETRY_BASE_DELAY ** (attempt + 1)
226 _logger.warning(
227 "LabArchives GET %s returned %s, retrying in %.1fs",
228 url,
229 resp.status_code,
230 delay,
231 )
232 time.sleep(delay)
233 else:
234 msg = (
235 f"LabArchives API server error after {_MAX_RETRIES} retries: "
236 f"{resp.status_code}"
237 )
238 raise LabArchivesRateLimitError(msg)
240 return self._parse_response(resp, full_method)
242 def _get_raw( # pragma: no cover
243 self,
244 api_class: str,
245 method: str,
246 params: dict[str, Any] | None = None,
247 ) -> bytes:
248 """Make an authenticated GET request and return the raw response body.
250 Used for endpoints that return binary data rather than XML (e.g.,
251 ``entries/entry_attachment``).
253 Parameters
254 ----------
255 api_class : str
256 API class path (e.g., "entries")
257 method : str
258 API method name (e.g., "entry_attachment")
259 params : dict, optional
260 Additional query parameters
262 Returns
263 -------
264 bytes
265 Raw response body
267 Raises
268 ------
269 LabArchivesNotFoundError
270 For HTTP 404 responses
271 LabArchivesRateLimitError
272 For HTTP 5xx responses after retries
273 LabArchivesError
274 For other non-2xx responses
275 """
276 full_method = f"{api_class}/{method}"
277 auth = self._auth_params(method)
278 all_params = {**auth, **({"uid": self.uid} if self.uid else {})}
279 if params:
280 all_params.update(params)
282 url = f"{self.base_url}/{full_method}"
283 self._throttle()
284 _logger.debug("LabArchives GET (raw) %s", url)
286 for attempt in range(_MAX_RETRIES):
287 resp = requests.get(url, params=all_params, timeout=30)
288 _logger.debug("LabArchives GET (raw) %s → HTTP %s", url, resp.status_code)
289 if resp.status_code < HTTPStatus.INTERNAL_SERVER_ERROR:
290 break
291 if attempt < _MAX_RETRIES - 1:
292 delay = _RETRY_BASE_DELAY ** (attempt + 1)
293 _logger.warning(
294 "LabArchives GET (raw) %s returned %s, retrying in %.1fs",
295 url,
296 resp.status_code,
297 delay,
298 )
299 time.sleep(delay)
300 else:
301 msg = (
302 f"LabArchives API server error after {_MAX_RETRIES} retries: "
303 f"{resp.status_code}"
304 )
305 raise LabArchivesRateLimitError(msg)
307 if resp.status_code == HTTPStatus.NOT_FOUND:
308 msg = f"LabArchives resource not found (HTTP 404): {full_method}"
309 raise LabArchivesNotFoundError(msg)
310 if not resp.ok:
311 msg = f"LabArchives request failed (HTTP {resp.status_code}): {full_method}"
312 raise LabArchivesError(msg)
314 return resp.content
316 def _post(
317 self,
318 api_class: str,
319 method: str,
320 params: dict[str, Any] | None = None,
321 data: bytes | None = None,
322 form: dict[str, Any] | None = None,
323 ) -> ET.Element:
324 """Make an authenticated POST request and return parsed XML root.
326 Parameters
327 ----------
328 api_class : str
329 API class path (e.g., "entries")
330 method : str
331 API method name (e.g., "add_entry_to_page")
332 params : dict, optional
333 Additional query parameters (appended to URL)
334 data : bytes, optional
335 Raw binary data for the request body
336 form : dict, optional
337 Form data (``application/x-www-form-urlencoded``)
339 Returns
340 -------
341 ET.Element
342 Parsed XML root element from the response
343 """
344 full_method = f"{api_class}/{method}"
345 auth = self._auth_params(method)
346 all_params = {**auth, **({"uid": self.uid} if self.uid else {})}
347 if params:
348 all_params.update(params)
350 url = f"{self.base_url}/{full_method}"
351 self._throttle()
352 _logger.debug("LabArchives POST %s", url)
354 headers: dict[str, str] = {}
355 if data is not None:
356 headers["Content-Type"] = "application/octet-stream"
358 for attempt in range(_MAX_RETRIES):
359 resp = requests.post(
360 url,
361 params=all_params,
362 data=data if data is not None else (form or {}),
363 headers=headers,
364 timeout=60,
365 )
366 if resp.status_code < HTTPStatus.INTERNAL_SERVER_ERROR:
367 break
368 if attempt < _MAX_RETRIES - 1: # pragma: no cover
369 delay = _RETRY_BASE_DELAY ** (attempt + 1)
370 _logger.warning(
371 "LabArchives POST %s returned %s, retrying in %.1fs",
372 url,
373 resp.status_code,
374 delay,
375 )
376 time.sleep(delay)
377 else: # pragma: no cover
378 msg = (
379 f"LabArchives API server error after {_MAX_RETRIES} retries: "
380 f"{resp.status_code}"
381 )
382 raise LabArchivesRateLimitError(msg)
384 return self._parse_response(resp, full_method)
386 def _parse_response(self, resp: requests.Response, method: str) -> ET.Element:
387 """Parse XML response and raise typed exceptions for errors.
389 Parameters
390 ----------
391 resp : requests.Response
392 HTTP response from the LabArchives API
393 method : str
394 API method that was called (for error messages)
396 Returns
397 -------
398 ET.Element
399 Parsed XML root element
401 Raises
402 ------
403 LabArchivesNotFoundError
404 For HTTP 404 responses
405 LabArchivesRateLimitError
406 For HTTP 5xx responses
407 LabArchivesAuthenticationError
408 For LA error codes indicating auth failures
409 LabArchivesPermissionError
410 For LA error codes indicating permission failures
411 LabArchivesError
412 For other API errors
413 """
414 if resp.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR:
415 _logger.debug("LabArchives %s body: %s", resp.status_code, resp.text[:1000])
416 msg = f"LabArchives server error HTTP {resp.status_code}: {method}"
417 raise LabArchivesRateLimitError(msg)
419 try:
420 root = ET.fromstring(resp.text) # noqa: S314
421 except ET.ParseError as e:
422 _logger.debug("LabArchives unparseable body: %s", resp.text[:1000])
423 # Fall back to HTTP-status-based errors when XML is unparseable.
424 if resp.status_code == HTTPStatus.NOT_FOUND:
425 msg = f"LabArchives resource not found (HTTP 404): {method}"
426 raise LabArchivesNotFoundError(msg) from e
427 msg = (
428 f"Failed to parse LabArchives XML response for {method} "
429 f"(HTTP {resp.status_code}): {e}\n"
430 f"Response: {resp.text[:500]}"
431 )
432 raise LabArchivesError(msg) from e
434 # Check for LabArchives error responses.
435 # The API uses two different element naming schemes:
436 # legacy: <error><code>N</code><msg>…</msg></error>
437 # current: <error><error-code>N</error-code>
438 # <error-description>…</error-description></error>
439 error_el = root.find(".//error")
440 if error_el is not None:
441 code_el = error_el.find("error-code")
442 if code_el is None:
443 code_el = error_el.find("code")
444 msg_el = error_el.find("error-description")
445 if msg_el is None:
446 msg_el = error_el.find("msg")
447 code = code_el.text.strip() if code_el is not None and code_el.text else ""
448 msg = (
449 msg_el.text.strip()
450 if msg_el is not None and msg_el.text
451 else "Unknown error"
452 )
453 _logger.debug(
454 "LabArchives error response (HTTP %s, code %s): %s\nBody: %s",
455 resp.status_code,
456 code,
457 msg,
458 resp.text[:1000],
459 )
460 full_msg = f"HTTP {resp.status_code}, error code {code!r}: {msg}"
462 if code in _AUTH_ERROR_CODES:
463 raise LabArchivesAuthenticationError(full_msg)
464 if code in _PERM_ERROR_CODES:
465 raise LabArchivesPermissionError(full_msg)
466 raise LabArchivesError(full_msg)
468 # No XML error element — fall back to HTTP status for 404.
469 if resp.status_code == HTTPStatus.NOT_FOUND: # pragma: no cover
470 msg = f"LabArchives resource not found (HTTP 404): {method}"
471 raise LabArchivesNotFoundError(msg)
473 return root
475 @staticmethod
476 def _pretty_print_response(root: ET.Element) -> None: # pragma: no cover
477 """Print a nicely formatted XML representation of an API response.
479 Parameters
480 ----------
481 root : ET.Element
482 Parsed XML root element (as returned by :meth:`_parse_response`)
483 """
484 ET.indent(root)
485 print(ET.tostring(root, encoding="unicode")) # noqa: T201
487 # ------------------------------------------------------------------ #
488 # Public API methods #
489 # ------------------------------------------------------------------ #
491 def get_tree_level(
492 self,
493 nbid: str,
494 parent_tree_id: str = "0",
495 ) -> list[dict[str, Any]]:
496 """Get child nodes at a tree level in a notebook.
498 Parameters
499 ----------
500 nbid : str
501 Notebook ID
502 parent_tree_id : str, optional
503 Parent tree node ID; "0" for the root level
505 Returns
506 -------
507 list of dict
508 Each dict has ``tree_id``, ``display_text``, and ``is_page`` keys.
510 Raises
511 ------
512 LabArchivesError
513 If the API call fails
514 """
515 root = self._get(
516 "tree_tools",
517 "get_tree_level",
518 params={"nbid": nbid, "parent_tree_id": parent_tree_id},
519 )
521 nodes = []
522 for item in root.findall(".//level-node"):
523 tree_id_el = item.find("tree-id")
524 text_el = item.find("display-text")
525 is_page_el = item.find("is-page")
526 if tree_id_el is None or text_el is None: # pragma: no cover
527 continue
528 nodes.append(
529 {
530 "tree_id": tree_id_el.text or "",
531 "display_text": text_el.text or "",
532 "is_page": (is_page_el is not None and is_page_el.text == "true"),
533 }
534 )
536 return nodes
538 def get_page_entries( # pragma: no cover
539 self,
540 nbid: str,
541 page_tree_id: str,
542 *,
543 include_content: bool = False,
544 ) -> list[dict[str, Any]]:
545 """Get all entries on a notebook page.
547 Parameters
548 ----------
549 nbid : str
550 Notebook ID
551 page_tree_id : str
552 Tree ID of the page
553 include_content : bool, optional
554 When True, each entry dict will include an ``entry_data`` key
555 containing the raw HTML/text content of the entry.
557 Returns
558 -------
559 list of dict
560 Each dict has ``eid`` and ``part_type`` keys, plus ``entry_data``
561 when *include_content* is True.
563 Raises
564 ------
565 LabArchivesError
566 If the API call fails
567 """
568 root = self._get(
569 "tree_tools",
570 "get_entries_for_page",
571 params={
572 "nbid": nbid,
573 "page_tree_id": page_tree_id,
574 "entry_data": "true" if include_content else "false",
575 },
576 )
578 entries = []
579 for entry_el in root.findall(".//entry"):
580 eid_el = entry_el.find("eid")
581 part_type_el = entry_el.find("part-type")
582 entry: dict[str, Any] = {
583 "eid": eid_el.text.strip()
584 if eid_el is not None and eid_el.text
585 else "",
586 "part_type": (
587 part_type_el.text.strip()
588 if part_type_el is not None and part_type_el.text
589 else ""
590 ),
591 }
592 if include_content:
593 # The API uses <entry_data> (underscore) per the docs
594 data_el = entry_el.find("entry_data")
595 if data_el is None:
596 data_el = entry_el.find("entry-data")
597 entry["entry_data"] = (
598 data_el.text.strip() if data_el is not None and data_el.text else ""
599 )
600 entries.append(entry)
602 return entries
604 def _insert_node(
605 self,
606 nbid: str,
607 parent_tree_id: str,
608 display_text: str,
609 *,
610 is_folder: bool,
611 ) -> str:
612 """Create a folder or page node in a notebook tree.
614 Parameters
615 ----------
616 nbid : str
617 Notebook ID
618 parent_tree_id : str
619 Parent tree node ID
620 display_text : str
621 Display name for the new node
622 is_folder : bool
623 True to create a folder; False to create a page
625 Returns
626 -------
627 str
628 The ``tree_id`` of the newly created node
630 Raises
631 ------
632 LabArchivesError
633 If the API call fails
634 """
635 root = self._get(
636 "tree_tools",
637 "insert_node",
638 params={
639 "nbid": nbid,
640 "parent_tree_id": parent_tree_id,
641 "display_text": display_text,
642 "is_folder": "true" if is_folder else "false",
643 },
644 )
646 tree_id_el = root.find(".//tree-id")
647 if tree_id_el is None or not tree_id_el.text:
648 msg = f"insert_node response missing tree-id for '{display_text}'"
649 raise LabArchivesError(msg)
651 return tree_id_el.text.strip()
653 def insert_folder(self, nbid: str, parent_tree_id: str, name: str) -> str:
654 """Create a folder in a notebook tree.
656 Parameters
657 ----------
658 nbid : str
659 Notebook ID
660 parent_tree_id : str
661 Parent tree node ID; ``"0"`` for the root level
662 name : str
663 Display name for the new folder
665 Returns
666 -------
667 str
668 The ``tree_id`` of the newly created folder
670 Raises
671 ------
672 LabArchivesError
673 If the API call fails
674 """
675 return self._insert_node(nbid, parent_tree_id, name, is_folder=True)
677 def insert_page(self, nbid: str, parent_tree_id: str, name: str) -> str:
678 """Create a page in a notebook tree.
680 Parameters
681 ----------
682 nbid : str
683 Notebook ID
684 parent_tree_id : str
685 Parent tree node ID (folder ``tree_id`` or ``"0"`` for root)
686 name : str
687 Display name for the new page
689 Returns
690 -------
691 str
692 The ``tree_id`` of the newly created page
694 Raises
695 ------
696 LabArchivesError
697 If the API call fails
698 """
699 return self._insert_node(nbid, parent_tree_id, name, is_folder=False)
701 def add_entry(
702 self,
703 nbid: str,
704 page_tree_id: str,
705 entry_data: str,
706 part_type: str = "text entry",
707 ) -> str:
708 """Add a text/HTML entry to a notebook page.
710 Parameters
711 ----------
712 nbid : str
713 Notebook ID
714 page_tree_id : str
715 Tree ID of the target page
716 entry_data : str
717 HTML or plain-text content for the entry
718 part_type : str, optional
719 Entry part type (default: "text entry")
721 Returns
722 -------
723 str
724 The ``eid`` (entry ID) of the newly created entry
726 Raises
727 ------
728 LabArchivesError
729 If the API call fails
730 """
731 root = self._post(
732 "entries",
733 "add_entry",
734 form={
735 "nbid": nbid,
736 "pid": page_tree_id,
737 "entry_data": entry_data,
738 "part_type": part_type,
739 },
740 )
742 eid_el = root.find(".//eid")
743 if eid_el is None or not eid_el.text:
744 msg = f"add_entry response missing eid for page {page_tree_id}"
745 raise LabArchivesError(msg)
747 return eid_el.text.strip()
749 def add_attachment(
750 self,
751 nbid: str,
752 page_tree_id: str,
753 filename: str,
754 data: bytes,
755 caption: str = "",
756 ) -> str:
757 """Upload a file attachment to a notebook page.
759 Parameters
760 ----------
761 nbid : str
762 Notebook ID
763 page_tree_id : str
764 Tree ID of the target page
765 filename : str
766 Name to use for the uploaded file
767 data : bytes
768 Raw file content
769 caption : str, optional
770 Caption for the attachment
772 Returns
773 -------
774 str
775 The ``eid`` (entry ID) of the attachment entry
777 Raises
778 ------
779 LabArchivesError
780 If the API call fails
781 """
782 root = self._post(
783 "entries",
784 "add_attachment",
785 params={
786 "nbid": nbid,
787 "pid": page_tree_id,
788 "filename": filename,
789 "caption": caption,
790 },
791 data=data,
792 )
794 eid_el = root.find(".//eid")
795 if eid_el is None or not eid_el.text: # pragma: no cover
796 msg = f"add_attachment response missing eid for page {page_tree_id}"
797 raise LabArchivesError(msg)
799 return eid_el.text.strip()
801 def attach_file( # pragma: no cover
802 self,
803 nbid: str,
804 page_tree_id: str,
805 path: Path,
806 caption: str = "",
807 ) -> str:
808 """Upload a file from disk as an attachment to a notebook page.
810 Convenience wrapper around :meth:`add_attachment` that reads the file
811 and derives the filename from the path automatically.
813 Parameters
814 ----------
815 nbid : str
816 Notebook ID
817 page_tree_id : str
818 Tree ID of the target page
819 path : Path
820 Local file to upload
821 caption : str, optional
822 Caption for the attachment
824 Returns
825 -------
826 str
827 The ``eid`` (entry ID) of the attachment entry
829 Raises
830 ------
831 LabArchivesError
832 If the API call fails
833 """
834 return self.add_attachment(
835 nbid, page_tree_id, path.name, path.read_bytes(), caption
836 )
838 def get_attachment_content(self, eid: str) -> bytes: # pragma: no cover
839 """Download the raw content of an attachment entry.
841 Parameters
842 ----------
843 eid : str
844 Entry ID of the attachment (as returned by :meth:`add_attachment`)
846 Returns
847 -------
848 bytes
849 Raw file content of the attachment
851 Raises
852 ------
853 LabArchivesNotFoundError
854 If the entry does not exist
855 LabArchivesError
856 If the API call fails
857 """
858 return self._get_raw("entries", "entry_attachment", params={"eid": eid})
860 def get_user_info(self, login: str, password: str) -> dict[str, Any]:
861 """Exchange login credentials for user info and notebook list.
863 This method is used to obtain the ``uid`` for a LabArchives account.
864 The returned ``uid`` should be stored in ``NX_LABARCHIVES_USER_ID``
865 for subsequent API calls.
867 Parameters
868 ----------
869 login : str
870 LabArchives login (email address)
871 password : str
872 LabArchives account password
874 Returns
875 -------
876 dict
877 User info including ``uid``, ``email``, and ``notebooks`` list.
879 Raises
880 ------
881 LabArchivesAuthenticationError
882 If login credentials are invalid
883 LabArchivesError
884 If the API call fails
885 """
886 # Response root element IS <users> — _parse_response returns it directly.
887 # Structure:
888 # <users>
889 # <id>…</id>
890 # <fullname>…</fullname>
891 # <email>…</email>
892 # <notebooks type="array">
893 # <notebook><id>…</id><name>…</name></notebook>
894 # </notebooks>
895 # </users>
896 root = self._get(
897 "users",
898 "user_access_info",
899 params={"login_or_email": login, "password": password},
900 )
902 # The <users> root may be a direct child of a <response> wrapper in
903 # tests; use a helper that finds it wherever it lives.
904 users_el = root if root.tag == "users" else root.find(".//users")
905 if users_el is None: # pragma: no cover
906 users_el = root
908 result: dict[str, Any] = {}
910 id_el = users_el.find("id")
911 if id_el is not None and id_el.text:
912 result["uid"] = id_el.text.strip()
914 for tag in ("fullname", "first-name", "last-name", "email"):
915 el = users_el.find(tag)
916 if el is not None and el.text:
917 result[tag] = el.text.strip()
919 notebooks = []
920 for nb in users_el.findall("notebooks/notebook"):
921 nb_id_el = nb.find("id")
922 nb_name_el = nb.find("name")
923 nb_data: dict[str, str] = {}
924 if nb_id_el is not None and nb_id_el.text:
925 nb_data["id"] = nb_id_el.text.strip()
926 if nb_name_el is not None and nb_name_el.text:
927 nb_data["name"] = nb_name_el.text.strip()
928 notebooks.append(nb_data)
929 result["notebooks"] = notebooks
931 return result
933 def get_user_info_by_uid(
934 self, uid: str | None = None
935 ) -> dict[str, Any]: # pragma: no cover
936 """Fetch user info for a LabArchives account using a uid.
938 Calls the ``users/user_info_via_id`` endpoint, which requires only the
939 uid (no login password). When *uid* is omitted the client's own
940 ``uid`` is used.
942 Parameters
943 ----------
944 uid : str, optional
945 LabArchives user ID. Defaults to the client's configured uid.
947 Returns
948 -------
949 dict
950 User info including ``uid``, ``email``, and ``notebooks`` list.
952 Raises
953 ------
954 LabArchivesError
955 If the API call fails
956 """
957 # Structure mirrors user_access_info:
958 # <users>
959 # <id>…</id>
960 # <fullname>…</fullname>
961 # <email>…</email>
962 # <notebooks type="array">
963 # <notebook><id>…</id><name>…</name></notebook>
964 # </notebooks>
965 # </users>
966 lookup_uid = uid if uid is not None else self.uid
967 root = self._get(
968 "users",
969 "user_info_via_id",
970 params={"uid": lookup_uid},
971 )
973 users_el = root if root.tag == "users" else root.find(".//users")
974 if users_el is None:
975 users_el = root
977 result: dict[str, Any] = {}
979 id_el = users_el.find("id")
980 if id_el is not None and id_el.text:
981 result["uid"] = id_el.text.strip()
983 for tag in ("fullname", "first-name", "last-name", "email"):
984 el = users_el.find(tag)
985 if el is not None and el.text:
986 result[tag] = el.text.strip()
988 notebooks = []
989 for nb in users_el.findall("notebooks/notebook"):
990 nb_id_el = nb.find("id")
991 nb_name_el = nb.find("name")
992 nb_data: dict[str, str] = {}
993 if nb_id_el is not None and nb_id_el.text:
994 nb_data["id"] = nb_id_el.text.strip()
995 if nb_name_el is not None and nb_name_el.text:
996 nb_data["name"] = nb_name_el.text.strip()
997 notebooks.append(nb_data)
998 result["notebooks"] = notebooks
1000 return result
1003def get_labarchives_client() -> LabArchivesClient:
1004 """Get configured LabArchives client from settings.
1006 Creates a :class:`LabArchivesClient` using credentials from the NexusLIMS
1007 configuration (``NX_LABARCHIVES_*`` settings).
1009 Returns
1010 -------
1011 LabArchivesClient
1012 Configured client instance
1014 Raises
1015 ------
1016 LabArchivesError
1017 If required settings are not configured
1019 Examples
1020 --------
1021 >>> from nexusLIMS.utils.labarchives import get_labarchives_client
1022 >>> client = get_labarchives_client()
1023 >>> nodes = client.get_tree_level("12345", "0")
1024 >>> folder_id = client.insert_folder("12345", "0", "My Folder")
1025 >>> page_id = client.insert_page("12345", folder_id, "My Page")
1026 """
1027 if not settings.NX_LABARCHIVES_ACCESS_KEY_ID:
1028 msg = "NX_LABARCHIVES_ACCESS_KEY_ID not configured"
1029 raise LabArchivesError(msg)
1031 if not settings.NX_LABARCHIVES_ACCESS_PASSWORD:
1032 msg = "NX_LABARCHIVES_ACCESS_PASSWORD not configured"
1033 raise LabArchivesError(msg)
1035 if not settings.NX_LABARCHIVES_USER_ID:
1036 msg = "NX_LABARCHIVES_USER_ID not configured"
1037 raise LabArchivesError(msg)
1039 if not settings.NX_LABARCHIVES_URL:
1040 msg = "NX_LABARCHIVES_URL not configured"
1041 raise LabArchivesError(msg)
1043 return LabArchivesClient(
1044 base_url=str(settings.NX_LABARCHIVES_URL),
1045 akid=settings.NX_LABARCHIVES_ACCESS_KEY_ID,
1046 password=settings.NX_LABARCHIVES_ACCESS_PASSWORD,
1047 uid=settings.NX_LABARCHIVES_USER_ID,
1048 )