Coverage for nexusLIMS/utils/cdcs.py: 100%

114 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-03-24 05:23 +0000

1"""CDCS interaction utilities for NexusLIMS. 

2 

3This module provides functions for querying, downloading, and deleting records 

4from a CDCS instance. These are non-export operations used primarily for 

5testing and maintenance. 

6 

7For exporting records to CDCS, use the CDCSDestination plugin in 

8nexusLIMS.exporters.destinations.cdcs instead. 

9""" 

10 

11import logging 

12from http import HTTPStatus 

13from pathlib import Path 

14from typing import Any, Dict, List 

15from urllib.parse import urljoin 

16 

17from tqdm import tqdm 

18 

19from nexusLIMS.config import settings 

20from nexusLIMS.utils.network import nexus_req 

21 

22_logger = logging.getLogger(__name__) 

23 

24 

class AuthenticationError(Exception):
    """Exception raised when authentication to the CDCS instance fails.

    Parameters
    ----------
    message
        Human-readable description of the failure. It is stored on the
        ``message`` attribute and also forwarded to :class:`Exception`, so
        ``str(exc)`` and ``exc.args`` carry it whether it was passed
        positionally or by keyword.
    """

    def __init__(self, message):
        # Forward to the base class explicitly: without this call,
        # ``AuthenticationError(message="...")`` leaves ``args`` empty and
        # ``str(exc)`` blank.
        super().__init__(message)
        self.message = message

30 

31 

class CDCSDataRecord(Dict[str, Any]):
    """Dictionary type describing one Data record as returned by CDCS.

    Record objects of this shape come back from CDCS endpoints such as
    /rest/data/query/ and /rest/data/query/keyword/. The class adds no
    behavior of its own; the fields listed below are the dictionary keys a
    record is expected to carry.

    Attributes
    ----------
    id : int
        Identifier of the record
    template : int
        Identifier of the template
    workspace : int | None
        Identifier of the workspace
    user_id : str
        Identifier of the user that created the record
    title : str
        Title of the record
    checksum : str | None
        Checksum of the record
    creation_date : str | None
        Date the record was created
    last_modification_date : str | None
        Date the record was last modified
    last_change_date : str | None
        Date the record was last changed
    xml_content : str
        XML content of the record
    """

61 

62 

def get_cdcs_url() -> str:
    """Look up the base URL of the NexusLIMS CDCS instance.

    The value is read from ``settings.NX_CDCS_URL`` and converted to a plain
    string.

    Returns
    -------
    str
        The URL of the NexusLIMS CDCS instance to use

    Raises
    ------
    ValueError
        If the NX_CDCS_URL setting is not defined. NOTE(review): nothing in
        this function raises directly; presumably the error originates in the
        settings validation -- confirm against ``nexusLIMS.config``.
    """
    # The setting is required, so validation is expected to guarantee it is
    # present; it is an AnyHttpUrl, hence the explicit conversion to ``str``.
    cdcs_url = settings.NX_CDCS_URL
    return str(cdcs_url)

79 

80 

def get_workspace_id() -> int:
    """Look up the ID of the workspace readable with the configured token.

    In the current NexusLIMS CDCS implementation this should be the Global
    Public Workspace.

    Returns
    -------
    int
        The workspace ID

    Raises
    ------
    AuthenticationError
        If CDCS answers with 401 (Unauthorized) or 403 (Forbidden)
    """
    response = nexus_req(
        urljoin(get_cdcs_url(), "rest/workspace/read_access/"),
        "GET",
        token_auth=settings.NX_CDCS_TOKEN,
    )

    auth_failure_codes = (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN)
    if response.status_code in auth_failure_codes:
        raise AuthenticationError(
            "Could not authenticate to CDCS. Is the NX_CDCS_TOKEN "
            "environment variable set correctly?"
        )

    # The user is assumed to have exactly one workspace (the public one), so
    # the first entry of the listing is the one we want.
    workspaces = response.json()
    return workspaces[0]["id"]

109 

110 

def get_template_id() -> str:
    """Look up the ID of the current template (XSD schema) in CDCS.

    The ID is what lets a record be associated with the correct schema.

    Returns
    -------
    str
        The template ID

    Raises
    ------
    AuthenticationError
        If CDCS answers with 401 (Unauthorized) or 403 (Forbidden)
    """
    managers_url = urljoin(get_cdcs_url(), "rest/template-version-manager/global/")
    reply = nexus_req(managers_url, "GET", token_auth=settings.NX_CDCS_TOKEN)

    rejected = reply.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN)
    if rejected:
        msg = (
            "Could not authenticate to CDCS. Is the NX_CDCS_TOKEN "
            "environment variable set correctly?"
        )
        raise AuthenticationError(msg)

    # The "current" field of the first version manager holds the template id.
    first_manager = reply.json()[0]
    return first_manager["current"]

137 

138 

