Coverage for nexusLIMS/extractors/plugins/quanta_tif.py: 100%

352 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-03-24 05:23 +0000

1# ruff: noqa: N817, FBT001, FBT003 

2"""FEI/Thermo Fisher TIFF extractor plugin.""" 

3 

4import configparser 

5import contextlib 

6import io 

7import logging 

8import re 

9from decimal import Decimal, InvalidOperation 

10from math import degrees 

11from pathlib import Path 

12from typing import Any, ClassVar, Tuple 

13 

14from lxml import etree 

15from PIL import Image 

16 

17from nexusLIMS.extractors.base import ExtractionContext, FieldDefinition 

18from nexusLIMS.extractors.base import FieldDefinition as FD 

19from nexusLIMS.extractors.utils import _set_instr_name_and_time, add_to_extensions 

20from nexusLIMS.instruments import get_instr_from_filepath 

21from nexusLIMS.schemas.units import ureg 

22from nexusLIMS.utils.dicts import ( 

23 set_nested_dict_value, 

24 sort_dict, 

25 try_getting_dict_value, 

26) 

27 

#: TIFF tag ID where FEI/Thermo stores metadata in TIFF files. The tag holds
#: INI-style metadata with sections like [User], [Beam], [Image], etc.
FEI_TIFF_TAG = 34682

#: TIFF tag ID where FEI/Thermo stores XML metadata in TIFF files (if present).
#: This supplementary XML metadata may be embedded after the standard INI
#: metadata.
FEI_XML_TIFF_TAG = 34683

# module-level logger shared by everything in this plugin
_logger = logging.getLogger(__name__)

42 

43 

class QuantaTiffExtractor:
    """
    Metadata extractor plugin for FEI/Thermo Fisher TIFF images.

    Handles ``.tif``/``.tiff`` files saved by FEI/Thermo Fisher FIBs and SEMs
    (e.g., Quanta, Helios, etc.). Before any extraction is attempted, the file
    content is sniffed to confirm that FEI metadata is actually present.
    """

    # identifier of this extractor plugin
    name = "quanta_tif_extractor"
    # numeric priority of this plugin
    priority = 100
    # file extensions (lower-case, without the dot) this plugin handles
    supported_extensions: ClassVar[set[str]] = {"tif", "tiff"}

57 

def supports(self, context: ExtractionContext) -> bool:
    """
    Decide whether this extractor can handle the given file.

    Only ``.tif``/``.tiff`` files are considered. The content is then sniffed
    in two steps:

    1. Look for the FEI-specific TIFF tag (34682) holding ``[User]`` or
       ``[Beam]``
    2. Otherwise, search the first 5000 bytes of the raw file for the same
       markers

    Parameters
    ----------
    context
        The extraction context containing file information

    Returns
    -------
    bool
        True if this appears to be a FEI/Thermo TIFF file with metadata
    """
    path = context.file_path
    if path.suffix.lower().lstrip(".") not in {"tif", "tiff"}:
        return False

    # Strategy 1: FEI metadata signature stored in TIFF tag 34682
    try:
        with Image.open(path) as tiff:
            tag_value = tiff.tag_v2.get(FEI_TIFF_TAG)
            if tag_value is not None:
                tag_text = str(tag_value)
                if any(marker in tag_text for marker in ("[User]", "[Beam]")):
                    return True
    except Exception as err:
        # an unreadable TIFF is not fatal; the raw-bytes check below still runs
        _logger.debug("Could not read TIFF tags from %s: %s", path, err)

    # Strategy 2: binary content sniffing for files that may not be proper
    # TIFF files or use different metadata storage
    try:
        with path.open(mode="rb") as handle:
            head = handle.read(5000)  # first 5KB only
    except Exception as err:
        _logger.debug("Could not read binary content from %s: %s", path, err)
        return False

    return any(marker in head for marker in (b"[User]", b"[Beam]"))

112 

def extract(self, context: ExtractionContext) -> list[dict[str, Any]]:
    """
    Extract metadata from a FEI/Thermo TIFF file.

    Builds the metadata for a .tif file saved by the FEI Quanta SEM or related
    instruments. The raw INI sections are kept at the top level of the
    returned dictionary, while the tags of interest are collected under the
    root-level ``nx_meta`` node. Any exception raised while parsing is logged
    and reported through ``nx_meta["Extractor Warnings"]`` instead of being
    propagated.

    Parameters
    ----------
    context
        The extraction context containing file information

    Returns
    -------
    list[dict]
        List containing a single metadata dict with 'nx_meta' key
    """
    tif_path = context.file_path
    _logger.debug("Extracting metadata from FEI TIFF file: %s", tif_path)

    # assume all datasets coming from Quanta are Images, currently
    mdict = {"nx_meta": {"DatasetType": "Image", "Data Type": "SEM_Imaging"}}
    _set_instr_name_and_time(mdict, tif_path)

    try:
        ini_text, xml_metadata = self._extract_metadata_from_tiff_tag(tif_path)

        if not ini_text:
            # nothing to parse: flag the dataset and return what we have
            _logger.warning(
                "Did not find expected FEI tags in .tif file: %s", tif_path
            )
            mdict["nx_meta"]["Data Type"] = "Unknown"
            mdict["nx_meta"]["Extractor Warnings"] = (
                "Did not find expected FEI tags. Could not read metadata"
            )
            mdict["nx_meta"] = sort_dict(mdict["nx_meta"])
            return [mdict]

        if xml_metadata:
            mdict["FEI_XML_Metadata"] = xml_metadata

        # repeated MultiGIS section headers must be made unique before the
        # INI parser sees them
        ini_text = self._fix_duplicate_multigis_metadata_tags(ini_text)
        mdict.update(self._parse_metadata_string(ini_text))

        # pull the important fields into nx_meta, then reshape to the schema
        mdict = self._parse_nx_meta(mdict)
        mdict = self._migrate_to_schema_compliant_metadata(mdict)

    except Exception as err:
        _logger.exception("Error extracting metadata from %s", tif_path)
        mdict["nx_meta"]["Data Type"] = "Unknown"
        mdict["nx_meta"]["Extractor Warnings"] = f"Extraction failed: {err}"

    # sort the nx_meta dictionary (recursively) for nicer display
    mdict["nx_meta"] = sort_dict(mdict["nx_meta"])
    return [mdict]

