Coverage for nexusLIMS/harvesters/nemo/connector.py: 100%
272 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
1"""Defines the NemoConnector class that is used to interface with the NEMO API."""
3import logging
4from datetime import datetime
5from typing import Any, List, Tuple, Union
6from urllib.parse import parse_qs, quote, urlencode, urljoin, urlparse
8from pytz import timezone as pytz_timezone
10from nexusLIMS.db.enums import EventType, RecordStatus
11from nexusLIMS.db.models import SessionLog
12from nexusLIMS.db.session_handler import Session
13from nexusLIMS.instruments import (
14 _ensure_instrument_db_loaded,
15 get_instr_from_api_url,
16 instrument_db,
17)
18from nexusLIMS.utils.network import nexus_req
20_logger = logging.getLogger(__name__)
class NemoConnector:
    """
    A connection to an instance of the API of the NEMO laboratory management software.

    Provides helper methods for fetching data from the API.

    Parameters
    ----------
    base_url : str
        The "root" of the API including a trailing slash;
        e.g. 'https://nemo.example.com/api/'
    token : str
        An authentication token for this NEMO instance
    strftime_fmt : str
        The "date format" to use when encoding dates to send as filters to the
        NEMO API. Should follow the same convention as
        :ref:`strftime-strptime-behavior`. If ``None``, ISO 8601 format
        will be used.
    strptime_fmt : str
        The "date format" to use when decoding date strings received in the
        response from the API. Should follow the same convention as
        :ref:`strftime-strptime-behavior`. If ``None``, ISO 8601 format
        will be used.
    timezone : str
        The timezone to use when decoding date strings received in the
        response from the API. Should be an IANA time zone database string; e.g.
        "America/New_York". Useful if no timezone information is returned
        from an instance of the NEMO API. If ``None``, no timezone setting will
        be done and the code will use whatever was returned from the server
        as is.
    retries : int
        The number of retries that will be used for failed HTTP requests before
        actually failing.
    """

    # Per-instance lookup caches; each maps a NEMO key to the dictionary
    # returned by the API for that object.
    # tools: keyed by NEMO tool id
    tools: dict[int, dict]
    # users: keyed by NEMO user id
    users: dict[int, dict]
    # users_by_username: the same user dictionaries, keyed by username
    users_by_username: dict[str, dict]
    # projects: keyed by NEMO project id
    projects: dict[int, dict]
