Coverage for nexusLIMS/extractors/plugins/digital_micrograph.py: 100%
356 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
1"""Digital Micrograph (.dm3/.dm4) extractor plugin."""
3import contextlib
4import logging
5from datetime import UTC
6from datetime import datetime as dt
7from pathlib import Path
8from struct import error
9from typing import Any, ClassVar, Dict, List
11import numpy as np
12from hyperspy.io import load as hs_load
13from rsciio.utils.exceptions import (
14 DM3DataTypeError,
15 DM3FileVersionError,
16 DM3TagError,
17 DM3TagIDError,
18 DM3TagTypeError,
19)
21from nexusLIMS.extractors.base import ExtractionContext
22from nexusLIMS.extractors.plugins.basic_metadata import BasicFileInfoExtractor
23from nexusLIMS.extractors.plugins.profiles import register_all_profiles
24from nexusLIMS.extractors.profiles import get_profile_registry
25from nexusLIMS.extractors.utils import (
26 _coerce_to_list,
27 _find_val,
28 _parse_filter_settings,
29 _set_acquisition_device_name,
30 _set_camera_binning,
31 _set_eds_meta,
32 _set_eels_meta,
33 _set_eels_processing,
34 _set_eels_spectrometer_meta,
35 _set_exposure_time,
36 _set_gms_version,
37 _set_image_processing,
38 _set_si_meta,
39 _try_decimal,
40 add_to_extensions,
41)
42from nexusLIMS.instruments import get_instr_from_filepath
43from nexusLIMS.schemas.units import ureg
44from nexusLIMS.utils.dicts import (
45 remove_dict_nones,
46 remove_dtb_element,
47 set_nested_dict_value,
48 sort_dict,
49 try_getting_dict_value,
50)
51from nexusLIMS.utils.time import current_system_tz
53_logger = logging.getLogger(__name__)
class DM3Extractor:
    """
    Extractor for Gatan DigitalMicrograph files (.dm3 and .dm4).

    This extractor handles metadata extraction from files saved by Gatan's
    DigitalMicrograph software, commonly used on FEI/Thermo and JEOL TEMs.
    """

    name = "dm3_extractor"
    priority = 100
    supported_extensions: ClassVar = {"dm3", "dm4"}

    def supports(self, context: ExtractionContext) -> bool:
        """
        Check if this extractor supports the given file.

        Parameters
        ----------
        context
            The extraction context containing file information

        Returns
        -------
        bool
            True if the file extension (compared case-insensitively and without
            the leading dot) is listed in ``supported_extensions``
        """
        extension = context.file_path.suffix.lower().lstrip(".")
        # Use the class-level set so the declared and the checked extensions
        # cannot drift apart
        return extension in self.supported_extensions

    def extract(
        self, context: ExtractionContext
    ) -> dict[str, Any] | list[dict[str, Any]]:
        """
        Extract metadata from a DM3/DM4 file.

        Parameters
        ----------
        context
            The extraction context containing file information

        Returns
        -------
        list[dict]
            Always a list of metadata dicts, each containing an 'nx_meta' key
            with NexusLIMS-specific metadata. Single-signal files return a
            1-element list for consistency. If the file cannot be opened, a
            1-element list is returned holding the first entry produced by
            ``BasicFileInfoExtractor`` with a warning appended to
            ``nx_meta["warnings"]``.
        """
        _logger.debug("Extracting metadata from DM3/DM4 file: %s", context.file_path)
        # get_dm3_metadata() handles profile application internally
        metadata_list = get_dm3_metadata(context.file_path, context.instrument)

        if metadata_list is not None:
            # Single-signal files are already wrapped in a 1-element list
            return metadata_list

        # Extraction failed: fall back to basic file metadata plus a warning
        _logger.warning(
            "Failed to extract DM3/DM4 metadata from %s, "
            "falling back to basic metadata",
            context.file_path,
        )
        fallback = BasicFileInfoExtractor().extract(context)
        metadata = fallback[0]
        # Each warning entry is itself a list, matching the entries appended
        # by the EDS parser in this module
        metadata["nx_meta"].setdefault("warnings", []).append(
            ["DM3/DM4 file could not be read by HyperSpy"]
        )
        return [metadata]
