"""Methods (primarily intended to be private) that are used by the other extractors."""
import contextlib
import logging
import re
import shutil
import tarfile
from datetime import UTC, datetime
from decimal import Decimal, InvalidOperation
from pathlib import Path
from typing import Any, Dict, List
from rsciio.digitalmicrograph._api import ( # pylint: disable=import-error,no-name-in-module
DigitalMicrographReader,
ImageObject,
)
from nexusLIMS.instruments import Instrument, get_instr_from_filepath
from nexusLIMS.schemas.units import ureg
from nexusLIMS.utils.dicts import set_nested_dict_value, try_getting_dict_value
_logger = logging.getLogger(__name__)
def _coerce_to_list(meta_key):
if isinstance(meta_key, str):
return [meta_key]
return meta_key
def _get_mtime_iso(filename: Path, instrument: Instrument | None = None):
    """Return the file's modification time as an ISO 8601 string.

    The timestamp is localized to the instrument's timezone when an
    instrument is given; otherwise UTC is used.
    """
    tz = instrument.timezone if instrument else UTC
    mtime = filename.stat().st_mtime
    return datetime.fromtimestamp(mtime, tz=tz).isoformat()
def _set_instr_name_and_time(mdict: Dict, filename: Path):
    """Record the source instrument (if resolvable) and file mtime in nx_meta.

    Also initializes an empty ``warnings`` list for later extractors to
    append to.
    """
    instrument = get_instr_from_filepath(filename)
    # store the instrument name as a string when found, otherwise None
    mdict["nx_meta"]["Instrument ID"] = None if instrument is None else instrument.name
    mdict["nx_meta"]["Creation Time"] = _get_mtime_iso(filename, instrument)
    mdict["nx_meta"]["warnings"] = []
def _set_acquisition_device_name(mdict: Dict, pre_path: List[str]):
    """Copy the acquisition device name into nx_meta, if present.

    Prefers the "Acquisition" tag tree and falls back to the "DataBar"
    entry when the first lookup fails.
    """
    device_name = try_getting_dict_value(
        mdict,
        [*pre_path, "Acquisition", "Device", "Name"],
    )
    if device_name is None:
        device_name = try_getting_dict_value(
            mdict,
            [*pre_path, "DataBar", "Device Name"],
        )
    if device_name is not None:
        set_nested_dict_value(mdict, ["nx_meta", "Acquisition Device"], device_name)
def _set_exposure_time(mdict: Dict, pre_path: List[str]):
    """Store the exposure time in nx_meta (as a Pint second Quantity if possible).

    The "Acquisition" tag tree is checked first, falling back to the
    "DataBar" entry.
    """
    exposure = try_getting_dict_value(
        mdict,
        [*pre_path, "Acquisition", "Parameters", "High Level", "Exposure (s)"],
    )
    if exposure is None:
        exposure = try_getting_dict_value(
            mdict,
            [*pre_path, "DataBar", "Exposure Time (s)"],
        )
    if exposure is None:
        return
    # attach units where possible; fall back to the raw value otherwise
    with contextlib.suppress(ValueError, TypeError):
        exposure = ureg.Quantity(exposure, "second")
    set_nested_dict_value(mdict, ["nx_meta", "Exposure Time"], exposure)
def _set_gms_version(mdict: Dict, pre_path: List[str]):
    """Record the GMS (DigitalMicrograph) version the file was created with."""
    version = try_getting_dict_value(mdict, [*pre_path, "GMS Version", "Created"])
    if version is not None:
        set_nested_dict_value(mdict, ["nx_meta", "GMS Version"], version)
def _set_camera_binning(mdict: Dict, pre_path: List[str]):
    """Split the camera binning pair into horizontal/vertical nx_meta fields."""
    binning = try_getting_dict_value(
        mdict,
        [*pre_path, "Acquisition", "Parameters", "High Level", "Binning"],
    )
    if binning is None:
        return
    # assumes a two-element (horizontal, vertical) sequence as written by
    # DigitalMicrograph -- TODO confirm against multi-detector files
    set_nested_dict_value(mdict, ["nx_meta", "Binning (Horizontal)"], binning[0])
    set_nested_dict_value(mdict, ["nx_meta", "Binning (Vertical)"], binning[1])
