# Source code for nexusLIMS.db.migrations.utils
"""Utilities for database migrations.
Provides helper functions for data integrity verification and backup creation
that can be used by migration scripts.
"""
import shutil
from datetime import datetime
from pathlib import Path
import sqlalchemy as sa
# [docs]
def create_backup(connection) -> Path:
    """Copy the database file to a timestamped backup before migrating.

    The database file location is read from the connection's engine URL,
    and the copy is placed alongside the original with a
    ``_backup_YYYYMMDD_HHMMSS`` suffix inserted before the extension.

    Parameters
    ----------
    connection
        SQLAlchemy connection whose engine URL points at the database file

    Returns
    -------
    pathlib.Path
        Path to the backup file that was created

    Examples
    --------
    >>> from alembic import op
    >>> from nexusLIMS.migrations.utils import create_backup
    >>> def upgrade():
    ...     connection = op.get_bind()
    ...     create_backup(connection)
    ...     # ... perform migration ...
    """
    # Resolve the on-disk database file from the engine URL
    source = Path(connection.engine.url.database)
    # Local time is intentional here (matches operator expectations)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")  # noqa: DTZ005
    backup_path = source.with_name(f"{source.stem}_backup_{stamp}{source.suffix}")
    # copy2 preserves file metadata (mtime, permissions) along with contents
    shutil.copy2(source, backup_path)
    print(f"✓ Database backup created: {backup_path}")  # noqa: T201
    return backup_path
# [docs]
def verify_table_integrity(  # noqa: PLR0913
    connection,
    table_name: str,
    expected_count: int,
    expected_pk_range: tuple[int, int] | None = None,
    expected_distribution: dict | None = None,
    distribution_column: str | None = None,
    pk_column: str = "id",
):
    """Check that a table's data survived a migration intact.

    Runs up to three checks against ``table_name``, in order: total row
    count, primary-key min/max range, and the per-value row counts of a
    chosen column. The first failing check raises immediately.

    Parameters
    ----------
    connection
        SQLAlchemy connection for queries
    table_name : str
        Name of the table to verify
    expected_count : int
        Expected number of rows
    expected_pk_range : tuple[int, int] | None
        Expected (min, max) primary key values
    expected_distribution : dict | None
        Expected distribution of values in a column (e.g., status counts)
    distribution_column : str | None
        Column name for distribution check
    pk_column : str
        Primary key column name (default: "id")

    Raises
    ------
    RuntimeError
        If data integrity checks fail

    Examples
    --------
    >>> from alembic import op
    >>> from nexusLIMS.migrations.utils import verify_table_integrity
    >>> def upgrade():
    ...     connection = op.get_bind()
    ...     # Before migration: collect baseline
    ...     result = connection.execute(sa.text("SELECT COUNT(*) FROM my_table"))
    ...     count = result.scalar()
    ...     # After migration: verify
    ...     verify_table_integrity(connection, "my_table_new", count)
    """
    # Check 1: total row count must match the pre-migration baseline.
    # Identifiers are interpolated directly; callers supply trusted,
    # migration-internal names only.
    actual_count = connection.execute(
        sa.text(f"SELECT COUNT(*) FROM {table_name}")  # noqa: S608
    ).scalar()
    if actual_count != expected_count:
        msg = (
            f"Data integrity check FAILED for {table_name}: "
            f"Row count mismatch! Expected: {expected_count}, Actual: {actual_count}"
        )
        raise RuntimeError(msg)
    # Check 2 (optional): primary-key min/max must match exactly
    if expected_pk_range is not None:
        row = connection.execute(
            sa.text(
                f"SELECT MIN({pk_column}), MAX({pk_column}) FROM {table_name}"  # noqa: S608
            )
        ).fetchone()
        min_pk, max_pk = row
        if (min_pk, max_pk) != expected_pk_range:
            msg = (
                f"Data integrity check FAILED for {table_name}: "
                f"Primary key range mismatch! "
                f"Expected: {expected_pk_range}, Actual: ({min_pk}, {max_pk})"
            )
            raise RuntimeError(msg)
    # Check 3 (optional): value counts in one column must match exactly
    if expected_distribution is not None and distribution_column is not None:
        rows = connection.execute(
            sa.text(
                f"SELECT {distribution_column}, COUNT(*) FROM {table_name} "  # noqa: S608
                f"GROUP BY {distribution_column} ORDER BY {distribution_column}"
            )
        ).fetchall()
        actual_distribution = dict(rows)
        if actual_distribution != expected_distribution:
            msg = (
                f"Data integrity check FAILED for {table_name}: "
                f"Distribution mismatch in {distribution_column}! "
                f"Expected: {expected_distribution}, Actual: {actual_distribution}"
            )
            raise RuntimeError(msg)
    print(  # noqa: T201
        f"✓ Data integrity verified for {table_name}: {actual_count} rows preserved"
    )