181 

def _extract_metadata_from_tiff_tag(self, tiff_path: Path) -> Tuple[str, dict]:
    """
    Extract metadata string from FEI TIFF tags 34682 and 34683.

    Extracts standard INI metadata from tag 34682 and XML metadata from tag
    34683 if present. Falls back to searching the raw file bytes for a
    ``[User]`` section when no INI metadata was obtained from the tags.

    Tag values may be ``str`` or ``bytes``; both are converted to bytes
    without going through ``str()``, so byte values are not turned into their
    ``b'...'`` representation.

    Parameters
    ----------
    tiff_path
        Path to the TIFF file

    Returns
    -------
    metadata_str
        Metadata string (INI format), or empty string if not found
    xml_metadata
        Dictionary of XML metadata if it could be parsed, else empty dict
    """

    def _tag_to_bytes(tag_value: Any) -> bytes:
        # ``str(b"...")`` would produce the literal ``b'...'`` repr with
        # escaped line breaks, which neither the INI nor the XML parser can
        # digest, so byte values are passed through untouched.
        if isinstance(tag_value, (bytes, bytearray)):
            return bytes(tag_value)
        if isinstance(tag_value, str):
            return tag_value.encode()
        return str(tag_value).encode()

    metadata_str = ""
    xml_metadata = {}

    # Strategy 1: Try to extract from TIFF tags 34682 and 34683
    try:
        with Image.open(tiff_path) as img:
            fei_metadata = img.tag_v2.get(FEI_TIFF_TAG)
            if fei_metadata is not None:
                metadata_str = self._extract_metadata_string(
                    _tag_to_bytes(fei_metadata)
                )

            xml_metadata_tag = img.tag_v2.get(FEI_XML_TIFF_TAG)
            if xml_metadata_tag is not None:
                # drop trailing NUL padding, which is not valid after the
                # document's root element
                xml_bytes = _tag_to_bytes(xml_metadata_tag).rstrip(b"\x00")
                if b"<?xml" in xml_bytes:
                    try:
                        # bytes input also lets lxml honour an encoding
                        # declaration, which it rejects on ``str`` input
                        root = etree.fromstring(xml_bytes)
                        xml_metadata = self._xml_el_to_dict(root)
                    except Exception as e:
                        _logger.debug(
                            "Failed to parse XML from TIFF tag 34683: %s", e
                        )
    except Exception as e:
        _logger.debug("Failed to extract FEI metadata from TIFF tags: %s", e)

    # If we got metadata from TIFF tags, return it
    if metadata_str:
        return metadata_str, xml_metadata

    # Strategy 2: Fallback to binary content extraction for files where
    # metadata might not be in a standard TIFF tag
    try:
        with tiff_path.open(mode="rb") as f:
            content = f.read()
        user_idx = content.find(b"[User]")
        if user_idx != -1:
            metadata_str_raw = self._extract_metadata_string(content[user_idx:])
            # split off (and parse) any XML that follows the INI sections
            return self._detect_and_process_xml_metadata(metadata_str_raw)
    except Exception as e:
        _logger.debug("Failed to extract FEI metadata from binary content: %s", e)

    return "", {}

262 

def _extract_metadata_string(self, metadata_bytes: bytes) -> str:
    """
    Turn raw metadata bytes into a clean text string.

    NUL bytes are removed, the remainder is decoded (undecodable bytes are
    skipped), and all line endings are normalized to ``\\n``.

    Parameters
    ----------
    metadata_bytes
        Raw binary metadata from the TIFF file

    Returns
    -------
    str
        Cleaned metadata string
    """
    # null bytes break the extractor, so strip them before decoding
    text = metadata_bytes.replace(b"\x00", b"").decode(errors="ignore")
    # a CR with an optional LF behind it becomes a single LF
    return re.sub(r"\r\n?", "\n", text)

285 

