Files

1193 lines
39 KiB
Python

"""
Reference data migration functions for NHS High-Cost Drug Patient Pathway Analysis Tool.
Provides functions to migrate reference data from CSV files to SQLite tables:
- drugnames.csv → ref_drug_names
- org_codes.csv → ref_organizations
- directory_list.csv → ref_directories
- drug_directory_list.csv → ref_drug_directory_map
- drug_indication_clusters.csv → ref_drug_indication_clusters
Each migration function:
- Reads source CSV file
- Validates data format
- Inserts into SQLite table (INSERT OR IGNORE for duplicates)
- Returns statistics about the migration
"""
import csv
import sqlite3
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from core import PathConfig, default_paths
from core.logging_config import get_logger
from data_processing.database import DatabaseManager
logger = get_logger(__name__)
def _read_csv_with_fallback_encoding(filepath: Path) -> list[list[str]]:
    """
    Read a CSV file, trying several text encodings in turn.

    Encodings are attempted from strictest to most permissive:

    1. ``utf-8-sig`` - UTF-8 with or without a BOM. It accepts everything
       plain ``utf-8`` accepts, so ``utf-8`` needs no separate attempt.
    2. ``cp1252`` - Windows files; bytes 0x80-0x9F become printable
       characters (en dash, curly quotes, ...) rather than control codes.
    3. ``latin-1`` - maps every byte to a code point, so it cannot raise
       UnicodeDecodeError and serves as the final catch-all.

    Args:
        filepath: Path to the CSV file.

    Returns:
        List of rows (each row is a list of strings).

    Raises:
        OSError: If the file cannot be opened (for example, it does not exist).
    """
    # cp1252 has to be tried before latin-1: latin-1 never fails, so any
    # encoding listed after it would never be reached.
    for encoding in ('utf-8-sig', 'cp1252'):
        try:
            # newline='' lets the csv module do its own line-ending handling,
            # as the csv documentation requires for file objects.
            with open(filepath, 'r', encoding=encoding, newline='') as f:
                return list(csv.reader(f))
        except UnicodeDecodeError:
            continue

    # Last resort for the few byte values cp1252 leaves undefined.
    with open(filepath, 'r', encoding='latin-1', newline='') as f:
        return list(csv.reader(f))
@dataclass
class MigrationResult:
    """Outcome of migrating one reference CSV file into one SQLite table."""

    table_name: str  # destination SQLite table
    source_file: str  # path of the CSV that was read, as a string
    rows_read: int
    rows_inserted: int
    rows_skipped: int
    success: bool
    error_message: Optional[str] = None  # only meaningful when success is False

    def __str__(self) -> str:
        """Return a one-line summary: row counts on success, the error on failure."""
        if not self.success:
            return f"{self.table_name}: FAILED - {self.error_message}"

        read_part = f"{self.table_name}: Read {self.rows_read} rows from {self.source_file}, "
        write_part = f"inserted {self.rows_inserted}, skipped {self.rows_skipped} duplicates"
        return read_part + write_part
def migrate_drug_names(
    db_manager: Optional[DatabaseManager] = None,
    paths: Optional[PathConfig] = None
) -> MigrationResult:
    """
    Load raw-to-standard drug name mappings from CSV into ref_drug_names.

    The CSV has no header; each line is ``raw_name,standard_name``, e.g.::

        ABATACEPT,ABATACEPT
        ABATACEPT 250MG POWDER FOR...,ABATACEPT

    A row is counted as skipped when it has fewer than two columns, when
    either value is blank after stripping, or when INSERT OR IGNORE reports
    that nothing was inserted.

    Args:
        db_manager: DatabaseManager instance. A new one is created if omitted.
        paths: PathConfig used to locate the CSV. ``default_paths`` if omitted.

    Returns:
        MigrationResult with the row counts, or a failed result (with an
        error message) if the file is missing or an exception is raised.
    """
    db_manager = DatabaseManager() if db_manager is None else db_manager
    paths = default_paths if paths is None else paths

    source_file = paths.drugnames_csv
    table_name = "ref_drug_names"
    logger.info(f"Migrating drug names from {source_file} to {table_name}")

    # Fields shared by every MigrationResult this function can return.
    common = {"table_name": table_name, "source_file": str(source_file)}

    if not source_file.exists():
        error_msg = f"Source file not found: {source_file}"
        logger.error(error_msg)
        return MigrationResult(
            **common,
            rows_read=0,
            rows_inserted=0,
            rows_skipped=0,
            success=False,
            error_message=error_msg,
        )

    insert_sql = (
        "\n"
        "INSERT OR IGNORE INTO ref_drug_names (raw_name, standard_name)\n"
        "VALUES (?, ?)\n"
    )
    rows_read = rows_inserted = rows_skipped = 0

    try:
        with db_manager.get_transaction() as conn:
            records = _read_csv_with_fallback_encoding(source_file)
            # rows_read doubles as the 1-based row number used in log messages.
            for rows_read, record in enumerate(records, start=1):
                if len(record) < 2:
                    logger.warning(f"Skipping malformed row {rows_read}: {record}")
                    rows_skipped += 1
                    continue

                raw_name, standard_name = record[0].strip(), record[1].strip()
                if not (raw_name and standard_name):
                    logger.warning(f"Skipping row {rows_read} with empty values: {record}")
                    rows_skipped += 1
                    continue

                outcome = conn.execute(insert_sql, (raw_name, standard_name))
                # rowcount is 0 when OR IGNORE suppressed the insert.
                if outcome.rowcount > 0:
                    rows_inserted += 1
                else:
                    rows_skipped += 1

        logger.info(
            f"Drug names migration complete: {rows_read} read, "
            f"{rows_inserted} inserted, {rows_skipped} skipped"
        )
        return MigrationResult(
            **common,
            rows_read=rows_read,
            rows_inserted=rows_inserted,
            rows_skipped=rows_skipped,
            success=True,
        )
    except Exception as e:
        error_msg = f"Migration failed: {e}"
        logger.error(error_msg)
        return MigrationResult(
            **common,
            rows_read=rows_read,
            rows_inserted=0,
            rows_skipped=0,
            success=False,
            error_message=error_msg,
        )
