Coverage for nexusLIMS/extractors/utils.py: 100%

195 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-03-24 05:23 +0000

1"""Methods (primarily intended to be private) that are used by the other extractors.""" 

2 

3import contextlib 

4import logging 

5import re 

6import shutil 

7import tarfile 

8from datetime import UTC, datetime 

9from decimal import Decimal, InvalidOperation 

10from pathlib import Path 

11from typing import Any, Dict, List 

12 

13from rsciio.digitalmicrograph._api import ( # pylint: disable=import-error,no-name-in-module 

14 DigitalMicrographReader, 

15 ImageObject, 

16) 

17 

18from nexusLIMS.instruments import Instrument, get_instr_from_filepath 

19from nexusLIMS.schemas.units import ureg 

20from nexusLIMS.utils.dicts import set_nested_dict_value, try_getting_dict_value 

21 

22_logger = logging.getLogger(__name__) 

23 

24 

def _coerce_to_list(meta_key):
    """
    Wrap a bare string in a one-element list.

    Any value that is not a ``str`` is handed back untouched, so callers can
    pass either a single key or an existing collection of keys.
    """
    return [meta_key] if isinstance(meta_key, str) else meta_key

29 

30 

31def _get_mtime_iso(filename: Path, instrument: Instrument | None = None): 

32 return datetime.fromtimestamp( 

33 filename.stat().st_mtime, 

34 tz=instrument.timezone if instrument else UTC, 

35 ).isoformat() 

36 

37 

def _set_instr_name_and_time(mdict: Dict, filename: Path):
    """
    Seed ``mdict["nx_meta"]`` with instrument, creation time and warnings.

    Parameters
    ----------
    mdict
        Metadata dictionary that already contains an ``"nx_meta"`` entry;
        updated in place.
    filename
        File used both to look up the instrument and to read the
        modification time stored as "Creation Time".
    """
    instrument = get_instr_from_filepath(filename)
    # the instrument lookup may come back empty; record None in that case
    instrument_id = None if instrument is None else instrument.name

    mdict["nx_meta"]["Instrument ID"] = instrument_id
    mdict["nx_meta"]["Creation Time"] = _get_mtime_iso(filename, instrument)
    # start with an empty warnings list
    mdict["nx_meta"]["warnings"] = []

46 

47 

def _set_acquisition_device_name(mdict: Dict, pre_path: List[str]):
    """
    Store the acquisition device name under ``nx_meta``.

    Two locations below ``pre_path`` are tried in order; the first one that
    yields a value other than ``None`` is written to "Acquisition Device".
    Nothing is written when both lookups come back empty.

    Parameters
    ----------
    mdict
        Metadata dictionary; updated in place.
    pre_path
        Key path leading to the tag group to search.
    """
    candidate_tails = (
        ["Acquisition", "Device", "Name"],
        ["DataBar", "Device Name"],
    )
    for tail in candidate_tails:
        device = try_getting_dict_value(mdict, [*pre_path, *tail])
        if device is not None:
            set_nested_dict_value(mdict, ["nx_meta", "Acquisition Device"], device)
            return

54 

55 

def _set_exposure_time(mdict: Dict, pre_path: List[str]):
    """
    Store the exposure time under ``nx_meta`` as a quantity in seconds.

    The high-level acquisition parameters are consulted first and the
    ``DataBar`` entry second. If neither yields a value, nothing is written.
    When the value cannot be wrapped in a Pint quantity, the raw value is
    stored instead.

    Parameters
    ----------
    mdict
        Metadata dictionary; updated in place.
    pre_path
        Key path leading to the tag group to search.
    """
    candidate_tails = (
        ["Acquisition", "Parameters", "High Level", "Exposure (s)"],
        ["DataBar", "Exposure Time (s)"],
    )
    for tail in candidate_tails:
        exposure = try_getting_dict_value(mdict, [*pre_path, *tail])
        if exposure is not None:
            break
    else:
        # neither location held a value
        return

    try:
        exposure = ureg.Quantity(exposure, "second")
    except (ValueError, TypeError):
        # conversion failed; fall through and store the raw value
        pass
    set_nested_dict_value(mdict, ["nx_meta", "Exposure Time"], exposure)

