Coverage for nexusLIMS/db/migrations/versions/v2_4_0b_add_check_constraints.py: 100%

68 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-03-24 05:23 +0000

1"""Add check constraints to session_log. 

2 

3Adds CHECK constraints to session_log table for event_type and record_status. 

4 

5This migration: 

61. Adds CHECK constraint for event_type enum values 

72. Adds CHECK constraint for record_status enum values (including BUILT_NOT_EXPORTED) 

8 

9For SQLite, this requires recreating the table since ALTER TABLE doesn't support 

10adding CHECK constraints. 

11 

12Revision ID: v2_4_0b 

13Revises: v2_4_0a 

14Create Date: 2026-01-25 10:28:38.768026 

15 

16""" 

17 

18from typing import Sequence, Union 

19 

20import sqlalchemy as sa 

21from alembic import op 

22 

23from nexusLIMS.db.migrations.utils import verify_table_integrity 

24 

# Revision identifiers, used by Alembic to locate and order migrations.
revision: str = "v2_4_0b"  # this migration's unique ID
down_revision: Union[str, Sequence[str], None] = "v2_4_0a"  # direct parent revision
branch_labels: Union[str, Sequence[str], None] = None  # not the head of a named branch
depends_on: Union[str, Sequence[str], None] = None  # no cross-branch dependencies

30 

31 

def upgrade() -> None:
    """Upgrade schema."""
    # SQLite cannot add CHECK constraints via ALTER TABLE, so session_log is
    # rebuilt: create a constrained copy, copy the rows, verify, then swap.

    from alembic import context  # noqa: PLC0415

    bind = op.get_bind()

    # Offline/SQL-script mode has no live database behind the connection, so
    # every verification read below must be skipped.
    offline = context.is_offline_mode()

    if offline:
        # Placeholder baseline values; never consulted in offline mode.
        instruments_count = 0
        session_log_count = 0
        min_id = None
        max_id = None
        status_counts = {}
    else:
        # Baseline figures, captured up front so the copied table can be
        # checked against them before the original table is dropped.
        instruments_count = bind.execute(
            sa.text("SELECT COUNT(*) FROM instruments")
        ).scalar()
        session_log_count = bind.execute(
            sa.text("SELECT COUNT(*) FROM session_log")
        ).scalar()
        min_id, max_id = bind.execute(
            sa.text(
                "SELECT MIN(id_session_log), MAX(id_session_log) FROM session_log"
            )
        ).fetchone()
        status_counts = dict(
            bind.execute(
                sa.text(
                    "SELECT record_status, COUNT(*) FROM session_log "
                    "GROUP BY record_status ORDER BY record_status"
                )
            ).fetchall()
        )
        # Only announce the migration when there is actual data to move.
        if session_log_count > 0:
            print(f"→ Migrating {session_log_count} session logs...")  # noqa: T201

    # Step 1: constrained replacement table (its index is created last,
    # after the legacy table and its identically-named index are gone).
    op.create_table(
        "session_log_new",
        sa.Column("id_session_log", sa.Integer(), nullable=False),
        sa.Column("session_identifier", sa.String(length=36), nullable=False),
        sa.Column("instrument", sa.String(length=100), nullable=False),
        sa.Column("timestamp", sa.String(), nullable=False),
        sa.Column("event_type", sa.String(length=17), nullable=False),
        sa.Column("record_status", sa.String(length=18), nullable=False),
        sa.Column("user", sa.String(length=50), nullable=True),
        sa.CheckConstraint(
            "event_type IN ('START', 'END', 'RECORD_GENERATION')",
            name="check_event_type",
        ),
        sa.CheckConstraint(
            "record_status IN ('COMPLETED', 'WAITING_FOR_END', 'TO_BE_BUILT', "
            "'BUILT_NOT_EXPORTED', 'ERROR', 'NO_FILES_FOUND', 'NO_CONSENT', "
            "'NO_RESERVATION')",
            name="check_record_status",
        ),
        sa.ForeignKeyConstraint(["instrument"], ["instruments.instrument_pid"]),
        sa.PrimaryKeyConstraint("id_session_log"),
    )

    # Step 2: bulk-copy every row into the constrained table.
    op.execute(
        """
        INSERT INTO session_log_new (
            id_session_log, session_identifier, instrument, timestamp,
            event_type, record_status, user
        )
        SELECT
            id_session_log, session_identifier, instrument, timestamp,
            event_type, record_status, user
        FROM session_log
        """
    )

    # Step 2a: prove the copy is complete before anything destructive runs
    # (skipped offline, and when both tables are empty).
    if not offline and (session_log_count > 0 or instruments_count > 0):
        # instruments must be untouched by this migration
        verify_table_integrity(bind, "instruments", instruments_count)
        # session_log rows must be fully preserved in the new table
        verify_table_integrity(
            bind,
            "session_log_new",
            session_log_count,
            expected_pk_range=(min_id, max_id),
            expected_distribution=status_counts,
            distribution_column="record_status",
            pk_column="id_session_log",
        )
        print(  # noqa: T201
            f"✓ Data integrity verified: {instruments_count} instruments, "
            f"{session_log_count} session logs preserved"
        )

    # Steps 3-5: drop whichever legacy index exists (v1.4.3 databases and
    # migration-001 databases use different names — IF EXISTS covers both),
    # drop the old table, then move the replacement into its place.
    op.execute("DROP INDEX IF EXISTS ix_session_log_session_identifier")
    op.execute('DROP INDEX IF EXISTS "session_log.fk_instrument_idx"')
    op.drop_table("session_log")
    op.rename_table("session_log_new", "session_log")

    # Step 6: recreate the session-identifier lookup index on the final table.
    op.create_index(
        "ix_session_log_session_identifier",
        "session_log",
        ["session_identifier"],
        unique=False,
    )