def _detect_and_process_xml_metadata(
    self,
    metadata_str: str,
) -> Tuple[str, dict]:
    """
    Find and (if necessary) parse XML metadata in a Thermo Fisher FIB/SEM TIF file.

    Some Thermo Fisher FIB/SEM files have additional metadata embedded as XML
    at the end of the TIF file, which cannot be handled by the ConfigParser.
    This method splits the text at the XML declaration, parses the XML part,
    and returns the remaining text separately. If the XML part cannot be
    parsed, the problem is logged and an empty dictionary is returned for it,
    so the text before the XML is still usable.

    Parameters
    ----------
    metadata_str
        The metadata at the end of the TIF file as a string. May or may not
        include an XML section (this depends on the version of the Thermo
        software that saved the image).

    Returns
    -------
    metadata_str
        The originally provided metadata as a string, but with the XML portion
        removed if it was present

    xml_metadata
        A dictionary containing the metadata that was present in the XML
        portion. Will be an empty dictionary if there was no XML or it could
        not be parsed.
    """
    declaration = re.search(r'<\?xml version=".+"\?>', metadata_str)
    if declaration is None:
        return metadata_str, {}

    split_at = declaration.start()
    ini_part, xml_part = metadata_str[:split_at], metadata_str[split_at:]

    try:
        # lxml refuses ``str`` input that carries an encoding declaration, so
        # the XML is handed over as bytes
        root = etree.fromstring(xml_part.encode())
    except (etree.XMLSyntaxError, ValueError) as e:
        _logger.debug("Failed to parse embedded XML metadata: %s", e)
        return ini_part, {}

    return ini_part, self._xml_el_to_dict(root)

324 

325 @staticmethod 

326 def _xml_el_to_dict(node: etree.ElementBase) -> dict: 

327 """ 

328 Convert an lxml.etree node tree into a dict. 

329 

330 This is used to transform the XML metadata section into a dictionary 

331 representation so it can be stored alongside the other metadata. 

332 

333 Taken from https://stackoverflow.com/a/66103841/1435788 

334 

335 Parameters 

336 ---------- 

337 node 

338 XML element to convert 

339 

340 Returns 

341 ------- 

342 dict 

343 Dictionary representation of the XML element 

344 """ 

345 result = {} 

346 

347 for element in node.iterchildren(): 

348 # Remove namespace prefix 

349 key = element.tag.split("}")[1] if "}" in element.tag else element.tag 

350 

351 # Process element as tree element if the inner XML contains 

352 # non-whitespace content 

353 if element.text and element.text.strip(): 

354 value = element.text 

355 else: 

356 value = QuantaTiffExtractor._xml_el_to_dict(element) 

357 if key in result: 

358 if isinstance(result[key], list): 

359 result[key].append(value) # pragma: no cover 

360 else: 

361 tempvalue = result[key].copy() 

362 result[key] = [tempvalue, value] 

363 else: 

364 result[key] = value 

365 return result 

366 

367 @staticmethod 

368 def _fix_duplicate_multigis_metadata_tags(metadata_str: str) -> str: 

369 """ 

370 Rename the metadata section headers to allow parsing by ConfigParser. 

371 

372 Some instruments have metadata section titles like so: 

373 

374 [MultiGIS] 

375 [MultiGISUnit1] 

376 [MultiGISGas1] 

377 [MultiGISGas2] 

378 [MultiGISGas3] 

379 [MultiGISUnit2] 

380 [MultiGISGas1] 

381 ... 

382 

383 Which causes errors because ConfigParser raises a DuplicateSectionError. 

384 This method renames them to: 

385 

386 [MultiGIS] 

387 [MultiGISUnit1] 

388 [MultiGISUnit1.MultiGISGas1] 

389 [MultiGISUnit1.MultiGISGas2] 

390 [MultiGISUnit1.MultiGISGas3] 

391 [MultiGISUnit2] 

392 [MultiGISUnit2.MultiGISGas1] 

393 ... 

394 

395 Parameters 

396 ---------- 

397 metadata_str 

398 Metadata string potentially with duplicate section headers 

399 

400 Returns 

401 ------- 

402 str 

403 Metadata string with unique section headers 

404 """ 

405 metadata_to_return = "" 

406 multi_gis_section_numbers = re.findall(r"\[MultiGISUnit(\d+)\]", metadata_str) 

407 if multi_gis_section_numbers: 

408 multi_gis_unit_indices = [ 

409 metadata_str.index(f"[MultiGISUnit{num}]") 

410 for num in multi_gis_section_numbers 

411 ] 

412 metadata_to_return += metadata_str[: multi_gis_unit_indices[0]] 

413 for i, num in enumerate(multi_gis_section_numbers): 

414 if i < len(multi_gis_unit_indices) - 1: 

415 to_process = metadata_str[ 

416 multi_gis_unit_indices[i] : multi_gis_unit_indices[i + 1] 

417 ] 

418 else: 

419 to_process = metadata_str[multi_gis_unit_indices[i] :] 

420 multi_gis_gas_tags = re.findall(r"\[(MultiGISGas\d+)\]", to_process) 

421 for tag in multi_gis_gas_tags: 

422 to_process = to_process.replace(tag, f"MultiGISUnit{num}.{tag}") 

423 metadata_to_return += to_process 

424 else: 

425 metadata_to_return = metadata_str 

426 

427 return metadata_to_return 

428 

429 @staticmethod 

430 def _parse_metadata_string(hdr_string: str) -> dict[str, dict[str, str]]: 

431 """ 

432 Parse metadata from a string in INI format. 

433 

434 Parameters 

435 ---------- 

436 hdr_string 

437 Metadata as a string in INI format 

438 

439 Returns 

440 ------- 

441 dict 

442 Dictionary with section names as keys and key-value dicts as values 

443 """ 

444 config = configparser.RawConfigParser() 

445 # Make ConfigParser respect upper/lowercase values 

446 config.optionxform = lambda option: option 

447 

448 buf = io.StringIO(hdr_string) 

449 config.read_file(buf) 

450 

451 metadata = {} 

452 for section in config.sections(): 

453 metadata[section] = dict(config.items(section)) 

