Source code for nexusLIMS.exporters.destinations.cdcs

"""CDCS export destination plugin.

Exports NexusLIMS XML records to a CDCS (Configurable Data Curation System)
instance using the CDCS REST API.
"""

from __future__ import annotations

import logging
from http import HTTPStatus
from urllib.parse import urljoin

from nexusLIMS.config import settings
from nexusLIMS.exporters.base import ExportContext, ExportResult
from nexusLIMS.utils.cdcs import AuthenticationError
from nexusLIMS.utils.network import nexus_req

_logger = logging.getLogger(__name__)


[docs] class CDCSDestination: """CDCS export destination plugin. Uploads NexusLIMS XML records to a CDCS instance and assigns them to the configured workspace. Attributes ---------- name : str Destination identifier: "cdcs" priority : int Export priority: 100 (high priority, runs first) """ name = "cdcs" priority = 100 @property def enabled(self) -> bool: """Check if CDCS is configured and enabled. Returns ------- bool True if both NX_CDCS_TOKEN and NX_CDCS_URL are configured """ return ( hasattr(settings, "NX_CDCS_TOKEN") and hasattr(settings, "NX_CDCS_URL") and settings.NX_CDCS_TOKEN is not None and settings.NX_CDCS_URL is not None )
[docs] def validate_config(self) -> tuple[bool, str | None]: # noqa: PLR0911 """Validate CDCS configuration. Tests: - NX_CDCS_TOKEN is configured - NX_CDCS_URL is configured - Can authenticate to CDCS API Returns ------- tuple[bool, str | None] (is_valid, error_message) """ if not hasattr(settings, "NX_CDCS_TOKEN"): return False, "NX_CDCS_TOKEN not configured" if not settings.NX_CDCS_TOKEN: return False, "NX_CDCS_TOKEN is empty" if not hasattr(settings, "NX_CDCS_URL"): return False, "NX_CDCS_URL not configured" if not settings.NX_CDCS_URL: return False, "NX_CDCS_URL is empty" # Test authentication by getting workspace ID try: self._get_workspace_id() except AuthenticationError as e: return False, f"CDCS authentication failed: {e}" except Exception as e: return False, f"CDCS configuration error: {e}" return True, None
[docs] def export(self, context: ExportContext) -> ExportResult: """Export record to CDCS. Reads the XML file, uploads it to CDCS, and assigns it to the configured workspace. Never raises exceptions - all errors are caught and returned as ExportResult with success=False. Parameters ---------- context Export context with file path and session metadata Returns ------- ExportResult Result of the export attempt """ try: # Read XML content with context.xml_file_path.open(encoding="utf-8") as f: xml_content = f.read() # Upload to CDCS title = context.xml_file_path.stem record_id, record_url = self._upload_to_cdcs(xml_content, title) return ExportResult( success=True, destination_name=self.name, record_id=str(record_id), record_url=record_url, ) except Exception as e: _logger.exception( "Failed to export to CDCS: %s", context.xml_file_path.name, ) return ExportResult( success=False, destination_name=self.name, error_message=str(e), )
def _upload_to_cdcs(self, xml_content: str, title: str) -> tuple[int, str]: """Upload XML to CDCS and return (record_id, record_url). Parameters ---------- xml_content XML content to upload title Title for the record Returns ------- tuple[int, str] (record_id, record_url) Raises ------ RuntimeError If upload fails """ endpoint = urljoin(str(settings.NX_CDCS_URL), "rest/data/") payload = { "template": self._get_template_id(), "title": title, "xml_content": xml_content, } post_r = nexus_req( endpoint, "POST", json=payload, token_auth=settings.NX_CDCS_TOKEN ) if post_r.status_code != HTTPStatus.CREATED: msg = f"CDCS upload failed: {post_r.text}" raise RuntimeError(msg) record_id = post_r.json()["id"] # Assign to workspace wrk_endpoint = urljoin( str(settings.NX_CDCS_URL), f"rest/data/{record_id}/assign/{self._get_workspace_id()}", ) _ = nexus_req(wrk_endpoint, "PATCH", token_auth=settings.NX_CDCS_TOKEN) record_url = urljoin(str(settings.NX_CDCS_URL), f"data?id={record_id}") _logger.info('Record "%s" available at %s', title, record_url) return record_id, record_url def _get_template_id(self) -> str: """Get current template ID from CDCS. Returns ------- str Template ID Raises ------ AuthenticationError If authentication fails """ endpoint = urljoin( str(settings.NX_CDCS_URL), "rest/template-version-manager/global" ) r = nexus_req(endpoint, "GET", token_auth=settings.NX_CDCS_TOKEN) if r.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN): msg = "Could not authenticate to CDCS" raise AuthenticationError(msg) return r.json()[0]["current"] def _get_workspace_id(self) -> int | None: """Get workspace ID from CDCS. Returns ------- int or None Workspace ID, or ``None`` if no workspaces are available (which still means authentication succeeded). Raises ------ AuthenticationError If authentication fails """ endpoint = urljoin(str(settings.NX_CDCS_URL), "rest/workspace/read_access") r = nexus_req(endpoint, "GET", token_auth=settings.NX_CDCS_TOKEN) if r.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN): msg = "Could not authenticate to CDCS" raise AuthenticationError(msg) workspaces = r.json() return workspaces[0]["id"] if workspaces else None