Coverage for nexusLIMS/extractors/plugins/quanta_tif.py: 100%
352 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
1# ruff: noqa: N817, FBT001, FBT003
2"""FEI/Thermo Fisher TIFF extractor plugin."""
4import configparser
5import contextlib
6import io
7import logging
8import re
9from decimal import Decimal, InvalidOperation
10from math import degrees
11from pathlib import Path
12from typing import Any, ClassVar, Tuple
14from lxml import etree
15from PIL import Image
17from nexusLIMS.extractors.base import ExtractionContext, FieldDefinition
18from nexusLIMS.extractors.base import FieldDefinition as FD
19from nexusLIMS.extractors.utils import _set_instr_name_and_time, add_to_extensions
20from nexusLIMS.instruments import get_instr_from_filepath
21from nexusLIMS.schemas.units import ureg
22from nexusLIMS.utils.dicts import (
23 set_nested_dict_value,
24 sort_dict,
25 try_getting_dict_value,
26)
#: TIFF tag ID where FEI/Thermo stores metadata in TIFF files.
#: The tag contains INI-style metadata with sections like [User], [Beam],
#: [Image], etc.
FEI_TIFF_TAG = 34682

#: TIFF tag ID where FEI/Thermo stores XML metadata in TIFF files (if present).
#: This tag contains supplementary XML metadata that may be embedded after
#: the standard INI metadata.
FEI_XML_TIFF_TAG = 34683

