Coverage for nexusLIMS/utils/cdcs.py: 100%
114 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
1"""CDCS interaction utilities for NexusLIMS.
3This module provides functions for querying, downloading, and deleting records
4from a CDCS instance. These are non-export operations used primarily for
5testing and maintenance.
7For exporting records to CDCS, use the CDCSDestination plugin in
8nexusLIMS.exporters.destinations.cdcs instead.
9"""
11import logging
12from http import HTTPStatus
13from pathlib import Path
14from typing import Any, Dict, List
15from urllib.parse import urljoin
17from tqdm import tqdm
19from nexusLIMS.config import settings
20from nexusLIMS.utils.network import nexus_req
22_logger = logging.getLogger(__name__)
class AuthenticationError(Exception):
    """Exception raised when authentication to the CDCS instance fails.

    Parameters
    ----------
    message
        Human-readable description of the authentication failure.

    Attributes
    ----------
    message
        The message passed at construction time.
    """

    def __init__(self, message):
        # Forward the message to ``Exception`` so that ``str(exc)`` and
        # ``exc.args`` are populated even when ``message`` is supplied as a
        # keyword argument (``BaseException.__new__`` only records
        # positional arguments).
        super().__init__(message)
        self.message = message
class CDCSDataRecord(Dict[str, Any]):
    """Dictionary type describing one Data record as returned by CDCS.

    The class adds no behavior on top of ``dict``; it exists to give a name
    to the record objects produced by CDCS endpoints such as
    ``/rest/data/query/`` and ``/rest/data/query/keyword/``.

    Attributes
    ----------
    id : int
        Identifier of the record
    template : int
        Identifier of the template the record belongs to
    workspace : int | None
        Identifier of the workspace the record is assigned to
    user_id : str
        Identifier of the user that created the record
    title : str
        Title of the record
    checksum : str | None
        Checksum of the record
    creation_date : str | None
        Date the record was created
    last_modification_date : str | None
        Date the record was last modified
    last_change_date : str | None
        Date the record was last changed
    xml_content : str
        XML content of the record
    """
def get_cdcs_url() -> str:
    """Return the configured NexusLIMS CDCS base URL as a plain string.

    The value is read from ``settings.NX_CDCS_URL`` and converted with
    ``str()``.

    Returns
    -------
    str
        The URL of the NexusLIMS CDCS instance to use

    Notes
    -----
    This function performs no validation of its own. NX_CDCS_URL is
    presumably a required setting that is validated when the settings object
    is built (TODO confirm against ``nexusLIMS.config``).
    """
    # presumably an AnyHttpUrl-like object; callers here expect a plain str
    cdcs_url = settings.NX_CDCS_URL
    return str(cdcs_url)
def get_workspace_id() -> int:
    """Look up the ID of the workspace readable with the configured token.

    Issues a GET to ``rest/workspace/read_access/`` and returns the ``id``
    field of the first entry in the JSON list that comes back. The code
    assumes the user has exactly one readable workspace (expected to be the
    Global Public Workspace).

    Returns
    -------
    int
        The workspace ID

    Raises
    ------
    AuthenticationError
        If CDCS answers with 401 (Unauthorized) or 403 (Forbidden)
    """
    response = nexus_req(
        urljoin(get_cdcs_url(), "rest/workspace/read_access/"),
        "GET",
        token_auth=settings.NX_CDCS_TOKEN,
    )

    auth_failure_codes = (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN)
    if response.status_code in auth_failure_codes:
        raise AuthenticationError(
            "Could not authenticate to CDCS. Is the NX_CDCS_TOKEN "
            "environment variable set correctly?"
        )

    # only the first workspace in the list is considered
    workspaces = response.json()
    first_workspace = workspaces[0]
    return first_workspace["id"]
def get_template_id() -> str:
    """Look up the ID of the current template (XSD schema) in CDCS.

    Issues a GET to ``rest/template-version-manager/global/`` and returns the
    ``current`` field of the first entry in the JSON list that comes back, so
    that records can be associated with the correct schema.

    Returns
    -------
    str
        The template ID

    Raises
    ------
    AuthenticationError
        If CDCS answers with 401 (Unauthorized) or 403 (Forbidden)
    """
    url = urljoin(get_cdcs_url(), "rest/template-version-manager/global/")
    reply = nexus_req(url, "GET", token_auth=settings.NX_CDCS_TOKEN)

    status = reply.status_code
    if status in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
        raise AuthenticationError(
            "Could not authenticate to CDCS. Is the NX_CDCS_TOKEN "
            "environment variable set correctly?"
        )

    # the first template version manager's "current" entry is the template id
    managers = reply.json()
    return managers[0]["current"]
