Coverage for nexusLIMS/db/migrations/versions/v2_4_0b_add_check_constraints.py: 100%
68 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
1"""Add check constraints to session_log.
3Adds CHECK constraints to session_log table for event_type and record_status.
5This migration:
61. Adds CHECK constraint for event_type enum values
72. Adds CHECK constraint for record_status enum values (including BUILT_NOT_EXPORTED)
9For SQLite, this requires recreating the table since ALTER TABLE doesn't support
10adding CHECK constraints.
12Revision ID: v2_4_0b
13Revises: v2_4_0a
14Create Date: 2026-01-25 10:28:38.768026
16"""
18from typing import Sequence, Union
20import sqlalchemy as sa
21from alembic import op
23from nexusLIMS.db.migrations.utils import verify_table_integrity
# revision identifiers, used by Alembic.
revision: str = "v2_4_0b"  # unique ID of this migration
down_revision: Union[str, Sequence[str], None] = "v2_4_0a"  # parent revision this applies on top of
branch_labels: Union[str, Sequence[str], None] = None  # no branch labels for this revision
depends_on: Union[str, Sequence[str], None] = None  # no cross-branch dependencies
def upgrade() -> None:
    """Upgrade schema."""
    # SQLite cannot add CHECK constraints through ALTER TABLE, so the
    # session_log table is rebuilt: create a constrained copy, move the
    # rows over, verify the copy, then swap the tables.
    from alembic import context  # noqa: PLC0415

    bind = op.get_bind()

    # When generating SQL scripts (offline mode) there is no live database
    # to query, so all data collection/verification is skipped.
    offline = context.is_offline_mode()

    if offline:
        # Placeholder values; the verification step below never runs offline.
        instruments_count = 0
        session_log_count = 0
        min_id = None
        max_id = None
        status_counts = {}
    else:
        # Baseline counts used later to confirm nothing was lost.
        # instruments should be completely untouched by this migration.
        instruments_count = bind.execute(
            sa.text("SELECT COUNT(*) FROM instruments")
        ).scalar()

        # session_log row count, primary-key range, and record_status
        # distribution together fingerprint the data being migrated.
        session_log_count = bind.execute(
            sa.text("SELECT COUNT(*) FROM session_log")
        ).scalar()

        min_id, max_id = bind.execute(
            sa.text("SELECT MIN(id_session_log), MAX(id_session_log) FROM session_log")
        ).fetchone()

        status_counts = dict(
            bind.execute(
                sa.text(
                    "SELECT record_status, COUNT(*) FROM session_log "
                    "GROUP BY record_status ORDER BY record_status"
                )
            ).fetchall()
        )

        # Announce the migration only when there is data to move.
        if session_log_count > 0:
            print(f"→ Migrating {session_log_count} session logs...")  # noqa: T201

    # Step 1: Build the replacement table carrying the CHECK constraints.
    # The session_identifier index is created only after the final rename.
    op.create_table(
        "session_log_new",
        sa.Column("id_session_log", sa.Integer(), nullable=False),
        sa.Column("session_identifier", sa.String(length=36), nullable=False),
        sa.Column("instrument", sa.String(length=100), nullable=False),
        sa.Column("timestamp", sa.String(), nullable=False),
        sa.Column("event_type", sa.String(length=17), nullable=False),
        sa.Column("record_status", sa.String(length=18), nullable=False),
        sa.Column("user", sa.String(length=50), nullable=True),
        sa.CheckConstraint(
            "event_type IN ('START', 'END', 'RECORD_GENERATION')",
            name="check_event_type",
        ),
        sa.CheckConstraint(
            "record_status IN ('COMPLETED', 'WAITING_FOR_END', 'TO_BE_BUILT', "
            "'BUILT_NOT_EXPORTED', 'ERROR', 'NO_FILES_FOUND', 'NO_CONSENT', "
            "'NO_RESERVATION')",
            name="check_record_status",
        ),
        sa.ForeignKeyConstraint(
            ["instrument"],
            ["instruments.instrument_pid"],
        ),
        sa.PrimaryKeyConstraint("id_session_log"),
    )

    # Step 2: Copy every row into the constrained table.
    op.execute(
        """
        INSERT INTO session_log_new (
            id_session_log, session_identifier, instrument, timestamp,
            event_type, record_status, user
        )
        SELECT
            id_session_log, session_identifier, instrument, timestamp,
            event_type, record_status, user
        FROM session_log
        """
    )

    # Step 2a: Confirm the copy succeeded BEFORE the old table is dropped.
    # Skipped in offline/SQL mode and when both tables are empty.
    if not offline and (session_log_count > 0 or instruments_count > 0):
        # instruments must be unchanged
        verify_table_integrity(bind, "instruments", instruments_count)

        # session_log: same row count, PK range, and status distribution
        verify_table_integrity(
            bind,
            "session_log_new",
            session_log_count,
            expected_pk_range=(min_id, max_id),
            expected_distribution=status_counts,
            distribution_column="record_status",
            pk_column="id_session_log",
        )
        print(  # noqa: T201
            f"✓ Data integrity verified: {instruments_count} instruments, "
            f"{session_log_count} session logs preserved"
        )

    # Step 3: Remove any index left on the old table. IF EXISTS covers both
    # v1.4.3 databases (different index names) and databases created via
    # migration 001 (which have ix_session_log_session_identifier).
    op.execute("DROP INDEX IF EXISTS ix_session_log_session_identifier")
    op.execute('DROP INDEX IF EXISTS "session_log.fk_instrument_idx"')

    # Step 4: Drop the original table.
    op.drop_table("session_log")

    # Step 5: Give the replacement the original name.
    op.rename_table("session_log_new", "session_log")

    # Step 6: Recreate the index now that the old one is gone.
    op.create_index(
        "ix_session_log_session_identifier",
        "session_log",
        ["session_identifier"],
        unique=False,
    )
