Coverage for nexusLIMS/db/migrations/utils.py: 100%

30 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-03-24 05:23 +0000

1"""Utilities for database migrations. 

2 

3Provides helper functions for data integrity verification and backup creation 

4that can be used by migration scripts. 

5""" 

6 

7import shutil 

8from datetime import datetime 

9from pathlib import Path 

10 

11import sqlalchemy as sa 

12 

13 

14def create_backup(connection) -> Path: 

15 """Create timestamped backup of database before migration. 

16 

17 Parameters 

18 ---------- 

19 connection 

20 SQLAlchemy connection to get database path from 

21 

22 Returns 

23 ------- 

24 pathlib.Path 

25 Path to the backup file 

26 

27 Examples 

28 -------- 

29 >>> from alembic import op 

30 >>> from nexusLIMS.migrations.utils import create_backup 

31 >>> def upgrade(): 

32 ... connection = op.get_bind() 

33 ... create_backup(connection) 

34 ... # ... perform migration ... 

35 """ 

36 # Get database path from connection 

37 db_path = Path(connection.engine.url.database) 

38 

39 # Create backup with timestamp 

40 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # noqa: DTZ005 

41 backup_path = db_path.parent / f"{db_path.stem}_backup_{timestamp}{db_path.suffix}" 

42 

43 # Copy database file 

44 shutil.copy2(db_path, backup_path) 

45 

46 print(f"✓ Database backup created: {backup_path}") # noqa: T201 

47 return backup_path 

48 

49 

50def verify_table_integrity( # noqa: PLR0913 

51 connection, 

52 table_name: str, 

53 expected_count: int, 

54 expected_pk_range: tuple[int, int] | None = None, 

55 expected_distribution: dict | None = None, 

56 distribution_column: str | None = None, 

57 pk_column: str = "id", 

58): 

59 """Verify table data was preserved during migration. 

60 

61 Parameters 

62 ---------- 

63 connection 

64 SQLAlchemy connection for queries 

65 table_name : str 

66 Name of the table to verify 

67 expected_count : int 

68 Expected number of rows 

69 expected_pk_range : tuple[int, int] | None 

70 Expected (min, max) primary key values 

71 expected_distribution : dict | None 

72 Expected distribution of values in a column (e.g., status counts) 

73 distribution_column : str | None 

74 Column name for distribution check 

75 pk_column : str 

76 Primary key column name (default: "id") 

77 

78 Raises 

79 ------ 

80 RuntimeError 

81 If data integrity checks fail 

82 

83 Examples 

84 -------- 

85 >>> from alembic import op 

86 >>> from nexusLIMS.migrations.utils import verify_table_integrity 

87 >>> def upgrade(): 

88 ... connection = op.get_bind() 

89 ... # Before migration: collect baseline 

90 ... result = connection.execute(sa.text("SELECT COUNT(*) FROM my_table")) 

91 ... count = result.scalar() 

92 ... # After migration: verify 

93 ... verify_table_integrity(connection, "my_table_new", count) 

94 """ 

95 # Count rows 

96 result = connection.execute( 

97 sa.text(f"SELECT COUNT(*) FROM {table_name}") # noqa: S608 

98 ) 

99 actual_count = result.scalar() 

100 

101 if actual_count != expected_count: 

102 msg = ( 

103 f"Data integrity check FAILED for {table_name}: " 

104 f"Row count mismatch! Expected: {expected_count}, Actual: {actual_count}" 

105 ) 

106 raise RuntimeError(msg) 

107 

108 # Verify primary key range if provided 

109 if expected_pk_range is not None: 

110 result = connection.execute( 

111 sa.text( 

112 f"SELECT MIN({pk_column}), MAX({pk_column}) FROM {table_name}" # noqa: S608 

113 ) 

114 ) 

115 min_pk, max_pk = result.fetchone() 

116 

117 if (min_pk, max_pk) != expected_pk_range: 

118 msg = ( 

119 f"Data integrity check FAILED for {table_name}: " 

120 f"Primary key range mismatch! " 

121 f"Expected: {expected_pk_range}, Actual: ({min_pk}, {max_pk})" 

122 ) 

123 raise RuntimeError(msg) 

124 

125 # Verify distribution if provided 

126 if expected_distribution is not None and distribution_column is not None: 

127 result = connection.execute( 

128 sa.text( 

129 f"SELECT {distribution_column}, COUNT(*) FROM {table_name} " # noqa: S608 

130 f"GROUP BY {distribution_column} ORDER BY {distribution_column}" 

131 ) 

132 ) 

133 actual_distribution = dict(result.fetchall()) 

134 

135 if actual_distribution != expected_distribution: 

136 msg = ( 

137 f"Data integrity check FAILED for {table_name}: " 

138 f"Distribution mismatch in {distribution_column}! " 

139 f"Expected: {expected_distribution}, Actual: {actual_distribution}" 

140 ) 

141 raise RuntimeError(msg) 

142 

143 print( # noqa: T201 

144 f"✓ Data integrity verified for {table_name}: {actual_count} rows preserved" 

145 )