def get_drug_name_counts(conn: sqlite3.Connection) -> dict:
    """
    Summarise the contents of the ref_drug_names table.

    Args:
        conn: Open SQLite connection to query.

    Returns:
        Dictionary with:
        - total_mappings: Total rows in table
        - unique_standard_names: Count of distinct standard names
    """
    def _scalar(sql: str):
        # Each query returns exactly one row with one column.
        return conn.execute(sql).fetchone()[0]

    return {
        "total_mappings": _scalar("SELECT COUNT(*) FROM ref_drug_names"),
        "unique_standard_names": _scalar(
            "SELECT COUNT(DISTINCT standard_name) FROM ref_drug_names"
        ),
    }
def migrate_organizations(
    db_manager: Optional[DatabaseManager] = None,
    paths: Optional[PathConfig] = None
) -> MigrationResult:
    """
    Load organization codes from CSV into the ref_organizations table.

    The CSV is expected to start with a ``Name,Code`` header, followed by
    rows such as::

        MANCHESTER UNIVERSITY NHS FOUNDATION TRUST,R0A
        BARTS HEALTH NHS TRUST,R1H

    The file lists the name first and the code second; values are inserted
    as (org_code, org_name). The first row is treated as a header only when
    its first column is ``name`` (case-insensitive) and it has at least two
    columns. The header is not included in ``rows_read``.

    Args:
        db_manager: DatabaseManager instance. A new one is created if omitted.
        paths: PathConfig used to locate the CSV. ``default_paths`` if omitted.

    Returns:
        MigrationResult with the row counts, or a failed result (with an
        error message) if the file is missing or an exception is raised.
    """
    db_manager = DatabaseManager() if db_manager is None else db_manager
    paths = default_paths if paths is None else paths

    source_file = paths.org_codes_csv
    table_name = "ref_organizations"
    logger.info(f"Migrating organizations from {source_file} to {table_name}")

    # Fields shared by every MigrationResult this function can return.
    common = {"table_name": table_name, "source_file": str(source_file)}

    if not source_file.exists():
        error_msg = f"Source file not found: {source_file}"
        logger.error(error_msg)
        return MigrationResult(
            **common,
            rows_read=0,
            rows_inserted=0,
            rows_skipped=0,
            success=False,
            error_message=error_msg,
        )

    insert_sql = (
        "\n"
        "INSERT OR IGNORE INTO ref_organizations (org_code, org_name)\n"
        "VALUES (?, ?)\n"
    )
    rows_read = rows_inserted = rows_skipped = 0

    try:
        with db_manager.get_transaction() as conn:
            records = _read_csv_with_fallback_encoding(source_file)

            # Drop the header up front so the loop only sees data rows.
            first = records[0] if records else []
            if len(first) >= 2 and first[0].strip().lower() == 'name':
                logger.debug("Skipping header row")
                records = records[1:]

            for rows_read, record in enumerate(records, start=1):
                if len(record) < 2:
                    logger.warning(f"Skipping malformed row {rows_read}: {record}")
                    rows_skipped += 1
                    continue

                org_name, org_code = record[0].strip(), record[1].strip()
                if not (org_name and org_code):
                    logger.warning(f"Skipping row {rows_read} with empty values: {record}")
                    rows_skipped += 1
                    continue

                outcome = conn.execute(insert_sql, (org_code, org_name))
                # rowcount is 0 when OR IGNORE suppressed the insert.
                if outcome.rowcount > 0:
                    rows_inserted += 1
                else:
                    rows_skipped += 1

        logger.info(
            f"Organizations migration complete: {rows_read} read, "
            f"{rows_inserted} inserted, {rows_skipped} skipped"
        )
        return MigrationResult(
            **common,
            rows_read=rows_read,
            rows_inserted=rows_inserted,
            rows_skipped=rows_skipped,
            success=True,
        )
    except Exception as e:
        error_msg = f"Migration failed: {e}"
        logger.error(error_msg)
        return MigrationResult(
            **common,
            rows_read=rows_read,
            rows_inserted=0,
            rows_skipped=0,
            success=False,
            error_message=error_msg,
        )
