Files
HighCostDrugsDemo/data_processing/migrate.py
T
Andrew Charlwood 19607d72b0 feat: add chart_type column to pathway_nodes schema (Task 2.2)
- Add chart_type column (TEXT NOT NULL DEFAULT 'directory')
- Update UNIQUE constraint to (date_filter_id, chart_type, ids)
- Add idx_pathway_nodes_chart_type index for filtering
- Add migrate_pathway_nodes_chart_type() function for existing databases
- Update initialize_database() to run migration automatically
- Existing rows default to 'directory' chart type
2026-02-05 14:24:57 +00:00

609 lines
20 KiB
Python

"""
Database migration script for NHS High-Cost Drug Patient Pathway Analysis Tool.
Provides functions to initialize the SQLite database schema and CLI interface
for running migrations from the command line.
Usage:
# Initialize database (creates all tables)
python -m data_processing.migrate
# Drop existing tables and reinitialize
python -m data_processing.migrate --drop-existing
# Show current database status
python -m data_processing.migrate --status
# Migrate all reference data from CSV files
python -m data_processing.migrate --reference-data
# Migrate reference data with verification
python -m data_processing.migrate --reference-data --verify
"""
import argparse
import sys
from pathlib import Path
from typing import Optional
from core.logging_config import setup_logging, get_logger
from data_processing.database import DatabaseManager, DatabaseConfig
from core import PathConfig, default_paths
from data_processing.schema import (
create_all_tables,
drop_all_tables,
verify_all_tables_exist,
get_all_table_counts,
migrate_pathway_nodes_chart_type,
)
from data_processing.reference_data import (
MigrationResult,
migrate_drug_names,
migrate_organizations,
migrate_directories,
migrate_drug_directory_map,
migrate_drug_indication_clusters,
verify_drug_names_migration,
verify_organizations_migration,
verify_directories_migration,
verify_drug_directory_map_migration,
verify_drug_indication_clusters_migration,
)
from data_processing.patient_data import (
load_patient_data,
refresh_patient_treatment_summary,
get_patient_data_stats,
verify_mv_consistency,
)
# Module-level logger shared by every function in this file, named after
# this module via get_logger(__name__).
logger = get_logger(__name__)
def initialize_database(
    db_manager: Optional[DatabaseManager] = None,
    drop_existing: bool = False,
    confirm_drop: bool = True
) -> bool:
    """
    Initialize the database with all required tables.

    Steps, in order:
      1. Optionally drop all existing tables (after an interactive
         confirmation unless ``confirm_drop`` is False).
      2. Create all tables via ``create_all_tables``. The schema is documented
         as using IF NOT EXISTS, so re-running is intended to be safe.
      3. Run the ``pathway_nodes`` chart_type migration.
      4. Verify that every expected table exists.

    Every step reports failure by logging and returning False; no step lets an
    exception escape to the caller.

    Args:
        db_manager: DatabaseManager instance. Uses default if not provided.
        drop_existing: If True, drops all existing tables before creating.
        confirm_drop: If True and drop_existing=True, prompts for confirmation.
            Set to False for non-interactive use.

    Returns:
        True if initialization succeeded. False if the user declined the drop
        confirmation or if any step failed.
    """
    if db_manager is None:
        db_manager = DatabaseManager()

    logger.info(f"Initializing database at: {db_manager.db_path}")

    # Handle drop existing with confirmation
    if drop_existing:
        if confirm_drop:
            print("\nWARNING: This will delete ALL data from the database:")
            print(f" {db_manager.db_path}\n")
            try:
                response = input("Are you sure you want to continue? (yes/no): ")
            except EOFError:
                # stdin is closed (e.g. piped / CI run without --yes). Never
                # treat a missing answer as consent to a destructive action.
                response = ""
            if response.strip().lower() not in ("yes", "y"):
                print("Operation cancelled.")
                return False

        if db_manager.exists:
            logger.warning("Dropping existing tables...")
            try:
                with db_manager.get_connection() as conn:
                    drop_all_tables(conn)
                    conn.commit()
            except Exception as e:
                # Keep the documented bool contract, like the steps below.
                logger.error(f"Failed to drop tables: {e}")
                return False
            logger.info("Existing tables dropped")
        else:
            logger.info("Database does not exist yet, nothing to drop")

    # Create all tables
    try:
        with db_manager.get_transaction() as conn:
            create_all_tables(conn)
    except Exception as e:
        logger.error(f"Failed to create tables: {e}")
        return False

    # Run migrations for schema changes
    try:
        with db_manager.get_connection() as conn:
            # Add chart_type column to pathway_nodes if it doesn't exist
            success, msg = migrate_pathway_nodes_chart_type(conn)
            if not success:
                logger.error(f"pathway_nodes migration failed: {msg}")
                return False
            logger.info(f"pathway_nodes migration: {msg}")
    except Exception as e:
        logger.error(f"Migration failed: {e}")
        return False

    # Verify all tables were created
    try:
        with db_manager.get_connection() as conn:
            missing = verify_all_tables_exist(conn)
    except Exception as e:
        logger.error(f"Table verification failed: {e}")
        return False

    if missing:
        logger.error(f"Table creation failed. Missing tables: {missing}")
        return False

    logger.info("All tables created successfully")
    return True
