Files

470 lines
16 KiB
Python

"""
Database migration script for NHS High-Cost Drug Patient Pathway Analysis Tool.
Provides functions to initialize the SQLite database schema and CLI interface
for running migrations from the command line.
Usage:
# Initialize database (creates all tables)
python -m data_processing.migrate
# Drop existing tables and reinitialize
python -m data_processing.migrate --drop-existing
# Show current database status
python -m data_processing.migrate --status
# Migrate all reference data from CSV files
python -m data_processing.migrate --reference-data
# Migrate reference data with verification
python -m data_processing.migrate --reference-data --verify
"""
import argparse
import sys
from pathlib import Path
from typing import Optional
# Ensure src/ is on sys.path when run as `python -m data_processing.migrate`
_src_dir = str(Path(__file__).resolve().parent.parent)
if _src_dir not in sys.path:
sys.path.insert(0, _src_dir)
from core.logging_config import setup_logging, get_logger
from data_processing.database import DatabaseManager, DatabaseConfig
from core import PathConfig, default_paths
from data_processing.schema import (
create_all_tables,
drop_all_tables,
verify_all_tables_exist,
get_all_table_counts,
migrate_pathway_nodes_chart_type,
migrate_refresh_log_source_row_count,
)
from data_processing.reference_data import (
MigrationResult,
migrate_drug_names,
migrate_organizations,
migrate_directories,
migrate_drug_directory_map,
migrate_drug_indication_clusters,
verify_drug_names_migration,
verify_organizations_migration,
verify_directories_migration,
verify_drug_directory_map_migration,
verify_drug_indication_clusters_migration,
)
logger = get_logger(__name__)
def initialize_database(
db_manager: Optional[DatabaseManager] = None,
drop_existing: bool = False,
confirm_drop: bool = True
) -> bool:
"""
Initialize the database with all required tables.
Creates all tables defined in the schema (reference tables and pathway
tables). Uses IF NOT EXISTS so safe to run multiple times.
Args:
db_manager: DatabaseManager instance. Uses default if not provided.
drop_existing: If True, drops all existing tables before creating.
confirm_drop: If True and drop_existing=True, prompts for confirmation.
Set to False for non-interactive use.
Returns:
True if initialization succeeded, False otherwise.
"""
if db_manager is None:
db_manager = DatabaseManager()
logger.info(f"Initializing database at: {db_manager.db_path}")
# Handle drop existing with confirmation
if drop_existing:
if confirm_drop:
print(f"\nWARNING: This will delete ALL data from the database:")
print(f" {db_manager.db_path}\n")
response = input("Are you sure you want to continue? (yes/no): ")
if response.lower() not in ("yes", "y"):
print("Operation cancelled.")
return False
if db_manager.exists:
logger.warning("Dropping existing tables...")
with db_manager.get_connection() as conn:
drop_all_tables(conn)
conn.commit()
logger.info("Existing tables dropped")
else:
logger.info("Database does not exist yet, nothing to drop")
# Create all tables
try:
with db_manager.get_transaction() as conn:
create_all_tables(conn)
except Exception as e:
logger.error(f"Failed to create tables: {e}")
return False
# Run migrations for schema changes
try:
with db_manager.get_connection() as conn:
# Add chart_type column to pathway_nodes if it doesn't exist
success, msg = migrate_pathway_nodes_chart_type(conn)
if success:
logger.info(f"pathway_nodes migration: {msg}")
else:
logger.error(f"pathway_nodes migration failed: {msg}")
return False
# Add source_row_count column to pathway_refresh_log if it doesn't exist
success, msg = migrate_refresh_log_source_row_count(conn)
if success:
logger.info(f"pathway_refresh_log migration: {msg}")
else:
logger.error(f"pathway_refresh_log migration failed: {msg}")
return False
except Exception as e:
logger.error(f"Migration failed: {e}")
return False
# Verify all tables were created
with db_manager.get_connection() as conn:
missing = verify_all_tables_exist(conn)
if missing:
logger.error(f"Table creation failed. Missing tables: {missing}")
return False
logger.info("All tables created successfully")
return True
def migrate_all_reference_data(
db_manager: Optional[DatabaseManager] = None,
paths: Optional[PathConfig] = None,
verify: bool = False
) -> tuple[bool, list[MigrationResult]]:
"""
Run all reference data migrations from CSV files to SQLite tables.
Migrations are run in order:
1. Drug names (drugnames.csv → ref_drug_names)
2. Organizations (org_codes.csv → ref_organizations)
3. Directories (directory_list.csv → ref_directories)
4. Drug-directory mappings (drug_directory_list.csv → ref_drug_directory_map)
Args:
db_manager: DatabaseManager instance. Uses default if not provided.
paths: PathConfig instance for locating CSV files. Uses default if not provided.
verify: If True, runs verification after each migration.
Returns:
Tuple of (all_success: bool, results: list of MigrationResult)
"""
if db_manager is None:
db_manager = DatabaseManager()
if paths is None:
paths = default_paths
results: list[MigrationResult] = []
all_success = True
# Define migrations in order
# Note: drug_indication_clusters uses a different signature (csv_path instead of paths)
migrations = [
("Drug names", migrate_drug_names, verify_drug_names_migration if verify else None, True),
("Organizations", migrate_organizations, verify_organizations_migration if verify else None, True),
("Directories", migrate_directories, verify_directories_migration if verify else None, True),
("Drug-directory map", migrate_drug_directory_map, verify_drug_directory_map_migration if verify else None, True),
("Drug indication clusters", migrate_drug_indication_clusters, verify_drug_indication_clusters_migration if verify else None, False),
]
logger.info(f"Starting reference data migrations ({len(migrations)} tables)")
for name, migrate_fn, verify_fn, uses_paths in migrations:
logger.info(f"Migrating: {name}...")
# Run migration (some use paths parameter, some use csv_path)
if uses_paths:
result = migrate_fn(db_manager=db_manager, paths=paths) # type: ignore[operator]
else:
# Drug indication clusters uses csv_path instead of paths
result = migrate_fn(db_manager=db_manager) # type: ignore[operator]
results.append(result)
if not result.success:
logger.error(f"Migration failed: {name} - {result.error_message}")
all_success = False
continue
logger.info(f" {result}")
# Run verification if requested
if verify_fn is not None:
logger.info(f" Verifying {name}...")
if uses_paths:
verified, verify_msg = verify_fn(db_manager=db_manager, paths=paths) # type: ignore[call-arg]
else:
verified, verify_msg = verify_fn(db_manager=db_manager) # type: ignore[call-arg]
if verified:
logger.info(f" OK: {verify_msg}")
else:
logger.error(f" FAILED: Verification failed: {verify_msg}")
all_success = False
# Summary
successful = sum(1 for r in results if r.success)
logger.info(f"Reference data migrations complete: {successful}/{len(results)} succeeded")
return all_success, results
def print_migration_summary(results: list[MigrationResult]) -> None:
"""Print a summary of migration results to stdout."""
print("\n=== Reference Data Migration Summary ===\n")
for result in results:
status = "[OK]" if result.success else "[FAILED]"
print(f"{status} {result.table_name}")
if result.success:
print(f" Read: {result.rows_read}, Inserted: {result.rows_inserted}, Skipped: {result.rows_skipped}")
else:
print(f" Error: {result.error_message}")
successful = sum(1 for r in results if r.success)
print(f"\nTotal: {successful}/{len(results)} migrations succeeded")
print()
def create_progress_reporter(description: str = "Loading", width: int = 40):
"""
Create a progress callback that prints a progress bar to stdout.
Args:
description: Label to show before the progress bar.
width: Width of the progress bar in characters.
Returns:
Callback function(current, total) that prints progress.
"""
last_percent = [-1] # Use list to allow mutation in closure
def report_progress(current: int, total: int) -> None:
"""Print a progress bar showing current/total progress."""
if total == 0:
percent = 100
else:
percent = int(100 * current / total)
# Only update display when percentage changes (avoid excessive output)
if percent == last_percent[0]:
return
last_percent[0] = percent
filled = int(width * current / total) if total > 0 else width
bar = "=" * filled + "-" * (width - filled)
# Use carriage return to overwrite the line
sys.stdout.write(f"\r{description}: [{bar}] {percent:3d}% ({current:,}/{total:,})")
sys.stdout.flush()
# Print newline when complete
if current >= total:
print()
return report_progress
def get_database_status(db_manager: Optional[DatabaseManager] = None) -> dict:
"""
Get the current status of the database.
Returns:
Dictionary with database status information:
- exists: Whether the database file exists
- path: Path to the database file
- size_bytes: Size of database file (if exists)
- tables: Dictionary of table names to row counts
- missing_tables: List of expected tables that don't exist
"""
if db_manager is None:
db_manager = DatabaseManager()
status = {
"exists": db_manager.exists,
"path": str(db_manager.db_path),
"size_bytes": None,
"tables": {},
"missing_tables": [],
}
if db_manager.exists:
status["size_bytes"] = db_manager.db_path.stat().st_size
with db_manager.get_connection() as conn:
status["missing_tables"] = verify_all_tables_exist(conn)
# Get counts for existing tables
try:
status["tables"] = get_all_table_counts(conn)
except Exception as e:
logger.warning(f"Could not get table counts: {e}")
return status
def print_database_status(db_manager: Optional[DatabaseManager] = None) -> None:
"""Print database status to stdout in a human-readable format."""
status = get_database_status(db_manager)
print("\n=== Database Status ===\n")
print(f"Path: {status['path']}")
print(f"Exists: {status['exists']}")
if status["exists"]:
size_kb = (status["size_bytes"] or 0) / 1024
print(f"Size: {size_kb:.1f} KB")
if status["missing_tables"]:
print(f"\nMissing tables: {', '.join(status['missing_tables'])}")
else:
print("\nAll expected tables exist.")
if status["tables"]:
print("\nTable row counts:")
for table, count in sorted(status["tables"].items()):
print(f" {table}: {count:,} rows")
else:
print("\nDatabase does not exist. Run migration to create it.")
print()
def main():
"""CLI entry point for database migration."""
parser = argparse.ArgumentParser(
description="Initialize NHS Pathways Analysis SQLite database schema",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python -m data_processing.migrate # Initialize database
python -m data_processing.migrate --status # Show database status
python -m data_processing.migrate --drop-existing # Reset database
python -m data_processing.migrate --reference-data # Migrate reference data
python -m data_processing.migrate --reference-data --verify # With verification
python -m data_processing.migrate --db-path ./data/test.db # Custom path
"""
)
parser.add_argument(
"--status",
action="store_true",
help="Show current database status and exit"
)
parser.add_argument(
"--drop-existing",
action="store_true",
help="Drop all existing tables before creating (WARNING: deletes data)"
)
parser.add_argument(
"--reference-data",
action="store_true",
help="Migrate all reference data from CSV files to SQLite tables"
)
parser.add_argument(
"--verify",
action="store_true",
help="Verify migrated data matches CSV sources (use with --reference-data)"
)
parser.add_argument(
"--db-path",
type=Path,
help="Path to database file (default: ./data/pathways.db)"
)
parser.add_argument(
"--yes", "-y",
action="store_true",
help="Skip confirmation prompts (for non-interactive use)"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose logging"
)
args = parser.parse_args()
# Set up logging
log_level = "DEBUG" if args.verbose else "INFO"
setup_logging(level=log_level, simple_console=True)
# Create database manager with optional custom path
if args.db_path:
config = DatabaseConfig(db_path=args.db_path)
db_manager = DatabaseManager(config)
else:
db_manager = DatabaseManager()
# Handle --status
if args.status:
print_database_status(db_manager)
return 0
# Validate configuration
config_errors = db_manager.config.validate()
if config_errors:
for error in config_errors:
logger.error(error)
return 1
# Handle --reference-data (migrate reference data from CSV to SQLite)
if args.reference_data:
# Ensure database exists with tables first
if not db_manager.exists:
print("Database does not exist. Initializing schema first...")
success = initialize_database(db_manager=db_manager)
if not success:
print("\nDatabase initialization failed. Check logs for details.")
return 1
# Run reference data migrations
success, results = migrate_all_reference_data(
db_manager=db_manager,
paths=default_paths,
verify=args.verify
)
print_migration_summary(results)
print_database_status(db_manager)
if success:
print("Reference data migration completed successfully.")
return 0
else:
print("Reference data migration completed with errors. Check logs for details.")
return 1
# Run schema migration (default behavior)
success = initialize_database(
db_manager=db_manager,
drop_existing=args.drop_existing,
confirm_drop=not args.yes
)
if success:
print("\nDatabase initialized successfully.")
print_database_status(db_manager)
return 0
else:
print("\nDatabase initialization failed. Check logs for details.")
return 1
if __name__ == "__main__":
sys.exit(main())