def get_dm3_metadata(filename: Path, instrument=None):
    """
    Get metadata from a dm3 or dm4 file.

    Returns the metadata from a .dm3 file saved by Digital Micrograph, with some
    non-relevant information stripped out. Instrument-specific metadata parsing is
    handled by instrument profiles (see nexusLIMS.extractors.plugins.profiles).

    Parameters
    ----------
    filename : str or pathlib.Path
        path to a .dm3 file saved by Gatan's Digital Micrograph
    instrument : Instrument, optional
        The instrument object (used for timezone info and profile lookup). If
        omitted, the instrument is looked up from the file path.

    Returns
    -------
    metadata : list[dict] or None
        List of extracted metadata dicts, one per signal. If None, the file could
        not be opened.
    """
    # Accept plain strings as well; ``.stat()`` below requires a Path
    filename = Path(filename)

    # We do lazy loading so we don't actually read the data from the disk to
    # save time and memory.
    try:
        s = hs_load(filename, lazy=True)
    except (
        DM3DataTypeError,
        DM3FileVersionError,
        DM3TagError,
        DM3TagIDError,
        DM3TagTypeError,
        error,
    ) as exc:
        _logger.warning(
            "File reader could not open %s, received exception: %s",
            filename,
            repr(exc),
        )
        return None

    # Normalize to a list of signals (a single signal becomes a 1-element list)
    if not isinstance(s, list):
        s = [s]

    # These values depend only on the file, not on the individual signal, so
    # they are computed once rather than once per signal.
    instr = (
        instrument if instrument is not None else get_instr_from_filepath(filename)
    )
    # Use instrument timezone if available, otherwise fall back to system timezone
    tz = instr.timezone if instr is not None else current_system_tz()
    # modification time as ISO format
    mtime_iso = dt.fromtimestamp(filename.stat().st_mtime, tz=tz).isoformat()
    # if we found the instrument, then store the name as string, else None
    instr_name = instr.name if instr is not None else None

    # Top-level trees that are not of interest
    unwanted_tags = (
        "ApplicationBounds",
        "LayoutType",
        "DocumentTags",
        "HasWindowPosition",
        "ImageSourceList",
        "Image_Behavior",
        "InImageMode",
        "MinVersionList",
        "NextDocumentObjectID",
        "PageSetup",
        "Page_Behavior",
        "SentinelList",
        "Thumbnails",
        "WindowPosition",
        "root",
    )

    results = []
    for signal in s:
        m_tree = signal.original_metadata
        # Important trees:
        #   DocumentObjectList
        #     Contains information about the display of the information,
        #     including bits about annotations that are included on top of the
        #     image data, the CLUT (color look-up table), data min/max.
        #
        #   ImageList
        #     Contains the actual image information
        for tag in unwanted_tags:
            m_tree = remove_dtb_element(m_tree, tag)

        # Within the DocumentObjectList tree, we really only care about the
        # AnnotationGroupList for each TagGroup, so go into each TagGroup and
        # delete everything but that...
        # NB: iterating the tree yields (name, subtree) tuples
        for tg_name, tag in m_tree.DocumentObjectList:
            # tg_name should be 'TagGroup0', 'TagGroup1', etc.
            for k in [k for k in tag.keys() if k != "AnnotationGroupList"]:
                m_tree = remove_dtb_element(
                    m_tree,
                    f"DocumentObjectList.{tg_name}.{k}",
                )

        for tg_name, tag in m_tree.ImageList:
            # Keep only 'ImageTags' and 'Name'; filtering (rather than calling
            # list.remove) tolerates tag groups that lack either key
            for k in [k for k in tag.keys() if k not in ("ImageTags", "Name")]:
                # k should be in ['ImageData', 'UniqueID']
                m_tree = remove_dtb_element(
                    m_tree,
                    f"ImageList.{tg_name}.{k}",
                )

        meta = m_tree.as_dictionary()
        meta["nx_meta"] = {
            # fname is only needed for internal processing and is removed below
            "fname": str(filename),
            # set type to Image by default; the parsers below may refine it
            "DatasetType": "Image",
            "Data Type": "TEM_Imaging",
            "Creation Time": mtime_iso,
            "Data Dimensions": str(signal.data.shape),
            "Instrument ID": instr_name,
            "warnings": [],
        }
        meta = parse_dm3_microscope_info(meta)
        meta = parse_dm3_eels_info(meta)
        meta = parse_dm3_eds_info(meta)
        meta = parse_dm3_spectrum_image_info(meta)

        # Apply instrument-specific profiles if an instrument is known
        if instr is not None:
            meta = _apply_profile_to_metadata(meta, instr, filename)

        # we don't need to save the filename, it's just for internal processing
        # (pop tolerates a profile parser having removed it already)
        meta["nx_meta"].pop("fname", None)

        # Migrate metadata to schema-compliant format
        meta = _migrate_to_schema_compliant_metadata(meta)

        # sort the nx_meta dictionary (recursively) for nicer display
        meta["nx_meta"] = sort_dict(meta["nx_meta"])
        results.append(meta)

    # return all signals as a list of dictionaries:
    return [remove_dict_nones(m) for m in results]
def _apply_profile_to_metadata(metadata: dict, instrument, file_path: Path) -> dict:
    """
    Run the instrument's registered profile over a metadata dictionary.

    Looks up the profile registered for ``instrument`` and, if there is one,
    applies its parsers, then its top-level key transformations, then injects
    its extension fields. Each individual step is best-effort: a failure is
    logged as a warning and processing continues with the next step.

    Parameters
    ----------
    metadata
        Metadata dictionary with 'nx_meta' key
    instrument
        Instrument object
    file_path
        Path to the file being processed

    Returns
    -------
    dict
        The metadata dictionary after profile processing (returned unchanged
        when no profile is registered for the instrument)
    """
    # Make sure every profile has been registered before the lookup
    register_all_profiles()
    active_profile = get_profile_registry().get_profile(instrument)
    if active_profile is None:
        return metadata

    _logger.debug("Applying profile for instrument: %s", instrument.name)

    # Profile parsers expect an extraction context, so build one for them
    ctx = ExtractionContext(file_path=file_path, instrument=instrument)

    # 1) custom parsers, in registration order
    for label, parse in active_profile.parsers.items():
        try:
            metadata = parse(metadata, ctx)
        except Exception as exc:
            _logger.warning("Profile parser '%s' failed: %s", label, exc)

    # 2) transformations of top-level metadata keys
    for target, transform in active_profile.transformations.items():
        try:
            if target not in metadata:
                continue
            metadata[target] = transform(metadata[target])
        except Exception as exc:
            _logger.warning("Profile transformation '%s' failed: %s", target, exc)

    # 3) extension fields (skipped entirely when none are defined)
    for field, field_value in (active_profile.extension_fields or {}).items():
        try:
            add_to_extensions(metadata["nx_meta"], field, field_value)
        except Exception as exc:
            _logger.warning(
                "Profile extension field injection '%s' failed: %s",
                field,
                exc,
            )

    return metadata
