Coverage for nexusLIMS/extractors/plugins/orion_HIM_tif.py: 100%
198 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
1# ruff: noqa: S314, N817, FBT003
2"""Zeiss Orion/Fibics TIFF extractor plugin."""
4import logging
5import xml.etree.ElementTree as ET
6from decimal import Decimal
7from pathlib import Path
8from typing import Any, ClassVar
10from PIL import Image
12from nexusLIMS.extractors.base import ExtractionContext
13from nexusLIMS.extractors.base import FieldDefinition as FD
14from nexusLIMS.extractors.utils import _set_instr_name_and_time, add_to_extensions
15from nexusLIMS.schemas import em_glossary
16from nexusLIMS.schemas.units import ureg
17from nexusLIMS.utils.dicts import set_nested_dict_value, sort_dict
# Vendor-specific TIFF tag numbers. OrionTiffExtractor looks these tags up in
# ``img.tag_v2`` to decide which XML dialect (if any) a file carries.
ZEISS_TIFF_TAG = 65000
"""
TIFF tag ID where Zeiss Orion stores XML metadata in TIFF files.
The tag contains serialized XML with an <ImageTags> root element
that holds instrument configuration, beam parameters, stage position,
detector settings, and other acquisition metadata.
"""

FIBICS_TIFF_TAG = 51023
"""
TIFF tag ID where Fibics helium ion microscope stores XML metadata in TIFF files.
The tag contains serialized XML with a <Fibics> root element that holds
application info, image data, scan parameters, stage position, beam info,
and detector settings.
"""

