Source code for nexusLIMS.utils.cdcs

"""CDCS interaction utilities for NexusLIMS.

This module provides functions for querying, downloading, and deleting records
from a CDCS instance, along with low-level upload helpers. These utilities are
used primarily for testing and maintenance.

For exporting records to CDCS, use the CDCSDestination plugin in
nexusLIMS.exporters.destinations.cdcs instead.
"""

import logging
from http import HTTPStatus
from pathlib import Path
from typing import Any, Dict, List
from urllib.parse import urljoin

from tqdm import tqdm

from nexusLIMS.config import settings
from nexusLIMS.utils.network import nexus_req

_logger = logging.getLogger(__name__)


class AuthenticationError(Exception):
    """Exception raised when authentication to the CDCS instance fails.

    Parameters
    ----------
    message
        Human-readable description of the authentication failure.

    Attributes
    ----------
    message
        The same text that was passed to the constructor.
    """

    def __init__(self, message):
        # Forward the message to ``Exception`` so that ``args`` and ``str()``
        # are populated even when the message is supplied by keyword
        # (``AuthenticationError(message=...)``).
        super().__init__(message)
        self.message = message
class CDCSDataRecord(Dict[str, Any]):
    """Dictionary type describing one Data record as returned by CDCS.

    Objects of this shape are what CDCS endpoints such as
    ``/rest/data/query/`` and ``/rest/data/query/keyword/`` return. The class
    adds no behavior on top of ``dict``; it exists to name the record
    structure for type annotations and documentation.

    Attributes
    ----------
    id : int
        The record ID
    template : int
        The template ID
    workspace : int | None
        The workspace ID
    user_id : str
        The user ID that created the record
    title : str
        The record title
    checksum : str | None
        The record checksum
    creation_date : str | None
        The record creation date
    last_modification_date : str | None
        The last modification date
    last_change_date : str | None
        The last change date
    xml_content : str
        The XML content of the record
    """
def get_cdcs_url() -> str:
    """Return the configured NexusLIMS CDCS base URL as a plain string.

    The value is read from ``settings.NX_CDCS_URL``. This function performs no
    validation of its own; the setting is expected to be required and
    validated when the settings object is built.

    Returns
    -------
    str
        The URL of the NexusLIMS CDCS instance to use
    """
    # The setting is a URL object (AnyHttpUrl), so convert it to ``str``
    # before handing it to callers that build endpoints with ``urljoin``.
    cdcs_url = settings.NX_CDCS_URL
    return str(cdcs_url)
def get_workspace_id() -> int:
    """Look up the ID of the workspace the configured token can read.

    Queries the ``rest/workspace/read_access`` endpoint and returns the ``id``
    of the first workspace in the response. In the current NexusLIMS CDCS
    implementation this should be the Global Public Workspace.

    Returns
    -------
    int
        The workspace ID

    Raises
    ------
    AuthenticationError
        If CDCS answers with 401 (Unauthorized) or 403 (Forbidden)
    """
    auth_failure_codes = (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN)
    response = nexus_req(
        urljoin(get_cdcs_url(), "rest/workspace/read_access"),
        "GET",
        token_auth=settings.NX_CDCS_TOKEN,
    )

    if response.status_code not in auth_failure_codes:
        # Assumes this user has exactly one readable workspace (the public
        # one), so the first entry is the one we want.
        workspaces = response.json()
        return workspaces[0]["id"]

    raise AuthenticationError(
        "Could not authenticate to CDCS. Is the NX_CDCS_TOKEN "
        "environment variable set correctly?"
    )
def get_template_id() -> str:
    """Look up the ID of the current global template (XSD schema).

    Queries the ``rest/template-version-manager/global`` endpoint and returns
    the ``current`` value of the first entry, so that records can be
    associated with the correct schema.

    Returns
    -------
    str
        The template ID

    Raises
    ------
    AuthenticationError
        If CDCS answers with 401 (Unauthorized) or 403 (Forbidden)
    """
    url = urljoin(get_cdcs_url(), "rest/template-version-manager/global")
    resp = nexus_req(url, "GET", token_auth=settings.NX_CDCS_TOKEN)

    rejected = resp.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN)
    if rejected:
        raise AuthenticationError(
            "Could not authenticate to CDCS. Is the NX_CDCS_TOKEN "
            "environment variable set correctly?"
        )

    # The first template version manager's "current" field is the template id
    template_managers = resp.json()
    return template_managers[0]["current"]
