Coverage for nexusLIMS/extractors/utils.py: 100%
195 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
1"""Methods (primarily intended to be private) that are used by the other extractors."""
3import contextlib
4import logging
5import re
6import shutil
7import tarfile
8from datetime import UTC, datetime
9from decimal import Decimal, InvalidOperation
10from pathlib import Path
11from typing import Any, Dict, List
13from rsciio.digitalmicrograph._api import ( # pylint: disable=import-error,no-name-in-module
14 DigitalMicrographReader,
15 ImageObject,
16)
18from nexusLIMS.instruments import Instrument, get_instr_from_filepath
19from nexusLIMS.schemas.units import ureg
20from nexusLIMS.utils.dicts import set_nested_dict_value, try_getting_dict_value
22_logger = logging.getLogger(__name__)
def _coerce_to_list(meta_key):
    """
    Wrap a lone string key in a list; pass anything else through untouched.

    Parameters
    ----------
    meta_key
        Either a single string or an already-built collection of keys.

    Returns
    -------
    ``[meta_key]`` when ``meta_key`` is a ``str``; otherwise ``meta_key``
    itself (the very same object, not a copy).
    """
    return [meta_key] if isinstance(meta_key, str) else meta_key
31def _get_mtime_iso(filename: Path, instrument: Instrument | None = None):
32 return datetime.fromtimestamp(
33 filename.stat().st_mtime,
34 tz=instrument.timezone if instrument else UTC,
35 ).isoformat()
def _set_instr_name_and_time(mdict: Dict, filename: Path):
    """
    Fill in the instrument ID, creation time and warnings list of ``nx_meta``.

    Parameters
    ----------
    mdict
        Metadata dictionary; must already contain an ``"nx_meta"`` mapping,
        which is modified in place.
    filename
        File used both to look up the instrument (through
        ``get_instr_from_filepath``) and to read the modification time.
    """
    instrument = get_instr_from_filepath(filename)
    nx_meta = mdict["nx_meta"]

    # the name is stored as a string when an instrument was found, else None
    nx_meta["Instrument ID"] = None if instrument is None else instrument.name
    nx_meta["Creation Time"] = _get_mtime_iso(filename, instrument)
    nx_meta["warnings"] = []
def _set_acquisition_device_name(mdict: Dict, pre_path: List[str]):
    """
    Copy the acquisition device name into ``nx_meta["Acquisition Device"]``.

    The ``Acquisition > Device > Name`` tag is tried first and the
    ``DataBar > Device Name`` tag second; the first lookup that does not
    come back as ``None`` wins. Nothing is written if both are ``None``.

    Parameters
    ----------
    mdict
        Metadata dictionary, modified in place.
    pre_path
        Key path prepended to each tag location.
    """
    candidate_tails = (
        ("Acquisition", "Device", "Name"),
        ("DataBar", "Device Name"),
    )
    for tail in candidate_tails:
        device = try_getting_dict_value(mdict, [*pre_path, *tail])
        if device is not None:
            set_nested_dict_value(mdict, ["nx_meta", "Acquisition Device"], device)
            return
def _set_exposure_time(mdict: Dict, pre_path: List[str]):
    """
    Store the exposure time under ``nx_meta["Exposure Time"]``.

    The ``Acquisition > Parameters > High Level > Exposure (s)`` tag is
    tried first, then ``DataBar > Exposure Time (s)``. The value is turned
    into a Pint quantity in seconds when possible; if that conversion raises
    ``ValueError`` or ``TypeError`` the raw value is stored instead.

    Parameters
    ----------
    mdict
        Metadata dictionary, modified in place.
    pre_path
        Key path prepended to each tag location.
    """
    candidate_tails = (
        ("Acquisition", "Parameters", "High Level", "Exposure (s)"),
        ("DataBar", "Exposure Time (s)"),
    )
    for tail in candidate_tails:
        exposure = try_getting_dict_value(mdict, [*pre_path, *tail])
        if exposure is not None:
            break
    else:
        # neither location produced a value, so there is nothing to record
        return

    with contextlib.suppress(ValueError, TypeError):
        exposure = ureg.Quantity(exposure, "second")
    set_nested_dict_value(mdict, ["nx_meta", "Exposure Time"], exposure)