def delete_record(record_id: str):
    """Remove one Data record from the NexusLIMS CDCS instance.

    Sends a DELETE request to ``rest/data/<record_id>/``. A status other than
    204 (No Content) is logged as an error, but no exception is raised; the
    response is always handed back to the caller.

    Parameters
    ----------
    record_id
        The id value (on the CDCS server) of the record to be deleted

    Returns
    -------
    requests.Response
        The REST response returned from the CDCS instance after attempting
        the delete operation
    """
    url = urljoin(get_cdcs_url(), f"rest/data/{record_id}/")
    result = nexus_req(url, "DELETE", token_auth=settings.NX_CDCS_TOKEN)

    deleted = result.status_code == HTTPStatus.NO_CONTENT
    if not deleted:
        # 204 is the only status treated as a successful delete
        _logger.error("Received error while deleting %s:\n%s", record_id, result.text)

    return result
def search_records(
    title: str | None = None,
    template_id: str | None = None,
    keyword: str | None = None,
) -> list[CDCSDataRecord]:
    """Query the CDCS instance for records by title, template, or keyword.

    Two CDCS endpoints are used depending on the arguments:

    * ``keyword`` given: POST to ``rest/data/query/keyword/`` with the
      keyword as the query. ``title`` is ignored in this mode.
    * ``keyword`` omitted: POST to ``rest/data/query/`` with an empty query
      (matching all records), optionally narrowed by ``title``.

    In both modes ``template_id`` may be supplied to filter by template, and
    all results are requested at once (no pagination).

    Parameters
    ----------
    title
        The title to search for. Only used when ``keyword`` is None.
    template_id
        The template ID to filter by. Can be combined with either ``title``
        or ``keyword``.
    keyword
        Keyword(s) for the keyword search endpoint. When provided, takes
        precedence over ``title``.

    Returns
    -------
    list[CDCSDataRecord]
        The decoded JSON body of the response on HTTP 200 (see
        :class:`CDCSDataRecord` for the expected record structure). An empty
        list is returned for any status other than 200, 400, or 401.

    Raises
    ------
    AuthenticationError
        If CDCS answers with 401 (Unauthorized)
    ValueError
        If ``keyword`` is empty or whitespace-only, or if CDCS answers with
        400 (Bad Request)
    """
    use_keyword = keyword is not None
    if use_keyword and not keyword.strip():
        raise ValueError("Keyword parameter cannot be empty")

    route = "rest/data/query/keyword/" if use_keyword else "rest/data/query/"
    url = urljoin(get_cdcs_url(), route)

    # The endpoints expect a POST with a JSON body. An empty query dict
    # matches every record; "all" disables pagination.
    body = {
        "query": keyword if use_keyword else {},
        "all": "true",
    }
    if not use_keyword and title is not None:
        body["title"] = title
    if template_id is not None:
        body["templates"] = [{"id": template_id}]

    reply = nexus_req(url, "POST", json=body, token_auth=settings.NX_CDCS_TOKEN)
    status = reply.status_code

    # NOTE(review): unlike get_workspace_id/get_template_id, a 403 is not
    # treated as an authentication failure here; it falls through to the
    # generic "log and return []" path below — confirm this is intended.
    if status == HTTPStatus.UNAUTHORIZED:
        raise AuthenticationError(
            "Could not authenticate to CDCS. Is the NX_CDCS_TOKEN "
            "environment variable set correctly?"
        )

    if status == HTTPStatus.BAD_REQUEST:
        _logger.error("Bad request while searching records:\n%s", reply.text)
        raise ValueError(f"Invalid search parameters: {reply.text}")

    if status == HTTPStatus.OK:
        return reply.json()

    # any other status is logged and reported as "no results"
    _logger.error("Got error while searching records:\n%s", reply.text)
    return []
def download_record(record_id: str) -> str:
    """Fetch the XML content of a single record from the CDCS instance.

    Sends a GET request to ``rest/data/download/<record_id>/`` and returns
    the response text when CDCS answers with 200 (OK).

    Parameters
    ----------
    record_id
        The id value (on the CDCS server) of the record to download

    Returns
    -------
    str
        The XML content of the record

    Raises
    ------
    AuthenticationError
        If CDCS answers with 401 (Unauthorized)
    ValueError
        If CDCS answers with 404 (record not found) or with any other
        non-200 status
    """
    url = urljoin(get_cdcs_url(), f"rest/data/download/{record_id}/")
    reply = nexus_req(url, "GET", token_auth=settings.NX_CDCS_TOKEN)
    status = reply.status_code

    if status == HTTPStatus.OK:
        return reply.text

    # NOTE(review): a 403 is not mapped to AuthenticationError here (unlike
    # get_workspace_id/get_template_id); it ends up as the generic
    # ValueError below — confirm this is intended.
    if status == HTTPStatus.UNAUTHORIZED:
        raise AuthenticationError(
            "Could not authenticate to CDCS. Is the NX_CDCS_TOKEN "
            "environment variable set correctly?"
        )

    if status == HTTPStatus.NOT_FOUND:
        raise ValueError(f"Record with id {record_id} not found")

    # every remaining status is an unexpected failure
    _logger.error("Got error while downloading %s:\n%s", record_id, reply.text)
    raise ValueError(f"Failed to download record {record_id}: {reply.text}")
