Coverage for nexusLIMS/builder/preflight.py: 100%
243 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
1"""Preflight checks for the NexusLIMS record builder.
3This module provides a ``run_preflight_checks()`` function that validates
4the environment before the record builder starts any harvesting or record
5building work. Misconfigurations are caught early and reported with
6actionable messages.
8Attributes
9----------
10CheckResult
11 Dataclass representing the result of a single preflight check.
12PreflightError
13 Exception raised when one or more error-severity checks fail.
14"""
16import logging
17import os
18import time
19from dataclasses import dataclass
20from importlib import import_module
21from pathlib import Path
22from typing import Literal
24import pytz
25from alembic.config import Config
26from alembic.script import ScriptDirectory
27from sqlalchemy import text
28from sqlmodel import Session as DBSession
29from sqlmodel import SQLModel, select
31from nexusLIMS.config import settings
32from nexusLIMS.db.engine import get_engine
33from nexusLIMS.db.models import ( # noqa: F401 — imported to populate SQLModel.metadata
34 ExternalUserIdentifier,
35 Instrument,
36 SessionLog,
37 UploadLog,
38)
39from nexusLIMS.exporters.registry import get_registry
40from nexusLIMS.harvesters.nemo.utils import get_harvesters_enabled
41from nexusLIMS.utils.network import nexus_req
43_logger = logging.getLogger(__name__)
45# Absolute path to alembic.ini at the project root (CWD-independent)
46_ALEMBIC_INI_PATH = Path(__file__).parents[2] / "alembic.ini"
@dataclass
class CheckResult:
    """Result of a single preflight check.

    Instances are produced by the ``_check_*`` helpers in this module and
    aggregated by :func:`run_preflight_checks`.

    Parameters
    ----------
    name
        Short label for the check (e.g. ``"db_tables"``).
    passed
        Whether the check passed.
    severity
        ``"error"`` if the check failure means the run cannot succeed;
        ``"warning"`` if suspicious but the run may still succeed.
    message
        Human-readable, actionable description of the result.
    """

    # Short label identifying the check (e.g. "db_tables").
    name: str
    # True when the check passed.
    passed: bool
    # "error": a failure blocks the run; "warning": the run may still succeed.
    severity: Literal["error", "warning"]
    # Human-readable, actionable description of the outcome.
    message: str
class PreflightError(Exception):
    """Exception signalling that at least one error-severity check failed.

    Parameters
    ----------
    failed_checks
        The failed :class:`CheckResult` objects whose ``severity`` is
        ``"error"``.
    """

    def __init__(self, failed_checks: list[CheckResult]) -> None:
        self.failed_checks = failed_checks
        # Summarise the failing check names in the exception message.
        failed_names = ", ".join(check.name for check in failed_checks)
        super().__init__(f"Preflight checks failed: {failed_names}")
88# ---------------------------------------------------------------------------
89# Individual check helpers
90# ---------------------------------------------------------------------------
def _check_db_reachable() -> CheckResult:
    """Verify the configured SQLite database file exists and accepts queries."""
    name = "db_reachable"
    db_path = Path(settings.NX_DB_PATH)

    # Missing file: point the operator at the init command.
    if not db_path.exists():
        message = (
            f"Database file not found: {db_path}. "
            "Run 'nexuslims db init' to create the database."
        )
        return CheckResult(
            name=name, passed=False, severity="error", message=message
        )

    # Probe connectability with a trivial query.
    try:
        with DBSession(get_engine()) as session:
            session.exec(text("SELECT 1"))  # type: ignore[call-overload]
    except Exception as exc:
        message = (
            f"Cannot connect to database at {db_path}: {exc}. "
            "Ensure the file is a valid SQLite database."
        )
        return CheckResult(
            name=name, passed=False, severity="error", message=message
        )

    return CheckResult(
        name=name,
        passed=True,
        severity="error",
        message=f"Database reachable at {db_path}.",
    )
