Coverage for nexusLIMS/harvesters/nemo/utils.py: 100%

77 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-03-24 05:23 +0000

1"""Various utility functions used by the NEMO harvester.""" 

2 

3import json 

4import logging 

5from datetime import datetime 

6from typing import Dict, List, Tuple, Union 

7from urllib.parse import parse_qs, urljoin, urlparse 

8 

9from nexusLIMS.config import settings 

10from nexusLIMS.db.session_handler import Session 

11 

12from .connector import NemoConnector 

13 

14_logger = logging.getLogger(__name__) 

15 

16 

def get_harvesters_enabled() -> List[NemoConnector]:
    """
    Build one connector per NEMO harvester found in the settings.

    Each entry returned by ``settings.nemo_harvesters()`` is turned into a
    :py:class:`NemoConnector`. A ``strftime_fmt`` / ``strptime_fmt`` that is
    equal to the ISO-style default (``"%Y-%m-%dT%H:%M:%S%z"``) is handed to the
    connector as ``None`` instead of the literal format string.

    Returns
    -------
    harvesters_enabled : List[NemoConnector]
        A list of NemoConnector objects representing the NEMO APIs enabled
        via environment settings
    """
    default_fmt = "%Y-%m-%dT%H:%M:%S%z"

    harvesters_enabled = []
    for harvester_cfg in settings.nemo_harvesters().values():
        # Only forward a format when it differs from the default one
        # NOTE(review): presumably ``None`` lets the connector fall back to
        # its own default handling -- confirm against NemoConnector
        out_fmt = harvester_cfg.strftime_fmt
        if out_fmt == default_fmt:
            out_fmt = None

        in_fmt = harvester_cfg.strptime_fmt
        if in_fmt == default_fmt:
            in_fmt = None

        harvesters_enabled.append(
            NemoConnector(
                base_url=str(harvester_cfg.address),
                token=harvester_cfg.token,
                strftime_fmt=out_fmt,
                strptime_fmt=in_fmt,
                timezone=harvester_cfg.tz,
            )
        )

    return harvesters_enabled

45 

46 

def add_all_usage_events_to_db(
    user: Union[str, int] | None = None,
    dt_from: datetime | None = None,
    dt_to: datetime | None = None,
    tool_id: Union[int, List[int]] | None = None,
):
    """
    Write the usage events of every enabled NEMO connector to the database.

    For each enabled connector, the matching usage events are fetched and
    each one is handed (by its ``"id"``) to the connector's
    ``write_usage_event_to_session_log`` method, which adds it to the
    NexusLIMS ``session_log`` database table (if required).

    Parameters
    ----------
    user
        The user(s) for which to add usage events. If ``None``, events will
        not be filtered by user at all
    dt_from
        The point in time after which usage events will be added. If ``None``,
        no date filtering will be performed
    dt_to
        The point in time before which usage events will be added. If
        ``None``, no date filtering will be performed
    tool_id
        The tool(s) for which to add usage events. If ``None`` (default),
        the tool IDs for each instrument in the NexusLIMS DB will be extracted
        and used to limit the API response
    """
    # The same (start, end) window applies to every connector
    date_window = (dt_from, dt_to)

    for connector in get_harvesters_enabled():
        usage_events = connector.get_usage_events(
            user=user,
            dt_range=date_window,
            tool_id=tool_id,
        )
        for usage_event in usage_events:
            connector.write_usage_event_to_session_log(usage_event["id"])

83 

84 