def _set_gms_version(mdict: Dict, pre_path: List[str]):
    """
    Copy the ``GMS Version > Created`` tag into ``nx_meta["GMS Version"]``.

    Parameters
    ----------
    mdict
        Metadata dictionary, modified in place.
    pre_path
        Key path prepended to the tag location.
    """
    version = try_getting_dict_value(mdict, [*pre_path, "GMS Version", "Created"])
    if version is None:
        return
    set_nested_dict_value(mdict, ["nx_meta", "GMS Version"], version)
def _set_camera_binning(mdict: Dict, pre_path: List[str]):
    """
    Split the camera binning tag into horizontal and vertical entries.

    Element 0 of the ``Acquisition > Parameters > High Level > Binning``
    value is stored as ``nx_meta["Binning (Horizontal)"]`` and element 1 as
    ``nx_meta["Binning (Vertical)"]``. Nothing is written when the lookup
    returns ``None``.

    Parameters
    ----------
    mdict
        Metadata dictionary, modified in place.
    pre_path
        Key path prepended to the tag location.
    """
    binning = try_getting_dict_value(
        mdict,
        [*pre_path, "Acquisition", "Parameters", "High Level", "Binning"],
    )
    if binning is None:
        return
    targets = (("Binning (Horizontal)", 0), ("Binning (Vertical)", 1))
    for key, position in targets:
        set_nested_dict_value(mdict, ["nx_meta", key], binning[position])
def _set_image_processing(mdict: Dict, pre_path: List[str]):
    """
    Copy the camera/detector processing tag into ``nx_meta``.

    Reads ``Acquisition > Parameters > High Level > Processing`` (a value
    such as "Gain normalized", which is not specific to EELS) and stores it
    as ``nx_meta["Camera/Detector Processing"]`` when it is not ``None``.

    Parameters
    ----------
    mdict
        Metadata dictionary, modified in place.
    pre_path
        Key path prepended to the tag location.
    """
    tag_path = [*pre_path, "Acquisition", "Parameters", "High Level", "Processing"]
    processing = try_getting_dict_value(mdict, tag_path)
    if processing is None:
        return
    set_nested_dict_value(mdict, ["nx_meta", "Camera/Detector Processing"], processing)
def _set_eels_meta(mdict, base, meta_key):
    """
    Copy one EELS tag into the ``nx_meta["EELS"]`` sub-tree.

    Parameters
    ----------
    mdict
        Metadata dictionary, modified in place.
    base
        Key path of the EELS tag group; ``meta_key`` is appended to it.
    meta_key
        Key path of the tag relative to ``base``. Its last element names the
        output field.

    Notes
    -----
    Fields with a known unit are converted to a Pint quantity, and the
    trailing " (unit)" part is dropped from the field name once that
    conversion has succeeded. If the conversion raises ``ValueError`` or
    ``TypeError`` the raw value is stored under the unmodified name.
    Nothing is written when the lookup returns ``None``.
    """
    found = try_getting_dict_value(mdict, base + meta_key)
    if found is None:
        return

    label = meta_key[-1]
    units_by_label = {
        "Exposure (s)": "second",
        "Integration time (s)": "second",
        "Collection semi-angle (mrad)": "milliradian",
        "Convergence semi-angle (mrad)": "milliradian",
    }
    if label in units_by_label:
        with contextlib.suppress(ValueError, TypeError):
            found = ureg.Quantity(found, units_by_label[label])
            # the unit now travels with the value, so shorten the name
            label = label.rsplit(" (", 1)[0]

    set_nested_dict_value(mdict, ["nx_meta", "EELS", label], found)