def migrate_all_reference_data(
    db_manager: Optional[DatabaseManager] = None,
    paths: Optional[PathConfig] = None,
    verify: bool = False
) -> tuple[bool, list[MigrationResult]]:
    """
    Run all reference data migrations from CSV files to SQLite tables.

    Migrations are run in order:
      1. Drug names (drugnames.csv → ref_drug_names)
      2. Organizations (org_codes.csv → ref_organizations)
      3. Directories (directory_list.csv → ref_directories)
      4. Drug-directory mappings (drug_directory_list.csv → ref_drug_directory_map)
      5. Drug indication clusters (called without ``paths``; its migrate and
         verify functions are only given ``db_manager``)

    A failed migration is recorded and logged, its verification is skipped,
    and the remaining migrations still run.

    Args:
        db_manager: DatabaseManager instance. Uses default if not provided.
        paths: PathConfig instance for locating CSV files. Uses default if not provided.
        verify: If True, runs verification after each successful migration.

    Returns:
        Tuple of (all_success: bool, results: list of MigrationResult).
        ``all_success`` is False if any migration or verification failed.
    """
    if db_manager is None:
        db_manager = DatabaseManager()
    if paths is None:
        paths = default_paths

    # Each step: (label, migrate function, verify function, accepts `paths`).
    # Drug indication clusters uses a different signature (no `paths` keyword).
    steps = [
        ("Drug names", migrate_drug_names, verify_drug_names_migration, True),
        ("Organizations", migrate_organizations, verify_organizations_migration, True),
        ("Directories", migrate_directories, verify_directories_migration, True),
        ("Drug-directory map", migrate_drug_directory_map, verify_drug_directory_map_migration, True),
        ("Drug indication clusters", migrate_drug_indication_clusters, verify_drug_indication_clusters_migration, False),
    ]

    logger.info(f"Starting reference data migrations ({len(steps)} tables)")

    outcomes: list[MigrationResult] = []
    everything_ok = True

    for label, run_migration, run_verification, takes_paths in steps:
        logger.info(f"Migrating: {label}...")

        # The same keyword set is used for the migration and its verification.
        call_kwargs = {"db_manager": db_manager}
        if takes_paths:
            call_kwargs["paths"] = paths

        outcome = run_migration(**call_kwargs)
        outcomes.append(outcome)

        if not outcome.success:
            logger.error(f"Migration failed: {label} - {outcome.error_message}")
            everything_ok = False
            continue

        logger.info(f" {outcome}")

        # Verification is opt-in
        if not verify:
            continue

        logger.info(f" Verifying {label}...")
        verified, verify_msg = run_verification(**call_kwargs)
        if verified:
            logger.info(f" OK: {verify_msg}")
        else:
            logger.error(f" FAILED: Verification failed: {verify_msg}")
            everything_ok = False

    # Summary
    succeeded = len([o for o in outcomes if o.success])
    logger.info(f"Reference data migrations complete: {succeeded}/{len(outcomes)} succeeded")
    return everything_ok, outcomes
