Coverage for nexusLIMS/extractors/plugins/digital_micrograph.py: 100%

356 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-03-24 05:23 +0000

1"""Digital Micrograph (.dm3/.dm4) extractor plugin.""" 

2 

3import contextlib 

4import logging 

5from datetime import UTC 

6from datetime import datetime as dt 

7from pathlib import Path 

8from struct import error 

9from typing import Any, ClassVar, Dict, List 

10 

11import numpy as np 

12from hyperspy.io import load as hs_load 

13from rsciio.utils.exceptions import ( 

14 DM3DataTypeError, 

15 DM3FileVersionError, 

16 DM3TagError, 

17 DM3TagIDError, 

18 DM3TagTypeError, 

19) 

20 

21from nexusLIMS.extractors.base import ExtractionContext 

22from nexusLIMS.extractors.plugins.basic_metadata import BasicFileInfoExtractor 

23from nexusLIMS.extractors.plugins.profiles import register_all_profiles 

24from nexusLIMS.extractors.profiles import get_profile_registry 

25from nexusLIMS.extractors.utils import ( 

26 _coerce_to_list, 

27 _find_val, 

28 _parse_filter_settings, 

29 _set_acquisition_device_name, 

30 _set_camera_binning, 

31 _set_eds_meta, 

32 _set_eels_meta, 

33 _set_eels_processing, 

34 _set_eels_spectrometer_meta, 

35 _set_exposure_time, 

36 _set_gms_version, 

37 _set_image_processing, 

38 _set_si_meta, 

39 _try_decimal, 

40 add_to_extensions, 

41) 

42from nexusLIMS.instruments import get_instr_from_filepath 

43from nexusLIMS.schemas.units import ureg 

44from nexusLIMS.utils.dicts import ( 

45 remove_dict_nones, 

46 remove_dtb_element, 

47 set_nested_dict_value, 

48 sort_dict, 

49 try_getting_dict_value, 

50) 

51from nexusLIMS.utils.time import current_system_tz 

52 

53_logger = logging.getLogger(__name__) 

54 

55 

class DM3Extractor:
    """
    Extractor for Gatan DigitalMicrograph files (.dm3 and .dm4).

    This extractor handles metadata extraction from files saved by Gatan's
    DigitalMicrograph software, commonly used on FEI/Thermo and JEOL TEMs.
    """

    name = "dm3_extractor"
    priority = 100
    # Lower-case file extensions (without the leading dot) handled by this plugin
    supported_extensions: ClassVar = {"dm3", "dm4"}

    def supports(self, context: ExtractionContext) -> bool:
        """
        Check if this extractor supports the given file.

        Parameters
        ----------
        context
            The extraction context containing file information

        Returns
        -------
        bool
            True if the (case-insensitive) file extension is listed in
            ``supported_extensions`` (.dm3 or .dm4 by default)
        """
        extension = context.file_path.suffix.lower().lstrip(".")
        # Use the class attribute as the single source of truth so that a
        # subclass overriding ``supported_extensions`` is honored here too.
        return extension in self.supported_extensions

    def extract(
        self, context: ExtractionContext
    ) -> dict[str, Any] | list[dict[str, Any]]:
        """
        Extract metadata from a DM3/DM4 file.

        Parameters
        ----------
        context
            The extraction context containing file information

        Returns
        -------
        list[dict]
            Always a list of metadata dicts, one per signal; each dict contains
            'nx_meta' with NexusLIMS-specific metadata. Single-signal files
            return a 1-element list for consistency.
            If the file cannot be opened, a 1-element list holding the basic
            file metadata is returned instead, with an entry appended to
            ``nx_meta["warnings"]`` noting that the file could not be read.
        """
        _logger.debug("Extracting metadata from DM3/DM4 file: %s", context.file_path)
        # get_dm3_metadata() handles profile application internally
        metadata_list = get_dm3_metadata(context.file_path, context.instrument)

        if metadata_list is not None:
            # Single-signal files already come back as a 1-element list
            return metadata_list

        # Extraction failed: fall back to the basic metadata extractor
        _logger.warning(
            "Failed to extract DM3/DM4 metadata from %s, "
            "falling back to basic metadata",
            context.file_path,
        )
        metadata = BasicFileInfoExtractor().extract(context)[0]
        # Record that the full extraction failed; the entry is a one-element
        # list, like the other entries appended to ``warnings`` in this module
        metadata["nx_meta"].setdefault("warnings", []).append(
            ["DM3/DM4 file could not be read by HyperSpy"]
        )
        return [metadata]

130 

131 