def _set_eels_spectrometer_meta(mdict, base, meta_key):
    """
    Copy one EELS spectrometer tag into the ``nx_meta["EELS"]`` sub-tree.

    Parameters
    ----------
    mdict
        Metadata dictionary, modified in place.
    base
        Key path of the spectrometer tag group; ``meta_key`` is appended.
    meta_key
        Key path of the tag relative to ``base``. Its *first* element names
        the output field, which is stored with a "Spectrometer " prefix.

    Notes
    -----
    Fields with a known unit are converted to a Pint quantity, and the
    trailing " (unit)" part is dropped from the field name once that
    conversion has succeeded. If the conversion raises ``ValueError`` or
    ``TypeError`` the raw value is stored under the unmodified name.
    Nothing is written when the lookup returns ``None``.
    """
    found = try_getting_dict_value(mdict, base + meta_key)
    if found is None:
        return

    label = meta_key[0]
    units_by_label = {
        "Energy loss (eV)": "electron_volt",
        "Drift tube voltage (V)": "volt",
        "Slit width (eV)": "electron_volt",
        "Prism offset (V)": "volt",
    }
    if label in units_by_label:
        with contextlib.suppress(ValueError, TypeError):
            found = ureg.Quantity(found, units_by_label[label])
            # the unit now travels with the value, so shorten the name
            label = label.rsplit(" (", 1)[0]

    target = ["nx_meta", "EELS", "Spectrometer " + label]
    set_nested_dict_value(mdict, target, found)
144def _set_eels_processing(mdict, pre_path):
145 # Process known tags under "processing":
146 # ImageTags.Processing will be a list of things done (in multiple
147 # TagGroups) - things like Compute thickness, etc.
148 val = try_getting_dict_value(mdict, [*pre_path, "Processing"])
149 if val is not None and isinstance(val, dict):
150 # if val is a dict, then there were processing steps applied
151 eels_ops = []
152 for _, v in val.items():
153 # k will be TagGroup0, TagGroup1, etc.
154 # v will be dictionaries specifying the process step
155 # AlignSIByPeak, DataPicker, SpectrumCalibrate,
156 # Compute Thickness, Background Removal, Signal Integration
157 operation = v["Operation"]
158 param = v["Parameters"]
159 if operation == "AlignSIByPeak":
160 eels_ops.append("Aligned parent SI By Peak")
161 elif operation == "Background Removal":
162 val = try_getting_dict_value(param, ["Model"])
163 if val is not None:
164 set_nested_dict_value(
165 mdict,
166 ["nx_meta", "EELS", "Background Removal Model"],
167 val,
168 )
169 eels_ops.append(operation)
170 elif operation == "SpectrumCalibrate":
171 eels_ops.append("Calibrated Post-acquisition")
172 elif operation == "Compute Thickness":
173 mdict = _process_thickness_metadata(mdict, [*pre_path, "EELS"])
174 eels_ops.append(operation)
175 elif operation == "DataPicker":
176 eels_ops.append("Extracted from SI")
177 elif operation == "Signal Integration":
178 eels_ops.append(operation)
179 if eels_ops:
180 # remove duplicates (convert to set) and sort alphabetically:
181 set_nested_dict_value(
182 mdict,
183 ["nx_meta", "EELS", "Processing Steps"],
184 ", ".join(sorted(set(eels_ops))),
185 )
def _process_thickness_metadata(mdict, base):
    """
    Copy EELS thickness measurements into the ``nx_meta["EELS"]`` sub-tree.

    Parameters
    ----------
    mdict
        Metadata dictionary, modified in place.
    base
        Key path of the tag group that holds the ``Thickness`` tags.

    Returns
    -------
    The same ``mdict`` that was passed in.

    Notes
    -----
    Each of the three values is written only when its lookup does not return
    ``None``. The absolute thickness key embeds whatever the ``Units`` lookup
    returned, and only element 0 of the mean-free-path value is stored.
    """

    def _thickness(*tail):
        # every tag of interest lives below <base> > Thickness
        return try_getting_dict_value(mdict, [*base, "Thickness", *tail])

    absolute = _thickness("Absolute", "Measurement")
    units = _thickness("Absolute", "Units")
    mean_free_path = _thickness("Absolute", "Mean Free Path")
    relative = _thickness("Relative", "Measurement")

    eels = ["nx_meta", "EELS"]
    if absolute is not None:
        set_nested_dict_value(
            mdict,
            [*eels, f"Thickness (absolute) [{units}]"],
            absolute,
        )
    if mean_free_path is not None:
        set_nested_dict_value(
            mdict,
            [*eels, "Thickness (absolute) mean free path"],
            mean_free_path[0],
        )
    if relative is not None:
        set_nested_dict_value(mdict, [*eels, "Thickness (relative) [t/λ]"], relative)

    return mdict