def downgrade() -> None:
    """Downgrade schema."""
    # Mirror of upgrade(): rebuild session_log WITHOUT the CHECK
    # constraints, since SQLite cannot drop constraints in place either.
    from alembic import context  # noqa: PLC0415

    bind = op.get_bind()

    # No live database to query in offline/SQL mode — skip verification.
    offline = context.is_offline_mode()

    if offline:
        # Placeholder values; the verification step below never runs offline.
        instruments_count = 0
        session_log_count = 0
        min_id = None
        max_id = None
        status_counts = {}
    else:
        # Baseline counts used later to confirm nothing was lost.
        instruments_count = bind.execute(
            sa.text("SELECT COUNT(*) FROM instruments")
        ).scalar()

        # session_log row count, primary-key range, and record_status
        # distribution together fingerprint the data being moved back.
        session_log_count = bind.execute(
            sa.text("SELECT COUNT(*) FROM session_log")
        ).scalar()

        min_id, max_id = bind.execute(
            sa.text("SELECT MIN(id_session_log), MAX(id_session_log) FROM session_log")
        ).fetchone()

        status_counts = dict(
            bind.execute(
                sa.text(
                    "SELECT record_status, COUNT(*) FROM session_log "
                    "GROUP BY record_status ORDER BY record_status"
                )
            ).fetchall()
        )

        # Announce the downgrade only when there is data to move.
        if session_log_count > 0:
            print(f"→ Downgrading {session_log_count} session logs...")  # noqa: T201

    # Step 1: Build the unconstrained replacement table (index added after
    # the final rename).
    op.create_table(
        "session_log_old",
        sa.Column("id_session_log", sa.Integer(), nullable=False),
        sa.Column("session_identifier", sa.String(length=36), nullable=False),
        sa.Column("instrument", sa.String(length=100), nullable=False),
        sa.Column("timestamp", sa.String(), nullable=False),
        sa.Column("event_type", sa.String(length=17), nullable=False),
        sa.Column("record_status", sa.String(length=18), nullable=False),
        sa.Column("user", sa.String(length=50), nullable=True),
        sa.ForeignKeyConstraint(
            ["instrument"],
            ["instruments.instrument_pid"],
        ),
        sa.PrimaryKeyConstraint("id_session_log"),
    )

    # Step 2: Copy every row into the replacement table.
    op.execute(
        """
        INSERT INTO session_log_old (
            id_session_log, session_identifier, instrument, timestamp,
            event_type, record_status, user
        )
        SELECT
            id_session_log, session_identifier, instrument, timestamp,
            event_type, record_status, user
        FROM session_log
        """
    )

    # Step 2a: Confirm the copy succeeded BEFORE the current table is
    # dropped. Skipped in offline/SQL mode and when both tables are empty.
    if not offline and (session_log_count > 0 or instruments_count > 0):
        # instruments must be unchanged
        verify_table_integrity(bind, "instruments", instruments_count)

        # session_log: same row count, PK range, and status distribution
        verify_table_integrity(
            bind,
            "session_log_old",
            session_log_count,
            expected_pk_range=(min_id, max_id),
            expected_distribution=status_counts,
            distribution_column="record_status",
            pk_column="id_session_log",
        )
        print(  # noqa: T201
            f"✓ Data integrity verified: {instruments_count} instruments, "
            f"{session_log_count} session logs preserved"
        )

    # Step 3: Remove any index on the current table; IF EXISTS handles
    # the different database states.
    op.execute("DROP INDEX IF EXISTS ix_session_log_session_identifier")
    op.execute('DROP INDEX IF EXISTS "session_log.fk_instrument_idx"')

    # Step 4: Drop the constrained table.
    op.drop_table("session_log")

    # Step 5: Give the replacement the original name.
    op.rename_table("session_log_old", "session_log")

    # Step 6: Recreate the index on the renamed table.
    op.create_index(
        "ix_session_log_session_identifier",
        "session_log",
        ["session_identifier"],
        unique=False,
    )