def get_pre_path(mdict: Dict) -> List[str]:
    """
    Get the appropriate pre-path in the metadata tag structure for a given signal.

    Determine the list of keys leading to where the important DigitalMicrograph
    metadata is expected to be found. If the .dm3/.dm4 file contains a stack of
    images, that metadata is not at its usual place and is instead under a
    ``plane info`` tag, so this method checks whether that tag is present and
    returns the matching path.

    Parameters
    ----------
    mdict : dict
        A metadata dictionary as returned by :py:meth:`get_dm3_metadata`

    Returns
    -------
    A list containing the subsequent keys that need to be traversed to
    get to the point in the `mdict` where the important metadata is stored
    """
    image_tags = ["ImageList", "TagGroup0", "ImageTags"]

    # A "plane info" node under the image tags marks an image stack
    plane_info = try_getting_dict_value(mdict, [*image_tags, "plane info"])
    if plane_info is None:
        return image_tags

    # we're in a stack: the metadata lives under the first plane's source tags
    return [*image_tags, "plane info", "TagGroup0", "source tags"]
402def _migrate_to_schema_compliant_metadata(mdict: dict) -> dict: # noqa: PLR0912
403 """
404 Migrate metadata to schema-compliant format.
406 This function reorganizes metadata extracted from DM3/DM4 files to conform
407 to the type-specific metadata schemas. It:
408 1. Maps display names to EM Glossary field names for core fields
409 2. Moves vendor-specific fields to the extensions section
410 3. Converts Stage Position dict to proper StagePosition structure
412 Parameters
413 ----------
414 mdict : dict
415 Metadata dictionary with 'nx_meta' key
417 Returns
418 -------
419 dict
420 Metadata dictionary with schema-compliant nx_meta
421 """
422 nx_meta = mdict.get("nx_meta", {})
423 dataset_type = nx_meta.get("DatasetType", "Image")
425 # Field mappings from display names to EM Glossary names
426 # These are core schema fields that just need renaming
427 # Note: dataset_type-specific fields are handled conditionally below
428 field_mappings = {
429 # Common mappings for all types
430 "Voltage": "acceleration_voltage",
431 "Horizontal Field Width": "horizontal_field_width",
432 "Vertical Field Width": "vertical_field_width",
433 "Acquisition Device": "acquisition_device",
434 "Sample Time": "dwell_time",
435 }
437 # Conditional mappings based on dataset type
438 if dataset_type == "Diffraction":
439 field_mappings["STEM Camera Length"] = "camera_length"
440 if dataset_type in ("Image", "SpectrumImage"):
441 # magnification is only a core field for image-like datasets;
442 # for others (e.g. Diffraction) it routes to extensions via the
443 # fall-through below
444 field_mappings["Indicated Magnification"] = "magnification"
446 # Fields that should ALWAYS go to extensions (vendor/instrument-specific)
447 extension_fields = {
448 # Gatan-specific
449 "GMS Version",
450 "Microscope",
451 "Operator",
452 "Specimen",
453 # Operation modes
454 "Illumination Mode",
455 "Imaging Mode",
456 "Operation Mode",
457 # Apertures
458 "Condenser Aperture",
459 "Objective Aperture",
460 "Selected Area Aperture",
461 # Vendor-specific settings
462 "Cs", # Spherical aberration
463 # Signal/Analytic metadata
464 "Signal Name",
465 "Analytic Format",
466 "Analytic Label",
467 "Analytic Signal",
468 # Nested vendor metadata (will be moved as-is)
469 "EELS",
470 "EDS",
471 # STEM-specific fields that should be extensions for non-Diffraction types
472 "STEM Camera Length", # Only core for Diffraction
473 }
475 # NOTE: "NexusLIMS Extraction" is added AFTER this migration function runs
476 # by add_extraction_details in __init__.py, so we don't need to handle it here
478 # Create new nx_meta dict with schema-compliant structure
479 new_nx_meta = {}
480 # Preserve any existing extensions (e.g., from instrument profiles)
481 extensions = nx_meta.get("extensions", {}).copy() if "extensions" in nx_meta else {}
483 # Copy required fields as-is
484 required_fields = {"Creation Time", "Data Type", "DatasetType"}
485 for field in required_fields:
486 if field in nx_meta:
487 new_nx_meta[field] = nx_meta[field]
489 # Copy common optional fields
490 common_fields = {
491 "Data Dimensions",
492 "Instrument ID",
493 "warnings",
494 "Extractor Warnings",
495 }
496 for field in common_fields:
497 if field in nx_meta:
498 new_nx_meta[field] = nx_meta[field]
500 # Process all other fields
501 for key, value in nx_meta.items():
502 # Skip if already processed
503 if key in required_fields or key in common_fields:
504 continue
506 # Check if it's a core field that needs renaming
507 if key in field_mappings:
508 new_key = field_mappings[key]
509 new_nx_meta[new_key] = value
510 # Check if it should go to extensions
511 elif key in extension_fields:
512 extensions[key] = value
513 # Handle Stage Position specially
514 elif key == "Stage Position":
515 # DM3 files have Stage Position as a dict with keys
516 # like 'X', 'Y', 'α', etc. # noqa: RUF003
517 # Convert to snake_case keys for StagePosition schema
518 if isinstance(value, dict):
519 stage_pos = {}
520 key_map = {
521 "X": "x",
522 "Y": "y",
523 "Z": "z",
524 "α": "tilt_alpha", # noqa: RUF001
525 "β": "tilt_beta",
526 }
527 for old_key, new_key in key_map.items():
528 if old_key in value:
529 # Convert to Pint Quantity if needed
530 val = value[old_key]
531 if new_key in ("x", "y") and not isinstance(val, ureg.Quantity):
532 # X/Y in micrometers
533 val = ureg.Quantity(val, "micrometer")
534 elif new_key == "z" and not isinstance(val, ureg.Quantity):
535 # Z in millimeters
536 val = ureg.Quantity(val, "millimeter")
537 elif new_key in (
538 "tilt_alpha",
539 "tilt_beta",
540 ) and not isinstance(val, ureg.Quantity):
541 # Tilts in degrees
542 val = ureg.Quantity(val, "degree")
543 stage_pos[new_key] = val
544 # Only emit stage_position when non-empty and the dataset
545 # type declares the field (Image / SpectrumImage); route
546 # non-empty values to extensions for other types and drop
547 # empty dicts entirely.
548 if stage_pos:
549 if dataset_type in ("Image", "SpectrumImage"):
550 new_nx_meta["stage_position"] = stage_pos
551 else:
552 extensions["Stage Position"] = stage_pos
553 else:
554 # If it's not a dict, move to extensions (this is not expected)
555 extensions["Stage Position"] = value # pragma: no cover
556 # Everything else goes to extensions
557 else:
558 extensions[key] = value
560 # Add extensions if any
561 for key, value in extensions.items():
562 add_to_extensions(new_nx_meta, key, value)
564 mdict["nx_meta"] = new_nx_meta
565 return mdict
def parse_dm3_microscope_info(mdict):  # noqa: PLR0912
    """
    Parse the "microscope info" metadata.

    Copies the "important" values stored at specific places in the DM3 tag
    structure into the root-level ``nx_meta`` dictionary of the metadata
    returned by :py:meth:`get_dm3_metadata`. The "Microscope Info",
    "Session Info", and "Meta Data" nodes are inspected (these are not present
    on every microscope), followed by a handful of individual settings.

    Parameters
    ----------
    mdict : dict
        A metadata dictionary as returned by :py:meth:`get_dm3_metadata`

    Returns
    -------
    mdict : dict
        The same metadata dictionary with some values added under the
        root-level ``nx_meta`` key
    """
    mdict.setdefault("nx_meta", {})
    pre_path = get_pre_path(mdict)

    def _is_meaningful(candidate):
        # Found, not one of the "facility-wide" placeholder values that do not
        # have any meaning, and not an empty list
        return (
            candidate is not None
            and candidate not in ["DO NOT EDIT", "DO NOT ENTER"]
            and candidate != []
        )

    # Fields stored as bare numbers that get wrapped in a Pint Quantity
    units_by_field = {
        "Cs(mm)": "millimeter",
        "STEM Camera Length": "millimeter",
        "Voltage": "volt",  # Will auto-convert to kilovolt
        "Field of View (\u00b5m)": "micrometer",
    }
    # Output names for fields whose tag name carries a unit suffix
    display_names = {
        "Cs(mm)": "Cs",
        "Field of View (\u00b5m)": "Horizontal Field Width",
    }

    # General "microscope info" .dm3 tags (not present on all instruments):
    microscope_base = [*pre_path, "Microscope Info"]
    for entry in [
        "Indicated Magnification",
        "Actual Magnification",
        "Cs(mm)",
        "STEM Camera Length",
        "Voltage",
        "Operation Mode",
        "Specimen",
        "Microscope",
        "Operator",
        "Imaging Mode",
        "Illumination Mode",
        "Name",
        "Field of View (\u00b5m)",
        "Facility",
        "Condenser Aperture",
        "Objective Aperture",
        "Selected Area Aperture",
        ["Stage Position", "Stage Alpha"],
        ["Stage Position", "Stage Beta"],
        ["Stage Position", "Stage X"],
        ["Stage Position", "Stage Y"],
        ["Stage Position", "Stage Z"],
    ]:
        key_path = _coerce_to_list(entry)
        found = try_getting_dict_value(mdict, microscope_base + key_path)
        if not _is_meaningful(found):
            continue

        # Original tag name, used for the unit lookup
        leaf = key_path[-1] if isinstance(key_path, list) else key_path
        if leaf in units_by_field:
            # A failed conversion leaves the raw value in place
            with contextlib.suppress(ValueError, TypeError):
                found = ureg.Quantity(found, units_by_field[leaf])
            # Remove unit suffix from field name
            if leaf in display_names:
                key_path = [display_names[leaf]]

        # change output of "Stage Position" to unicode characters
        if "Stage Position" in key_path:
            key_path[-1] = (
                key_path[-1]
                .replace("Alpha", "α")  # noqa: RUF001
                .replace("Beta", "β")
                .replace("Stage ", "")
            )
        set_nested_dict_value(mdict, ["nx_meta", *key_path], found)

    # General "session info" .dm3 tags (sometimes this information is stored
    # here instead of under "Microscope Info"):
    session_base = [*pre_path, "Session Info"]
    for entry in ["Detector", "Microscope", "Operator", "Specimen"]:
        key_path = _coerce_to_list(entry)
        found = try_getting_dict_value(mdict, session_base + key_path)
        if _is_meaningful(found):
            set_nested_dict_value(mdict, ["nx_meta", *key_path], found)

    # General "Meta Data" .dm3 tags
    meta_data_base = [*pre_path, "Meta Data"]
    for entry in [
        "Acquisition Mode",
        "Format",
        "Signal",
        # this one is seen sometimes in EDS signals:
        ["Experiment keywords", "TagGroup1", "Label"],
    ]:
        key_path = _coerce_to_list(entry)
        found = try_getting_dict_value(mdict, meta_data_base + key_path)
        if not _is_meaningful(found):
            continue
        if "Label" in key_path:
            target = ["nx_meta", "Analytic Label"]
        else:
            target = ["nx_meta"] + [f"Analytic {lbl}" for lbl in key_path]
        set_nested_dict_value(mdict, target, found)

    # acquisition device name, exposure time, GMS version, camera binning and
    # image processing, in that order:
    for setter in (
        _set_acquisition_device_name,
        _set_exposure_time,
        _set_gms_version,
        _set_camera_binning,
        _set_image_processing,
    ):
        setter(mdict, pre_path)

    # Signal Name (from DataBar):
    signal_name = try_getting_dict_value(mdict, [*pre_path, "DataBar", "Signal Name"])
    if signal_name is not None:
        set_nested_dict_value(mdict, ["nx_meta", "Signal Name"], signal_name)

    # DigiScan Sample Time (dwell time per pixel in microseconds):
    sample_time = try_getting_dict_value(mdict, [*pre_path, "DigiScan", "Sample Time"])
    if sample_time is not None:
        with contextlib.suppress(ValueError, TypeError):
            sample_time = ureg.Quantity(sample_time, "microsecond")
        set_nested_dict_value(mdict, ["nx_meta", "Sample Time"], sample_time)

    # A STEM illumination mode switches the data type from the default
    nx_meta = mdict["nx_meta"]
    if "Illumination Mode" in nx_meta and "STEM" in nx_meta["Illumination Mode"]:
        nx_meta["Data Type"] = "STEM_Imaging"

    return mdict
