Coverage for nexusLIMS/exporters/__init__.py: 100%
44 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
"""Multi-destination export framework for NexusLIMS records.

This package provides a plugin-based architecture for exporting NexusLIMS
XML records to multiple repository destinations (CDCS, LabArchives, eLabFTW, etc.).

The main entry point is export_records(), which:

1. Exports XML files to all enabled destinations using the configured strategy
2. Logs results to the upload_log database table
3. Returns success/failure results for each file

Example
-------
>>> from nexusLIMS.exporters import export_records, was_successfully_exported
>>> results = export_records([xml_file], [session])
>>> if was_successfully_exported(xml_file, results):
...     print("Exported successfully!")
"""
19from __future__ import annotations
21import json
22import logging
23from typing import TYPE_CHECKING
25from sqlmodel import Session as DBSession
27from nexusLIMS.config import settings
28from nexusLIMS.db.engine import get_engine
29from nexusLIMS.db.models import UploadLog
30from nexusLIMS.exporters.base import ExportContext, ExportResult
31from nexusLIMS.exporters.registry import get_registry
33if TYPE_CHECKING:
34 from pathlib import Path
36 from nexusLIMS.db.session_handler import Session
37 from nexusLIMS.harvesters.reservation_event import ReservationEvent
38 from nexusLIMS.schemas.activity import AcquisitionActivity
# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)
def export_records(
    xml_files: list[Path],
    sessions: list[Session],
    activities_per_session: list[list[AcquisitionActivity]] | None = None,
    reservation_events: list[ReservationEvent | None] | None = None,
) -> dict[Path, list[ExportResult]]:
    """Export NexusLIMS records to all enabled destinations.

    Main entry point for exporting records. Called by record_builder.py
    after XML records are built and validated. Exports each record to all
    enabled destinations using the configured strategy, logs results to
    the database, and returns success/failure information.

    Parameters
    ----------
    xml_files
        List of XML record file paths to export
    sessions
        Corresponding Session objects (same length and order as xml_files)
    activities_per_session
        Corresponding AcquisitionActivity lists for each session (optional).
        If None (or empty), each session will have an empty activities list.
    reservation_events
        Corresponding ReservationEvent for each session (optional).
        If None (or empty), each session will have reservation_event=None.

    Returns
    -------
    dict[pathlib.Path, list[ExportResult]]
        Mapping of XML file path to list of export results (one per destination)

    Raises
    ------
    ValueError
        If ``sessions``, or a non-empty ``activities_per_session`` or
        ``reservation_events``, does not have the same length as
        ``xml_files``. All lengths are checked before any record is
        exported, so a bad call has no side effects.
    """
    n_records = len(xml_files)
    if n_records != len(sessions):
        msg = (
            f"xml_files ({len(xml_files)}) and sessions ({len(sessions)}) "
            f"must have the same length"
        )
        raise ValueError(msg)

    # Missing (None) or empty optional inputs fall back to per-record defaults.
    acts = activities_per_session or [[] for _ in xml_files]
    res_events = reservation_events or [None] * n_records

    # Fail fast: zip(strict=True) below would only notice a length mismatch
    # after some records had already been exported and logged.
    for arg_name, values in (
        ("activities_per_session", acts),
        ("reservation_events", res_events),
    ):
        if len(values) != n_records:
            msg = (
                f"xml_files ({n_records}) and {arg_name} ({len(values)}) "
                f"must have the same length"
            )
            raise ValueError(msg)

    registry = get_registry()
    strategy = settings.NX_EXPORT_STRATEGY

    _logger.info(
        "Exporting %d record(s) using strategy: %s",
        len(xml_files),
        strategy,
    )

    results = {}
    for xml_file, session, activities, res_event in zip(
        xml_files, sessions, acts, res_events, strict=True
    ):
        # Build export context
        context = ExportContext(
            xml_file_path=xml_file,
            session_identifier=session.session_identifier,
            instrument_pid=session.instrument.name,
            dt_from=session.dt_from,
            dt_to=session.dt_to,
            user=session.user,
            activities=activities,
            reservation_event=res_event,
        )

        # Export to all destinations
        _logger.info("Exporting record: %s", xml_file.name)
        export_results = registry.export_to_all(context, strategy=strategy)
        results[xml_file] = export_results

        # Write to upload_log table
        _log_to_database(session.session_identifier, export_results)

        # Log summary: one success is enough to count the record as exported;
        # zero successes (including zero results) is reported as an error.
        success_count = sum(1 for r in export_results if r.success)
        total_count = len(export_results)
        if success_count > 0:
            _logger.info(
                "Exported %s: %d/%d destination(s) succeeded",
                xml_file.name,
                success_count,
                total_count,
            )
        else:
            _logger.error(
                "Export failed for %s: all %d destination(s) failed",
                xml_file.name,
                total_count,
            )

    return results
def _log_to_database(
    session_identifier: str,
    results: list[ExportResult],
) -> None:
    """Persist export results as rows of the upload_log table.

    One ``UploadLog`` row is added per result, and all rows are committed
    together in a single transaction.

    Parameters
    ----------
    session_identifier
        Session identifier stored on every row written by this call
    results
        Export results to record (one row each)
    """
    engine = get_engine()
    with DBSession(engine) as db_session:
        for outcome in results:
            # Falsy metadata (None or empty) is stored as NULL, not as JSON.
            extra = outcome.metadata
            serialized_metadata = json.dumps(extra) if extra else None
            db_session.add(
                UploadLog(
                    session_identifier=session_identifier,
                    destination_name=outcome.destination_name,
                    success=outcome.success,
                    record_id=outcome.record_id,
                    record_url=outcome.record_url,
                    error_message=outcome.error_message,
                    timestamp=outcome.timestamp,
                    metadata_json=serialized_metadata,
                )
            )
        # Commit once, after every row has been staged.
        db_session.commit()
def was_successfully_exported(
    xml_file: Path,
    results: dict[Path, list[ExportResult]],
) -> bool:
    """Report whether any destination accepted the given file.

    Parameters
    ----------
    xml_file
        XML file path to look up
    results
        Export results as returned by export_records()

    Returns
    -------
    bool
        True if at least one result for ``xml_file`` is marked successful;
        False if none is, or if ``xml_file`` has no entry in ``results``
    """
    # A file with no entry is treated like a file with no results at all.
    outcomes = results.get(xml_file, ())
    return any(outcome.success for outcome in outcomes)
# Public API
# Every name listed here must be bound in this module; an unbound entry makes
# ``from nexusLIMS.exporters import *`` raise AttributeError.
# ``ExportDestination`` is not imported by this module, so it is not listed.
__all__ = [
    "ExportContext",
    "ExportResult",
    "export_records",
    "was_successfully_exported",
]