Coverage for nexusLIMS/harvesters/nemo/connector.py: 100%

272 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-03-24 05:23 +0000

1"""Defines the NemoConnector class that is used to interface with the NEMO API.""" 

2 

3import logging 

4from datetime import datetime 

5from typing import Any, List, Tuple, Union 

6from urllib.parse import parse_qs, quote, urlencode, urljoin, urlparse 

7 

8from pytz import timezone as pytz_timezone 

9 

10from nexusLIMS.db.enums import EventType, RecordStatus 

11from nexusLIMS.db.models import SessionLog 

12from nexusLIMS.db.session_handler import Session 

13from nexusLIMS.instruments import ( 

14 _ensure_instrument_db_loaded, 

15 get_instr_from_api_url, 

16 instrument_db, 

17) 

18from nexusLIMS.utils.network import nexus_req 

19 

20_logger = logging.getLogger(__name__) 

21 

22 

23class NemoConnector: 

24 """ 

25 A connection to an instance of the API of the NEMO laboratory management software. 

26 

27 Provides helper methods for fetching data from the API. 

28 

29 Parameters 

30 ---------- 

31 base_url : str 

32 The "root" of the API including a trailing slash; 

33 e.g. 'https://nemo.example.com/api/' 

34 token : str 

35 An authentication token for this NEMO instance 

36 strftime_fmt : str 

37 The "date format" to use when encoding dates to send as filters to the 

38 NEMO API. Should follow the same convention as 

39 :ref:`strftime-strptime-behavior`. If ``None``, ISO 8601 format 

40 will be used. 

41 strptime_fmt : str 

42 The "date format" to use when decoding date strings received in the 

43 response from the API. Should follow the same convention as 

44 :ref:`strftime-strptime-behavior`. If ``None``, ISO 8601 format 

45 will be used. 

46 timezone : str 

47 The timezone to use when decoding date strings received in the 

48 response from the API. Should be a IANA time zone database string; e.g. 

49 "America/New_York". Useful if no timezone information is returned 

50 from an instance of the NEMO API. If ``None``, no timezone setting will 

51 be done and the code will use whatever was returned from the server 

52 as is. 

53 retries : int 

54 The number of retries that will be used for failed HTTP requests before 

55 actually failing. 

56 """ 

57 

58 tools: dict[int, dict] 

59 users: dict[int, dict] 

60 users_by_username: dict[str, dict] 

61 projects: dict[int, dict] 

62 

63 def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments # noqa: PLR0913 

64 self, 

65 base_url: str, 

66 token: str, 

67 strftime_fmt: str | None = None, 

68 strptime_fmt: str | None = None, 

69 timezone: str | None = None, 

70 retries: int = 5, 

71 ): 

72 self.config = { 

73 "base_url": base_url, 

74 "token": token, 

75 "strftime_fmt": strftime_fmt, 

76 "strptime_fmt": strptime_fmt, 

77 "timezone": timezone, 

78 "retries": retries, 

79 } 

80 

81 # these attributes are used for "memoization" of NEMO content, 

82 # so it can be remembered and used for a cache lookup 

83 # keys should be NEMO internal IDs and values should be the 

84 # dictionary returned by the API 

85 self.tools = {} 

86 self.users = {} 

87 self.users_by_username = {} 

88 self.projects = {} 

89 

90 def __repr__(self): 

91 """Return custom representation of a NemoConnector.""" 

92 return f"Connection to NEMO API at {self.config['base_url']}" 

93 

94 def __eq__(self, other): 

95 """Compare two NemoConnector instances based on their config.""" 

96 if not isinstance(other, NemoConnector): 

97 return False 

98 return self.config == other.config 

99 

100 def __hash__(self): 

101 """Return hash of NemoConnector based on its config.""" 

102 return hash(frozenset(self.config.items())) 

103 

104 def strftime(self, date_dt) -> str: 