def print_migration_summary(results: list[MigrationResult]) -> None:
    """
    Print a summary of migration results to stdout.

    For each result, prints an [OK]/[FAILED] line with the table name, then
    either the row counts (read / inserted / skipped) or the error message.
    Ends with a total of successful migrations and a blank line.

    Args:
        results: Migration results to report, in display order.
    """
    print("\n=== Reference Data Migration Summary ===\n")

    succeeded = 0
    for entry in results:
        if entry.success:
            succeeded += 1
            print(f"[OK] {entry.table_name}")
            print(f" Read: {entry.rows_read}, Inserted: {entry.rows_inserted}, Skipped: {entry.rows_skipped}")
        else:
            print(f"[FAILED] {entry.table_name}")
            print(f" Error: {entry.error_message}")

    print(f"\nTotal: {succeeded}/{len(results)} migrations succeeded")
    print()
def create_progress_reporter(description: str = "Loading", width: int = 40):
    """
    Create a progress callback that prints a progress bar to stdout.

    The bar is redrawn in place (carriage return) and only when the integer
    percentage changes, to avoid excessive output. Progress is clamped to the
    0-100% range, so a ``current`` below 0 or above ``total`` never draws a
    bar wider than ``width``; the raw ``current``/``total`` counts are still
    shown as given.

    Args:
        description: Label to show before the progress bar.
        width: Width of the progress bar in characters.

    Returns:
        Callback function(current, total) that prints progress. A ``total``
        of zero or less is displayed as 100% complete.
    """
    last_percent = -1  # Sentinel: nothing has been drawn yet

    def report_progress(current: int, total: int) -> None:
        """Print a progress bar showing current/total progress."""
        nonlocal last_percent

        if total > 0:
            # Clamp before scaling so overshoot/undershoot cannot overflow the bar.
            clamped = min(max(current, 0), total)
            percent = int(100 * clamped / total)
            filled = int(width * clamped / total)
        else:
            percent = 100
            filled = width

        # Only update display when percentage changes (avoid excessive output)
        if percent == last_percent:
            return
        last_percent = percent

        bar = "=" * filled + "-" * (width - filled)

        # Use carriage return to overwrite the line
        sys.stdout.write(f"\r{description}: [{bar}] {percent:3d}% ({current:,}/{total:,})")
        sys.stdout.flush()

        # Print newline when complete
        if current >= total:
            print()

    return report_progress
def load_patient_data_cli(
    file_path: Path,
    db_manager: Optional[DatabaseManager] = None,
    paths: Optional[PathConfig] = None,
    force: bool = False,
    refresh_mv: bool = True
) -> bool:
    """
    Load patient data from file with CLI progress reporting.

    Prints the file name and size, loads the rows with a progress bar, then
    (if requested and new data was loaded) refreshes the materialized view and
    checks its consistency. A summary of the patient data is printed last.

    Args:
        file_path: Path to CSV or Parquet file.
        db_manager: DatabaseManager instance. Uses default if not provided.
        paths: PathConfig for reference data. Uses default if not provided.
        force: If True, re-process even if file hash matches.
        refresh_mv: If True, refresh the materialized view after loading.

    Returns:
        True if loading succeeded, False otherwise (including a missing file).
        A failed materialized view refresh or consistency check is printed
        but does not change the return value.
    """
    db_manager = DatabaseManager() if db_manager is None else db_manager
    paths = default_paths if paths is None else paths

    print(f"\n=== Loading Patient Data ===\n")
    print(f"File: {file_path}")

    # Bail out early when there is nothing to load
    if not file_path.exists():
        print(f"ERROR: File not found: {file_path}")
        return False

    # Show the file size in megabytes
    size_in_mb = file_path.stat().st_size / (1024 * 1024)
    print(f"Size: {size_in_mb:.1f} MB")
    print()

    # Load the data, drawing a progress bar as rows are processed
    load_result = load_patient_data(
        file_path=file_path,
        db_manager=db_manager,
        paths=paths,
        batch_size=5000,
        force=force,
        progress_callback=create_progress_reporter("Loading rows", width=40)
    )

    # Report the outcome of the load
    print()
    if load_result.was_already_processed:
        print("File already processed (same hash). Skipping.")
        print(f"Use --force to re-process.")
    elif not load_result.success:
        print(f"FAILED: {load_result.error_message}")
        return False
    else:
        print(f"Loaded {load_result.rows_inserted:,} rows in {load_result.load_time_seconds:.1f}s")
        if load_result.rows_skipped > 0:
            print(f"Skipped {load_result.rows_skipped:,} rows (missing UPID or date)")

    # Refresh the materialized view only when new data was actually loaded
    loaded_new_data = load_result.success and not load_result.was_already_processed
    if refresh_mv and loaded_new_data:
        print()
        print("Refreshing materialized view...")
        mv_result = refresh_patient_treatment_summary(
            db_manager=db_manager,
            progress_callback=create_progress_reporter("Processing patients", width=40)
        )
        if not mv_result.success:
            print(f"MV refresh FAILED: {mv_result.error_message}")
        else:
            print(f"MV refreshed: {mv_result.patients_processed:,} patients in {mv_result.refresh_time_seconds:.1f}s")
            # Verify consistency
            consistent, msg = verify_mv_consistency(db_manager)
            if consistent:
                print(f"MV verification: OK")
            else:
                print(f"MV verification: FAILED - {msg}")

    # Summary statistics
    print()
    print("=== Patient Data Summary ===")
    stats = get_patient_data_stats(db_manager)
    counters = (
        ("Total rows", "total_rows"),
        ("Unique patients", "unique_patients"),
        ("Unique drugs", "unique_drugs"),
        ("Unique organizations", "unique_organizations"),
    )
    for label, key in counters:
        print(f" {label}: {stats[key]:,}")

    date_range = stats['date_range']
    if date_range[0] and date_range[1]:
        print(f" Date range: {date_range[0]} to {date_range[1]}")
    print()

    return load_result.success