def parse_dm3_eels_info(mdict):
    """
    Parse EELS information from the metadata.

    Parses metadata from the DigitalMicrograph tag structure that concerns any
    EELS acquisition or spectrometer settings, placing it in an ``EELS``
    dictionary underneath the root-level ``nx_meta`` node. If any EELS metadata
    was added, the dataset type is set to ``Spectrum`` and the data type to
    ``STEM_EELS`` or ``TEM_EELS`` depending on the illumination mode.

    Parameters
    ----------
    mdict : dict
        A metadata dictionary as returned by :py:meth:`get_dm3_metadata`

    Returns
    -------
    mdict : dict
        The metadata dict with all the "EELS-specific" metadata added under ``nx_meta``
    """
    mdict.setdefault("nx_meta", {})
    pre_path = get_pre_path(mdict)

    # EELS .dm3 tags of interest:
    base = [*pre_path, "EELS"]
    for meta_key in [
        ["Acquisition", "Exposure (s)"],
        ["Acquisition", "Integration time (s)"],
        ["Acquisition", "Number of frames"],
        ["Experimental Conditions", "Collection semi-angle (mrad)"],
        ["Experimental Conditions", "Convergence semi-angle (mrad)"],
    ]:
        _set_eels_meta(mdict, base, meta_key)

    # different instruments have the spectrometer information in different
    # places...
    spectrometer_paths = {
        "FEI-Titan-TEM": [*pre_path, "EELS", "Acquisition", "Spectrometer"],
        "FEI-Titan-STEM": [*pre_path, "EELS Spectrometer"],
    }
    # .get() on both levels: a missing or unknown instrument ID simply means
    # there is no known spectrometer location to read from
    base = spectrometer_paths.get(mdict["nx_meta"].get("Instrument ID"))
    if base is not None:
        for meta_key in [
            "Aperture label",
            "Dispersion (eV/ch)",
            "Energy loss (eV)",
            "Instrument name",
            "Drift tube enabled",
            "Drift tube voltage (V)",
            "Slit inserted",
            "Slit width (eV)",
            "Prism offset (V)",
            # NOTE(review): trailing space kept as-is; presumably it matches
            # the tag name written by DigitalMicrograph -- TODO confirm
            "Prism offset enabled ",
        ]:
            _set_eels_spectrometer_meta(mdict, base, [meta_key])

    _set_eels_processing(mdict, pre_path)

    # Set the dataset type to Spectrum if any EELS tags were added
    if "EELS" in mdict["nx_meta"]:
        _logger.info("Detected file as Spectrum type based on EELS metadata")
        mdict["nx_meta"]["DatasetType"] = "Spectrum"
        # "Illumination Mode" is only present when the file recorded it, so a
        # missing value is treated as non-STEM instead of raising KeyError
        illumination = mdict["nx_meta"].get("Illumination Mode") or ""
        mdict["nx_meta"]["Data Type"] = (
            "STEM_EELS" if "STEM" in illumination else "TEM_EELS"
        )

    return mdict