68 

69 

def _set_gms_version(mdict: Dict, pre_path: List[str]):
    """
    Copy the ``GMS Version > Created`` value into ``nx_meta``.

    Parameters
    ----------
    mdict
        Metadata dictionary; updated in place.
    pre_path
        Key path leading to the tag group that holds "GMS Version".
    """
    version = try_getting_dict_value(mdict, [*pre_path, "GMS Version", "Created"])
    if version is None:
        return
    set_nested_dict_value(mdict, ["nx_meta", "GMS Version"], version)

74 

75 

def _set_camera_binning(mdict: Dict, pre_path: List[str]):
    """
    Split the high-level "Binning" value into two ``nx_meta`` fields.

    Element 0 of the value is stored as "Binning (Horizontal)" and element 1
    as "Binning (Vertical)". Nothing is written when the lookup is empty.

    Parameters
    ----------
    mdict
        Metadata dictionary; updated in place.
    pre_path
        Key path leading to the tag group that holds "Acquisition".
    """
    binning = try_getting_dict_value(
        mdict,
        [*pre_path, "Acquisition", "Parameters", "High Level", "Binning"],
    )
    if binning is None:
        return

    targets = (("Binning (Horizontal)", 0), ("Binning (Vertical)", 1))
    for key, position in targets:
        set_nested_dict_value(mdict, ["nx_meta", key], binning[position])

84 

85 

def _set_image_processing(mdict: Dict, pre_path: List[str]):
    """
    Copy the high-level "Processing" value into ``nx_meta``.

    The tag holds a description such as "Gain normalized". It is not specific
    to EELS data, so it is stored in the general "Camera/Detector Processing"
    field.

    Parameters
    ----------
    mdict
        Metadata dictionary; updated in place.
    pre_path
        Key path leading to the tag group that holds "Acquisition".
    """
    source_path = [*pre_path, "Acquisition", "Parameters", "High Level", "Processing"]
    processing = try_getting_dict_value(mdict, source_path)
    if processing is None:
        return
    set_nested_dict_value(mdict, ["nx_meta", "Camera/Detector Processing"], processing)

96 

97 

def _set_eels_meta(mdict, base, meta_key):
    """
    Copy one EELS value into the ``EELS`` sub-tree of ``nx_meta``.

    The value is read from ``base + meta_key``; nothing is written when that
    lookup yields ``None``. Fields listed in the unit table are wrapped in a
    Pint quantity and stored under the field name minus its trailing
    "(unit)" suffix. If wrapping fails, the raw value is stored under the
    unmodified field name.

    Parameters
    ----------
    mdict
        Metadata dictionary; updated in place.
    base
        Key path prefix.
    meta_key
        Key path below ``base``; its last element names the output field.
    """
    found = try_getting_dict_value(mdict, base + meta_key)
    if found is None:
        return

    label = meta_key[-1]
    units_by_field = {
        "Exposure (s)": "second",
        "Integration time (s)": "second",
        "Collection semi-angle (mrad)": "milliradian",
        "Convergence semi-angle (mrad)": "milliradian",
    }
    unit = units_by_field.get(label)
    if unit is not None:
        try:
            quantity = ureg.Quantity(found, unit)
        except (ValueError, TypeError):
            # keep the raw value and the full field name
            pass
        else:
            found = quantity
            # the unit now lives in the quantity, so drop it from the name
            label = label.rsplit(" (", 1)[0]

    set_nested_dict_value(mdict, ["nx_meta", "EELS", label], found)

118 

119 