def upload_record_content(xml_content: str, title: str) -> tuple[Any, int | None]:
    """Upload a single XML record to the NexusLIMS CDCS instance.

    The record is created with a POST to ``rest/data/`` using the current
    template ID, and is then assigned to the workspace returned by
    :py:meth:`get_workspace_id` with a PATCH request.

    Note
    ----
    This is a low-level utility function primarily used for testing.
    For production record uploads, use the CDCSDestination exporter plugin
    in nexusLIMS.exporters.destinations.cdcs instead.

    Parameters
    ----------
    xml_content
        The actual content of an XML record (rather than a file)
    title
        The title to give to the record in CDCS

    Returns
    -------
    tuple[requests.Response, int | None]
        A tuple of (response, record_id). The response is the REST response
        returned from the CDCS instance for the upload (POST) request.
        The record_id is the id (on the server) of the record that was
        uploaded, or None if the upload did not return 201 (Created).

    Raises
    ------
    AuthenticationError
        If looking up the template or workspace ID fails to authenticate
    """
    base_url = get_cdcs_url()
    endpoint = urljoin(base_url, "rest/data/")

    payload = {
        "template": get_template_id(),
        "title": title,
        "xml_content": xml_content,
    }

    post_r = nexus_req(
        endpoint, "POST", json=payload, token_auth=settings.NX_CDCS_TOKEN
    )

    if post_r.status_code != HTTPStatus.CREATED:
        # anything other than 201 status means something went wrong
        _logger.error("Got error while uploading %s:\n%s", title, post_r.text)
        return post_r, None

    # assign this record to the public workspace
    record_id = post_r.json()["id"]
    record_url = urljoin(base_url, f"data?id={record_id}")
    wrk_endpoint = urljoin(
        base_url,
        f"rest/data/{record_id}/assign/{get_workspace_id()}",
    )

    assign_r = nexus_req(wrk_endpoint, "PATCH", token_auth=settings.NX_CDCS_TOKEN)
    if assign_r.status_code >= HTTPStatus.BAD_REQUEST:
        # The record exists but is not in the workspace; surface this rather
        # than silently discarding the response. The upload itself succeeded,
        # so the return value is unchanged.
        _logger.warning(
            'Could not assign record "%s" (id %s) to workspace:\n%s',
            title,
            record_id,
            assign_r.text,
        )

    _logger.info('Record "%s" available at %s', title, record_url)
    return post_r, record_id
def upload_record_files(
    files_to_upload: List[Path] | None,
    *,
    progress: bool = False,
) -> tuple[List[Path], List[int]]:
    """Upload record files to CDCS.

    Upload a list of .xml files (or all .xml files in the current directory)
    to the NexusLIMS CDCS instance using :py:meth:`upload_record_content`.
    Each record's title is the file name without its suffix. Files whose
    upload does not return 201 (Created) are logged and skipped.

    Note
    ----
    This is a utility function primarily used for testing and manual uploads.
    For production record uploads, use the CDCSDestination exporter plugin
    in nexusLIMS.exporters.destinations.cdcs instead.

    Parameters
    ----------
    files_to_upload: List[pathlib.Path] | None
        The list of .xml files to upload. If ``None``, all .xml files in the
        current directory will be used instead.
    progress
        Whether to show a progress bar for uploading

    Returns
    -------
    tuple[list[pathlib.Path], list[int]]
        A tuple of (files_uploaded, record_ids). files_uploaded is a list of
        the files that were successfully uploaded. record_ids is a list of the
        record id values (on the server) that were uploaded.

    Raises
    ------
    ValueError
        If no .xml files are found
    """
    if files_to_upload is None:
        _logger.info("Using all .xml files in this directory")
        files_to_upload = list(Path().glob("*.xml"))
    else:
        _logger.info("Using .xml files from command line")

    _logger.info("Found %s files to upload\n", len(files_to_upload))
    if not files_to_upload:
        msg = (
            "No .xml files were found (please specify on the "
            "command line, or run this script from a directory "
            "containing one or more .xml files)"
        )
        _logger.error(msg)
        raise ValueError(msg)

    files_uploaded = []
    record_ids = []

    # wrap the iterable in a progress bar only when requested
    for f in tqdm(files_to_upload) if progress else files_to_upload:
        f_path = Path(f)
        xml_content = f_path.read_text(encoding="utf-8")

        # the record title is the file name without its suffix
        response, record_id = upload_record_content(xml_content, f_path.stem)

        if response.status_code != HTTPStatus.CREATED:
            _logger.warning("Could not upload %s", f_path.name)
            continue

        files_uploaded.append(f_path)
        record_ids.append(record_id)

    _logger.info(
        "Successfully uploaded %i of %i files",
        len(files_uploaded),
        len(files_to_upload),
    )

    return files_uploaded, record_ids