105 """ 

106 Convert datetime to appropriate string format for this connector. 

107 

108 Using the settings for this NemoConnector, convert a datetime object 

109 to a string that will be understood by the API. If the ``strftime_fmt`` 

110 attribute for this NemoConnector is ``None``, ISO 8601 format will be 

111 used. 

112 

113 Parameters 

114 ---------- 

115 date_dt 

116 The date to be converted as a datetime object 

117 

118 Returns 

119 ------- 

120 date_str : str 

121 The date formatted as a string that will be understandable by the 

122 API for this NemoConnector 

123 """ 

124 if self.config["strftime_fmt"] is None: 

125 return date_dt.isoformat() 

126 

127 if ( 

128 "%z" in self.config["strftime_fmt"] or "%Z" in self.config["strftime_fmt"] 

129 ) and date_dt.tzinfo is None: 

130 # make sure datetime is timezone aware if timezone is 

131 # indicated in strftime_fmt. Use NEMO_tz setting if present, 

132 # otherwise use local server timezone 

133 if self.config["timezone"]: 

134 date_dt = pytz_timezone(self.config["timezone"]).localize(date_dt) 

135 else: 

136 date_dt = date_dt.astimezone() 

137 return date_dt.strftime(self.config["strftime_fmt"]) 

138 

139 def strptime(self, date_str) -> datetime: 

140 """ 

141 Convert string to datetime using this connector's API date format. 

142 

143 Using the settings for this NemoConnector, convert a datetime string 

144 representation from the API into a datetime object that can be used 

145 in Python. If the ``strptime_fmt`` attribute for this NemoConnector 

146 is ``None``, ISO 8601 format will be assumed. If a timezone is 

147 specified for this server, the resulting datetime will be coerced to 

148 that timezone. 

149 

150 Parameters 

151 ---------- 

152 date_str 

153 The date formatted as a string that is returned by the 

154 API for this NemoConnector 

155 

156 Returns 

157 ------- 

158 date_dt : ~datetime.datetime 

159 The date to be converted as a datetime object 

160 """ 

161 if self.config["strptime_fmt"] is None: 

162 date_dt = datetime.fromisoformat(date_str) 

163 else: 

164 # to be defensive here, try without microseconds as well if ".%f" 

165 # is in strptime_fmt and it fails (since sometimes NEMO doesn't 

166 # write microseconds for every time, even if it's supposed to 

167 fmt = self.config["strptime_fmt"] 

168 has_tz_directive = "%z" in fmt or "%Z" in fmt 

169 try: 

170 date_dt = datetime.strptime(date_str, fmt) # noqa: DTZ007 

171 # Only strip timezone if format doesn't include timezone directives 

172 if not has_tz_directive: 

173 date_dt = date_dt.replace(tzinfo=None) 

174 except ValueError as exception: 

175 if ".%f" in fmt: 

176 date_dt = datetime.strptime( # noqa: DTZ007 

177 date_str, 

178 fmt.replace(".%f", ""), 

179 ) 

180 # Only strip timezone if no timezone directives 

181 if not has_tz_directive: 

182 date_dt = date_dt.replace(tzinfo=None) 

183 else: 

184 raise ValueError(str(exception)) from exception # pragma: no cover 

185 

186 if self.config["timezone"]: 

187 target_tz = pytz_timezone(self.config["timezone"]) 

188 if date_dt.tzinfo is None: 

189 # Localize naive datetime to the configured timezone 

190 date_dt = target_tz.localize(date_dt) 

191 else: 

192 # If datetime already has a timezone, it is replaced with the 

193 # configured timezone. This is done by taking the naive 

194 # datetime and localizing it to the new timezone. This does 

195 # NOT preserve the moment in time but is useful for 

196 # correcting a misconfigured server. 

197 date_dt = target_tz.localize(date_dt.replace(tzinfo=None)) 

198 

199 return date_dt 

200 

201 def get_tools(self, tool_id: Union[int, List[int]]) -> List[dict]: 

202 """ 

203 Get a list of one or more tools from the NEMO API in a dictionary. 

204 

205 Parameters 

206 ---------- 

207 tool_id 

208 The tool(s) to fetch, as indexed by the NEMO instance (i.e. 

209 ``tool_id`` should be the internal primary key used by NEMO to 

210 identify the tool. If an empty list is given, all tools will be 

211 returned. 

212 

213 Returns 

214 ------- 

215 tools : List[dict] 

216 A list (could be empty) of tools that match the id (or ids) given 

217 in ``tool_id`` 

218 

219 Raises 

220 ------ 

221 requests.exceptions.HTTPError 

222 Raised if the request to the API did not go through correctly 

223 """ 