def _set_image_processing(mdict: Dict, pre_path: List[str]):
    """Record camera/detector processing applied during acquisition.

    ``ImageTags.Acquisition.Parameters["High Level"].Processing`` holds a
    value such as "Gain normalized". This tag is not EELS-specific, so it
    is stored at the general level of nx_meta.
    """
    processing = try_getting_dict_value(
        mdict,
        [*pre_path, "Acquisition", "Parameters", "High Level", "Processing"],
    )
    if processing is not None:
        set_nested_dict_value(
            mdict,
            ["nx_meta", "Camera/Detector Processing"],
            processing,
        )
def _set_eels_meta(mdict, base, meta_key):
    """Copy one EELS tag into the "EELS" sub-tree of nx_meta.

    Fields whose names carry a unit suffix (e.g. "Exposure (s)") are
    converted to Pint Quantities and the suffix is stripped from the
    stored field name. Values that cannot be found are skipped.
    """
    # known unit-suffixed field names mapped to their Pint unit
    unit_map = {
        "Exposure (s)": "second",
        "Integration time (s)": "second",
        "Collection semi-angle (mrad)": "milliradian",
        "Convergence semi-angle (mrad)": "milliradian",
    }
    val = try_getting_dict_value(mdict, base + meta_key)
    if val is None:
        return
    field_name = meta_key[-1]
    unit = unit_map.get(field_name)
    if unit is not None:
        with contextlib.suppress(ValueError, TypeError):
            val = ureg.Quantity(val, unit)
        # drop the trailing " (...)" unit suffix from the field name
        field_name = field_name.rsplit(" (", 1)[0]
    # add the value to the "EELS" sub-tree of nx_meta
    set_nested_dict_value(mdict, ["nx_meta", "EELS", field_name], val)
def _set_eels_spectrometer_meta(mdict, base, meta_key):
    """Copy one EELS spectrometer tag into the "EELS" sub-tree of nx_meta.

    Values are stored under a "Spectrometer <field>" key; known
    unit-suffixed fields are converted to Pint Quantities and the suffix
    is removed from the stored field name.
    """
    # known unit-suffixed field names mapped to their Pint unit
    unit_map = {
        "Energy loss (eV)": "electron_volt",
        "Drift tube voltage (V)": "volt",
        "Slit width (eV)": "electron_volt",
        "Prism offset (V)": "volt",
    }
    val = try_getting_dict_value(mdict, base + meta_key)
    if val is None:
        return
    field_name = meta_key[0]
    unit = unit_map.get(field_name)
    if unit is not None:
        with contextlib.suppress(ValueError, TypeError):
            val = ureg.Quantity(val, unit)
        # drop the trailing " (...)" unit suffix from the field name
        field_name = field_name.rsplit(" (", 1)[0]
    set_nested_dict_value(
        mdict,
        ["nx_meta", "EELS", "Spectrometer " + field_name],
        val,
    )