def get_usage_events_as_sessions(
    user: Union[str, int] | None = None,
    dt_from: datetime | None = None,
    dt_to: datetime | None = None,
    tool_id: Union[int, List[int]] | None = None,
) -> List[Session]:
    """
    Get all usage events for enabled NEMO connectors as Sessions.

    Loop through the enabled NEMO connectors and return each one's usage
    events as :py:class:`~nexusLIMS.db.session_handler.Session` objects,
    without writing logs to the ``session_log`` table. Mostly used for doing
    dry runs of the record builder.

    Parameters
    ----------
    user
        The user(s) for which to fetch usage events. If ``None``, events will
        not be filtered by user at all
    dt_from
        The point in time after which usage events will be fetched. If ``None``,
        no date filtering will be performed
    dt_to
        The point in time before which usage events will be fetched. If
        ``None``, no date filtering will be performed
    tool_id
        The tool(s) for which to fetch usage events. If ``None``, events will
        only be filtered by tools known in the NexusLIMS DB for each connector

    Returns
    -------
    sessions : List[Session]
        The sessions built from the usage events, in connector order; events
        that yield no session, or a session without an instrument, are
        left out
    """
    date_window = (dt_from, dt_to)
    sessions = []

    for connector in get_harvesters_enabled():
        usage_events = connector.get_usage_events(
            user=user,
            dt_range=date_window,
            tool_id=tool_id,
        )
        for usage_event in usage_events:
            candidate = connector.get_session_from_usage_event(usage_event["id"])
            # Skip events that produced no session at all, as well as those
            # whose instrument is not in our DB (``instrument`` is None)
            if candidate is None or candidate.instrument is None:
                continue
            sessions.append(candidate)

    return sessions

130 

131 

def get_connector_for_session(session: Session) -> NemoConnector:
    """
    Find the enabled NEMO connector that serves a Session's instrument.

    The instrument's ``api_url`` is reduced to a base URL and the first
    enabled :py:class:`~nexusLIMS.harvesters.nemo.connector.NemoConnector`
    whose configured ``base_url`` is contained in that base URL is returned.

    Parameters
    ----------
    session
        The :py:class:`~nexusLIMS.db.session_handler.Session` for which a
        NemoConnector is needed

    Returns
    -------
    n : ~nexusLIMS.harvesters.nemo.connector.NemoConnector
        The connector object that allows for querying the NEMO API for the
        instrument contained in ``session``

    Raises
    ------
    LookupError
        Raised if a matching connector is not found
    """
    # Resolving "." against the API URL drops the query string and anything
    # after the final "/" of the path
    instr_base_url = urljoin(session.instrument.api_url, ".")

    # NOTE(review): this is a plain substring test, so the first connector
    # whose base_url occurs anywhere in the instrument URL wins
    matching_connector = next(
        (
            candidate
            for candidate in get_harvesters_enabled()
            if candidate.config["base_url"] in instr_base_url
        ),
        None,
    )
    if matching_connector is not None:
        return matching_connector

    msg = (
        f'Did not find enabled NEMO harvester for "{session.instrument.name}". '
        f"Perhaps check environment variables? "
        f"The following harvesters are enabled: {get_harvesters_enabled()}"
    )
    raise LookupError(msg)

169 

170 

def get_connector_by_base_url(base_url: str) -> NemoConnector:
    """
    Look up an enabled NemoConnector by (part of) its ``base_url``.

    The first enabled connector whose configured ``base_url`` contains the
    given text is returned.

    Parameters
    ----------
    base_url
        A portion of the API url to search for

    Returns
    -------
    n : ~nexusLIMS.harvesters.nemo.connector.NemoConnector
        The enabled NemoConnector instance

    Raises
    ------
    LookupError
        Raised if a matching connector is not found
    """
    matching_connector = next(
        (
            candidate
            for candidate in get_harvesters_enabled()
            if base_url in candidate.config["base_url"]
        ),
        None,
    )
    if matching_connector is not None:
        return matching_connector

    msg = (
        f'Did not find enabled NEMO harvester with url containing "{base_url}". '
        f"Perhaps check environment variables? "
        f"The following harvesters are enabled: {get_harvesters_enabled()}"
    )
    raise LookupError(msg)

201 

202 

203def process_res_question_samples( 

204 res_dict: Dict, 

205) -> Tuple[ 

206 List[str | None] | None, 

207 List[str | None] | None, 

208 List[str | None] | None, 

209 List[str | None] | None, 

210]: 

211 """ 

212 Process sample information from reservation questions. 

213 

214 Parameters 

215 ---------- 

216 res_dict 

217 The reservation dictionary (i.e. the response from the ``reservations`` api 

218 endpoint) 

219 """ 

220 sample_details, sample_pid, sample_name, periodic_tables = [], [], [], [] 

221 sample_group = _get_res_question_value("sample_group", res_dict) 

222 if sample_group is not None: 

223 # multiple samples form will have 

224 # res_dict['question_data']['sample_group']['user_input'] of form: 

225 # 

226 # _{ 

227 # _ "0": { 