def parse_dm3_eds_info(mdict):
    """
    Parse EDS information from the dm3 metadata.

    Parses metadata from the DigitalMicrograph tag structure that concerns any
    EDS acquisition or spectrometer settings, placing it in an ``EDS``
    dictionary underneath the root-level ``nx_meta`` node. Metadata values
    that are commonly incorrect or may be placeholders are specified in a
    list under the ``nx_meta.warnings`` node. If any EDS metadata was added,
    the dataset type is set to ``Spectrum`` and the data type to ``STEM_EDS``
    or ``TEM_EDS`` depending on the illumination mode.

    Parameters
    ----------
    mdict : dict
        A metadata dictionary as returned by :py:meth:`get_dm3_metadata`

    Returns
    -------
    mdict : dict
        The metadata dictionary with all the "EDS-specific" metadata
        added as sub-node under the ``nx_meta`` root level dictionary
    """
    mdict.setdefault("nx_meta", {})
    pre_path = get_pre_path(mdict)

    # Acquisition settings; for spectrum images these are read from the SI tag
    acquisition_keys = (
        ("Acquisition", "Continuous Mode"),
        ("Acquisition", "Count Rate Unit"),
        ("Acquisition", "Dispersion (eV)"),
        ("Acquisition", "Energy Cutoff (V)"),
        ("Acquisition", "Exposure (s)"),
    )
    # Values that might not be accurate and are therefore flagged as warnings
    unreliable_keys = (
        ("Count rate",),
        ("Detector Info", "Active layer"),
        ("Detector Info", "Azimuthal angle"),
        ("Detector Info", "Dead layer"),
        ("Detector Info", "Detector type"),
        ("Detector Info", "Elevation angle"),
        ("Detector Info", "Fano"),
        ("Detector Info", "Gold layer"),
        ("Detector Info", "Incidence angle"),
        ("Detector Info", "Solid angle"),
        ("Detector Info", "Stage tilt"),
        ("Detector Info", "Window thickness"),
        ("Detector Info", "Window type"),
        ("Detector Info", "Zero fwhm"),
        ("Live time",),
        ("Real time",),
    )

    # EDS .dm3 tags of interest:
    base = [*pre_path, "EDS"]

    # A fresh list is handed to each helper call so the shared key tables
    # above cannot be modified by the callee
    for meta_key in (*acquisition_keys, *unreliable_keys):
        _set_eds_meta(mdict, base, list(meta_key))

    # test to see if the SI attribute is present in the metadata dictionary.
    # If so, then some relevant EDS values are located there, rather
    # than in the root-level EDS tag (all the EDS.Acquisition tags from
    # above)
    if try_getting_dict_value(mdict, [*pre_path, "SI"]) is not None:
        for meta_key in acquisition_keys:
            _set_si_meta(mdict, pre_path, list(meta_key))

        # for an SI EDS dataset, set "Live time", "Real time" and "Count rate"
        # to the averages stored in the ImageList.TagGroup0.ImageTags.EDS.Images
        # values
        im_dict = try_getting_dict_value(mdict, [*pre_path, "EDS", "Images"])
        if isinstance(im_dict, dict):
            for k, v in im_dict.items():
                # Drop the per-file value (if any) in favor of the SI average;
                # tolerate "EDS" not having been created yet
                mdict["nx_meta"].get("EDS", {}).pop(k, None)
                # this should work for 2D (spectrum image) as well as 1D
                # (linescan) datasets since DM saves this information as a 1D
                # list regardless of original data shape
                avg_val = np.array(v).mean()
                set_nested_dict_value(
                    mdict,
                    ["nx_meta", "EDS", f"{k} (SI Average)"],
                    avg_val,
                )

    # Add the .dm3 EDS values to the warnings list, since they might not be
    # accurate
    for meta_key in unreliable_keys:
        if try_getting_dict_value(mdict, base + list(meta_key)) is not None:
            # The warning names the leaf tag (last element of the key path)
            mdict["nx_meta"].setdefault("warnings", []).append(
                ["EDS", meta_key[-1]],
            )

    # Set the dataset type to Spectrum if any EDS tags were added
    if "EDS" in mdict["nx_meta"]:
        _logger.info("Detected file as Spectrum type based on presence of EDS metadata")
        mdict["nx_meta"]["DatasetType"] = "Spectrum"
        # "Illumination Mode" is only present when the file recorded it, so a
        # missing value is treated as non-STEM instead of raising KeyError
        illumination = mdict["nx_meta"].get("Illumination Mode") or ""
        if "STEM" in illumination:
            mdict["nx_meta"]["Data Type"] = "STEM_EDS"
        else:
            # no known files match this mode, so skip for coverage
            mdict["nx_meta"]["Data Type"] = "TEM_EDS"  # pragma: no cover

    return mdict