def get_dm3_metadata(filename: Path, instrument=None):
    """
    Get metadata from a dm3 or dm4 file.

    Returns the metadata from a .dm3 file saved by Digital Micrograph, with some
    non-relevant information stripped out. Instrument-specific metadata parsing is
    handled by instrument profiles (see nexusLIMS.extractors.plugins.profiles).

    Parameters
    ----------
    filename : pathlib.Path
        Path to a .dm3/.dm4 file saved by Gatan's Digital Micrograph. A ``Path``
        is required because ``filename.stat()`` is used to read the file's
        modification time.
    instrument : Instrument, optional
        The instrument object (used for timezone info and profile selection).
        If not given, it is looked up from ``filename``. Instrument-specific
        parsing is handled via profiles, not this parameter.

    Returns
    -------
    metadata : list[dict] or None
        List of extracted metadata dicts, one per signal. If None, the file could
        not be opened.
    """
    # We do lazy loading so we don't actually read the data from the disk to
    # save time and memory.
    try:
        s = hs_load(filename, lazy=True)
    except (
        DM3DataTypeError,
        DM3FileVersionError,
        DM3TagError,
        DM3TagIDError,
        DM3TagTypeError,
        error,
    ) as exc:
        _logger.warning(
            "File reader could not open %s, received exception: %s",
            filename,
            repr(exc),
        )
        return None

    # hs_load returns a list for multi-signal files and a bare signal otherwise
    signals = s if isinstance(s, list) else [s]

    # Everything below depends only on the file, not on the individual signal,
    # so it is computed once rather than once per signal.
    # Use provided instrument if available, otherwise look it up
    instr = (
        instrument if instrument is not None else get_instr_from_filepath(filename)
    )
    # Use instrument timezone if available, otherwise fall back to system timezone
    tz = instr.timezone if instr else current_system_tz()
    # get the modification time (as ISO format):
    mtime_iso = dt.fromtimestamp(filename.stat().st_mtime, tz=tz).isoformat()
    # if we found the instrument, then store the name as string, else None
    instr_name = instr.name if instr is not None else None

    # Top-level trees that are not of interest
    unwanted_tags = (
        "ApplicationBounds",
        "LayoutType",
        "DocumentTags",
        "HasWindowPosition",
        "ImageSourceList",
        "Image_Behavior",
        "InImageMode",
        "MinVersionList",
        "NextDocumentObjectID",
        "PageSetup",
        "Page_Behavior",
        "SentinelList",
        "Thumbnails",
        "WindowPosition",
        "root",
    )

    m_list = []
    for signal in signals:
        tree = signal.original_metadata
        # Important trees:
        #   DocumentObjectList
        #     Contains information about the display of the information, including
        #     bits about annotations that are included on top of the image data,
        #     the CLUT (color look-up table), data min/max.
        #
        #   ImageList
        #     Contains the actual image information

        # Remove the trees that are not of interest:
        for tag in unwanted_tags:
            tree = remove_dtb_element(tree, tag)

        # Within the DocumentObjectList tree, we really only care about the
        # AnnotationGroupList for each TagGroup, so go into each TagGroup and
        # delete everything but that...
        # NB: the hyperspy DictionaryTreeBrowser __iter__ function returns each
        #     tree element as a tuple containing the tree name and the actual
        #     tree, so we loop through the tag names by taking the first part
        #     of the tuple:
        for tg_name, tag in tree.DocumentObjectList:
            # tg_name should be 'TagGroup0', 'TagGroup1', etc.
            # Snapshot the key names, since elements are removed while looping
            for k in list(tag.keys()):
                if k != "AnnotationGroupList":
                    tree = remove_dtb_element(
                        tree,
                        f"DocumentObjectList.{tg_name}.{k}",
                    )

        for tg_name, tag in tree.ImageList:
            # tg_name should be 'TagGroup0', 'TagGroup1', etc.
            # We want to keep 'ImageTags' and 'Name'. Filtering (instead of
            # list.remove) means a tag group lacking one of them no longer
            # raises ValueError.
            for k in list(tag.keys()):
                # k should be in ['ImageData', 'UniqueID']
                if k not in ("ImageTags", "Name"):
                    tree = remove_dtb_element(
                        tree,
                        f"ImageList.{tg_name}.{k}",
                    )

        meta = tree.as_dictionary()
        meta["nx_meta"] = {
            # fname is only needed for internal processing; removed again below
            "fname": str(filename),
            # set type to Image by default; the parsers below may override it
            "DatasetType": "Image",
            "Data Type": "TEM_Imaging",
            "Creation Time": mtime_iso,
            "Data Dimensions": str(signal.data.shape),
            "Instrument ID": instr_name,
            "warnings": [],
        }
        meta = parse_dm3_microscope_info(meta)
        meta = parse_dm3_eels_info(meta)
        meta = parse_dm3_eds_info(meta)
        meta = parse_dm3_spectrum_image_info(meta)

        # Apply instrument-specific profiles if an instrument is known
        if instr is not None:
            meta = _apply_profile_to_metadata(meta, instr, filename)

        # we don't need to save the filename, it's just for internal processing
        del meta["nx_meta"]["fname"]

        # Migrate metadata to schema-compliant format
        meta = _migrate_to_schema_compliant_metadata(meta)

        # sort the nx_meta dictionary (recursively) for nicer display
        meta["nx_meta"] = sort_dict(meta["nx_meta"])
        m_list.append(meta)

    # return all signals as a list of dictionaries:
    return [remove_dict_nones(m) for m in m_list]

287 

288 

def _apply_profile_to_metadata(metadata: dict, instrument, file_path: Path) -> dict:
    """
    Run the instrument's registered profile over a metadata dictionary.

    Used by get_dm3_metadata(). Profile parsers run first, then top-level
    key transformations, then extension-field injection. A failure in any
    single step is logged as a warning and the remaining steps still run.

    Parameters
    ----------
    metadata
        Metadata dictionary with 'nx_meta' key
    instrument
        Instrument object
    file_path
        Path to the file being processed

    Returns
    -------
    dict
        The metadata dictionary after the profile has been applied (returned
        unchanged when no profile is registered for the instrument)
    """
    # Make sure the profile registry has been populated before querying it
    register_all_profiles()
    profile = get_profile_registry().get_profile(instrument)
    if profile is None:
        return metadata

    _logger.debug("Applying profile for instrument: %s", instrument.name)

    # Profile parsers expect an extraction context, so build one for them
    ctx = ExtractionContext(file_path=file_path, instrument=instrument)

    # 1) custom parsers, in the order the profile provides them
    for name, parse in profile.parsers.items():
        try:
            metadata = parse(metadata, ctx)
        except Exception as exc:
            _logger.warning("Profile parser '%s' failed: %s", name, exc)

    # 2) transformations of top-level keys that are present
    for target, transform in profile.transformations.items():
        try:
            if target not in metadata:
                continue
            metadata[target] = transform(metadata[target])
        except Exception as exc:
            _logger.warning("Profile transformation '%s' failed: %s", target, exc)

    # 3) extension fields (skipped entirely when the profile defines none)
    for field, field_value in (profile.extension_fields or {}).items():
        try:
            add_to_extensions(metadata["nx_meta"], field, field_value)
        except Exception as exc:
            _logger.warning(
                "Profile extension field injection '%s' failed: %s",
                field,
                exc,
            )

    return metadata