def delete_record(record_id: str):
    """Delete one Data record from the NexusLIMS CDCS instance over REST.

    A failed delete is logged at error level but not raised; the response is
    always handed back to the caller.

    Parameters
    ----------
    record_id
        The id value (on the CDCS server) of the record to be deleted

    Returns
    -------
    requests.Response
        The REST response returned from the CDCS instance after attempting
        the delete operation
    """
    url = urljoin(get_cdcs_url(), f"rest/data/{record_id}")
    resp = nexus_req(url, "DELETE", token_auth=settings.NX_CDCS_TOKEN)

    # 204 (No Content) is the only status treated as a successful delete
    deleted = resp.status_code == HTTPStatus.NO_CONTENT
    if not deleted:
        _logger.error("Received error while deleting %s:\n%s", record_id, resp.text)

    return resp
def search_records(
    title: str | None = None,
    template_id: str | None = None,
    keyword: str | None = None,
) -> list[CDCSDataRecord]:
    """Search for records in the CDCS instance by title, keyword, or criteria.

    This function uses the CDCS query endpoint to search for records. If no
    parameters are provided, all records are returned.

    Note
    ----
    If ``keyword`` is provided, it takes precedence and the ``title``
    parameter is ignored. The keyword search uses a different CDCS endpoint
    (``/rest/data/query/keyword/``) that performs full-text search but does
    not support title filtering. In this mode, only ``template_id`` can be
    combined with ``keyword`` to filter results.

    Parameters
    ----------
    title
        The title to search for (exact match). Only used when ``keyword`` is
        None.
    template_id
        The template ID to filter by. Can be combined with either ``title``
        or ``keyword``.
    keyword
        Keyword(s) for full-text search across record content. When provided,
        takes precedence over ``title`` parameter.

    Returns
    -------
    list[CDCSDataRecord]
        List of matching record objects from CDCS. Each record is a dictionary
        containing id, title, xml_content, template, workspace, user_id,
        checksum, and date fields. See :class:`CDCSDataRecord` for complete
        structure. An empty list is returned (and the error logged) for any
        non-200 response that is not handled by one of the exceptions below.

    Raises
    ------
    AuthenticationError
        If CDCS answers with 401 (Unauthorized) or 403 (Forbidden)
    ValueError
        If keyword parameter is empty or search parameters are invalid
    """
    if keyword is not None and not keyword.strip():
        msg = "Keyword parameter cannot be empty"
        raise ValueError(msg)

    payload: dict[str, Any]
    if keyword is not None:
        # Full-text search endpoint; it does not support title filtering
        endpoint = urljoin(get_cdcs_url(), "rest/data/query/keyword/")
        payload = {
            "query": keyword,
            "all": "true",  # Return all results (not paginated)
        }
    else:
        # The query endpoint expects a POST with JSON body
        endpoint = urljoin(get_cdcs_url(), "rest/data/query/")
        payload = {
            "query": {},  # Empty query matches all records
            "all": "true",  # Return all results (not paginated)
        }
        if title is not None:
            payload["title"] = title

    # The template filter is accepted by both endpoints
    if template_id is not None:
        payload["templates"] = [{"id": template_id}]

    response = nexus_req(
        endpoint, "POST", json=payload, token_auth=settings.NX_CDCS_TOKEN
    )

    # Treat 403 like 401, as get_workspace_id() and get_template_id() do.
    # Otherwise a rejected token would be reported as "no matching records".
    if response.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
        msg = (
            "Could not authenticate to CDCS. Is the NX_CDCS_TOKEN "
            "environment variable set correctly?"
        )
        raise AuthenticationError(msg)

    if response.status_code == HTTPStatus.BAD_REQUEST:
        _logger.error("Bad request while searching records:\n%s", response.text)
        msg = f"Invalid search parameters: {response.text}"
        raise ValueError(msg)

    if response.status_code != HTTPStatus.OK:
        _logger.error("Got error while searching records:\n%s", response.text)
        return []

    return response.json()
def download_record(record_id: str) -> str:
    """Download the XML content of a record from the CDCS instance.

    Parameters
    ----------
    record_id
        The id value (on the CDCS server) of the record to download

    Returns
    -------
    str
        The XML content of the record

    Raises
    ------
    AuthenticationError
        If CDCS answers with 401 (Unauthorized) or 403 (Forbidden)
    ValueError
        If the record is not found or another error occurs
    """
    endpoint = urljoin(get_cdcs_url(), f"rest/data/download/{record_id}/")
    response = nexus_req(endpoint, "GET", token_auth=settings.NX_CDCS_TOKEN)

    # Treat 403 like 401, as get_workspace_id() and get_template_id() do, so a
    # rejected token is reported as an authentication problem rather than a
    # generic download failure.
    if response.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
        msg = (
            "Could not authenticate to CDCS. Is the NX_CDCS_TOKEN "
            "environment variable set correctly?"
        )
        raise AuthenticationError(msg)

    if response.status_code == HTTPStatus.NOT_FOUND:
        msg = f"Record with id {record_id} not found"
        raise ValueError(msg)

    if response.status_code != HTTPStatus.OK:
        _logger.error("Got error while downloading %s:\n%s", record_id, response.text)
        msg = f"Failed to download record {record_id}: {response.text}"
        raise ValueError(msg)

    return response.text