def _set_eels_processing(mdict, pre_path):
    """Summarize post-acquisition EELS processing steps into nx_meta.

    ``ImageTags.Processing`` (when present as a dict) contains one
    TagGroup per processing operation applied in GMS -- e.g.
    AlignSIByPeak, DataPicker, SpectrumCalibrate, Compute Thickness,
    Background Removal, Signal Integration. A de-duplicated,
    alphabetized summary string is written to
    ``nx_meta.EELS."Processing Steps"``; some operations also record
    extra detail (background removal model, thickness values).
    """
    processing = try_getting_dict_value(mdict, [*pre_path, "Processing"])
    # a dict value means processing steps were applied (isinstance also
    # rules out the None "not found" sentinel)
    if not isinstance(processing, dict):
        return
    eels_ops = []
    # keys are TagGroup0, TagGroup1, ...; only the step dicts matter
    for step in processing.values():
        operation = step["Operation"]
        param = step["Parameters"]
        if operation == "AlignSIByPeak":
            eels_ops.append("Aligned parent SI By Peak")
        elif operation == "Background Removal":
            # fix: use a dedicated name instead of clobbering the ``val``
            # variable holding the dict being iterated (as the original did)
            model = try_getting_dict_value(param, ["Model"])
            if model is not None:
                set_nested_dict_value(
                    mdict,
                    ["nx_meta", "EELS", "Background Removal Model"],
                    model,
                )
            eels_ops.append(operation)
        elif operation == "SpectrumCalibrate":
            eels_ops.append("Calibrated Post-acquisition")
        elif operation == "Compute Thickness":
            mdict = _process_thickness_metadata(mdict, [*pre_path, "EELS"])
            eels_ops.append(operation)
        elif operation == "DataPicker":
            eels_ops.append("Extracted from SI")
        elif operation == "Signal Integration":
            eels_ops.append(operation)
    if eels_ops:
        # remove duplicates (via set) and sort alphabetically for a
        # stable, human-readable summary
        set_nested_dict_value(
            mdict,
            ["nx_meta", "EELS", "Processing Steps"],
            ", ".join(sorted(set(eels_ops))),
        )
def _process_thickness_metadata(mdict, base):
    """Copy EELS thickness measurements into the "EELS" sub-tree of nx_meta.

    Reads the absolute thickness (measurement, units, and mean free
    path) and the relative (t/λ) thickness from the tag tree rooted at
    ``base`` and stores whichever values are present. Returns the
    (modified) metadata dict.
    """
    thickness = [*base, "Thickness"]
    abs_thick = try_getting_dict_value(mdict, [*thickness, "Absolute", "Measurement"])
    abs_units = try_getting_dict_value(mdict, [*thickness, "Absolute", "Units"])
    abs_mfp = try_getting_dict_value(
        mdict,
        [*thickness, "Absolute", "Mean Free Path"],
    )
    rel_thick = try_getting_dict_value(mdict, [*thickness, "Relative", "Measurement"])
    if abs_thick is not None:
        set_nested_dict_value(
            mdict,
            ["nx_meta", "EELS", f"Thickness (absolute) [{abs_units}]"],
            abs_thick,
        )
    if abs_mfp is not None:
        # only the first mean free path value is recorded
        set_nested_dict_value(
            mdict,
            ["nx_meta", "EELS", "Thickness (absolute) mean free path"],
            abs_mfp[0],
        )
    if rel_thick is not None:
        set_nested_dict_value(
            mdict,
            ["nx_meta", "EELS", "Thickness (relative) [t/λ]"],
            rel_thick,
        )
    return mdict
def _set_eds_meta(mdict, base, meta_key):
    """Copy one EDS tag into the "EDS" sub-tree of nx_meta.

    Known unit-bearing fields are converted to Pint Quantities; a
    trailing " (...)" unit suffix, when present in the field name, is
    stripped from the stored key. Values that cannot be found are
    skipped.
    """
    val = try_getting_dict_value(mdict, base + meta_key)
    # only add the value to nx_meta if we found it
    if val is not None:
        # the last path element is the field name. (The original's
        # ``meta_key[-1] if len(meta_key) > 1 else meta_key[0]`` was
        # redundant: both branches select the same element for any
        # non-empty list.)
        field_name = meta_key[-1]
        # known unit-bearing field names mapped to their Pint unit
        unit_map = {
            "Dispersion (eV)": "electron_volt",
            "Energy Cutoff (V)": "volt",
            "Exposure (s)": "second",
            "Azimuthal angle": "degree",
            "Elevation angle": "degree",
            "Incidence angle": "degree",
            "Stage tilt": "degree",
            "Live time": "second",
            "Real time": "second",
        }
        if field_name in unit_map:
            with contextlib.suppress(ValueError, TypeError):
                val = ureg.Quantity(val, unit_map[field_name])
            # remove a unit suffix (e.g. " (eV)") from the name if present
            field_name = field_name.rsplit(" (", 1)[0]
        set_nested_dict_value(
            mdict,
            ["nx_meta", "EDS", field_name],
            val,
        )