359 

360 

def get_pre_path(mdict: Dict) -> List[str]:
    """
    Return the key path under which the signal's main DM metadata is stored.

    For an ordinary file the interesting DigitalMicrograph tags are found under
    ``ImageList.TagGroup0.ImageTags``. If that node contains a ``plane info``
    entry (the case for image stacks), the path continues into
    ``plane info.TagGroup0.source tags`` instead.

    Parameters
    ----------
    mdict : dict
        A metadata dictionary as returned by :py:meth:`get_dm3_metadata`

    Returns
    -------
    A list containing the subsequent keys that need to be traversed to
    get to the point in the `mdict` where the important metadata is stored
    """
    image_tags = ["ImageList", "TagGroup0", "ImageTags"]

    # a "plane info" node below ImageTags marks the file as a stack
    plane_info = try_getting_dict_value(mdict, [*image_tags, "plane info"])
    if plane_info is None:
        return image_tags

    # we're in a stack
    return [*image_tags, "plane info", "TagGroup0", "source tags"]

400 

401 

402def _migrate_to_schema_compliant_metadata(mdict: dict) -> dict: # noqa: PLR0912 

403 """ 

404 Migrate metadata to schema-compliant format. 

405 

406 This function reorganizes metadata extracted from DM3/DM4 files to conform 

407 to the type-specific metadata schemas. It: 

408 1. Maps display names to EM Glossary field names for core fields 

409 2. Moves vendor-specific fields to the extensions section 

410 3. Converts Stage Position dict to proper StagePosition structure 

411 

412 Parameters 

413 ---------- 

414 mdict : dict 

415 Metadata dictionary with 'nx_meta' key 

416 

417 Returns 

418 ------- 

419 dict 

420 Metadata dictionary with schema-compliant nx_meta 

421 """ 

422 nx_meta = mdict.get("nx_meta", {}) 

423 dataset_type = nx_meta.get("DatasetType", "Image") 

424 

425 # Field mappings from display names to EM Glossary names 

426 # These are core schema fields that just need renaming 

427 # Note: dataset_type-specific fields are handled conditionally below 

428 field_mappings = { 

429 # Common mappings for all types 

430 "Voltage": "acceleration_voltage", 

431 "Horizontal Field Width": "horizontal_field_width", 

432 "Vertical Field Width": "vertical_field_width", 

433 "Acquisition Device": "acquisition_device", 

434 "Sample Time": "dwell_time", 

435 } 

436 

437 # Conditional mappings based on dataset type 

438 if dataset_type == "Diffraction": 

439 field_mappings["STEM Camera Length"] = "camera_length" 

440 if dataset_type in ("Image", "SpectrumImage"): 

441 # magnification is only a core field for image-like datasets; 

442 # for others (e.g. Diffraction) it routes to extensions via the 

443 # fall-through below 

444 field_mappings["Indicated Magnification"] = "magnification" 

445 

446 # Fields that should ALWAYS go to extensions (vendor/instrument-specific) 

447 extension_fields = { 

448 # Gatan-specific 

449 "GMS Version", 

450 "Microscope", 

451 "Operator", 

452 "Specimen", 

453 # Operation modes 

454 "Illumination Mode", 

455 "Imaging Mode", 

456 "Operation Mode", 

457 # Apertures 

458 "Condenser Aperture", 

459 "Objective Aperture", 

460 "Selected Area Aperture", 

461 # Vendor-specific settings 

462 "Cs", # Spherical aberration 

463 # Signal/Analytic metadata 

464 "Signal Name", 

465 "Analytic Format", 

466 "Analytic Label", 

467 "Analytic Signal", 

468 # Nested vendor metadata (will be moved as-is) 

469 "EELS", 

470 "EDS", 

471 # STEM-specific fields that should be extensions for non-Diffraction types 

472 "STEM Camera Length", # Only core for Diffraction 

473 } 

474 

475 # NOTE: "NexusLIMS Extraction" is added AFTER this migration function runs 

476 # by add_extraction_details in __init__.py, so we don't need to handle it here 

477 

478 # Create new nx_meta dict with schema-compliant structure 

479 new_nx_meta = {} 

480 # Preserve any existing extensions (e.g., from instrument profiles) 

481 extensions = nx_meta.get("extensions", {}).copy() if "extensions" in nx_meta else {} 

482 

483 # Copy required fields as-is 

484 required_fields = {"Creation Time", "Data Type", "DatasetType"} 

485 for field in required_fields: 

486 if field in nx_meta: 

487 new_nx_meta[field] = nx_meta[field] 

488 

489 # Copy common optional fields 

490 common_fields = { 

491 "Data Dimensions", 

492 "Instrument ID", 

493 "warnings", 

494 "Extractor Warnings", 

495 } 

496 for field in common_fields: 

497 if field in nx_meta: 

498 new_nx_meta[field] = nx_meta[field] 

499 

500 # Process all other fields 

501 for key, value in nx_meta.items(): 

502 # Skip if already processed 

503 if key in required_fields or key in common_fields: 

504 continue 

505 

506 # Check if it's a core field that needs renaming 

507 if key in field_mappings: 

508 new_key = field_mappings[key] 