224 if hasattr(tool_id, "__iter__"): 

225 # Handle empty list case - should return all tools 

226 if len(tool_id) == 0: 

227 params = {} 

228 elif all(t_id in self.tools for t_id in tool_id): 

229 _logger.debug( 

230 "Using cached tool info for tools %s: %s", 

231 tool_id, 

232 [self.tools[t]["name"] for t in tool_id], 

233 ) 

234 return [self.tools[t_id] for t_id in tool_id] 

235 else: 

236 params = {"id__in": ",".join([str(i) for i in tool_id])} 

237 else: 

238 if tool_id in self.tools: 

239 _logger.debug( 

240 'Using cached tool info for tool %s: "%s"', 

241 tool_id, 

242 self.tools[tool_id]["name"], 

243 ) 

244 return [self.tools[tool_id]] 

245 params = {"id": tool_id} 

246 

247 tools = self._api_caller("GET", "tools/", params) 

248 

249 for tool in tools: 

250 # cache the tool results 

251 self.tools[tool["id"]] = tool 

252 

253 return tools 

254 

255 def get_users(self, user_id: Union[int, List[int]] | None = None) -> List[dict]: 

256 """ 

257 Get a list of one or more users from the NEMO API in a dictionary. 

258 

259 The results will be cached in the NemoConnector instance to prevent multiple 

260 API queries if not necessary. 

261 

262 Parameters 

263 ---------- 

264 user_id 

265 The user(s) to fetch, as indexed by the NEMO instance (i.e. 

266 ``user_id`` should be the internal primary key used by NEMO to 

267 identify the user. If an empty list or None is given, all users 

268 will be returned. 

269 

270 Returns 

271 ------- 

272 users : List[dict] 

273 A list (could be empty) of users that match the ids and/or 

274 usernames given 

275 

276 Raises 

277 ------ 

278 requests.exceptions.HTTPError 

279 Raised if the request to the API did not go through correctly 

280 """ 

281 params = {} 

282 

283 # Check if user_id is None - return all users 

284 if user_id is None: 

285 return self._get_users_helper(params) 

286 

287 # list of user ids 

288 if hasattr(user_id, "__iter__"): 

289 # Check if user_id is an empty list - if so, return all users 

290 if len(user_id) == 0: 

291 return self._get_users_helper(params) 

292 

293 params["id__in"] = ",".join([str(i) for i in user_id]) 

294 if all(u_id in self.users for u_id in user_id): 

295 _logger.debug( 

296 "Using cached user info for users with id in %s: %s", 

297 user_id, 

298 [self.users[u]["username"] for u in user_id], 

299 ) 

300 return [self.users[u_id] for u_id in user_id] 

301 # single user id 

302 else: 

303 params["id"] = user_id 

304 if user_id in self.users: 

305 _logger.debug( 

306 'Using cached user info for user id %s: "%s"', 

307 user_id, 

308 self.users[user_id]["username"], 

309 ) 

310 return [self.users[user_id]] 

311 

312 return self._get_users_helper(params) 

313 

314 def get_users_by_username(self, username=None) -> List: 

315 """ 

316 Get a list of one or more users from the NEMO API in a dictionary. 

317 

318 The results will be cached in the NemoConnector 

319 instance to prevent multiple API queries if not necessary. 

320 

321 Parameters 

322 ---------- 

323 username : str or :obj:`list` of :obj:`str` 

324 The user(s) to fetch, as indexed by their usernames in the NEMO 

325 instance. If an empty list or None is given, all users 

326 will be returned. 

327 

328 Returns 

329 ------- 

330 users : list 

331 A list (could be empty) of users that match the ids and/or 

332 usernames given 

333 

334 Raises 

335 ------ 

336 requests.exceptions.HTTPError 

337 Raised if the request to the API did not go through correctly 

338 """ 

339 params = {} 

340 if isinstance(username, str): 