def _set_eels_spectrometer_meta(mdict, base, meta_key):
    """
    Copy one spectrometer value into the ``EELS`` sub-tree of ``nx_meta``.

    The value is read from ``base + meta_key``; nothing is written when that
    lookup yields ``None``. The output field is named "Spectrometer "
    followed by the first element of ``meta_key``. Fields listed in the unit
    table are wrapped in a Pint quantity and lose their trailing "(unit)"
    suffix. If wrapping fails, the raw value and the full name are kept.

    Parameters
    ----------
    mdict
        Metadata dictionary; updated in place.
    base
        Key path prefix.
    meta_key
        Key path below ``base``; its first element names the output field.
    """
    found = try_getting_dict_value(mdict, base + meta_key)
    if found is None:
        return

    label = meta_key[0]
    units_by_field = {
        "Energy loss (eV)": "electron_volt",
        "Drift tube voltage (V)": "volt",
        "Slit width (eV)": "electron_volt",
        "Prism offset (V)": "volt",
    }
    unit = units_by_field.get(label)
    if unit is not None:
        try:
            quantity = ureg.Quantity(found, unit)
        except (ValueError, TypeError):
            # keep the raw value and the full field name
            pass
        else:
            found = quantity
            label = label.rsplit(" (", 1)[0]

    target = ["nx_meta", "EELS", "Spectrometer " + label]
    set_nested_dict_value(mdict, target, found)

142 

143 

def _set_eels_processing(mdict, pre_path):
    """
    Summarize EELS post-processing steps into ``nx_meta``.

    The ``Processing`` entry below ``pre_path`` is expected to be a dict of
    tag groups (TagGroup0, TagGroup1, ...), each describing one operation.
    Recognized operations are collected, de-duplicated, sorted
    alphabetically and stored as a comma-separated string under
    ``nx_meta > EELS > Processing Steps``. Two operations have extra effects:

    * "Background Removal" also stores the step's ``Parameters > Model``
      value (when present) as "Background Removal Model".
    * "Compute Thickness" also extracts the thickness metadata found under
      ``pre_path + ["EELS"]``.

    Unrecognized operations are ignored. Nothing is written when
    ``Processing`` is absent or is not a dict.

    Parameters
    ----------
    mdict
        Metadata dictionary; updated in place.
    pre_path
        Key path leading to the tag group that holds "Processing".
    """
    steps = try_getting_dict_value(mdict, [*pre_path, "Processing"])
    if not isinstance(steps, dict):
        # a non-dict (including None) means no processing steps were recorded
        return

    # label reported in "Processing Steps" for each recognized operation
    step_labels = {
        "AlignSIByPeak": "Aligned parent SI By Peak",
        "Background Removal": "Background Removal",
        "SpectrumCalibrate": "Calibrated Post-acquisition",
        "Compute Thickness": "Compute Thickness",
        "DataPicker": "Extracted from SI",
        "Signal Integration": "Signal Integration",
    }

    eels_ops = set()
    for step in steps.values():
        operation = step["Operation"]
        if operation == "Background Removal":
            # "Parameters" is only read here, so steps of other kinds that
            # lack it no longer abort the whole extraction
            params = step.get("Parameters")
            model = (
                None if params is None else try_getting_dict_value(params, ["Model"])
            )
            if model is not None:
                set_nested_dict_value(
                    mdict,
                    ["nx_meta", "EELS", "Background Removal Model"],
                    model,
                )
        elif operation == "Compute Thickness":
            mdict = _process_thickness_metadata(mdict, [*pre_path, "EELS"])

        if operation in step_labels:
            eels_ops.add(step_labels[operation])

    if eels_ops:
        set_nested_dict_value(
            mdict,
            ["nx_meta", "EELS", "Processing Steps"],
            ", ".join(sorted(eels_ops)),
        )

186 

187 