228 # _ "sample_name": "sample_pid_1", 

229 # _ "sample_or_pid": "PID", 

230 # _ "sample_details": "A sample with a PID and some more details" 

231 # _ }, 

232 # _ "1": { 

233 # _ "sample_name": "sample name 1", 

234 # _ "sample_or_pid": "Sample Name", 

235 # _ "sample_details": "A sample with name and some additional detail", 

236 # _ "periodic_table": ["H", "Ti", "Cu", "Sb", "Re"] 

237 # _ }, 

238 # _ ... 

239 # _ 

240 # _} 

241 # each key "0", "1", "2", etc. represents a single sample the user 

242 # added via the "Add" button. There should always be at least one, 

243 # since sample information is required 

244 # the "periodic_table" key is optional, and won't be present if the 

245 # user did not select anything in that section of the questions 

246 for _, v in sample_group.items(): 

247 if v["sample_or_pid"].lower() == "pid": 

248 sample_pid.append(v["sample_name"]) 

249 sample_name.append(None) 

250 elif v["sample_or_pid"].lower() == "sample name": 

251 sample_name.append(v["sample_name"]) 

252 sample_pid.append(None) 

253 else: 

254 sample_name.append(None) 

255 sample_pid.append(None) 

256 # as of NEMO 4.3.2, an empty textarea returns None rather than "", 

257 # so check for None first, then test string length 

258 if v["sample_details"] is not None and len(v["sample_details"]) > 0: 

259 sample_details.append(v["sample_details"]) 

260 else: 

261 sample_details.append(None) 

262 if "periodic_table" in v: 

263 periodic_tables.append(v["periodic_table"]) 

264 else: 

265 periodic_tables.append(None) 

266 else: # pragma: no cover 

267 # non-multiple samples (old-style form) (this is deprecated, 

268 # so doesn't need coverage since we don't have reservations in this 

269 # style any longer) 

270 sample_details = [_get_res_question_value("sample_details", res_dict)] 

271 sample_pid = [None] 

272 sample_name = [_get_res_question_value("sample_name", res_dict)] 

273 return sample_details, sample_pid, sample_name, periodic_tables 

274 

275 

276def _get_res_question_value(value: str, res_dict: Dict) -> Union[str, Dict] | None: 

277 if "question_data" in res_dict and res_dict["question_data"] is not None: 

278 if value in res_dict["question_data"]: 

279 return res_dict["question_data"][value].get("user_input", None) 

280 

281 return None 

282 

283 return None 

284 

285 

def has_valid_question_data(event_dict: Dict, field: str = "run_data") -> bool:
    """
    Tell whether a usage event carries usable question data in ``field``.

    Works for both run_data and pre_run_data fields, which use identical structure.

    The question data is considered valid only if all of these hold:
        1. ``event_dict`` contains ``field``
        2. the value is a non-empty string
        3. the string can be parsed as JSON
        4. the parsed value is a non-empty dictionary
        5. that dictionary has the (required) ``data_consent`` key

    Parameters
    ----------
    event_dict
        The usage event dictionary from NEMO API
    field
        Field name to check ("run_data" or "pre_run_data")

    Returns
    -------
    bool
        True if usage event has valid question data in the specified field,
        False otherwise
    """
    # A missing field and an explicit None are treated alike
    raw_value = event_dict.get(field)
    if not isinstance(raw_value, str) or not raw_value:
        return False

    try:
        payload = json.loads(raw_value)
    except (json.JSONDecodeError, TypeError):
        return False

    # Valid JSON may still be a list, number, string, null or an empty object
    if not isinstance(payload, dict) or not payload:
        return False

    return "data_consent" in payload

333 

334 

335def id_from_url(url: str) -> int | None: 

336 """ 

337 Get the value of the id query parameter stored in URL string. 

338 

339 This is used to extract the value as needed from API strings. 

340 

341 Parameters 

342 ---------- 

343 url 

344 The URL to parse, such as 

345 ``https://nemo.example.com/api/usage_events/?id=9`` 

346 

347 Returns 

348 ------- 

349 this_id : None or int 

350 The id value if one is present, otherwise ``None`` 

351 """ 

352 query = parse_qs(urlparse(url).query) 

353 if "id" in query: 

354 return int(query["id"][0]) 

355 

356 return None