341 params["username__iexact"] = username 

342 if username in self.users_by_username: 

343 _logger.debug('Using cached user info for username "%s"', username) 

344 return [self.users_by_username[username]] 

345 else: 

346 params["username__in"] = ",".join(username) 

347 if all(uname in self.users_by_username for uname in username): 

348 _logger.debug("Using cache user info for users with id in %s", username) 

349 return [self.users_by_username[uname] for uname in username] 

350 

351 return self._get_users_helper(params) 

352 

353 def get_projects(self, proj_id: Union[int, List[int]]) -> List[dict]: 

354 """ 

355 Get a list of one or more projects from the NEMO API in a dictionary. 

356 

357 The local cache will be checked prior to fetching 

358 from the API to save a network request if possible. 

359 

360 Parameters 

361 ---------- 

362 proj_id 

363 The project(s) to fetch, as indexed by the NEMO instance (i.e. 

364 ``proj_id`` should be the internal primary key used by NEMO to 

365 identify the project. If an empty list is given, all projects 

366 will be returned. 

367 

368 Returns 

369 ------- 

370 projects : List[dict] 

371 A list (could be empty) of projects that match the id (or ids) given 

372 in ``proj_id`` 

373 

374 Raises 

375 ------ 

376 requests.exceptions.HTTPError 

377 Raised if the request to the API did not go through correctly 

378 """ 

379 if hasattr(proj_id, "__iter__"): 

380 # Handle empty list case - should return all projects 

381 if len(proj_id) == 0: 

382 params = {} 

383 elif all(p_id in self.projects for p_id in proj_id): 

384 _logger.debug( 

385 "Using cached project info for projects %s: %s", 

386 proj_id, 

387 [self.projects[p]["name"] for p in proj_id], 

388 ) 

389 return [self.projects[p_id] for p_id in proj_id] 

390 else: 

391 params = {"id__in": ",".join([str(i) for i in proj_id])} 

392 else: 

393 if proj_id in self.projects: 

394 _logger.debug( 

395 'Using cached project info for project %s: "%s"', 

396 proj_id, 

397 self.projects[proj_id]["name"], 

398 ) 

399 return [self.projects[proj_id]] 

400 params = {"id": proj_id} 

401 

402 projects = self._api_caller("GET", "projects/", params) 

403 

404 for params in projects: 

405 # expand the only_allow_tools node 

406 if "only_allow_tools" in params: 

407 params.update( 

408 { 

409 "only_allow_tools": [ 

410 self.get_tools(t)[0] for t in params["only_allow_tools"] 

411 ], 

412 }, 

413 ) 

414 self.projects[params["id"]] = params 

415 

416 return projects 

417 

418 def get_reservations( 

419 self, 

420 dt_from: datetime | None = None, 

421 dt_to: datetime | None = None, 

422 tool_id: Union[int, List[int]] | None = None, 

423 *, 

424 cancelled: bool | None = False, 

425 ) -> List[dict]: 

426 """ 

427 Get reservations from the NEMO API filtered in various ways. 

428 

429 Return a list of reservations from the API, filtered by date 

430 (inclusive). If only one argument is provided, the API will return all 

431 reservations either before or after the parameter. With no arguments, 

432 the method will return all reservations. The method will 

433 "auto-expand" linked information such as user, project, tool, etc. so 

434 results will have a full dictionary for each of those fields, rather 

435 than just the index (as returned from the API). 

436 

437 Parameters 

438 ---------- 

439 dt_from 

440 The "starting point" of the time range; only reservations at or 

441 after this point in time will be returned 

442 dt_to 

443 The "ending point" of the time range; only reservations at 

444 or prior to this point in time will be returned 

445 tool_id 

446 A tool identifier (or list of them) to limit the scope of the 

447 reservation search (this should be the NEMO internal integer ID) 

448 cancelled 

449 Whether to get canceled or active reservations in the response 

450 (default is False -- meaning non-cancelled active reservations 

451 are returned by default). Set to None to include all reservations 

452 regardless of whether they are cancelled or active. 

453 

454 Returns 

455 ------- 

456 reservations : List[dict] 

457 A list (could be empty) of reservations that match the date range 

458 supplied 

459 """ 

