Coverage for nexusLIMS/schemas/activity.py: 100% (223 statements)
1"""
2The "Acquisition Activity" module.
4Provides a class to represent and operate on an Acquisition Activity (as defined by the
5NexusLIMS `Experiment` schema), as well as a helper method to cluster a list of
6filenames by the files' modification times.
7"""
9import logging
10import math
11from dataclasses import dataclass, field
12from datetime import datetime as dt
13from pathlib import Path
14from timeit import default_timer
15from typing import Any, Dict, List
16from urllib.parse import quote, unquote
17from xml.sax.saxutils import escape
19import numpy as np
20from lxml import etree
21from pint import Quantity
22from scipy.signal import argrelextrema
23from sklearn.model_selection import GridSearchCV, LeaveOneOut
24from sklearn.neighbors import KernelDensity
26from nexusLIMS.config import settings
27from nexusLIMS.extractors import flatten_dict, parse_metadata
28from nexusLIMS.extractors.xml_serialization import serialize_quantity_to_xml
29from nexusLIMS.schemas import em_glossary
30from nexusLIMS.utils.time import current_system_tz
32_logger = logging.getLogger(__name__)
def cluster_filelist_mtimes(filelist: List[Path]) -> List[float]:
    """
    Cluster a list of files by modification time.

    Perform a statistical clustering of the timestamps (`mtime` values) of a
    list of files to find "relatively" large gaps in acquisition time. The
    definition of `relatively` depends on the context of the entire list of
    files. For example, if many files are acquired in rapid succession,
    the "inter-file" time spacing between them will be very small (near zero),
    meaning even fairly short gaps between files may be important.
    Conversely, if files are saved every 30 seconds or so, the tolerance for
    a "large gap" will need to be correspondingly larger.

    The approach this method uses is to detect minima in the
    [Kernel Density Estimation](https://scikit-learn.org/stable/modules/density.html#kernel-density)
    (KDE) of the file modification times. To determine the optimal bandwidth parameter
    to use in KDE, a [grid search](https://scikit-learn.org/stable/modules/grid_search.html#grid-search)
    over possible appropriate bandwidths is performed, using
    [Leave One Out](https://scikit-learn.org/stable/modules/cross_validation.html#leave-one-out-loo)
    cross-validation. This approach allows the method to determine the
    important gaps in file acquisition times with sensitivity controlled by
    the distribution of the data itself, rather than a pre-supposed optimum.
    The KDE minima approach was suggested [here](https://stackoverflow.com/a/35151947/1435788).

    The sensitivity of the clustering can be controlled via the
    ``NX_CLUSTERING_SENSITIVITY`` environment variable:

    - Values > 1.0 make clustering more sensitive to time gaps (more activities)
    - Values < 1.0 make clustering less sensitive (fewer activities)
    - Value of 0 disables clustering entirely (all files in one activity)
    - Default is 1.0 (no adjustment to automatic clustering)

    Parameters
    ----------
    filelist : List[pathlib.Path]
        The files (as a list) whose timestamps will be interrogated to find
        "relatively" large gaps in acquisition time (as a means to find the
        breaks between discrete Acquisition Activities)

    Returns
    -------
    aa_boundaries : List[float]
        A list of the `mtime` values that represent boundaries between
        discrete Acquisition Activities. An empty list is returned if
        clustering is disabled; if only one file is provided, its single
        `mtime` is returned as the boundary.
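
    Examples
    --------
    A minimal sketch (the data directory and file pattern are hypothetical)::

        from pathlib import Path

        files = sorted(Path("/path/to/instrument/data").glob("*.dm3"))
        boundaries = cluster_filelist_mtimes(files)
        # each boundary is an mtime value (epoch seconds) separating two
        # discrete Acquisition Activities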
80 """
81 # Check if clustering is disabled
82 sensitivity = settings.NX_CLUSTERING_SENSITIVITY
83 if sensitivity == 0:
84 _logger.info("Clustering disabled (NX_CLUSTERING_SENSITIVITY=0)")
85 return []
87 _logger.info("Starting clustering of file mtimes")
88 start_timer = default_timer()
89 mtimes = sorted([f.stat().st_mtime for f in filelist])
91 # remove duplicate file mtimes (since they cause errors below):
92 mtimes = sorted(set(mtimes))
93 m_array = np.array(mtimes).reshape(-1, 1)
95 if len(mtimes) == 1:
96 # if there was only one file, don't do any more processing and just
97 # return the one mtime as the AA boundary
98 return mtimes
100 # mtime_diff is a discrete differentiation to find the time gap between
101 # sequential files
102 mtime_diff = [j - i for i, j in zip(mtimes[:-1], mtimes[1:])]
104 # Bandwidth to use is uncertain, so do a grid search over possible values
105 # from smallest to largest sequential mtime difference (logarithmically
106 # biased towards smaller values). we do cross-validation using the Leave
107 # One Out strategy and using the total log-likelihood from the KDE as
108 # the score to maximize (goodness of fit)
109 bandwidths = np.logspace(
110 math.log(min(mtime_diff)),
111 math.log(max(mtime_diff)),
112 35,
113 base=math.e,
114 )
115 _logger.info("KDE bandwidth grid search")
116 grid = GridSearchCV(
117 KernelDensity(kernel="gaussian"),
118 {"bandwidth": bandwidths},
119 cv=LeaveOneOut(),
120 n_jobs=-1,
121 )
122 grid.fit(m_array)
123 bandwidth = grid.best_params_["bandwidth"]

    # Apply sensitivity adjustment: higher sensitivity = smaller bandwidth = more
    # activity boundaries detected. We divide by sensitivity so that values > 1
    # result in smaller bandwidth (more sensitive to gaps).
    if sensitivity != 1.0:
        adjusted_bandwidth = bandwidth / sensitivity
        _logger.info(
            "Adjusted bandwidth from %.3f to %.3f (sensitivity=%.2f)",
            bandwidth,
            adjusted_bandwidth,
            sensitivity,
        )
        bandwidth = adjusted_bandwidth
    else:
        _logger.info("Using bandwidth of %.3f for KDE", bandwidth)

    # Calculate AcquisitionActivity boundaries by "clustering" the timestamps:
    # fit a KDE (using KDTree nearest-neighbor estimates) with the previously
    # identified "optimal" bandwidth, then find the local minima of its score
    kde = KernelDensity(kernel="gaussian", bandwidth=bandwidth)
    kde: KernelDensity = kde.fit(m_array)
    s = np.linspace(m_array.min(), m_array.max(), num=len(mtimes) * 10)
    scores = kde.score_samples(s.reshape(-1, 1))

    mins = argrelextrema(scores, np.less)[0]  # the minima indices
    aa_boundaries = [s[m] for m in mins]  # the minima mtime values
    end_timer = default_timer()
    _logger.info(
        "Detected %i activities in %.2f seconds",
        len(aa_boundaries) + 1,
        end_timer - start_timer,
    )

    return aa_boundaries


def _escape(val: Any) -> Any:
    """
    Escape a value for safe XML output if needed, otherwise return it unchanged.

    Parameters
    ----------
    val
        The value to conditionally escape

    Returns
    -------
    Any
        The value either as-is or escaped
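
    Examples
    --------
    Illustrative behavior (only strings containing ``<`` or ``&`` are
    escaped)::

        _escape("5 < 10")  # -> "5 &lt; 10"
        _escape("plain")   # -> "plain"
        _escape(42)        # -> 42 (non-strings pass through unchanged)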
173 """
174 if isinstance(val, str) and any(c in val for c in "<&"):
175 return escape(val)
176 return val
def _add_dataset_element(  # noqa: PLR0913
    file: str,
    aq_ac_xml_el: etree.Element,
    meta: Dict,
    unique_meta: Dict,
    warning: List,
    preview_path: Path | None = None,
    signal_index: int | None = None,
    total_signals: int | None = None,
):
    """
    Add a ``<dataset>`` element for one file/signal to an acquisitionActivity element.

    Builds the ``name``, ``location``, ``preview``, and per-dataset ``meta``
    children for the given file, then returns the (modified) parent element.
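
    The emitted structure looks roughly like the following sketch (the element
    values shown here are illustrative, not taken from real data)::

        <dataset type="Image" role="Experimental">
            <name>file.dm3 (1 of 2)</name>
            <location>/relative/path/to/file.dm3</location>
            <preview>/relative/path/to/preview.png</preview>
            <meta name="Voltage" unit="kV">300</meta>
        </dataset>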
    """
    # escape any bad characters in the filename
    file = _escape(file)

    # build path to thumbnail
    rel_fname = file.replace(str(settings.NX_INSTRUMENT_DATA_PATH), "")

    # Use provided preview path if available, otherwise compute from filename
    if preview_path is not None:
        # Convert preview path to relative path
        rel_thumb_name = str(preview_path).replace(str(settings.NX_DATA_PATH), "")
    else:
        # Legacy: compute from filename
        rel_thumb_name = f"{rel_fname}.thumb.png"

    # encode for safe URLs
    rel_fname = quote(rel_fname)
    rel_thumb_name = quote(rel_thumb_name)

    # file is a string; unique_meta is a dictionary; warning is a list
    dset_el = etree.SubElement(aq_ac_xml_el, "dataset")
    dset_el.set("type", str(meta["DatasetType"]))
    dset_el.set("role", "Experimental")

    dset_name_el = etree.SubElement(dset_el, "name")
    # For multi-signal files, append signal index to make names unique
    base_name = Path(file).name
    if signal_index is not None and total_signals is not None and total_signals > 1:
        # Append signal index in format: "filename.ext (X of Y)"
        dset_name_el.text = f"{base_name} ({signal_index + 1} of {total_signals})"
    else:
        dset_name_el.text = base_name

    dset_loc_el = etree.SubElement(dset_el, "location")
    dset_loc_el.text = rel_fname

    # check if the preview image exists before adding it to the XML structure
    if rel_thumb_name[0] == "/":
        test_path = Path(settings.NX_DATA_PATH) / unquote(rel_thumb_name)[1:]
    else:  # pragma: no cover
        # this shouldn't happen, but just in case...
        test_path = Path(settings.NX_DATA_PATH) / unquote(rel_thumb_name)

    if test_path.exists():
        dset_prev_el = etree.SubElement(dset_el, "preview")
        dset_prev_el.text = rel_thumb_name

    for meta_k, meta_v in sorted(unique_meta.items(), key=lambda i: i[0].lower()):
        if meta_k not in ["warnings", "DatasetType"]:
            meta_el = etree.SubElement(dset_el, "meta")
            meta_el.set("name", str(meta_k))
            if meta_k in warning:
                meta_el.set("warning", "true")

            # Handle Pint Quantity objects with unit attribute
            if isinstance(meta_v, Quantity):
                magnitude, unit = serialize_quantity_to_xml(meta_v)
                meta_el.text = str(magnitude)
                meta_el.set("unit", unit)
            else:
                # Handle regular values (strings, numbers, etc.)
                meta_v = _escape(meta_v)  # noqa: PLW2901
                meta_el.text = str(meta_v)

    return aq_ac_xml_el


@dataclass
class AcquisitionActivity:
    """
    A collection of files/metadata attributed to a physical acquisition activity.

    Instances of this class correspond to AcquisitionActivity nodes in the
    [NexusLIMS schema](https://data.nist.gov/od/dm/nexus/experiment/v1.0).

    Parameters
    ----------
    start : datetime.datetime
        The start point of this AcquisitionActivity
    end : datetime.datetime
        The end point of this AcquisitionActivity
    mode : str
        The microscope mode for this AcquisitionActivity (e.g. 'IMAGING',
        'DIFFRACTION', 'SCANNING', etc.)
    unique_params : set
        A set of dictionary keys that comprises all unique metadata keys
        contained within the files of this AcquisitionActivity
    setup_params : dict
        A dictionary containing metadata about the data that is shared
        amongst all data files in this AcquisitionActivity
    unique_meta : list
        A list of dictionaries (one for each file in this
        AcquisitionActivity) containing metadata key-value pairs that are
        unique to each file in ``files`` (i.e. those that could not be moved
        into ``setup_params``)
    files : list
        A list of filenames belonging to this AcquisitionActivity
    previews : list
        A list of filenames pointing to the previews for each file in
        ``files``
    meta : list
        A list of dictionaries containing the "important" metadata for each
        file in ``files``
    warnings : list
        A list (one entry per file in ``files``) of metadata values that may
        be untrustworthy because of the acquisition software
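
    Examples
    --------
    A minimal sketch of the typical lifecycle (the directory path and sample
    identifier are hypothetical)::

        from pathlib import Path

        activity = AcquisitionActivity(mode="IMAGING")
        for f in sorted(Path("/path/to/instrument/data").glob("*.dm3")):
            activity.add_file(f, generate_preview=False)
        activity.store_setup_params()
        activity.store_unique_metadata()
        xml_el = activity.as_xml(seqno=0, sample_id="sample-001")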
294 """
296 start: dt | None = None
297 end: dt | None = None
298 mode: str = ""
299 unique_params: set | None = None
300 setup_params: dict | None = None
301 unique_meta: list | None = None
302 files: list = field(default_factory=list)
303 previews: list = field(default_factory=list)
304 meta: list = field(default_factory=list)
305 warnings: list = field(default_factory=list)

    def __post_init__(self):
        """Post-initialization to set defaults for start/end times."""
        if self.start is None:
            self.start = dt.now(tz=current_system_tz())
        if self.end is None:
            self.end = dt.now(tz=current_system_tz())
        if self.unique_params is None:
            self.unique_params = set()

    def __repr__(self):
        """Return custom representation of AcquisitionActivity."""
        return (
            f"{self.mode:<12} AcquisitionActivity; "
            f"start: {self.start.isoformat()}; "
            f"end: {self.end.isoformat()}"
        )

    def __str__(self):
        """Return custom string representation of AcquisitionActivity."""
        return f"{self.start.isoformat()} AcquisitionActivity {self.mode}"

    def add_file(self, fname: Path, *, generate_preview=True):
        """
        Add file to AcquisitionActivity.

        Add a file to this activity's file list, parse its metadata (storing
        a flattened copy of it to this activity), and generate a preview
        thumbnail.

        parse_metadata always returns a list of metadata dicts (one per signal).
        For files containing multiple signals (e.g., multi-signal DM3/DM4 files),
        this method adds one entry per signal to the parallel lists, repeating
        the filename for each signal but using different preview paths and metadata.

        Parameters
        ----------
        fname : pathlib.Path
            The file to be added to the file list
        generate_preview : bool
            Whether or not to create the preview thumbnail images
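
        Examples
        --------
        An illustrative call, given an AcquisitionActivity instance
        ``activity`` (the path is hypothetical)::

            activity.add_file(Path("/path/to/image.dm3"), generate_preview=False)
            # a multi-signal file appends one entry per signal to the parallel
            # lists (activity.files, activity.meta, activity.warnings, ...)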
347 """
348 if fname.exists():
349 gen_prev = generate_preview
350 meta_list, preview_fnames = parse_metadata(fname, generate_preview=gen_prev)
352 if meta_list is None:
353 # Something bad happened, so we need to alert the user
354 _logger.warning("Could not parse metadata of %s", fname)
355 # Still add the file to maintain original behavior
356 self.files.append(str(fname))
357 self.previews.append(None)
358 self.meta.append({})
359 self.warnings.append([])
360 else:
361 # meta_list is always a list of dicts, one per signal
362 for i, signal_meta in enumerate(meta_list):
363 self.files.append(
364 str(fname)
365 ) # Same file, repeated for multi-signal
367 # Merge extensions into root level before flattening
368 # This ensures vendor-specific fields appear at root in XML
369 nx_meta = signal_meta["nx_meta"].copy()
370 if "extensions" in nx_meta:
371 extensions = nx_meta.pop("extensions")
372 nx_meta.update(extensions)
374 # Convert EM Glossary snake_case fields to display names for XML
375 # Only convert fields that are in snake_case (contain underscores)
377 nx_meta_for_xml = {}
378 for field_name, value in nx_meta.items():
379 # Only convert snake_case EM Glossary field names
380 if "_" in field_name and field_name.islower():
381 display_name = em_glossary.get_display_name(field_name)
382 nx_meta_for_xml[display_name] = value
383 else:
384 # Keep original name (DatasetType, Data Type, etc.)
385 nx_meta_for_xml[field_name] = value
387 self.meta.append(
388 flatten_dict(nx_meta_for_xml, separator=" – ") # noqa: RUF001
389 )
391 # Handle previews (always a list)
392 if preview_fnames and i < len(preview_fnames):
393 self.previews.append(preview_fnames[i])
395 # Handle warnings
396 if "warnings" in signal_meta["nx_meta"]:
397 self.warnings.append(
398 [" ".join(w) for w in signal_meta["nx_meta"]["warnings"]],
399 )
400 else:
401 self.warnings.append([])
402 else:
403 msg = f"{fname} was not found"
404 raise FileNotFoundError(msg)
405 _logger.debug("appended %s to files", fname)
406 _logger.debug("self.files is now %s", self.files)

    def store_unique_params(self):
        """
        Store unique metadata keys.

        Analyze the metadata keys contained in this AcquisitionActivity and
        store the unique values in a set (``self.unique_params``).
        """
        # self.meta is a list of dictionaries
        for meta in self.meta:
            self.unique_params.update(meta.keys())

    def store_setup_params(self, values_to_search=None):
        """
        Store common metadata keys as "setup parameters".

        Search the metadata of files in this AcquisitionActivity for those
        containing identical values over all files, which will then be defined
        as parameters attributed to experimental setup, rather than individual
        datasets.

        Stores a dictionary containing the metadata keys and values that are
        consistent across all files in this AcquisitionActivity as an
        attribute (``self.setup_params``).

        Parameters
        ----------
        values_to_search : list
            A list (or tuple, set, or other iterable type) containing values to
            search for in the metadata dictionary list. If None (default), all
            values contained in any file will be searched.
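
        Examples
        --------
        A sketch of the resulting partition, assuming two files that share a
        voltage value but differ in exposure time (metadata keys and values
        are hypothetical)::

            # self.meta == [{"Voltage": "300 kV", "Exposure": "1 s"},
            #               {"Voltage": "300 kV", "Exposure": "2 s"}]
            activity.store_setup_params()
            # activity.setup_params == {"Voltage": "300 kV"}
            # "Exposure" remains per-file (see store_unique_metadata)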
438 """
439 # Make sure unique params are defined before proceeding:
440 if self.unique_params == set():
441 _logger.info("Storing unique parameters for files in AcquisitionActivity")
442 self.store_unique_params()
444 if len(self.files) == 1:
445 _logger.info(
446 "Only one file found in this activity, so leaving "
447 "metadata associated with the file, rather than "
448 "activity",
449 )
450 self.setup_params = {}
451 return
453 if values_to_search is None:
454 values_to_search = self.unique_params
456 # meta will be individual dictionaries, since self.meta is list of dicts
457 setup_params = {}
458 for i, (meta, _file) in enumerate(zip(self.meta, self.files)):
459 # loop through the values_to_search
460 # using .copy() on the set allows us to remove values during each
461 # iteration, as described in:
462 # https://stackoverflow.com/a/22847851/1435788
463 for vts in values_to_search.copy():
464 # for the first iteration through the list of dictionaries,
465 # store any value found for a parameter
466 # as a "setup parameter". if it is not found, do not store it
467 # and remove from values_to_search to prevent it being searched
468 # on subsequent iterations.
469 if i == 0:
470 if vts in meta:
471 # this value was found in meta, so store it
472 setup_params[vts] = meta[vts]
473 _logger.debug(
474 "iter: %i; adding %s = %s to setup_params",
475 i,
476 vts,
477 meta[vts],
478 )
479 else:
480 # this value wasn't present in meta, so it can't be
481 # common to all, so remove it:
482 _logger.debug("iter: %i; removing %s", i, vts)
483 values_to_search.remove(vts)
484 # On the subsequent iterations test if values are same/different
485 # If different, then remove the key from setup_params and
486 # values_to_search, so at the end only identical values remain
487 # and duplicate value checks are minimized
488 else:
489 if vts not in setup_params:
490 # this condition should probably not be reached,
491 # but if it is, it means this value, which should
492 # have already been added to setup_params is somehow
493 # new, so delete vts from values to search
494 _logger.debug(
495 "iter: %i; removing %s",
496 i,
497 vts,
498 ) # pragma: no cover
499 values_to_search.remove(vts) # pragma: no cover
500 # Check if the parameter is missing in this file OR
501 # has a different value
502 if vts not in meta or setup_params[vts] != meta[vts]:
503 # Parameter is either missing or has different value,
504 # so this must be individual dataset metadata.
505 # Remove it from setup_params and values_to_search
506 _logger.debug(
507 "iter: %i; vts=%s - "
508 "%s; "
509 "removing %s from setup_params and values to search",
510 i,
511 vts,
512 "not in meta"
513 if vts not in meta
514 else (
515 f"meta[vts]={meta[vts]} != "
516 f"setup_params[vts]={setup_params[vts]}"
517 ),
518 vts,
519 )
520 del setup_params[vts]
521 values_to_search.remove(vts)
523 self.setup_params = setup_params

    def store_unique_metadata(self):
        """
        Store unique metadata keys as unique to each file.

        For each file in this AcquisitionActivity, stores the metadata that
        is unique rather than common to the entire AcquisitionActivity (which
        are kept in ``self.setup_params``).
        """
        if self.setup_params is None:
            _logger.warning(
                "%s -- setup_params has not been defined; call store_setup_params() "
                "prior to using this method. Nothing was done.",
                self,
            )
            return

        unique_meta = []
        for meta in self.meta:
            tmp_unique = {}
            # loop through each metadata dict, and if a given key k in meta is
            # not present in self.setup_params, add it to the
            # current dictionary (tmp_unique) of unique_meta
            for k, v in meta.items():
                if k not in self.setup_params:
                    # this means k is unique to this file, so add it to
                    # unique_meta
                    tmp_unique[k] = v
            unique_meta.append(tmp_unique)

        # store what we calculated as unique metadata into the attribute
        self.unique_meta = unique_meta

    def as_xml(self, seqno, sample_id):
        """
        Translate AcquisitionActivity to an XML representation.

        Build an XML (``lxml``) representation of this AcquisitionActivity (for
        use in instances of the NexusLIMS schema).

        Parameters
        ----------
        seqno : int
            An integer number representing what number activity this is in a
            sequence of activities.
        sample_id : str
            A unique sample identifier. No checks are done on this value; it
            is merely reproduced in the XML output.

        Returns
        -------
        activity_xml : lxml.etree._Element
            An ``acquisitionActivity`` XML element representing this
            AcquisitionActivity (note: this is not a properly-formed complete
            XML document, since it has no header or namespace definitions)
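
        Examples
        --------
        The returned element can be serialized with ``lxml`` in the usual way
        (``sample-001`` is a hypothetical identifier)::

            from lxml import etree

            xml_el = activity.as_xml(seqno=0, sample_id="sample-001")
            print(etree.tostring(xml_el, pretty_print=True).decode())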
579 """
580 aq_ac_xml_el = etree.Element("acquisitionActivity")
581 aq_ac_xml_el.set("seqno", str(seqno))
582 start_time_el = etree.SubElement(aq_ac_xml_el, "startTime")
583 start_time_el.text = self.start.isoformat()
584 sample_id_el = etree.SubElement(aq_ac_xml_el, "sampleID")
585 sample_id_el.text = sample_id
587 setup_el = etree.SubElement(aq_ac_xml_el, "setup")
589 for param_k, param_v in sorted(
590 self.setup_params.items(),
591 key=lambda i: i[0].lower(),
592 ):
593 # metadata values to skip in XML output
594 if param_k not in ["warnings", "DatasetType"]:
595 # for setup parameters, a key in the first dataset's warning
596 # list is the same as in all of them
597 pk_warning = param_k in self.warnings[0]
598 param_el = etree.SubElement(setup_el, "param")
599 param_el.set("name", str(param_k))
600 if pk_warning:
601 param_el.set("warning", "true")
603 # Handle Pint Quantity objects with unit attribute
604 if isinstance(param_v, Quantity):
605 magnitude, unit = serialize_quantity_to_xml(param_v)
606 param_el.text = str(magnitude)
607 param_el.set("unit", unit)
608 else:
609 param_v = _escape(param_v) # noqa: PLW2901
610 param_el.text = str(param_v)
612 # Count how many times each file appears (for multi-signal files)
613 file_signal_counts = {}
614 file_signal_indices = {}
615 for _file in self.files:
616 if _file not in file_signal_counts:
617 file_signal_counts[_file] = 0
618 file_signal_indices[_file] = 0
619 file_signal_counts[_file] += 1
621 # Reset counters for actual iteration
622 file_signal_indices = dict.fromkeys(file_signal_counts, 0)
624 for _file, meta, unique_meta, warning, preview in zip(
625 self.files,
626 self.meta,
627 self.unique_meta,
628 self.warnings,
629 self.previews,
630 ):
631 # Get the signal index for this file
632 signal_index = file_signal_indices[_file]
633 total_signals = file_signal_counts[_file]
635 aq_ac_xml_el = _add_dataset_element(
636 _file,
637 aq_ac_xml_el,
638 meta,
639 unique_meta,
640 warning,
641 preview_path=preview,
642 signal_index=signal_index,
643 total_signals=total_signals,
644 )
646 # Increment the signal index for this file
647 file_signal_indices[_file] += 1
649 return aq_ac_xml_el