def get_organization_counts(conn: sqlite3.Connection) -> dict:
    """
    Summarise the contents of the ref_organizations table.

    Args:
        conn: Open SQLite connection to query.

    Returns:
        Dictionary with:
        - total_organizations: Total rows in table
    """
    # COUNT(*) always yields a single one-column row.
    (row_count,) = conn.execute("SELECT COUNT(*) FROM ref_organizations").fetchone()
    return {"total_organizations": row_count}
def verify_organizations_migration(
    db_manager: Optional[DatabaseManager] = None,
    paths: Optional[PathConfig] = None
) -> tuple[bool, str]:
    """
    Verify that organizations were migrated correctly by comparing CSV to SQLite.

    Checks:
    - The row count of ref_organizations equals the number of unique org
      codes in the CSV (header and incomplete rows excluded)
    - Up to five sample codes resolve to the expected name

    When a code appears more than once in the CSV, the first name seen is the
    expected one, matching what INSERT OR IGNORE keeps during migration
    (assumes org_code is the table's unique key - TODO confirm against schema).

    Args:
        db_manager: DatabaseManager instance. Uses default if not provided.
        paths: PathConfig instance for locating CSV file. Uses default if not provided.

    Returns:
        Tuple of (success: bool, message: str). A missing source file is
        reported as a failure rather than raised.
    """
    if db_manager is None:
        db_manager = DatabaseManager()
    if paths is None:
        paths = default_paths

    source_file = paths.org_codes_csv
    if not source_file.exists():
        return False, f"Source file not found: {source_file}"

    # org_code -> first org_name seen for that code, in file order.
    expected: dict[str, str] = {}
    rows = _read_csv_with_fallback_encoding(source_file)
    for i, row in enumerate(rows):
        # Skip header
        if i == 0 and len(row) >= 2 and row[0].strip().lower() == 'name':
            continue
        if len(row) < 2:
            continue
        org_name = row[0].strip()
        org_code = row[1].strip()
        if org_name and org_code:
            expected.setdefault(org_code, org_name)

    # Sample the first 5 distinct codes for lookup verification.
    sample_mappings = list(expected.items())[:5]

    with db_manager.get_connection() as conn:
        stats = get_organization_counts(conn)

        # Row count should match the number of unique org codes.
        if stats["total_organizations"] != len(expected):
            return False, (
                f"Row count mismatch: CSV has {len(expected)} unique org codes, "
                f"SQLite has {stats['total_organizations']} rows"
            )

        for org_code, expected_name in sample_mappings:
            cursor = conn.execute(
                "SELECT org_name FROM ref_organizations WHERE org_code = ?",
                (org_code,)
            )
            result = cursor.fetchone()
            if result is None:
                return False, f"Missing organization for code: {org_code}"
            if result[0] != expected_name:
                return False, f"Wrong name for {org_code}: expected '{expected_name}', got '{result[0]}'"

        return True, f"Verified {stats['total_organizations']} organizations"
def verify_drug_names_migration(
    db_manager: Optional[DatabaseManager] = None,
    paths: Optional[PathConfig] = None
) -> tuple[bool, str]:
    """
    Verify that drug names were migrated correctly by comparing CSV to SQLite.

    Checks:
    - The row count of ref_drug_names equals the number of unique raw names
      in the CSV (incomplete rows excluded)
    - Up to five sample raw names resolve to the expected standard name

    When a raw name appears more than once in the CSV, the first standard
    name seen is the expected one, matching what INSERT OR IGNORE keeps
    during migration (assumes raw_name is the table's unique key - TODO
    confirm against schema).

    Args:
        db_manager: DatabaseManager instance. Uses default if not provided.
        paths: PathConfig instance for locating CSV file. Uses default if not provided.

    Returns:
        Tuple of (success: bool, message: str). A missing source file is
        reported as a failure rather than raised.
    """
    if db_manager is None:
        db_manager = DatabaseManager()
    if paths is None:
        paths = default_paths

    source_file = paths.drugnames_csv
    if not source_file.exists():
        return False, f"Source file not found: {source_file}"

    # raw_name -> first standard_name seen for it, in file order.
    expected: dict[str, str] = {}
    for row in _read_csv_with_fallback_encoding(source_file):
        if len(row) < 2:
            continue
        raw = row[0].strip()
        std = row[1].strip()
        if raw and std:
            expected.setdefault(raw, std)

    # Sample the first 5 distinct raw names (not the first 5 file rows, which
    # could be blank or malformed and would leave nothing to check).
    sample_mappings = list(expected.items())[:5]

    with db_manager.get_connection() as conn:
        stats = get_drug_name_counts(conn)

        # Row count should match unique raw names, not total CSV rows.
        if stats["total_mappings"] != len(expected):
            return False, (
                f"Row count mismatch: CSV has {len(expected)} unique raw names, "
                f"SQLite has {stats['total_mappings']} rows"
            )

        for raw, expected_std in sample_mappings:
            cursor = conn.execute(
                "SELECT standard_name FROM ref_drug_names WHERE raw_name = ?",
                (raw,)
            )
            result = cursor.fetchone()
            if result is None:
                return False, f"Missing mapping for: {raw}"
            if result[0] != expected_std:
                return False, f"Wrong mapping for {raw}: expected '{expected_std}', got '{result[0]}'"

        return True, f"Verified {stats['total_mappings']} drug name mappings"