454 

455 return metadata 

456 

def _build_field_definitions(self, mdict: dict) -> list[FieldDefinition]:
    """Build field definitions for metadata extraction.

    The names of the beam, scan and detector sections are read from the
    metadata itself (``[Beam] Beam``, ``[Beam] Scan`` and ``[Detectors] Name``);
    the fields of a section are only included when its name is available.
    Each definition is given positionally as section, source key, output key
    (or nested output path), factor, and whether the value is a string.

    Parameters
    ----------
    mdict
        Metadata dictionary with raw extracted metadata

    Returns
    -------
    list[FieldDefinition]
        List of field definitions for extraction
    """
    beam_name = try_getting_dict_value(mdict, ["Beam", "Beam"])
    det_name = try_getting_dict_value(mdict, ["Detectors", "Name"])
    scan_name = try_getting_dict_value(mdict, ["Beam", "Scan"])

    fields = []

    # Beam section fields
    if beam_name is not None:
        stage = "Stage Position"
        fields.extend(
            [
                FD(
                    beam_name,
                    "EmissionCurrent",
                    "Emission Current",
                    1.0,
                    False,
                    target_unit="ampere",
                ),
                FD(
                    beam_name,
                    "HFW",
                    "Horizontal Field Width",
                    1.0,
                    False,
                    target_unit="meter",
                ),
                FD(beam_name, "HV", "Voltage", 1.0, False, target_unit="volt"),
                FD(beam_name, "SourceTiltX", "Beam Tilt X", 1.0, False),
                FD(beam_name, "SourceTiltY", "Beam Tilt Y", 1.0, False),
                FD(beam_name, "StageR", [stage, "R"], 1.0, False),
                FD(beam_name, "StageTa", [stage, "α"], 1.0, False),  # noqa: RUF001
                FD(beam_name, "StageX", [stage, "X"], 1.0, False),
                FD(beam_name, "StageY", [stage, "Y"], 1.0, False),
                FD(beam_name, "StageZ", [stage, "Z"], 1.0, False),
                FD(beam_name, "StageTb", [stage, "β"], 1.0, False, suppress_zero=False),
                FD(beam_name, "StigmatorX", "Stigmator X Value", 1.0, False),
                FD(beam_name, "StigmatorY", "Stigmator Y Value", 1.0, False),
                FD(
                    beam_name,
                    "VFW",
                    "Vertical Field Width",
                    1.0,
                    False,
                    target_unit="meter",
                ),
                FD(
                    beam_name,
                    "WD",
                    "Working Distance",
                    1.0,
                    False,
                    target_unit="meter",
                ),
                FD(
                    beam_name,
                    "EucWD",
                    "Eucentric WD",
                    1.0,
                    False,
                    target_unit="meter",
                ),
                FD(beam_name, "ImageMode", "Image Mode", 1.0, True),
                FD(beam_name, "BeamShiftX", "Beam Shift X", 1.0, False),
                FD(beam_name, "BeamShiftY", "Beam Shift Y", 1.0, False),
                FD(beam_name, "BeamMode", "Beam Mode", 1.0, True),
                FD(beam_name, "PreTilt", "Pre-Tilt", 1.0, False),
            ]
        )

    # Scan section fields
    if scan_name is not None:
        fields.extend(
            [
                FD(
                    scan_name,
                    "Dwell",
                    "Pixel Dwell Time",
                    1.0,
                    False,
                    target_unit="second",
                ),
                FD(
                    scan_name,
                    "FrameTime",
                    "Total Frame Time",
                    1.0,
                    False,
                    target_unit="second",
                ),
                FD(
                    scan_name,
                    "HorFieldsize",
                    "Horizontal Field Width",
                    1.0,
                    False,
                    target_unit="meter",
                ),
                FD(
                    scan_name,
                    "VerFieldsize",
                    "Vertical Field Width",
                    1.0,
                    False,
                    target_unit="meter",
                ),
                # each pixel dimension is reported under its own label (the
                # two labels used to be crossed over)
                FD(
                    scan_name,
                    "PixelHeight",
                    "Pixel Height",
                    1.0,
                    False,
                    target_unit="meter",
                ),
                FD(
                    scan_name,
                    "PixelWidth",
                    "Pixel Width",
                    1.0,
                    False,
                    target_unit="meter",
                ),
                FD(
                    scan_name,
                    "LineTime",
                    "Line Time",
                    1.0,
                    False,
                    target_unit="second",
                ),
                FD(scan_name, "LineIntegration", "Line Integration", 1.0, False),
                FD(scan_name, "ScanInterlacing", "Scan Interlacing", 1.0, False),
            ]
        )

    # Detector section fields
    if det_name is not None:
        fields.extend(
            [
                FD(det_name, "Brightness", "Detector Brightness Setting", 1.0, False),
                FD(det_name, "Contrast", "Detector Contrast Setting", 1.0, False),
                FD(
                    det_name,
                    "EnhancedContrast",
                    "Detector Enhanced Contrast Setting",
                    1.0,
                    False,
                ),
                FD(det_name, "Signal", "Detector Signal", 1.0, False),
                FD(
                    det_name,
                    "Grid",
                    "Detector Grid Voltage",
                    1.0,
                    False,
                    target_unit="volt",
                ),
                FD(det_name, "BrightnessDB", "Detector Brightness (DB)", 1.0, False),
                FD(det_name, "ContrastDB", "Detector Contrast (DB)", 1.0, False),
                FD(det_name, "Mix", "Detector Mix (%)", 1.0, False),
                FD(
                    det_name,
                    "MinimumDwellTime",
                    "Minimum Dwell Time",
                    1.0,
                    False,
                    target_unit="second",
                ),
            ]
        )

    # System section fields
    fields.extend(
        [
            FD("System", "Chamber", "Chamber ID", 1.0, True),
            FD("System", "Pump", "Vacuum Pump", 1.0, True),
            FD("System", "SystemType", "System Type", 1.0, True),
            FD("System", "Stage", "Stage Description", 1.0, True),
            FD("System", "Dnumber", "Device Number", 1.0, True),
            FD("System", "Source", "Electron Source", 1.0, True),
            FD("System", "FinalLens", "Final Lens", 1.0, True),
            FD("System", "ESEM", "ESEM Setting", 1.0, True),
            FD("System", "Aperture", "Aperture Type", 1.0, True),
        ]
    )

    # Other fields
    fields.extend(
        [
            FD("Beam", "Spot", "Spot Size", 1.0, False),
            FD(
                "Specimen",
                "Temperature",
                "Specimen Temperature",
                1.0,
                False,
                target_unit="kelvin",
            ),
            FD(
                "Specimen",
                "Humidity",
                "Specimen Humidity",
                1.0,
                False,
                target_unit="percent",
            ),
            FD("User", "UserText", "User Text", 1.0, True),
            FD("User", "Date", "Acquisition Date", 1.0, True),
            FD("User", "Time", "Acquisition Time", 1.0, True),
            FD("Vacuum", "UserMode", "Vacuum Mode", 1.0, True),
            FD("Vacuum", "Gas", "Vacuum Gas", 1.0, False),
            FD("Image", "MagnificationMode", "Magnification Mode", 1.0, False),
            FD("Image", "DigitalContrast", "Digital Contrast", 1.0, False),
            FD("Image", "DigitalBrightness", "Digital Brightness", 1.0, False),
            FD("Image", "DigitalGamma", "Digital Gamma", 1.0, False),
            FD("Image", "ZoomFactor", "Zoom Factor", 1.0, False),
            FD("Image", "ZoomPanX", "Zoom Pan X", 1.0, False),
            FD("Image", "ZoomPanY", "Zoom Pan Y", 1.0, False),
            FD(
                "Image",
                "MagCanvasRealWidth",
                "Magnification Canvas Real Width",
                1.0,
                False,
            ),
            FD(
                "Image",
                "ScreenMagCanvasRealWidth",
                "Screen Magnification Canvas Real Width",
                1.0,
                False,
            ),
            FD(
                "Image",
                "ScreenMagnificationMode",
                "Screen Magnification Mode",
                1.0,
                False,
            ),
            FD("Image", "Average", "Frame Average", 1.0, False),
            FD("Image", "PostProcessing", "Post Processing", 1.0, False),
        ]
    )

    # EScan Mainslock field (kept last, after the fixed sections)
    if scan_name is not None:
        fields.append(FD(scan_name, "Mainslock", "Mainslock", 1.0, True))

    return fields