509 new_nx_meta[new_key] = value 

510 # Check if it should go to extensions 

511 elif key in extension_fields: 

512 extensions[key] = value 

513 # Handle Stage Position specially 

514 elif key == "Stage Position": 

515 # DM3 files have Stage Position as a dict with keys 

516 # like 'X', 'Y', 'α', etc. # noqa: RUF003 

517 # Convert to snake_case keys for StagePosition schema 

518 if isinstance(value, dict): 

519 stage_pos = {} 

520 key_map = { 

521 "X": "x", 

522 "Y": "y", 

523 "Z": "z", 

524 "α": "tilt_alpha", # noqa: RUF001 

525 "β": "tilt_beta", 

526 } 

527 for old_key, new_key in key_map.items(): 

528 if old_key in value: 

529 # Convert to Pint Quantity if needed 

530 val = value[old_key] 

531 if new_key in ("x", "y") and not isinstance(val, ureg.Quantity): 

532 # X/Y in micrometers 

533 val = ureg.Quantity(val, "micrometer") 

534 elif new_key == "z" and not isinstance(val, ureg.Quantity): 

535 # Z in millimeters 

536 val = ureg.Quantity(val, "millimeter") 

537 elif new_key in ( 

538 "tilt_alpha", 

539 "tilt_beta", 

540 ) and not isinstance(val, ureg.Quantity): 

541 # Tilts in degrees 

542 val = ureg.Quantity(val, "degree") 

543 stage_pos[new_key] = val 

544 # Only emit stage_position when non-empty and the dataset 

545 # type declares the field (Image / SpectrumImage); route 

546 # non-empty values to extensions for other types and drop 

547 # empty dicts entirely. 

548 if stage_pos: 

549 if dataset_type in ("Image", "SpectrumImage"): 

550 new_nx_meta["stage_position"] = stage_pos 

551 else: 

552 extensions["Stage Position"] = stage_pos 

553 else: 

554 # If it's not a dict, move to extensions (this is not expected) 

555 extensions["Stage Position"] = value # pragma: no cover 

556 # Everything else goes to extensions 

557 else: 

558 extensions[key] = value 

559 

560 # Add extensions if any 

561 for key, value in extensions.items(): 

562 add_to_extensions(new_nx_meta, key, value) 

563 

564 mdict["nx_meta"] = new_nx_meta 

565 return mdict 

566 

567 

def parse_dm3_microscope_info(mdict):  # noqa: PLR0912
    """
    Collect the general microscope metadata into ``nx_meta``.

    Looks under the "Microscope Info", "Session Info", and "Meta Data" nodes
    (these are not present on every microscope) below the path returned by
    :py:func:`get_pre_path`, and copies the values found there to a consistent
    place under the root-level ``nx_meta`` key. Values that are missing, empty
    lists, or one of the placeholder strings ``"DO NOT EDIT"`` /
    ``"DO NOT ENTER"`` are skipped. Acquisition device, exposure time, GMS
    version, camera binning and image processing are delegated to helpers.

    Parameters
    ----------
    mdict : dict
        A metadata dictionary as returned by :py:meth:`get_dm3_metadata`

    Returns
    -------
    mdict : dict
        The same metadata dictionary with some values added under the
        root-level ``nx_meta`` key
    """
    if "nx_meta" not in mdict:
        mdict["nx_meta"] = {}  # pragma: no cover

    pre_path = get_pre_path(mdict)

    # "facility-wide" set values that do not have any meaning
    placeholders = ["DO NOT EDIT", "DO NOT ENTER"]

    def _is_meaningful(candidate):
        # keep a value only if it was found, is not a placeholder, and is not []
        return (
            candidate is not None
            and candidate not in placeholders
            and candidate != []
        )

    # Units attached to the tags that carry a physical quantity
    unit_by_field = {
        "Cs(mm)": "millimeter",
        "STEM Camera Length": "millimeter",
        "Voltage": "volt",  # Will auto-convert to kilovolt
        "Field of View (\u00b5m)": "micrometer",
    }
    # Output names for the tags whose DM name carries a unit suffix
    renamed_field = {
        "Cs(mm)": "Cs",
        "Field of View (\u00b5m)": "Horizontal Field Width",
    }

    # General "microscope info" .dm3 tags (not present on all instruments):
    microscope_base = [*pre_path, "Microscope Info"]
    for tag_path in (
        "Indicated Magnification",
        "Actual Magnification",
        "Cs(mm)",
        "STEM Camera Length",
        "Voltage",
        "Operation Mode",
        "Specimen",
        "Microscope",
        "Operator",
        "Imaging Mode",
        "Illumination Mode",
        "Name",
        "Field of View (\u00b5m)",
        "Facility",
        "Condenser Aperture",
        "Objective Aperture",
        "Selected Area Aperture",
        ["Stage Position", "Stage Alpha"],
        ["Stage Position", "Stage Beta"],
        ["Stage Position", "Stage X"],
        ["Stage Position", "Stage Y"],
        ["Stage Position", "Stage Z"],
    ):
        key_path = _coerce_to_list(tag_path)
        found = try_getting_dict_value(mdict, microscope_base + key_path)
        if not _is_meaningful(found):
            continue

        leaf = key_path[-1]
        if leaf in unit_by_field:
            # Convert to Pint Quantity; on failure the raw value is kept
            with contextlib.suppress(ValueError, TypeError):
                found = ureg.Quantity(found, unit_by_field[leaf])
            # Remove unit suffix from field name
            if leaf in renamed_field:
                key_path = [renamed_field[leaf]]

        # change output of "Stage Position" to unicode characters
        if "Stage Position" in key_path:
            key_path[-1] = (
                key_path[-1]
                .replace("Alpha", "α")  # noqa: RUF001
                .replace("Beta", "β")
                .replace("Stage ", "")
            )
        set_nested_dict_value(mdict, ["nx_meta", *key_path], found)

    # General "session info" .dm3 tags (sometimes this information is stored
    # here instead of under "Microscope Info"):
    session_base = [*pre_path, "Session Info"]
    for tag_name in ("Detector", "Microscope", "Operator", "Specimen"):
        key_path = _coerce_to_list(tag_name)
        found = try_getting_dict_value(mdict, session_base + key_path)
        if _is_meaningful(found):
            set_nested_dict_value(mdict, ["nx_meta", *key_path], found)

    # General "Meta Data" .dm3 tags
    meta_data_base = [*pre_path, "Meta Data"]
    for tag_path in (
        "Acquisition Mode",
        "Format",
        "Signal",
        # this one is seen sometimes in EDS signals:
        ["Experiment keywords", "TagGroup1", "Label"],
    ):
        key_path = _coerce_to_list(tag_path)
        found = try_getting_dict_value(mdict, meta_data_base + key_path)
        if not _is_meaningful(found):
            continue
        if "Label" in key_path:
            out_path = ["nx_meta", "Analytic Label"]
        else:
            out_path = ["nx_meta"] + [f"Analytic {lbl}" for lbl in key_path]
        set_nested_dict_value(mdict, out_path, found)

    # acquisition device name, exposure time, GMS version, camera binning and
    # image processing are each handled by a dedicated helper:
    _set_acquisition_device_name(mdict, pre_path)
    _set_exposure_time(mdict, pre_path)
    _set_gms_version(mdict, pre_path)
    _set_camera_binning(mdict, pre_path)
    _set_image_processing(mdict, pre_path)

    # Signal Name (from DataBar):
    signal_name = try_getting_dict_value(mdict, [*pre_path, "DataBar", "Signal Name"])
    if signal_name is not None:
        set_nested_dict_value(mdict, ["nx_meta", "Signal Name"], signal_name)

    # DigiScan Sample Time (dwell time per pixel in microseconds):
    sample_time = try_getting_dict_value(mdict, [*pre_path, "DigiScan", "Sample Time"])
    if sample_time is not None:
        with contextlib.suppress(ValueError, TypeError):
            sample_time = ureg.Quantity(sample_time, "microsecond")
        set_nested_dict_value(
            mdict,
            ["nx_meta", "Sample Time"],
            sample_time,
        )

    # An illumination mode mentioning STEM switches the data type
    nx_meta = mdict["nx_meta"]
    if "Illumination Mode" in nx_meta and "STEM" in nx_meta["Illumination Mode"]:
        nx_meta["Data Type"] = "STEM_Imaging"

    return mdict