def migrate_directories(
    db_manager: Optional[DatabaseManager] = None,
    paths: Optional[PathConfig] = None
) -> MigrationResult:
    """
    Load medical directory names from CSV into the ref_directories table.

    The CSV is a single column with a ``directory`` header, followed by
    rows such as::

        ONCOLOGY
        RHEUMATOLOGY
        NEPHROLOGY

    The first row is treated as a header only when its first column is
    ``directory`` (case-insensitive). The header is not included in
    ``rows_read``. Empty rows, blank names, and rows INSERT OR IGNORE did
    not insert are counted as skipped.

    Args:
        db_manager: DatabaseManager instance. A new one is created if omitted.
        paths: PathConfig used to locate the CSV. ``default_paths`` if omitted.

    Returns:
        MigrationResult with the row counts, or a failed result (with an
        error message) if the file is missing or an exception is raised.
    """
    db_manager = DatabaseManager() if db_manager is None else db_manager
    paths = default_paths if paths is None else paths

    source_file = paths.directory_list_csv
    table_name = "ref_directories"
    logger.info(f"Migrating directories from {source_file} to {table_name}")

    # Fields shared by every MigrationResult this function can return.
    common = {"table_name": table_name, "source_file": str(source_file)}

    if not source_file.exists():
        error_msg = f"Source file not found: {source_file}"
        logger.error(error_msg)
        return MigrationResult(
            **common,
            rows_read=0,
            rows_inserted=0,
            rows_skipped=0,
            success=False,
            error_message=error_msg,
        )

    insert_sql = (
        "\n"
        "INSERT OR IGNORE INTO ref_directories (directory_name)\n"
        "VALUES (?)\n"
    )
    rows_read = rows_inserted = rows_skipped = 0

    try:
        with db_manager.get_transaction() as conn:
            records = _read_csv_with_fallback_encoding(source_file)

            # Drop the header up front so the loop only sees data rows.
            first = records[0] if records else []
            if len(first) >= 1 and first[0].strip().lower() == 'directory':
                logger.debug("Skipping header row")
                records = records[1:]

            for rows_read, record in enumerate(records, start=1):
                # csv.reader yields an empty list for a blank line.
                if not record:
                    logger.warning(f"Skipping empty row {rows_read}")
                    rows_skipped += 1
                    continue

                directory_name = record[0].strip()
                if not directory_name:
                    logger.warning(f"Skipping row {rows_read} with empty directory name")
                    rows_skipped += 1
                    continue

                outcome = conn.execute(insert_sql, (directory_name,))
                # rowcount is 0 when OR IGNORE suppressed the insert.
                if outcome.rowcount > 0:
                    rows_inserted += 1
                else:
                    rows_skipped += 1

        logger.info(
            f"Directories migration complete: {rows_read} read, "
            f"{rows_inserted} inserted, {rows_skipped} skipped"
        )
        return MigrationResult(
            **common,
            rows_read=rows_read,
            rows_inserted=rows_inserted,
            rows_skipped=rows_skipped,
            success=True,
        )
    except Exception as e:
        error_msg = f"Migration failed: {e}"
        logger.error(error_msg)
        return MigrationResult(
            **common,
            rows_read=rows_read,
            rows_inserted=0,
            rows_skipped=0,
            success=False,
            error_message=error_msg,
        )
def get_directory_counts(conn: sqlite3.Connection) -> dict:
    """
    Summarise the contents of the ref_directories table.

    Args:
        conn: Open SQLite connection to query.

    Returns:
        Dictionary with:
        - total_directories: Total rows in table
    """
    # COUNT(*) always yields a single one-column row.
    (row_count,) = conn.execute("SELECT COUNT(*) FROM ref_directories").fetchone()
    return {"total_directories": row_count}