def _set_eds_meta(mdict, base, meta_key):
    """
    Copy one EDS tag into the ``nx_meta["EDS"]`` sub-tree.

    Parameters
    ----------
    mdict
        Metadata dictionary, modified in place.
    base
        Key path of the EDS tag group; ``meta_key`` is appended to it.
    meta_key
        Key path of the tag relative to ``base``. The output field is named
        after its last element (its only element when there is just one).

    Notes
    -----
    Fields with a known unit are converted to a Pint quantity, and a
    trailing " (unit)" part, if any, is dropped from the field name once
    that conversion has succeeded. If the conversion raises ``ValueError``
    or ``TypeError`` the raw value is stored under the unmodified name.
    Nothing is written when the lookup returns ``None``.
    """
    found = try_getting_dict_value(mdict, base + meta_key)
    if found is None:
        return

    if len(meta_key) > 1:
        label = meta_key[-1]
    else:
        label = meta_key[0]

    units_by_label = {
        "Dispersion (eV)": "electron_volt",
        "Energy Cutoff (V)": "volt",
        "Exposure (s)": "second",
        "Azimuthal angle": "degree",
        "Elevation angle": "degree",
        "Incidence angle": "degree",
        "Stage tilt": "degree",
        "Live time": "second",
        "Real time": "second",
    }
    if label in units_by_label:
        with contextlib.suppress(ValueError, TypeError):
            found = ureg.Quantity(found, units_by_label[label])
            # names without a " (unit)" suffix come through unchanged
            label = label.rsplit(" (", 1)[0]

    set_nested_dict_value(mdict, ["nx_meta", "EDS", label], found)
def _set_si_meta(mdict, pre_path, meta_key):
    """
    Copy one tag from the ``SI`` tag group into ``nx_meta["EDS"]``.

    Parameters
    ----------
    mdict
        Metadata dictionary, modified in place.
    pre_path
        Key path prepended to the ``SI`` tag group.
    meta_key
        Key path of the tag below ``SI``. Its last element names the output
        field.

    Notes
    -----
    Fields with a known unit are converted to a Pint quantity, and the
    trailing " (unit)" part is dropped from the field name once that
    conversion has succeeded. If the conversion raises ``ValueError`` or
    ``TypeError`` the raw value is stored under the unmodified name.
    Nothing is written when the lookup returns ``None``.
    """
    found = try_getting_dict_value(mdict, [*pre_path, "SI", *meta_key])
    if found is None:
        return

    label = meta_key[-1]
    units_by_label = {
        "Dispersion (eV)": "electron_volt",
        "Energy Cutoff (V)": "volt",
        "Exposure (s)": "second",
    }
    if label in units_by_label:
        with contextlib.suppress(ValueError, TypeError):
            found = ureg.Quantity(found, units_by_label[label])
            # the unit now travels with the value, so shorten the name
            label = label.rsplit(" (", 1)[0]

    set_nested_dict_value(mdict, ["nx_meta", "EDS", label], found)