# Module-level logger shared by the extractor class and helper function below.
_logger = logging.getLogger(__name__)
44class QuantaTiffExtractor:
45 """
46 Extractor for FEI/Thermo Fisher TIFF files.
48 This extractor handles metadata extraction from .tif files saved by
49 FEI/Thermo Fisher FIBs and SEMs (e.g., Quanta, Helios, etc.). The extractor
50 performs content sniffing to verify the file contains FEI metadata before
51 attempting extraction.
52 """
54 name = "quanta_tif_extractor"
55 priority = 100
56 supported_extensions: ClassVar = {"tif", "tiff"}
58 def supports(self, context: ExtractionContext) -> bool:
59 """
60 Check if this extractor supports the given file.
62 Performs content sniffing to verify this is a FEI/Thermo TIFF file by:
63 1. Checking for the FEI-specific TIFF tag (34682) containing [User] or [Beam]
64 2. Falling back to binary content sniffing for files with FEI metadata markers
66 Parameters
67 ----------
68 context
69 The extraction context containing file information
71 Returns
72 -------
73 bool
74 True if this appears to be a FEI/Thermo TIFF file with metadata
75 """
76 extension = context.file_path.suffix.lower().lstrip(".")
77 if extension not in {"tif", "tiff"}:
78 return False
80 # Strategy 1: Check for FEI metadata signature using TIFF tag 34682
81 try:
82 with Image.open(context.file_path) as img:
83 # Check for FEI custom tag
84 fei_metadata = img.tag_v2.get(FEI_TIFF_TAG)
85 if fei_metadata is not None:
86 # Verify the metadata starts with FEI-style markers
87 metadata_str = str(fei_metadata)
88 if "[User]" in metadata_str or "[Beam]" in metadata_str:
89 return True
90 except Exception as e:
91 _logger.debug(
92 "Could not read TIFF tags from %s: %s",
93 context.file_path,
94 e,
95 )
97 # Strategy 2: Fallback to binary content sniffing for files that may not be
98 # proper TIFF files or use different metadata storage
99 try:
100 with context.file_path.open(mode="rb") as f:
101 content = f.read(5000) # Read first 5KB to check for metadata markers
102 except Exception as e:
103 _logger.debug(
104 "Could not read binary content from %s: %s",
105 context.file_path,
106 e,
107 )
108 return False
109 else:
110 # Check for FEI metadata markers in file
111 return b"[User]" in content or b"[Beam]" in content
113 def extract(self, context: ExtractionContext) -> list[dict[str, Any]]:
114 """
115 Extract metadata from a FEI/Thermo TIFF file.
117 Returns the metadata (as a list of dictionaries) from a .tif file saved
118 by the FEI Quanta SEM or related instruments. Specific tags of interest are
119 extracted and placed under the root-level ``nx_meta`` node.
121 Parameters
122 ----------
123 context
124 The extraction context containing file information
126 Returns
127 -------
128 list[dict]
129 List containing a single metadata dict with 'nx_meta' key
130 """
131 filename = context.file_path
132 _logger.debug("Extracting metadata from FEI TIFF file: %s", filename)
134 mdict = {"nx_meta": {}}
135 # assume all datasets coming from Quanta are Images, currently
136 mdict["nx_meta"]["DatasetType"] = "Image"
137 mdict["nx_meta"]["Data Type"] = "SEM_Imaging"
139 _set_instr_name_and_time(mdict, filename)
141 try:
142 # Extract metadata from TIFF tags/binary
143 metadata_str, xml_metadata = self._extract_metadata_from_tiff_tag(filename)
145 if not metadata_str:
146 _logger.warning(
147 "Did not find expected FEI tags in .tif file: %s", filename
148 )
149 mdict["nx_meta"]["Data Type"] = "Unknown"
150 mdict["nx_meta"]["Extractor Warnings"] = (
151 "Did not find expected FEI tags. Could not read metadata"
152 )
153 mdict["nx_meta"] = sort_dict(mdict["nx_meta"])
154 return [mdict]
156 # Handle XML metadata if present
157 if xml_metadata:
158 mdict["FEI_XML_Metadata"] = xml_metadata
160 # Fix duplicate section headers (MultiGIS issue)
161 metadata_str = self._fix_duplicate_multigis_metadata_tags(metadata_str)
163 # Parse INI format metadata
164 mdict.update(self._parse_metadata_string(metadata_str))
166 # Extract important fields to nx_meta
167 mdict = self._parse_nx_meta(mdict)
169 # Migrate metadata to schema-compliant format
170 mdict = self._migrate_to_schema_compliant_metadata(mdict)
172 except Exception as e:
173 _logger.exception("Error extracting metadata from %s", filename)
174 mdict["nx_meta"]["Data Type"] = "Unknown"
175 mdict["nx_meta"]["Extractor Warnings"] = f"Extraction failed: {e}"
177 # sort the nx_meta dictionary (recursively) for nicer display
178 mdict["nx_meta"] = sort_dict(mdict["nx_meta"])
180 return [mdict]
182 def _extract_metadata_from_tiff_tag(self, tiff_path: Path) -> Tuple[str, dict]:
183 """
184 Extract metadata string from FEI TIFF tags 34682 and 34683.
186 Extracts standard INI metadata from tag 34682 and XML metadata from tag 34683
187 if present. Falls back to binary content sniffing if TIFF tags are not present.
189 Parameters
190 ----------
191 tiff_path
192 Path to the TIFF file
194 Returns
195 -------
196 metadata_str
197 Metadata string (INI format), or empty string if not found
198 xml_metadata
199 Dictionary of XML metadata if tag 34683 is present, else empty dict
200 """
201 metadata_str = ""
202 xml_metadata = {}
204 # Strategy 1: Try to extract from TIFF tags 34682 and 34683
205 try:
206 with Image.open(tiff_path) as img:
207 # Extract standard metadata from tag 34682
208 fei_metadata = img.tag_v2.get(FEI_TIFF_TAG)
209 if fei_metadata is not None:
210 # Convert tag to string
211 metadata_str_val = (
212 fei_metadata
213 if isinstance(fei_metadata, str)
214 else str(fei_metadata)
215 )
216 metadata_str = self._extract_metadata_string(
217 metadata_str_val.encode()
218 )
220 # Extract XML metadata from tag 34683 if present
221 xml_metadata_tag = img.tag_v2.get(FEI_XML_TIFF_TAG)
222 if xml_metadata_tag is not None:
223 xml_metadata_str = (
224 xml_metadata_tag
225 if isinstance(xml_metadata_tag, str)
226 else str(xml_metadata_tag)
227 )
228 # Check if this is XML
229 if "<?xml" in xml_metadata_str:
230 try:
231 root = etree.fromstring(xml_metadata_str)
232 xml_metadata = self._xml_el_to_dict(root)
233 except Exception as e:
234 _logger.debug(
235 "Failed to parse XML from TIFF tag 34683: %s", e
236 )
237 except Exception as e:
238 _logger.debug("Failed to extract FEI metadata from TIFF tags: %s", e)
240 # If we got metadata from TIFF tags, return it
241 if metadata_str:
242 return metadata_str, xml_metadata
244 # Strategy 2: Fallback to binary content extraction for files where
245 # metadata might not be in a standard TIFF tag
246 try:
247 with tiff_path.open(mode="rb") as f:
248 content = f.read()
249 user_idx = content.find(b"[User]")
250 if user_idx != -1:
251 # Extract metadata string from binary
252 metadata_str_raw = self._extract_metadata_string(content[user_idx:])
253 # Check for XML in the binary content
254 metadata_str_clean, xml_meta = self._detect_and_process_xml_metadata(
255 metadata_str_raw
256 )
257 return metadata_str_clean, xml_meta
258 except Exception as e:
259 _logger.debug("Failed to extract FEI metadata from binary content: %s", e)
261 return "", {}
263 def _extract_metadata_string(self, metadata_bytes: bytes) -> str:
264 """
265 Extract metadata string from binary data.
267 Removes null bytes and normalizes line endings from the binary
268 metadata extracted from the TIFF file.
270 Parameters
271 ----------
272 metadata_bytes
273 Raw binary metadata from the TIFF file
275 Returns
276 -------
277 str
278 Cleaned metadata string
279 """
280 # remove any null bytes since they break the extractor
281 metadata_bytes = metadata_bytes.replace(b"\x00", b"")
282 metadata_str = metadata_bytes.decode(errors="ignore")
283 # normalize line endings
284 return metadata_str.replace("\r\n", "\n").replace("\r", "\n")
286 def _detect_and_process_xml_metadata(
287 self,
288 metadata_str: str,
289 ) -> Tuple[str, dict]:
290 """
291 Find and (if necessary) parse XML metadata in a Thermo Fisher FIB/SEM TIF file.
293 Some Thermo Fisher FIB/SEM files have additional metadata embedded as XML
294 at the end of the TIF file, which cannot be handled by the ConfigParser.
295 This method will detect, parse, and remove the XML from the metadata if present.
297 Parameters
298 ----------
299 metadata_str
300 The metadata at the end of the TIF file as a string. May or may not include
301 an XML section (this depends on the version of the Thermo software that
302 saved the image).
304 Returns
305 -------
306 metadata_str
307 The originally provided metadata as a string, but with the XML portion
308 removed if it was present
310 xml_metadata
311 A dictionary containing the metadata that was present in the XML portion.
312 Will be an empty dictionary if there was no XML.
313 """
314 xml_regex = re.compile(r'<\?xml version=".+"\?>')
315 regex_match = xml_regex.search(metadata_str)
316 if regex_match:
317 # there is an xml declaration in the metadata of this file, so parse it:
318 xml_str = metadata_str[regex_match.span()[0] :]
319 metadata_str = metadata_str[: regex_match.span()[0]]
320 root = etree.fromstring(xml_str)
321 return metadata_str, self._xml_el_to_dict(root)
323 return metadata_str, {}
325 @staticmethod
326 def _xml_el_to_dict(node: etree.ElementBase) -> dict:
327 """
328 Convert an lxml.etree node tree into a dict.
330 This is used to transform the XML metadata section into a dictionary
331 representation so it can be stored alongside the other metadata.
333 Taken from https://stackoverflow.com/a/66103841/1435788
335 Parameters
336 ----------
337 node
338 XML element to convert
340 Returns
341 -------
342 dict
343 Dictionary representation of the XML element
344 """
345 result = {}
347 for element in node.iterchildren():
348 # Remove namespace prefix
349 key = element.tag.split("}")[1] if "}" in element.tag else element.tag
351 # Process element as tree element if the inner XML contains
352 # non-whitespace content
353 if element.text and element.text.strip():
354 value = element.text
355 else:
356 value = QuantaTiffExtractor._xml_el_to_dict(element)
357 if key in result:
358 if isinstance(result[key], list):
359 result[key].append(value) # pragma: no cover
360 else:
361 tempvalue = result[key].copy()
362 result[key] = [tempvalue, value]
363 else:
364 result[key] = value
365 return result
367 @staticmethod
368 def _fix_duplicate_multigis_metadata_tags(metadata_str: str) -> str:
369 """
370 Rename the metadata section headers to allow parsing by ConfigParser.
372 Some instruments have metadata section titles like so:
374 [MultiGIS]
375 [MultiGISUnit1]
376 [MultiGISGas1]
377 [MultiGISGas2]
378 [MultiGISGas3]
379 [MultiGISUnit2]
380 [MultiGISGas1]
381 ...
383 Which causes errors because ConfigParser raises a DuplicateSectionError.
384 This method renames them to:
386 [MultiGIS]
387 [MultiGISUnit1]
388 [MultiGISUnit1.MultiGISGas1]
389 [MultiGISUnit1.MultiGISGas2]
390 [MultiGISUnit1.MultiGISGas3]
391 [MultiGISUnit2]
392 [MultiGISUnit2.MultiGISGas1]
393 ...
395 Parameters
396 ----------
397 metadata_str
398 Metadata string potentially with duplicate section headers
400 Returns
401 -------
402 str
403 Metadata string with unique section headers
404 """
405 metadata_to_return = ""
406 multi_gis_section_numbers = re.findall(r"\[MultiGISUnit(\d+)\]", metadata_str)
407 if multi_gis_section_numbers:
408 multi_gis_unit_indices = [
409 metadata_str.index(f"[MultiGISUnit{num}]")
410 for num in multi_gis_section_numbers
411 ]
412 metadata_to_return += metadata_str[: multi_gis_unit_indices[0]]
413 for i, num in enumerate(multi_gis_section_numbers):
414 if i < len(multi_gis_unit_indices) - 1:
415 to_process = metadata_str[
416 multi_gis_unit_indices[i] : multi_gis_unit_indices[i + 1]
417 ]
418 else:
419 to_process = metadata_str[multi_gis_unit_indices[i] :]
420 multi_gis_gas_tags = re.findall(r"\[(MultiGISGas\d+)\]", to_process)
421 for tag in multi_gis_gas_tags:
422 to_process = to_process.replace(tag, f"MultiGISUnit{num}.{tag}")
423 metadata_to_return += to_process
424 else:
425 metadata_to_return = metadata_str
427 return metadata_to_return
429 @staticmethod
430 def _parse_metadata_string(hdr_string: str) -> dict[str, dict[str, str]]:
431 """
432 Parse metadata from a string in INI format.
434 Parameters
435 ----------
436 hdr_string
437 Metadata as a string in INI format
439 Returns
440 -------
441 dict
442 Dictionary with section names as keys and key-value dicts as values
443 """
444 config = configparser.RawConfigParser()
445 # Make ConfigParser respect upper/lowercase values
446 config.optionxform = lambda option: option
448 buf = io.StringIO(hdr_string)
449 config.read_file(buf)
451 metadata = {}
452 for section in config.sections():
453 metadata[section] = dict(config.items(section))
455 return metadata
457 def _build_field_definitions(self, mdict: dict) -> list[FieldDefinition]:
458 """Build field definitions for metadata extraction.
460 Parameters
461 ----------
462 mdict
463 Metadata dictionary with raw extracted metadata
465 Returns
466 -------
467 list[FieldDefinition]
468 List of field definitions for extraction
469 """
470 beam_name = try_getting_dict_value(mdict, ["Beam", "Beam"])
471 det_name = try_getting_dict_value(mdict, ["Detectors", "Name"])
472 scan_name = try_getting_dict_value(mdict, ["Beam", "Scan"])
474 fields = []
476 # Beam section fields
477 if beam_name is not None:
478 fields.extend(
479 [
480 FD(
481 beam_name,
482 "EmissionCurrent",
483 "Emission Current",
484 1.0,
485 False,
486 target_unit="ampere",
487 ),
488 FD(
489 beam_name,
490 "HFW",
491 "Horizontal Field Width",
492 1.0,
493 False,
494 target_unit="meter",
495 ),
496 FD(beam_name, "HV", "Voltage", 1.0, False, target_unit="volt"),
497 FD(beam_name, "SourceTiltX", "Beam Tilt X", 1.0, False),
498 FD(beam_name, "SourceTiltY", "Beam Tilt Y", 1.0, False),
499 FD(beam_name, "StageR", ["Stage Position", "R"], 1.0, False),
500 FD(beam_name, "StageTa", ["Stage Position", "α"], 1.0, False), # noqa: RUF001
501 FD(beam_name, "StageX", ["Stage Position", "X"], 1.0, False),
502 FD(beam_name, "StageY", ["Stage Position", "Y"], 1.0, False),
503 FD(beam_name, "StageZ", ["Stage Position", "Z"], 1.0, False),
504 FD(
505 beam_name,
506 "StageTb",
507 ["Stage Position", "β"],
508 1.0,
509 False,
510 suppress_zero=False,
511 ),
512 FD(beam_name, "StigmatorX", "Stigmator X Value", 1.0, False),
513 FD(beam_name, "StigmatorY", "Stigmator Y Value", 1.0, False),
514 FD(
515 beam_name,
516 "VFW",
517 "Vertical Field Width",
518 1.0,
519 False,
520 target_unit="meter",
521 ),
522 FD(
523 beam_name,
524 "WD",
525 "Working Distance",
526 1.0,
527 False,
528 target_unit="meter",
529 ),
530 FD(
531 beam_name,
532 "EucWD",
533 "Eucentric WD",
534 1.0,
535 False,
536 target_unit="meter",
537 ),
538 FD(beam_name, "ImageMode", "Image Mode", 1.0, True),
539 FD(
540 beam_name,
541 "BeamShiftX",
542 "Beam Shift X",
543 1.0,
544 False,
545 ),
546 FD(
547 beam_name,
548 "BeamShiftY",
549 "Beam Shift Y",
550 1.0,
551 False,
552 ),
553 FD(beam_name, "BeamMode", "Beam Mode", 1.0, True),
554 FD(beam_name, "PreTilt", "Pre-Tilt", 1.0, False),
555 ]
556 )
558 # Scan section fields
559 if scan_name is not None:
560 fields.extend(
561 [
562 FD(
563 scan_name,
564 "Dwell",
565 "Pixel Dwell Time",
566 1.0,
567 False,
568 target_unit="second",
569 ),
570 FD(
571 scan_name,
572 "FrameTime",
573 "Total Frame Time",
574 1.0,
575 False,
576 target_unit="second",
577 ),
578 FD(
579 scan_name,
580 "HorFieldsize",
581 "Horizontal Field Width",
582 1.0,
583 False,
584 target_unit="meter",
585 ),
586 FD(
587 scan_name,
588 "VerFieldsize",
589 "Vertical Field Width",
590 1.0,
591 False,
592 target_unit="meter",
593 ),
594 FD(
595 scan_name,
596 "PixelHeight",
597 "Pixel Width",
598 1.0,
599 False,
600 target_unit="meter",
601 ),
602 FD(
603 scan_name,
604 "PixelWidth",
605 "Pixel Height",
606 1.0,
607 False,
608 target_unit="meter",
609 ),
610 FD(
611 scan_name,
612 "LineTime",
613 "Line Time",
614 1.0,
615 False,
616 target_unit="second",
617 ),
618 FD(
619 scan_name,
620 "LineIntegration",
621 "Line Integration",
622 1.0,
623 False,
624 ),
625 FD(
626 scan_name,
627 "ScanInterlacing",
628 "Scan Interlacing",
629 1.0,
630 False,
631 ),
632 ]
633 )
635 # Detector section fields
636 if det_name is not None:
637 fields.extend(
638 [
639 FD(
640 det_name,
641 "Brightness",
642 "Detector Brightness Setting",
643 1.0,
644 False,
645 ),
646 FD(det_name, "Contrast", "Detector Contrast Setting", 1.0, False),
647 FD(
648 det_name,
649 "EnhancedContrast",
650 "Detector Enhanced Contrast Setting",
651 1.0,
652 False,
653 ),
654 FD(det_name, "Signal", "Detector Signal", 1.0, False),
655 FD(
656 det_name,
657 "Grid",
658 "Detector Grid Voltage",
659 1.0,
660 False,
661 target_unit="volt",
662 ),
663 FD(
664 det_name, "BrightnessDB", "Detector Brightness (DB)", 1.0, False
665 ),
666 FD(det_name, "ContrastDB", "Detector Contrast (DB)", 1.0, False),
667 FD(
668 det_name,
669 "Mix",
670 "Detector Mix (%)",
671 1.0,
672 False,
673 ),
674 FD(
675 det_name,
676 "MinimumDwellTime",
677 "Minimum Dwell Time",
678 1.0,
679 False,
680 target_unit="second",
681 ),
682 ]
683 )
685 # System section fields
686 fields.extend(
687 [
688 FD("System", "Chamber", "Chamber ID", 1.0, True),
689 FD("System", "Pump", "Vacuum Pump", 1.0, True),
690 FD("System", "SystemType", "System Type", 1.0, True),
691 FD("System", "Stage", "Stage Description", 1.0, True),
692 FD("System", "Dnumber", "Device Number", 1.0, True),
693 FD("System", "Source", "Electron Source", 1.0, True),
694 FD("System", "FinalLens", "Final Lens", 1.0, True),
695 FD("System", "ESEM", "ESEM Setting", 1.0, True),
696 FD("System", "Aperture", "Aperture Type", 1.0, True),
697 ]
698 )
700 # Other fields
701 fields.extend(
702 [
703 FD("Beam", "Spot", "Spot Size", 1.0, False),
704 FD(
705 "Specimen",
706 "Temperature",
707 "Specimen Temperature",
708 1.0,
709 False,
710 target_unit="kelvin",
711 ),
712 FD(
713 "Specimen",
714 "Humidity",
715 "Specimen Humidity",
716 1.0,
717 False,
718 target_unit="percent",
719 ),
720 FD("User", "UserText", "User Text", 1.0, True),
721 FD("User", "Date", "Acquisition Date", 1.0, True),
722 FD("User", "Time", "Acquisition Time", 1.0, True),
723 FD("Vacuum", "UserMode", "Vacuum Mode", 1.0, True),
724 FD("Vacuum", "Gas", "Vacuum Gas", 1.0, False),
725 FD("Image", "MagnificationMode", "Magnification Mode", 1.0, False),
726 FD(
727 "Image",
728 "DigitalContrast",
729 "Digital Contrast",
730 1.0,
731 False,
732 ),
733 FD(
734 "Image",
735 "DigitalBrightness",
736 "Digital Brightness",
737 1.0,
738 False,
739 ),
740 FD(
741 "Image",
742 "DigitalGamma",
743 "Digital Gamma",
744 1.0,
745 False,
746 ),
747 FD(
748 "Image",
749 "ZoomFactor",
750 "Zoom Factor",
751 1.0,
752 False,
753 ),
754 FD("Image", "ZoomPanX", "Zoom Pan X", 1.0, False),
755 FD("Image", "ZoomPanY", "Zoom Pan Y", 1.0, False),
756 FD(
757 "Image",
758 "MagCanvasRealWidth",
759 "Magnification Canvas Real Width",
760 1.0,
761 False,
762 ),
763 FD(
764 "Image",
765 "ScreenMagCanvasRealWidth",
766 "Screen Magnification Canvas Real Width",
767 1.0,
768 False,
769 ),
770 FD(
771 "Image",
772 "ScreenMagnificationMode",
773 "Screen Magnification Mode",
774 1.0,
775 False,
776 ),
777 FD("Image", "Average", "Frame Average", 1.0, False),
778 FD("Image", "PostProcessing", "Post Processing", 1.0, False),
779 ]
780 )
782 # EScan Mainslock field
783 if scan_name is not None:
784 fields.append(FD(scan_name, "Mainslock", "Mainslock", 1.0, True))
786 return fields
788 def _process_standard_fields(
789 self, mdict: dict, fields: list[FieldDefinition], det_name: str
790 ) -> None:
791 """Process standard field definitions."""
792 for field in fields:
793 value = try_getting_dict_value(mdict, [field.section, field.source_key])
795 if value is not None and value != "":
796 # Skip detector "Setting" if numeric (duplicate of Grid voltage)
797 if field.section == det_name and field.source_key == "Setting":
798 try:
799 Decimal(value)
800 continue
801 except (ValueError, InvalidOperation):
802 pass
804 if field.is_string:
805 self._set_field_value(mdict, field.output_key, value)
806 else:
807 self._set_numeric_field_value(
808 mdict,
809 field.output_key,
810 value,
811 field.factor,
812 field.suppress_zero,
813 field.target_unit,
814 )
816 def _set_field_value(self, mdict: dict, output_key: str | list, value: str) -> None:
817 """Set a string field value in metadata."""
818 if isinstance(output_key, list):
819 set_nested_dict_value(mdict, ["nx_meta", *output_key], value)
820 else:
821 set_nested_dict_value(mdict, ["nx_meta", output_key], value)
823 def _set_numeric_field_value( # noqa: PLR0913
824 self,
825 mdict: dict,
826 output_key: str | list,
827 value: str,
828 factor: float,
829 suppress_zero: bool,
830 unit: str | None = None,
831 ) -> None:
832 """Set a numeric field value with unit conversion.
834 Parameters
835 ----------
836 mdict
837 Metadata dictionary
838 output_key
839 Output key or nested path
840 value
841 String value to convert
842 factor
843 Multiplicative conversion factor
844 suppress_zero
845 If True, skip if value equals zero
846 unit
847 Pint unit string (e.g., "kilovolt"). If provided, creates a Quantity.
848 """
849 try:
850 decimal_val = Decimal(value) * Decimal(str(factor))
851 if not suppress_zero or decimal_val != 0:
852 # Create Pint Quantity if unit is specified
853 if unit is not None:
854 quantity_val = ureg.Quantity(decimal_val, unit)
855 self._set_field_value(mdict, output_key, quantity_val)
856 else:
857 # Convert to float for non-quantity values
858 self._set_field_value(mdict, output_key, float(decimal_val))
859 except (ValueError, InvalidOperation, TypeError):
860 # TypeError can occur if value is None
861 if value is not None:
862 self._set_field_value(mdict, output_key, value)
864 def _parse_special_cases(self, mdict: dict, beam_name: str, det_name: str) -> None:
865 """Parse special case metadata fields."""
866 if beam_name is not None:
867 set_nested_dict_value(mdict, ["nx_meta", "Beam Name"], beam_name)
868 if det_name is not None:
869 set_nested_dict_value(mdict, ["nx_meta", "Detector Name"], det_name)
871 if beam_name is not None:
872 self._parse_scan_rotation(mdict, beam_name)
873 self._parse_tilt_correction(mdict, beam_name)
874 self._parse_beam_control_flags(mdict, beam_name)
875 self._parse_drift_correction(mdict)
876 self._parse_frame_integration(mdict)
877 self._parse_resolution(mdict)
878 self._parse_operator(mdict)
879 self._parse_chamber_pressure(mdict)
880 self._parse_software_version(mdict)
881 self._parse_column_type(mdict)
882 self._parse_scan_settings(mdict)
884 def _parse_scan_rotation(self, mdict: dict, beam_name: str) -> None:
885 """Parse scan rotation (radians → degrees)."""
886 scan_rot_val = try_getting_dict_value(mdict, [beam_name, "ScanRotation"])
887 if scan_rot_val is not None:
888 scan_rot_dec = Decimal(scan_rot_val)
889 digits = abs(scan_rot_dec.as_tuple().exponent)
890 scan_rot_degrees = round(degrees(scan_rot_dec), digits)
891 scan_rot_quantity = ureg.Quantity(scan_rot_degrees, "degree")
892 set_nested_dict_value(
893 mdict, ["nx_meta", "Scan Rotation"], scan_rot_quantity
894 )
896 def _parse_tilt_correction(self, mdict: dict, beam_name: str) -> None:
897 """Parse tilt correction (conditional on TiltCorrectionIsOn)."""
898 tilt_corr_on = try_getting_dict_value(mdict, [beam_name, "TiltCorrectionIsOn"])
899 if tilt_corr_on == "yes":
900 tilt_corr_val = try_getting_dict_value(
901 mdict, [beam_name, "TiltCorrectionAngle"]
902 )
903 if tilt_corr_val is not None:
904 set_nested_dict_value(
905 mdict,
906 ["nx_meta", "Tilt Correction Angle"],
907 float(Decimal(tilt_corr_val)),
908 )
910 def _parse_beam_control_flags(self, mdict: dict, beam_name: str) -> None:
911 """Parse beam control boolean flags."""
912 # Tilt correction on/off
913 tilt_corr_on = try_getting_dict_value(mdict, [beam_name, "TiltCorrectionIsOn"])
914 if tilt_corr_on is not None:
915 set_nested_dict_value(
916 mdict, ["nx_meta", "Tilt Correction Enabled"], tilt_corr_on == "yes"
917 )
919 # Dynamic focus on/off
920 dyn_focus = try_getting_dict_value(mdict, [beam_name, "DynamicFocusIsOn"])
921 if dyn_focus is not None:
922 set_nested_dict_value(
923 mdict, ["nx_meta", "Dynamic Focus Enabled"], dyn_focus == "yes"
924 )
926 # Dynamic WD on/off
927 dyn_wd = try_getting_dict_value(mdict, [beam_name, "DynamicWDIsOn"])
928 if dyn_wd is not None:
929 set_nested_dict_value(
930 mdict, ["nx_meta", "Dynamic WD Enabled"], dyn_wd == "yes"
931 )
933 def _parse_drift_correction(self, mdict: dict) -> None:
934 """Parse drift correction (boolean)."""
935 drift_val = try_getting_dict_value(mdict, ["Image", "DriftCorrected"])
936 if drift_val is not None:
937 set_nested_dict_value(
938 mdict, ["nx_meta", "Drift Correction Applied"], drift_val == "On"
939 )
941 def _parse_frame_integration(self, mdict: dict) -> None:
942 """Parse frame integration (only if > 1)."""
943 integrate_val = try_getting_dict_value(mdict, ["Image", "Integrate"])
944 if integrate_val is not None:
945 with contextlib.suppress(ValueError):
946 integrate_int = int(integrate_val)
947 if integrate_int > 1:
948 set_nested_dict_value(
949 mdict, ["nx_meta", "Frames Integrated"], integrate_int
950 )
952 def _parse_resolution(self, mdict: dict) -> None:
953 """Parse resolution (paired X/Y as tuple string)."""
954 x_val = try_getting_dict_value(mdict, ["Image", "ResolutionX"])
955 y_val = try_getting_dict_value(mdict, ["Image", "ResolutionY"])
956 if x_val is not None and y_val is not None:
957 with contextlib.suppress(ValueError):
958 x_int = int(x_val)
959 y_int = int(y_val)
960 set_nested_dict_value(
961 mdict, ["nx_meta", "Data Dimensions"], str((x_int, y_int))
962 )
964 def _parse_operator(self, mdict: dict) -> None:
965 """Parse operator (with warning)."""
966 user_val = try_getting_dict_value(mdict, ["User", "User"])
967 if user_val is not None:
968 set_nested_dict_value(mdict, ["nx_meta", "Operator"], user_val)
969 mdict["nx_meta"]["warnings"].append(["Operator"])
971 def _parse_chamber_pressure(self, mdict: dict) -> None:
972 """Parse chamber pressure (unit depends on vacuum mode)."""
973 ch_pres_val = try_getting_dict_value(mdict, ["Vacuum", "ChPressure"])
974 if ch_pres_val is not None and ch_pres_val != "":
975 try:
976 ch_pres_decimal = Decimal(ch_pres_val)
977 is_high_vacuum = (
978 try_getting_dict_value(mdict, ["nx_meta", "Vacuum Mode"])
979 == "High vacuum"
980 )
982 if is_high_vacuum:
983 # Value is in Pa, multiply by 1000 to get mPa
984 ch_pres_decimal_mpa = ch_pres_decimal * 10**3
985 ch_pres_quantity = ureg.Quantity(ch_pres_decimal_mpa, "millipascal")
986 else:
987 # Value is already in Pa
988 ch_pres_quantity = ureg.Quantity(ch_pres_decimal, "pascal")
990 set_nested_dict_value(
991 mdict,
992 ["nx_meta", "Chamber Pressure"],
993 ch_pres_quantity,
994 )
995 except (ValueError, InvalidOperation):
996 # If conversion fails, store as string without unit
997 set_nested_dict_value(
998 mdict, ["nx_meta", "Chamber Pressure"], ch_pres_val
999 )
1001 def _parse_software_version(self, mdict: dict) -> None:
1002 """Parse software version (aggregate Software + BuildNr)."""
1003 software_parts = []
1004 software_val = try_getting_dict_value(mdict, ["System", "Software"])
1005 if software_val is not None:
1006 software_parts.append(software_val)
1007 build_val = try_getting_dict_value(mdict, ["System", "BuildNr"])
1008 if build_val is not None:
1009 software_parts.append(f"(build {build_val})")
1010 if software_parts:
1011 set_nested_dict_value(
1012 mdict, ["nx_meta", "Software Version"], " ".join(software_parts)
1013 )
1015 def _parse_column_type(self, mdict: dict) -> None:
1016 """Parse column type (aggregate Column + Type)."""
1017 column_parts = []
1018 column_val = try_getting_dict_value(mdict, ["System", "Column"])
1019 if column_val is not None:
1020 column_parts.append(column_val)
1021 type_val = try_getting_dict_value(mdict, ["System", "Type"])
1022 if type_val is not None:
1023 column_parts.append(type_val)
1024 if column_parts:
1025 set_nested_dict_value(
1026 mdict, ["nx_meta", "Column Type"], " ".join(column_parts)
1027 )
1029 def _parse_scan_settings(self, mdict: dict) -> None:
1030 """Parse scan-related settings."""
1031 # Internal scan flag
1032 scan_name = try_getting_dict_value(mdict, ["Beam", "Scan"])
1033 if scan_name is not None:
1034 internal_scan = try_getting_dict_value(mdict, [scan_name, "InternalScan"])
1035 if internal_scan is not None:
1036 set_nested_dict_value(
1037 mdict, ["nx_meta", "Internal Scan"], internal_scan == "true"
1038 )
1040 def _parse_nx_meta(self, mdict: dict) -> dict:
1041 """
1042 Parse metadata into NexusLIMS format.
1044 Parse the "important" metadata that is saved at specific places within
1045 the Quanta tag structure into a consistent place in the metadata dictionary.
1047 The metadata contained in the XML section (if present) is not parsed, since it
1048 appears to only contain duplicates or slightly renamed metadata values compared
1049 to the typical config-style section.
1051 Parameters
1052 ----------
1053 mdict
1054 A metadata dictionary with raw extracted metadata
1056 Returns
1057 -------
1058 dict
1059 The same metadata dictionary with parsed values added under the
1060 root-level ``nx_meta`` key
1061 """
1062 if "warnings" not in mdict["nx_meta"]:
1063 mdict["nx_meta"]["warnings"] = []
1065 beam_name = try_getting_dict_value(mdict, ["Beam", "Beam"])
1066 det_name = try_getting_dict_value(mdict, ["Detectors", "Name"])
1068 fields = self._build_field_definitions(mdict)
1069 self._process_standard_fields(mdict, fields, det_name)
1070 self._parse_special_cases(mdict, beam_name, det_name)
1072 return mdict
1074 def _migrate_to_schema_compliant_metadata(self, mdict: dict) -> dict:
1075 """
1076 Migrate metadata to schema-compliant format.
1078 Reorganizes metadata to conform to type-specific Pydantic schemas:
1079 - Extracts core EM Glossary fields to top level with standardized names
1080 - Moves vendor-specific nested dictionaries to extensions section
1081 - Preserves existing extensions from instrument profiles
1083 Parameters
1084 ----------
1085 mdict
1086 Metadata dictionary with nx_meta containing extracted fields
1088 Returns
1089 -------
1090 dict
1091 Metadata dictionary with schema-compliant nx_meta structure
1092 """
1093 nx_meta = mdict.get("nx_meta", {})
1095 # Preserve existing extensions from instrument profiles
1096 extensions = (
1097 nx_meta.get("extensions", {}).copy() if "extensions" in nx_meta else {}
1098 )
1100 # Field mappings from display names to EM Glossary names
1101 field_mappings = {
1102 "Voltage": "acceleration_voltage",
1103 "Working Distance": "working_distance",
1104 "Emission Current": "emission_current",
1105 "Pixel Dwell Time": "dwell_time",
1106 "Horizontal Field Width": "horizontal_field_width",
1107 "Vertical Field Width": "vertical_field_width",
1108 "Pixel Width": "pixel_width",
1109 "Pixel Height": "pixel_height",
1110 }
1112 # Fields that ALWAYS go to extensions (vendor-specific nested dicts)
1113 extension_top_level_keys = {
1114 "Beam",
1115 "Scan",
1116 "Detector",
1117 "Stage Position",
1118 "Image",
1119 "Application",
1120 "Vacuum",
1121 "System",
1122 "User",
1123 "Detectors",
1124 "GIS",
1125 "Specimen",
1126 "PrivateFei",
1127 "FEI_XML_Metadata",
1128 "Optics",
1129 }
1131 # Also move these individual vendor fields to extensions
1132 extension_field_names = {
1133 "Detector Brightness Setting",
1134 "Detector Contrast Setting",
1135 "Detector Enhanced Contrast Setting",
1136 "Detector Signal",
1137 "Detector Grid Voltage",
1138 "Beam Tilt X",
1139 "Beam Tilt Y",
1140 "Stigmator X Value",
1141 "Stigmator Y Value",
1142 "Beam Shift X",
1143 "Beam Shift Y",
1144 "Beam Mode",
1145 "Image Mode",
1146 "Pre-Tilt",
1147 "Eucentric WD",
1148 "Total Frame Time",
1149 "Line Time",
1150 "Line Integration",
1151 "Scan Interlacing",
1152 }
1154 # Build new nx_meta with proper field organization
1155 new_nx_meta = {}
1157 # Copy required fields
1158 for field in ["DatasetType", "Data Type", "Creation Time"]:
1159 if field in nx_meta:
1160 new_nx_meta[field] = nx_meta[field]
1162 # Copy instrument identification
1163 if "Instrument ID" in nx_meta:
1164 new_nx_meta["Instrument ID"] = nx_meta["Instrument ID"]
1166 # Process all fields and categorize
1167 for old_name, value in nx_meta.items():
1168 # Skip fields we've already handled
1169 if old_name in [
1170 "DatasetType",
1171 "Data Type",
1172 "Creation Time",
1173 "Instrument ID",
1174 "Extractor Warnings",
1175 "warnings",
1176 "extensions",
1177 ]:
1178 continue
1180 # Top-level vendor sections go to extensions
1181 if old_name in extension_top_level_keys:
1182 extensions[old_name] = value
1183 continue
1185 # Check if this is a core field that needs renaming
1186 if old_name in field_mappings:
1187 emg_name = field_mappings[old_name]
1188 new_nx_meta[emg_name] = value
1189 continue
1191 # Vendor-specific individual fields go to extensions
1192 if old_name in extension_field_names:
1193 extensions[old_name] = value
1194 continue
1196 # Everything else goes to extensions (vendor-specific by default)
1197 # This is safer than at top level where schema validation will reject
1198 extensions[old_name] = value
1200 # Copy warnings if present
1201 if "warnings" in nx_meta:
1202 new_nx_meta["warnings"] = nx_meta["warnings"]
1204 # Add extensions section if we have any
1205 for key, value in extensions.items():
1206 add_to_extensions(new_nx_meta, key, value)
1208 mdict["nx_meta"] = new_nx_meta
1209 return mdict
# Backward compatibility function for tests
def get_quanta_metadata(filename):
    """
    Get metadata from a Quanta TIF file.

    Thin wrapper that builds an ``ExtractionContext`` for ``filename`` and
    runs :class:`QuantaTiffExtractor` on it.

    .. deprecated::
        This function is deprecated. Use QuantaTiffExtractor class instead.

    Parameters
    ----------
    filename : pathlib.Path
        path to a file saved in the harvested directory of the instrument

    Returns
    -------
    mdict : dict
        A description of the file's metadata.
    """
    instrument = get_instr_from_filepath(filename)
    context = ExtractionContext(file_path=filename, instrument=instrument)
    extractor = QuantaTiffExtractor()
    return extractor.extract(context)