728 

729 

def parse_dm3_eels_info(mdict):
    """
    Parse EELS information from the metadata.

    Parses metadata from the DigitalMicrograph tag structure that concerns any
    EELS acquisition or spectrometer settings, placing it in an ``EELS``
    dictionary underneath the root-level ``nx_meta`` node. If any EELS metadata
    was stored, ``DatasetType`` is set to ``"Spectrum"`` and ``Data Type`` to
    ``"STEM_EELS"`` or ``"TEM_EELS"`` depending on ``Illumination Mode``
    (``"TEM_EELS"`` when no illumination mode was recorded).

    Parameters
    ----------
    mdict : dict
        A metadata dictionary as returned by :py:meth:`get_dm3_metadata`

    Returns
    -------
    mdict : dict
        The metadata dict with all the "EELS-specific" metadata added under ``nx_meta``
    """
    pre_path = get_pre_path(mdict)

    # EELS .dm3 tags of interest:
    base = [*pre_path, "EELS"]
    for meta_key in [
        ["Acquisition", "Exposure (s)"],
        ["Acquisition", "Integration time (s)"],
        ["Acquisition", "Number of frames"],
        ["Experimental Conditions", "Collection semi-angle (mrad)"],
        ["Experimental Conditions", "Convergence semi-angle (mrad)"],
    ]:
        _set_eels_meta(mdict, base, meta_key)

    # different instruments have the spectrometer information in different
    # places; instruments not listed here get no spectrometer metadata
    spectrometer_paths = {
        "FEI-Titan-TEM": [*pre_path, "EELS", "Acquisition", "Spectrometer"],
        "FEI-Titan-STEM": [*pre_path, "EELS Spectrometer"],
    }
    # .get() so that a dict without "Instrument ID" is treated like an
    # unknown instrument instead of raising KeyError
    base = spectrometer_paths.get(mdict["nx_meta"].get("Instrument ID"))
    if base is not None:
        for tag_name in [
            "Aperture label",
            "Dispersion (eV/ch)",
            "Energy loss (eV)",
            "Instrument name",
            "Drift tube enabled",
            "Drift tube voltage (V)",
            "Slit inserted",
            "Slit width (eV)",
            "Prism offset (V)",
            # NOTE(review): trailing space kept exactly as in the original
            # lookup key -- confirm it matches the tag name written by DM
            "Prism offset enabled ",
        ]:
            _set_eels_spectrometer_meta(mdict, base, [tag_name])

    _set_eels_processing(mdict, pre_path)

    # Set the dataset type to Spectrum if any EELS tags were added
    nx_meta = mdict["nx_meta"]
    if "EELS" in nx_meta:
        _logger.info("Detected file as Spectrum type based on EELS metadata")
        nx_meta["DatasetType"] = "Spectrum"
        # "Illumination Mode" is not present for every file; default to an
        # empty string (i.e. TEM) rather than raising KeyError
        if "STEM" in nx_meta.get("Illumination Mode", ""):
            nx_meta["Data Type"] = "STEM_EELS"
        else:
            nx_meta["Data Type"] = "TEM_EELS"

    return mdict

