Coverage for nexusLIMS/db/models.py: 100%
141 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
1"""SQLModel database models for NexusLIMS.
3This module defines the SQLModel ORM classes that map to the NexusLIMS
4database tables (`instruments` and `session_log`).
5"""
7import datetime
8import json
9import logging
11import pytz
12from pytz.tzinfo import BaseTzInfo
13from sqlalchemy import CheckConstraint, UniqueConstraint, types
14from sqlalchemy.types import TypeDecorator
15from sqlmodel import Column, Field, Relationship, SQLModel, select
16from sqlmodel import Session as DBSession
18from nexusLIMS.db.engine import get_engine
19from nexusLIMS.db.enums import EventType, ExternalSystem, RecordStatus
21_logger = logging.getLogger(__name__)
24class TZDateTime(TypeDecorator):
25 """
26 Custom DateTime type that preserves timezone information in SQLite.
28 SQLite stores datetimes as TEXT and doesn't preserve timezone info.
29 This TypeDecorator stores timezone-aware datetimes as ISO-8601 strings
30 with timezone offset, and restores them as timezone-aware datetime objects.
31 """
33 impl = types.String
34 cache_ok = True
36 def process_bind_param(self, value, dialect): # noqa: ARG002
37 """Convert timezone-aware datetime to ISO string for storage."""
38 if value is not None:
39 if isinstance(value, datetime.datetime):
40 # Store as ISO string with timezone offset
41 return value.isoformat()
42 # Already a string
43 return value
44 return value
46 def process_result_value(self, value, dialect): # noqa: ARG002
47 """Convert ISO string back to timezone-aware datetime."""
48 if value is not None:
49 if isinstance(value, str):
50 # Parse ISO string with timezone
51 return datetime.datetime.fromisoformat(value)
52 # Already a datetime object
53 return value
54 return value
57class Instrument(SQLModel, table=True):
58 """
59 Instrument configuration from the NexusLIMS database.
61 Represents an electron microscopy instrument in the facility,
62 with configuration for calendar integration, file storage, and metadata.
64 Parameters
65 ----------
66 instrument_pid
67 Unique identifier for the instrument (e.g., "FEI-Titan-TEM-012345")
68 api_url
69 Calendar API endpoint URL for this instrument's scheduler (e.g.,
70 `https://<nemo_address>/api/tools/?id=<tool_id>`)
71 calendar_url
72 URL to the instrument's web-accessible calendar
73 location
74 Physical location (building and room number)
75 display_name
76 Human-readable instrument name displayed in NexusLIMS records
77 property_tag
78 Unique numeric identifier (for reference)
79 filestore_path
80 Relative path under NX_INSTRUMENT_DATA_PATH where data is stored
81 harvester
82 Harvester module to use ("nemo" or "sharepoint")
83 timezone_str
84 IANA timezone database string (e.g., "America/New_York")
85 """
87 __tablename__ = "instruments"
89 # Primary key
90 instrument_pid: str = Field(primary_key=True, max_length=100)
92 # Required fields
93 api_url: str = Field(unique=True)
94 calendar_url: str
95 location: str = Field(max_length=100)
96 display_name: str
97 property_tag: str = Field(max_length=20)
98 filestore_path: str
99 harvester: str = Field(default="nemo")
100 timezone_str: str = Field(
101 sa_column_kwargs={"name": "timezone"}, default="America/New_York"
102 )
104 # Relationships
105 session_logs: list["SessionLog"] = Relationship(back_populates="instrument_obj")
107 @property
108 def name(self) -> str:
109 """Alias for instrument_pid (backward compatibility)."""
110 return self.instrument_pid
112 @property
113 def timezone(self) -> BaseTzInfo:
114 """Convert timezone string to pytz timezone object."""
115 return pytz.timezone(self.timezone_str)
117 def __repr__(self):
118 """Return custom representation of an Instrument."""
119 return (
120 f"Nexus Instrument: {self.name}\n"
121 f"API url: {self.api_url}\n"
122 f"Calendar url: {self.calendar_url}\n"
123 f"Display name: {self.display_name}\n"
124 f"Location: {self.location}\n"
125 f"Property tag: {self.property_tag}\n"
126 f"Filestore path: {self.filestore_path}\n"
127 f"Harvester: {self.harvester}\n"
128 f"Timezone: {self.timezone}"
129 )
131 def __str__(self):
132 """Return custom string representation of an Instrument."""
133 return f"{self.name} in {self.location}" if self.location else ""
135 def localize_datetime(self, _dt: datetime.datetime) -> datetime.datetime:
136 """
137 Localize a datetime to an Instrument's timezone.
139 Convert a date and time to the timezone of this instrument. If the
140 supplied datetime is naive (i.e. does not have a timezone), it will be
141 assumed to already be in the timezone of the instrument, and the
142 displayed time will not change. If the timezone of the supplied
143 datetime is different than the instrument's, the time will be
144 adjusted to compensate for the timezone offset.
146 Parameters
147 ----------
148 _dt
149 The datetime object to localize
151 Returns
152 -------
153 datetime.datetime
154 A datetime object with the same timezone as the instrument
155 """
156 _logger = logging.getLogger(__name__)
158 if self.timezone is None:
159 _logger.warning(
160 "Tried to localize a datetime with instrument that does not have "
161 "timezone information (%s)",
162 self.name,
163 )
164 return _dt
165 if _dt.tzinfo is None:
166 # dt is timezone naive
167 return self.timezone.localize(_dt)
169 # dt has timezone info
170 return _dt.astimezone(self.timezone)
172 def localize_datetime_str(
173 self,
174 _dt: datetime.datetime,
175 fmt: str = "%Y-%m-%d %H:%M:%S %Z",
176 ) -> str:
177 """
178 Localize a datetime to an Instrument's timezone and return as string.
180 Convert a date and time to the timezone of this instrument, returning
181 a textual representation of the object, rather than the datetime
182 itself. Uses :py:meth:`localize_datetime` for the actual conversion.
184 Parameters
185 ----------
186 _dt
187 The datetime object to localize
188 fmt
189 The strftime format string to use to format the output
191 Returns
192 -------
193 str
194 The formatted textual representation of the localized datetime
195 """
196 return self.localize_datetime(_dt).strftime(fmt)
198 def to_dict(self) -> dict:
199 """
200 Return a dictionary representation of the Instrument object.
202 Handles special cases like renaming 'instrument_pid' and
203 converting timezone objects to strings.
205 Returns
206 -------
207 dict
208 A dictionary representation of the instrument, suitable for database
209 insertion or JSON serialization.
210 """
211 # Convert SQLModel to dict (excludes relationships by default)
212 return {
213 "instrument_pid": self.instrument_pid,
214 "api_url": self.api_url,
215 "calendar_url": self.calendar_url,
216 "location": self.location,
217 "display_name": self.display_name,
218 "property_tag": self.property_tag,
219 "filestore_path": self.filestore_path,
220 "harvester": self.harvester,
221 "timezone": self.timezone_str,
222 }
224 def to_json(self, **kwargs) -> str:
225 """
226 Return a JSON string representation of the Instrument object.
228 Parameters
229 ----------
230 **kwargs
231 Additional keyword arguments to pass to `json.dumps`.
233 Returns
234 -------
235 str
236 A JSON string representation of the instrument.
237 """
238 return json.dumps(self.to_dict(), **kwargs)
241class SessionLog(SQLModel, table=True):
242 """
243 Individual session log entry (START, END, or RECORD_GENERATION event).
245 A simple mapping of one row in the session_log table. Each session
246 typically has a START and END log with matching session_identifier,
247 and may have additional RECORD_GENERATION logs.
249 Parameters
250 ----------
251 session_identifier
252 A unique string consistent among a single record's START, END,
253 and RECORD_GENERATION events (often a UUID)
254 instrument
255 The instrument associated with this session (foreign key reference
256 to instruments table)
257 timestamp
258 The datetime representing when the event occurred
259 event_type
260 The type of log (START, END, or RECORD_GENERATION)
261 user
262 The username associated with this session (if known)
263 record_status
264 The status for this record (defaults to WAITING_FOR_END)
265 """
267 __tablename__ = "session_log"
268 __table_args__ = (
269 CheckConstraint(
270 "event_type IN ('START', 'END', 'RECORD_GENERATION')",
271 name="check_event_type",
272 ),
273 CheckConstraint(
274 "record_status IN ('COMPLETED', 'WAITING_FOR_END', 'TO_BE_BUILT', "
275 "'BUILT_NOT_EXPORTED', 'ERROR', 'NO_FILES_FOUND', 'NO_CONSENT', "
276 "'NO_RESERVATION')",
277 name="check_record_status",
278 ),
279 )
281 # Primary key
282 id_session_log: int | None = Field(default=None, primary_key=True)
284 # Required fields
285 session_identifier: str = Field(max_length=36, index=True)
286 instrument: str = Field(foreign_key="instruments.instrument_pid", max_length=100)
287 timestamp: datetime.datetime = Field(
288 sa_column=Column(TZDateTime)
289 ) # Preserve timezone
290 event_type: EventType # Enum for type safety
291 record_status: RecordStatus = Field(default=RecordStatus.WAITING_FOR_END)
293 # Optional field
294 user: str | None = Field(default=None, max_length=50)
296 # Relationships
297 instrument_obj: Instrument | None = Relationship(back_populates="session_logs")
299 def __repr__(self):
300 """Return custom representation of a SessionLog."""
301 return (
302 f"SessionLog (id={self.session_identifier}, "
303 f"instrument={self.instrument}, "
304 f"timestamp={self.timestamp}, "
305 f"event_type={self.event_type.value}, "
306 f"user={self.user}, "
307 f"record_status={self.record_status.value})"
308 )
310 def insert_log(self) -> bool:
311 """
312 Insert this log into the NexusLIMS database.
314 Inserts a log into the database with the information contained within
315 this SessionLog's attributes (used primarily for NEMO ``usage_event``
316 integration). It will check for the presence of a matching record first
317 and warn without inserting anything if it finds one.
319 Returns
320 -------
321 success : bool
322 Whether or not the session log row was inserted successfully
323 """
324 with DBSession(get_engine()) as session:
325 # Check for existing log
326 statement = select(SessionLog).where(
327 SessionLog.session_identifier == self.session_identifier,
328 SessionLog.instrument == self.instrument,
329 SessionLog.timestamp == self.timestamp,
330 SessionLog.event_type == self.event_type,
331 )
332 existing = session.exec(statement).first()
334 if existing:
335 _logger.warning("SessionLog already exists: %s", self)
336 return True
338 # Insert new log
339 session.add(self)
340 session.commit()
341 return True
344class UploadLog(SQLModel, table=True):
345 """
346 Log of export attempts to destination repositories.
348 Tracks per-destination export results for each session, enabling
349 multi-destination export with granular success/failure tracking.
351 Parameters
352 ----------
353 id
354 Auto-incrementing primary key
355 session_identifier
356 Foreign key reference to session_log.session_identifier
357 destination_name
358 Name of the export destination (e.g., "cdcs", "labarchives")
359 success
360 Whether the export succeeded
361 record_id
362 Destination-specific record identifier (if successful)
363 record_url
364 Direct URL to view the exported record (if successful)
365 error_message
366 Error message if export failed
367 timestamp
368 When the export attempt occurred
369 metadata_json
370 JSON-serialized metadata dict with destination-specific details
371 """
373 __tablename__ = "upload_log"
375 # Primary key
376 id: int | None = Field(default=None, primary_key=True)
378 # Required fields
379 session_identifier: str = Field(index=True, max_length=36)
380 destination_name: str = Field(index=True, max_length=100)
381 success: bool
382 timestamp: datetime.datetime = Field(sa_column=Column(TZDateTime))
384 # Optional fields
385 record_id: str | None = Field(default=None, max_length=255)
386 record_url: str | None = Field(default=None, max_length=500)
387 error_message: str | None = Field(default=None)
388 metadata_json: str | None = Field(default=None)
390 def __repr__(self):
391 """Return custom representation of an UploadLog."""
392 status = "SUCCESS" if self.success else "FAILED"
393 return (
394 f"UploadLog (session={self.session_identifier}, "
395 f"destination={self.destination_name}, "
396 f"status={status}, "
397 f"timestamp={self.timestamp})"
398 )
401class ExternalUserIdentifier(SQLModel, table=True):
402 """
403 Maps NexusLIMS usernames to external system user IDs.
405 Maintains a star topology with nexuslims_username (from session_log.user)
406 as the canonical identifier, mapping to external system IDs.
408 Parameters
409 ----------
410 id
411 Auto-incrementing primary key
412 nexuslims_username
413 Canonical username in NexusLIMS (from session_log.user)
414 external_system
415 External system identifier (nemo, labarchives_eln, etc.)
416 external_id
417 User ID/username in the external system
418 email
419 User's email for verification/matching (optional)
420 created_at
421 When this mapping was created
422 last_verified_at
423 Last time this mapping was verified (optional)
424 notes
425 Additional notes about this mapping (optional)
427 Examples
428 --------
429 >>> # NEMO harvester user ID
430 >>> ExternalUserIdentifier(
431 ... nexuslims_username='jsmith',
432 ... external_system=ExternalSystem.NEMO,
433 ... external_id='12345'
434 ... )
436 >>> # LabArchives UID from OAuth
437 >>> ExternalUserIdentifier(
438 ... nexuslims_username='jsmith',
439 ... external_system=ExternalSystem.LABARCHIVES_ELN,
440 ... external_id='285489257Ho...',
441 ... email='jsmith@upenn.edu'
442 ... )
443 """
445 __tablename__ = "external_user_identifiers"
446 __table_args__ = (
447 # NOTE: This CHECK constraint is dynamically generated from ExternalSystem enum
448 # to keep the model in sync with the enum. However, migrations should hardcode
449 # the values to preserve historical accuracy. When adding a new system, create
450 # a new migration to update the CHECK constraint.
451 CheckConstraint(
452 f"external_system IN ({', '.join(repr(s.value) for s in ExternalSystem)})",
453 name="valid_external_system",
454 ),
455 # UNIQUE constraints to enforce star-topology design
456 UniqueConstraint(
457 "nexuslims_username",
458 "external_system",
459 name="nexuslims_username_system_UNIQUE",
460 ),
461 UniqueConstraint(
462 "external_system", "external_id", name="system_external_id_UNIQUE"
463 ),
464 )
466 # Primary key
467 id: int | None = Field(default=None, primary_key=True)
469 # Required fields
470 nexuslims_username: str = Field(index=True)
471 external_system: str = Field()
472 external_id: str = Field()
474 # Optional fields
475 email: str | None = Field(default=None)
476 created_at: datetime.datetime = Field(
477 default_factory=lambda: datetime.datetime.now(pytz.UTC),
478 sa_column=Column(TZDateTime),
479 )
480 last_verified_at: datetime.datetime | None = Field(
481 default=None, sa_column=Column(TZDateTime, nullable=True)
482 )
483 notes: str | None = Field(default=None)
485 def __repr__(self):
486 """Return custom representation of an ExternalUserIdentifier."""
487 return (
488 f"ExternalUserIdentifier (username={self.nexuslims_username}, "
489 f"system={self.external_system}, "
490 f"external_id={self.external_id})"
491 )
494def get_external_id(
495 nexuslims_username: str, external_system: ExternalSystem
496) -> str | None:
497 """
498 Get external system ID for a NexusLIMS user.
500 Parameters
501 ----------
502 nexuslims_username
503 Username from session_log.user
504 external_system
505 Target external system
507 Returns
508 -------
509 str | None
510 External ID if found, None otherwise
512 Examples
513 --------
514 >>> from nexusLIMS.db.models import get_external_id
515 >>> from nexusLIMS.db.enums import ExternalSystem
516 >>> uid = get_external_id('jsmith', ExternalSystem.LABARCHIVES_ELN)
517 >>> print(uid)
518 '285489257Ho...'
519 """
520 with DBSession(get_engine()) as session:
521 result = session.exec(
522 select(ExternalUserIdentifier).where(
523 ExternalUserIdentifier.nexuslims_username == nexuslims_username,
524 ExternalUserIdentifier.external_system == external_system.value,
525 )
526 ).first()
527 return result.external_id if result else None
530def get_nexuslims_username(
531 external_id: str, external_system: ExternalSystem
532) -> str | None:
533 """
534 Reverse lookup: find NexusLIMS username from external ID.
536 Useful for harvesters that receive external IDs (e.g., NEMO user IDs)
537 and need to map them to NexusLIMS usernames for session_log entries.
539 Parameters
540 ----------
541 external_id
542 ID in external system
543 external_system
544 Source external system
546 Returns
547 -------
548 str | None
549 NexusLIMS username if found, None otherwise
551 Examples
552 --------
553 >>> from nexusLIMS.db.models import get_nexuslims_username
554 >>> from nexusLIMS.db.enums import ExternalSystem
555 >>> username = get_nexuslims_username('12345', ExternalSystem.NEMO)
556 >>> print(username)
557 'jsmith'
558 """
559 with DBSession(get_engine()) as session:
560 result = session.exec(
561 select(ExternalUserIdentifier).where(
562 ExternalUserIdentifier.external_id == external_id,
563 ExternalUserIdentifier.external_system == external_system.value,
564 )
565 ).first()
566 return result.nexuslims_username if result else None
569def store_external_id(
570 nexuslims_username: str,
571 external_system: ExternalSystem,
572 external_id: str,
573 email: str | None = None,
574 notes: str | None = None,
575) -> ExternalUserIdentifier:
576 """
577 Store or update external ID mapping.
579 If mapping exists for this user/system combination, updates it and
580 refreshes last_verified_at. Otherwise, creates new mapping.
582 Parameters
583 ----------
584 nexuslims_username
585 Username from session_log.user
586 external_system
587 Target external system
588 external_id
589 ID in external system
590 email
591 Optional email for verification
592 notes
593 Optional notes about this mapping
595 Returns
596 -------
597 ExternalUserIdentifier
598 Created or updated ExternalUserIdentifier record
600 Examples
601 --------
602 >>> from nexusLIMS.db.models import store_external_id
603 >>> from nexusLIMS.db.enums import ExternalSystem
604 >>> record = store_external_id(
605 ... nexuslims_username='jsmith',
606 ... external_system=ExternalSystem.LABARCHIVES_ELN,
607 ... external_id='285489257Ho...',
608 ... email='jsmith@upenn.edu',
609 ... notes='OAuth registration portal 2026-01-25'
610 ... )
611 """
612 with DBSession(get_engine()) as session:
613 # Check if mapping exists
614 existing = session.exec(
615 select(ExternalUserIdentifier).where(
616 ExternalUserIdentifier.nexuslims_username == nexuslims_username,
617 ExternalUserIdentifier.external_system == external_system.value,
618 )
619 ).first()
621 if existing:
622 # Update existing
623 existing.external_id = external_id
624 if email:
625 existing.email = email
626 if notes:
627 existing.notes = notes
628 existing.last_verified_at = datetime.datetime.now(pytz.UTC)
629 session.add(existing)
630 else:
631 # Create new
632 existing = ExternalUserIdentifier(
633 nexuslims_username=nexuslims_username,
634 external_system=external_system.value,
635 external_id=external_id,
636 email=email,
637 notes=notes,
638 )
639 session.add(existing)
641 session.commit()
642 session.refresh(existing)
643 return existing
646def get_all_external_ids(nexuslims_username: str) -> dict[str, str]:
647 """
648 Get all external IDs for a user.
650 Returns dict mapping external system name to external ID.
651 Useful for debugging or user profile displays.
653 Parameters
654 ----------
655 nexuslims_username
656 Username from session_log.user
658 Returns
659 -------
660 dict[str, str]
661 Dict mapping external system name to external ID
663 Examples
664 --------
665 >>> from nexusLIMS.db.models import get_all_external_ids
666 >>> ids = get_all_external_ids('jsmith')
667 >>> print(ids)
668 {
669 'nemo': '12345',
670 'labarchives_eln': '285489257Ho...',
671 'cdcs': 'jsmith@upenn.edu'
672 }
673 """
674 with DBSession(get_engine()) as session:
675 results = session.exec(
676 select(ExternalUserIdentifier).where(
677 ExternalUserIdentifier.nexuslims_username == nexuslims_username
678 )
679 ).all()
680 return {r.external_system: r.external_id for r in results}