920def parse_dm3_spectrum_image_info(mdict):
921 """
922 Parse "spectrum image" information from the metadata.
924 Parses metadata that concerns any spectrum imaging information (the "SI" tag) and
925 places it in a "Spectrum Imaging" dictionary underneath the root-level ``nx_meta``
926 node. Metadata values that are commonly incorrect or may be placeholders are
927 specified in a list under the ``nx_meta.warnings`` node.
929 Parameters
930 ----------
931 mdict : dict
932 A metadata dictionary as returned by :py:meth:`get_dm3_metadata`
934 Returns
935 -------
936 mdict : dict
937 The metadata dictionary with all the "EDS-specific" metadata
938 added as sub-node under the ``nx_meta`` root level dictionary
939 """
940 pre_path = get_pre_path(mdict)
942 # Spectrum imaging .dm3 tags of interest:
943 base = [*pre_path, "SI"]
945 for m_in, m_out in [
946 (["Acquisition", "Pixel time (s)"], ["Pixel time (s)"]),
947 (["Acquisition", "SI Application Mode", "Name"], ["Scan Mode"]),
948 (
949 ["Acquisition", "Spatial Sampling", "Height (pixels)"],
950 ["Spatial Sampling (Vertical)"],
951 ),
952 (
953 ["Acquisition", "Spatial Sampling", "Width (pixels)"],
954 ["Spatial Sampling (Horizontal)"],
955 ),
956 (
957 ["Acquisition", "Scan Options", "Sub-pixel sampling"],
958 ["Sub-pixel Sampling Factor"],
959 ),
960 ]:
961 val = try_getting_dict_value(mdict, base + m_in)
962 # only add the value to this list if we found it, and it's not
963 # one of the "facility-wide" set values that do not have any meaning:
964 if val is not None:
965 # Convert to Pint Quantity if the field has units
966 output_key = m_out[0] if len(m_out) == 1 else m_out
967 if output_key == "Pixel time (s)":
968 with contextlib.suppress(ValueError, TypeError):
969 val = ureg.Quantity(val, "second")
970 output_key = ["Pixel time"]
971 # add last value of each parameter to the "Spectrum Imaging" sub-tree
972 key_list = [output_key] if isinstance(output_key, str) else output_key
973 set_nested_dict_value(
974 mdict, ["nx_meta", "Spectrum Imaging", *key_list], val
975 )
977 # Check spatial drift correction separately:
978 drift_per_val = try_getting_dict_value(
979 mdict,
980 [*base, "Acquisition", "Artefact Correction", "Spatial Drift", "Periodicity"],
981 )
982 drift_unit_val = try_getting_dict_value(
983 mdict,
984 [*base, "Acquisition", "Artefact Correction", "Spatial Drift", "Units"],
985 )
986 if drift_per_val is not None and drift_unit_val is not None:
987 val_to_set = f"Spatial drift correction every {drift_per_val} {drift_unit_val}"
988 # make sure statement looks gramatically correct
989 if drift_per_val == 1:
990 val_to_set = val_to_set.replace("(s)", "")
991 else:
992 val_to_set = val_to_set.replace("(s)", "s")
993 # fix for "seconds(s)" (*********...)
994 if val_to_set[-2:] == "ss":
995 val_to_set = val_to_set[:-1]
996 set_nested_dict_value(
997 mdict,
998 ["nx_meta", "Spectrum Imaging", "Artefact Correction"],
999 val_to_set,
1000 )
1002 start_val = try_getting_dict_value(mdict, [*base, "Acquisition", "Start time"])
1003 end_val = try_getting_dict_value(mdict, [*base, "Acquisition", "End time"])
1004 if start_val is not None and end_val is not None:
1005 start_dt = dt.strptime(start_val, "%I:%M:%S %p").replace(tzinfo=UTC)
1006 end_dt = dt.strptime(end_val, "%I:%M:%S %p").replace(tzinfo=UTC)
1007 duration = (end_dt - start_dt).seconds # Calculate acquisition duration
1008 with contextlib.suppress(ValueError, TypeError):
1009 duration = ureg.Quantity(duration, "second")
1010 set_nested_dict_value(
1011 mdict,
1012 ["nx_meta", "Spectrum Imaging", "Acquisition Duration"],
1013 duration,
1014 )
1016 # Set the dataset type to SpectrumImage if it is already a Spectrum ( otherwise it's
1017 # just a STEM image) and any Spectrum Imaging tags were added
1018 if (
1019 "Spectrum Imaging" in mdict["nx_meta"]
1020 and mdict["nx_meta"]["DatasetType"] == "Spectrum"
1021 ):
1022 _logger.info(
1023 "Detected file as SpectrumImage type based on "
1024 "presence of spectral metadata and spectrum imaging "
1025 "info",
1026 )
1027 mdict["nx_meta"]["DatasetType"] = "SpectrumImage"
1028 mdict["nx_meta"]["Data Type"] = "Spectrum_Imaging"
1029 if "EELS" in mdict["nx_meta"]:
1030 mdict["nx_meta"]["Data Type"] = "EELS_Spectrum_Imaging"
1031 if "EDS" in mdict["nx_meta"]:
1032 mdict["nx_meta"]["Data Type"] = "EDS_Spectrum_Imaging"
1034 return mdict
def _parse_stage_position(tecnai_info):
    """
    Extract the stage coordinates from the split Tecnai metadata.

    The entry located with the ``"Stage "`` search string is split on commas.
    Each piece then has any leading/trailing characters belonging to the set
    ``" umdeg"`` removed before being passed to ``_try_decimal``.

    Parameters
    ----------
    tecnai_info : list
        The Tecnai metadata string, already split into individual entries

    Returns
    -------
    dict
        The first five converted values, keyed (in order) as
        ``Stage_Position_x``, ``Stage_Position_y``, ``Stage_Position_z``,
        ``Stage_Position_theta``, and ``Stage_Position_phi``
    """
    keys = (
        "Stage_Position_x",
        "Stage_Position_y",
        "Stage_Position_z",
        "Stage_Position_theta",
        "Stage_Position_phi",
    )
    stage_entry = _find_val("Stage ", tecnai_info)
    # strip() treats its argument as a set of characters, so this removes any
    # mix of spaces and the letters u, m, d, e, g from both ends of each piece
    coords = [
        _try_decimal(piece.strip(" umdeg")) for piece in stage_entry.split(",")
    ]
    # Index explicitly (rather than zip) so a short list still raises IndexError
    return {key: coords[pos] for pos, key in enumerate(keys)}