def _set_si_meta(mdict, pre_path, meta_key):
    """Copy one spectrum-imaging (SI) tag into the "EDS" sub-tree of nx_meta.

    Known unit-suffixed fields are converted to Pint Quantities and the
    suffix is stripped from the stored field name.
    """
    # known unit-suffixed field names mapped to their Pint unit
    unit_map = {
        "Dispersion (eV)": "electron_volt",
        "Energy Cutoff (V)": "volt",
        "Exposure (s)": "second",
    }
    val = try_getting_dict_value(mdict, [*pre_path, "SI", *meta_key])
    if val is None:
        return
    field_name = meta_key[-1]
    unit = unit_map.get(field_name)
    if unit is not None:
        with contextlib.suppress(ValueError, TypeError):
            val = ureg.Quantity(val, unit)
        # drop the trailing " (...)" unit suffix from the field name
        field_name = field_name.rsplit(" (", 1)[0]
    set_nested_dict_value(mdict, ["nx_meta", "EDS", field_name], val)
def _try_decimal(val):
try:
val = Decimal(val)
val = float(val)
except (ValueError, InvalidOperation):
pass
return val
def _parse_filter_settings(info_dict, tecnai_info):
    """Extract energy-filter settings from Tecnai microscope info lines.

    Populates ``info_dict["Filter_Settings"]`` with the mode, dispersion,
    aperture, prism shift, drift tube, and total energy loss values found
    after the "Filter related settings:" marker line. If that marker is
    absent, the (empty) Filter_Settings dict is left in place and an
    informational message is logged. Returns ``info_dict``.
    """
    try:
        info_dict["Filter_Settings"] = {}
        settings = info_dict["Filter_Settings"]
        # lines after the marker (``index`` raises ValueError when missing)
        start = tecnai_info.index("Filter related settings:") + 1
        filter_lines = tecnai_info[start:]
        # mode is kept as a string (may be None when not found)
        settings["Mode"] = _find_val("Mode: ", filter_lines)
        # dispersion, reported in eV/channel
        raw = _find_val("Selected dispersion: ", filter_lines)
        if raw is not None:
            settings["Dispersion"] = _try_decimal(re.sub(r"\[eV/Channel\]", "", raw))
        # aperture, reported in mm ("m" characters stripped from both ends)
        raw = _find_val("Selected aperture: ", filter_lines)
        if raw is not None:
            settings["Aperture"] = _try_decimal(raw.strip("m"))
        # the remaining values are all reported in eV
        for label, key in (
            ("Prism shift: ", "Prism_Shift"),
            ("Drift tube: ", "Drift_Tube"),
            ("Total energy loss: ", "Total_Energy_Loss"),
        ):
            raw = _find_val(label, filter_lines)
            if raw is not None:
                settings[key] = _try_decimal(re.sub(r"\[eV\]", "", raw))
    except ValueError:
        _logger.info("Filter settings not found in Tecnai microscope info")
    return info_dict