787 

def _process_standard_fields(
    self, mdict: dict, fields: list[FieldDefinition], det_name: str
) -> None:
    """
    Copy every defined field that has a value into ``nx_meta``.

    Parameters
    ----------
    mdict
        Metadata dictionary holding the raw sections; updated in place
    fields
        Field definitions to look up and store
    det_name
        Name of the detector section, used for the "Setting" special case
    """
    for spec in fields:
        raw = try_getting_dict_value(mdict, [spec.section, spec.source_key])

        # missing or blank values are left out of the output entirely
        if raw is None or raw == "":
            continue

        # Skip detector "Setting" if numeric (duplicate of Grid voltage)
        if spec.section == det_name and spec.source_key == "Setting":
            try:
                Decimal(raw)
            except (ValueError, InvalidOperation):
                pass
            else:
                continue

        if spec.is_string:
            self._set_field_value(mdict, spec.output_key, raw)
            continue

        self._set_numeric_field_value(
            mdict,
            spec.output_key,
            raw,
            spec.factor,
            spec.suppress_zero,
            spec.target_unit,
        )

815 

def _set_field_value(self, mdict: dict, output_key: str | list, value: str) -> None:
    """Store ``value`` under ``nx_meta`` at a single key or a nested key path."""
    # a list is a nested path below "nx_meta"; anything else is one key
    sub_path = output_key if isinstance(output_key, list) else [output_key]
    set_nested_dict_value(mdict, ["nx_meta", *sub_path], value)

822 

def _set_numeric_field_value(  # noqa: PLR0913
    self,
    mdict: dict,
    output_key: str | list,
    value: str,
    factor: float,
    suppress_zero: bool,
    unit: str | None = None,
) -> None:
    """Store a numeric field, scaled by ``factor`` and optionally with a unit.

    If the value cannot be converted to a number, the raw value is stored
    instead (unless it is None).

    Parameters
    ----------
    mdict
        Metadata dictionary
    output_key
        Output key or nested path
    value
        String value to convert
    factor
        Multiplicative conversion factor
    suppress_zero
        If True, skip if value equals zero
    unit
        Pint unit string (e.g., "kilovolt"). If provided, creates a Quantity.
    """
    try:
        scaled = Decimal(value) * Decimal(str(factor))
        if suppress_zero and scaled == 0:
            return
        if unit is None:
            # plain numbers are stored as float
            self._set_field_value(mdict, output_key, float(scaled))
        else:
            # values with a unit are stored as a Pint Quantity
            self._set_field_value(mdict, output_key, ureg.Quantity(scaled, unit))
    except (ValueError, InvalidOperation, TypeError):
        # TypeError can occur if value is None
        if value is not None:
            self._set_field_value(mdict, output_key, value)