def verify_directories_migration(
    db_manager: Optional[DatabaseManager] = None,
    paths: Optional[PathConfig] = None
) -> tuple[bool, str]:
    """
    Verify that directories were migrated correctly by comparing CSV to SQLite.

    Checks:
    - The row count of ref_directories equals the number of unique directory
      names in the CSV (header and blank rows excluded)
    - Up to five sample directory names are present in the table

    Args:
        db_manager: DatabaseManager instance. Uses default if not provided.
        paths: PathConfig instance for locating CSV file. Uses default if not provided.

    Returns:
        Tuple of (success: bool, message: str). A missing source file is
        reported as a failure rather than raised.
    """
    if db_manager is None:
        db_manager = DatabaseManager()
    if paths is None:
        paths = default_paths

    source_file = paths.directory_list_csv
    if not source_file.exists():
        return False, f"Source file not found: {source_file}"

    # Dict keys act as an insertion-ordered set of directory names.
    csv_unique_directories: dict[str, None] = {}
    rows = _read_csv_with_fallback_encoding(source_file)
    for i, row in enumerate(rows):
        # Skip header
        if i == 0 and len(row) >= 1 and row[0].strip().lower() == 'directory':
            continue
        if len(row) >= 1 and row[0].strip():
            csv_unique_directories[row[0].strip()] = None

    # Sample the first 5 distinct names for lookup verification.
    sample_directories = list(csv_unique_directories)[:5]

    with db_manager.get_connection() as conn:
        stats = get_directory_counts(conn)

        # Row count should match the number of unique directories.
        if stats["total_directories"] != len(csv_unique_directories):
            return False, (
                f"Row count mismatch: CSV has {len(csv_unique_directories)} unique directories, "
                f"SQLite has {stats['total_directories']} rows"
            )

        for directory_name in sample_directories:
            cursor = conn.execute(
                "SELECT id FROM ref_directories WHERE directory_name = ?",
                (directory_name,)
            )
            if cursor.fetchone() is None:
                return False, f"Missing directory: {directory_name}"

        return True, f"Verified {stats['total_directories']} directories"
def migrate_drug_directory_map(
    db_manager: Optional[DatabaseManager] = None,
    paths: Optional[PathConfig] = None
) -> MigrationResult:
    """
    Migrate drug-to-directory mappings from CSV to SQLite ref_drug_directory_map table.

    Source file format (no header):
        DRUG_NAME,DIR1|DIR2|DIR3,

    The file has pipe-separated directories and often a trailing comma.
    Each drug-directory pair becomes a row. The is_single_valid flag is set to 1
    if the drug has exactly one distinct valid directory (used for auto-assignment).
    Drug and directory names are upper-cased. Directories repeated within a
    row, or across several rows for the same drug, are counted once.

    Example input:
        ABATACEPT,RHEUMATOLOGY|PAEDIATRICS|CLINICAL IMMUNOLOGY,
        ROXADUSTAT,NEPHROLOGY

    Example output rows:
        ABATACEPT, RHEUMATOLOGY, is_single_valid=0
        ABATACEPT, PAEDIATRICS, is_single_valid=0
        ABATACEPT, CLINICAL IMMUNOLOGY, is_single_valid=0
        ROXADUSTAT, NEPHROLOGY, is_single_valid=1

    Args:
        db_manager: DatabaseManager instance. Uses default if not provided.
        paths: PathConfig instance for locating CSV file. Uses default if not provided.

    Returns:
        MigrationResult with statistics about the migration. rows_read and
        rows_skipped count CSV rows; rows_inserted counts drug-directory
        pairs inserted.
    """
    if db_manager is None:
        db_manager = DatabaseManager()
    if paths is None:
        paths = default_paths

    source_file = paths.drug_directory_list_csv
    table_name = "ref_drug_directory_map"
    logger.info(f"Migrating drug-directory map from {source_file} to {table_name}")

    # Validate source file exists
    if not source_file.exists():
        error_msg = f"Source file not found: {source_file}"
        logger.error(error_msg)
        return MigrationResult(
            table_name=table_name,
            source_file=str(source_file),
            rows_read=0,
            rows_inserted=0,
            rows_skipped=0,
            success=False,
            error_message=error_msg
        )

    rows_read = 0
    rows_inserted = 0
    rows_skipped = 0

    try:
        # First pass: parse the CSV into drug -> ordered list of distinct directories
        drug_directories: dict[str, list[str]] = {}
        csv_rows = _read_csv_with_fallback_encoding(source_file)
        for row in csv_rows:
            rows_read += 1

            # Validate row format (at least drug name and directories)
            if len(row) < 2:
                logger.warning(f"Skipping malformed row {rows_read}: {row}")
                rows_skipped += 1
                continue

            drug_name = row[0].strip().upper()  # Normalize to uppercase
            if not drug_name:
                logger.warning(f"Skipping row {rows_read} with empty drug name")
                rows_skipped += 1
                continue

            directories_raw = row[1].strip()

            # Skip "NOT A DRUG" entries
            if directories_raw.upper() == "NOT A DRUG":
                logger.debug(f"Skipping non-drug entry: {drug_name}")
                rows_skipped += 1
                continue

            if not directories_raw:
                logger.warning(f"Skipping row {rows_read} with empty directories: {drug_name}")
                rows_skipped += 1
                continue

            # Parse pipe-separated directories, dropping empty segments
            directories = [d.strip().upper() for d in directories_raw.split('|')]
            directories = [d for d in directories if d]
            if not directories:
                logger.warning(f"Skipping row {rows_read} with no valid directories: {drug_name}")
                rows_skipped += 1
                continue

            # Merge into the drug's list, adding each directory at most once.
            # Checking the live list (not a snapshot) also removes repeats
            # inside a single row, so is_single_valid reflects the number of
            # distinct directories.
            merged = drug_directories.setdefault(drug_name, [])
            for d in directories:
                if d not in merged:
                    merged.append(d)

        logger.info(f"Parsed {len(drug_directories)} unique drugs from CSV")

        insert_sql = (
            "\n"
            "INSERT OR IGNORE INTO ref_drug_directory_map\n"
            "(drug_name, directory_name, is_single_valid)\n"
            "VALUES (?, ?, ?)\n"
        )

        # Second pass: insert into database with is_single_valid flag
        with db_manager.get_transaction() as conn:
            for drug_name, directories in drug_directories.items():
                is_single_valid = 1 if len(directories) == 1 else 0
                for directory in directories:
                    cursor = conn.execute(insert_sql, (drug_name, directory, is_single_valid))
                    if cursor.rowcount > 0:
                        rows_inserted += 1

        logger.info(
            f"Drug-directory map migration complete: {rows_read} CSV rows read, "
            f"{len(drug_directories)} unique drugs, {rows_inserted} mappings inserted, "
            f"{rows_skipped} rows skipped"
        )
        return MigrationResult(
            table_name=table_name,
            source_file=str(source_file),
            rows_read=rows_read,
            rows_inserted=rows_inserted,
            rows_skipped=rows_skipped,
            success=True
        )
    except Exception as e:
        error_msg = f"Migration failed: {e}"
        logger.error(error_msg)
        return MigrationResult(
            table_name=table_name,
            source_file=str(source_file),
            rows_read=rows_read,
            rows_inserted=0,
            rows_skipped=0,
            success=False,
            error_message=error_msg
        )