def _process_thickness_metadata(mdict, base):
    """
    Copy EELS thickness measurements into the ``EELS`` sub-tree of ``nx_meta``.

    Reads the absolute measurement, its units, the absolute mean free path
    and the relative measurement from the ``Thickness`` group below
    ``base``. Each value that is not ``None`` is written to its own field;
    the units string is embedded in the name of the absolute-thickness
    field, and only element 0 of the mean-free-path value is stored.

    Parameters
    ----------
    mdict
        Metadata dictionary; updated in place.
    base
        Key path leading to the group that holds "Thickness".

    Returns
    -------
    The same ``mdict`` object that was passed in.
    """
    absolute = [*base, "Thickness", "Absolute"]
    relative = [*base, "Thickness", "Relative"]
    eels = ["nx_meta", "EELS"]

    abs_thick = try_getting_dict_value(mdict, [*absolute, "Measurement"])
    abs_units = try_getting_dict_value(mdict, [*absolute, "Units"])
    abs_mfp = try_getting_dict_value(mdict, [*absolute, "Mean Free Path"])
    rel_thick = try_getting_dict_value(mdict, [*relative, "Measurement"])

    if abs_thick is not None:
        set_nested_dict_value(
            mdict,
            [*eels, f"Thickness (absolute) [{abs_units}]"],
            abs_thick,
        )
    if abs_mfp is not None:
        set_nested_dict_value(
            mdict,
            [*eels, "Thickness (absolute) mean free path"],
            abs_mfp[0],
        )
    if rel_thick is not None:
        set_nested_dict_value(
            mdict,
            [*eels, "Thickness (relative) [t/λ]"],
            rel_thick,
        )

    return mdict

222 

223 

def _set_eds_meta(mdict, base, meta_key):
    """
    Copy one EDS value into the ``EDS`` sub-tree of ``nx_meta``.

    The value is read from ``base + meta_key``; nothing is written when that
    lookup yields ``None``. Fields listed in the unit table are wrapped in a
    Pint quantity and any trailing "(unit)" suffix is dropped from the field
    name. If wrapping fails, the raw value is stored under the unmodified
    name.

    Parameters
    ----------
    mdict
        Metadata dictionary; updated in place.
    base
        Key path prefix.
    meta_key
        Key path below ``base``; its last element names the output field.
    """
    found = try_getting_dict_value(mdict, base + meta_key)
    if found is None:
        return

    # the last element is also the only element of a one-item key path
    label = meta_key[-1]
    units_by_field = {
        "Dispersion (eV)": "electron_volt",
        "Energy Cutoff (V)": "volt",
        "Exposure (s)": "second",
        "Azimuthal angle": "degree",
        "Elevation angle": "degree",
        "Incidence angle": "degree",
        "Stage tilt": "degree",
        "Live time": "second",
        "Real time": "second",
    }
    unit = units_by_field.get(label)
    if unit is not None:
        try:
            quantity = ureg.Quantity(found, unit)
        except (ValueError, TypeError):
            # keep the raw value and the full field name
            pass
        else:
            found = quantity
            # names without a "(unit)" suffix pass through unchanged
            label = label.rsplit(" (", 1)[0]

    set_nested_dict_value(mdict, ["nx_meta", "EDS", label], found)

253 

254 

def _set_si_meta(mdict, pre_path, meta_key):
    """
    Copy one value from the ``SI`` tag group into ``nx_meta > EDS``.

    The value is read from ``pre_path + ["SI"] + meta_key``; nothing is
    written when that lookup yields ``None``. Fields listed in the unit table
    are wrapped in a Pint quantity and lose their trailing "(unit)" suffix.
    If wrapping fails, the raw value and the full name are kept.

    Parameters
    ----------
    mdict
        Metadata dictionary; updated in place.
    pre_path
        Key path leading to the tag group that holds "SI".
    meta_key
        Key path below "SI"; its last element names the output field.
    """
    found = try_getting_dict_value(mdict, [*pre_path, "SI", *meta_key])
    if found is None:
        return

    label = meta_key[-1]
    units_by_field = {
        "Dispersion (eV)": "electron_volt",
        "Energy Cutoff (V)": "volt",
        "Exposure (s)": "second",
    }
    unit = units_by_field.get(label)
    if unit is not None:
        try:
            quantity = ureg.Quantity(found, unit)
        except (ValueError, TypeError):
            # keep the raw value and the full field name
            pass
        else:
            found = quantity
            label = label.rsplit(" (", 1)[0]

    set_nested_dict_value(mdict, ["nx_meta", "EDS", label], found)