863 

def _parse_special_cases(self, mdict: dict, beam_name: str, det_name: str) -> None:
    """Run the parsers for fields that need more than a plain lookup."""
    has_beam = beam_name is not None

    if has_beam:
        set_nested_dict_value(mdict, ["nx_meta", "Beam Name"], beam_name)
    if det_name is not None:
        set_nested_dict_value(mdict, ["nx_meta", "Detector Name"], det_name)

    # these parsers read from the beam section, so they need its name
    if has_beam:
        for beam_parser in (
            self._parse_scan_rotation,
            self._parse_tilt_correction,
            self._parse_beam_control_flags,
        ):
            beam_parser(mdict, beam_name)

    # the remaining parsers only read from fixed section names
    for parser in (
        self._parse_drift_correction,
        self._parse_frame_integration,
        self._parse_resolution,
        self._parse_operator,
        self._parse_chamber_pressure,
        self._parse_software_version,
        self._parse_column_type,
        self._parse_scan_settings,
    ):
        parser(mdict)

883 

def _parse_scan_rotation(self, mdict: dict, beam_name: str) -> None:
    """Parse scan rotation (radians → degrees).

    The result is rounded to the number of decimal places of the source value
    and stored as a Quantity in degrees under ``nx_meta["Scan Rotation"]``.
    A blank or non-numeric value is skipped (with a debug log message) rather
    than raising, so one malformed entry does not abort the whole extraction.

    Parameters
    ----------
    mdict
        Metadata dictionary holding the raw sections; updated in place
    beam_name
        Name of the beam section that holds ``ScanRotation``
    """
    scan_rot_val = try_getting_dict_value(mdict, [beam_name, "ScanRotation"])
    if scan_rot_val is None:
        return

    try:
        scan_rot_dec = Decimal(scan_rot_val)
        # keep as many decimal places as the source value was written with
        digits = abs(scan_rot_dec.as_tuple().exponent)
        scan_rot_degrees = round(degrees(scan_rot_dec), digits)
    except (ValueError, InvalidOperation, TypeError, OverflowError):
        # covers blank/non-numeric text as well as NaN/Infinity, whose
        # exponent is not a number
        _logger.debug("Could not parse scan rotation value: %r", scan_rot_val)
        return

    set_nested_dict_value(
        mdict,
        ["nx_meta", "Scan Rotation"],
        ureg.Quantity(scan_rot_degrees, "degree"),
    )

895 

def _parse_tilt_correction(self, mdict: dict, beam_name: str) -> None:
    """Parse tilt correction (conditional on TiltCorrectionIsOn).

    The angle is only stored when ``TiltCorrectionIsOn`` is ``"yes"``. A blank
    angle is skipped; a non-numeric one is stored as the raw value instead of
    raising, so a malformed entry does not abort the whole extraction.

    Parameters
    ----------
    mdict
        Metadata dictionary holding the raw sections; updated in place
    beam_name
        Name of the beam section that holds the tilt correction entries
    """
    if try_getting_dict_value(mdict, [beam_name, "TiltCorrectionIsOn"]) != "yes":
        return

    tilt_corr_val = try_getting_dict_value(mdict, [beam_name, "TiltCorrectionAngle"])
    if tilt_corr_val is None or tilt_corr_val == "":
        return

    try:
        angle = float(Decimal(tilt_corr_val))
    except (ValueError, InvalidOperation, TypeError):
        # keep the raw value rather than failing the whole file
        angle = tilt_corr_val

    set_nested_dict_value(mdict, ["nx_meta", "Tilt Correction Angle"], angle)

909 

def _parse_beam_control_flags(self, mdict: dict, beam_name: str) -> None:
    """Store the beam section's on/off entries as booleans (True for "yes")."""
    # (key in the beam section, label under nx_meta)
    flag_labels = (
        ("TiltCorrectionIsOn", "Tilt Correction Enabled"),
        ("DynamicFocusIsOn", "Dynamic Focus Enabled"),
        ("DynamicWDIsOn", "Dynamic WD Enabled"),
    )
    for source_key, label in flag_labels:
        raw = try_getting_dict_value(mdict, [beam_name, source_key])
        if raw is not None:
            set_nested_dict_value(mdict, ["nx_meta", label], raw == "yes")

932 

def _parse_drift_correction(self, mdict: dict) -> None:
    """Store ``[Image] DriftCorrected`` as a boolean (True when it is "On")."""
    raw = try_getting_dict_value(mdict, ["Image", "DriftCorrected"])
    if raw is None:
        return
    is_applied = raw == "On"
    set_nested_dict_value(mdict, ["nx_meta", "Drift Correction Applied"], is_applied)

940 

def _parse_frame_integration(self, mdict: dict) -> None:
    """Store the number of integrated frames, but only when it is above 1."""
    raw = try_getting_dict_value(mdict, ["Image", "Integrate"])
    if raw is None:
        return
    try:
        frame_count = int(raw)
        if frame_count > 1:
            set_nested_dict_value(
                mdict, ["nx_meta", "Frames Integrated"], frame_count
            )
    except ValueError:
        # values that are not an integer are ignored
        pass