def _check_db_tables() -> CheckResult:
    """Verify that every ORM-declared table is present in the database."""
    name = "db_tables"

    # The model imports at the top of this module populate SQLModel.metadata.
    expected = set(SQLModel.metadata.tables.keys())

    try:
        with DBSession(get_engine()) as session:
            rows = session.exec(
                text("SELECT name FROM sqlite_master WHERE type='table'")  # type: ignore[call-overload]
            ).all()
            actual = {row[0] for row in rows}
    except Exception as exc:
        return CheckResult(
            name=name,
            passed=False,
            severity="error",
            message=f"Could not query database tables: {exc}",
        )

    missing = expected - actual
    if not missing:
        return CheckResult(
            name=name,
            passed=True,
            severity="error",
            message=f"All expected tables present: {', '.join(sorted(expected))}.",
        )

    return CheckResult(
        name=name,
        passed=False,
        severity="error",
        message=(
            f"Missing tables: {', '.join(sorted(missing))}. "
            "Run 'nexuslims db upgrade' to apply migrations."
        ),
    )
def _check_alembic_migration() -> CheckResult:
    """Check that the database schema is at the latest Alembic migration.

    Compares the head revision of the migration scripts shipped with the
    project against the revision recorded in the database's
    ``alembic_version`` table.
    """
    name = "alembic_migration"

    # Determine the newest revision known to the migration scripts on disk.
    try:
        cfg = Config(str(_ALEMBIC_INI_PATH))
        # script_location in alembic.ini is a relative path; resolve it to an
        # absolute path so the check works regardless of CWD.
        cfg.set_main_option(
            "script_location",
            str(_ALEMBIC_INI_PATH.parent / "nexusLIMS" / "db" / "migrations"),
        )
        script = ScriptDirectory.from_config(cfg)
        head_rev = script.get_current_head()
    except Exception as exc:
        return CheckResult(
            name=name,
            passed=False,
            severity="error",
            message=f"Could not determine Alembic head revision: {exc}",
        )

    if head_rev is None:
        # No migrations defined yet — nothing to check
        return CheckResult(
            name=name,
            passed=True,
            severity="error",
            message="No Alembic revisions found; skipping migration check.",
        )

    # Query current revision from the database
    try:
        with DBSession(get_engine()) as session:
            # Check if alembic_version table exists first
            tables_result = session.exec(
                text(  # type: ignore[call-overload]
                    "SELECT name FROM sqlite_master WHERE type='table' "
                    "AND name='alembic_version'"
                )
            ).first()
            if tables_result is None:
                # The DB was created without Alembic stamping; upgrading will
                # both stamp and migrate it.
                return CheckResult(
                    name=name,
                    passed=False,
                    severity="error",
                    message=(
                        "alembic_version table not found — database is not managed by "
                        "Alembic. Run 'nexuslims db upgrade' to initialise migrations."
                    ),
                )
            current_rev = session.exec(
                text("SELECT version_num FROM alembic_version")  # type: ignore[call-overload]
            ).first()
    except Exception as exc:
        return CheckResult(
            name=name,
            passed=False,
            severity="error",
            message=f"Could not query alembic_version: {exc}",
        )

    # current_rev is a one-column row (or None if the table is empty).
    current = current_rev[0] if current_rev else None

    if current == head_rev:
        return CheckResult(
            name=name,
            passed=True,
            severity="error",
            message=f"Database schema is up to date (revision {current}).",
        )

    return CheckResult(
        name=name,
        passed=False,
        severity="error",
        message=(
            f"Database schema is out of date: current={current!r}, "
            f"head={head_rev!r}. Run 'nexuslims db upgrade' to apply pending "
            "migrations."
        ),
    )