def _parse_apertures(tecnai_info):
    """
    Collect the aperture sizes reported in the Tecnai metadata.

    Parameters
    ----------
    tecnai_info : list
        The Tecnai metadata string, already split into individual entries

    Returns
    -------
    dict
        Mapping with the keys ``C1_Aperture``, ``C2_Aperture``,
        ``Obj_Aperture``, and ``SA_Aperture``. Each value is an ``int``, or
        ``None`` when the entry could not be read as an integer (presumably
        because the aperture is retracted or the entry is absent).
    """
    # Output key -> prefix searched for in the metadata entries
    search_prefixes = {
        "C1_Aperture": "C1 Aperture: ",
        "C2_Aperture": "C2 Aperture: ",
        "Obj_Aperture": "OBJ Aperture: ",
        "SA_Aperture": "SA Aperture: ",
    }

    def _aperture_size(prefix):
        """Return the aperture size as an int, or None if it is not numeric."""
        try:
            # The lookup stays inside the try so an AttributeError from calling
            # .strip() on a non-string lookup result is swallowed as well
            text = _find_val(prefix, tecnai_info).strip(" um")
            size = int(text)
        except (ValueError, AttributeError):
            size = None
        return size

    return {key: _aperture_size(prefix) for key, prefix in search_prefixes.items()}