951 

def _parse_resolution(self, mdict: dict) -> None:
    """Store the image resolution as the string form of an ``(x, y)`` tuple."""
    width = try_getting_dict_value(mdict, ["Image", "ResolutionX"])
    height = try_getting_dict_value(mdict, ["Image", "ResolutionY"])
    # both dimensions are required
    if width is None or height is None:
        return
    try:
        dimensions = (int(width), int(height))
        set_nested_dict_value(mdict, ["nx_meta", "Data Dimensions"], str(dimensions))
    except ValueError:
        # values that are not integers are ignored
        pass

963 

def _parse_operator(self, mdict: dict) -> None:
    """Store ``[User] User`` as the operator and add it to the warnings list."""
    operator = try_getting_dict_value(mdict, ["User", "User"])
    if operator is None:
        return
    set_nested_dict_value(mdict, ["nx_meta", "Operator"], operator)
    # the "warnings" list must already exist under nx_meta at this point
    mdict["nx_meta"]["warnings"].append(["Operator"])

970 

def _parse_chamber_pressure(self, mdict: dict) -> None:
    """Store the chamber pressure as a Quantity (unit depends on vacuum mode).

    When the already-parsed "Vacuum Mode" is "High vacuum" the value is
    multiplied by 1000 and stored in millipascal; otherwise it is stored in
    pascal. A value that is not numeric is stored as the raw string.
    """
    raw = try_getting_dict_value(mdict, ["Vacuum", "ChPressure"])
    if raw is None or raw == "":
        return

    target = ["nx_meta", "Chamber Pressure"]
    try:
        pressure = Decimal(raw)
        vacuum_mode = try_getting_dict_value(mdict, ["nx_meta", "Vacuum Mode"])
        if vacuum_mode == "High vacuum":
            # Value is in Pa, multiply by 1000 to get mPa
            quantity = ureg.Quantity(pressure * 1000, "millipascal")
        else:
            # Value is already in Pa
            quantity = ureg.Quantity(pressure, "pascal")
        set_nested_dict_value(mdict, target, quantity)
    except (ValueError, InvalidOperation):
        # If conversion fails, store as string without unit
        set_nested_dict_value(mdict, target, raw)

1000 

def _parse_software_version(self, mdict: dict) -> None:
    """Combine ``[System] Software`` and ``BuildNr`` into one version string."""
    software = try_getting_dict_value(mdict, ["System", "Software"])
    build = try_getting_dict_value(mdict, ["System", "BuildNr"])

    # either part may be missing; only the available ones are joined
    candidates = (software, None if build is None else f"(build {build})")
    parts = [part for part in candidates if part is not None]
    if not parts:
        return
    set_nested_dict_value(mdict, ["nx_meta", "Software Version"], " ".join(parts))

1014 

def _parse_column_type(self, mdict: dict) -> None:
    """Combine ``[System] Column`` and ``Type`` into one column description."""
    column = try_getting_dict_value(mdict, ["System", "Column"])
    kind = try_getting_dict_value(mdict, ["System", "Type"])

    # either part may be missing; only the available ones are joined
    parts = [part for part in (column, kind) if part is not None]
    if not parts:
        return
    set_nested_dict_value(mdict, ["nx_meta", "Column Type"], " ".join(parts))

1028 

def _parse_scan_settings(self, mdict: dict) -> None:
    """Store the scan section's ``InternalScan`` entry as a boolean."""
    # the scan section's name is itself stored under [Beam] Scan
    scan_name = try_getting_dict_value(mdict, ["Beam", "Scan"])
    if scan_name is None:
        return

    raw = try_getting_dict_value(mdict, [scan_name, "InternalScan"])
    if raw is None:
        return

    set_nested_dict_value(mdict, ["nx_meta", "Internal Scan"], raw == "true")

1039 

def _parse_nx_meta(self, mdict: dict) -> dict:
    """
    Parse metadata into NexusLIMS format.

    Collects the "important" values stored at specific places within the
    Quanta tag structure and records them under the ``nx_meta`` key of the
    metadata dictionary.

    The metadata contained in the XML section (if present) is not parsed,
    since it appears to only contain duplicates or slightly renamed metadata
    values compared to the typical config-style section.

    Parameters
    ----------
    mdict
        A metadata dictionary with raw extracted metadata; it must already
        contain an ``nx_meta`` entry

    Returns
    -------
    dict
        The same metadata dictionary with parsed values added under the
        root-level ``nx_meta`` key
    """
    # Guarantee a warnings list exists without clobbering an existing one
    mdict["nx_meta"].setdefault("warnings", [])

    # Names of the active beam and detector, passed on to the parsing steps
    active_beam = try_getting_dict_value(mdict, ["Beam", "Beam"])
    active_detector = try_getting_dict_value(mdict, ["Detectors", "Name"])

    field_defs = self._build_field_definitions(mdict)
    self._process_standard_fields(mdict, field_defs, active_detector)
    self._parse_special_cases(mdict, active_beam, active_detector)

    return mdict

1073 

1074 def _migrate_to_schema_compliant_metadata(self, mdict: dict) -> dict: 