797 

798 

def parse_dm3_eds_info(mdict):
    """
    Parse EDS information from the dm3 metadata.

    Parses metadata from the DigitalMicrograph tag structure that concerns any
    EDS acquisition or spectrometer settings, placing it in an ``EDS``
    dictionary underneath the root-level ``nx_meta`` node. Metadata values
    that are commonly incorrect or may be placeholders are specified in a
    list under the ``nx_meta.warnings`` node. If any EDS metadata was stored,
    ``DatasetType`` is set to ``"Spectrum"`` and ``Data Type`` to ``"STEM_EDS"``
    or ``"TEM_EDS"`` depending on ``Illumination Mode`` (``"TEM_EDS"`` when no
    illumination mode was recorded).

    Parameters
    ----------
    mdict : dict
        A metadata dictionary as returned by :py:meth:`get_dm3_metadata`

    Returns
    -------
    mdict : dict
        The metadata dictionary with all the "EDS-specific" metadata
        added as sub-node under the ``nx_meta`` root level dictionary
    """
    pre_path = get_pre_path(mdict)

    # EDS .dm3 tags of interest:
    base = [*pre_path, "EDS"]

    # Acquisition settings; for spectrum images these live under the SI tag
    acquisition_keys = (
        ("Acquisition", "Continuous Mode"),
        ("Acquisition", "Count Rate Unit"),
        ("Acquisition", "Dispersion (eV)"),
        ("Acquisition", "Energy Cutoff (V)"),
        ("Acquisition", "Exposure (s)"),
    )
    # Values that might not be accurate; these are also listed in the warnings
    unreliable_keys = (
        ("Count rate",),
        ("Detector Info", "Active layer"),
        ("Detector Info", "Azimuthal angle"),
        ("Detector Info", "Dead layer"),
        ("Detector Info", "Detector type"),
        ("Detector Info", "Elevation angle"),
        ("Detector Info", "Fano"),
        ("Detector Info", "Gold layer"),
        ("Detector Info", "Incidence angle"),
        ("Detector Info", "Solid angle"),
        ("Detector Info", "Stage tilt"),
        ("Detector Info", "Window thickness"),
        ("Detector Info", "Window type"),
        ("Detector Info", "Zero fwhm"),
        ("Live time",),
        ("Real time",),
    )

    # The helpers receive a fresh list for every key
    for meta_key in (*acquisition_keys, *unreliable_keys):
        _set_eds_meta(mdict, base, list(meta_key))

    # test to see if the SI attribute is present in the metadata dictionary.
    # If so, then some relevant EDS values are located there, rather
    # than in the root-level EDS tag (all the EDS.Acquisition tags from
    # above)
    if try_getting_dict_value(mdict, [*pre_path, "SI"]) is not None:
        for meta_key in acquisition_keys:
            _set_si_meta(mdict, pre_path, list(meta_key))

        # for an SI EDS dataset, set "Live time", "Real time" and "Count rate"
        # to the averages stored in the ImageList.TagGroup0.ImageTags.EDS.Images
        # values
        im_dict = try_getting_dict_value(mdict, [*pre_path, "EDS", "Images"])
        if isinstance(im_dict, dict):
            for k, v in im_dict.items():
                # Drop the single-value entry in favor of the average; tolerate
                # an "EDS" node that has not been created (no EDS value found
                # above) instead of raising KeyError
                mdict["nx_meta"].get("EDS", {}).pop(k, None)
                # this should work for 2D (spectrum image) as well as 1D
                # (linescan) datasets since DM saves this information as a 1D
                # list regardless of original data shape
                avg_val = np.array(v).mean()
                set_nested_dict_value(
                    mdict,
                    ["nx_meta", "EDS", f"{k} (SI Average)"],
                    avg_val,
                )

    # Add the .dm3 EDS values to the warnings list, since they might not be
    # accurate
    for meta_key in unreliable_keys:
        if try_getting_dict_value(mdict, base + list(meta_key)) is not None:
            # each warning names the last element of the tag path
            mdict["nx_meta"].setdefault("warnings", []).append(
                ["EDS", meta_key[-1]],
            )

    # Set the dataset type to Spectrum if any EDS tags were added
    nx_meta = mdict["nx_meta"]
    if "EDS" in nx_meta:
        _logger.info("Detected file as Spectrum type based on presence of EDS metadata")
        nx_meta["DatasetType"] = "Spectrum"
        # "Illumination Mode" is not present for every file; default to an
        # empty string (i.e. TEM) rather than raising KeyError
        if "STEM" in nx_meta.get("Illumination Mode", ""):
            nx_meta["Data Type"] = "STEM_EDS"
        else:
            # no known files match this mode, so skip for coverage
            nx_meta["Data Type"] = "TEM_EDS"  # pragma: no cover

    return mdict

918 

919 