63 def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments # noqa: PLR0913
64 self,
65 base_url: str,
66 token: str,
67 strftime_fmt: str | None = None,
68 strptime_fmt: str | None = None,
69 timezone: str | None = None,
70 retries: int = 5,
71 ):
72 self.config = {
73 "base_url": base_url,
74 "token": token,
75 "strftime_fmt": strftime_fmt,
76 "strptime_fmt": strptime_fmt,
77 "timezone": timezone,
78 "retries": retries,
79 }
81 # these attributes are used for "memoization" of NEMO content,
82 # so it can be remembered and used for a cache lookup
83 # keys should be NEMO internal IDs and values should be the
84 # dictionary returned by the API
85 self.tools = {}
86 self.users = {}
87 self.users_by_username = {}
88 self.projects = {}
90 def __repr__(self):
91 """Return custom representation of a NemoConnector."""
92 return f"Connection to NEMO API at {self.config['base_url']}"
94 def __eq__(self, other):
95 """Compare two NemoConnector instances based on their config."""
96 if not isinstance(other, NemoConnector):
97 return False
98 return self.config == other.config
100 def __hash__(self):
101 """Return hash of NemoConnector based on its config."""
102 return hash(frozenset(self.config.items()))
104 def strftime(self, date_dt) -> str:
105 """
106 Convert datetime to appropriate string format for this connector.
108 Using the settings for this NemoConnector, convert a datetime object
109 to a string that will be understood by the API. If the ``strftime_fmt``
110 attribute for this NemoConnector is ``None``, ISO 8601 format will be
111 used.
113 Parameters
114 ----------
115 date_dt
116 The date to be converted as a datetime object
118 Returns
119 -------
120 date_str : str
121 The date formatted as a string that will be understandable by the
122 API for this NemoConnector
123 """
124 if self.config["strftime_fmt"] is None:
125 return date_dt.isoformat()
127 if (
128 "%z" in self.config["strftime_fmt"] or "%Z" in self.config["strftime_fmt"]
129 ) and date_dt.tzinfo is None:
130 # make sure datetime is timezone aware if timezone is
131 # indicated in strftime_fmt. Use NEMO_tz setting if present,
132 # otherwise use local server timezone
133 if self.config["timezone"]:
134 date_dt = pytz_timezone(self.config["timezone"]).localize(date_dt)
135 else:
136 date_dt = date_dt.astimezone()
137 return date_dt.strftime(self.config["strftime_fmt"])
139 def strptime(self, date_str) -> datetime:
140 """
141 Convert string to datetime using this connector's API date format.
143 Using the settings for this NemoConnector, convert a datetime string
144 representation from the API into a datetime object that can be used
145 in Python. If the ``strptime_fmt`` attribute for this NemoConnector
146 is ``None``, ISO 8601 format will be assumed. If a timezone is
147 specified for this server, the resulting datetime will be coerced to
148 that timezone.
150 Parameters
151 ----------
152 date_str
153 The date formatted as a string that is returned by the
154 API for this NemoConnector
156 Returns
157 -------
158 date_dt : ~datetime.datetime
159 The date to be converted as a datetime object
160 """
161 if self.config["strptime_fmt"] is None:
162 date_dt = datetime.fromisoformat(date_str)
163 else:
164 # to be defensive here, try without microseconds as well if ".%f"
165 # is in strptime_fmt and it fails (since sometimes NEMO doesn't
166 # write microseconds for every time, even if it's supposed to
167 fmt = self.config["strptime_fmt"]
168 has_tz_directive = "%z" in fmt or "%Z" in fmt
169 try:
170 date_dt = datetime.strptime(date_str, fmt) # noqa: DTZ007
171 # Only strip timezone if format doesn't include timezone directives
172 if not has_tz_directive:
173 date_dt = date_dt.replace(tzinfo=None)
174 except ValueError as exception:
175 if ".%f" in fmt:
176 date_dt = datetime.strptime( # noqa: DTZ007
177 date_str,
178 fmt.replace(".%f", ""),
179 )
180 # Only strip timezone if no timezone directives
181 if not has_tz_directive:
182 date_dt = date_dt.replace(tzinfo=None)
183 else:
184 raise ValueError(str(exception)) from exception # pragma: no cover
186 if self.config["timezone"]:
187 target_tz = pytz_timezone(self.config["timezone"])
188 if date_dt.tzinfo is None:
189 # Localize naive datetime to the configured timezone
190 date_dt = target_tz.localize(date_dt)
191 else:
192 # If datetime already has a timezone, it is replaced with the
193 # configured timezone. This is done by taking the naive
194 # datetime and localizing it to the new timezone. This does
195 # NOT preserve the moment in time but is useful for
196 # correcting a misconfigured server.
197 date_dt = target_tz.localize(date_dt.replace(tzinfo=None))
199 return date_dt
201 def get_tools(self, tool_id: Union[int, List[int]]) -> List[dict]:
202 """
203 Get a list of one or more tools from the NEMO API in a dictionary.
205 Parameters
206 ----------
207 tool_id
208 The tool(s) to fetch, as indexed by the NEMO instance (i.e.
209 ``tool_id`` should be the internal primary key used by NEMO to
210 identify the tool. If an empty list is given, all tools will be
211 returned.
213 Returns
214 -------
215 tools : List[dict]
216 A list (could be empty) of tools that match the id (or ids) given
217 in ``tool_id``
219 Raises
220 ------
221 requests.exceptions.HTTPError
222 Raised if the request to the API did not go through correctly
223 """
224 if hasattr(tool_id, "__iter__"):
225 # Handle empty list case - should return all tools
226 if len(tool_id) == 0:
227 params = {}
228 elif all(t_id in self.tools for t_id in tool_id):
229 _logger.debug(
230 "Using cached tool info for tools %s: %s",
231 tool_id,
232 [self.tools[t]["name"] for t in tool_id],
233 )
234 return [self.tools[t_id] for t_id in tool_id]
235 else:
236 params = {"id__in": ",".join([str(i) for i in tool_id])}
237 else:
238 if tool_id in self.tools:
239 _logger.debug(
240 'Using cached tool info for tool %s: "%s"',
241 tool_id,
242 self.tools[tool_id]["name"],
243 )
244 return [self.tools[tool_id]]
245 params = {"id": tool_id}
247 tools = self._api_caller("GET", "tools/", params)
249 for tool in tools:
250 # cache the tool results
251 self.tools[tool["id"]] = tool
253 return tools
255 def get_users(self, user_id: Union[int, List[int]] | None = None) -> List[dict]:
256 """
257 Get a list of one or more users from the NEMO API in a dictionary.
259 The results will be cached in the NemoConnector instance to prevent multiple
260 API queries if not necessary.
262 Parameters
263 ----------
264 user_id
265 The user(s) to fetch, as indexed by the NEMO instance (i.e.
266 ``user_id`` should be the internal primary key used by NEMO to
267 identify the user. If an empty list or None is given, all users
268 will be returned.
270 Returns
271 -------
272 users : List[dict]
273 A list (could be empty) of users that match the ids and/or
274 usernames given
276 Raises
277 ------
278 requests.exceptions.HTTPError
279 Raised if the request to the API did not go through correctly
280 """
281 params = {}
283 # Check if user_id is None - return all users
284 if user_id is None:
285 return self._get_users_helper(params)
287 # list of user ids
288 if hasattr(user_id, "__iter__"):
289 # Check if user_id is an empty list - if so, return all users
290 if len(user_id) == 0:
291 return self._get_users_helper(params)
293 params["id__in"] = ",".join([str(i) for i in user_id])
294 if all(u_id in self.users for u_id in user_id):
295 _logger.debug(
296 "Using cached user info for users with id in %s: %s",
297 user_id,
298 [self.users[u]["username"] for u in user_id],
299 )
300 return [self.users[u_id] for u_id in user_id]
301 # single user id
302 else:
303 params["id"] = user_id
304 if user_id in self.users:
305 _logger.debug(
306 'Using cached user info for user id %s: "%s"',
307 user_id,
308 self.users[user_id]["username"],
309 )
310 return [self.users[user_id]]
312 return self._get_users_helper(params)
314 def get_users_by_username(self, username=None) -> List:
315 """
316 Get a list of one or more users from the NEMO API in a dictionary.
318 The results will be cached in the NemoConnector
319 instance to prevent multiple API queries if not necessary.
321 Parameters
322 ----------
323 username : str or :obj:`list` of :obj:`str`
324 The user(s) to fetch, as indexed by their usernames in the NEMO
325 instance. If an empty list or None is given, all users
326 will be returned.
328 Returns
329 -------
330 users : list
331 A list (could be empty) of users that match the ids and/or
332 usernames given
334 Raises
335 ------
336 requests.exceptions.HTTPError
337 Raised if the request to the API did not go through correctly
338 """
339 params = {}
340 if isinstance(username, str):
341 params["username__iexact"] = username
342 if username in self.users_by_username:
343 _logger.debug('Using cached user info for username "%s"', username)
344 return [self.users_by_username[username]]
345 else:
346 params["username__in"] = ",".join(username)
347 if all(uname in self.users_by_username for uname in username):
348 _logger.debug("Using cache user info for users with id in %s", username)
349 return [self.users_by_username[uname] for uname in username]
351 return self._get_users_helper(params)
353 def get_projects(self, proj_id: Union[int, List[int]]) -> List[dict]:
354 """
355 Get a list of one or more projects from the NEMO API in a dictionary.
357 The local cache will be checked prior to fetching
358 from the API to save a network request if possible.
360 Parameters
361 ----------
362 proj_id
363 The project(s) to fetch, as indexed by the NEMO instance (i.e.
364 ``proj_id`` should be the internal primary key used by NEMO to
365 identify the project. If an empty list is given, all projects
366 will be returned.
368 Returns
369 -------
370 projects : List[dict]
371 A list (could be empty) of projects that match the id (or ids) given
372 in ``proj_id``
374 Raises
375 ------
376 requests.exceptions.HTTPError
377 Raised if the request to the API did not go through correctly
378 """
379 if hasattr(proj_id, "__iter__"):
380 # Handle empty list case - should return all projects
381 if len(proj_id) == 0:
382 params = {}
383 elif all(p_id in self.projects for p_id in proj_id):
384 _logger.debug(
385 "Using cached project info for projects %s: %s",
386 proj_id,
387 [self.projects[p]["name"] for p in proj_id],
388 )
389 return [self.projects[p_id] for p_id in proj_id]
390 else:
391 params = {"id__in": ",".join([str(i) for i in proj_id])}
392 else:
393 if proj_id in self.projects:
394 _logger.debug(
395 'Using cached project info for project %s: "%s"',
396 proj_id,
397 self.projects[proj_id]["name"],
398 )
399 return [self.projects[proj_id]]
400 params = {"id": proj_id}
402 projects = self._api_caller("GET", "projects/", params)
404 for params in projects:
405 # expand the only_allow_tools node
406 if "only_allow_tools" in params:
407 params.update(
408 {
409 "only_allow_tools": [
410 self.get_tools(t)[0] for t in params["only_allow_tools"]
411 ],
412 },
413 )
414 self.projects[params["id"]] = params
416 return projects
418 def get_reservations(
419 self,
420 dt_from: datetime | None = None,
421 dt_to: datetime | None = None,
422 tool_id: Union[int, List[int]] | None = None,
423 *,
424 cancelled: bool | None = False,
425 ) -> List[dict]:
426 """
427 Get reservations from the NEMO API filtered in various ways.
429 Return a list of reservations from the API, filtered by date
430 (inclusive). If only one argument is provided, the API will return all
431 reservations either before or after the parameter. With no arguments,
432 the method will return all reservations. The method will
433 "auto-expand" linked information such as user, project, tool, etc. so
434 results will have a full dictionary for each of those fields, rather
435 than just the index (as returned from the API).
437 Parameters
438 ----------
439 dt_from
440 The "starting point" of the time range; only reservations at or
441 after this point in time will be returned
442 dt_to
443 The "ending point" of the time range; only reservations at
444 or prior to this point in time will be returned
445 tool_id
446 A tool identifier (or list of them) to limit the scope of the
447 reservation search (this should be the NEMO internal integer ID)
448 cancelled
449 Whether to get canceled or active reservations in the response
450 (default is False -- meaning non-cancelled active reservations
451 are returned by default). Set to None to include all reservations
452 regardless of whether they are cancelled or active.
454 Returns
455 -------
456 reservations : List[dict]
457 A list (could be empty) of reservations that match the date range
458 supplied
459 """
460 params = {}
462 if dt_from:
463 params["start__gte"] = self.strftime(dt_from)
464 if dt_to:
465 params["end__lte"] = self.strftime(dt_to)
466 if cancelled is not None:
467 params["cancelled"] = cancelled
469 if tool_id is not None:
470 if isinstance(tool_id, list):
471 params.update({"tool_id__in": ",".join([str(i) for i in tool_id])})
472 else:
473 params.update({"tool_id": str(tool_id)})
475 reservations = self._api_caller("GET", "reservations/", params)
477 parsed_reservations = []
478 for reservation in reservations:
479 parsed_reservations.append(self._parse_reservation(reservation))
481 return parsed_reservations
483 def _parse_reservation(self, reservation: dict) -> dict:
484 # expand various fields within the reservation data
485 if reservation["user"]:
486 user = self.get_users(reservation["user"])
487 if user:
488 reservation.update({"user": user[0]})
489 if reservation["creator"]:
490 user = self.get_users(reservation["creator"])
491 if user:
492 reservation.update({"creator": user[0]})
493 if reservation["tool"]:
494 tool = self.get_tools(reservation["tool"])
495 if tool:
496 reservation.update({"tool": tool[0]})
497 if reservation["project"]:
498 params = self.get_projects(reservation["project"])
499 if params:
500 reservation.update({"project": params[0]})
501 if reservation["cancelled_by"]:
502 user = self.get_users(reservation["cancelled_by"])
503 if user:
504 reservation.update({"cancelled_by": user[0]})
506 return reservation
508 def get_usage_events(
509 self,
510 event_id: Union[int, List[int]] | None = None,
511 user: Union[str, int] | None = None,
512 dt_range: Tuple[datetime | None, datetime | None] | None = None,
513 tool_id: Union[int, List[int]] | None = None,
514 ) -> List:
515 """
516 Get usage events from the NEMO API filtered in various ways.
518 Return a list of usage events from the API, filtered by date
519 (inclusive). If only one argument is provided, the API will return all
520 reservations either before or after the parameter. With no arguments,
521 the method will return all reservations. The method will
522 "auto-expand" linked information such as user, project, tool, etc. so
523 results will have a full dictionary for each of those fields, rather
524 than just the index (as returned from the API).
526 Parameters
527 ----------
528 event_id
529 The NEMO integer identifier (or a list of them) to fetch. If
530 ``None``, the returned usage events will not be filtered by ID
531 number
532 user
533 The user for which to fetch usage events, as either an integer
534 representing their id in the NEMO instance, or as a string
535 containing their username. If ``None`` is given, usage events
536 from all users will be returned.
537 dt_range
538 The "starting" and "end" points of the time range; only usage events
539 starting between these points in time will be returned
540 tool_id
541 A tool identifier (or list of them) to limit the scope of the
542 usage event search (this should be the NEMO internal integer ID).
543 Regardless of what value is given, this method will always limit
544 the API query to tools specified in the NexusLIMS DB for this
545 harvester
547 Returns
548 -------
549 usage_events : List
550 A list (could be empty) of usage events that match the filters
551 supplied
552 """
553 params = self._parse_dt_range(dt_range, {})
555 if event_id is not None:
556 if hasattr(event_id, "__iter__"):
557 params.update({"id__in": ",".join([str(i) for i in event_id])})
558 else:
559 params.update({"id": event_id})
560 if user:
561 if isinstance(user, str):
562 u_id = self.get_users_by_username(user)[0]["id"]
563 else:
564 u_id = user
565 params["user_id"] = u_id
567 # filtering for tool; if at the end of this block tool_id is an empty
568 # list, we should just immediately return an empty list, since either
569 # there were no tools for this connector in our DB, or the tools
570 # specified were not found in the DB, so we know there are no
571 # usage_events of interest
572 #
573 # NOTE: Skip tool_id filtering if event_id is specified, since an event ID
574 # uniquely identifies an event and adding tool_id__in causes NEMO API
575 # to reject the request with 400 Bad Request
576 if event_id is None:
577 this_connectors_tools = self.get_known_tool_ids()
578 if tool_id is None:
579 # by default (no tool_id specified), we should fetch events from
580 # only the tools known to the NexusLIMS DB for this connector
581 tool_id = this_connectors_tools
582 if isinstance(tool_id, int):
583 # coerce tool_id to list to make subsequent processing easier
584 tool_id = [tool_id]
586 # limit tool_id to values that are present in this_connectors_tools
587 tool_id = [i for i in tool_id if i in this_connectors_tools]
589 # if tool_id is empty, we should just return
590 if not tool_id:
591 return []
593 params.update({"tool_id__in": ",".join([str(i) for i in tool_id])})
595 usage_events = self._api_caller("GET", "usage_events/", params)
597 parsed_events = []
598 for event in usage_events:
599 parsed_events.append(self._parse_event(event))
601 return parsed_events
603 def _parse_dt_range(
604 self,
605 dt_range: Tuple[datetime | None, datetime | None] | None,
606 params: dict,
607 ) -> dict:
608 if dt_range is not None:
609 dt_from, dt_to = dt_range
610 if dt_from is not None:
611 params["start__gte"] = self.strftime(dt_from)
612 if dt_to is not None:
613 params["end__lte"] = self.strftime(dt_to)
614 return params
616 def _parse_event(self, event: dict) -> dict:
617 # expand various fields within the usage event data
618 if event["user"]:
619 user = self.get_users(event["user"])
620 if user:
621 event.update({"user": user[0]})
622 if event["operator"]:
623 user = self.get_users(event["operator"])
624 if user:
625 event.update({"operator": user[0]})
626 if event["project"]:
627 proj = self.get_projects(event["project"])
628 if proj:
629 event.update({"project": proj[0]})
630 if event["tool"]:
631 tool = self.get_tools(event["tool"])
632 if tool:
633 event.update({"tool": tool[0]})
634 return event
636 def write_usage_event_to_session_log(self, event_id: int) -> None:
637 """
638 Write a usage event to the NexusLIMS database session log.
640 Inserts two rows (if needed) into the ``session_log`` (marking the start
641 and end of a usage event), only for instruments recognized by
642 NexusLIMS (i.e. that have a row in the ``instruments`` table of the DB).
643 If the usage event has not ended yet, no action is performed.
645 Parameters
646 ----------
647 event_id
648 The NEMO id number for the event to insert
650 """
651 event = self.get_usage_events(event_id=event_id)
652 if event:
653 # get_usage_events returns list, so pick out first one
654 event = event[0]
655 tool_api_url = f"{self.config['base_url']}tools/?id={event['tool']['id']}"
656 instr = get_instr_from_api_url(tool_api_url)
657 if instr is None: # pragma: no cover
658 # this shouldn't happen since we limit our usage event API call
659 # only to instruments contained in our DB, but we can still
660 # defend against it regardless
661 _logger.warning(
662 "Usage event %s was for an instrument (%s) not known "
663 "to NexusLIMS, so no records will be added to DB.",
664 event_id,
665 tool_api_url,
666 )
667 return
668 if event["end"] is None:
669 _logger.warning(
670 "Usage event %s has not yet ended, so no records "
671 "will be added to DB.",
672 event_id,
673 )
674 return
675 session_id = f"{self.config['base_url']}usage_events/?id={event['id']}"
677 # try to insert start log
678 start_log = SessionLog(
679 session_identifier=session_id,
680 instrument=instr.name,
681 # SQLModel handles datetime, no need for isoformat()
682 timestamp=self.strptime(event["start"]),
683 event_type=EventType.START,
684 user=event["user"]["username"],
685 record_status=RecordStatus.TO_BE_BUILT,
686 )
687 start_log.insert_log() # insert_log() is idempotent, handles duplicates
689 # try to insert end log
690 end_log = SessionLog(
691 session_identifier=session_id,
692 instrument=instr.name,
693 # SQLModel handles datetime, no need for isoformat()
694 timestamp=self.strptime(event["end"]),
695 event_type=EventType.END,
696 user=event["user"]["username"],
697 record_status=RecordStatus.TO_BE_BUILT,
698 )
699 end_log.insert_log() # insert_log() is idempotent, handles duplicates
700 else:
701 _logger.warning(
702 "No usage event with id = %s was found for %s",
703 event_id,
704 self,
705 )
707 def get_session_from_usage_event(self, event_id: int) -> Session | None:
708 """
709 Get a Session representation of a usage event.
711 Get a :py:class:`~nexusLIMS.db.session_handler.Session`
712 representation of a usage event for use in dry runs of the record
713 builder.
715 Parameters
716 ----------
717 event_id
718 The NEMO id number for the event to insert
720 Returns
721 -------
722 session : ~nexusLIMS.db.session_handler.Session
723 A representation of the usage_event from NEMO as a
724 :py:class:`~nexusLIMS.db.session_handler.Session` object
725 """
726 event = self.get_usage_events(event_id=event_id)
727 if event:
728 event = event[0]
729 # we cannot reliably test an unended event, so exlcude from coverage
730 if event["start"] is not None and event["end"] is None: # pragma: no cover
731 _logger.warning(
732 "Usage event with id = %s has not yet ended for '%s'",
733 event_id,
734 self,
735 )
736 return None
737 instr = get_instr_from_api_url(
738 f"{self.config['base_url']}tools/?id={event['tool']['id']}",
739 )
740 session_id = f"{self.config['base_url']}usage_events/?id={event_id}"
741 return Session(
742 session_identifier=session_id,
743 instrument=instr,
744 dt_range=(self.strptime(event["start"]), self.strptime(event["end"])),
745 user=event["user"]["username"],
746 )
748 _logger.warning("No usage event with id = %s found for '%s'", event_id, self)
749 return None
751 def get_known_tool_ids(self) -> List[int]:
752 """
753 Get NEMO tool ID values from the NexusLIMS database.
755 Inspect the ``api_url`` values of known Instruments (from
756 the ``instruments`` table in the DB), and extract their tool_id number
757 if it is from this NemoConnector.
759 Returns
760 -------
761 tool_ids : List[int]
762 The list of tool ID numbers known to NexusLIMS for this harvester
763 """
764 tool_ids = []
765 _ensure_instrument_db_loaded()
767 for _, v in instrument_db.items():
768 if self.config["base_url"] in v.api_url:
769 # Instrument is associated with this connector
770 parsed_url = urlparse(v.api_url)
771 # extract 'id' query parameter from url
772 tool_id = parse_qs(parsed_url.query)["id"][0]
773 tool_ids.append(int(tool_id))
775 return tool_ids
777 def _get_users_helper(self, params: dict[str, str]) -> list:
778 """
779 Call the users API with certain parameters.
781 Parameters
782 ----------
783 params
784 A dictionary of query parameters for the API query
786 Returns
787 -------
788 users
789 A list (could be empty) of users that match the ids and/or
790 usernames given
791 """
792 users = self._api_caller("GET", "users/", params)
793 for user in users:
794 # cache the users response by ID and username
795 self.users[user["id"]] = user
796 self.users_by_username[user["username"]] = user
798 return users
800 def _build_url_with_params(self, base_url: str, params: dict | None) -> str:
801 """
802 Build URL with query string, preserving unencoded commas for __in filters.
804 Django's ORM expects __in filter syntax like: ?tool_id__in=1,3,10,999
805 The requests library would encode this as: ?tool_id__in=1%2C3%2C10%2C999
806 which breaks Django's queryset filtering because Django splits on commas:
807 ``queryset.filter(tool_id__in=request.GET.get('tool_id__in').split(','))``
809 This method manually constructs the query string to keep commas unencoded
810 in __in filter values while still properly encoding other special characters.
812 Parameters
813 ----------
814 base_url
815 The base URL without query parameters
816 params
817 Dictionary of query parameters. Keys ending with '__in' will have
818 their comma-separated values preserved without encoding the commas.
820 Returns
821 -------
822 str
823 Complete URL with properly encoded query string
825 Examples
826 --------
827 >>> connector._build_url_with_params(
828 ... "http://api.example.com/events/",
829 ... {"tool_id__in": "1,3,10", "start": "2025-01-01T00:00:00"}
830 ... )
831 'http://api.example.com/events/?start=2025-01-01T00%3A00%3A00&tool_id__in=1,3,10'
832 """
833 if not params:
834 return base_url
836 # Separate __in parameters from regular parameters
837 in_params = {k: v for k, v in params.items() if k.endswith("__in")}
838 regular_params = {k: v for k, v in params.items() if not k.endswith("__in")}
840 # Build query string components
841 query_parts = []
843 # Add regular params (URL-encoded via urlencode)
844 if regular_params:
845 query_parts.append(urlencode(regular_params))
847 # Add __in params (manually constructed with unencoded commas)
848 for key, value in in_params.items():
849 # Quote the key and value but preserve commas in the value
850 encoded_key = quote(key, safe="")
851 # safe=',' tells quote() to not encode commas
852 encoded_value = quote(str(value), safe=",")
853 query_parts.append(f"{encoded_key}={encoded_value}")
855 # Combine base URL with query string
856 query_string = "&".join(query_parts)
857 separator = "&" if "?" in base_url else "?"
858 return f"{base_url}{separator}{query_string}"
860 def _api_caller(
861 self,
862 verb: str,
863 endpoint: str,
864 params: dict[str, str],
865 ) -> List[dict[str, Any]]:
866 """
867 Make a call to the NEMO API.
869 Helper function to deduplicate actual calls to the API. Takes care of
870 building the full URL from the base URL and specific endpoint,
871 as well as authentication, passing of parameters, and parsing the
872 results.
874 Parameters
875 ----------
876 verb
877 The ``requests`` verb (``'POST'``, ``'GET'``,
878 ``'PATCH'``, etc.) to use for the API request
879 endpoint
880 The API endpoint to use. Should be formatted with a trailing
881 slash and no leading slash. i.e. the endpoint for Projects data
882 should be supplied as ``'projects/'``
883 params
884 The URL parameters to pass along with the request. These are
885 generally filters for the API data such as ``id`` or a date range
886 or something similar.
888 Returns
889 -------
890 results
891 The API response, formatted as a list of dict objects
892 """
893 base_url = urljoin(self.config["base_url"], endpoint)
894 # Build complete URL with custom encoding for __in filters
895 url = self._build_url_with_params(base_url, params)
896 _logger.info("getting data from %s", url)
897 response = nexus_req(
898 url,
899 verb,
900 token_auth=self.config["token"],
901 params=None, # Already included in URL
902 retries=self.config["retries"],
903 )
904 response.raise_for_status()
906 return response.json()