273 

274 

def _try_decimal(val):
    """
    Convert ``val`` to a float when it parses as a decimal number.

    The value is parsed with :class:`decimal.Decimal` and then converted to
    ``float``. Anything that cannot be parsed is returned unchanged instead
    of raising. That covers non-numeric strings as well as unsupported types
    such as ``None``.

    Parameters
    ----------
    val
        Value to convert, typically a string taken from a metadata file.

    Returns
    -------
    The parsed ``float``, or ``val`` itself when parsing is not possible.
    """
    try:
        return float(Decimal(val))
    except (ValueError, TypeError, InvalidOperation):
        # TypeError covers inputs Decimal refuses outright (e.g. None)
        return val

282 

283 

def _parse_filter_settings(info_dict, tecnai_info):
    """
    Parse the "Filter related settings" part of Tecnai microscope info.

    ``info_dict["Filter_Settings"]`` is always (re)set to a dict. When the
    "Filter related settings:" marker is present in ``tecnai_info``, the
    entries after it are searched for the known labels and the dict is
    filled with "Mode" (string) plus five numeric fields. A label that is
    not present is stored as ``None``. When the marker is missing, an
    informational message is logged and the dict is left empty.

    Parameters
    ----------
    info_dict
        Dictionary receiving the "Filter_Settings" entry; updated in place.
    tecnai_info
        Sequence of text entries from the Tecnai metadata.

    Returns
    -------
    The same ``info_dict`` object that was passed in.
    """
    info_dict["Filter_Settings"] = {}
    try:
        # only the marker lookup can raise ValueError, so guard just that
        start = tecnai_info.index("Filter related settings:") + 1
    except ValueError:
        _logger.info("Filter settings not found in Tecnai microscope info")
        return info_dict

    tecnai_filter_info = tecnai_info[start:]
    settings = info_dict["Filter_Settings"]

    # String
    settings["Mode"] = _find_val("Mode: ", tecnai_filter_info)

    def _drop_ev(text):
        return text.replace("[eV]", "")

    # (label to search for, output key, function removing the unit text)
    numeric_fields = (
        # eV/channel
        (
            "Selected dispersion: ",
            "Dispersion",
            lambda text: text.replace("[eV/Channel]", ""),
        ),
        # millimeter
        ("Selected aperture: ", "Aperture", lambda text: text.strip("m")),
        # eV
        ("Prism shift: ", "Prism_Shift", _drop_ev),
        ("Drift tube: ", "Drift_Tube", _drop_ev),
        ("Total energy loss: ", "Total_Energy_Loss", _drop_ev),
    )
    for label, key, strip_unit in numeric_fields:
        raw = _find_val(label, tecnai_filter_info)
        # a missing label is recorded as None rather than handed to the
        # numeric conversion, which cannot accept None
        settings[key] = None if raw is None else _try_decimal(strip_unit(raw))

    return info_dict

325 

326 