460 params = {} 

461 

462 if dt_from: 

463 params["start__gte"] = self.strftime(dt_from) 

464 if dt_to: 

465 params["end__lte"] = self.strftime(dt_to) 

466 if cancelled is not None: 

467 params["cancelled"] = cancelled 

468 

469 if tool_id is not None: 

470 if isinstance(tool_id, list): 

471 params.update({"tool_id__in": ",".join([str(i) for i in tool_id])}) 

472 else: 

473 params.update({"tool_id": str(tool_id)}) 

474 

475 reservations = self._api_caller("GET", "reservations/", params) 

476 

477 parsed_reservations = [] 

478 for reservation in reservations: 

479 parsed_reservations.append(self._parse_reservation(reservation)) 

480 

481 return parsed_reservations 

482 

483 def _parse_reservation(self, reservation: dict) -> dict: 

484 # expand various fields within the reservation data 

485 if reservation["user"]: 

486 user = self.get_users(reservation["user"]) 

487 if user: 

488 reservation.update({"user": user[0]}) 

489 if reservation["creator"]: 

490 user = self.get_users(reservation["creator"]) 

491 if user: 

492 reservation.update({"creator": user[0]}) 

493 if reservation["tool"]: 

494 tool = self.get_tools(reservation["tool"]) 

495 if tool: 

496 reservation.update({"tool": tool[0]}) 

497 if reservation["project"]: 

498 params = self.get_projects(reservation["project"]) 

499 if params: 

500 reservation.update({"project": params[0]}) 

501 if reservation["cancelled_by"]: 

502 user = self.get_users(reservation["cancelled_by"]) 

503 if user: 

504 reservation.update({"cancelled_by": user[0]}) 

505 

506 return reservation 

507 

508 def get_usage_events( 

509 self, 

510 event_id: Union[int, List[int]] | None = None, 

511 user: Union[str, int] | None = None, 

512 dt_range: Tuple[datetime | None, datetime | None] | None = None, 

513 tool_id: Union[int, List[int]] | None = None, 

514 ) -> List: 

515 """ 

516 Get usage events from the NEMO API filtered in various ways. 

517 

518 Return a list of usage events from the API, filtered by date 

519 (inclusive). If only one argument is provided, the API will return all 

520 reservations either before or after the parameter. With no arguments, 

521 the method will return all reservations. The method will 

522 "auto-expand" linked information such as user, project, tool, etc. so 

523 results will have a full dictionary for each of those fields, rather 

524 than just the index (as returned from the API). 

525 

526 Parameters 

527 ---------- 

528 event_id 

529 The NEMO integer identifier (or a list of them) to fetch. If 

530 ``None``, the returned usage events will not be filtered by ID 

531 number 

532 user 

533 The user for which to fetch usage events, as either an integer 

534 representing their id in the NEMO instance, or as a string 

535 containing their username. If ``None`` is given, usage events 

536 from all users will be returned. 

537 dt_range 

538 The "starting" and "end" points of the time range; only usage events 

539 starting between these points in time will be returned 

540 tool_id 

541 A tool identifier (or list of them) to limit the scope of the 

542 usage event search (this should be the NEMO internal integer ID). 

543 Regardless of what value is given, this method will always limit 

544 the API query to tools specified in the NexusLIMS DB for this 

545 harvester 

546 

547 Returns 

548 ------- 

549 usage_events : List 

550 A list (could be empty) of usage events that match the filters 

551 supplied 

552 """ 

553 params = self._parse_dt_range(dt_range, {}) 

554 

555 if event_id is not None: 

556 if hasattr(event_id, "__iter__"): 

557 params.update({"id__in": ",".join([str(i) for i in event_id])}) 

558 else: 

559 params.update({"id": event_id}) 

560 if user: 

561 if isinstance(user, str): 

562 u_id = self.get_users_by_username(user)[0]["id"] 

563 else: 

564 u_id = user 

565 params["user_id"] = u_id 

566 

567 # filtering for tool; if at the end of this block tool_id is an empty 

568 # list, we should just immediately return an empty list, since either 