def process_tecnai_microscope_info(
    microscope_info,
    delimiter="\u2028",
):
    """
    Process the Microscope_Info metadata string into a dictionary of key-value pairs.

    This method is only relevant for FEI Titan TEMs that write additional metadata into
    a unicode-delimited string at a certain place in the DM3 tag structure

    Parameters
    ----------
    microscope_info : str
        The string of data obtained from the Tecnai.Microscope_Info leaf of the metadata
    delimiter : str
        The value (a unicode string) used to split the ``microscope_info`` string.

    Returns
    -------
    info_dict : dict
        The information contained in the string, in a more easily-digestible form.

    Raises
    ------
    ValueError
        If an expected marker (``" Extr volt"``, ``"Gun Lens "``, ``"Emission "``,
        or ``" Defocus"``) is missing from its entry (raised by ``str.index``), or
        if a field parsed with ``int`` is not a valid integer.
    IndexError
        If the ``Mode`` text has fewer than five whitespace-separated terms, or
        if a ``split(...)[1]`` lookup does not find its separator.
    """
    info_dict = {}
    tecnai_info = microscope_info.split(delimiter)
    info_dict["Microscope_Name"] = _find_val("Microscope ", tecnai_info)  # String
    info_dict["User"] = _find_val("User ", tecnai_info)  # String

    # The "Gun" entry packs several values into one string; they are peeled off
    # from left to right, trimming ``tmp`` after each one is read
    tmp = _find_val("Gun ", tecnai_info)
    info_dict["Gun_Name"] = tmp[: tmp.index(" Extr volt")]
    tmp = tmp[tmp.index(info_dict["Gun_Name"]) + len(info_dict["Gun_Name"]) :]  # String

    tmp = tmp.replace("Extr volt ", "")
    info_dict["Extractor_Voltage"] = int(tmp.split()[0])  # Integer (volts)

    tmp = tmp[tmp.index("Gun Lens ") + len("Gun Lens ") :]
    info_dict["Gun_Lens_No"] = int(tmp.split()[0])  # Integer

    tmp = tmp[tmp.index("Emission ") + len("Emission ") :]
    # Decimal (microA)
    info_dict["Emission_Current"] = _try_decimal(tmp.split("uA")[0])

    tmp = _find_val("Mode ", tecnai_info)
    info_dict["Mode"] = tmp[: tmp.index(" Defocus")]  # String
    # 'Mode' should be five terms long, and the last term is either 'Image',
    # 'Diffraction', (or maybe something else)
    mode_kind = info_dict["Mode"].split()[4]

    # Decimal val (micrometer)
    if "Magn " in tmp:  # Imaging mode
        info_dict["Defocus"] = _try_decimal(tmp.split("Defocus (um) ")[1].split()[0])
    elif "CL " in tmp:  # Diffraction mode
        info_dict["Defocus"] = _try_decimal(tmp.split("Defocus ")[1].split()[0])

    # This value changes based on whether in image or diffraction mode (mag or CL)
    # Integer
    if mode_kind == "Image":
        info_dict["Magnification"] = int(tmp.split("Magn ")[1].strip("x"))
    # Decimal
    elif mode_kind == "Diffraction":
        info_dict["Camera_Length"] = _try_decimal(tmp.split("CL ")[1].strip("m"))

    # Integer (1 to 5)
    info_dict["Spot"] = int(_find_val("Spot ", tecnai_info))

    # Decimals - Lens strengths expressed as a "%" value
    info_dict["C2_Strength"] = _try_decimal(_find_val("C2 ", tecnai_info).strip("%"))
    info_dict["C3_Strength"] = _try_decimal(_find_val("C3 ", tecnai_info).strip("%"))
    info_dict["Obj_Strength"] = _try_decimal(_find_val("Obj ", tecnai_info).strip("%"))
    info_dict["Dif_Strength"] = _try_decimal(_find_val("Dif ", tecnai_info).strip("%"))

    # Decimal values (micrometers), written as "<x>/<y>"
    shift_parts = _find_val("Image shift ", tecnai_info).strip("um").split("/")
    info_dict["Image_Shift_x"] = _try_decimal(shift_parts[0])
    info_dict["Image_Shift_y"] = _try_decimal(shift_parts[1])

    # Parse stage position and apertures using helper functions
    info_dict.update(_parse_stage_position(tecnai_info))
    info_dict.update(_parse_apertures(tecnai_info))

    # Nested dictionary: run the filter-settings parser exactly once and return
    # its result, so the parsing work (and anything the helper logs) is not
    # duplicated
    return _parse_filter_settings(info_dict, tecnai_info)