"""SQLModel database models for NexusLIMS.
This module defines the SQLModel ORM classes that map to the NexusLIMS
database tables (`instruments` and `session_log`).
"""
import datetime
import json
import logging
import pytz
from pytz.tzinfo import BaseTzInfo
from sqlalchemy import CheckConstraint, UniqueConstraint, types
from sqlalchemy.types import TypeDecorator
from sqlmodel import Column, Field, Relationship, SQLModel, select
from sqlmodel import Session as DBSession
from nexusLIMS.db.engine import get_engine
from nexusLIMS.db.enums import EventType, ExternalSystem, RecordStatus
_logger = logging.getLogger(__name__)
[docs]
class TZDateTime(TypeDecorator):
"""
Custom DateTime type that preserves timezone information in SQLite.
SQLite stores datetimes as TEXT and doesn't preserve timezone info.
This TypeDecorator stores timezone-aware datetimes as ISO-8601 strings
with timezone offset, and restores them as timezone-aware datetime objects.
"""
impl = types.String
cache_ok = True
[docs]
def process_bind_param(self, value, dialect): # noqa: ARG002
"""Convert timezone-aware datetime to ISO string for storage."""
if value is not None:
if isinstance(value, datetime.datetime):
# Store as ISO string with timezone offset
return value.isoformat()
# Already a string
return value
return value
[docs]
def process_result_value(self, value, dialect): # noqa: ARG002
"""Convert ISO string back to timezone-aware datetime."""
if value is not None:
if isinstance(value, str):
# Parse ISO string with timezone
return datetime.datetime.fromisoformat(value)
# Already a datetime object
return value
return value
[docs]
class Instrument(SQLModel, table=True):
"""
Instrument configuration from the NexusLIMS database.
Represents an electron microscopy instrument in the facility,
with configuration for calendar integration, file storage, and metadata.
Parameters
----------
instrument_pid
Unique identifier for the instrument (e.g., "FEI-Titan-TEM-012345")
api_url
Calendar API endpoint URL for this instrument's scheduler (e.g.,
`https://<nemo_address>/api/tools/?id=<tool_id>`)
calendar_url
URL to the instrument's web-accessible calendar
location
Physical location (building and room number)
display_name
Human-readable instrument name displayed in NexusLIMS records
property_tag
Unique numeric identifier (for reference)
filestore_path
Relative path under NX_INSTRUMENT_DATA_PATH where data is stored
harvester
Harvester module to use ("nemo" or "sharepoint")
timezone_str
IANA timezone database string (e.g., "America/New_York")
"""
__tablename__ = "instruments"
# Primary key
instrument_pid: str = Field(primary_key=True, max_length=100)
# Required fields
api_url: str = Field(unique=True)
calendar_url: str
location: str = Field(max_length=100)
display_name: str
property_tag: str = Field(max_length=20)
filestore_path: str
harvester: str = Field(default="nemo")
timezone_str: str = Field(
sa_column_kwargs={"name": "timezone"}, default="America/New_York"
)
# Relationships
session_logs: list["SessionLog"] = Relationship(back_populates="instrument_obj")
@property
def name(self) -> str:
"""Alias for instrument_pid (backward compatibility)."""
return self.instrument_pid
@property
def timezone(self) -> BaseTzInfo:
"""Convert timezone string to pytz timezone object."""
return pytz.timezone(self.timezone_str)
def __repr__(self):
"""Return custom representation of an Instrument."""
return (
f"Nexus Instrument: {self.name}\n"
f"API url: {self.api_url}\n"
f"Calendar url: {self.calendar_url}\n"
f"Display name: {self.display_name}\n"
f"Location: {self.location}\n"
f"Property tag: {self.property_tag}\n"
f"Filestore path: {self.filestore_path}\n"
f"Harvester: {self.harvester}\n"
f"Timezone: {self.timezone}"
)
def __str__(self):
"""Return custom string representation of an Instrument."""
return f"{self.name} in {self.location}" if self.location else ""
[docs]
def localize_datetime(self, _dt: datetime.datetime) -> datetime.datetime:
"""
Localize a datetime to an Instrument's timezone.
Convert a date and time to the timezone of this instrument. If the
supplied datetime is naive (i.e. does not have a timezone), it will be
assumed to already be in the timezone of the instrument, and the
displayed time will not change. If the timezone of the supplied
datetime is different than the instrument's, the time will be
adjusted to compensate for the timezone offset.
Parameters
----------
_dt
The datetime object to localize
Returns
-------
datetime.datetime
A datetime object with the same timezone as the instrument
"""
_logger = logging.getLogger(__name__)
if self.timezone is None:
_logger.warning(
"Tried to localize a datetime with instrument that does not have "
"timezone information (%s)",
self.name,
)
return _dt
if _dt.tzinfo is None:
# dt is timezone naive
return self.timezone.localize(_dt)
# dt has timezone info
return _dt.astimezone(self.timezone)
[docs]
def localize_datetime_str(
self,
_dt: datetime.datetime,
fmt: str = "%Y-%m-%d %H:%M:%S %Z",
) -> str:
"""
Localize a datetime to an Instrument's timezone and return as string.
Convert a date and time to the timezone of this instrument, returning
a textual representation of the object, rather than the datetime
itself. Uses :py:meth:`localize_datetime` for the actual conversion.
Parameters
----------
_dt
The datetime object to localize
fmt
The strftime format string to use to format the output
Returns
-------
str
The formatted textual representation of the localized datetime
"""
return self.localize_datetime(_dt).strftime(fmt)
[docs]
def to_dict(self) -> dict:
"""
Return a dictionary representation of the Instrument object.
Handles special cases like renaming 'instrument_pid' and
converting timezone objects to strings.
Returns
-------
dict
A dictionary representation of the instrument, suitable for database
insertion or JSON serialization.
"""
# Convert SQLModel to dict (excludes relationships by default)
return {
"instrument_pid": self.instrument_pid,
"api_url": self.api_url,
"calendar_url": self.calendar_url,
"location": self.location,
"display_name": self.display_name,
"property_tag": self.property_tag,
"filestore_path": self.filestore_path,
"harvester": self.harvester,
"timezone": self.timezone_str,
}
[docs]
def to_json(self, **kwargs) -> str:
"""
Return a JSON string representation of the Instrument object.
Parameters
----------
**kwargs
Additional keyword arguments to pass to `json.dumps`.
Returns
-------
str
A JSON string representation of the instrument.
"""
return json.dumps(self.to_dict(), **kwargs)
[docs]
class SessionLog(SQLModel, table=True):
"""
Individual session log entry (START, END, or RECORD_GENERATION event).
A simple mapping of one row in the session_log table. Each session
typically has a START and END log with matching session_identifier,
and may have additional RECORD_GENERATION logs.
Parameters
----------
session_identifier
A unique string consistent among a single record's START, END,
and RECORD_GENERATION events (often a UUID)
instrument
The instrument associated with this session (foreign key reference
to instruments table)
timestamp
The datetime representing when the event occurred
event_type
The type of log (START, END, or RECORD_GENERATION)
user
The username associated with this session (if known)
record_status
The status for this record (defaults to WAITING_FOR_END)
"""
__tablename__ = "session_log"
__table_args__ = (
CheckConstraint(
"event_type IN ('START', 'END', 'RECORD_GENERATION')",
name="check_event_type",
),
CheckConstraint(
"record_status IN ('COMPLETED', 'WAITING_FOR_END', 'TO_BE_BUILT', "
"'BUILT_NOT_EXPORTED', 'ERROR', 'NO_FILES_FOUND', 'NO_CONSENT', "
"'NO_RESERVATION')",
name="check_record_status",
),
)
# Primary key
id_session_log: int | None = Field(default=None, primary_key=True)
# Required fields
session_identifier: str = Field(max_length=36, index=True)
instrument: str = Field(foreign_key="instruments.instrument_pid", max_length=100)
timestamp: datetime.datetime = Field(
sa_column=Column(TZDateTime)
) # Preserve timezone
event_type: EventType # Enum for type safety
record_status: RecordStatus = Field(default=RecordStatus.WAITING_FOR_END)
# Optional field
user: str | None = Field(default=None, max_length=50)
# Relationships
instrument_obj: Instrument | None = Relationship(back_populates="session_logs")
def __repr__(self):
"""Return custom representation of a SessionLog."""
return (
f"SessionLog (id={self.session_identifier}, "
f"instrument={self.instrument}, "
f"timestamp={self.timestamp}, "
f"event_type={self.event_type.value}, "
f"user={self.user}, "
f"record_status={self.record_status.value})"
)
[docs]
def insert_log(self) -> bool:
"""
Insert this log into the NexusLIMS database.
Inserts a log into the database with the information contained within
this SessionLog's attributes (used primarily for NEMO ``usage_event``
integration). It will check for the presence of a matching record first
and warn without inserting anything if it finds one.
Returns
-------
success : bool
Whether or not the session log row was inserted successfully
"""
with DBSession(get_engine()) as session:
# Check for existing log
statement = select(SessionLog).where(
SessionLog.session_identifier == self.session_identifier,
SessionLog.instrument == self.instrument,
SessionLog.timestamp == self.timestamp,
SessionLog.event_type == self.event_type,
)
existing = session.exec(statement).first()
if existing:
_logger.warning("SessionLog already exists: %s", self)
return True
# Insert new log
session.add(self)
session.commit()
return True
[docs]
class UploadLog(SQLModel, table=True):
"""
Log of export attempts to destination repositories.
Tracks per-destination export results for each session, enabling
multi-destination export with granular success/failure tracking.
Parameters
----------
id
Auto-incrementing primary key
session_identifier
Foreign key reference to session_log.session_identifier
destination_name
Name of the export destination (e.g., "cdcs", "labarchives")
success
Whether the export succeeded
record_id
Destination-specific record identifier (if successful)
record_url
Direct URL to view the exported record (if successful)
error_message
Error message if export failed
timestamp
When the export attempt occurred
metadata_json
JSON-serialized metadata dict with destination-specific details
"""
__tablename__ = "upload_log"
# Primary key
id: int | None = Field(default=None, primary_key=True)
# Required fields
session_identifier: str = Field(index=True, max_length=36)
destination_name: str = Field(index=True, max_length=100)
success: bool
timestamp: datetime.datetime = Field(sa_column=Column(TZDateTime))
# Optional fields
record_id: str | None = Field(default=None, max_length=255)
record_url: str | None = Field(default=None, max_length=500)
error_message: str | None = Field(default=None)
metadata_json: str | None = Field(default=None)
def __repr__(self):
"""Return custom representation of an UploadLog."""
status = "SUCCESS" if self.success else "FAILED"
return (
f"UploadLog (session={self.session_identifier}, "
f"destination={self.destination_name}, "
f"status={status}, "
f"timestamp={self.timestamp})"
)
[docs]
class ExternalUserIdentifier(SQLModel, table=True):
"""
Maps NexusLIMS usernames to external system user IDs.
Maintains a star topology with nexuslims_username (from session_log.user)
as the canonical identifier, mapping to external system IDs.
Parameters
----------
id
Auto-incrementing primary key
nexuslims_username
Canonical username in NexusLIMS (from session_log.user)
external_system
External system identifier (nemo, labarchives_eln, etc.)
external_id
User ID/username in the external system
email
User's email for verification/matching (optional)
created_at
When this mapping was created
last_verified_at
Last time this mapping was verified (optional)
notes
Additional notes about this mapping (optional)
Examples
--------
>>> # NEMO harvester user ID
>>> ExternalUserIdentifier(
... nexuslims_username='jsmith',
... external_system=ExternalSystem.NEMO,
... external_id='12345'
... )
>>> # LabArchives UID from OAuth
>>> ExternalUserIdentifier(
... nexuslims_username='jsmith',
... external_system=ExternalSystem.LABARCHIVES_ELN,
... external_id='285489257Ho...',
... email='jsmith@upenn.edu'
... )
"""
__tablename__ = "external_user_identifiers"
__table_args__ = (
# NOTE: This CHECK constraint is dynamically generated from ExternalSystem enum
# to keep the model in sync with the enum. However, migrations should hardcode
# the values to preserve historical accuracy. When adding a new system, create
# a new migration to update the CHECK constraint.
CheckConstraint(
f"external_system IN ({', '.join(repr(s.value) for s in ExternalSystem)})",
name="valid_external_system",
),
# UNIQUE constraints to enforce star-topology design
UniqueConstraint(
"nexuslims_username",
"external_system",
name="nexuslims_username_system_UNIQUE",
),
UniqueConstraint(
"external_system", "external_id", name="system_external_id_UNIQUE"
),
)
# Primary key
id: int | None = Field(default=None, primary_key=True)
# Required fields
nexuslims_username: str = Field(index=True)
external_system: str = Field()
external_id: str = Field()
# Optional fields
email: str | None = Field(default=None)
created_at: datetime.datetime = Field(
default_factory=lambda: datetime.datetime.now(pytz.UTC),
sa_column=Column(TZDateTime),
)
last_verified_at: datetime.datetime | None = Field(
default=None, sa_column=Column(TZDateTime, nullable=True)
)
notes: str | None = Field(default=None)
def __repr__(self):
"""Return custom representation of an ExternalUserIdentifier."""
return (
f"ExternalUserIdentifier (username={self.nexuslims_username}, "
f"system={self.external_system}, "
f"external_id={self.external_id})"
)
[docs]
def get_external_id(
nexuslims_username: str, external_system: ExternalSystem
) -> str | None:
"""
Get external system ID for a NexusLIMS user.
Parameters
----------
nexuslims_username
Username from session_log.user
external_system
Target external system
Returns
-------
str | None
External ID if found, None otherwise
Examples
--------
>>> from nexusLIMS.db.models import get_external_id
>>> from nexusLIMS.db.enums import ExternalSystem
>>> uid = get_external_id('jsmith', ExternalSystem.LABARCHIVES_ELN)
>>> print(uid)
'285489257Ho...'
"""
with DBSession(get_engine()) as session:
result = session.exec(
select(ExternalUserIdentifier).where(
ExternalUserIdentifier.nexuslims_username == nexuslims_username,
ExternalUserIdentifier.external_system == external_system.value,
)
).first()
return result.external_id if result else None
[docs]
def get_nexuslims_username(
external_id: str, external_system: ExternalSystem
) -> str | None:
"""
Reverse lookup: find NexusLIMS username from external ID.
Useful for harvesters that receive external IDs (e.g., NEMO user IDs)
and need to map them to NexusLIMS usernames for session_log entries.
Parameters
----------
external_id
ID in external system
external_system
Source external system
Returns
-------
str | None
NexusLIMS username if found, None otherwise
Examples
--------
>>> from nexusLIMS.db.models import get_nexuslims_username
>>> from nexusLIMS.db.enums import ExternalSystem
>>> username = get_nexuslims_username('12345', ExternalSystem.NEMO)
>>> print(username)
'jsmith'
"""
with DBSession(get_engine()) as session:
result = session.exec(
select(ExternalUserIdentifier).where(
ExternalUserIdentifier.external_id == external_id,
ExternalUserIdentifier.external_system == external_system.value,
)
).first()
return result.nexuslims_username if result else None
[docs]
def store_external_id(
nexuslims_username: str,
external_system: ExternalSystem,
external_id: str,
email: str | None = None,
notes: str | None = None,
) -> ExternalUserIdentifier:
"""
Store or update external ID mapping.
If mapping exists for this user/system combination, updates it and
refreshes last_verified_at. Otherwise, creates new mapping.
Parameters
----------
nexuslims_username
Username from session_log.user
external_system
Target external system
external_id
ID in external system
email
Optional email for verification
notes
Optional notes about this mapping
Returns
-------
ExternalUserIdentifier
Created or updated ExternalUserIdentifier record
Examples
--------
>>> from nexusLIMS.db.models import store_external_id
>>> from nexusLIMS.db.enums import ExternalSystem
>>> record = store_external_id(
... nexuslims_username='jsmith',
... external_system=ExternalSystem.LABARCHIVES_ELN,
... external_id='285489257Ho...',
... email='jsmith@upenn.edu',
... notes='OAuth registration portal 2026-01-25'
... )
"""
with DBSession(get_engine()) as session:
# Check if mapping exists
existing = session.exec(
select(ExternalUserIdentifier).where(
ExternalUserIdentifier.nexuslims_username == nexuslims_username,
ExternalUserIdentifier.external_system == external_system.value,
)
).first()
if existing:
# Update existing
existing.external_id = external_id
if email:
existing.email = email
if notes:
existing.notes = notes
existing.last_verified_at = datetime.datetime.now(pytz.UTC)
session.add(existing)
else:
# Create new
existing = ExternalUserIdentifier(
nexuslims_username=nexuslims_username,
external_system=external_system.value,
external_id=external_id,
email=email,
notes=notes,
)
session.add(existing)
session.commit()
session.refresh(existing)
return existing
[docs]
def get_all_external_ids(nexuslims_username: str) -> dict[str, str]:
"""
Get all external IDs for a user.
Returns dict mapping external system name to external ID.
Useful for debugging or user profile displays.
Parameters
----------
nexuslims_username
Username from session_log.user
Returns
-------
dict[str, str]
Dict mapping external system name to external ID
Examples
--------
>>> from nexusLIMS.db.models import get_all_external_ids
>>> ids = get_all_external_ids('jsmith')
>>> print(ids)
{
'nemo': '12345',
'labarchives_eln': '285489257Ho...',
'cdcs': 'jsmith@upenn.edu'
}
"""
with DBSession(get_engine()) as session:
results = session.exec(
select(ExternalUserIdentifier).where(
ExternalUserIdentifier.nexuslims_username == nexuslims_username
)
).all()
return {r.external_system: r.external_id for r in results}