569 # there were no tools for this connector in our DB, or the tools 

570 # specified were not found in the DB, so we know there are no 

571 # usage_events of interest 

572 # 

573 # NOTE: Skip tool_id filtering if event_id is specified, since an event ID 

574 # uniquely identifies an event and adding tool_id__in causes NEMO API 

575 # to reject the request with 400 Bad Request 

576 if event_id is None: 

577 this_connectors_tools = self.get_known_tool_ids() 

578 if tool_id is None: 

579 # by default (no tool_id specified), we should fetch events from 

580 # only the tools known to the NexusLIMS DB for this connector 

581 tool_id = this_connectors_tools 

582 if isinstance(tool_id, int): 

583 # coerce tool_id to list to make subsequent processing easier 

584 tool_id = [tool_id] 

585 

586 # limit tool_id to values that are present in this_connectors_tools 

587 tool_id = [i for i in tool_id if i in this_connectors_tools] 

588 

589 # if tool_id is empty, we should just return 

590 if not tool_id: 

591 return [] 

592 

593 params.update({"tool_id__in": ",".join([str(i) for i in tool_id])}) 

594 

595 usage_events = self._api_caller("GET", "usage_events/", params) 

596 

597 parsed_events = [] 

598 for event in usage_events: 

599 parsed_events.append(self._parse_event(event)) 

600 

601 return parsed_events 

602 

603 def _parse_dt_range( 

604 self, 

605 dt_range: Tuple[datetime | None, datetime | None] | None, 

606 params: dict, 

607 ) -> dict: 

608 if dt_range is not None: 

609 dt_from, dt_to = dt_range 

610 if dt_from is not None: 

611 params["start__gte"] = self.strftime(dt_from) 

612 if dt_to is not None: 

613 params["end__lte"] = self.strftime(dt_to) 

614 return params 

615 

616 def _parse_event(self, event: dict) -> dict: 

617 # expand various fields within the usage event data 

618 if event["user"]: 

619 user = self.get_users(event["user"]) 

620 if user: 

621 event.update({"user": user[0]}) 

622 if event["operator"]: 

623 user = self.get_users(event["operator"]) 

624 if user: 

625 event.update({"operator": user[0]}) 

626 if event["project"]: 

627 proj = self.get_projects(event["project"]) 

628 if proj: 

629 event.update({"project": proj[0]}) 

630 if event["tool"]: 

631 tool = self.get_tools(event["tool"]) 

632 if tool: 

633 event.update({"tool": tool[0]}) 

634 return event 

635 

636 def write_usage_event_to_session_log(self, event_id: int) -> None: 

637 """ 

638 Write a usage event to the NexusLIMS database session log. 

639 

640 Inserts two rows (if needed) into the ``session_log`` (marking the start 

641 and end of a usage event), only for instruments recognized by 

642 NexusLIMS (i.e. that have a row in the ``instruments`` table of the DB). 

643 If the usage event has not ended yet, no action is performed. 

644 

645 Parameters 

646 ---------- 

647 event_id 

648 The NEMO id number for the event to insert 

649 

650 """ 

651 event = self.get_usage_events(event_id=event_id) 

652 if event: 

653 # get_usage_events returns list, so pick out first one 

654 event = event[0] 

655 tool_api_url = f"{self.config['base_url']}tools/?id={event['tool']['id']}" 

656 instr = get_instr_from_api_url(tool_api_url) 

657 if instr is None: # pragma: no cover 

658 # this shouldn't happen since we limit our usage event API call 

659 # only to instruments contained in our DB, but we can still 

660 # defend against it regardless 

661 _logger.warning( 

662 "Usage event %s was for an instrument (%s) not known " 

663 "to NexusLIMS, so no records will be added to DB.", 

664 event_id, 

665 tool_api_url, 

666 ) 

667 return 

668 if event["end"] is None: 

669 _logger.warning( 

670 "Usage event %s has not yet ended, so no records " 

671 "will be added to DB.", 

672 event_id, 

673 ) 

674 return 

675 session_id = f"{self.config['base_url']}usage_events/?id={event['id']}" 

676 

677 # try to insert start log 

