# Source code for nexusLIMS.db.migrations.versions.v2_4_0b_add_check_constraints

"""Add check constraints to session_log.

Adds CHECK constraints to session_log table for event_type and record_status.

This migration:
1. Adds CHECK constraint for event_type enum values
2. Adds CHECK constraint for record_status enum values (including BUILT_NOT_EXPORTED)

For SQLite, this requires recreating the table since ALTER TABLE doesn't support
adding CHECK constraints.

Revision ID: v2_4_0b
Revises: v2_4_0a
Create Date: 2026-01-25 10:28:38.768026

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

from nexusLIMS.db.migrations.utils import verify_table_integrity

# revision identifiers, used by Alembic.
revision: str = "v2_4_0b"  # unique ID of this migration
down_revision: Union[str, Sequence[str], None] = "v2_4_0a"  # parent revision in the chain
branch_labels: Union[str, Sequence[str], None] = None  # not the head of a named branch
depends_on: Union[str, Sequence[str], None] = None  # no cross-branch dependencies


def upgrade() -> None:
    """Upgrade schema.

    Recreate ``session_log`` so that CHECK constraints can be attached to
    ``event_type`` and ``record_status`` — SQLite's ALTER TABLE cannot add
    CHECK constraints, so the table is rebuilt and the rows copied across.
    """
    from alembic import context  # noqa: PLC0415

    connection = op.get_bind()

    # Skip data verification in offline/SQL mode (when generating SQL scripts)
    # In these modes, there's no actual database to query
    offline = context.is_offline_mode()

    if offline:
        # Dummy baseline values; the verification branch below is skipped.
        instruments_count = 0
        session_log_count = 0
        min_id = None
        max_id = None
        status_counts = {}
    else:
        # Baseline counts/ranges captured up front so the copy can be
        # verified before the old table is dropped.
        instruments_count = connection.execute(
            sa.text("SELECT COUNT(*) FROM instruments")
        ).scalar()
        session_log_count = connection.execute(
            sa.text("SELECT COUNT(*) FROM session_log")
        ).scalar()
        min_id, max_id = connection.execute(
            sa.text(
                "SELECT MIN(id_session_log), MAX(id_session_log) FROM session_log"
            )
        ).fetchone()
        status_counts = dict(
            connection.execute(
                sa.text(
                    "SELECT record_status, COUNT(*) FROM session_log "
                    "GROUP BY record_status ORDER BY record_status"
                )
            ).fetchall()
        )
        # Only show migration message if there's data to migrate
        if session_log_count > 0:
            print(f"→ Migrating {session_log_count} session logs...")  # noqa: T201

    # Step 1: Create new table with CHECK constraints (without index yet)
    op.create_table(
        "session_log_new",
        sa.Column("id_session_log", sa.Integer(), nullable=False),
        sa.Column("session_identifier", sa.String(length=36), nullable=False),
        sa.Column("instrument", sa.String(length=100), nullable=False),
        sa.Column("timestamp", sa.String(), nullable=False),
        sa.Column("event_type", sa.String(length=17), nullable=False),
        sa.Column("record_status", sa.String(length=18), nullable=False),
        sa.Column("user", sa.String(length=50), nullable=True),
        sa.CheckConstraint(
            "event_type IN ('START', 'END', 'RECORD_GENERATION')",
            name="check_event_type",
        ),
        sa.CheckConstraint(
            "record_status IN ('COMPLETED', 'WAITING_FOR_END', 'TO_BE_BUILT', "
            "'BUILT_NOT_EXPORTED', 'ERROR', 'NO_FILES_FOUND', 'NO_CONSENT', "
            "'NO_RESERVATION')",
            name="check_record_status",
        ),
        sa.ForeignKeyConstraint(["instrument"], ["instruments.instrument_pid"]),
        sa.PrimaryKeyConstraint("id_session_log"),
    )

    # Step 2: Copy data from old table to new table
    op.execute(
        """
        INSERT INTO session_log_new (
            id_session_log, session_identifier, instrument,
            timestamp, event_type, record_status, user
        )
        SELECT
            id_session_log, session_identifier, instrument,
            timestamp, event_type, record_status, user
        FROM session_log
        """
    )

    # Step 2a: Verify data integrity before dropping old table.
    # Only verify if there's data to verify and we're not in offline/SQL mode.
    if not offline and (session_log_count > 0 or instruments_count > 0):
        # Instruments table should be unchanged by this migration.
        verify_table_integrity(connection, "instruments", instruments_count)
        # Every session_log row must have survived the copy.
        verify_table_integrity(
            connection,
            "session_log_new",
            session_log_count,
            expected_pk_range=(min_id, max_id),
            expected_distribution=status_counts,
            distribution_column="record_status",
            pk_column="id_session_log",
        )
        print(  # noqa: T201
            f"✓ Data integrity verified: {instruments_count} instruments, "
            f"{session_log_count} session logs preserved"
        )

    # Step 3: Drop any indexes from old table.
    # Use IF EXISTS to handle both v1.4.3 databases (different index names)
    # and databases created via migration 001 (which have
    # ix_session_log_session_identifier).
    op.execute("DROP INDEX IF EXISTS ix_session_log_session_identifier")
    op.execute('DROP INDEX IF EXISTS "session_log.fk_instrument_idx"')

    # Step 4: Drop old table
    op.drop_table("session_log")

    # Step 5: Rename new table to original name
    op.rename_table("session_log_new", "session_log")

    # Step 6: Create index on the renamed table (after old one is dropped)
    op.create_index(
        "ix_session_log_session_identifier",
        "session_log",
        ["session_identifier"],
        unique=False,
    )
def downgrade() -> None:
    """Downgrade schema.

    Rebuild ``session_log`` without the CHECK constraints, copying all rows
    into the constraint-free replacement table.
    """
    from alembic import context  # noqa: PLC0415

    connection = op.get_bind()

    # Skip data verification in offline/SQL mode
    offline = context.is_offline_mode()

    if offline:
        # Dummy baseline values; the verification branch below is skipped.
        instruments_count = 0
        session_log_count = 0
        min_id = None
        max_id = None
        status_counts = {}
    else:
        # Baseline counts/ranges captured up front so the copy can be
        # verified before the current table is dropped.
        instruments_count = connection.execute(
            sa.text("SELECT COUNT(*) FROM instruments")
        ).scalar()
        session_log_count = connection.execute(
            sa.text("SELECT COUNT(*) FROM session_log")
        ).scalar()
        min_id, max_id = connection.execute(
            sa.text(
                "SELECT MIN(id_session_log), MAX(id_session_log) FROM session_log"
            )
        ).fetchone()
        status_counts = dict(
            connection.execute(
                sa.text(
                    "SELECT record_status, COUNT(*) FROM session_log "
                    "GROUP BY record_status ORDER BY record_status"
                )
            ).fetchall()
        )
        # Only show downgrade message if there's data to downgrade
        if session_log_count > 0:
            print(f"→ Downgrading {session_log_count} session logs...")  # noqa: T201

    # Step 1: Create table without CHECK constraints (without index yet)
    op.create_table(
        "session_log_old",
        sa.Column("id_session_log", sa.Integer(), nullable=False),
        sa.Column("session_identifier", sa.String(length=36), nullable=False),
        sa.Column("instrument", sa.String(length=100), nullable=False),
        sa.Column("timestamp", sa.String(), nullable=False),
        sa.Column("event_type", sa.String(length=17), nullable=False),
        sa.Column("record_status", sa.String(length=18), nullable=False),
        sa.Column("user", sa.String(length=50), nullable=True),
        sa.ForeignKeyConstraint(["instrument"], ["instruments.instrument_pid"]),
        sa.PrimaryKeyConstraint("id_session_log"),
    )

    # Step 2: Copy data
    op.execute(
        """
        INSERT INTO session_log_old (
            id_session_log, session_identifier, instrument,
            timestamp, event_type, record_status, user
        )
        SELECT
            id_session_log, session_identifier, instrument,
            timestamp, event_type, record_status, user
        FROM session_log
        """
    )

    # Step 2a: Verify data integrity before dropping current table.
    # Only verify if there's data to verify and we're not in offline/SQL mode.
    if not offline and (session_log_count > 0 or instruments_count > 0):
        # Instruments table should be unchanged by this migration.
        verify_table_integrity(connection, "instruments", instruments_count)
        # Every session_log row must have survived the copy.
        verify_table_integrity(
            connection,
            "session_log_old",
            session_log_count,
            expected_pk_range=(min_id, max_id),
            expected_distribution=status_counts,
            distribution_column="record_status",
            pk_column="id_session_log",
        )
        print(  # noqa: T201
            f"✓ Data integrity verified: {instruments_count} instruments, "
            f"{session_log_count} session logs preserved"
        )

    # Step 3: Drop index from current table.
    # Use IF EXISTS to handle different database states.
    op.execute("DROP INDEX IF EXISTS ix_session_log_session_identifier")
    op.execute('DROP INDEX IF EXISTS "session_log.fk_instrument_idx"')

    # Step 4: Drop new table
    op.drop_table("session_log")

    # Step 5: Rename old table
    op.rename_table("session_log_old", "session_log")

    # Step 6: Create index on renamed table
    op.create_index(
        "ix_session_log_session_identifier",
        "session_log",
        ["session_identifier"],
        unique=False,
    )