def parse_dm3_spectrum_image_info(mdict):
    """
    Parse "spectrum image" information from the metadata.

    Reads the spectrum imaging tags (the "SI" tag group found under the path
    returned by ``get_pre_path``) and copies the values of interest into a
    "Spectrum Imaging" dictionary underneath the root-level ``nx_meta`` node.
    If the dataset was already classified as a "Spectrum" and any spectrum
    imaging values were found, the dataset type is promoted to "SpectrumImage".

    Parameters
    ----------
    mdict : dict
        A metadata dictionary as returned by :py:meth:`get_dm3_metadata`

    Returns
    -------
    mdict : dict
        The same metadata dictionary, with any spectrum-imaging-specific
        metadata added as a "Spectrum Imaging" sub-node under the ``nx_meta``
        root level dictionary
    """
    pre_path = get_pre_path(mdict)

    # Spectrum imaging .dm3 tags of interest:
    base = [*pre_path, "SI"]

    for m_in, m_out in [
        (["Acquisition", "Pixel time (s)"], ["Pixel time (s)"]),
        (["Acquisition", "SI Application Mode", "Name"], ["Scan Mode"]),
        (
            ["Acquisition", "Spatial Sampling", "Height (pixels)"],
            ["Spatial Sampling (Vertical)"],
        ),
        (
            ["Acquisition", "Spatial Sampling", "Width (pixels)"],
            ["Spatial Sampling (Horizontal)"],
        ),
        (
            ["Acquisition", "Scan Options", "Sub-pixel sampling"],
            ["Sub-pixel Sampling Factor"],
        ),
    ]:
        val = try_getting_dict_value(mdict, base + m_in)
        # only copy the value over if the tag was actually present
        if val is not None:
            output_key = m_out[0] if len(m_out) == 1 else m_out
            if output_key == "Pixel time (s)":
                # Convert to a Pint Quantity and drop the unit from the key
                # name; if the conversion fails, the raw value is stored
                # under the original "(s)" key instead
                with contextlib.suppress(ValueError, TypeError):
                    val = ureg.Quantity(val, "second")
                    output_key = ["Pixel time"]
            key_list = [output_key] if isinstance(output_key, str) else output_key
            set_nested_dict_value(
                mdict, ["nx_meta", "Spectrum Imaging", *key_list], val
            )

    # Check spatial drift correction separately:
    drift_per_val = try_getting_dict_value(
        mdict,
        [*base, "Acquisition", "Artefact Correction", "Spatial Drift", "Periodicity"],
    )
    drift_unit_val = try_getting_dict_value(
        mdict,
        [*base, "Acquisition", "Artefact Correction", "Spatial Drift", "Units"],
    )
    if drift_per_val is not None and drift_unit_val is not None:
        val_to_set = f"Spatial drift correction every {drift_per_val} {drift_unit_val}"
        # make sure the statement looks grammatically correct: the unit string
        # carries an optional-plural "(s)" suffix that is resolved here
        if drift_per_val == 1:
            val_to_set = val_to_set.replace("(s)", "")
        else:
            val_to_set = val_to_set.replace("(s)", "s")
        # fix for "seconds(s)", which the replacement above turns into "secondss"
        if val_to_set[-2:] == "ss":
            val_to_set = val_to_set[:-1]
        set_nested_dict_value(
            mdict,
            ["nx_meta", "Spectrum Imaging", "Artefact Correction"],
            val_to_set,
        )

    start_val = try_getting_dict_value(mdict, [*base, "Acquisition", "Start time"])
    end_val = try_getting_dict_value(mdict, [*base, "Acquisition", "End time"])
    if start_val is not None and end_val is not None:
        try:
            start_dt = dt.strptime(start_val, "%I:%M:%S %p").replace(tzinfo=UTC)
            end_dt = dt.strptime(end_val, "%I:%M:%S %p").replace(tzinfo=UTC)
        except (ValueError, TypeError):
            # The duration is optional metadata; a time string in an unexpected
            # format should not abort extraction of everything else
            _logger.warning(
                "Could not parse SI acquisition start/end times (%r, %r); "
                "skipping acquisition duration",
                start_val,
                end_val,
            )
        else:
            # Only the time of day is stored, so an acquisition that crosses
            # midnight yields a negative timedelta; its ``.seconds`` field is
            # the wrapped-around (positive) duration
            duration = (end_dt - start_dt).seconds
            with contextlib.suppress(ValueError, TypeError):
                duration = ureg.Quantity(duration, "second")
            set_nested_dict_value(
                mdict,
                ["nx_meta", "Spectrum Imaging", "Acquisition Duration"],
                duration,
            )

    # Set the dataset type to SpectrumImage if it is already a Spectrum (otherwise
    # it's just a STEM image) and any Spectrum Imaging tags were added
    nx_meta = mdict["nx_meta"]
    if "Spectrum Imaging" in nx_meta and nx_meta.get("DatasetType") == "Spectrum":
        _logger.info(
            "Detected file as SpectrumImage type based on "
            "presence of spectral metadata and spectrum imaging "
            "info",
        )
        nx_meta["DatasetType"] = "SpectrumImage"
        nx_meta["Data Type"] = "Spectrum_Imaging"
        if "EELS" in nx_meta:
            nx_meta["Data Type"] = "EELS_Spectrum_Imaging"
        # checked last, so EDS takes precedence when both are present
        if "EDS" in nx_meta:
            nx_meta["Data Type"] = "EDS_Spectrum_Imaging"

    return mdict

1035 

1036 

def _parse_stage_position(tecnai_info):
    """
    Extract the stage coordinates from the Tecnai metadata.

    The value found for the ``"Stage "`` entry is split on commas; each piece
    has any leading/trailing characters from the set ``" umdeg"`` stripped
    before being handed to ``_try_decimal``.

    Parameters
    ----------
    tecnai_info : list
        Split metadata strings

    Returns
    -------
    dict
        Dictionary with stage position x, y, z, theta, phi values (taken, in
        that order, from the first five comma-separated fields)
    """
    field_names = (
        "Stage_Position_x",
        "Stage_Position_y",
        "Stage_Position_z",
        "Stage_Position_theta",
        "Stage_Position_phi",
    )
    raw_fields = _find_val("Stage ", tecnai_info).split(",")
    coords = [_try_decimal(field.strip(" umdeg")) for field in raw_fields]
    # index explicitly so that fewer than five fields still raises IndexError
    return {name: coords[pos] for pos, name in enumerate(field_names)}