def upload_record_content(xml_content: str, title: str) -> tuple[Any, int | None]:
    """Upload a single XML record to the NexusLIMS CDCS instance.

    The record is created with the current template and then assigned to the
    workspace returned by :py:meth:`get_workspace_id`. A failed workspace
    assignment is logged as a warning; the record itself has still been
    created, so its id is returned regardless.

    Note
    ----
    This is a low-level utility function primarily used for testing. For
    production record uploads, use the CDCSDestination exporter plugin in
    nexusLIMS.exporters.destinations.cdcs instead.

    Parameters
    ----------
    xml_content
        The actual content of an XML record (rather than a file)
    title
        The title to give to the record in CDCS

    Returns
    -------
    tuple[requests.Response, int | None]
        A tuple of (response, record_id). The response is the REST response
        returned from the CDCS instance after attempting the upload. The
        record_id is the id (on the server) of the record that was uploaded,
        or None if there was an error.
    """
    base_url = get_cdcs_url()
    endpoint = urljoin(base_url, "rest/data/")

    payload = {
        "template": get_template_id(),
        "title": title,
        "xml_content": xml_content,
    }

    post_r = nexus_req(
        endpoint, "POST", json=payload, token_auth=settings.NX_CDCS_TOKEN
    )

    if post_r.status_code != HTTPStatus.CREATED:
        # anything other than 201 status means something went wrong
        _logger.error("Got error while uploading %s:\n%s", title, post_r.text)
        return post_r, None

    # assign this record to the public workspace
    record_id = post_r.json()["id"]
    record_url = urljoin(base_url, f"data?id={record_id}")
    wrk_endpoint = urljoin(
        base_url,
        f"rest/data/{record_id}/assign/{get_workspace_id()}",
    )

    wrk_r = nexus_req(wrk_endpoint, "PATCH", token_auth=settings.NX_CDCS_TOKEN)
    if wrk_r.status_code >= HTTPStatus.BAD_REQUEST:
        # Do not fail the upload (the record exists), but make the failed
        # assignment visible instead of silently discarding the response.
        _logger.warning(
            "Could not assign record %s to workspace:\n%s", record_id, wrk_r.text
        )

    _logger.info('Record "%s" available at %s', title, record_url)
    return post_r, record_id
def upload_record_files(
    files_to_upload: List[Path] | None,
    *,
    progress: bool = False,
) -> tuple[List[Path], List[int]]:
    """Upload a batch of XML record files to CDCS.

    Each file is read as UTF-8 and sent with :py:meth:`upload_record_content`,
    using the file name without its extension as the record title. Files whose
    upload does not return 201 (Created) are logged as warnings and skipped.

    Note
    ----
    This is a utility function primarily used for testing and manual uploads.
    For production record uploads, use the CDCSDestination exporter plugin in
    nexusLIMS.exporters.destinations.cdcs instead.

    Parameters
    ----------
    files_to_upload : List[pathlib.Path] | None
        The list of .xml files to upload. If ``None``, all .xml files in the
        current directory will be used instead.
    progress
        Whether to show a progress bar for uploading

    Returns
    -------
    tuple[list[pathlib.Path], list[int]]
        A tuple of (files_uploaded, record_ids). files_uploaded is a list of
        the files that were successfully uploaded. record_ids is a list of
        the record id values (on the server) that were uploaded.

    Raises
    ------
    ValueError
        If no .xml files are found
    """
    if files_to_upload is not None:
        _logger.info("Using .xml files from command line")
    else:
        _logger.info("Using all .xml files in this directory")
        files_to_upload = list(Path().glob("*.xml"))

    n_candidates = len(files_to_upload)
    _logger.info("Found %s files to upload\n", n_candidates)
    if n_candidates == 0:
        msg = (
            "No .xml files were found (please specify on the "
            "command line, or run this script from a directory "
            "containing one or more .xml files"
        )
        _logger.error(msg)
        raise ValueError(msg)

    files_uploaded = []
    record_ids = []

    # Wrap the iterable in a progress bar only when requested
    candidates = tqdm(files_to_upload) if progress else files_to_upload
    for entry in candidates:
        xml_path = Path(entry)
        xml_content = xml_path.read_text(encoding="utf-8")

        response, record_id = upload_record_content(xml_content, xml_path.stem)

        if response.status_code == HTTPStatus.CREATED:
            files_uploaded.append(xml_path)
            record_ids.append(record_id)
        else:
            _logger.warning("Could not upload %s", xml_path.name)

    _logger.info(
        "Successfully uploaded %i of %i files",
        len(files_uploaded),
        n_candidates,
    )

    return files_uploaded, record_ids