678 start_log = SessionLog( 

679 session_identifier=session_id, 

680 instrument=instr.name, 

681 # SQLModel handles datetime, no need for isoformat() 

682 timestamp=self.strptime(event["start"]), 

683 event_type=EventType.START, 

684 user=event["user"]["username"], 

685 record_status=RecordStatus.TO_BE_BUILT, 

686 ) 

687 start_log.insert_log() # insert_log() is idempotent, handles duplicates 

688 

689 # try to insert end log 

690 end_log = SessionLog( 

691 session_identifier=session_id, 

692 instrument=instr.name, 

693 # SQLModel handles datetime, no need for isoformat() 

694 timestamp=self.strptime(event["end"]), 

695 event_type=EventType.END, 

696 user=event["user"]["username"], 

697 record_status=RecordStatus.TO_BE_BUILT, 

698 ) 

699 end_log.insert_log() # insert_log() is idempotent, handles duplicates 

700 else: 

701 _logger.warning( 

702 "No usage event with id = %s was found for %s", 

703 event_id, 

704 self, 

705 ) 

706 

707 def get_session_from_usage_event(self, event_id: int) -> Session | None: 

708 """ 

709 Get a Session representation of a usage event. 

710 

711 Get a :py:class:`~nexusLIMS.db.session_handler.Session` 

712 representation of a usage event for use in dry runs of the record 

713 builder. 

714 

715 Parameters 

716 ---------- 

717 event_id 

718 The NEMO id number for the event to insert 

719 

720 Returns 

721 ------- 

722 session : ~nexusLIMS.db.session_handler.Session 

723 A representation of the usage_event from NEMO as a 

724 :py:class:`~nexusLIMS.db.session_handler.Session` object 

725 """ 

726 event = self.get_usage_events(event_id=event_id) 

727 if event: 

728 event = event[0] 

729 # we cannot reliably test an unended event, so exlcude from coverage 

730 if event["start"] is not None and event["end"] is None: # pragma: no cover 

731 _logger.warning( 

732 "Usage event with id = %s has not yet ended for '%s'", 

733 event_id, 

734 self, 

735 ) 

736 return None 

737 instr = get_instr_from_api_url( 

738 f"{self.config['base_url']}tools/?id={event['tool']['id']}", 

739 ) 

740 session_id = f"{self.config['base_url']}usage_events/?id={event_id}" 

741 return Session( 

742 session_identifier=session_id, 

743 instrument=instr, 

744 dt_range=(self.strptime(event["start"]), self.strptime(event["end"])), 

745 user=event["user"]["username"], 

746 ) 

747 

748 _logger.warning("No usage event with id = %s found for '%s'", event_id, self) 

749 return None 

750 

751 def get_known_tool_ids(self) -> List[int]: 

752 """ 

753 Get NEMO tool ID values from the NexusLIMS database. 

754 

755 Inspect the ``api_url`` values of known Instruments (from 

756 the ``instruments`` table in the DB), and extract their tool_id number 

757 if it is from this NemoConnector. 

758 

759 Returns 

760 ------- 

761 tool_ids : List[int] 

762 The list of tool ID numbers known to NexusLIMS for this harvester 

763 """ 

764 tool_ids = [] 

765 _ensure_instrument_db_loaded() 

766 

767 for _, v in instrument_db.items(): 

768 if self.config["base_url"] in v.api_url: 

769 # Instrument is associated with this connector 

770 parsed_url = urlparse(v.api_url) 

771 # extract 'id' query parameter from url 

772 tool_id = parse_qs(parsed_url.query)["id"][0] 

773 tool_ids.append(int(tool_id)) 

774 

775 return tool_ids 

776 

777 def _get_users_helper(self, params: dict[str, str]) -> list: 

778 """ 

779 Call the users API with certain parameters. 

780 

781 Parameters 

782 ---------- 

783 params 

784 A dictionary of query parameters for the API query 

785 

786 Returns 

787 ------- 

788 users 

789 A list (could be empty) of users that match the ids and/or 

790 usernames given 

791 """ 

792 users = self._api_caller("GET", "users/", params) 

793 for user in users: 

794 # cache the users response by ID and username 