1060 

1061 

def _parse_apertures(tecnai_info):
    """
    Extract the aperture settings from the Tecnai metadata.

    Each aperture entry is looked up with ``_find_val``, stripped of any
    leading/trailing spaces and ``u``/``m`` characters, and converted to an
    integer. An entry that cannot be read this way (presumably a missing or
    retracted aperture) is reported as ``None``.

    Parameters
    ----------
    tecnai_info : list
        Split metadata strings

    Returns
    -------
    dict
        Dictionary with C1, C2, Obj, and SA aperture values
    """
    # (output key, label searched for in the metadata strings)
    aperture_labels = (
        ("C1_Aperture", "C1 Aperture: "),
        ("C2_Aperture", "C2 Aperture: "),
        ("Obj_Aperture", "OBJ Aperture: "),
        ("SA_Aperture", "SA Aperture: "),
    )

    apertures = {}
    for out_key, label in aperture_labels:
        try:
            size = int(_find_val(label, tecnai_info).strip(" um"))
        except (ValueError, AttributeError):
            # not an integer, or the lookup did not return a string
            size = None
        apertures[out_key] = size
    return apertures

1091 

1092 

def process_tecnai_microscope_info(
    microscope_info,
    delimiter="\u2028",
):
    """
    Process the Microscope_Info metadata string into a dictionary of key-value pairs.

    This method is only relevant for FEI Titan TEMs that write additional metadata into
    a unicode-delimited string at a certain place in the DM3 tag structure

    Parameters
    ----------
    microscope_info : str
        The string of data obtained from the Tecnai.Microscope_Info leaf of the metadata
    delimiter : str
        The value (a unicode string) used to split the ``microscope_info`` string.

    Returns
    -------
    info_dict : dict
        The information contained in the string, in a more easily-digestible form.

    Raises
    ------
    ValueError
        If one of the expected markers (e.g. ``" Extr volt"``, ``"Gun Lens "``,
        ``"Emission "``, ``" Defocus"``) is missing from the string, or an integer
        field cannot be converted.
    IndexError
        If the "Mode" value has fewer than five whitespace-separated terms.
    """
    info_dict = {}
    tecnai_info = microscope_info.split(delimiter)
    info_dict["Microscope_Name"] = _find_val("Microscope ", tecnai_info)  # String
    info_dict["User"] = _find_val("User ", tecnai_info)  # String

    # The "Gun" entry packs several values into one string; peel them off
    # left to right, trimming ``tmp`` as each one is consumed
    tmp = _find_val("Gun ", tecnai_info)
    info_dict["Gun_Name"] = tmp[: tmp.index(" Extr volt")]  # String
    # Gun_Name is a prefix of tmp, so dropping it is a plain slice
    tmp = tmp[len(info_dict["Gun_Name"]) :]

    tmp = tmp.replace("Extr volt ", "")
    info_dict["Extractor_Voltage"] = int(tmp.split()[0])  # Integer (volts)

    tmp = tmp[tmp.index("Gun Lens ") + len("Gun Lens ") :]
    info_dict["Gun_Lens_No"] = int(tmp.split()[0])  # Integer

    tmp = tmp[tmp.index("Emission ") + len("Emission ") :]
    info_dict["Emission_Current"] = _try_decimal(tmp.split("uA")[0])  # Decimal (microA)

    tmp = _find_val("Mode ", tecnai_info)
    info_dict["Mode"] = tmp[: tmp.index(" Defocus")]  # String
    # 'Mode' should be five terms long, and the last term is either 'Image',
    # 'Diffraction', (or maybe something else)

    # Decimal val (micrometer); the label text differs between the two modes
    if "Magn " in tmp:  # Imaging mode
        info_dict["Defocus"] = _try_decimal(tmp.split("Defocus (um) ")[1].split()[0])
    elif "CL " in tmp:  # Diffraction mode
        info_dict["Defocus"] = _try_decimal(tmp.split("Defocus ")[1].split()[0])

    # This value changes based on whether in image or diffraction mode (mag or CL)
    mode_kind = info_dict["Mode"].split()[4]
    if mode_kind == "Image":
        # Integer
        info_dict["Magnification"] = int(tmp.split("Magn ")[1].strip("x"))
    elif mode_kind == "Diffraction":
        # Decimal
        info_dict["Camera_Length"] = _try_decimal(tmp.split("CL ")[1].strip("m"))

    # Integer (1 to 5)
    info_dict["Spot"] = int(_find_val("Spot ", tecnai_info))

    # Decimals - Lens strengths expressed as a "%" value
    info_dict["C2_Strength"] = _try_decimal(_find_val("C2 ", tecnai_info).strip("%"))
    info_dict["C3_Strength"] = _try_decimal(_find_val("C3 ", tecnai_info).strip("%"))
    info_dict["Obj_Strength"] = _try_decimal(_find_val("Obj ", tecnai_info).strip("%"))
    info_dict["Dif_Strength"] = _try_decimal(_find_val("Dif ", tecnai_info).strip("%"))

    # Decimal values (micrometers), stored as "x/y"
    tmp = _find_val("Image shift ", tecnai_info).strip("um")
    info_dict["Image_Shift_x"] = _try_decimal(tmp.split("/")[0])
    info_dict["Image_Shift_y"] = _try_decimal(tmp.split("/")[1])

    # Parse stage position and apertures using helper functions
    info_dict.update(_parse_stage_position(tecnai_info))
    info_dict.update(_parse_apertures(tecnai_info))

    # Filter settings are added by the helper, whose result is returned directly;
    # it is invoked exactly once (a second pass over the same inputs was redundant)
    return _parse_filter_settings(info_dict, tecnai_info)