def _check_instruments_exist() -> CheckResult:
    """Warn when the instruments table is empty (or cannot be read)."""
    name = "instruments_exist"

    try:
        with DBSession(get_engine()) as session:
            count = session.exec(
                text("SELECT COUNT(*) FROM instruments")  # type: ignore[call-overload]
            ).first()
            n = count[0] if count else 0
    except Exception as exc:
        return CheckResult(
            name=name,
            passed=False,
            severity="warning",
            message=f"Could not query instruments table: {exc}",
        )

    if n:
        return CheckResult(
            name=name,
            passed=True,
            severity="warning",
            message=f"Found {n} instrument(s) in the database.",
        )

    return CheckResult(
        name=name,
        passed=False,
        severity="warning",
        message=(
            "No instruments found in the database. "
            "Add instruments with 'nexuslims instruments add' before building "
            "records."
        ),
    )
def _check_instrument_filestore_paths() -> list[CheckResult]:
    """Warn for any instrument whose filestore directory is absent on disk."""
    name = "instrument_filestore_paths"
    base = Path(settings.NX_INSTRUMENT_DATA_PATH)

    # The base mount must exist before per-instrument paths can be checked.
    if not base.exists():
        return [
            CheckResult(
                name=name,
                passed=False,
                severity="warning",
                message=(
                    f"NX_INSTRUMENT_DATA_PATH ({base}) does not exist or is not "
                    "mounted. Instrument file searches will fail."
                ),
            )
        ]

    try:
        with DBSession(get_engine()) as session:
            instruments = session.exec(select(Instrument)).all()
    except Exception as exc:
        return [
            CheckResult(
                name=name,
                passed=False,
                severity="warning",
                message=f"Could not query instruments for filestore path check: {exc}",
            )
        ]

    if not instruments:
        return [
            CheckResult(
                name=name,
                passed=True,
                severity="warning",
                message="No instruments in DB; skipping filestore path check.",
            )
        ]

    # One failure result per instrument whose directory is missing.
    failures = [
        CheckResult(
            name=name,
            passed=False,
            severity="warning",
            message=(
                f"Instrument '{instr.instrument_pid}': filestore path "
                f"{base / instr.filestore_path} does not exist."
            ),
        )
        for instr in instruments
        if not (base / instr.filestore_path).exists()
    ]
    if failures:
        return failures

    return [
        CheckResult(
            name=name,
            passed=True,
            severity="warning",
            message=(
                f"All {len(instruments)} instrument filestore path(s) exist "
                f"under {base}."
            ),
        )
    ]
def _check_instrument_harvesters() -> list[CheckResult]:
    """Check that each instrument's harvester module is importable and complete.

    Each distinct harvester name referenced by an instrument must resolve to
    a submodule of ``nexusLIMS.harvesters`` that defines
    ``res_event_from_session``.
    """
    name = "instrument_harvesters"

    try:
        with DBSession(get_engine()) as session:
            instruments = session.exec(select(Instrument)).all()
    except Exception as exc:
        return [
            CheckResult(
                name=name,
                passed=False,
                severity="error",
                message=f"Could not query instruments for harvester check: {exc}",
            )
        ]

    if not instruments:
        return [
            CheckResult(
                name=name,
                passed=True,
                severity="error",
                message="No instruments in DB; skipping harvester check.",
            )
        ]

    # Group instruments by harvester name
    harvester_to_instruments: dict[str, list[str]] = {}
    for instr in instruments:
        harvester_to_instruments.setdefault(instr.harvester, []).append(
            instr.instrument_pid
        )

    results = []
    for harvester_name, pids in harvester_to_instruments.items():
        instrument_list = ", ".join(pids)
        # Relative import within the nexusLIMS.harvesters package.
        try:
            module = import_module(f".{harvester_name}", "nexusLIMS.harvesters")
        except ImportError as exc:
            results.append(
                CheckResult(
                    name=name,
                    passed=False,
                    severity="error",
                    message=(
                        f"Harvester module '{harvester_name}' cannot be imported: "
                        f"{exc}. Affected instruments: {instrument_list}."
                    ),
                )
            )
            # Import failed; skip the attribute check for this harvester.
            continue

        # The module must expose the entry point the record builder calls.
        if not hasattr(module, "res_event_from_session"):
            results.append(
                CheckResult(
                    name=name,
                    passed=False,
                    severity="error",
                    message=(
                        f"Harvester '{harvester_name}' is missing required function "
                        f"'res_event_from_session'. Affected instruments: "
                        f"{instrument_list}."
                    ),
                )
            )
        else:
            results.append(
                CheckResult(
                    name=name,
                    passed=True,
                    severity="error",
                    message=(
                        f"Harvester '{harvester_name}' OK "
                        f"(instruments: {instrument_list})."
                    ),
                )
            )

    return results
