Coverage for nexusLIMS/harvesters/nemo/utils.py: 100%
77 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
1"""Various utility functions used by the NEMO harvester."""
3import json
4import logging
5from datetime import datetime
6from typing import Dict, List, Tuple, Union
7from urllib.parse import parse_qs, urljoin, urlparse
9from nexusLIMS.config import settings
10from nexusLIMS.db.session_handler import Session
12from .connector import NemoConnector
14_logger = logging.getLogger(__name__)
17def get_harvesters_enabled() -> List[NemoConnector]:
18 """
19 Return a list of enabled connectors based off the environment.
21 Returns
22 -------
23 harvesters_enabled : List[NemoConnector]
24 A list of NemoConnector objects representing the NEMO APIs enabled
25 via environment settings
26 """
27 return [
28 NemoConnector(
29 base_url=str(config.address),
30 token=config.token,
31 strftime_fmt=(
32 config.strftime_fmt
33 if config.strftime_fmt != "%Y-%m-%dT%H:%M:%S%z"
34 else None
35 ),
36 strptime_fmt=(
37 config.strptime_fmt
38 if config.strptime_fmt != "%Y-%m-%dT%H:%M:%S%z"
39 else None
40 ),
41 timezone=config.tz,
42 )
43 for config in settings.nemo_harvesters().values()
44 ]
47def add_all_usage_events_to_db(
48 user: Union[str, int] | None = None,
49 dt_from: datetime | None = None,
50 dt_to: datetime | None = None,
51 tool_id: Union[int, List[int]] | None = None,
52):
53 """
54 Add all usage events to database for enabled NEMO connectors.
56 Loop through enabled NEMO connectors and add each one's usage events to
57 the NexusLIMS ``session_log`` database table (if required).
59 Parameters
60 ----------
61 user
62 The user(s) for which to add usage events. If ``None``, events will
63 not be filtered by user at all
64 dt_from
65 The point in time after which usage events will be added. If ``None``,
66 no date filtering will be performed
67 dt_to
68 The point in time before which usage events will be added. If
69 ``None``, no date filtering will be performed
70 tool_id
71 The tools(s) for which to add usage events. If ``'None'`` (default),
72 the tool IDs for each instrument in the NexusLIMS DB will be extracted
73 and used to limit the API response
74 """
75 for nemo_connector in get_harvesters_enabled():
76 events = nemo_connector.get_usage_events(
77 user=user,
78 dt_range=(dt_from, dt_to),
79 tool_id=tool_id,
80 )
81 for event in events:
82 nemo_connector.write_usage_event_to_session_log(event["id"])
85def get_usage_events_as_sessions(
86 user: Union[str, int] | None = None,
87 dt_from: datetime | None = None,
88 dt_to: datetime | None = None,
89 tool_id: Union[int, List[int]] | None = None,
90) -> List[Session]:
91 """
92 Get all usage events for enabled NEMO connectors as Sessions.
94 Loop through enabled NEMO connectors and return each one's usage events to
95 as :py:class:`~nexusLIMS.db.session_handler.Session` objects without
96 writing logs to the ``session_log`` table. Mostly used for doing dry runs
97 of the record builder.
99 Parameters
100 ----------
101 user
102 The user(s) for which to fetch usage events. If ``None``, events will
103 not be filtered by user at all
104 dt_from
105 The point in time after which usage events will be fetched. If ``None``,
106 no date filtering will be performed
107 dt_to
108 The point in time before which usage events will be fetched. If
109 ``None``, no date filtering will be performed
110 tool_id
111 The tools(s) for which to fetch usage events. If ``None``, events will
112 only be filtered by tools known in the NexusLIMS DB for each connector
113 """
114 sessions = []
115 for nemo_connector in get_harvesters_enabled():
116 events = nemo_connector.get_usage_events(
117 user=user,
118 dt_range=(dt_from, dt_to),
119 tool_id=tool_id,
120 )
121 for event in events:
122 this_session = nemo_connector.get_session_from_usage_event(event["id"])
123 # this_session could be None, and if the instrument from the
124 # usage event is not in our DB, this_session.instrument could
125 # also be None. In each case, we should ignore that one
126 if this_session is not None and this_session.instrument is not None:
127 sessions.append(this_session)
129 return sessions
132def get_connector_for_session(session: Session) -> NemoConnector:
133 """
134 Get the appropriate NEMO connector for a given Session.
136 Given a :py:class:`~nexusLIMS.db.session_handler.Session`, find the matching
137 :py:class:`~nexusLIMS.harvesters.nemo.connector.NemoConnector` from the enabled
138 list of NEMO harvesters.
140 Parameters
141 ----------
142 session
143 The session for which a NemoConnector is needed
145 Returns
146 -------
147 n : ~nexusLIMS.harvesters.nemo.connector.NemoConnector
148 The connector object that allows for querying the NEMO API for the
149 instrument contained in ``session``
151 Raises
152 ------
153 LookupError
154 Raised if a matching connector is not found
155 """
156 instr_base_url = urljoin(session.instrument.api_url, ".")
158 for nemo_connector in get_harvesters_enabled():
159 if nemo_connector.config["base_url"] in instr_base_url:
160 return nemo_connector
162 msg = (
163 f"Did not find enabled NEMO harvester for "
164 f'"{session.instrument.name}". Perhaps check environment '
165 f"variables? The following harvesters are enabled: "
166 f"{get_harvesters_enabled()}"
167 )
168 raise LookupError(msg)
171def get_connector_by_base_url(base_url: str) -> NemoConnector:
172 """
173 Get an enabled NemoConnector by inspecting the ``base_url``.
175 Parameters
176 ----------
177 base_url
178 A portion of the API url to search for
180 Returns
181 -------
182 n : ~nexusLIMS.harvesters.nemo.connector.NemoConnector
183 The enabled NemoConnector instance
185 Raises
186 ------
187 LookupError
188 Raised if a matching connector is not found
189 """
190 for nemo_connector in get_harvesters_enabled():
191 if base_url in nemo_connector.config["base_url"]:
192 return nemo_connector
194 msg = (
195 f"Did not find enabled NEMO harvester with url "
196 f'containing "{base_url}". Perhaps check environment '
197 f"variables? The following harvesters are enabled: "
198 f"{get_harvesters_enabled()}"
199 )
200 raise LookupError(msg)
203def process_res_question_samples(
204 res_dict: Dict,
205) -> Tuple[
206 List[str | None] | None,
207 List[str | None] | None,
208 List[str | None] | None,
209 List[str | None] | None,
210]:
211 """
212 Process sample information from reservation questions.
214 Parameters
215 ----------
216 res_dict
217 The reservation dictionary (i.e. the response from the ``reservations`` api
218 endpoint)
219 """
220 sample_details, sample_pid, sample_name, periodic_tables = [], [], [], []
221 sample_group = _get_res_question_value("sample_group", res_dict)
222 if sample_group is not None:
223 # multiple samples form will have
224 # res_dict['question_data']['sample_group']['user_input'] of form:
225 #
226 # _{
227 # _ "0": {
228 # _ "sample_name": "sample_pid_1",
229 # _ "sample_or_pid": "PID",
230 # _ "sample_details": "A sample with a PID and some more details"
231 # _ },
232 # _ "1": {
233 # _ "sample_name": "sample name 1",
234 # _ "sample_or_pid": "Sample Name",
235 # _ "sample_details": "A sample with name and some additional detail",
236 # _ "periodic_table": ["H", "Ti", "Cu", "Sb", "Re"]
237 # _ },
238 # _ ...
239 # _
240 # _}
241 # each key "0", "1", "2", etc. represents a single sample the user
242 # added via the "Add" button. There should always be at least one,
243 # since sample information is required
244 # the "periodic_table" key is optional, and won't be present if the
245 # user did not select anything in that section of the questions
246 for _, v in sample_group.items():
247 if v["sample_or_pid"].lower() == "pid":
248 sample_pid.append(v["sample_name"])
249 sample_name.append(None)
250 elif v["sample_or_pid"].lower() == "sample name":
251 sample_name.append(v["sample_name"])
252 sample_pid.append(None)
253 else:
254 sample_name.append(None)
255 sample_pid.append(None)
256 # as of NEMO 4.3.2, an empty textarea returns None rather than "",
257 # so check for None first, then test string length
258 if v["sample_details"] is not None and len(v["sample_details"]) > 0:
259 sample_details.append(v["sample_details"])
260 else:
261 sample_details.append(None)
262 if "periodic_table" in v:
263 periodic_tables.append(v["periodic_table"])
264 else:
265 periodic_tables.append(None)
266 else: # pragma: no cover
267 # non-multiple samples (old-style form) (this is deprecated,
268 # so doesn't need coverage since we don't have reservations in this
269 # style any longer)
270 sample_details = [_get_res_question_value("sample_details", res_dict)]
271 sample_pid = [None]
272 sample_name = [_get_res_question_value("sample_name", res_dict)]
273 return sample_details, sample_pid, sample_name, periodic_tables
276def _get_res_question_value(value: str, res_dict: Dict) -> Union[str, Dict] | None:
277 if "question_data" in res_dict and res_dict["question_data"] is not None:
278 if value in res_dict["question_data"]:
279 return res_dict["question_data"][value].get("user_input", None)
281 return None
283 return None
286def has_valid_question_data(event_dict: Dict, field: str = "run_data") -> bool:
287 """
288 Check if usage event has valid question data in specified field.
290 Works for both run_data and pre_run_data fields, which use identical structure.
292 A usage event has valid question data if:
293 1. It has the specified field
294 2. Field value is not None or empty string
295 3. Field value can be parsed as JSON
296 4. Parsed data is not empty
297 5. Parsed data has data_consent field (required)
299 Parameters
300 ----------
301 event_dict
302 The usage event dictionary from NEMO API
303 field
304 Field name to check ("run_data" or "pre_run_data")
306 Returns
307 -------
308 bool
309 True if usage event has valid question data in the specified field,
310 False otherwise
311 """
312 # Check field exists and is a non-empty string
313 if (
314 field not in event_dict
315 or event_dict[field] is None
316 or not isinstance(event_dict[field], str)
317 or len(event_dict[field]) == 0
318 ):
319 return False
321 # Try to parse JSON
322 try:
323 parsed_data = json.loads(event_dict[field])
324 except (json.JSONDecodeError, TypeError):
325 return False
327 # Check parsed data is a non-empty dict with data_consent field
328 return (
329 isinstance(parsed_data, dict)
330 and len(parsed_data) > 0
331 and "data_consent" in parsed_data
332 )
335def id_from_url(url: str) -> int | None:
336 """
337 Get the value of the id query parameter stored in URL string.
339 This is used to extract the value as needed from API strings.
341 Parameters
342 ----------
343 url
344 The URL to parse, such as
345 ``https://nemo.example.com/api/usage_events/?id=9``
347 Returns
348 -------
349 this_id : None or int
350 The id value if one is present, otherwise ``None``
351 """
352 query = parse_qs(urlparse(url).query)
353 if "id" in query:
354 return int(query["id"][0])
356 return None