Coverage for nexusLIMS/extractors/registry.py: 100%
242 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
"""Extractor registry for plugin discovery and selection.

This module provides the central registry that discovers, manages, and selects
extractors based on file type and context. It implements auto-discovery by
walking the plugins directory and uses priority-based selection.
"""
8from __future__ import annotations
10import importlib
11import inspect
12import logging
13import pkgutil
14from collections import defaultdict
15from pathlib import Path
16from typing import TYPE_CHECKING, Any
18from nexusLIMS.extractors.plugins.basic_metadata import BasicFileInfoExtractor
19from nexusLIMS.extractors.plugins.profiles import register_all_profiles
21if TYPE_CHECKING:
22 from nexusLIMS.extractors.base import (
23 BaseExtractor,
24 ExtractionContext,
25 PreviewGenerator,
26 )
# Module-level logger, named after this module's import path
_logger = logging.getLogger(__name__)

# Public API of this module
__all__ = ["ExtractorRegistry", "get_registry"]
class ExtractorRegistry:
    """
    Central registry for extractor plugins.

    Manages auto-discovery, registration, and selection of metadata extractors
    and preview generators. Uses priority-based selection with content
    sniffing support (each candidate's ``supports()`` is consulted).

    This is a singleton - use :func:`get_registry` to access.

    Features
    --------
    - Auto-discovers plugins by walking nexusLIMS/extractors/plugins/
    - Maintains priority-sorted lists per extension (and for wildcards)
    - Lazy instantiation for performance
    - Caches extractor instances
    - ``get_extractor`` never returns None (always has fallback extractor)

    Notes
    -----
    Extensions are stored normalized (no leading dot, lower-case), which is
    the same form used when looking up the extension of a file.

    Examples
    --------
    Get an extractor for a file:

    >>> from nexusLIMS.extractors.registry import get_registry
    >>> from nexusLIMS.extractors.base import ExtractionContext
    >>> from pathlib import Path
    >>>
    >>> registry = get_registry()
    >>> context = ExtractionContext(Path("data.dm3"), instrument=None)
    >>> extractor = registry.get_extractor(context)
    >>> metadata = extractor.extract(context)

    Manual registration (for testing):

    >>> class MyExtractor:
    ...     name = "my_extractor"
    ...     priority = 100
    ...     def supports(self, context): return True
    ...     def extract(self, context): return {"nx_meta": {}}
    >>>
    >>> registry = get_registry()
    >>> registry.register_extractor(MyExtractor)
    """

    def __init__(self):
        """Initialize the extractor registry."""
        # Maps extension -> list of extractor classes (sorted by priority)
        self._extractors: dict[str, list[type[BaseExtractor]]] = defaultdict(list)

        # Cache of instantiated extractors (name -> instance)
        self._instances: dict[str, BaseExtractor] = {}

        # Wildcard extractors that support any extension (sorted by priority)
        self._wildcard_extractors: list[type[BaseExtractor]] = []

        # Preview generators (maps extension -> list of generator classes)
        self._preview_generators: dict[str, list[type[PreviewGenerator]]] = defaultdict(
            list
        )

        # Cache of instantiated preview generators (name -> instance)
        self._preview_instances: dict[str, PreviewGenerator] = {}

        # Discovery state
        self._discovered = False

        _logger.debug("Initialized ExtractorRegistry")

    # ------------------------------------------------------------------
    # Read-only views
    # ------------------------------------------------------------------
    @property
    def extractors(self) -> dict[str, list[type[BaseExtractor]]]:
        """
        Get the extractor list.

        Auto-discovers plugins if not already discovered.

        Returns
        -------
        dict[str, list[type[BaseExtractor]]]
            Maps extension (without dot) to list of extractor classes, sorted
            by priority (descending). Both the dict and its lists are copies,
            so mutating them does not affect the registry.
        """
        if not self._discovered:
            self.discover_plugins()
        return {ext: list(classes) for ext, classes in self._extractors.items()}

    @property
    def extractor_names(self) -> list[str]:
        """
        Get a deduplicated list of extractor names.

        The names are the extractor *class* names (``__name__``), covering
        both extension-specific and wildcard extractors.

        Auto-discovers plugins if not already discovered.

        Returns
        -------
        list[str]
            Sorted list of unique extractor class names
        """
        if not self._discovered:
            self.discover_plugins()

        names = {
            cls.__name__
            for classes in self._extractors.values()
            for cls in classes
        }
        names.update(cls.__name__ for cls in self._wildcard_extractors)
        return sorted(names)

    @property
    def all_extractors(self) -> list[BaseExtractor]:
        """
        Get a deduplicated flat list of all registered extractor instances.

        Returns one instance per unique extractor class (both extension-specific
        and wildcard extractors), sorted by priority descending.

        Auto-discovers plugins if not already discovered.

        Returns
        -------
        list[BaseExtractor]
            Unique extractor instances sorted by priority (descending)
        """
        if not self._discovered:
            self.discover_plugins()

        # dict.fromkeys de-duplicates while keeping first-seen order
        unique_classes = dict.fromkeys(
            cls
            for classes in (*self._extractors.values(), self._wildcard_extractors)
            for cls in classes
        )
        instances = [self._get_instance(cls) for cls in unique_classes]
        return sorted(instances, key=lambda e: e.priority, reverse=True)

    # ------------------------------------------------------------------
    # Discovery
    # ------------------------------------------------------------------
    def discover_plugins(self) -> None:
        """
        Auto-discover extractor plugins by walking the plugins directory.

        Walks nexusLIMS/extractors/plugins/, imports all Python modules,
        and registers any classes defined there that implement the
        BaseExtractor or PreviewGenerator protocol. Instrument profiles are
        registered afterwards.

        This is called automatically on first use. Once discovery has run it
        is a no-op; call :meth:`clear` first to force re-discovery.

        A plugin module or sub-package that fails to import is logged and
        skipped; it never aborts discovery of the remaining plugins.
        """
        if self._discovered:
            _logger.debug("Plugins already discovered, skipping")
            return

        _logger.info("Discovering extractor plugins...")

        plugins_package = "nexusLIMS.extractors.plugins"

        try:
            # Import the plugins package to get its path
            plugins_module = importlib.import_module(plugins_package)
            # __file__ is missing or None for some package kinds
            # (e.g. namespace packages), hence AttributeError/TypeError
            plugins_path = Path(plugins_module.__file__).parent
        except (ImportError, AttributeError, TypeError) as e:
            _logger.warning(
                "Could not import plugins package '%s': %s. Plugin discovery skipped.",
                plugins_package,
                e,
            )
            self._discovered = True
            return

        def _on_walk_error(package_name: str) -> None:
            # walk_packages imports sub-packages itself to descend into them.
            # Without an onerror hook a non-ImportError raised there would
            # propagate and abort discovery. The loop below already warns
            # about the same failure, so debug level is enough here.
            _logger.debug(
                "Could not descend into plugin package '%s'",
                package_name,
                exc_info=True,
            )

        discovered_count = 0
        for _finder, name, _ispkg in pkgutil.walk_packages(
            [str(plugins_path)],
            prefix=f"{plugins_package}.",
            onerror=_on_walk_error,
        ):
            # Skip __pycache__ and other special directories
            if "__pycache__" in name:
                continue  # pragma: no cover

            try:
                module = importlib.import_module(name)
                _logger.debug("Imported plugin module: %s", name)

                for _item_name, obj in inspect.getmembers(module, inspect.isclass):
                    # Skip imported classes (only use classes defined in this module)
                    if obj.__module__ != module.__name__:
                        continue

                    if self._is_extractor(obj):
                        self.register_extractor(obj)
                        discovered_count += 1
                        _logger.debug(
                            "Discovered extractor: %s (priority: %d)",
                            obj.name,
                            obj.priority,
                        )
                    elif self._is_preview_generator(obj):
                        self.register_preview_generator(obj)
                        discovered_count += 1
                        _logger.debug(
                            "Discovered preview generator: %s (priority: %d)",
                            obj.name,
                            obj.priority,
                        )

            except Exception as e:
                # Best effort: one broken plugin must not disable the others
                _logger.warning(
                    "Failed to import plugin module '%s': %s",
                    name,
                    e,
                    exc_info=True,
                )

        _logger.info("Discovered %d extractor plugins", discovered_count)

        # Register instrument profiles
        self._register_instrument_profiles()

        self._discovered = True

    def _register_instrument_profiles(self) -> None:
        """
        Register all instrument profiles.

        Calls ``register_all_profiles()``; any failure is logged as a warning
        and otherwise ignored so that extractor discovery still completes.
        """
        try:
            register_all_profiles()
        except ImportError as e:
            _logger.warning(
                "Could not import profiles package: %s. No profiles will be loaded.",
                e,
            )
        except Exception as e:
            _logger.warning(
                "Error registering instrument profiles: %s",
                e,
                exc_info=True,
            )

    # ------------------------------------------------------------------
    # Protocol checks
    # ------------------------------------------------------------------
    @staticmethod
    def _implements_protocol(obj: Any, action_method: str) -> bool:
        """
        Check the structural requirements shared by both plugin protocols.

        Parameters
        ----------
        obj
            Object to check
        action_method
            Name of the protocol-specific method (``"extract"`` or
            ``"generate"``)

        Returns
        -------
        bool
            True if obj is a class with a str ``name``, an int ``priority``,
            and callable ``supports`` and *action_method* attributes
        """
        return (
            inspect.isclass(obj)
            and isinstance(getattr(obj, "name", None), str)
            and isinstance(getattr(obj, "priority", None), int)
            and callable(getattr(obj, "supports", None))
            and callable(getattr(obj, action_method, None))
        )

    def _is_extractor(self, obj: Any) -> bool:
        """
        Check if an object implements the BaseExtractor protocol.

        Parameters
        ----------
        obj
            Object to check

        Returns
        -------
        bool
            True if obj implements BaseExtractor protocol
        """
        return self._implements_protocol(obj, "extract")

    def _is_preview_generator(self, obj: Any) -> bool:
        """
        Check if an object implements the PreviewGenerator protocol.

        Parameters
        ----------
        obj
            Object to check

        Returns
        -------
        bool
            True if obj implements PreviewGenerator protocol
        """
        return self._implements_protocol(obj, "generate")

    # ------------------------------------------------------------------
    # Extension handling
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_extension(extension: str) -> str:
        """Return *extension* without leading dots and in lower case."""
        return extension.lstrip(".").lower()

    def _declared_extensions(self, plugin_class: type, kind: str) -> set[str]:
        """
        Read and normalize a plugin class's ``supported_extensions``.

        Parameters
        ----------
        plugin_class
            The extractor or preview generator class to inspect
        kind
            Human-readable plugin kind used in the warning message

        Returns
        -------
        set[str]
            Normalized extensions; empty if the attribute is missing or None
        """
        if not hasattr(plugin_class, "supported_extensions"):
            _logger.warning(
                "%s %s does not have supported_extensions attribute",
                kind,
                getattr(plugin_class, "name", "unknown"),
            )
            return set()

        declared = plugin_class.supported_extensions
        if declared is None:
            # Wildcard plugin
            return set()

        if isinstance(declared, str):
            # A bare string is one extension, not an iterable of characters
            declared = (declared,)

        # Normalize exactly as lookups do, so ".DM3" matches "file.dm3"
        return {self._normalize_extension(ext) for ext in declared}

    def _get_supported_extensions(
        self,
        extractor_class: type[BaseExtractor],
    ) -> set[str]:
        """
        Get supported file extensions from an extractor class.

        Uses the extractor's declared supported_extensions attribute.

        Parameters
        ----------
        extractor_class
            The extractor class to check

        Returns
        -------
        set[str]
            Set of supported extensions (without dots, lower-case), or empty
            set if this is a wildcard extractor
        """
        return self._declared_extensions(extractor_class, "Extractor")

    def _get_supported_extensions_for_generator(
        self,
        generator_class: type[PreviewGenerator],
    ) -> set[str]:
        """
        Get supported file extensions from a preview generator class.

        Uses the generator's declared supported_extensions attribute.

        Parameters
        ----------
        generator_class
            The preview generator class to check

        Returns
        -------
        set[str]
            Set of supported extensions (without dots, lower-case)
        """
        return self._declared_extensions(generator_class, "Preview generator")

    # ------------------------------------------------------------------
    # Extractors
    # ------------------------------------------------------------------
    def register_extractor(self, extractor_class: type[BaseExtractor]) -> None:
        """
        Manually register an extractor class.

        This method is called automatically during plugin discovery, but can
        also be used to manually register extractors (useful for testing).
        Registering the same class twice is a no-op. A class that declares no
        extensions is registered as a wildcard extractor.

        Parameters
        ----------
        extractor_class
            The extractor class to register (not an instance)

        Examples
        --------
        >>> class MyExtractor:
        ...     name = "my_extractor"
        ...     priority = 100
        ...     def supports(self, context): return True
        ...     def extract(self, context): return {"nx_meta": {}}
        >>>
        >>> registry = get_registry()
        >>> registry.register_extractor(MyExtractor)
        """
        extensions = self._get_supported_extensions(extractor_class)

        if not extensions:
            # This is a wildcard extractor (supports any extension)
            if extractor_class in self._wildcard_extractors:
                _logger.debug(
                    "Extractor %s already registered (skipping duplicate)",
                    extractor_class.name,
                )
                return
            self._wildcard_extractors.append(extractor_class)
            # Keep wildcards priority-ordered too; the sort is stable, so
            # equal priorities keep their registration order
            self._wildcard_extractors.sort(key=lambda e: e.priority, reverse=True)
            _logger.debug(
                "Registered wildcard extractor: %s",
                extractor_class.name,
            )
            return

        for ext in extensions:
            registered = self._extractors[ext]
            if extractor_class in registered:
                _logger.debug(
                    "Extractor %s already registered for .%s (skipping duplicate)",
                    extractor_class.name,
                    ext,
                )
                continue
            registered.append(extractor_class)
            # Sort by priority (descending)
            registered.sort(key=lambda e: e.priority, reverse=True)
            _logger.debug(
                "Registered %s for extension: .%s",
                extractor_class.name,
                ext,
            )

    def _get_instance(self, extractor_class: type[BaseExtractor]) -> BaseExtractor:
        """
        Get or create an instance of an extractor class.

        Instances are cached for performance. The cache is keyed by the
        class's ``name`` attribute.

        Parameters
        ----------
        extractor_class
            The extractor class

        Returns
        -------
        BaseExtractor
            Instance of the extractor
        """
        name = extractor_class.name
        if name not in self._instances:
            self._instances[name] = extractor_class()
            _logger.debug("Instantiated extractor: %s", name)

        return self._instances[name]

    def _first_supporting(
        self,
        plugin_classes,
        instantiate,
        context: ExtractionContext,
        selected_msg: str,
        error_msg: str,
    ):
        """
        Return the first plugin instance whose ``supports(context)`` is true.

        Parameters
        ----------
        plugin_classes
            Candidate classes, in the order they should be tried
        instantiate
            Callable mapping a class to its (cached) instance
        context
            Extraction context passed to ``supports()``
        selected_msg
            Debug log format (plugin name, file name) used on success
        error_msg
            Warning log format (plugin name, exception) used when
            ``supports()`` raises

        Returns
        -------
        object | None
            The selected instance, or None if no candidate matched. Candidates
            that fail to instantiate or whose ``supports()`` raises are logged
            and skipped.
        """
        for plugin_class in plugin_classes:
            try:
                instance = instantiate(plugin_class)
            except Exception as e:
                # A broken constructor must not prevent falling back
                _logger.warning(
                    "Could not instantiate %s: %s",
                    getattr(plugin_class, "name", plugin_class),
                    e,
                    exc_info=True,
                )
                continue

            try:
                if instance.supports(context):
                    _logger.debug(
                        selected_msg,
                        instance.name,
                        context.file_path.name,
                    )
                    return instance
            except Exception as e:
                _logger.warning(
                    error_msg,
                    instance.name,
                    e,
                    exc_info=True,
                )
        return None

    def get_extractor(self, context: ExtractionContext) -> BaseExtractor:
        """
        Get the best extractor for a given file context.

        Selection algorithm:
        1. Auto-discover plugins if not already done
        2. Get extractors registered for this file's extension
        3. Try each in priority order (high to low) until one's supports() returns True
        4. If none match, try wildcard extractors (also in priority order)
        5. If still none, return the BasicFileInfoExtractor fallback

        This method NEVER returns None - there is always a fallback.

        Parameters
        ----------
        context
            Extraction context containing file path, instrument, etc.

        Returns
        -------
        BaseExtractor
            The best extractor for this file (never None)

        Examples
        --------
        >>> from nexusLIMS.extractors.base import ExtractionContext
        >>> from pathlib import Path
        >>>
        >>> context = ExtractionContext(Path("data.dm3"), None)
        >>> registry = get_registry()
        >>> extractor = registry.get_extractor(context)
        >>> print(f"Selected: {extractor.name}")
        """
        if not self._discovered:
            self.discover_plugins()

        ext = self._normalize_extension(context.file_path.suffix)

        # .get() avoids creating an empty defaultdict entry for unknown types
        chosen = self._first_supporting(
            self._extractors.get(ext, ()),
            self._get_instance,
            context,
            "Selected extractor %s for %s",
            "Error in %s.supports(): %s",
        )
        if chosen is None:
            chosen = self._first_supporting(
                self._wildcard_extractors,
                self._get_instance,
                context,
                "Selected wildcard extractor %s for %s",
                "Error in wildcard %s.supports(): %s",
            )
        if chosen is not None:
            return chosen

        # Fallback: use basic metadata extractor
        _logger.debug(
            "No extractor found for %s, using fallback",
            context.file_path.name,
        )
        return self._get_fallback_extractor()

    def _get_fallback_extractor(self) -> BaseExtractor:
        """
        Get the fallback extractor for unknown file types.

        Returns
        -------
        BaseExtractor
            BasicFileInfoExtractor instance
        """
        return self._get_instance(BasicFileInfoExtractor)

    def get_extractors_for_extension(self, extension: str) -> list[BaseExtractor]:
        """
        Get all extractors registered for a specific extension.

        Parameters
        ----------
        extension
            File extension (with or without leading dot, any case)

        Returns
        -------
        list[BaseExtractor]
            List of extractors, sorted by priority (descending); empty if
            nothing is registered for the extension

        Examples
        --------
        >>> registry = get_registry()
        >>> extractors = registry.get_extractors_for_extension("dm3")
        >>> for e in extractors:
        ...     print(f"{e.name}: priority {e.priority}")
        """
        if not self._discovered:
            self.discover_plugins()

        ext = self._normalize_extension(extension)
        return [self._get_instance(cls) for cls in self._extractors.get(ext, ())]

    def get_supported_extensions(self, exclude_fallback: bool = False) -> set[str]:  # noqa: FBT001, FBT002
        """
        Get all file extensions that have registered extractors.

        Parameters
        ----------
        exclude_fallback
            If True, only return extensions that have at least one extractor
            with priority greater than 0 (priority 0 marks the basic
            file-info fallback)

        Returns
        -------
        set[str]
            Set of extensions (without dots)

        Examples
        --------
        >>> registry = get_registry()
        >>> extensions = registry.get_supported_extensions()
        >>> print(f"Supported: {', '.join(sorted(extensions))}")
        >>> specialized = registry.get_supported_extensions(exclude_fallback=True)
        >>> print(f"Specialized: {', '.join(sorted(specialized))}")
        """
        if not self._discovered:
            self.discover_plugins()

        if not exclude_fallback:
            return set(self._extractors)

        # priority is a class-level attribute (registration sorts on it), so
        # there is no need to instantiate extractors just to answer this
        return {
            ext
            for ext, classes in self._extractors.items()
            if any(cls.priority > 0 for cls in classes)
        }

    def clear(self) -> None:
        """
        Clear all registered extractors and reset discovery state.

        Also clears preview generators and all cached instances. Primarily
        used for testing.

        Examples
        --------
        >>> registry = get_registry()
        >>> registry.clear()
        >>> # Will re-discover on next use
        """
        self._extractors.clear()
        self._instances.clear()
        self._wildcard_extractors.clear()
        self._preview_generators.clear()
        self._preview_instances.clear()
        self._discovered = False
        _logger.debug("Cleared extractor registry")

    # ------------------------------------------------------------------
    # Preview generators
    # ------------------------------------------------------------------
    def register_preview_generator(
        self,
        generator_class: type[PreviewGenerator],
    ) -> None:
        """
        Manually register a preview generator class.

        This method is called automatically during plugin discovery, but can
        also be used to manually register generators (useful for testing).
        Registering the same class twice is a no-op. Generators that declare
        no extensions are not registered, because preview lookup is
        extension-based only.

        Parameters
        ----------
        generator_class
            The preview generator class to register (not an instance)

        Examples
        --------
        >>> class MyGenerator:
        ...     name = "my_generator"
        ...     priority = 100
        ...     supported_extensions = {"dm3"}
        ...     def supports(self, context): return True
        ...     def generate(self, context, output_path): return True
        >>>
        >>> registry = get_registry()
        >>> registry.register_preview_generator(MyGenerator)
        """
        extensions = self._get_supported_extensions_for_generator(generator_class)

        if not extensions:
            _logger.debug(
                "Preview generator %s declares no extensions; not registered",
                getattr(generator_class, "name", "unknown"),
            )
            return

        for ext in extensions:
            registered = self._preview_generators[ext]
            if generator_class in registered:
                _logger.debug(
                    "Preview generator %s already registered for .%s "
                    "(skipping duplicate)",
                    generator_class.name,
                    ext,
                )
                continue
            registered.append(generator_class)
            # Sort by priority (descending)
            registered.sort(key=lambda g: g.priority, reverse=True)
            _logger.debug(
                "Registered preview generator %s for extension: .%s",
                generator_class.name,
                ext,
            )

    def _get_preview_instance(
        self,
        generator_class: type[PreviewGenerator],
    ) -> PreviewGenerator:
        """
        Get or create an instance of a preview generator class.

        Instances are cached for performance. The cache is keyed by the
        class's ``name`` attribute.

        Parameters
        ----------
        generator_class
            The preview generator class

        Returns
        -------
        PreviewGenerator
            Instance of the preview generator
        """
        name = generator_class.name
        if name not in self._preview_instances:
            self._preview_instances[name] = generator_class()
            _logger.debug("Instantiated preview generator: %s", name)

        return self._preview_instances[name]

    def get_preview_generator(
        self,
        context: ExtractionContext,
    ) -> PreviewGenerator | None:
        """
        Get the best preview generator for a given file context.

        Selection algorithm:
        1. Auto-discover plugins if not already done
        2. Get generators registered for this file's extension
        3. Try each in priority order (high to low) until one's supports() returns True
        4. If none match, return None

        Parameters
        ----------
        context
            Extraction context containing file path, instrument, etc.

        Returns
        -------
        PreviewGenerator | None
            The best preview generator for this file, or None if no generator found

        Examples
        --------
        >>> from nexusLIMS.extractors.base import ExtractionContext
        >>> from pathlib import Path
        >>>
        >>> context = ExtractionContext(Path("data.dm3"), None)
        >>> registry = get_registry()
        >>> generator = registry.get_preview_generator(context)
        >>> if generator:
        ...     generator.generate(context, Path("preview.png"))
        """
        if not self._discovered:
            self.discover_plugins()

        ext = self._normalize_extension(context.file_path.suffix)

        chosen = self._first_supporting(
            self._preview_generators.get(ext, ()),
            self._get_preview_instance,
            context,
            "Selected preview generator %s for %s",
            "Error in %s.supports(): %s",
        )
        if chosen is None:
            _logger.debug(
                "No preview generator found for %s",
                context.file_path.name,
            )
        return chosen
# Process-wide registry; stays None until get_registry() is first called
_registry: ExtractorRegistry | None = None


def get_registry() -> ExtractorRegistry:
    """
    Return the shared :class:`ExtractorRegistry`, creating it on first use.

    Returns
    -------
    ExtractorRegistry
        The global registry instance (the same object on every call)

    Examples
    --------
    >>> from nexusLIMS.extractors.registry import get_registry
    >>> registry = get_registry()
    >>> # Always returns the same instance
    >>> assert get_registry() is registry
    """
    global _registry  # noqa: PLW0603
    if _registry is not None:
        return _registry
    # First call: build the registry and remember it for later callers
    _registry = ExtractorRegistry()
    return _registry