Coverage for nexusLIMS/exporters/destinations/cdcs.py: 100%
69 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
1"""CDCS export destination plugin.
3Exports NexusLIMS XML records to a CDCS (Configurable Data Curation System)
4instance using the CDCS REST API.
5"""
7from __future__ import annotations
9import logging
10from http import HTTPStatus
11from urllib.parse import urljoin
13from nexusLIMS.config import settings
14from nexusLIMS.exporters.base import ExportContext, ExportResult
15from nexusLIMS.utils.cdcs import AuthenticationError
16from nexusLIMS.utils.network import nexus_req
18_logger = logging.getLogger(__name__)
21class CDCSDestination:
22 """CDCS export destination plugin.
24 Uploads NexusLIMS XML records to a CDCS instance and assigns them
25 to the configured workspace.
27 Attributes
28 ----------
29 name : str
30 Destination identifier: "cdcs"
31 priority : int
32 Export priority: 100 (high priority, runs first)
33 """
35 name = "cdcs"
36 priority = 100
38 @property
39 def enabled(self) -> bool:
40 """Check if CDCS is configured and enabled.
42 Returns
43 -------
44 bool
45 True if both NX_CDCS_TOKEN and NX_CDCS_URL are configured
46 """
47 return (
48 hasattr(settings, "NX_CDCS_TOKEN")
49 and hasattr(settings, "NX_CDCS_URL")
50 and settings.NX_CDCS_TOKEN is not None
51 and settings.NX_CDCS_URL is not None
52 )
54 def validate_config(self) -> tuple[bool, str | None]: # noqa: PLR0911
55 """Validate CDCS configuration.
57 Tests:
58 - NX_CDCS_TOKEN is configured
59 - NX_CDCS_URL is configured
60 - Can authenticate to CDCS API
62 Returns
63 -------
64 tuple[bool, str | None]
65 (is_valid, error_message)
66 """
67 if not hasattr(settings, "NX_CDCS_TOKEN"):
68 return False, "NX_CDCS_TOKEN not configured"
69 if not settings.NX_CDCS_TOKEN:
70 return False, "NX_CDCS_TOKEN is empty"
71 if not hasattr(settings, "NX_CDCS_URL"):
72 return False, "NX_CDCS_URL not configured"
73 if not settings.NX_CDCS_URL:
74 return False, "NX_CDCS_URL is empty"
76 # Test authentication by getting workspace ID
77 try:
78 self._get_workspace_id()
79 except AuthenticationError as e:
80 return False, f"CDCS authentication failed: {e}"
81 except Exception as e:
82 return False, f"CDCS configuration error: {e}"
84 return True, None
86 def export(self, context: ExportContext) -> ExportResult:
87 """Export record to CDCS.
89 Reads the XML file, uploads it to CDCS, and assigns it to the
90 configured workspace. Never raises exceptions - all errors are
91 caught and returned as ExportResult with success=False.
93 Parameters
94 ----------
95 context
96 Export context with file path and session metadata
98 Returns
99 -------
100 ExportResult
101 Result of the export attempt
102 """
103 try:
104 # Read XML content
105 with context.xml_file_path.open(encoding="utf-8") as f:
106 xml_content = f.read()
108 # Upload to CDCS
109 title = context.xml_file_path.stem
110 record_id, record_url = self._upload_to_cdcs(xml_content, title)
112 return ExportResult(
113 success=True,
114 destination_name=self.name,
115 record_id=str(record_id),
116 record_url=record_url,
117 )
119 except Exception as e:
120 _logger.exception(
121 "Failed to export to CDCS: %s",
122 context.xml_file_path.name,
123 )
124 return ExportResult(
125 success=False,
126 destination_name=self.name,
127 error_message=str(e),
128 )
130 def _upload_to_cdcs(self, xml_content: str, title: str) -> tuple[int, str]:
131 """Upload XML to CDCS and return (record_id, record_url).
133 Parameters
134 ----------
135 xml_content
136 XML content to upload
137 title
138 Title for the record
140 Returns
141 -------
142 tuple[int, str]
143 (record_id, record_url)
145 Raises
146 ------
147 RuntimeError
148 If upload fails
149 """
150 endpoint = urljoin(str(settings.NX_CDCS_URL), "rest/data/")
152 payload = {
153 "template": self._get_template_id(),
154 "title": title,
155 "xml_content": xml_content,
156 }
158 post_r = nexus_req(
159 endpoint, "POST", json=payload, token_auth=settings.NX_CDCS_TOKEN
160 )
162 if post_r.status_code != HTTPStatus.CREATED:
163 msg = f"CDCS upload failed: {post_r.text}"
164 raise RuntimeError(msg)
166 record_id = post_r.json()["id"]
168 # Assign to workspace
169 wrk_endpoint = urljoin(
170 str(settings.NX_CDCS_URL),
171 f"rest/data/{record_id}/assign/{self._get_workspace_id()}",
172 )
173 _ = nexus_req(wrk_endpoint, "PATCH", token_auth=settings.NX_CDCS_TOKEN)
175 record_url = urljoin(str(settings.NX_CDCS_URL), f"data?id={record_id}")
176 _logger.info('Record "%s" available at %s', title, record_url)
178 return record_id, record_url
180 def _get_template_id(self) -> str:
181 """Get current template ID from CDCS.
183 Returns
184 -------
185 str
186 Template ID
188 Raises
189 ------
190 AuthenticationError
191 If authentication fails
192 """
193 endpoint = urljoin(
194 str(settings.NX_CDCS_URL), "rest/template-version-manager/global/"
195 )
196 r = nexus_req(endpoint, "GET", token_auth=settings.NX_CDCS_TOKEN)
198 if r.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
199 msg = "Could not authenticate to CDCS"
200 raise AuthenticationError(msg)
202 return r.json()[0]["current"]
204 def _get_workspace_id(self) -> int | None:
205 """Get workspace ID from CDCS.
207 Returns
208 -------
209 int or None
210 Workspace ID, or ``None`` if no workspaces are available (which
211 still means authentication succeeded).
213 Raises
214 ------
215 AuthenticationError
216 If authentication fails
217 """
218 endpoint = urljoin(str(settings.NX_CDCS_URL), "rest/workspace/read_access/")
219 r = nexus_req(endpoint, "GET", token_auth=settings.NX_CDCS_TOKEN)
221 if r.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
222 msg = "Could not authenticate to CDCS"
223 raise AuthenticationError(msg)
225 workspaces = r.json()
226 return workspaces[0]["id"] if workspaces else None