def _check_instrument_timezones() -> list[CheckResult]:
    """Validate each instrument's timezone string against the IANA database."""
    name = "instrument_timezones"

    try:
        with DBSession(get_engine()) as session:
            instruments = session.exec(select(Instrument)).all()
    except Exception as exc:
        return [
            CheckResult(
                name=name,
                passed=False,
                severity="warning",
                message=f"Could not query instruments for timezone check: {exc}",
            )
        ]

    if not instruments:
        return [
            CheckResult(
                name=name,
                passed=True,
                severity="warning",
                message="No instruments in DB; skipping timezone check.",
            )
        ]

    # Validate each distinct timezone string once, reporting every
    # instrument that shares it.
    grouped: dict[str, list[str]] = {}
    for instrument in instruments:
        grouped.setdefault(instrument.timezone_str, []).append(
            instrument.instrument_pid
        )

    results = []
    for tz_name, pid_list in grouped.items():
        affected = ", ".join(pid_list)
        try:
            pytz.timezone(tz_name)
        except pytz.exceptions.UnknownTimeZoneError:
            results.append(
                CheckResult(
                    name=name,
                    passed=False,
                    severity="warning",
                    message=(
                        f"Unknown timezone '{tz_name}'. "
                        f"Affected instruments: {affected}. "
                        "Use a valid IANA timezone (e.g., 'America/New_York')."
                    ),
                )
            )
        else:
            results.append(
                CheckResult(
                    name=name,
                    passed=True,
                    severity="warning",
                    message=(
                        f"Timezone '{tz_name}' is valid "
                        f"(instruments: {affected})."
                    ),
                )
            )

    return results
def _check_data_path_writable() -> CheckResult:
    """Verify the record-builder output directory accepts writes."""
    name = "data_path_writable"
    data_path = Path(settings.NX_DATA_PATH)

    # Success path first: the directory is writable by this process.
    if os.access(data_path, os.W_OK):
        return CheckResult(
            name=name,
            passed=True,
            severity="error",
            message=f"NX_DATA_PATH ({data_path}) is writable.",
        )

    return CheckResult(
        name=name,
        passed=False,
        severity="error",
        message=(
            f"NX_DATA_PATH ({data_path}) is not writable. "
            "Ensure the NexusLIMS process has write permission to this directory."
        ),
    )
def _check_export_destinations() -> CheckResult:
    """Check that at least one export destination is enabled and configured.

    Discovers enabled destinations via the exporter registry and asks each
    to validate its own configuration.
    """
    name = "export_destinations"

    try:
        registry = get_registry()
        enabled = registry.get_enabled_destinations()
    except Exception as exc:
        return CheckResult(
            name=name,
            passed=False,
            severity="warning",
            message=f"Could not discover export destinations: {exc}",
        )

    if not enabled:
        # Records can still be built, just not uploaded — hence warning.
        return CheckResult(
            name=name,
            passed=False,
            severity="warning",
            message=(
                "No export destinations are enabled. Built records will not be "
                "uploaded anywhere. Configure at least one destination "
                "(e.g., NX_CDCS_URL and NX_CDCS_TOKEN for CDCS)."
            ),
        )

    # Collect one failure string per misconfigured destination.
    # NOTE(review): assumes validate_config() returns a (bool, message)
    # pair — confirm against the destination base class.
    failures = []
    for dest in enabled:
        try:
            valid, err_msg = dest.validate_config()
        except Exception as exc:
            failures.append(f"{dest.name}: unexpected error: {exc}")
            continue
        if not valid:
            failures.append(f"{dest.name}: {err_msg}")

    if failures:
        return CheckResult(
            name=name,
            passed=False,
            severity="warning",
            message=(
                "Some export destinations have configuration issues "
                "(transient network errors may be ignored): " + "; ".join(failures)
            ),
        )

    dest_names = ", ".join(d.name for d in enabled)
    return CheckResult(
        name=name,
        passed=True,
        severity="warning",
        message=f"Export destination(s) OK: {dest_names}.",
    )