1075 """ 

1076 Migrate metadata to schema-compliant format. 

1077 

1078 Reorganizes metadata to conform to type-specific Pydantic schemas: 

1079 - Extracts core EM Glossary fields to top level with standardized names 

1080 - Moves vendor-specific nested dictionaries to extensions section 

1081 - Preserves existing extensions from instrument profiles 

1082 

1083 Parameters 

1084 ---------- 

1085 mdict 

1086 Metadata dictionary with nx_meta containing extracted fields 

1087 

1088 Returns 

1089 ------- 

1090 dict 

1091 Metadata dictionary with schema-compliant nx_meta structure 

1092 """ 

1093 nx_meta = mdict.get("nx_meta", {}) 

1094 

1095 # Preserve existing extensions from instrument profiles 

1096 extensions = ( 

1097 nx_meta.get("extensions", {}).copy() if "extensions" in nx_meta else {} 

1098 ) 

1099 

1100 # Field mappings from display names to EM Glossary names 

1101 field_mappings = { 

1102 "Voltage": "acceleration_voltage", 

1103 "Working Distance": "working_distance", 

1104 "Emission Current": "emission_current", 

1105 "Pixel Dwell Time": "dwell_time", 

1106 "Horizontal Field Width": "horizontal_field_width", 

1107 "Vertical Field Width": "vertical_field_width", 

1108 "Pixel Width": "pixel_width", 

1109 "Pixel Height": "pixel_height", 

1110 } 

1111 

1112 # Fields that ALWAYS go to extensions (vendor-specific nested dicts) 

1113 extension_top_level_keys = { 

1114 "Beam", 

1115 "Scan", 

1116 "Detector", 

1117 "Stage Position", 

1118 "Image", 

1119 "Application", 

1120 "Vacuum", 

1121 "System", 

1122 "User", 

1123 "Detectors", 

1124 "GIS", 

1125 "Specimen", 

1126 "PrivateFei", 

1127 "FEI_XML_Metadata", 

1128 "Optics", 

1129 } 

1130 

1131 # Also move these individual vendor fields to extensions 

1132 extension_field_names = { 

1133 "Detector Brightness Setting", 

1134 "Detector Contrast Setting", 

1135 "Detector Enhanced Contrast Setting", 

1136 "Detector Signal", 

1137 "Detector Grid Voltage", 

1138 "Beam Tilt X", 

1139 "Beam Tilt Y", 

1140 "Stigmator X Value", 

1141 "Stigmator Y Value", 

1142 "Beam Shift X", 

1143 "Beam Shift Y", 

1144 "Beam Mode", 

1145 "Image Mode", 

1146 "Pre-Tilt", 

1147 "Eucentric WD", 

1148 "Total Frame Time", 

1149 "Line Time", 

1150 "Line Integration", 

1151 "Scan Interlacing", 

1152 } 

1153 

1154 # Build new nx_meta with proper field organization 

1155 new_nx_meta = {} 

1156 

1157 # Copy required fields 

1158 for field in ["DatasetType", "Data Type", "Creation Time"]: 

1159 if field in nx_meta: 

1160 new_nx_meta[field] = nx_meta[field] 

1161 

1162 # Copy instrument identification 

1163 if "Instrument ID" in nx_meta: 

1164 new_nx_meta["Instrument ID"] = nx_meta["Instrument ID"] 

1165 

1166 # Process all fields and categorize 

1167 for old_name, value in nx_meta.items(): 

1168 # Skip fields we've already handled 

1169 if old_name in [ 

1170 "DatasetType", 

1171 "Data Type", 

1172 "Creation Time", 

1173 "Instrument ID", 

1174 "Extractor Warnings", 

1175 "warnings", 

1176 "extensions", 

1177 ]: 

1178 continue 

1179 

1180 # Top-level vendor sections go to extensions 

1181 if old_name in extension_top_level_keys: 

1182 extensions[old_name] = value 

1183 continue 

1184 

1185 # Check if this is a core field that needs renaming 

1186 if old_name in field_mappings: 

1187 emg_name = field_mappings[old_name] 

1188 new_nx_meta[emg_name] = value 

1189 continue 

1190 

1191 # Vendor-specific individual fields go to extensions 

1192 if old_name in extension_field_names: 

1193 extensions[old_name] = value 

1194 continue 

1195 

1196 # Everything else goes to extensions (vendor-specific by default) 

1197 # This is safer than at top level where schema validation will reject 

1198 extensions[old_name] = value 

1199 

1200 # Copy warnings if present 

1201 if "warnings" in nx_meta: 

1202 new_nx_meta["warnings"] = nx_meta["warnings"] 

1203 

1204 # Add extensions section if we have any 

1205 for key, value in extensions.items(): 

1206 add_to_extensions(new_nx_meta, key, value) 

1207 

1208 mdict["nx_meta"] = new_nx_meta 

1209 return mdict 

1210 

1211 

1212# Backward compatibility function for tests 

def get_quanta_metadata(filename):
    """
    Extract metadata from a Quanta TIF file (legacy functional entry point).

    Builds an ``ExtractionContext`` for ``filename`` (with the instrument
    resolved from the file path) and runs a new ``QuantaTiffExtractor`` on it.

    .. deprecated::
        This function is deprecated. Use QuantaTiffExtractor class instead.

    Parameters
    ----------
    filename : pathlib.Path
        path to a file saved in the harvested directory of the instrument

    Returns
    -------
    mdict : dict
        A description of the file's metadata, as returned by
        ``QuantaTiffExtractor.extract``.
    """
    instrument = get_instr_from_filepath(filename)
    context = ExtractionContext(file_path=filename, instrument=instrument)
    extractor = QuantaTiffExtractor()
    return extractor.extract(context)