def delete_record(record_id: str):
    """Remove a Data record from the NexusLIMS CDCS instance via REST API.

    A failed deletion is logged as an error but does not raise; the response
    is handed back to the caller either way.

    Parameters
    ----------
    record_id
        The id value (on the CDCS server) of the record to be deleted

    Returns
    -------
    requests.Response
        The REST response returned from the CDCS instance after attempting
        the delete operation
    """
    url = urljoin(get_cdcs_url(), f"rest/data/{record_id}/")
    result = nexus_req(url, "DELETE", token_auth=settings.NX_CDCS_TOKEN)

    # 204 (No Content) is the only status treated as a successful delete.
    deleted = result.status_code == HTTPStatus.NO_CONTENT
    if not deleted:
        _logger.error("Received error while deleting %s:\n%s", record_id, result.text)
    return result

159 

160 

def search_records(
    title: str | None = None,
    template_id: str | None = None,
    keyword: str | None = None,
) -> list[CDCSDataRecord]:
    """Query the CDCS instance for records by title, keyword, or template.

    With no arguments, every record is returned.

    Note
    ----
    A ``keyword`` switches the search to the full-text endpoint
    (``/rest/data/query/keyword/``), which does not support title filtering.
    In that mode ``title`` is ignored and only ``template_id`` can be combined
    with ``keyword``. Without a keyword, ``/rest/data/query/`` is used.

    Parameters
    ----------
    title
        The title to search for (exact match). Only used when ``keyword`` is
        None.
    template_id
        The template ID to filter by. Can be combined with either ``title`` or
        ``keyword``.
    keyword
        Keyword(s) for full-text search across record content. When provided,
        takes precedence over ``title``.

    Returns
    -------
    list[CDCSDataRecord]
        The matching record objects from CDCS (see :class:`CDCSDataRecord`).
        An empty list is returned when CDCS answers with an unexpected
        (non-200) status other than 401 or 400; the error is logged.

    Raises
    ------
    AuthenticationError
        If CDCS answers with 401 (Unauthorized)
    ValueError
        If ``keyword`` is empty or whitespace only, or CDCS rejects the search
        parameters with 400 (Bad Request)
    """
    use_keyword_search = keyword is not None
    if use_keyword_search and not keyword.strip():
        raise ValueError("Keyword parameter cannot be empty")

    # Both endpoints take a POST with a JSON body; "all": "true" requests all
    # results rather than a paginated response.
    if use_keyword_search:
        path = "rest/data/query/keyword/"
        payload = {"query": keyword, "all": "true"}
    else:
        path = "rest/data/query/"
        # An empty query matches all records.
        payload = {"query": {}, "all": "true"}
        if title is not None:
            payload["title"] = title

    # The template filter applies to both search modes.
    if template_id is not None:
        payload["templates"] = [{"id": template_id}]

    response = nexus_req(
        urljoin(get_cdcs_url(), path),
        "POST",
        json=payload,
        token_auth=settings.NX_CDCS_TOKEN,
    )
    status = response.status_code

    if status == HTTPStatus.UNAUTHORIZED:
        raise AuthenticationError(
            "Could not authenticate to CDCS. Is the NX_CDCS_TOKEN "
            "environment variable set correctly?"
        )

    if status == HTTPStatus.BAD_REQUEST:
        _logger.error("Bad request while searching records:\n%s", response.text)
        raise ValueError(f"Invalid search parameters: {response.text}")

    if status == HTTPStatus.OK:
        return response.json()

    # Any other status is logged and reported as "no results".
    _logger.error("Got error while searching records:\n%s", response.text)
    return []

251 

252 

def download_record(record_id: str) -> str:
    """Fetch the XML content of a single record from the CDCS instance.

    Parameters
    ----------
    record_id
        The id value (on the CDCS server) of the record to download

    Returns
    -------
    str
        The XML content of the record (the body of the response)

    Raises
    ------
    AuthenticationError
        If CDCS answers with 401 (Unauthorized)
    ValueError
        If the record is not found (404) or CDCS answers with any other
        non-200 status
    """
    url = urljoin(get_cdcs_url(), f"rest/data/download/{record_id}/")
    reply = nexus_req(url, "GET", token_auth=settings.NX_CDCS_TOKEN)
    status = reply.status_code

    # Success path first; everything below is error handling.
    if status == HTTPStatus.OK:
        return reply.text

    if status == HTTPStatus.UNAUTHORIZED:
        raise AuthenticationError(
            "Could not authenticate to CDCS. Is the NX_CDCS_TOKEN "
            "environment variable set correctly?"
        )

    if status == HTTPStatus.NOT_FOUND:
        raise ValueError(f"Record with id {record_id} not found")

    _logger.error("Got error while downloading %s:\n%s", record_id, reply.text)
    raise ValueError(f"Failed to download record {record_id}: {reply.text}")

293 

294 

