Coverage for nexusLIMS/db/models.py: 100%

141 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-03-24 05:23 +0000

1"""SQLModel database models for NexusLIMS. 

2 

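This module defines the SQLModel ORM classes that map to the NexusLIMS
database tables (`instruments`, `session_log`, `upload_log`, and
`external_user_identifiers`), plus helper functions for reading and
storing external user ID mappings.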

5""" 

6 

7import datetime 

8import json 

9import logging 

10 

11import pytz 

12from pytz.tzinfo import BaseTzInfo 

13from sqlalchemy import CheckConstraint, UniqueConstraint, types 

14from sqlalchemy.types import TypeDecorator 

15from sqlmodel import Column, Field, Relationship, SQLModel, select 

16from sqlmodel import Session as DBSession 

17 

18from nexusLIMS.db.engine import get_engine 

19from nexusLIMS.db.enums import EventType, ExternalSystem, RecordStatus 

20 

# Module-level logger, named after this module; used by
# SessionLog.insert_log to warn when a matching row already exists.
_logger = logging.getLogger(__name__)

22 

23 

class TZDateTime(TypeDecorator):
    """
    String-backed column type that round-trips datetimes as ISO-8601 text.

    SQLite stores datetimes as TEXT and doesn't preserve timezone info.
    To work around that, values are written with ``datetime.isoformat()``
    (which includes the UTC offset for timezone-aware values) and read back
    with ``datetime.fromisoformat()``, so an offset that was written to the
    database is restored on the returned object.
    """

    impl = types.String
    cache_ok = True

    def process_bind_param(self, value, dialect):  # noqa: ARG002
        """Serialize a datetime to an ISO-8601 string; pass others through."""
        if isinstance(value, datetime.datetime):
            return value.isoformat()
        # ``None`` and pre-formatted strings are handed over unchanged
        return value

    def process_result_value(self, value, dialect):  # noqa: ARG002
        """Parse a stored ISO-8601 string to a datetime; pass others through."""
        if isinstance(value, str):
            return datetime.datetime.fromisoformat(value)
        # ``None`` (NULL) or an already-converted value needs no parsing
        return value

55 

56 

class Instrument(SQLModel, table=True):
    """
    Instrument configuration from the NexusLIMS database.

    Represents an electron microscopy instrument in the facility,
    with configuration for calendar integration, file storage, and metadata.

    Parameters
    ----------
    instrument_pid
        Unique identifier for the instrument (e.g., "FEI-Titan-TEM-012345")
    api_url
        Calendar API endpoint URL for this instrument's scheduler (e.g.,
        `https://<nemo_address>/api/tools/?id=<tool_id>`)
    calendar_url
        URL to the instrument's web-accessible calendar
    location
        Physical location (building and room number)
    display_name
        Human-readable instrument name displayed in NexusLIMS records
    property_tag
        Unique numeric identifier (for reference)
    filestore_path
        Relative path under NX_INSTRUMENT_DATA_PATH where data is stored
    harvester
        Harvester module to use ("nemo" or "sharepoint")
    timezone_str
        IANA timezone database string (e.g., "America/New_York"); stored in
        the database column named ``timezone``
    """

    __tablename__ = "instruments"

    # Primary key
    instrument_pid: str = Field(primary_key=True, max_length=100)

    # Required fields
    api_url: str = Field(unique=True)
    calendar_url: str
    location: str = Field(max_length=100)
    display_name: str
    property_tag: str = Field(max_length=20)
    filestore_path: str
    harvester: str = Field(default="nemo")
    # The attribute is ``timezone_str`` so that the ``timezone`` property below
    # can return a tzinfo object; the underlying column is still "timezone".
    timezone_str: str = Field(
        sa_column_kwargs={"name": "timezone"}, default="America/New_York"
    )

    # Relationships
    session_logs: list["SessionLog"] = Relationship(back_populates="instrument_obj")

    @property
    def name(self) -> str:
        """Alias for instrument_pid (backward compatibility)."""
        return self.instrument_pid

    @property
    def timezone(self) -> BaseTzInfo:
        """Convert timezone string to pytz timezone object."""
        return pytz.timezone(self.timezone_str)

    def __repr__(self):
        """Return custom representation of an Instrument."""
        return (
            f"Nexus Instrument: {self.name}\n"
            f"API url:          {self.api_url}\n"
            f"Calendar url:     {self.calendar_url}\n"
            f"Display name:     {self.display_name}\n"
            f"Location:         {self.location}\n"
            f"Property tag:     {self.property_tag}\n"
            f"Filestore path:   {self.filestore_path}\n"
            f"Harvester:        {self.harvester}\n"
            f"Timezone:         {self.timezone}"
        )

    def __str__(self):
        """Return "<name> in <location>", or "" when no location is set."""
        return f"{self.name} in {self.location}" if self.location else ""

    def localize_datetime(self, _dt: datetime.datetime) -> datetime.datetime:
        """
        Localize a datetime to an Instrument's timezone.

        Convert a date and time to the timezone of this instrument. If the
        supplied datetime is naive (i.e. does not have a timezone), it will be
        assumed to already be in the timezone of the instrument, and the
        displayed time will not change. If the timezone of the supplied
        datetime is different than the instrument's, the time will be
        adjusted to compensate for the timezone offset.

        Parameters
        ----------
        _dt
            The datetime object to localize

        Returns
        -------
        datetime.datetime
            A datetime object with the same timezone as the instrument. If
            the instrument's timezone resolves to ``None``, a warning is
            logged and ``_dt`` is returned unchanged.
        """
        # Resolve the property once: each access performs a pytz lookup.
        tz = self.timezone

        if tz is None:
            # Defensive guard, kept from the original implementation
            _logger.warning(
                "Tried to localize a datetime with instrument that does not have "
                "timezone information (%s)",
                self.name,
            )
            return _dt
        if _dt.tzinfo is None:
            # dt is timezone naive: attach the instrument's zone as-is
            return tz.localize(_dt)

        # dt has timezone info: convert the instant to the instrument's zone
        return _dt.astimezone(tz)

    def localize_datetime_str(
        self,
        _dt: datetime.datetime,
        fmt: str = "%Y-%m-%d %H:%M:%S %Z",
    ) -> str:
        """
        Localize a datetime to an Instrument's timezone and return as string.

        Convert a date and time to the timezone of this instrument, returning
        a textual representation of the object, rather than the datetime
        itself. Uses :py:meth:`localize_datetime` for the actual conversion.

        Parameters
        ----------
        _dt
            The datetime object to localize
        fmt
            The strftime format string to use to format the output

        Returns
        -------
        str
            The formatted textual representation of the localized datetime
        """
        return self.localize_datetime(_dt).strftime(fmt)

    def to_dict(self) -> dict:
        """
        Return a dictionary representation of the Instrument object.

        The dictionary is built explicitly from the column attributes. The
        ``timezone_str`` attribute is emitted under the key ``"timezone"``
        (matching the database column name), and the ``session_logs``
        relationship is not included.

        Returns
        -------
        dict
            A dictionary representation of the instrument, suitable for database
            insertion or JSON serialization.
        """
        return {
            "instrument_pid": self.instrument_pid,
            "api_url": self.api_url,
            "calendar_url": self.calendar_url,
            "location": self.location,
            "display_name": self.display_name,
            "property_tag": self.property_tag,
            "filestore_path": self.filestore_path,
            "harvester": self.harvester,
            "timezone": self.timezone_str,
        }

    def to_json(self, **kwargs) -> str:
        """
        Return a JSON string representation of the Instrument object.

        Parameters
        ----------
        **kwargs
            Additional keyword arguments to pass to `json.dumps`.

        Returns
        -------
        str
            A JSON string representation of the instrument.
        """
        return json.dumps(self.to_dict(), **kwargs)

239 

240 

class SessionLog(SQLModel, table=True):
    """
    One row of the ``session_log`` table (a single session event).

    Each session typically has a START and an END row sharing the same
    ``session_identifier``, and may have additional RECORD_GENERATION rows.

    Parameters
    ----------
    session_identifier
        A unique string consistent among a single record's START, END,
        and RECORD_GENERATION events (often a UUID)
    instrument
        The instrument associated with this session (foreign key reference
        to ``instruments.instrument_pid``)
    timestamp
        The datetime representing when the event occurred
    event_type
        The type of log (START, END, or RECORD_GENERATION)
    user
        The username associated with this session (if known)
    record_status
        The status for this record (defaults to WAITING_FOR_END)
    """

    __tablename__ = "session_log"
    __table_args__ = (
        # Restrict both enum-like columns to their known values at the DB level
        CheckConstraint(
            "event_type IN ('START', 'END', 'RECORD_GENERATION')",
            name="check_event_type",
        ),
        CheckConstraint(
            "record_status IN ('COMPLETED', 'WAITING_FOR_END', 'TO_BE_BUILT', "
            "'BUILT_NOT_EXPORTED', 'ERROR', 'NO_FILES_FOUND', 'NO_CONSENT', "
            "'NO_RESERVATION')",
            name="check_record_status",
        ),
    )

    # Primary key
    id_session_log: int | None = Field(default=None, primary_key=True)

    # Required fields
    session_identifier: str = Field(max_length=36, index=True)
    instrument: str = Field(foreign_key="instruments.instrument_pid", max_length=100)
    # Stored through TZDateTime so the UTC offset is kept
    timestamp: datetime.datetime = Field(sa_column=Column(TZDateTime))
    event_type: EventType
    record_status: RecordStatus = Field(default=RecordStatus.WAITING_FOR_END)

    # Optional field
    user: str | None = Field(default=None, max_length=50)

    # Relationships
    instrument_obj: Instrument | None = Relationship(back_populates="session_logs")

    def __repr__(self):
        """Return a one-line summary of this log's column values."""
        fragments = [
            f"SessionLog (id={self.session_identifier}, ",
            f"instrument={self.instrument}, ",
            f"timestamp={self.timestamp}, ",
            f"event_type={self.event_type.value}, ",
            f"user={self.user}, ",
            f"record_status={self.record_status.value})",
        ]
        return "".join(fragments)

    def insert_log(self) -> bool:
        """
        Insert this log into the NexusLIMS database.

        Looks for a row with the same session identifier, instrument,
        timestamp, and event type (used primarily for NEMO ``usage_event``
        integration). If one is found, a warning is logged and nothing is
        written; otherwise this object is added and committed.

        Returns
        -------
        success : bool
            ``True`` both when the row was inserted and when a matching row
            was already present
        """
        with DBSession(get_engine()) as db:
            duplicate_query = select(SessionLog).where(
                SessionLog.session_identifier == self.session_identifier,
                SessionLog.instrument == self.instrument,
                SessionLog.timestamp == self.timestamp,
                SessionLog.event_type == self.event_type,
            )

            if db.exec(duplicate_query).first():
                # A matching row is already stored; do not write a second one
                _logger.warning("SessionLog already exists: %s", self)
                return True

            db.add(self)
            db.commit()
            return True

342 

343 

class UploadLog(SQLModel, table=True):
    """
    One row of the ``upload_log`` table (a single export attempt).

    Records the outcome of exporting a session to one destination, so that
    multi-destination exports can track success or failure per destination.

    Parameters
    ----------
    id
        Auto-incrementing primary key
    session_identifier
        Foreign key reference to session_log.session_identifier
    destination_name
        Name of the export destination (e.g., "cdcs", "labarchives")
    success
        Whether the export succeeded
    record_id
        Destination-specific record identifier (if successful)
    record_url
        Direct URL to view the exported record (if successful)
    error_message
        Error message if export failed
    timestamp
        When the export attempt occurred
    metadata_json
        JSON-serialized metadata dict with destination-specific details
    """

    __tablename__ = "upload_log"

    # Primary key
    id: int | None = Field(default=None, primary_key=True)

    # Required fields
    session_identifier: str = Field(index=True, max_length=36)
    destination_name: str = Field(index=True, max_length=100)
    success: bool
    timestamp: datetime.datetime = Field(sa_column=Column(TZDateTime))

    # Optional fields
    record_id: str | None = Field(default=None, max_length=255)
    record_url: str | None = Field(default=None, max_length=500)
    error_message: str | None = Field(default=None)
    metadata_json: str | None = Field(default=None)

    def __repr__(self):
        """Return a one-line summary: session, destination, status, timestamp."""
        if self.success:
            status = "SUCCESS"
        else:
            status = "FAILED"
        fragments = [
            f"UploadLog (session={self.session_identifier}, ",
            f"destination={self.destination_name}, ",
            f"status={status}, ",
            f"timestamp={self.timestamp})",
        ]
        return "".join(fragments)

399 

400 

class ExternalUserIdentifier(SQLModel, table=True):
    """
    Mapping from a NexusLIMS username to a user ID in an external system.

    The table follows a star topology: ``nexuslims_username`` (from
    session_log.user) is the canonical identifier at the center, and each
    row links it to one ID in one external system.

    Parameters
    ----------
    id
        Auto-incrementing primary key
    nexuslims_username
        Canonical username in NexusLIMS (from session_log.user)
    external_system
        External system identifier (nemo, labarchives_eln, etc.)
    external_id
        User ID/username in the external system
    email
        User's email for verification/matching (optional)
    created_at
        When this mapping was created (defaults to the current UTC time)
    last_verified_at
        Last time this mapping was verified (optional)
    notes
        Additional notes about this mapping (optional)

    Examples
    --------
    >>> # NEMO harvester user ID
    >>> ExternalUserIdentifier(
    ...     nexuslims_username='jsmith',
    ...     external_system=ExternalSystem.NEMO,
    ...     external_id='12345'
    ... )

    >>> # LabArchives UID from OAuth
    >>> ExternalUserIdentifier(
    ...     nexuslims_username='jsmith',
    ...     external_system=ExternalSystem.LABARCHIVES_ELN,
    ...     external_id='285489257Ho...',
    ...     email='jsmith@upenn.edu'
    ... )
    """

    __tablename__ = "external_user_identifiers"
    __table_args__ = (
        # NOTE: The allowed values in this CHECK constraint are generated from
        # the ExternalSystem enum so the model cannot drift from the enum.
        # Migrations, however, should hardcode the values to preserve
        # historical accuracy. When adding a new system, create a new
        # migration to update the CHECK constraint.
        CheckConstraint(
            f"external_system IN ({', '.join(repr(s.value) for s in ExternalSystem)})",
            name="valid_external_system",
        ),
        # UNIQUE constraints to enforce the star-topology design:
        # at most one ID per (user, system) ...
        UniqueConstraint(
            "nexuslims_username",
            "external_system",
            name="nexuslims_username_system_UNIQUE",
        ),
        # ... and at most one user per (system, ID)
        UniqueConstraint(
            "external_system", "external_id", name="system_external_id_UNIQUE"
        ),
    )

    # Primary key
    id: int | None = Field(default=None, primary_key=True)

    # Required fields
    nexuslims_username: str = Field(index=True)
    external_system: str = Field()
    external_id: str = Field()

    # Optional fields
    email: str | None = Field(default=None)
    created_at: datetime.datetime = Field(
        default_factory=lambda: datetime.datetime.now(pytz.UTC),
        sa_column=Column(TZDateTime),
    )
    last_verified_at: datetime.datetime | None = Field(
        default=None, sa_column=Column(TZDateTime, nullable=True)
    )
    notes: str | None = Field(default=None)

    def __repr__(self):
        """Return a one-line summary: username, system, and external ID."""
        fragments = [
            f"ExternalUserIdentifier (username={self.nexuslims_username}, ",
            f"system={self.external_system}, ",
            f"external_id={self.external_id})",
        ]
        return "".join(fragments)

492 

493 

def get_external_id(
    nexuslims_username: str, external_system: ExternalSystem
) -> str | None:
    """
    Look up a NexusLIMS user's ID in one external system.

    Parameters
    ----------
    nexuslims_username
        Username from session_log.user
    external_system
        Target external system

    Returns
    -------
    str | None
        External ID if a mapping row exists, None otherwise

    Examples
    --------
    >>> from nexusLIMS.db.models import get_external_id
    >>> from nexusLIMS.db.enums import ExternalSystem
    >>> uid = get_external_id('jsmith', ExternalSystem.LABARCHIVES_ELN)
    >>> print(uid)
    '285489257Ho...'
    """
    with DBSession(get_engine()) as db:
        # The column stores the enum's string value, so compare against .value
        lookup = select(ExternalUserIdentifier).where(
            ExternalUserIdentifier.nexuslims_username == nexuslims_username,
            ExternalUserIdentifier.external_system == external_system.value,
        )
        mapping = db.exec(lookup).first()
        if mapping:
            return mapping.external_id
        return None

528 

529 

def get_nexuslims_username(
    external_id: str, external_system: ExternalSystem
) -> str | None:
    """
    Reverse lookup: find NexusLIMS username from external ID.

    Useful for harvesters that receive external IDs (e.g., NEMO user IDs)
    and need to map them to NexusLIMS usernames for session_log entries.

    Parameters
    ----------
    external_id
        ID in external system
    external_system
        Source external system

    Returns
    -------
    str | None
        NexusLIMS username if a mapping row exists, None otherwise

    Examples
    --------
    >>> from nexusLIMS.db.models import get_nexuslims_username
    >>> from nexusLIMS.db.enums import ExternalSystem
    >>> username = get_nexuslims_username('12345', ExternalSystem.NEMO)
    >>> print(username)
    'jsmith'
    """
    with DBSession(get_engine()) as db:
        # The column stores the enum's string value, so compare against .value
        lookup = select(ExternalUserIdentifier).where(
            ExternalUserIdentifier.external_id == external_id,
            ExternalUserIdentifier.external_system == external_system.value,
        )
        mapping = db.exec(lookup).first()
        if mapping:
            return mapping.nexuslims_username
        return None

567 

568 

def store_external_id(
    nexuslims_username: str,
    external_system: ExternalSystem,
    external_id: str,
    email: str | None = None,
    notes: str | None = None,
) -> ExternalUserIdentifier:
    """
    Store or update external ID mapping.

    If a mapping already exists for this user/system combination, its
    ``external_id`` is overwritten and ``last_verified_at`` is set to the
    current UTC time; ``email`` and ``notes`` are only overwritten when a
    truthy value is supplied. Otherwise, a new mapping is created.

    Parameters
    ----------
    nexuslims_username
        Username from session_log.user
    external_system
        Target external system
    external_id
        ID in external system
    email
        Optional email for verification
    notes
        Optional notes about this mapping

    Returns
    -------
    ExternalUserIdentifier
        Created or updated ExternalUserIdentifier record, refreshed from the
        database after the commit

    Examples
    --------
    >>> from nexusLIMS.db.models import store_external_id
    >>> from nexusLIMS.db.enums import ExternalSystem
    >>> record = store_external_id(
    ...     nexuslims_username='jsmith',
    ...     external_system=ExternalSystem.LABARCHIVES_ELN,
    ...     external_id='285489257Ho...',
    ...     email='jsmith@upenn.edu',
    ...     notes='OAuth registration portal 2026-01-25'
    ... )
    """
    with DBSession(get_engine()) as db:
        lookup = select(ExternalUserIdentifier).where(
            ExternalUserIdentifier.nexuslims_username == nexuslims_username,
            ExternalUserIdentifier.external_system == external_system.value,
        )
        record = db.exec(lookup).first()

        if not record:
            # No mapping yet for this user/system: build a new row
            record = ExternalUserIdentifier(
                nexuslims_username=nexuslims_username,
                external_system=external_system.value,
                external_id=external_id,
                email=email,
                notes=notes,
            )
        else:
            # Mapping found: overwrite the ID and stamp the verification time
            record.external_id = external_id
            if email:
                record.email = email
            if notes:
                record.notes = notes
            record.last_verified_at = datetime.datetime.now(pytz.UTC)

        db.add(record)
        db.commit()
        # Reload the committed row so its attributes are populated on return
        db.refresh(record)
        return record

644 

645 

def get_all_external_ids(nexuslims_username: str) -> dict[str, str]:
    """
    Get all external IDs for a user.

    Collects every mapping row stored for the username into a dict keyed by
    external system name. Useful for debugging or user profile displays.

    Parameters
    ----------
    nexuslims_username
        Username from session_log.user

    Returns
    -------
    dict[str, str]
        Dict mapping external system name to external ID (empty when the
        user has no mappings)

    Examples
    --------
    >>> from nexusLIMS.db.models import get_all_external_ids
    >>> ids = get_all_external_ids('jsmith')
    >>> print(ids)
    {
        'nemo': '12345',
        'labarchives_eln': '285489257Ho...',
        'cdcs': 'jsmith@upenn.edu'
    }
    """
    with DBSession(get_engine()) as db:
        lookup = select(ExternalUserIdentifier).where(
            ExternalUserIdentifier.nexuslims_username == nexuslims_username
        )
        ids_by_system = {}
        for row in db.exec(lookup).all():
            ids_by_system[row.external_system] = row.external_id
        return ids_by_system
680 return {r.external_system: r.external_id for r in results}