def get_drug_directory_map_counts(conn: sqlite3.Connection) -> dict:
    """
    Summarise the contents of the ref_drug_directory_map table.

    Args:
        conn: Open SQLite connection to query.

    Returns:
        Dictionary with:
        - total_mappings: Total rows in table
        - unique_drugs: Count of distinct drug names
        - unique_directories: Count of distinct directory names
        - single_valid_drugs: Count of distinct drug names with is_single_valid=1
    """
    # Result key -> single-value query; run in this order, one at a time.
    count_queries = {
        "total_mappings":
            "SELECT COUNT(*) FROM ref_drug_directory_map",
        "unique_drugs":
            "SELECT COUNT(DISTINCT drug_name) FROM ref_drug_directory_map",
        "unique_directories":
            "SELECT COUNT(DISTINCT directory_name) FROM ref_drug_directory_map",
        "single_valid_drugs":
            "SELECT COUNT(DISTINCT drug_name) FROM ref_drug_directory_map WHERE is_single_valid = 1",
    }
    return {
        key: conn.execute(sql).fetchone()[0]
        for key, sql in count_queries.items()
    }
def verify_drug_directory_map_migration(
    db_manager: Optional[DatabaseManager] = None,
    paths: Optional[PathConfig] = None
) -> tuple[bool, str]:
    """
    Verify that drug-directory mappings were migrated correctly.

    Checks:
    - The number of distinct drugs in SQLite equals the number parsed from the CSV
    - For up to ten sample drugs, the number of directory rows is correct
    - For those sample drugs, is_single_valid is 1 only when the drug has
      exactly one distinct directory

    The CSV is parsed with the same rules the migration documents: names
    upper-cased, "NOT A DRUG" and empty entries ignored, and each directory
    counted once per drug.

    Args:
        db_manager: DatabaseManager instance. Uses default if not provided.
        paths: PathConfig instance for locating CSV file. Uses default if not provided.

    Returns:
        Tuple of (success: bool, message: str). A missing source file is
        reported as a failure rather than raised.
    """
    if db_manager is None:
        db_manager = DatabaseManager()
    if paths is None:
        paths = default_paths

    source_file = paths.drug_directory_list_csv
    if not source_file.exists():
        return False, f"Source file not found: {source_file}"

    # Parse CSV into drug -> ordered list of distinct directories
    expected_drugs: dict[str, list[str]] = {}
    for row in _read_csv_with_fallback_encoding(source_file):
        if len(row) < 2:
            continue
        drug_name = row[0].strip().upper()
        directories_raw = row[1].strip()
        if not drug_name or not directories_raw or directories_raw.upper() == "NOT A DRUG":
            continue
        directories = [d.strip().upper() for d in directories_raw.split('|')]
        directories = [d for d in directories if d]
        if not directories:
            continue
        # Check against the live list so repeats inside one row are also
        # collapsed; otherwise "A|A" would be expected as two rows.
        merged = expected_drugs.setdefault(drug_name, [])
        for d in directories:
            if d not in merged:
                merged.append(d)

    with db_manager.get_connection() as conn:
        stats = get_drug_directory_map_counts(conn)

        if stats["unique_drugs"] != len(expected_drugs):
            return False, (
                f"Drug count mismatch: CSV has {len(expected_drugs)} unique drugs, "
                f"SQLite has {stats['unique_drugs']}"
            )

        # Verify the first 10 drugs in file order
        for drug_name in list(expected_drugs)[:10]:
            expected_dirs = expected_drugs[drug_name]
            expected_single_valid = 1 if len(expected_dirs) == 1 else 0

            cursor = conn.execute(
                "SELECT directory_name, is_single_valid FROM ref_drug_directory_map WHERE drug_name = ?",
                (drug_name,)
            )
            actual_rows = cursor.fetchall()

            if len(actual_rows) != len(expected_dirs):
                return False, (
                    f"Directory count mismatch for {drug_name}: "
                    f"expected {len(expected_dirs)}, got {len(actual_rows)}"
                )

            for row in actual_rows:
                # Index by position (column 1 of the SELECT) so this works
                # for plain tuples as well as sqlite3.Row objects.
                actual_single_valid = row[1]
                if actual_single_valid != expected_single_valid:
                    return False, (
                        f"is_single_valid mismatch for {drug_name}: "
                        f"expected {expected_single_valid}, got {actual_single_valid}"
                    )

        return True, (
            f"Verified {stats['unique_drugs']} drugs with {stats['total_mappings']} mappings "
            f"({stats['single_valid_drugs']} single-valid)"
        )