def _check_nemo_harvester_config() -> list[CheckResult]:
    """Check NEMO harvester config is present and each instance is reachable.

    Missing configuration is only a warning unless instruments in the DB
    actually use the NEMO harvester; configured instances that cannot be
    reached produce error-severity results.
    """
    name = "nemo_harvester_config"

    try:
        nemo_harvesters = settings.nemo_harvesters()
    except Exception as exc:
        return [
            CheckResult(
                name=name,
                passed=False,
                severity="warning",
                message=f"Could not read NEMO harvester config: {exc}",
            )
        ]

    if not nemo_harvesters:
        # No harvesters configured — check if any instrument needs one
        try:
            with DBSession(get_engine()) as session:
                nemo_instruments = session.exec(
                    select(Instrument).where(Instrument.harvester == "nemo")
                ).all()
        except Exception as exc:
            return [
                CheckResult(
                    name=name,
                    passed=False,
                    severity="warning",
                    message=(
                        f"No NEMO harvester config found, and could not query "
                        f"instruments: {exc}"
                    ),
                )
            ]

        if not nemo_instruments:
            # Nothing uses NEMO, so the missing config is harmless.
            return [
                CheckResult(
                    name=name,
                    passed=True,
                    severity="warning",
                    message=(
                        "No NEMO harvester configured and no instruments use NEMO "
                        "harvester; skipping."
                    ),
                )
            ]

        pids = ", ".join(i.instrument_pid for i in nemo_instruments)
        return [
            CheckResult(
                name=name,
                passed=False,
                severity="warning",
                message=(
                    f"No NEMO harvester configuration found "
                    f"(NX_NEMO_ADDRESS_N / NX_NEMO_TOKEN_N not set), but the "
                    f"following instruments use the NEMO harvester: {pids}. "
                    "Set NX_NEMO_ADDRESS_1 and NX_NEMO_TOKEN_1 (and _2, _3, … "
                    "for additional instances)."
                ),
            )
        ]

    # Harvesters are configured — probe each instance for reachability.
    # Any HTTP response (including 4xx) counts as reachable; connection-level
    # exceptions (refused, timeout, DNS) are retried with exponential backoff.
    # After all retries are exhausted the check fails hard (severity="error").
    probe_retries = 3  # 4 total attempts: delays of 1s, 2s, 4s between them

    results = []
    connectors = get_harvesters_enabled()
    for i, connector in enumerate(connectors, start=1):
        # Per-instance connection details from the harvester config dict.
        base_url = connector.config["base_url"]
        token = connector.config["token"]
        last_exc: Exception | None = None

        for attempt in range(probe_retries + 1):
            try:
                # Library-level retries disabled (retries=0); this loop owns
                # the retry/backoff policy.
                resp = nexus_req(
                    base_url,
                    "GET",
                    token_auth=token,
                    retries=0,
                    timeout=10,
                )
                # Any HTTP response means the server is up
                results.append(
                    CheckResult(
                        name=name,
                        passed=True,
                        severity="error",
                        message=(
                            f"NEMO instance {i} ({base_url}) is reachable "
                            f"(HTTP {resp.status_code})."
                        ),
                    )
                )
                # Clear the sentinel so the post-loop failure branch is skipped.
                last_exc = None
                break
            except Exception as exc:
                last_exc = exc
                if attempt < probe_retries:
                    delay = 2**attempt  # 1s, 2s, 4s
                    _logger.debug(
                        "[preflight] NEMO instance %s unreachable (%s), "
                        "retrying in %ss (attempt %s/%s)",
                        i,
                        exc,
                        delay,
                        attempt + 1,
                        probe_retries + 1,
                    )
                    time.sleep(delay)

        # last_exc still set means every attempt raised — hard failure.
        if last_exc is not None:
            results.append(
                CheckResult(
                    name=name,
                    passed=False,
                    severity="error",
                    message=(
                        f"NEMO instance {i} ({base_url}) is not reachable after "
                        f"{probe_retries + 1} attempts: {last_exc}. "
                        "Harvesting will fail for all instruments using this instance."
                    ),
                )
            )

    return results