160 

161 

def downgrade() -> None:
    """Downgrade schema."""
    # Rebuild session_log WITHOUT the CHECK constraints — same table-recreate
    # strategy as upgrade(), since SQLite cannot drop constraints in place.

    from alembic import context  # noqa: PLC0415

    conn = op.get_bind()

    def query(stmt):
        # Run one raw SQL statement on the migration connection.
        return conn.execute(sa.text(stmt))

    # No database to read in offline/SQL-script mode; skip verification reads.
    is_offline = context.is_offline_mode()

    if not is_offline:
        # Snapshot counts so the copy can be verified before anything is dropped.
        instruments_count = query("SELECT COUNT(*) FROM instruments").scalar()
        session_log_count = query("SELECT COUNT(*) FROM session_log").scalar()
        min_id, max_id = query(
            "SELECT MIN(id_session_log), MAX(id_session_log) FROM session_log"
        ).fetchone()
        status_counts = dict(
            query(
                "SELECT record_status, COUNT(*) FROM session_log "
                "GROUP BY record_status ORDER BY record_status"
            ).fetchall()
        )
        # Only announce the downgrade when there is actual data to move.
        if session_log_count > 0:
            print(f"→ Downgrading {session_log_count} session logs...")  # noqa: T201
    else:
        # Placeholders; the verification branch below never runs offline.
        instruments_count = 0
        session_log_count = 0
        min_id = None
        max_id = None
        status_counts = {}

    # Step 1: unconstrained replacement table (index is recreated at the end).
    op.create_table(
        "session_log_old",
        sa.Column("id_session_log", sa.Integer(), nullable=False),
        sa.Column("session_identifier", sa.String(length=36), nullable=False),
        sa.Column("instrument", sa.String(length=100), nullable=False),
        sa.Column("timestamp", sa.String(), nullable=False),
        sa.Column("event_type", sa.String(length=17), nullable=False),
        sa.Column("record_status", sa.String(length=18), nullable=False),
        sa.Column("user", sa.String(length=50), nullable=True),
        sa.ForeignKeyConstraint(["instrument"], ["instruments.instrument_pid"]),
        sa.PrimaryKeyConstraint("id_session_log"),
    )

    # Step 2: move every row into the replacement table.
    op.execute(
        """
        INSERT INTO session_log_old (
            id_session_log, session_identifier, instrument, timestamp,
            event_type, record_status, user
        )
        SELECT
            id_session_log, session_identifier, instrument, timestamp,
            event_type, record_status, user
        FROM session_log
        """
    )

    # Step 2a: confirm nothing was lost before the destructive steps
    # (skipped offline, and when both tables are empty).
    if not is_offline and (session_log_count > 0 or instruments_count > 0):
        # instruments must be untouched by this migration
        verify_table_integrity(conn, "instruments", instruments_count)
        # session_log rows must be fully preserved in the replacement table
        verify_table_integrity(
            conn,
            "session_log_old",
            session_log_count,
            expected_pk_range=(min_id, max_id),
            expected_distribution=status_counts,
            distribution_column="record_status",
            pk_column="id_session_log",
        )
        print(  # noqa: T201
            f"✓ Data integrity verified: {instruments_count} instruments, "
            f"{session_log_count} session logs preserved"
        )

    # Steps 3-5: drop whichever index name exists (database vintages differ),
    # drop the constrained table, then move the replacement into its place.
    op.execute("DROP INDEX IF EXISTS ix_session_log_session_identifier")
    op.execute('DROP INDEX IF EXISTS "session_log.fk_instrument_idx"')
    op.drop_table("session_log")
    op.rename_table("session_log_old", "session_log")

    # Step 6: recreate the session-identifier lookup index on the final table.
    op.create_index(
        "ix_session_log_session_identifier",
        "session_log",
        ["session_identifier"],
        unique=False,
    )