795 self.users[user["id"]] = user 

796 self.users_by_username[user["username"]] = user 

797 

798 return users 

799 

800 def _build_url_with_params(self, base_url: str, params: dict | None) -> str: 

801 """ 

802 Build URL with query string, preserving unencoded commas for __in filters. 

803 

804 Django's ORM expects __in filter syntax like: ?tool_id__in=1,3,10,999 

805 The requests library would encode this as: ?tool_id__in=1%2C3%2C10%2C999 

806 which breaks Django's queryset filtering because Django splits on commas: 

807 ``queryset.filter(tool_id__in=request.GET.get('tool_id__in').split(','))`` 

808 

809 This method manually constructs the query string to keep commas unencoded 

810 in __in filter values while still properly encoding other special characters. 

811 

812 Parameters 

813 ---------- 

814 base_url 

815 The base URL without query parameters 

816 params 

817 Dictionary of query parameters. Keys ending with '__in' will have 

818 their comma-separated values preserved without encoding the commas. 

819 

820 Returns 

821 ------- 

822 str 

823 Complete URL with properly encoded query string 

824 

825 Examples 

826 -------- 

827 >>> connector._build_url_with_params( 

828 ... "http://api.example.com/events/", 

829 ... {"tool_id__in": "1,3,10", "start": "2025-01-01T00:00:00"} 

830 ... ) 

831 'http://api.example.com/events/?start=2025-01-01T00%3A00%3A00&tool_id__in=1,3,10' 

832 """ 

833 if not params: 

834 return base_url 

835 

836 # Separate __in parameters from regular parameters 

837 in_params = {k: v for k, v in params.items() if k.endswith("__in")} 

838 regular_params = {k: v for k, v in params.items() if not k.endswith("__in")} 

839 

840 # Build query string components 

841 query_parts = [] 

842 

843 # Add regular params (URL-encoded via urlencode) 

844 if regular_params: 

845 query_parts.append(urlencode(regular_params)) 

846 

847 # Add __in params (manually constructed with unencoded commas) 

848 for key, value in in_params.items(): 

849 # Quote the key and value but preserve commas in the value 

850 encoded_key = quote(key, safe="") 

851 # safe=',' tells quote() to not encode commas 

852 encoded_value = quote(str(value), safe=",") 

853 query_parts.append(f"{encoded_key}={encoded_value}") 

854 

855 # Combine base URL with query string 

856 query_string = "&".join(query_parts) 

857 separator = "&" if "?" in base_url else "?" 

858 return f"{base_url}{separator}{query_string}" 

859 

860 def _api_caller( 

861 self, 

862 verb: str, 

863 endpoint: str, 

864 params: dict[str, str], 

865 ) -> List[dict[str, Any]]: 

866 """ 

867 Make a call to the NEMO API. 

868 

869 Helper function to deduplicate actual calls to the API. Takes care of 

870 building the full URL from the base URL and specific endpoint, 

871 as well as authentication, passing of parameters, and parsing the 

872 results. 

873 

874 Parameters 

875 ---------- 

876 verb 

877 The ``requests`` verb (``'POST'``, ``'GET'``, 

878 ``'PATCH'``, etc.) to use for the API request 

879 endpoint 

880 The API endpoint to use. Should be formatted with a trailing 

881 slash and no leading slash. i.e. the endpoint for Projects data 

882 should be supplied as ``'projects/'`` 

883 params 

884 The URL parameters to pass along with the request. These are 

885 generally filters for the API data such as ``id`` or a date range 

886 or something similar. 

887 

888 Returns 

889 ------- 

890 results 

891 The API response, formatted as a list of dict objects 

892 """ 

893 base_url = urljoin(self.config["base_url"], endpoint) 

894 # Build complete URL with custom encoding for __in filters 

895 url = self._build_url_with_params(base_url, params) 

896 _logger.info("getting data from %s", url) 

897 response = nexus_req( 

898 url, 

899 verb, 

900 token_auth=self.config["token"], 

901 params=None, # Already included in URL 

902 retries=self.config["retries"], 

903 ) 

904 response.raise_for_status() 

905 

906 return response.json()