def _zero_data_in_dm3(
    filename: Path,
    out_filename: Path | None = None,
    *,
    compress: bool = True,
) -> Path:
    """
    Zero out data in a DM3 file.

    Helper method that will overwrite the data in a dm3 image file with
    zeros and save it as either another dm3, or as a compressed archive (used
    for creating files for the test suite that don't take up tons of space).
    Since the resulting file is just some text metadata and zeros, it should
    be highly compressible (initial tests allowed for a 16MB file to be
    compressed to ~100KB).

    Parameters
    ----------
    filename
        Path to file to be modified
    out_filename
        Name with which to save the output file. If None, it will be
        automatically generated from the ``filename``.
    compress
        Whether to compress the files into a tar.gz file

    Returns
    -------
    Path
        The path of the compressed (or zeroed) file
    """
    # zero out extent of data in DM3 file and compress to tar.gz:
    if not out_filename:
        # e.g. "image.dm3" -> "image_dataZeroed.dm3", alongside the original
        mod_fname = filename.parent / (filename.stem + "_dataZeroed" + filename.suffix)
    else:
        mod_fname = out_filename
    # operate on a copy so the original file is never modified
    shutil.copyfile(filename, mod_fname)
    # Do some lower-level reading on the .dm3 file to get the ImageObject refs
    with filename.open(mode="rb") as f:
        dm_reader = DigitalMicrographReader(f)
        dm_reader.parse_file()
        images = [
            ImageObject(im_dict, f) for im_dict in dm_reader.get_image_dictionaries()
        ]
    # write zeros to the file in the data block (offset + size in bytes
    # information is obtained from the ImageObject ref)
    # NB: currently this is just tested for single-image .dm3 files. Spectra
    # and image stacks will probably work differently.
    with mod_fname.open(mode="r+b") as f:
        f.seek(images[0].imdict.ImageData.Data.offset)
        f.write(b"\x00" * images[0].imdict.ImageData.Data.size_bytes)
    # compress the output, if requested
    if compress:
        # NOTE(review): tar.add stores mod_fname's path (as given) as the
        # member name -- confirm relative vs. absolute paths behave as intended
        tar_path = Path(f"{mod_fname}.tar.gz")
        with tarfile.open(tar_path, "w:gz") as tar:
            tar.add(mod_fname)
        out_fpath = tar_path
        # the uncompressed zeroed copy is removed once archived
        mod_fname.unlink()
    else:
        out_fpath = mod_fname
    return out_fpath
def _find_val(s_to_find, list_to_search):
"""
Find a value in a list.
Return the first value in list_to_search that contains s_to_find, or
None if it is not found.
Note: If needed, this could be improved to use regex instead, which
would provide more control over the patterns to return
"""
res = [x for x in list_to_search if s_to_find in x]
if len(res) > 0:
res = res[0]
# remove the string we searched for from the beginning of the res
return re.sub("^" + s_to_find, "", res)
return None
# Field categorization helpers for schema-based metadata extraction
def add_to_extensions(nx_meta: dict, field_name: str, value: Any) -> None:
    """
    Add a field to the extensions section of nx_meta.

    Ensures the ``extensions`` dict exists in ``nx_meta`` before storing
    ``value`` under ``field_name``. Use this for vendor-specific,
    instrument-specific, or facility-specific metadata that doesn't fit
    the core schema.

    Parameters
    ----------
    nx_meta : dict
        The nx_meta dictionary being built by the extractor. Will be modified
        in place to add the field to the extensions section.
    field_name : str
        Name of the field to add. Use descriptive names that clearly indicate
        the field's meaning (e.g., 'quanta_spot_size', 'detector_contrast').
    value : Any
        The value to store. Can be any JSON-serializable type, including
        Pint Quantity objects which will be automatically serialized.

    Examples
    --------
    Add vendor-specific fields during metadata extraction:

    >>> nx_meta = {"DatasetType": "Image"}
    >>> add_to_extensions(nx_meta, "spot_size", 3.5)
    >>> add_to_extensions(nx_meta, "detector_contrast", 50.0)
    >>> nx_meta["extensions"]
    {'spot_size': 3.5, 'detector_contrast': 50.0}

    Notes
    -----
    The extensions section preserves all metadata that doesn't fit the core
    schema, ensuring no data loss during extraction. Extensions are included
    in the XML output and preserved through the record building process.
    """
    # create the extensions dict on first use, then store the field
    nx_meta.setdefault("extensions", {})[field_name] = value