def _zero_data_in_dm3(
    filename: Path,
    out_filename: Path | None = None,
    *,
    compress=True,
) -> Path:
    """
    Write a copy of a DM3 file whose image data is all zeros.

    The input is copied, the data block of its first image is overwritten
    with zero bytes, and the copy is optionally packed into a ``.tar.gz``
    archive (after which the uncompressed copy is deleted). This is meant for
    producing small test-suite fixtures. A file holding only metadata and
    zeros compresses very well (in initial tests a 16MB file shrank to
    ~100KB).

    Only single-image .dm3 files have been tested; spectra and image stacks
    will probably need different handling.

    Parameters
    ----------
    filename
        Path to the file to be read. It is not modified.
    out_filename
        Where to save the zeroed copy. If not given, the name is derived from
        ``filename`` by appending ``_dataZeroed`` to its stem.
    compress
        Whether to pack the zeroed copy into a tar.gz file.

    Returns
    -------
    Path
        The path of the archive when ``compress`` is true, otherwise the
        path of the zeroed copy.
    """
    mod_fname = out_filename or (
        filename.parent / (filename.stem + "_dataZeroed" + filename.suffix)
    )
    shutil.copyfile(filename, mod_fname)

    # Low-level parse of the source file to obtain the ImageObject references
    with filename.open(mode="rb") as source:
        reader = DigitalMicrographReader(source)
        reader.parse_file()
        images = [
            ImageObject(im_dict, source) for im_dict in reader.get_image_dictionaries()
        ]

    # The first image's data block gives the offset and byte count to blank
    with mod_fname.open(mode="r+b") as target:
        target.seek(images[0].imdict.ImageData.Data.offset)
        target.write(b"\x00" * images[0].imdict.ImageData.Data.size_bytes)

    if not compress:
        return mod_fname

    tar_path = Path(f"{mod_fname}.tar.gz")
    with tarfile.open(tar_path, "w:gz") as tar:
        tar.add(mod_fname)
    # only the archive is kept
    mod_fname.unlink()
    return tar_path

393 

394 

def _find_val(s_to_find, list_to_search):
    """
    Find a value in a list.

    Return the first entry of ``list_to_search`` that contains ``s_to_find``,
    with ``s_to_find`` removed from the start of that entry when it begins
    with it. The search string is always treated as literal text, so labels
    containing characters such as ``(``, ``[`` or ``.`` are handled
    correctly.

    Parameters
    ----------
    s_to_find
        Literal text to look for.
    list_to_search
        Iterable of strings to search, in order.

    Returns
    -------
    The matching entry without the leading ``s_to_find``, or ``None`` if no
    entry contains it.
    """
    match = next((item for item in list_to_search if s_to_find in item), None)
    if match is None:
        return None
    # literal prefix removal: no regex, so metacharacters need no escaping
    return match.removeprefix(s_to_find)

412 

413 

414# Field categorization helpers for schema-based metadata extraction 

415 

416 

def add_to_extensions(nx_meta: dict, field_name: str, value: Any) -> None:
    """
    Store a field in the ``extensions`` section of ``nx_meta``.

    The ``extensions`` dict is created on first use, so callers never need to
    initialize it themselves. Use this for vendor-, instrument- or
    facility-specific metadata that has no place in the core schema.

    Parameters
    ----------
    nx_meta : dict
        The nx_meta dictionary being built by the extractor. It is modified
        in place.
    field_name : str
        Key under which to store the value. Prefer descriptive names that
        make the meaning obvious (e.g., 'quanta_spot_size',
        'detector_contrast').
    value : Any
        The value to store. Can be any JSON-serializable type, including
        Pint Quantity objects which will be automatically serialized.

    Examples
    --------
    Collect vendor-specific fields while extracting metadata:

    >>> nx_meta = {
    ...     "DatasetType": "Image",
    ...     "Data Type": "SEM_Imaging",
    ...     "Creation Time": "2024-01-15T10:30:00-05:00",
    ... }
    >>> add_to_extensions(nx_meta, "spot_size", 3.5)
    >>> add_to_extensions(nx_meta, "detector_contrast", 50.0)
    >>> nx_meta["extensions"]
    {'spot_size': 3.5, 'detector_contrast': 50.0}

    Pint Quantities are accepted as values:

    >>> from nexusLIMS.schemas.units import ureg
    >>> add_to_extensions(nx_meta, "chamber_pressure", ureg.Quantity(79.8, "pascal"))

    Notes
    -----
    The extensions section keeps all metadata that does not fit the core
    schema, so nothing is lost during extraction. Extensions are included in
    the XML output and preserved through the record building process.
    """
    # create the extensions dict on first use, then record the field
    nx_meta.setdefault("extensions", {})[field_name] = value