731# ---------------------------------------------------------------------------
732# Public entry point
733# ---------------------------------------------------------------------------
def _run_check(
    check_fn,
    fallback_name: str,
    fallback_severity: str,
    *,
    multi: bool = False,
) -> list[CheckResult]:
    """Run a single check defensively, normalising the outcome to a list.

    An unexpected exception raised by *check_fn* is converted into a single
    failed :class:`CheckResult` so that one broken check can never prevent
    the remaining checks from running.

    Parameters
    ----------
    check_fn
        Zero-argument check callable returning a :class:`CheckResult`
        (or a list of them when ``multi`` is ``True``).
    fallback_name
        Check name reported if *check_fn* itself raises.
    fallback_severity
        Severity reported if *check_fn* itself raises.
    multi
        When ``True``, *check_fn* returns a list of results rather than a
        single result.

    Returns
    -------
    list[CheckResult]
        The check's result(s), or a one-element list describing the
        unexpected failure.
    """
    try:
        outcome = check_fn()
        return list(outcome) if multi else [outcome]
    except Exception as exc:
        return [
            CheckResult(
                name=fallback_name,
                passed=False,
                severity=fallback_severity,  # type: ignore[arg-type]
                message=f"Unexpected error in check: {exc}",
            )
        ]


def run_preflight_checks(*, dry_run: bool = False) -> list[CheckResult]:
    """Run all preflight checks and return the results.

    Checks are grouped into three categories:

    * **Always-run single-result checks** — DB connectivity, table existence,
      Alembic migration status, and instrument count.
    * **Always-run multi-result checks** — per-instrument filestore paths,
      harvester imports, timezone validity, and NEMO config presence.
    * **Write-path checks (skipped in dry-run)** — data path writability and
      export destination validation.

    Every check is wrapped in ``try/except Exception`` (see
    :func:`_run_check`) so that an unexpected failure in one check never
    prevents the remaining checks from running.

    Parameters
    ----------
    dry_run
        When ``True``, skip the write-path checks listed above.

    Returns
    -------
    list[CheckResult]
        All check results in the order they were executed.
    """
    results: list[CheckResult] = []

    # --- Single-result checks (always run) ---
    for fn, fallback_name, fallback_severity in (
        (_check_db_reachable, "db_reachable", "error"),
        (_check_db_tables, "db_tables", "error"),
        (_check_alembic_migration, "alembic_migration", "error"),
        (_check_instruments_exist, "instruments_exist", "warning"),
    ):
        results.extend(_run_check(fn, fallback_name, fallback_severity))

    # --- Multi-result checks (always run) ---
    for fn, fallback_name, fallback_severity in (
        (_check_instrument_filestore_paths, "instrument_filestore_paths", "warning"),
        (_check_instrument_harvesters, "instrument_harvesters", "error"),
        (_check_instrument_timezones, "instrument_timezones", "warning"),
        (_check_nemo_harvester_config, "nemo_harvester_config", "warning"),
    ):
        results.extend(_run_check(fn, fallback_name, fallback_severity, multi=True))

    # --- Write-path checks (skipped in dry-run) ---
    if not dry_run:
        for fn, fallback_name, fallback_severity in (
            (_check_data_path_writable, "data_path_writable", "error"),
            (_check_export_destinations, "export_destinations", "warning"),
        ):
            results.extend(_run_check(fn, fallback_name, fallback_severity))

    return results