def get_database_status(db_manager: Optional[DatabaseManager] = None) -> dict:
    """
    Get the current status of the database.

    Args:
        db_manager: DatabaseManager instance. Uses default if not provided.

    Returns:
        Dictionary with database status information:
        - exists: Whether the database file exists
        - path: Path to the database file (as a string)
        - size_bytes: Size of database file, or None if it does not exist
        - tables: Dictionary of table names to row counts (left empty if
          the counts could not be read)
        - missing_tables: List of expected tables that don't exist
    """
    if db_manager is None:
        db_manager = DatabaseManager()

    report = dict(
        exists=db_manager.exists,
        path=str(db_manager.db_path),
        size_bytes=None,
        tables={},
        missing_tables=[],
    )

    # Nothing more to inspect when there is no database yet
    if not db_manager.exists:
        return report

    report["size_bytes"] = db_manager.db_path.stat().st_size

    with db_manager.get_connection() as conn:
        report["missing_tables"] = verify_all_tables_exist(conn)
        # Row counts are best-effort: a failure is logged, not raised
        try:
            report["tables"] = get_all_table_counts(conn)
        except Exception as e:
            logger.warning(f"Could not get table counts: {e}")

    return report
def print_database_status(db_manager: Optional[DatabaseManager] = None) -> None:
    """
    Print database status to stdout in a human-readable format.

    Shows the path and whether the database exists. For an existing database
    it also shows the file size in KB, any missing tables, and the row count
    of each table in name order.

    Args:
        db_manager: DatabaseManager instance, passed through to
            get_database_status().
    """
    status = get_database_status(db_manager)

    print("\n=== Database Status ===\n")
    print(f"Path: {status['path']}")
    print(f"Exists: {status['exists']}")

    if not status["exists"]:
        print("\nDatabase does not exist. Run migration to create it.")
        print()
        return

    size_kb = (status["size_bytes"] or 0) / 1024
    print(f"Size: {size_kb:.1f} KB")

    missing = status["missing_tables"]
    if missing:
        print(f"\nMissing tables: {', '.join(missing)}")
    else:
        print("\nAll expected tables exist.")

    counts = status["tables"]
    if counts:
        print("\nTable row counts:")
        for table in sorted(counts):
            print(f" {table}: {counts[table]:,} rows")

    print()
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser for the migration CLI."""
    parser = argparse.ArgumentParser(
        description="Initialize NHS Pathways Analysis SQLite database schema",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python -m data_processing.migrate # Initialize database
python -m data_processing.migrate --status # Show database status
python -m data_processing.migrate --drop-existing # Reset database
python -m data_processing.migrate --reference-data # Migrate reference data
python -m data_processing.migrate --reference-data --verify # With verification
python -m data_processing.migrate --load-patient-data data.parquet # Load patient data
python -m data_processing.migrate --load-patient-data data.csv --force # Force reload
python -m data_processing.migrate --db-path ./data/test.db # Custom path
"""
    )
    parser.add_argument(
        "--status",
        action="store_true",
        help="Show current database status and exit"
    )
    parser.add_argument(
        "--drop-existing",
        action="store_true",
        help="Drop all existing tables before creating (WARNING: deletes data)"
    )
    parser.add_argument(
        "--reference-data",
        action="store_true",
        help="Migrate all reference data from CSV files to SQLite tables"
    )
    parser.add_argument(
        "--verify",
        action="store_true",
        help="Verify migrated data matches CSV sources (use with --reference-data)"
    )
    parser.add_argument(
        "--db-path",
        type=Path,
        help="Path to database file (default: ./data/pathways.db)"
    )
    parser.add_argument(
        "--yes", "-y",
        action="store_true",
        help="Skip confirmation prompts (for non-interactive use)"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )
    parser.add_argument(
        "--load-patient-data",
        type=Path,
        metavar="FILE",
        help="Load patient data from CSV or Parquet file with progress reporting"
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force re-processing even if file hash matches (use with --load-patient-data)"
    )
    parser.add_argument(
        "--no-refresh-mv",
        action="store_true",
        help="Skip materialized view refresh after loading (use with --load-patient-data)"
    )
    return parser