def migrate_drug_indication_clusters(
    db_manager: Optional[DatabaseManager] = None,
    csv_path: Optional[Path] = None
) -> MigrationResult:
    """
    Load drug indication clusters from CSV into ref_drug_indication_clusters.

    The CSV is expected to start with the header
    ``Drug,Indication,Cluster_ID,Cluster_Description,NICE_TA_Reference``,
    followed by rows such as::

        ADALIMUMAB,Rheumatoid arthritis,RARTH_COD,Rheumatoid arthritis diagnosis codes,TA130/TA195/TA375
        ADALIMUMAB,Psoriasis,PSORIASIS_COD,Psoriasis codes,TA146/TA455

    Drug, indication, and cluster ID are required; description and NICE TA
    reference default to an empty string when the column is absent. Drug
    name and cluster ID are upper-cased. The first row is treated as a
    header only when its first column is ``drug`` (case-insensitive) and it
    has at least three columns. The header is not included in ``rows_read``.

    Args:
        db_manager: DatabaseManager instance. A new one is created if omitted.
        csv_path: Path to the CSV file. Defaults to ./data/drug_indication_clusters.csv.

    Returns:
        MigrationResult with the row counts, or a failed result (with an
        error message) if the file is missing or an exception is raised.
    """
    db_manager = DatabaseManager() if db_manager is None else db_manager
    csv_path = Path("./data/drug_indication_clusters.csv") if csv_path is None else csv_path

    table_name = "ref_drug_indication_clusters"
    logger.info(f"Migrating drug indication clusters from {csv_path} to {table_name}")

    # Fields shared by every MigrationResult this function can return.
    common = {"table_name": table_name, "source_file": str(csv_path)}

    if not csv_path.exists():
        error_msg = f"Source file not found: {csv_path}"
        logger.error(error_msg)
        return MigrationResult(
            **common,
            rows_read=0,
            rows_inserted=0,
            rows_skipped=0,
            success=False,
            error_message=error_msg,
        )

    insert_sql = (
        "\n"
        "INSERT OR IGNORE INTO ref_drug_indication_clusters\n"
        "(drug_name, indication, cluster_id, cluster_description, nice_ta_reference)\n"
        "VALUES (?, ?, ?, ?, ?)\n"
    )
    rows_read = rows_inserted = rows_skipped = 0

    try:
        with db_manager.get_transaction() as conn:
            records = _read_csv_with_fallback_encoding(csv_path)

            # Drop the header up front so the loop only sees data rows.
            first = records[0] if records else []
            if len(first) >= 3 and first[0].strip().lower() == 'drug':
                logger.debug("Skipping header row")
                records = records[1:]

            for rows_read, record in enumerate(records, start=1):
                # Need at least drug, indication, cluster_id.
                if len(record) < 3:
                    logger.warning(f"Skipping malformed row {rows_read}: {record}")
                    rows_skipped += 1
                    continue

                drug_name = record[0].strip().upper()
                indication = record[1].strip()
                cluster_id = record[2].strip().upper()
                # The two trailing columns are optional.
                cluster_description = record[3].strip() if len(record) > 3 else ""
                nice_ta_reference = record[4].strip() if len(record) > 4 else ""

                if not (drug_name and indication and cluster_id):
                    logger.warning(f"Skipping row {rows_read} with empty required fields")
                    rows_skipped += 1
                    continue

                outcome = conn.execute(
                    insert_sql,
                    (drug_name, indication, cluster_id, cluster_description, nice_ta_reference),
                )
                if outcome.rowcount > 0:
                    rows_inserted += 1
                    continue

                # rowcount of 0: OR IGNORE suppressed the insert.
                rows_skipped += 1
                logger.debug(f"Duplicate row skipped: {drug_name}, {indication}, {cluster_id}")

        logger.info(
            f"Drug indication clusters migration complete: {rows_read} rows read, "
            f"{rows_inserted} inserted, {rows_skipped} skipped"
        )
        return MigrationResult(
            **common,
            rows_read=rows_read,
            rows_inserted=rows_inserted,
            rows_skipped=rows_skipped,
            success=True,
        )
    except Exception as e:
        error_msg = f"Migration failed: {e}"
        logger.error(error_msg)
        return MigrationResult(
            **common,
            rows_read=rows_read,
            rows_inserted=0,
            rows_skipped=0,
            success=False,
            error_message=error_msg,
        )