def upload_record_content(xml_content: str, title: str) -> tuple[Any, int | None]:
    """Upload a single XML record to the NexusLIMS CDCS instance.

    The record is first created with a POST to ``rest/data/`` and, on success,
    assigned to the workspace returned by :func:`get_workspace_id` with a
    PATCH request. A failed assignment is logged as a warning but does not
    change the return value, since the record itself has been created.

    Note
    ----
    This is a low-level utility function primarily used for testing.
    For production record uploads, use the CDCSDestination exporter plugin
    in nexusLIMS.exporters.destinations.cdcs instead.

    Parameters
    ----------
    xml_content
        The actual content of an XML record (rather than a file)
    title
        The title to give to the record in CDCS

    Returns
    -------
    tuple[requests.Response, int | None]
        A tuple of (response, record_id). The response is the REST response
        returned from the CDCS instance for the upload (POST) request.
        The record_id is the id (on the server) of the record that was
        uploaded, or None if the upload did not return 201 (Created).
    """
    # Resolve the base URL once; it is used for three different endpoints.
    cdcs_url = get_cdcs_url()
    endpoint = urljoin(cdcs_url, "rest/data/")

    payload = {
        "template": get_template_id(),
        "title": title,
        "xml_content": xml_content,
    }

    post_r = nexus_req(
        endpoint, "POST", json=payload, token_auth=settings.NX_CDCS_TOKEN
    )

    if post_r.status_code != HTTPStatus.CREATED:
        # anything other than 201 status means something went wrong
        _logger.error("Got error while uploading %s:\n%s", title, post_r.text)
        return post_r, None

    # assign this record to the public workspace
    record_id = post_r.json()["id"]
    record_url = urljoin(cdcs_url, f"data?id={record_id}")
    wrk_endpoint = urljoin(
        cdcs_url,
        f"rest/data/{record_id}/assign/{get_workspace_id()}",
    )

    assign_r = nexus_req(wrk_endpoint, "PATCH", token_auth=settings.NX_CDCS_TOKEN)
    if assign_r.status_code != HTTPStatus.OK:
        # Best effort: the record exists, so report the problem instead of
        # failing the upload. Previously this response was silently dropped.
        _logger.warning(
            'Could not assign record "%s" (id %s) to workspace:\n%s',
            title,
            record_id,
            assign_r.text,
        )

    _logger.info('Record "%s" available at %s', title, record_url)
    return post_r, record_id

348 

349 

def upload_record_files(
    files_to_upload: List[Path] | None,
    *,
    progress: bool = False,
) -> tuple[List[Path], List[int]]:
    """Upload a batch of XML record files to CDCS.

    Each file is read as UTF-8 and sent to the NexusLIMS CDCS instance with
    :py:meth:`upload_record_content`, using the file name without its
    extension as the record title. Files that fail to upload are logged with
    a warning and skipped.

    Note
    ----
    This is a utility function primarily used for testing and manual uploads.
    For production record uploads, use the CDCSDestination exporter plugin
    in nexusLIMS.exporters.destinations.cdcs instead.

    Parameters
    ----------
    files_to_upload: List[pathlib.Path] | None
        The list of .xml files to upload. If ``None``, all .xml files in the
        current directory will be used instead.
    progress
        Whether to show a progress bar for uploading

    Returns
    -------
    tuple[list[pathlib.Path], list[int]]
        A tuple of (files_uploaded, record_ids). files_uploaded is a list of
        the files that were successfully uploaded. record_ids is a list of the
        record id values (on the server) that were uploaded, in the same
        order.

    Raises
    ------
    ValueError
        If no .xml files are found
    """
    if files_to_upload is not None:
        _logger.info("Using .xml files from command line")
    else:
        _logger.info("Using all .xml files in this directory")
        files_to_upload = list(Path().glob("*.xml"))

    n_candidates = len(files_to_upload)
    _logger.info("Found %s files to upload\n", n_candidates)
    if n_candidates == 0:
        msg = (
            "No .xml files were found (please specify on the "
            "command line, or run this script from a directory "
            "containing one or more .xml files"
        )
        _logger.error(msg)
        raise ValueError(msg)

    files_uploaded = []
    record_ids = []

    # Wrap the list in a progress bar only when requested.
    candidates = tqdm(files_to_upload) if progress else files_to_upload
    for candidate in candidates:
        xml_path = Path(candidate)
        with xml_path.open(encoding="utf-8") as handle:
            xml_content = handle.read()

        response, record_id = upload_record_content(xml_content, xml_path.stem)

        if response.status_code == HTTPStatus.CREATED:
            files_uploaded.append(xml_path)
            record_ids.append(record_id)
        else:
            _logger.warning("Could not upload %s", xml_path.name)

    _logger.info(
        "Successfully uploaded %i of %i files",
        len(files_uploaded),
        n_candidates,
    )

    return files_uploaded, record_ids