# Module-level logger shared by every method of the extractor below.
_logger = logging.getLogger(__name__)
class OrionTiffExtractor:
    """
    Extractor for Zeiss Orion and Fibics helium ion microscope TIFF files.

    This extractor handles metadata extraction from .tif files saved by
    Zeiss Orion and Fibics helium ion microscopes (HIM). These files contain
    embedded XML metadata in custom TIFF tags:

    - Zeiss: TIFF tag 65000 with <ImageTags> XML
    - Fibics: TIFF tag 51023 with <Fibics> XML
    """

    name = "orion_HIM_tif_extractor"
    priority = 150  # Higher than QuantaTiffExtractor (100) to handle Orion TIFFs first
    supported_extensions: ClassVar = {
        "tif",
        "tiff",
    }  # Uses content sniffing in supports() to detect variant

    # Factor value used in the Fibics table to request "strip_units" parsing
    # (drop a leading "=" and a trailing unit suffix, no scaling).
    _STRIP_UNITS_FACTOR: ClassVar = -1

    # Zeiss rows: (source_key, output_key, factor, target_unit or None).
    # Note: XML stores voltages in Volts; factor 1e-3 converts them to kV.
    # Rows with ``None`` as unit are dimensionless numbers or plain strings.
    _ZEISS_FIELDS: ClassVar = (
        # GFIS
        (
            "GFIS.AccelerationVoltage",
            ("GFIS", "Acceleration Voltage"),
            1e-3,
            "kilovolt",
        ),
        ("GFIS.ExtractionVoltage", ("GFIS", "Extraction Voltage"), 1e-3, "kilovolt"),
        ("GFIS.CondenserVoltage", ("GFIS", "Condenser Voltage"), 1e-3, "kilovolt"),
        ("GFIS.ObjectiveVoltage", ("GFIS", "Objective Voltage"), 1e-3, "kilovolt"),
        ("GFIS.BeamCurrent", ("GFIS", "Beam Current"), 1, "picoampere"),
        ("GFIS.PanX", ("GFIS", "Pan X"), 1, "micrometer"),
        ("GFIS.PanY", ("GFIS", "Pan Y"), 1, "micrometer"),
        ("GFIS.FieldOfView", ("GFIS", "Horizontal Field Width"), 1, "micrometer"),
        ("GFIS.ScanRotation", ("GFIS", "Scan Rotation"), 1, "degree"),
        ("GFIS.StigmationX", ("GFIS", "Stigmation X"), 1, None),
        ("GFIS.StigmationY", ("GFIS", "Stigmation Y"), 1, None),
        ("GFIS.ApertureSize", ("GFIS", "Aperture Size"), 1, "micrometer"),
        ("GFIS.ApertureIndex", ("GFIS", "Aperture Index"), 1, None),
        ("GFIS.IonGas", ("GFIS", "Ion Gas"), 1, None),
        ("GFIS.CrossoverPosition", ("GFIS", "Crossover Position"), 1, "millimeter"),
        ("GFIS.WorkingDistance", ("GFIS", "Working Distance"), 1, "millimeter"),
        # Beam
        ("AccelerationVoltage", ("acceleration_voltage",), 1e-3, "kilovolt"),
        ("ExtractionVoltage", ("Beam", "Extraction Voltage"), 1e-3, "kilovolt"),
        ("BlankerCurrent", ("Beam", "Blanker Current"), 1, "picoampere"),
        ("SampleCurrent", ("Beam", "Sample Current"), 1, "picoampere"),
        ("SpotNumber", ("Beam", "Spot Number"), 1, None),
        ("WorkingDistance", ("Beam", "Working Distance"), 1, "millimeter"),
        ("Fov", ("horizontal_field_width",), 1, "micrometer"),
        ("PanX", ("Beam", "Pan X"), 1, "micrometer"),
        ("PanY", ("Beam", "Pan Y"), 1, "micrometer"),
        ("StigmationX", ("Beam", "Stigmator X Value"), 1, None),
        ("StigmationY", ("Beam", "Stigmator Y Value"), 1, None),
        ("ApertureSize", ("Beam", "Aperture Size"), 1, None),  # unit unknown
        ("CrossOverPosition", ("Beam", "Crossover Position"), 1, "millimeter"),
        # Scan
        ("FrameRetrace", ("Scan", "Frame Retrace"), 1, "microsecond"),
        ("LineRetrace", ("Scan", "Line Retrace"), 1, "microsecond"),
        ("AveragingMode", ("Scan", "Averaging Mode"), 1, None),
        ("NumAverages", ("Scan", "Number of Averages"), 1, None),
        ("ScanRotate", ("scan_rotation",), 1, "degree"),
        ("DwellTime", ("Scan", "Dwell Time"), 1, "microsecond"),
        ("SAS.ScanSize", ("Scan", "Scan Size"), 1, None),
        # Stage
        ("StageX", ("Stage Position", "X"), 1, "micrometer"),
        ("StageY", ("Stage Position", "Y"), 1, "micrometer"),
        ("StageZ", ("Stage Position", "Z"), 1, "millimeter"),
        ("StageTilt", ("Stage Position", "Tilt"), 1, "degree"),
        ("StageRotate", ("Stage Position", "Rotation"), 1, "degree"),
        ("Stage.XLocation", ("Stage Position", "X Location"), 1, "micrometer"),
        ("Stage.YLocation", ("Stage Position", "Y Location"), 1, "micrometer"),
        # Optics
        ("sFimFOV", ("Optics", "sFIM Field of View"), 1, "micrometer"),
        ("McXShift", ("Optics", "MC X Shift"), 1, "microradian"),
        ("McXTilt", ("Optics", "MC X Tilt"), 1, "microradian"),
        ("McYShift", ("Optics", "MC Y Shift"), 1, "microradian"),
        ("McYTilt", ("Optics", "MC Y Tilt"), 1, "microradian"),
        ("ColumnMag", ("Optics", "Column Magnification"), 1, None),
        ("ColumnMode", ("Optics", "Column Mode"), 1, None),
        ("Lens1Voltage", ("Optics", "Lens 1 Voltage"), 1e-3, "kilovolt"),
        ("Lens2Voltage", ("Optics", "Lens 2 Voltage"), 1e-3, "kilovolt"),
        # Detector
        ("DetectorName", ("Detector", "Name"), 1, None),
        ("ETGridVoltage", ("Detector", "ET Grid Voltage"), 1, "volt"),
        ("ETContrast", ("Detector", "ET Contrast"), 1, None),
        ("ETBrightness", ("Detector", "ET Brightness"), 1, None),
        ("ETImageIntensity", ("Detector", "ET Image Intensity"), 1, None),
        ("MCPContrast", ("Detector", "MCP Contrast"), 1, None),
        ("MCPBrightness", ("Detector", "MCP Brightness"), 1, None),
        ("MCPBias", ("Detector", "MCP Bias"), 1, "volt"),
        ("MCPImageIntensity", ("Detector", "MCP Image Intensity"), 1, None),
        ("Detector.Scintillator", ("Detector", "Scintillator"), 1e-3, "kilovolt"),
        ("SampleBiasVoltage", ("Detector", "Sample Bias"), 1, "volt"),
        # System
        ("GunPressure", ("System", "Gun Pressure"), 1, "torr"),
        ("ColumnPressure", ("System", "Column Pressure"), 1, "torr"),
        ("ChamberPressure", ("System", "Chamber Pressure"), 1, "torr"),
        ("GunTemp", ("System", "Gun Temperature"), 1, "kelvin"),
        ("HeliumPressure", ("System", "Helium Pressure"), 1, "torr"),
        ("Magnification4x5", ("Optics", "Magnification 4x5"), 1, None),
        ("MagnificationDisplay", ("Optics", "Magnification Display"), 1, None),
        ("System.Model", ("System", "Model"), 1, None),
        ("System.Name", ("System", "Name"), 1, None),
        ("TimeStamp", ("System", "Acquisition Date/Time"), 1, None),
        ("ColumnType", ("System", "Column Type"), 1, None),
        # Flood gun
        ("FloodGunMode", ("Flood Gun", "Mode"), 1, None),
        ("FloodGunEnergy", ("Flood Gun", "Energy"), 1, "electron_volt"),
        ("FloodGunTime", ("Flood Gun", "Time"), 1, "microsecond"),
        ("FloodGun.DeflectionX", ("Flood Gun", "Deflection X"), 1, None),
        ("FloodGun.DeflectionY", ("Flood Gun", "Deflection Y"), 1, None),
        # Misc
        ("ScalingX", ("Calibration", "X Scale"), 1, "meter"),
        ("ScalingY", ("Calibration", "Y Scale"), 1, "meter"),
        ("ImageWidth", ("Image", "Width"), 1, None),  # pixels
        ("ImageHeight", ("Image", "Height"), 1, None),  # pixels
        # Display
        ("LutMode", ("Display", "LUT Mode"), 1, None),
        ("LowGray", ("Display", "Low Gray Value"), 1, None),
        ("HighGray", ("Display", "High Gray Value"), 1, None),
        ("LUT.LUTGamma", ("Display", "LUT Gamma"), 1, None),
    )

    # Fibics rows: (section, source_key, output_key, factor, target_unit or None).
    # A factor of -1 (``_STRIP_UNITS_FACTOR``) selects "strip_units" parsing.
    _FIBICS_FIELDS: ClassVar = (
        # Application section
        ("Application", "Version", ("Application", "Software Version"), 1, None),
        ("Application", "Date", ("Application", "Acquisition Date/Time"), 1, None),
        (
            "Application",
            "SupportsTransparency",
            ("Application", "Supports Transparency"),
            1,
            None,
        ),
        (
            "Application",
            "TransparentPixelValue",
            ("Application", "Transparent Pixel Value"),
            1,
            None,
        ),
        # Image section
        ("Image", "Width", ("Image", "Width"), 1, None),  # pixels
        ("Image", "Height", ("Image", "Height"), 1, None),  # pixels
        ("Image", "BoundingBox.Left", ("Image", "Bounding Box Left"), 1, None),
        ("Image", "BoundingBox.Right", ("Image", "Bounding Box Right"), 1, None),
        ("Image", "BoundingBox.Top", ("Image", "Bounding Box Top"), 1, None),
        ("Image", "BoundingBox.Bottom", ("Image", "Bounding Box Bottom"), 1, None),
        ("Image", "Machine", ("Image", "Machine Name"), 1, None),
        ("Image", "Beam", ("Image", "Beam Type"), 1, None),
        ("Image", "Aperture", ("Image", "Aperture Description"), 1, None),
        ("Image", "Detector", ("Detector", "Name"), 1, None),
        ("Image", "Contrast", ("Detector", "Contrast"), 1, None),
        ("Image", "Brightness", ("Detector", "Brightness"), 1, None),
        # Scan section
        ("Scan", "Dwell", ("dwell_time",), 1e-3, "microsecond"),  # ns -> us
        ("Scan", "LineAvg", ("Scan", "Line Averaging"), 1, None),
        ("Scan", "FOV_X", ("horizontal_field_width",), 1, "micrometer"),
        ("Scan", "FOV_Y", ("vertical_field_width",), 1, "micrometer"),
        ("Scan", "ScanRot", ("scan_rotation",), 1, "degree"),
        ("Scan", "Ux", ("Scan", "Affine Ux"), 1, None),
        ("Scan", "Uy", ("Scan", "Affine Uy"), 1, None),
        ("Scan", "Vx", ("Scan", "Affine Vx"), 1, None),
        ("Scan", "Vy", ("Scan", "Affine Vy"), 1, None),
        ("Scan", "Focus", ("Scan", "Focus Value"), 1, None),
        ("Scan", "StigX", ("Scan", "Stigmator X Value"), 1, None),
        ("Scan", "StigY", ("Scan", "Stigmator Y Value"), 1, None),
        # Stage section
        ("Stage", "X", ("Stage Position", "X"), 1, "micrometer"),
        ("Stage", "Y", ("Stage Position", "Y"), 1, "micrometer"),
        ("Stage", "Z", ("Stage Position", "Z"), 1, "micrometer"),
        ("Stage", "Tilt", ("Stage Position", "Tilt"), 1, "degree"),
        ("Stage", "Rot", ("Stage Position", "Rotation"), 1, "degree"),
        ("Stage", "M", ("Stage Position", "M"), 1, "millimeter"),
        # BeamInfo section
        ("BeamInfo", "BeamI", ("beam_current",), 1, "picoampere"),
        ("BeamInfo", "AccV", ("acceleration_voltage",), 1e-3, "kilovolt"),
        ("BeamInfo", "Aperture", ("Beam", "Aperture"), 1, None),
        ("BeamInfo", "GFISGas", ("Beam", "GFIS Gas Type"), 1, None),
        ("BeamInfo", "GunGasPressure", ("Beam", "Gun Gas Pressure"), 1, None),
        ("BeamInfo", "SpotControl", ("Beam", "Spot Control"), 1, None),
        # DetectorInfo section - values look like "=500.0 V"
        ("DetectorInfo", "Collector", ("Detector", "Collector Voltage"), -1, "volt"),
        (
            "DetectorInfo",
            "Stage Bias",
            ("Detector", "Stage Bias Voltage"),
            -1,
            "volt",
        ),
    )

    # Field mappings from display names to EM Glossary names
    _FIELD_MAPPINGS: ClassVar = {
        "Acceleration Voltage": "acceleration_voltage",
        "Working Distance": "working_distance",
        "Beam Current": "beam_current",
        "Emission Current": "emission_current",
        "Dwell Time": "dwell_time",
        "Field of View": "horizontal_field_width",
        "Pixel Width": "pixel_width",
        "Pixel Height": "pixel_height",
    }

    # Zeiss/Fibics-specific vendor sections that ALWAYS go to extensions
    _EXTENSION_TOP_LEVEL_KEYS: ClassVar = frozenset(
        {
            "Beam",
            "GFIS",
            "Detector",
            "Stage Position",
            "Image",
            "Display",
            "Flood Gun",
            "Calibration",
            "System",
            "Application",
            "Sample",
            "Scan",
            "ScanSettings",
            "Optics",
            "Zeiss",
            "Fibics",
        }
    )

    # Keys copied through verbatim before the categorisation loop (in order)
    _LEADING_PASSTHROUGH_KEYS: ClassVar = (
        "DatasetType",
        "Data Type",
        "Creation Time",
        "Instrument ID",
    )

    # Keys copied through verbatim after the categorisation loop (in order)
    _TRAILING_PASSTHROUGH_KEYS: ClassVar = ("warnings", "Extractor Warnings")

    # Keys the categorisation loop must not touch
    _RESERVED_KEYS: ClassVar = frozenset(
        {
            *_LEADING_PASSTHROUGH_KEYS,
            *_TRAILING_PASSTHROUGH_KEYS,
            "extensions",
        }
    )

    def supports(self, context: ExtractionContext) -> bool:
        """
        Check if this extractor supports the given file.

        Uses content sniffing to detect Zeiss/Fibics TIFF files by checking
        for the presence of custom TIFF tags containing XML metadata.

        Parameters
        ----------
        context
            The extraction context containing file information

        Returns
        -------
        bool
            True if file is a Zeiss Orion or Fibics TIFF file
        """
        # File must exist to check TIFF tags
        if not context.file_path.exists():
            _logger.warning("File does not exist: %s", context.file_path)
            return False

        try:
            with Image.open(context.file_path) as img:
                return self._detect_variant(img) is not None
        except Exception as e:
            # Sniffing is best-effort: any failure to open or inspect the file
            # simply means "not ours" so that another extractor can try.
            _logger.warning("Error checking TIFF tags for %s: %s", context.file_path, e)
            return False

    def extract(self, context: ExtractionContext) -> list[dict[str, Any]]:
        """
        Extract metadata from a Zeiss Orion or Fibics TIFF file.

        Any exception raised while reading the file is logged and recorded under
        "Extractor Warnings" (with "Data Type" reset to "Unknown") instead of
        propagating, so a metadata dict is always returned.

        Parameters
        ----------
        context
            The extraction context containing file information

        Returns
        -------
        list[dict]
            List containing a single metadata dict with 'nx_meta' key
        """
        filename = context.file_path
        _logger.debug("Extracting metadata from Zeiss/Fibics TIFF file: %s", filename)

        mdict = {"nx_meta": {"DatasetType": "Image", "Data Type": "HIM_Imaging"}}

        # variant name -> (TIFF tag holding the XML, method that decodes it)
        handlers = {
            "zeiss": (ZEISS_TIFF_TAG, self._extract_zeiss_metadata),
            "fibics": (FIBICS_TIFF_TAG, self._extract_fibics_metadata),
        }

        try:
            _set_instr_name_and_time(mdict, filename)
            with Image.open(filename) as img:
                # Detect which variant we have
                variant = self._detect_variant(img)

                if variant in handlers:
                    tag_id, handler = handlers[variant]
                    root = ET.fromstring(img.tag_v2[tag_id])
                    mdict = handler(root, img, filename, mdict)
                else:
                    _logger.warning(
                        "Could not detect Zeiss/Fibics variant for %s", filename
                    )
                    mdict["nx_meta"]["Data Type"] = "Unknown"
                    mdict["nx_meta"]["Extractor Warnings"] = (
                        "Could not detect Zeiss/Fibics variant"
                    )

        except Exception as e:
            _logger.exception("Error extracting metadata from %s", filename)
            mdict["nx_meta"]["Data Type"] = "Unknown"
            mdict["nx_meta"]["Extractor Warnings"] = f"Extraction failed: {e}"

        # Migrate metadata to schema-compliant format
        mdict = self._migrate_to_schema_compliant_metadata(mdict)

        # Sort the nx_meta dictionary for nicer display
        mdict["nx_meta"] = sort_dict(mdict["nx_meta"])

        return [mdict]

    def _detect_variant(self, img: Image.Image) -> str | None:
        """
        Detect whether this is a Zeiss or Fibics TIFF file.

        The Zeiss tag is checked first. A tag that is present but does not hold
        parseable XML with the expected root element is skipped (with a warning
        on parse errors) and the next candidate is tried.

        Parameters
        ----------
        img
            PIL Image object

        Returns
        -------
        str | None
            "zeiss", "fibics", or None if neither detected
        """
        candidates = (
            (
                ZEISS_TIFF_TAG,
                "ImageTags",
                "zeiss",
                "Failed to parse Zeiss XML from TIFF tag: %s",
            ),
            (
                FIBICS_TIFF_TAG,
                "Fibics",
                "fibics",
                "Failed to parse Fibics XML from TIFF tag: %s",
            ),
        )
        for tag_id, root_name, variant, parse_error_msg in candidates:
            if tag_id not in img.tag_v2:
                continue
            try:
                root = ET.fromstring(img.tag_v2[tag_id])
            except ET.ParseError as e:
                _logger.warning(parse_error_msg, e)
                continue
            # Substring match so a namespaced tag ("{ns}ImageTags") also counts
            if root_name in root.tag:
                return variant

        return None

    @staticmethod
    def _build_field(
        section: str,
        source_key: str,
        output_key: tuple,
        factor: float,
        unit: str | None,
    ) -> FD:
        """
        Build a FieldDefinition from one row of the class-level field tables.

        ``target_unit`` is only passed when the row names a unit, so unit-less
        rows use whatever default FieldDefinition declares. The output key is
        copied into a fresh list so definitions never share mutable state.
        """
        if unit is None:
            return FD(section, source_key, list(output_key), factor, False)
        return FD(
            section, source_key, list(output_key), factor, False, target_unit=unit
        )

    @staticmethod
    def _nx_meta_path(output_key: str | list) -> list:
        """Return the nested key path under "nx_meta" for *output_key*."""
        if isinstance(output_key, str):
            return ["nx_meta", output_key]
        return ["nx_meta", *output_key]

    def _extract_zeiss_metadata(
        self,
        root: ET.Element,
        img: Image.Image,
        filename: Path,  # noqa: ARG002
        mdict: dict,
    ) -> dict:
        """
        Extract metadata from Zeiss Orion XML format.

        Parameters
        ----------
        root
            XML root element
        img
            PIL Image object
        filename
            Path to the file
        mdict
            Metadata dictionary to update

        Returns
        -------
        dict
            Updated metadata dictionary
        """
        # Parse Zeiss XML structure
        # <ImageTags> contains nested sections with Value/Units pairs

        # Set image dimensions
        width, height = img.size
        set_nested_dict_value(
            mdict, ["nx_meta", "Data Dimensions"], str((width, height))
        )

        # Extract all fields, in table order
        for source_key, output_key, factor, unit in self._ZEISS_FIELDS:
            field = self._build_field("", source_key, output_key, factor, unit)
            self._parse_zeiss_field(
                root,
                field.source_key,
                field.output_key,
                mdict,
                field.factor,
                field.target_unit,
            )

        return mdict

    def _extract_fibics_metadata(
        self,
        root: ET.Element,
        img: Image.Image,
        filename: Path,  # noqa: ARG002
        mdict: dict,
    ) -> dict:
        """
        Extract metadata from Fibics XML format.

        Parameters
        ----------
        root
            XML root element
        img
            PIL Image object
        filename
            Path to the file
        mdict
            Metadata dictionary to update

        Returns
        -------
        dict
            Updated metadata dictionary
        """
        # Set image dimensions
        width, height = img.size
        set_nested_dict_value(
            mdict, ["nx_meta", "Data Dimensions"], str((width, height))
        )

        # Each section is looked up once and reused for all of its fields
        sections: dict[str, ET.Element | None] = {}

        for row in self._FIBICS_FIELDS:
            field = self._build_field(*row)

            if field.section not in sections:
                sections[field.section] = self._find_fibics_section(
                    root, field.section
                )
            section = sections[field.section]
            if section is None:
                continue

            # Use -1 as sentinel for "strip_units" conversion
            conversion_factor = (
                "strip_units"
                if field.factor == self._STRIP_UNITS_FACTOR
                else field.factor
            )
            value = self._parse_fibics_value(
                section, field.source_key, conversion_factor, field.target_unit
            )
            if value is not None:
                set_nested_dict_value(
                    mdict, self._nx_meta_path(field.output_key), value
                )

        return mdict

    def _parse_zeiss_field(  # noqa: PLR0913
        self,
        root: ET.Element,
        field_path: str,
        output_key: str | list,
        mdict: dict,
        conversion_factor: float = 1.0,
        unit: str | None = None,
    ) -> None:
        """
        Parse a field from Zeiss XML and set it in the metadata dictionary.

        Nothing is written when the field, its ``<Value>`` child, or any text
        inside it is missing (whitespace-only text counts as missing). Errors
        are logged at debug level and never propagate.

        Parameters
        ----------
        root
            XML root element
        field_path
            Path to the field. Can be a simple tag name (e.g., "AccelerationVoltage"),
            a tag name with dots (e.g., "GFIS.AccelerationVoltage"), or a nested path
            (e.g., "System.Name"). First tries to find as a direct tag name, then falls
            back to nested navigation.
        output_key
            Key path in nx_meta (e.g., "Voltage" or ["Stage Position", "X"])
        mdict
            Metadata dictionary to update
        conversion_factor
            Factor to multiply the value by for unit conversion
        unit
            Unit name for Pint Quantity. If None, stores as numeric or string value.
        """
        try:
            # First try to find as a direct tag
            # (handles dotted names like "GFIS.AccelerationVoltage")
            node = root.find(field_path)

            # If not found as direct tag, walk the dotted path one child at a time
            if node is None:
                node = root
                for part in field_path.split("."):
                    node = next((child for child in node if child.tag == part), None)
                    if node is None:
                        return

            # NOTE: a sibling <Units> element is available here should the
            # XML-declared units ever need to be honoured.
            value = node.find("Value")
            text = value.text.strip() if value is not None and value.text else ""
            if not text:
                return

            try:
                numeric_value = Decimal(text) * Decimal(str(conversion_factor))
                # Create Pint Quantity if unit is specified
                final_value = (
                    ureg.Quantity(numeric_value, unit)
                    if unit is not None
                    else float(numeric_value)
                )
            except Exception:  # noqa: BLE001
                # Non-numeric text (decimal.InvalidOperation) or a unit Pint
                # rejects: keep the value as a plain string instead.
                final_value = text

            set_nested_dict_value(mdict, self._nx_meta_path(output_key), final_value)
        except Exception as e:
            # Log parsing errors for individual fields
            _logger.debug(
                "Error parsing Zeiss field %s: %s", field_path, e, exc_info=True
            )

    def _find_fibics_section(
        self, root: ET.Element, section_name: str
    ) -> ET.Element | None:
        """
        Find a section in Fibics XML.

        Only direct children of *root* are considered and the tag must match
        *section_name* exactly.

        Parameters
        ----------
        root
            XML root element
        section_name
            Name of section to find (e.g., "BeamInfo", "Scan")

        Returns
        -------
        ET.Element | None
            Section element if found, None otherwise
        """
        try:
            return next((child for child in root if child.tag == section_name), None)
        except Exception:  # noqa: BLE001
            # Best-effort lookup: an un-iterable root is treated as "not found"
            return None

    def _parse_fibics_value(
        self,
        section: ET.Element,
        field_name: str,
        conversion_factor: float | str = 1.0,
        unit: str | None = None,
    ) -> float | str | None:
        """
        Parse a value from a Fibics XML section.

        Parameters
        ----------
        section
            XML section element
        field_name
            Name of field to parse. First tries to find an element with this tag name.
            If not found, searches for an "item" element with a "name" attribute
            matching field_name.
        conversion_factor
            Factor to multiply the value by for unit conversion, or "strip_units" to
            remove unit suffixes (e.g., "=500.0 V" becomes 500.0)
        unit
            Unit name for Pint Quantity. If None, returns numeric or string value.

        Returns
        -------
        Quantity | float | str | None
            Parsed value (as Quantity if unit specified), the cleaned text if it
            is not numeric, or None if the field is missing, has no usable text
            (empty, whitespace-only, or just "="), or parsing failed
        """
        try:
            # First try to find field as direct element
            field = section.find(field_name)

            # If not found, try to find an "item" element with matching "name" attribute
            if field is None:
                field = next(
                    (
                        item
                        for item in section.findall("item")
                        if item.get("name") == field_name
                    ),
                    None,
                )

            if field is None or not field.text:
                return None

            text = field.text.strip()
            strip_units = conversion_factor == "strip_units"

            if strip_units:
                # "=500.0 V" -> "500.0": drop leading "=" and keep the first
                # whitespace-separated token (the number before the unit)
                tokens = text.lstrip("=").split()
                text = tokens[0] if tokens else ""

            # Nothing left to interpret: report "no value" rather than ""
            if not text:
                return None

            try:
                numeric_value = Decimal(text)
                if not strip_units:
                    numeric_value *= Decimal(str(conversion_factor))
                # Create Pint Quantity if unit is specified
                if unit is not None:
                    return ureg.Quantity(numeric_value, unit)
                return float(numeric_value)
            except Exception:  # noqa: BLE001
                # Non-numeric text (decimal.InvalidOperation) or a unit Pint
                # rejects: return the cleaned string value instead.
                return text
        except Exception:  # noqa: BLE001
            return None

    def _migrate_to_schema_compliant_metadata(self, mdict: dict) -> dict:
        """
        Migrate metadata to schema-compliant format.

        Reorganizes metadata to conform to type-specific Pydantic schemas:

        - Extracts core EM Glossary fields to top level with standardized names
        - Moves vendor-specific nested dictionaries to extensions section
        - Preserves existing extensions from instrument profiles

        Parameters
        ----------
        mdict
            Metadata dictionary with nx_meta containing extracted fields

        Returns
        -------
        dict
            Metadata dictionary with schema-compliant nx_meta structure
        """
        nx_meta = mdict.get("nx_meta", {})

        # Preserve existing extensions from instrument profiles
        extensions = nx_meta["extensions"].copy() if "extensions" in nx_meta else {}

        # Get all EM Glossary field names from the metadata schema
        # These should remain at top level (not moved to extensions)
        emg_field_names = set(em_glossary.get_all_mapped_fields())

        # Build new nx_meta with proper field organization
        new_nx_meta = {}

        # Required fields and instrument identification come first
        for key in self._LEADING_PASSTHROUGH_KEYS:
            if key in nx_meta:
                new_nx_meta[key] = nx_meta[key]

        # Process all remaining fields and categorize
        for old_name, value in nx_meta.items():
            if old_name in self._RESERVED_KEYS:
                # Handled before/after this loop (or merged via ``extensions``)
                continue

            if old_name in self._EXTENSION_TOP_LEVEL_KEYS:
                # Top-level vendor sections go to extensions
                extensions[old_name] = value
            elif old_name in self._FIELD_MAPPINGS:
                # Core field that needs renaming to its EM Glossary name
                new_nx_meta[self._FIELD_MAPPINGS[old_name]] = value
            elif old_name in emg_field_names:
                # Already an EM Glossary name: keep at top level
                new_nx_meta[old_name] = value
            else:
                # Everything else goes to extensions (vendor-specific by default)
                # This is safer than the top level where schema validation
                # will reject it
                extensions[old_name] = value

        # Copy warnings / Extractor Warnings if present
        # (the latter is moved to NexusLIMS Extraction by add_extraction_details)
        for key in self._TRAILING_PASSTHROUGH_KEYS:
            if key in nx_meta:
                new_nx_meta[key] = nx_meta[key]

        # Add extensions section if we have any
        for key, value in extensions.items():
            add_to_extensions(new_nx_meta, key, value)

        mdict["nx_meta"] = new_nx_meta
        return mdict