def _try_decimal(val):
    """
    Convert ``val`` to a ``float`` by way of ``Decimal``, if possible.

    Parameters
    ----------
    val
        Value to convert, typically a numeric string.

    Returns
    -------
    The ``float`` equivalent of ``val`` when ``Decimal`` can parse it;
    otherwise ``val`` itself, unchanged.

    Notes
    -----
    This is a best-effort conversion and never raises for unparseable
    input. ``TypeError`` is handled alongside ``ValueError`` and
    ``InvalidOperation`` so that ``None`` and other types ``Decimal`` does
    not accept are handed back as-is instead of aborting the caller.
    """
    try:
        parsed = Decimal(val)
    except (ValueError, TypeError, InvalidOperation):
        return val
    return float(parsed)
def _parse_filter_settings(info_dict, tecnai_info):
    """
    Parse the energy-filter section of the Tecnai microscope info.

    Parameters
    ----------
    info_dict
        Dictionary receiving the results; its ``"Filter_Settings"`` entry is
        replaced by a fresh dict and then filled in.
    tecnai_info
        List of text lines. Everything after the line
        ``"Filter related settings:"`` is searched.

    Returns
    -------
    The same ``info_dict`` that was passed in.

    Notes
    -----
    ``Mode`` is stored as found (``None`` when absent). The numeric fields
    are stored only when their line is found; each has its unit text removed
    and is then passed through ``_try_decimal``. If the section header is
    missing, ``list.index`` raises ``ValueError``: that is logged at INFO
    level and ``"Filter_Settings"`` is left as an empty dict.
    """

    def _without(unit_pattern):
        # build a cleaner that deletes every match of a unit marker
        return lambda raw: re.sub(unit_pattern, "", raw)

    # (output key, line prefix to search for, clean-up for the raw text)
    numeric_fields = (
        ("Dispersion", "Selected dispersion: ", _without(r"\[eV/Channel\]")),
        ("Aperture", "Selected aperture: ", lambda raw: raw.strip("m")),
        ("Prism_Shift", "Prism shift: ", _without(r"\[eV\]")),
        ("Drift_Tube", "Drift tube: ", _without(r"\[eV\]")),
        ("Total_Energy_Loss", "Total energy loss: ", _without(r"\[eV\]")),
    )

    try:
        settings = info_dict["Filter_Settings"] = {}
        section_start = tecnai_info.index("Filter related settings:") + 1
        filter_lines = tecnai_info[section_start:]

        settings["Mode"] = _find_val("Mode: ", filter_lines)
        for key, prefix, clean in numeric_fields:
            raw = _find_val(prefix, filter_lines)
            if raw is not None:
                settings[key] = _try_decimal(clean(raw))
    except ValueError:
        _logger.info("Filter settings not found in Tecnai microscope info")

    return info_dict