def _ensure_database_initialized(db_manager: DatabaseManager) -> bool:
    """Create the schema if the database does not exist yet; return False on failure."""
    if db_manager.exists:
        return True
    print("Database does not exist. Initializing schema first...")
    if initialize_database(db_manager=db_manager):
        return True
    print("\nDatabase initialization failed. Check logs for details.")
    return False


def main(argv: Optional[list[str]] = None) -> int:
    """
    CLI entry point for database migration.

    Exactly one mode runs per invocation, chosen in this order:
    ``--status``, ``--reference-data``, ``--load-patient-data``, and
    otherwise the default schema initialization.

    Args:
        argv: Command-line arguments, excluding the program name. When None
            (the default), argparse reads them from ``sys.argv``.

    Returns:
        Process exit code: 0 on success, 1 on failure.
    """
    args = _build_arg_parser().parse_args(argv)

    # Set up logging
    log_level = "DEBUG" if args.verbose else "INFO"
    setup_logging(level=log_level, simple_console=True)

    # Create database manager with optional custom path
    if args.db_path:
        db_manager = DatabaseManager(DatabaseConfig(db_path=args.db_path))
    else:
        db_manager = DatabaseManager()

    # Handle --status (runs before config validation so it always reports)
    if args.status:
        print_database_status(db_manager)
        return 0

    # Validate configuration
    config_errors = db_manager.config.validate()
    if config_errors:
        for error in config_errors:
            logger.error(error)
        return 1

    # Handle --reference-data (migrate reference data from CSV to SQLite)
    if args.reference_data:
        if not _ensure_database_initialized(db_manager):
            return 1

        success, results = migrate_all_reference_data(
            db_manager=db_manager,
            paths=default_paths,
            verify=args.verify
        )
        print_migration_summary(results)
        print_database_status(db_manager)

        if success:
            print("Reference data migration completed successfully.")
            return 0
        print("Reference data migration completed with errors. Check logs for details.")
        return 1

    # Handle --load-patient-data (load patient data from CSV/Parquet)
    if args.load_patient_data:
        if not _ensure_database_initialized(db_manager):
            return 1

        success = load_patient_data_cli(
            file_path=args.load_patient_data,
            db_manager=db_manager,
            paths=default_paths,
            force=args.force,
            refresh_mv=not args.no_refresh_mv
        )
        if success:
            print("Patient data load completed successfully.")
            return 0
        print("Patient data load failed. Check logs for details.")
        return 1

    # Run schema migration (default behavior)
    success = initialize_database(
        db_manager=db_manager,
        drop_existing=args.drop_existing,
        confirm_drop=not args.yes
    )
    if success:
        print("\nDatabase initialized successfully.")
        print_database_status(db_manager)
        return 0
    print("\nDatabase initialization failed. Check logs for details.")
    return 1
# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())