"""Multi-destination export framework for NexusLIMS records.
This package provides a plugin-based architecture for exporting NexusLIMS
XML records to multiple repository destinations (CDCS, LabArchives, eLabFTW, etc.).
The main entry point is export_records(), which:
1. Exports XML files to all enabled destinations using the configured strategy
2. Logs results to the upload_log database table
3. Returns success/failure results for each file
Example
-------
>>> from nexusLIMS.exporters import export_records, was_successfully_exported
>>> results = export_records([xml_file], [session])
>>> if was_successfully_exported(xml_file, results):
...     print("Exported successfully!")
"""
from __future__ import annotations
import json
import logging
from typing import TYPE_CHECKING
from sqlmodel import Session as DBSession
from nexusLIMS.config import settings
from nexusLIMS.db.engine import get_engine
from nexusLIMS.db.models import UploadLog
from nexusLIMS.exporters.base import ExportContext, ExportResult
from nexusLIMS.exporters.registry import get_registry
if TYPE_CHECKING:
from pathlib import Path
from nexusLIMS.db.session_handler import Session
from nexusLIMS.harvesters.reservation_event import ReservationEvent
from nexusLIMS.schemas.activity import AcquisitionActivity
_logger = logging.getLogger(__name__)
[docs]
def export_records(
    xml_files: list[Path],
    sessions: list[Session],
    activities_per_session: list[list[AcquisitionActivity]] | None = None,
    reservation_events: list[ReservationEvent | None] | None = None,
) -> dict[Path, list[ExportResult]]:
    """Export NexusLIMS records to all enabled destinations.

    Main entry point for exporting records. Called by record_builder.py
    after XML records are built and validated. Exports each record to all
    enabled destinations using the configured strategy, logs results to
    the database, and returns success/failure information.

    Parameters
    ----------
    xml_files
        List of XML record file paths to export
    sessions
        Corresponding Session objects (same length and order as xml_files)
    activities_per_session
        Corresponding AcquisitionActivity lists for each session (optional).
        If None (or empty), each session will have an empty activities list.
    reservation_events
        Corresponding ReservationEvent for each session (optional).
        If None (or empty), each session will have reservation_event=None.

    Returns
    -------
    dict[pathlib.Path, list[ExportResult]]
        Mapping of XML file path to list of export results (one per destination)

    Raises
    ------
    ValueError
        If ``sessions``, ``activities_per_session`` or ``reservation_events``
        does not have the same length as ``xml_files``. All lengths are
        checked before the first record is exported, so a mismatch never
        leaves a partially completed export behind.
    """
    expected_count = len(xml_files)
    if len(sessions) != expected_count:
        msg = (
            f"xml_files ({len(xml_files)}) and sessions ({len(sessions)}) "
            f"must have the same length"
        )
        raise ValueError(msg)

    # Falsy optional inputs (None or an empty list) fall back to per-record
    # defaults, matching the long-standing behavior of this function.
    acts = activities_per_session or [[] for _ in xml_files]
    res_events = reservation_events or [None] * expected_count

    # Validate the optional inputs up front. Relying on zip(strict=True) alone
    # would only raise once the shorter input is exhausted, i.e. after earlier
    # records had already been exported and logged.
    for label, values in (
        ("activities_per_session", acts),
        ("reservation_events", res_events),
    ):
        if len(values) != expected_count:
            msg = (
                f"xml_files ({expected_count}) and {label} ({len(values)}) "
                f"must have the same length"
            )
            raise ValueError(msg)

    registry = get_registry()
    strategy = settings.NX_EXPORT_STRATEGY
    _logger.info(
        "Exporting %d record(s) using strategy: %s",
        expected_count,
        strategy,
    )

    results: dict[Path, list[ExportResult]] = {}
    for xml_file, session, activities, res_event in zip(
        xml_files, sessions, acts, res_events, strict=True
    ):
        # Build export context
        context = ExportContext(
            xml_file_path=xml_file,
            session_identifier=session.session_identifier,
            instrument_pid=session.instrument.name,
            dt_from=session.dt_from,
            dt_to=session.dt_to,
            user=session.user,
            activities=activities,
            reservation_event=res_event,
        )

        # Export to all destinations
        _logger.info("Exporting record: %s", xml_file.name)
        export_results = registry.export_to_all(context, strategy=strategy)
        results[xml_file] = export_results

        # Write to upload_log table
        _log_to_database(session.session_identifier, export_results)

        # Log summary
        success_count = sum(1 for r in export_results if r.success)
        total_count = len(export_results)
        if success_count > 0:
            _logger.info(
                "Exported %s: %d/%d destination(s) succeeded",
                xml_file.name,
                success_count,
                total_count,
            )
        else:
            _logger.error(
                "Export failed for %s: all %d destination(s) failed",
                xml_file.name,
                total_count,
            )

    return results
def _log_to_database(
    session_identifier: str,
    results: list[ExportResult],
) -> None:
    """Persist one upload_log row per export result.

    Parameters
    ----------
    session_identifier
        Identifier of the session the exported record belongs to; stored
        on every row written by this call
    results
        Export results to record, one row each
    """
    with DBSession(get_engine()) as db_session:
        for outcome in results:
            # Falsy metadata (None or empty) is stored as NULL instead of
            # being serialized to JSON.
            serialized_metadata = None
            if outcome.metadata:
                serialized_metadata = json.dumps(outcome.metadata)

            db_session.add(
                UploadLog(
                    session_identifier=session_identifier,
                    destination_name=outcome.destination_name,
                    success=outcome.success,
                    record_id=outcome.record_id,
                    record_url=outcome.record_url,
                    error_message=outcome.error_message,
                    timestamp=outcome.timestamp,
                    metadata_json=serialized_metadata,
                )
            )

        # Commit once, after every row for this record has been added.
        db_session.commit()
[docs]
def was_successfully_exported(
    xml_file: Path,
    results: dict[Path, list[ExportResult]],
) -> bool:
    """Report whether any destination accepted the given file.

    Parameters
    ----------
    xml_file
        XML file path to look up
    results
        Mapping returned by export_records()

    Returns
    -------
    bool
        True when at least one export result for ``xml_file`` has
        ``success`` set; False when every result failed or when the file
        does not appear in ``results`` at all
    """
    # A file missing from the mapping is treated as having no outcomes.
    outcomes = results.get(xml_file, ())
    return any(outcome.success for outcome in outcomes)
# Public API: the names this module exports on ``from ... import *``.
# NOTE(review): "ExportDestination" is listed below, but no import or
# definition of that name is visible in this module. Confirm it is brought
# into scope (presumably alongside ExportContext/ExportResult from
# nexusLIMS.exporters.base); otherwise a star-import of this module fails.
__all__ = [
    "ExportContext",
    "ExportDestination",
    "ExportResult",
    "export_records",
    "was_successfully_exported",
]