def _zero_data_in_dm3(
    filename: Path,
    out_filename: Path | None = None,
    *,
    compress=True,
) -> Path:
    """
    Zero out data in a DM3 file.

    Copies a dm3 image file, overwrites the data block of the copy with
    zeros and leaves the result either as a dm3 file or as a compressed
    archive. This is used to create small files for the test suite: what
    remains is text metadata plus zeros, which compresses very well
    (initial tests took a 16MB file down to ~100KB).

    Parameters
    ----------
    filename
        Path to file to be modified
    out_filename
        Name with which to save the output file. If None (or otherwise
        falsy), it is generated from ``filename`` by inserting
        ``_dataZeroed`` before the suffix.
    compress
        Whether to compress the files into a tar.gz file. When True the
        intermediate zeroed file is deleted after it has been archived.

    Returns
    -------
    Path
        The path of the compressed (or zeroed) file
    """
    if out_filename:
        zeroed_path = out_filename
    else:
        zeroed_path = filename.parent / (
            filename.stem + "_dataZeroed" + filename.suffix
        )

    shutil.copyfile(filename, zeroed_path)

    # Low-level parse of the source .dm3 file to obtain the ImageObject refs
    with filename.open(mode="rb") as source:
        reader = DigitalMicrographReader(source)
        reader.parse_file()
        images = [
            ImageObject(entry, source) for entry in reader.get_image_dictionaries()
        ]

    # Overwrite the data block of the copy; its offset and size in bytes
    # come from the first ImageObject.
    # NB: currently this is just tested for single-image .dm3 files. Spectra
    # and image stacks will probably work differently.
    with zeroed_path.open(mode="r+b") as target:
        data_ref = images[0].imdict.ImageData.Data
        target.seek(data_ref.offset)
        target.write(b"\x00" * data_ref.size_bytes)

    if not compress:
        return zeroed_path

    archive_path = Path(f"{zeroed_path}.tar.gz")
    with tarfile.open(archive_path, "w:gz") as archive:
        archive.add(zeroed_path)
    zeroed_path.unlink()
    return archive_path
def _find_val(s_to_find, list_to_search):
    """
    Find a value in a list.

    Parameters
    ----------
    s_to_find
        Text to look for. It is always treated literally, never as a
        regular expression.
    list_to_search
        Iterable of strings to search, in order.

    Returns
    -------
    The first item of ``list_to_search`` that contains ``s_to_find``, with
    ``s_to_find`` removed from its beginning (the item is returned unchanged
    if the match is not at the start), or ``None`` if no item contains it.

    Notes
    -----
    The prefix is removed with ``str.removeprefix`` and not a regular
    expression, so search text containing characters such as ``(``, ``[``
    or ``*`` is stripped correctly instead of being misread as a pattern.
    """
    # stop at the first hit instead of collecting every matching item
    match = next((item for item in list_to_search if s_to_find in item), None)
    if match is None:
        return None
    return match.removeprefix(s_to_find)
414# Field categorization helpers for schema-based metadata extraction
def add_to_extensions(nx_meta: dict, field_name: str, value: Any) -> None:
    """
    Store a field in the ``extensions`` section of ``nx_meta``.

    Convenience helper for vendor-specific, instrument-specific or
    facility-specific metadata that does not fit the core schema. The
    ``extensions`` dict is created on first use, so callers never have to
    check for it themselves.

    Parameters
    ----------
    nx_meta : dict
        The nx_meta dictionary being built by the extractor. It is modified
        in place.
    field_name : str
        Name of the field to add. Prefer descriptive names that make the
        meaning obvious (e.g., 'quanta_spot_size', 'detector_contrast').
        An existing entry with the same name is overwritten.
    value : Any
        The value to store. Can be any JSON-serializable type, including
        Pint Quantity objects which will be automatically serialized.

    Examples
    --------
    Add vendor-specific fields during metadata extraction:

    >>> nx_meta = {
    ...     "DatasetType": "Image",
    ...     "Data Type": "SEM_Imaging",
    ...     "Creation Time": "2024-01-15T10:30:00-05:00",
    ... }
    >>> add_to_extensions(nx_meta, "spot_size", 3.5)
    >>> add_to_extensions(nx_meta, "detector_contrast", 50.0)
    >>> nx_meta["extensions"]
    {'spot_size': 3.5, 'detector_contrast': 50.0}

    Works with Pint Quantities:

    >>> from nexusLIMS.schemas.units import ureg
    >>> add_to_extensions(nx_meta, "chamber_pressure", ureg.Quantity(79.8, "pascal"))

    Notes
    -----
    The extensions section preserves all metadata that doesn't fit the core
    schema, ensuring no data loss during extraction. Extensions are included
    in the XML output and preserved through the record building process.
    """
    # setdefault creates the section on first use and reuses it afterwards
    nx_meta.setdefault("extensions", {})[field_name] = value