def get_drug_indication_cluster_counts(conn: sqlite3.Connection) -> dict:
    """
    Summarise the contents of the ref_drug_indication_clusters table.

    Args:
        conn: Open SQLite connection to query.

    Returns:
        Dictionary with:
        - total_mappings: Total rows in table
        - unique_drugs: Count of distinct drug names
        - unique_indications: Count of distinct indications
        - unique_clusters: Count of distinct cluster IDs
    """
    # Result key -> single-value query; run in this order, one at a time.
    count_queries = {
        "total_mappings":
            "SELECT COUNT(*) FROM ref_drug_indication_clusters",
        "unique_drugs":
            "SELECT COUNT(DISTINCT drug_name) FROM ref_drug_indication_clusters",
        "unique_indications":
            "SELECT COUNT(DISTINCT indication) FROM ref_drug_indication_clusters",
        "unique_clusters":
            "SELECT COUNT(DISTINCT cluster_id) FROM ref_drug_indication_clusters",
    }
    return {
        key: conn.execute(sql).fetchone()[0]
        for key, sql in count_queries.items()
    }
def verify_drug_indication_clusters_migration(
    db_manager: Optional[DatabaseManager] = None,
    csv_path: Optional[Path] = None
) -> tuple[bool, str]:
    """
    Verify that drug indication clusters were migrated correctly.

    Checks:
    - The row count of ref_drug_indication_clusters equals the number of
      unique (drug, indication, cluster) triples in the CSV (header and
      incomplete rows excluded)
    - Up to five sample triples are present in the table

    Drug names and cluster IDs are upper-cased before comparison, as in the
    migration.

    Args:
        db_manager: DatabaseManager instance. Uses default if not provided.
        csv_path: Path to the CSV file. Defaults to ./data/drug_indication_clusters.csv.

    Returns:
        Tuple of (success: bool, message: str). A missing source file is
        reported as a failure rather than raised.
    """
    if db_manager is None:
        db_manager = DatabaseManager()
    if csv_path is None:
        csv_path = Path("./data/drug_indication_clusters.csv")

    if not csv_path.exists():
        return False, f"Source file not found: {csv_path}"

    # Parse CSV to get expected data
    csv_rows = _read_csv_with_fallback_encoding(csv_path)
    expected_mappings: set[tuple[str, str, str]] = set()
    sample_rows: list[tuple[str, str, str]] = []
    for i, row in enumerate(csv_rows):
        # Skip header
        if i == 0 and len(row) >= 3 and row[0].strip().lower() == 'drug':
            continue
        if len(row) < 3:
            continue
        drug = row[0].strip().upper()
        indication = row[1].strip()
        cluster = row[2].strip().upper()
        if drug and indication and cluster:
            expected_mappings.add((drug, indication, cluster))
            # Sample the first 5 complete rows for lookup verification.
            if len(sample_rows) < 5:
                sample_rows.append((drug, indication, cluster))

    with db_manager.get_connection() as conn:
        stats = get_drug_indication_cluster_counts(conn)

        if stats["total_mappings"] != len(expected_mappings):
            return False, (
                f"Row count mismatch: CSV has {len(expected_mappings)} unique mappings, "
                f"SQLite has {stats['total_mappings']} rows"
            )

        lookup_sql = (
            "\n"
            "SELECT cluster_id FROM ref_drug_indication_clusters\n"
            "WHERE drug_name = ? AND indication = ? AND cluster_id = ?\n"
        )
        for drug, indication, cluster in sample_rows:
            cursor = conn.execute(lookup_sql, (drug, indication, cluster))
            if cursor.fetchone() is None:
                return False, f"Missing mapping: {drug} / {indication} / {cluster}"

        return True, (
            f"Verified {stats['total_mappings']} mappings for {stats['unique_drugs']} drugs "
            f"across {stats['unique_clusters']} clusters"
        )