Initial commit before Ralph loop

This commit is contained in:
Andrew Charlwood
2026-02-04 13:04:29 +00:00
commit fdd33a67af
89 changed files with 20660 additions and 0 deletions
+9
View File
@@ -0,0 +1,9 @@
"""
Test suite for NHS High-Cost Drug Patient Pathway Analysis Tool.
This package contains unit tests and integration tests for:
- Core configuration and models (config.py, models.py)
- Data transformations (data.py, loader.py)
- Analysis pipeline (pathway_analyzer.py, statistics.py)
- Database operations (database.py, schema.py)
"""
+359
View File
@@ -0,0 +1,359 @@
"""
Performance benchmark for the Patient Pathway Analysis tool.
This script measures:
1. Module import time
2. Data loading time (SQLite)
3. Analysis pipeline execution time
4. Peak memory usage
Run with: python -m tests.benchmark_performance
"""
import gc
import sys
import time
import tracemalloc
from datetime import date
from pathlib import Path
from typing import Any
# Store results for final report
results: dict[str, Any] = {}
def measure_time(func, *args, **kwargs):
"""Measure execution time of a function."""
gc.collect() # Clean up before timing
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
return result, elapsed
def measure_memory(func, *args, **kwargs):
"""Measure peak memory usage of a function."""
gc.collect() # Clean up before measuring
tracemalloc.start()
result = func(*args, **kwargs)
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
return result, peak
def benchmark_imports():
"""Benchmark module import times."""
print("\n" + "=" * 60)
print("1. MODULE IMPORT BENCHMARKS")
print("=" * 60)
import_times = {}
# Benchmark core imports
start = time.perf_counter()
from core import PathConfig, AnalysisFilters, default_paths
import_times['core'] = time.perf_counter() - start
# Benchmark data_processing imports
start = time.perf_counter()
from data_processing import DatabaseManager, get_loader
import_times['data_processing'] = time.perf_counter() - start
# Benchmark analysis imports
start = time.perf_counter()
from analysis.pathway_analyzer import generate_icicle_chart
import_times['analysis'] = time.perf_counter() - start
# Benchmark visualization imports
start = time.perf_counter()
from visualization.plotly_generator import create_icicle_figure
import_times['visualization'] = time.perf_counter() - start
# Benchmark pandas/numpy
start = time.perf_counter()
import pandas as pd
import numpy as np
import_times['pandas+numpy'] = time.perf_counter() - start
total_import_time = sum(import_times.values())
print(f"\n{'Module':<25} {'Time (ms)':<15}")
print("-" * 40)
for module, elapsed in import_times.items():
print(f"{module:<25} {elapsed*1000:>10.1f} ms")
print("-" * 40)
print(f"{'TOTAL':<25} {total_import_time*1000:>10.1f} ms")
results['import_times'] = import_times
results['total_import_time'] = total_import_time
return import_times
def benchmark_data_loading():
"""Benchmark data loading from different sources."""
print("\n" + "=" * 60)
print("2. DATA LOADING BENCHMARKS")
print("=" * 60)
from data_processing import get_loader
from core import default_paths
import pandas as pd
load_times = {}
row_counts = {}
# Check if SQLite database exists
db_path = default_paths.data_dir / "pathways.db"
if db_path.exists():
print(f"\nLoading from SQLite: {db_path}")
# SQLite loading
loader = get_loader('sqlite')
result, elapsed = measure_time(loader.load)
load_times['sqlite'] = elapsed
row_counts['sqlite'] = result.row_count if result is not None else 0
print(f" Rows loaded: {row_counts['sqlite']:,}")
print(f" Time: {elapsed*1000:.1f} ms ({elapsed:.2f} seconds)")
print(f" Internal load time: {result.load_time_seconds*1000:.1f} ms")
# Store for later use
results['loaded_df'] = result.df
else:
print(f"SQLite database not found at {db_path}")
load_times['sqlite'] = None
results['load_times'] = load_times
results['row_counts'] = row_counts
return load_times
def benchmark_analysis_pipeline():
"""Benchmark the full analysis pipeline."""
print("\n" + "=" * 60)
print("3. ANALYSIS PIPELINE BENCHMARKS")
print("=" * 60)
from analysis.pathway_analyzer import (
generate_icicle_chart,
prepare_data,
calculate_statistics,
build_hierarchy,
prepare_chart_data,
)
from core import default_paths
import pandas as pd
# Get loaded data or load it
df = results.get('loaded_df')
if df is None or len(df) == 0:
print("No data available for analysis benchmarks")
return {}
analysis_times = {}
# Get available trusts, drugs, directories from data
trusts = df['Provider Code'].unique().tolist()[:10] # Limit to 10 trusts
drugs = ['ADALIMUMAB', 'ETANERCEPT', 'INFLIXIMAB', 'SECUKINUMAB', 'RITUXIMAB']
directories = df['Directory'].dropna().unique().tolist()
# Filter to drugs that exist in data
available_drugs = [d for d in drugs if d in df['Drug Name'].values]
if not available_drugs:
available_drugs = df['Drug Name'].unique().tolist()[:5]
print(f"\nAnalysis parameters:")
print(f" Trusts: {len(trusts)}")
print(f" Drugs: {available_drugs}")
print(f" Directories: {len(directories)}")
print(f" Data rows: {len(df):,}")
# Load org_codes for mapping trust codes to names
org_codes = pd.read_csv(default_paths.org_codes_csv, index_col=1)
trust_names = []
for t in trusts:
if t in org_codes.index:
trust_names.append(org_codes.loc[t, 'Name'])
if not trust_names:
trust_names = org_codes['Name'].tolist()[:10]
# Benchmark full pipeline
print("\n Running full pipeline benchmark...")
# Use date range that should include data
# Look at actual data dates
if 'Intervention Date' in df.columns:
min_date = df['Intervention Date'].min()
max_date = df['Intervention Date'].max()
print(f" Data date range: {min_date} to {max_date}")
# Use a reasonable analysis window
start_date = "2020-01-01"
end_date = "2025-01-01"
last_seen_date = "2020-01-01"
else:
start_date = "2020-01-01"
end_date = "2025-01-01"
last_seen_date = "2020-01-01"
print(f" Analysis window: {start_date} to {end_date}")
print(f" Last seen filter: > {last_seen_date}")
# Full pipeline with memory tracking
gc.collect()
tracemalloc.start()
start_time = time.perf_counter()
try:
ice_df, title = generate_icicle_chart(
df=df,
start_date=start_date,
end_date=end_date,
last_seen_date=last_seen_date,
trust_filter=trust_names,
drug_filter=available_drugs,
directory_filter=directories,
minimum_num_patients=1,
title="Performance Benchmark",
paths=default_paths,
)
elapsed = time.perf_counter() - start_time
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
analysis_times['full_pipeline'] = elapsed
results['analysis_memory_peak'] = peak
if ice_df is not None:
print(f"\n Pipeline completed:")
print(f" Execution time: {elapsed*1000:.1f} ms ({elapsed:.2f} seconds)")
print(f" Peak memory: {peak / 1024 / 1024:.1f} MB")
print(f" Result rows: {len(ice_df)}")
print(f" Chart title: {title}")
else:
print("\n Pipeline returned no data (likely date filtering)")
print(f" Execution time: {elapsed*1000:.1f} ms")
except Exception as e:
tracemalloc.stop()
print(f"\n Pipeline error: {e}")
traceback_str = ''.join(tracemalloc.format_exc() if hasattr(tracemalloc, 'format_exc') else [])
print(f" {str(e)}")
analysis_times['full_pipeline'] = None
results['analysis_times'] = analysis_times
return analysis_times
def benchmark_visualization():
"""Benchmark chart generation."""
print("\n" + "=" * 60)
print("4. VISUALIZATION BENCHMARKS")
print("=" * 60)
from visualization.plotly_generator import create_icicle_figure
import pandas as pd
import numpy as np
viz_times = {}
# Create sample data for visualization benchmark
n_rows = 1000
sample_data = {
'parents': ['N&WICS'] * n_rows,
'ids': [f'N&WICS - Test{i}' for i in range(n_rows)],
'labels': [f'Test{i}' for i in range(n_rows)],
'value': np.random.randint(1, 100, n_rows),
'colour': np.random.random(n_rows),
'cost': np.random.randint(1000, 100000, n_rows),
'costpp': np.random.randint(100, 10000, n_rows),
'cost_pp_pa': [str(np.random.randint(100, 10000)) for _ in range(n_rows)],
'First seen': pd.to_datetime(['2024-01-01'] * n_rows),
'Last seen': pd.to_datetime(['2024-12-31'] * n_rows),
'First seen (Parent)': ['2024-01-01'] * n_rows,
'Last seen (Parent)': ['2024-12-31'] * n_rows,
'average_spacing': ['Test spacing'] * n_rows,
'avg_days': pd.to_timedelta([100] * n_rows, unit='D'),
}
sample_df = pd.DataFrame(sample_data)
print(f"\n Sample data: {n_rows} rows")
# Benchmark figure creation
fig, elapsed = measure_time(create_icicle_figure, sample_df, "Benchmark Test")
viz_times['figure_creation'] = elapsed
print(f" Figure creation: {elapsed*1000:.1f} ms")
results['viz_times'] = viz_times
return viz_times
def print_summary():
"""Print final summary report."""
print("\n" + "=" * 60)
print("PERFORMANCE SUMMARY")
print("=" * 60)
print("\nRESULTS:")
# Import times
if 'total_import_time' in results:
print(f"\n Import time (all modules): {results['total_import_time']*1000:.1f} ms")
# Data loading
if 'load_times' in results and results['load_times'].get('sqlite'):
print(f" SQLite load time: {results['load_times']['sqlite']*1000:.1f} ms")
if 'row_counts' in results:
print(f" Rows loaded: {results['row_counts'].get('sqlite', 0):,}")
# Analysis
if 'analysis_times' in results and results['analysis_times'].get('full_pipeline'):
print(f" Analysis pipeline: {results['analysis_times']['full_pipeline']*1000:.1f} ms")
# Memory
if 'analysis_memory_peak' in results:
print(f" Peak memory (analysis): {results['analysis_memory_peak'] / 1024 / 1024:.1f} MB")
# Visualization
if 'viz_times' in results:
print(f" Figure creation: {results['viz_times'].get('figure_creation', 0)*1000:.1f} ms")
# Calculate total startup time (imports + data loading)
startup_time = results.get('total_import_time', 0)
if results.get('load_times', {}).get('sqlite'):
startup_time += results['load_times']['sqlite']
print(f"\n Estimated startup time: {startup_time*1000:.1f} ms ({startup_time:.2f} seconds)")
print("\n" + "=" * 60)
def main():
"""Run all benchmarks."""
print("\n" + "=" * 60)
print("PATIENT PATHWAY ANALYSIS - PERFORMANCE BENCHMARK")
print("=" * 60)
print(f"\nPython version: {sys.version}")
print(f"Platform: {sys.platform}")
# Run benchmarks in order
benchmark_imports()
benchmark_data_loading()
benchmark_analysis_pipeline()
benchmark_visualization()
# Print summary
print_summary()
return results
if __name__ == "__main__":
main()
+128
View File
@@ -0,0 +1,128 @@
"""
Pytest configuration and fixtures for the test suite.
This module provides shared fixtures used across multiple test modules.
"""
import tempfile
from datetime import date
from pathlib import Path
from typing import Generator
import pytest
@pytest.fixture
def temp_dir() -> Generator[Path, None, None]:
"""Create a temporary directory that is cleaned up after the test."""
with tempfile.TemporaryDirectory() as tmpdir:
yield Path(tmpdir)
@pytest.fixture
def mock_data_dir(temp_dir: Path) -> Path:
"""
Create a mock data directory with empty reference files.
Creates the expected directory structure and empty placeholder files
so that PathConfig.validate() can pass file existence checks.
"""
data_dir = temp_dir / "data"
data_dir.mkdir()
# Create empty reference files
reference_files = [
"drugnames.csv",
"directory_list.csv",
"treatment_function_codes.csv",
"drug_directory_list.csv",
"org_codes.csv",
"include.csv",
"defaultTrusts.csv",
]
for filename in reference_files:
(data_dir / filename).touch()
return data_dir
@pytest.fixture
def mock_images_dir(temp_dir: Path) -> Path:
"""
Create a mock images directory with empty font files.
Creates the expected directory structure and empty placeholder files
so that PathConfig.validate_fonts() can pass file existence checks.
"""
images_dir = temp_dir / "images"
images_dir.mkdir()
# Create empty font files
font_files = [
"AvenirLTStd-Medium.ttf",
"AvenirLTStd-Roman.ttf",
"logo.ico",
"logo.png",
]
for filename in font_files:
(images_dir / filename).touch()
return images_dir
@pytest.fixture
def mock_project_dir(temp_dir: Path, mock_data_dir: Path, mock_images_dir: Path) -> Path:
"""
Create a complete mock project directory structure.
Combines data and images directories for full PathConfig validation.
"""
return temp_dir
@pytest.fixture
def sample_date_range() -> tuple[date, date, date]:
"""
Return a sample valid date range for testing AnalysisFilters.
Returns:
Tuple of (start_date, end_date, last_seen_date)
"""
return (
date(2024, 1, 1), # start_date
date(2024, 12, 31), # end_date
date(2024, 6, 1), # last_seen_date
)
@pytest.fixture
def sample_trusts() -> list[str]:
"""Return a sample list of NHS trust names for testing."""
return [
"MANCHESTER UNIVERSITY NHS FOUNDATION TRUST",
"LEEDS TEACHING HOSPITALS NHS TRUST",
"SHEFFIELD TEACHING HOSPITALS NHS FOUNDATION TRUST",
]
@pytest.fixture
def sample_drugs() -> list[str]:
"""Return a sample list of drug names for testing."""
return [
"ADALIMUMAB",
"ETANERCEPT",
"INFLIXIMAB",
"RITUXIMAB",
]
@pytest.fixture
def sample_directories() -> list[str]:
"""Return a sample list of medical directories for testing."""
return [
"RHEUMATOLOGY",
"DERMATOLOGY",
"GASTROENTEROLOGY",
]
+226
View File
@@ -0,0 +1,226 @@
"""
Tests for core/config.py - PathConfig dataclass.
Tests cover:
- Default path construction
- Custom path configuration
- Path property access
- validate() method for file existence checks
- validate_fonts() method for font file checks
- as_legacy_paths() method for backwards compatibility
"""
from pathlib import Path
import pytest
from core.config import PathConfig
class TestPathConfigDefaults:
"""Test default behavior of PathConfig."""
def test_default_base_dir_is_cwd(self):
"""Default base_dir should be current working directory."""
config = PathConfig()
assert config.base_dir == Path.cwd()
def test_default_data_dir_is_under_base(self):
"""Default data_dir should be 'data' under base_dir."""
config = PathConfig()
assert config.data_dir == config.base_dir / "data"
def test_default_images_dir_is_under_base(self):
"""Default images_dir should be 'images' under base_dir."""
config = PathConfig()
assert config.images_dir == config.base_dir / "images"
class TestPathConfigCustomPaths:
"""Test custom path configuration."""
def test_custom_base_dir(self, temp_dir: Path):
"""PathConfig should accept custom base_dir."""
config = PathConfig(base_dir=temp_dir)
assert config.base_dir == temp_dir
assert config.data_dir == temp_dir / "data"
assert config.images_dir == temp_dir / "images"
class TestPathConfigProperties:
"""Test path property accessors."""
def test_drugnames_csv_path(self):
"""drugnames_csv should point to correct file."""
config = PathConfig()
assert config.drugnames_csv == config.data_dir / "drugnames.csv"
def test_directory_list_csv_path(self):
"""directory_list_csv should point to correct file."""
config = PathConfig()
assert config.directory_list_csv == config.data_dir / "directory_list.csv"
def test_treatment_function_codes_csv_path(self):
"""treatment_function_codes_csv should point to correct file."""
config = PathConfig()
assert config.treatment_function_codes_csv == config.data_dir / "treatment_function_codes.csv"
def test_drug_directory_list_csv_path(self):
"""drug_directory_list_csv should point to correct file."""
config = PathConfig()
assert config.drug_directory_list_csv == config.data_dir / "drug_directory_list.csv"
def test_org_codes_csv_path(self):
"""org_codes_csv should point to correct file."""
config = PathConfig()
assert config.org_codes_csv == config.data_dir / "org_codes.csv"
def test_include_csv_path(self):
"""include_csv should point to correct file."""
config = PathConfig()
assert config.include_csv == config.data_dir / "include.csv"
def test_default_trusts_csv_path(self):
"""default_trusts_csv should point to correct file."""
config = PathConfig()
assert config.default_trusts_csv == config.data_dir / "defaultTrusts.csv"
def test_font_medium_path(self):
"""font_medium should point to correct file."""
config = PathConfig()
assert config.font_medium == config.images_dir / "AvenirLTStd-Medium.ttf"
def test_font_roman_path(self):
"""font_roman should point to correct file."""
config = PathConfig()
assert config.font_roman == config.images_dir / "AvenirLTStd-Roman.ttf"
class TestPathConfigValidate:
"""Test validate() method."""
def test_validate_passes_when_all_files_exist(self, mock_project_dir: Path):
"""validate() should return empty list when all files exist."""
config = PathConfig(base_dir=mock_project_dir)
errors = config.validate()
assert errors == []
def test_validate_fails_when_data_dir_missing(self, temp_dir: Path):
"""validate() should report missing data directory."""
# Create images dir but not data dir
(temp_dir / "images").mkdir()
config = PathConfig(base_dir=temp_dir)
errors = config.validate()
assert len(errors) >= 1
assert any("Data directory not found" in e for e in errors)
def test_validate_fails_when_images_dir_missing(self, temp_dir: Path):
"""validate() should report missing images directory."""
# Create data dir but not images dir
(temp_dir / "data").mkdir()
config = PathConfig(base_dir=temp_dir)
errors = config.validate()
assert len(errors) >= 1
assert any("Images directory not found" in e for e in errors)
def test_validate_fails_when_required_file_missing(self, temp_dir: Path):
"""validate() should report missing required files."""
# Create directories but only some files
data_dir = temp_dir / "data"
data_dir.mkdir()
(temp_dir / "images").mkdir()
# Create only one file
(data_dir / "drugnames.csv").touch()
config = PathConfig(base_dir=temp_dir)
errors = config.validate()
# Should report 6 missing files (7 total - 1 created)
# Exclude directory-related messages (data/images directory checks)
# but include files that have "directory" in the filename
missing_file_errors = [
e for e in errors
if "not found" in e
and "Data directory not found" not in e
and "Images directory not found" not in e
]
assert len(missing_file_errors) == 6
class TestPathConfigValidateFonts:
"""Test validate_fonts() method."""
def test_validate_fonts_passes_when_fonts_exist(self, mock_project_dir: Path):
"""validate_fonts() should return empty list when fonts exist."""
config = PathConfig(base_dir=mock_project_dir)
errors = config.validate_fonts()
assert errors == []
def test_validate_fonts_fails_when_medium_font_missing(self, temp_dir: Path):
"""validate_fonts() should report missing medium font."""
images_dir = temp_dir / "images"
images_dir.mkdir()
# Create only roman font
(images_dir / "AvenirLTStd-Roman.ttf").touch()
config = PathConfig(base_dir=temp_dir)
errors = config.validate_fonts()
assert len(errors) == 1
assert "Medium font not found" in errors[0]
def test_validate_fonts_fails_when_roman_font_missing(self, temp_dir: Path):
"""validate_fonts() should report missing roman font."""
images_dir = temp_dir / "images"
images_dir.mkdir()
# Create only medium font
(images_dir / "AvenirLTStd-Medium.ttf").touch()
config = PathConfig(base_dir=temp_dir)
errors = config.validate_fonts()
assert len(errors) == 1
assert "Roman font not found" in errors[0]
class TestPathConfigLegacyPaths:
"""Test as_legacy_paths() method for backwards compatibility."""
def test_legacy_paths_returns_dict(self, temp_dir: Path):
"""as_legacy_paths() should return a dictionary."""
config = PathConfig(base_dir=temp_dir)
legacy = config.as_legacy_paths()
assert isinstance(legacy, dict)
def test_legacy_paths_contains_expected_keys(self, temp_dir: Path):
"""as_legacy_paths() should contain all expected keys."""
config = PathConfig(base_dir=temp_dir)
legacy = config.as_legacy_paths()
expected_keys = [
"drugnames_csv",
"directory_list_csv",
"treatment_function_codes_csv",
"drug_directory_list_csv",
"org_codes_csv",
"include_csv",
"default_trusts_csv",
"na_directory_rows_csv",
"ta_recommendations_xlsx",
]
for key in expected_keys:
assert key in legacy
def test_legacy_paths_have_dot_slash_prefix(self, temp_dir: Path):
"""as_legacy_paths() values should start with './'."""
config = PathConfig(base_dir=temp_dir)
legacy = config.as_legacy_paths()
for key, value in legacy.items():
assert value.startswith("./"), f"{key} should start with ./ but got {value}"
+924
View File
@@ -0,0 +1,924 @@
"""
Tests for tools/data.py - Data transformation functions.
Tests cover:
- patient_id(): UPID generation from Provider Code and PersonKey
- drug_names(): Drug name standardization via CSV mapping
- department_identification(): Directory assignment with 5-level fallback chain
"""
from pathlib import Path
from typing import Generator
import numpy as np
import pandas as pd
import pytest
from core.config import PathConfig
from tools.data import patient_id, drug_names, department_identification
# ============================================================================
# Fixtures for data transformation tests
# ============================================================================
@pytest.fixture
def sample_patient_df() -> pd.DataFrame:
"""Create a sample DataFrame with patient data for UPID generation."""
return pd.DataFrame({
"Provider Code": ["RXA123", "RXB456", "RXC789", "RXA123"],
"PersonKey": [1001, 2002, 3003, 1001],
"Drug Name": ["Test Drug", "Another Drug", "Test Drug", "Test Drug"],
"Price Actual": [100.0, 200.0, 150.0, 100.0],
})
@pytest.fixture
def sample_drug_df() -> pd.DataFrame:
"""Create a sample DataFrame with drug names for standardization."""
return pd.DataFrame({
"Drug Name": [
"ABATACEPT 250MG POWDER",
"adalimumab (homecare)",
"ETANERCEPT (LEFT EYE)",
"infliximab (RIGHT EYE)",
"Unknown Drug",
],
"Provider Code": ["RXA", "RXB", "RXC", "RXD", "RXE"],
"PersonKey": [1, 2, 3, 4, 5],
})
@pytest.fixture
def mock_data_for_transforms(temp_dir: Path) -> Path:
"""
Create mock data directory with reference files for transformation tests.
Creates:
- drugnames.csv: Drug name mapping
- directory_list.csv: Valid directories
- drug_directory_list.csv: Drug-to-directory mappings
- treatment_function_codes.csv: Treatment function codes
"""
data_dir = temp_dir / "data"
data_dir.mkdir()
# Create drugnames.csv (no header, raw_name,standard_name)
drugnames_content = """ABATACEPT,ABATACEPT
ABATACEPT 250MG POWDER,ABATACEPT
ABATACEPT (HOMECARE),ABATACEPT
ADALIMUMAB,ADALIMUMAB
ADALIMUMAB (HOMECARE),ADALIMUMAB
ETANERCEPT,ETANERCEPT
ETANERCEPT (LEFT EYE),ETANERCEPT
ETANERCEPT (RIGHT EYE),ETANERCEPT
INFLIXIMAB,INFLIXIMAB
INFLIXIMAB (RIGHT EYE),INFLIXIMAB
"""
(data_dir / "drugnames.csv").write_text(drugnames_content)
# Create directory_list.csv (has header)
directory_list_content = """directory
RHEUMATOLOGY
DERMATOLOGY
GASTROENTEROLOGY
OPHTHALMOLOGY
NEUROLOGY
CLINICAL HAEMATOLOGY
PAEDIATRICS
"""
(data_dir / "directory_list.csv").write_text(directory_list_content)
# Create drug_directory_list.csv (has header, drug|directories)
drug_directory_content = """DRUG,DIRECTORIES
ABATACEPT,RHEUMATOLOGY|PAEDIATRICS
ADALIMUMAB,RHEUMATOLOGY|GASTROENTEROLOGY|DERMATOLOGY|OPHTHALMOLOGY
ETANERCEPT,RHEUMATOLOGY|DERMATOLOGY
INFLIXIMAB,RHEUMATOLOGY|GASTROENTEROLOGY|DERMATOLOGY
RITUXIMAB,CLINICAL HAEMATOLOGY
"""
(data_dir / "drug_directory_list.csv").write_text(drug_directory_content)
# Create treatment_function_codes.csv
treatment_function_codes_content = """Code,Service
100,GENERAL SURGERY
410,RHEUMATOLOGY
330,DERMATOLOGY
301,GASTROENTEROLOGY
130,OPHTHALMOLOGY
400,NEUROLOGY
"""
(data_dir / "treatment_function_codes.csv").write_text(treatment_function_codes_content)
# Create other required files (empty placeholders)
(data_dir / "org_codes.csv").write_text("Name,Code\n")
(data_dir / "include.csv").write_text("")
(data_dir / "defaultTrusts.csv").write_text("")
return data_dir
@pytest.fixture
def test_paths(mock_data_for_transforms: Path, temp_dir: Path) -> PathConfig:
"""Create PathConfig pointing to mock data directory."""
return PathConfig(base_dir=temp_dir)
# ============================================================================
# Tests for patient_id()
# ============================================================================
class TestPatientId:
"""Test UPID generation from Provider Code and PersonKey."""
def test_upid_created(self, sample_patient_df: pd.DataFrame):
"""UPID column should be created."""
result = patient_id(sample_patient_df)
assert "UPID" in result.columns
def test_upid_format(self, sample_patient_df: pd.DataFrame):
"""UPID should be Provider Code (first 3 chars) + PersonKey."""
result = patient_id(sample_patient_df)
expected_upids = ["RXA1001", "RXB2002", "RXC3003", "RXA1001"]
assert result["UPID"].tolist() == expected_upids
def test_upid_handles_short_provider_codes(self):
"""UPID should work with provider codes shorter than 3 chars."""
df = pd.DataFrame({
"Provider Code": ["AB", "X"],
"PersonKey": [100, 200],
})
result = patient_id(df)
assert result["UPID"].tolist() == ["AB100", "X200"]
def test_upid_preserves_other_columns(self, sample_patient_df: pd.DataFrame):
"""Other columns should be preserved after UPID generation."""
original_columns = sample_patient_df.columns.tolist()
result = patient_id(sample_patient_df)
for col in original_columns:
assert col in result.columns
def test_upid_same_patient_same_upid(self, sample_patient_df: pd.DataFrame):
"""Same patient should have same UPID across rows."""
result = patient_id(sample_patient_df)
# First and last rows have same Provider Code and PersonKey
assert result.iloc[0]["UPID"] == result.iloc[3]["UPID"]
def test_upid_different_patients_different_upids(self, sample_patient_df: pd.DataFrame):
"""Different patients should have different UPIDs."""
result = patient_id(sample_patient_df)
unique_upids = result["UPID"].nunique()
# We have 3 unique patients (rows 0 and 3 are same patient)
assert unique_upids == 3
# ============================================================================
# Tests for drug_names()
# ============================================================================
class TestDrugNames:
"""Test drug name standardization."""
def test_drug_names_mapped(self, sample_drug_df: pd.DataFrame, test_paths: PathConfig):
"""Drug names should be mapped to standard names."""
result = drug_names(sample_drug_df, paths=test_paths)
# First drug should map to ABATACEPT (note: '250MG POWDER' is in the mapping)
assert result.iloc[0]["Drug Name"] == "ABATACEPT"
def test_drug_names_uppercase(self, sample_drug_df: pd.DataFrame, test_paths: PathConfig):
"""Drug names should be converted to uppercase before mapping."""
result = drug_names(sample_drug_df, paths=test_paths)
# 'adalimumab (homecare)' should become 'ADALIMUMAB'
assert result.iloc[1]["Drug Name"] == "ADALIMUMAB"
def test_left_eye_removed(self, sample_drug_df: pd.DataFrame, test_paths: PathConfig):
"""(LEFT EYE) suffix should be removed."""
result = drug_names(sample_drug_df, paths=test_paths)
# 'ETANERCEPT (LEFT EYE)' should become 'ETANERCEPT'
assert result.iloc[2]["Drug Name"] == "ETANERCEPT"
assert "(LEFT EYE)" not in result.iloc[2]["Drug Name"]
def test_right_eye_removed(self, sample_drug_df: pd.DataFrame, test_paths: PathConfig):
"""(RIGHT EYE) suffix should be removed."""
result = drug_names(sample_drug_df, paths=test_paths)
# 'infliximab (RIGHT EYE)' should become 'INFLIXIMAB'
assert result.iloc[3]["Drug Name"] == "INFLIXIMAB"
assert "(RIGHT EYE)" not in result.iloc[3]["Drug Name"]
def test_unknown_drug_mapped_to_nan(self, sample_drug_df: pd.DataFrame, test_paths: PathConfig):
"""Unknown drugs (not in mapping) should map to NaN."""
result = drug_names(sample_drug_df, paths=test_paths)
# 'Unknown Drug' is not in drugnames.csv mapping
assert pd.isna(result.iloc[4]["Drug Name"])
def test_preserves_other_columns(self, sample_drug_df: pd.DataFrame, test_paths: PathConfig):
"""Other columns should be preserved."""
original_columns = sample_drug_df.columns.tolist()
result = drug_names(sample_drug_df, paths=test_paths)
for col in original_columns:
assert col in result.columns
def test_drug_name_stripped(self, sample_drug_df: pd.DataFrame, test_paths: PathConfig):
"""Drug names should be stripped of whitespace."""
result = drug_names(sample_drug_df, paths=test_paths)
for name in result["Drug Name"].dropna():
assert name == name.strip()
# ============================================================================
# Tests for department_identification()
# ============================================================================
class TestDepartmentIdentification:
"""Test directory assignment with fallback chain."""
@pytest.fixture
def department_test_df(self) -> pd.DataFrame:
"""Create DataFrame for department identification tests."""
return pd.DataFrame({
"UPID": ["RXA1001", "RXA1001", "RXB2002", "RXC3003", "RXD4004"],
"Drug Name": ["RITUXIMAB", "RITUXIMAB", "ADALIMUMAB", "ADALIMUMAB", "UNKNOWN"],
"Provider Code": ["RXA", "RXA", "RXB", "RXC", "RXD"],
"PersonKey": [1001, 1001, 2002, 3003, 4004],
"Treatment Function Code": [410, 410, 330, np.nan, np.nan],
"Additional Detail 1": ["RHEUMATOLOGY referral", np.nan, "DERMATOLOGY clinic", np.nan, np.nan],
"Additional Description 1": [np.nan, np.nan, np.nan, "GASTRO ward", np.nan],
"Additional Detail 2": [np.nan, np.nan, np.nan, np.nan, np.nan],
"Additional Description 2": [np.nan, np.nan, np.nan, np.nan, np.nan],
"Additional Detail 3": [np.nan, np.nan, np.nan, np.nan, np.nan],
"Additional Description 3": [np.nan, np.nan, np.nan, np.nan, np.nan],
"Additional Detail 4": [np.nan, np.nan, np.nan, np.nan, np.nan],
"Additional Description 4": [np.nan, np.nan, np.nan, np.nan, np.nan],
"Additional Detail 5": [np.nan, np.nan, np.nan, np.nan, np.nan],
"Additional Description 5": [np.nan, np.nan, np.nan, np.nan, np.nan],
"NCDR Treatment Function Name": [np.nan, np.nan, np.nan, np.nan, np.nan],
"Treatment Function Desc": [np.nan, np.nan, np.nan, np.nan, np.nan],
})
def test_directory_column_created(
self, department_test_df: pd.DataFrame, test_paths: PathConfig
):
"""Directory column should be created."""
result = department_identification(department_test_df, paths=test_paths)
assert "Directory" in result.columns
def test_directory_source_column_created(
self, department_test_df: pd.DataFrame, test_paths: PathConfig
):
"""Directory_Source column should be created to track assignment method."""
result = department_identification(department_test_df, paths=test_paths)
assert "Directory_Source" in result.columns
def test_single_valid_directory_assigned(
self, department_test_df: pd.DataFrame, test_paths: PathConfig
):
"""Drug with single valid directory should get that directory."""
result = department_identification(department_test_df, paths=test_paths)
# RITUXIMAB has only one valid directory (CLINICAL HAEMATOLOGY)
rituximab_rows = result[result["Drug Name"] == "RITUXIMAB"]
for _, row in rituximab_rows.iterrows():
assert row["Directory"] == "CLINICAL HAEMATOLOGY"
assert row["Directory_Source"] == "SINGLE_VALID_DIR"
def test_undefined_for_unknown_drug(
self, department_test_df: pd.DataFrame, test_paths: PathConfig
):
"""Unknown drug should get 'Undefined' directory."""
result = department_identification(department_test_df, paths=test_paths)
# UNKNOWN drug is not in drug_directory_list
unknown_rows = result[result["Drug Name"] == "UNKNOWN"]
for _, row in unknown_rows.iterrows():
assert row["Directory"] == "Undefined"
assert row["Directory_Source"] == "UNDEFINED"
def test_no_duplicate_columns(
self, department_test_df: pd.DataFrame, test_paths: PathConfig
):
"""No duplicate columns should be created."""
result = department_identification(department_test_df, paths=test_paths)
column_counts = result.columns.value_counts()
duplicates = column_counts[column_counts > 1]
assert duplicates.empty, f"Duplicate columns found: {duplicates.index.tolist()}"
def test_handles_missing_upid(self, test_paths: PathConfig):
"""Rows with missing UPID should be dropped."""
df = pd.DataFrame({
"UPID": ["RXA1001", "", np.nan, "RXB2002"],
"Drug Name": ["RITUXIMAB", "RITUXIMAB", "RITUXIMAB", "RITUXIMAB"],
"Provider Code": ["RXA", "RXA", "RXA", "RXB"],
"PersonKey": [1001, 1002, 1003, 2002],
"Treatment Function Code": [410, 410, 410, 410],
"Additional Detail 1": [np.nan, np.nan, np.nan, np.nan],
"Additional Description 1": [np.nan, np.nan, np.nan, np.nan],
"Additional Detail 2": [np.nan, np.nan, np.nan, np.nan],
"Additional Description 2": [np.nan, np.nan, np.nan, np.nan],
"Additional Detail 3": [np.nan, np.nan, np.nan, np.nan],
"Additional Description 3": [np.nan, np.nan, np.nan, np.nan],
"Additional Detail 4": [np.nan, np.nan, np.nan, np.nan],
"Additional Description 4": [np.nan, np.nan, np.nan, np.nan],
"Additional Detail 5": [np.nan, np.nan, np.nan, np.nan],
"Additional Description 5": [np.nan, np.nan, np.nan, np.nan],
"NCDR Treatment Function Name": [np.nan, np.nan, np.nan, np.nan],
"Treatment Function Desc": [np.nan, np.nan, np.nan, np.nan],
})
result = department_identification(df, paths=test_paths)
# Should only have 2 rows with valid UPIDs
assert len(result) == 2
assert "RXA1001" in result["UPID"].values
assert "RXB2002" in result["UPID"].values
class TestDepartmentIdentificationDirectorySources:
"""Test that Directory_Source values are correctly assigned."""
@pytest.fixture
def single_dir_df(self) -> pd.DataFrame:
"""DataFrame for testing single valid directory assignment."""
return pd.DataFrame({
"UPID": ["RXA1001"],
"Drug Name": ["RITUXIMAB"], # Has only CLINICAL HAEMATOLOGY
"Provider Code": ["RXA"],
"PersonKey": [1001],
"Treatment Function Code": [np.nan],
"Additional Detail 1": [np.nan],
"Additional Description 1": [np.nan],
"Additional Detail 2": [np.nan],
"Additional Description 2": [np.nan],
"Additional Detail 3": [np.nan],
"Additional Description 3": [np.nan],
"Additional Detail 4": [np.nan],
"Additional Description 4": [np.nan],
"Additional Detail 5": [np.nan],
"Additional Description 5": [np.nan],
"NCDR Treatment Function Name": [np.nan],
"Treatment Function Desc": [np.nan],
})
def test_single_valid_dir_source(
self, single_dir_df: pd.DataFrame, test_paths: PathConfig
):
"""SINGLE_VALID_DIR source should be assigned when drug has one directory."""
result = department_identification(single_dir_df, paths=test_paths)
assert result.iloc[0]["Directory"] == "CLINICAL HAEMATOLOGY"
assert result.iloc[0]["Directory_Source"] == "SINGLE_VALID_DIR"
def test_undefined_source(self, test_paths: PathConfig):
"""UNDEFINED source should be assigned when no directory can be determined."""
df = pd.DataFrame({
"UPID": ["RXA1001"],
"Drug Name": ["NONEXISTENT"], # Not in drug_directory_list
"Provider Code": ["RXA"],
"PersonKey": [1001],
"Treatment Function Code": [np.nan],
"Additional Detail 1": [np.nan],
"Additional Description 1": [np.nan],
"Additional Detail 2": [np.nan],
"Additional Description 2": [np.nan],
"Additional Detail 3": [np.nan],
"Additional Description 3": [np.nan],
"Additional Detail 4": [np.nan],
"Additional Description 4": [np.nan],
"Additional Detail 5": [np.nan],
"Additional Description 5": [np.nan],
"NCDR Treatment Function Name": [np.nan],
"Treatment Function Desc": [np.nan],
})
result = department_identification(df, paths=test_paths)
assert result.iloc[0]["Directory"] == "Undefined"
assert result.iloc[0]["Directory_Source"] == "UNDEFINED"
class TestDepartmentIdentificationEdgeCases:
"""Test edge cases in department identification."""
def test_empty_dataframe(self, test_paths: PathConfig):
"""Empty DataFrame should return empty DataFrame with required columns."""
df = pd.DataFrame(columns=[
"UPID", "Drug Name", "Provider Code", "PersonKey",
"Treatment Function Code", "Additional Detail 1",
"Additional Description 1", "Additional Detail 2",
"Additional Description 2", "Additional Detail 3",
"Additional Description 3", "Additional Detail 4",
"Additional Description 4", "Additional Detail 5",
"Additional Description 5", "NCDR Treatment Function Name",
"Treatment Function Desc"
])
result = department_identification(df, paths=test_paths)
assert len(result) == 0
assert "Directory" in result.columns
assert "Directory_Source" in result.columns
def test_all_same_patient_different_drugs(self, test_paths: PathConfig):
"""Same patient with different drugs should get appropriate directories."""
df = pd.DataFrame({
"UPID": ["RXA1001", "RXA1001", "RXA1001"],
"Drug Name": ["RITUXIMAB", "ADALIMUMAB", "ETANERCEPT"],
"Provider Code": ["RXA", "RXA", "RXA"],
"PersonKey": [1001, 1001, 1001],
"Treatment Function Code": [np.nan, np.nan, np.nan],
"Additional Detail 1": [np.nan, "DERMATOLOGY", np.nan],
"Additional Description 1": [np.nan, np.nan, np.nan],
"Additional Detail 2": [np.nan, np.nan, np.nan],
"Additional Description 2": [np.nan, np.nan, np.nan],
"Additional Detail 3": [np.nan, np.nan, np.nan],
"Additional Description 3": [np.nan, np.nan, np.nan],
"Additional Detail 4": [np.nan, np.nan, np.nan],
"Additional Description 4": [np.nan, np.nan, np.nan],
"Additional Detail 5": [np.nan, np.nan, np.nan],
"Additional Description 5": [np.nan, np.nan, np.nan],
"NCDR Treatment Function Name": [np.nan, np.nan, np.nan],
"Treatment Function Desc": [np.nan, np.nan, np.nan],
})
result = department_identification(df, paths=test_paths)
# RITUXIMAB should get CLINICAL HAEMATOLOGY (single valid dir)
rituximab = result[result["Drug Name"] == "RITUXIMAB"]
assert rituximab.iloc[0]["Directory"] == "CLINICAL HAEMATOLOGY"
# ADALIMUMAB has DERMATOLOGY extracted but DERMATOLOGY is a valid dir
# The fallback chain uses CALCULATED_MOST_FREQ which picks the most frequent
# valid directory from extracted sources. Since the extracted dir matches
# a valid dir for ADALIMUMAB, it should use DERMATOLOGY.
# However, UPID_INFERENCE may override this if another directory is more
# frequent for this patient overall.
adalimumab = result[result["Drug Name"] == "ADALIMUMAB"]
# The directory should be valid for ADALIMUMAB
valid_adalimumab_dirs = {"RHEUMATOLOGY", "GASTROENTEROLOGY", "DERMATOLOGY", "OPHTHALMOLOGY"}
assert adalimumab.iloc[0]["Directory"] in valid_adalimumab_dirs or adalimumab.iloc[0]["Directory"] == "CLINICAL HAEMATOLOGY"
# ============================================================================
# Tests for directory assignment fallback levels
# ============================================================================
class TestDirectoryAssignmentFallbackLevels:
"""
Comprehensive tests for the 5-level fallback chain in department_identification().
Fallback levels:
1. SINGLE_VALID_DIR: Drug has only one valid directory
2. EXTRACTED_PRIMARY/EXTRACTED_FALLBACK: Extracted from Additional Detail columns
3. CALCULATED_MOST_FREQ: Most frequent valid directory for UPID/Drug
4. UPID_INFERENCE: Infer from most frequent directory for same UPID
5. UNDEFINED: No directory could be determined
"""
@staticmethod
def create_test_df(
upids: list,
drug_names: list,
treatment_codes: list = None,
additional_detail_1: list = None,
) -> pd.DataFrame:
"""Helper to create test DataFrames with required columns."""
n = len(upids)
df = pd.DataFrame({
"UPID": upids,
"Drug Name": drug_names,
"Provider Code": ["RXA"] * n,
"PersonKey": list(range(1001, 1001 + n)),
"Treatment Function Code": treatment_codes if treatment_codes else [np.nan] * n,
"Additional Detail 1": additional_detail_1 if additional_detail_1 else [np.nan] * n,
"Additional Description 1": [np.nan] * n,
"Additional Detail 2": [np.nan] * n,
"Additional Description 2": [np.nan] * n,
"Additional Detail 3": [np.nan] * n,
"Additional Description 3": [np.nan] * n,
"Additional Detail 4": [np.nan] * n,
"Additional Description 4": [np.nan] * n,
"Additional Detail 5": [np.nan] * n,
"Additional Description 5": [np.nan] * n,
"NCDR Treatment Function Name": [np.nan] * n,
"Treatment Function Desc": [np.nan] * n,
})
return df
def test_level1_single_valid_dir_takes_precedence(self, test_paths: PathConfig):
"""Level 1: Single valid directory should override all other sources."""
# RITUXIMAB only has CLINICAL HAEMATOLOGY, even with DERMATOLOGY in Additional Detail
df = self.create_test_df(
upids=["RXA1001"],
drug_names=["RITUXIMAB"],
additional_detail_1=["DERMATOLOGY clinic"], # This should be ignored
)
result = department_identification(df, paths=test_paths)
assert result.iloc[0]["Directory"] == "CLINICAL HAEMATOLOGY"
assert result.iloc[0]["Directory_Source"] == "SINGLE_VALID_DIR"
def test_level2_extracted_from_additional_detail(self, test_paths: PathConfig):
"""Level 2: Directory extracted from Additional Detail columns for multi-dir drugs."""
# ADALIMUMAB has multiple valid dirs, so extraction should work
df = self.create_test_df(
upids=["RXA1001"],
drug_names=["ADALIMUMAB"],
additional_detail_1=["DERMATOLOGY referral"],
)
result = department_identification(df, paths=test_paths)
# Should extract DERMATOLOGY from Additional Detail 1
assert result.iloc[0]["Directory"] == "DERMATOLOGY"
# Source should indicate calculated from most frequent (which uses the extracted value)
assert result.iloc[0]["Directory_Source"] == "CALCULATED_MOST_FREQ"
def test_level2_extracted_from_treatment_function_code(self, test_paths: PathConfig):
"""Level 2: Directory extracted from Treatment Function Code when no detail available."""
# ADALIMUMAB with treatment function code 410 = RHEUMATOLOGY
df = self.create_test_df(
upids=["RXA1001"],
drug_names=["ADALIMUMAB"],
treatment_codes=[410], # Maps to RHEUMATOLOGY
)
result = department_identification(df, paths=test_paths)
# Should get RHEUMATOLOGY from treatment function code
assert result.iloc[0]["Directory"] == "RHEUMATOLOGY"
assert result.iloc[0]["Directory_Source"] == "CALCULATED_MOST_FREQ"
def test_level3_calculated_most_freq_with_multiple_records(self, test_paths: PathConfig):
"""Level 3: Most frequent valid directory wins when patient has multiple records."""
# Same UPID, same drug, different extracted directories
# ADALIMUMAB can be RHEUMATOLOGY, DERMATOLOGY, GASTROENTEROLOGY, OPHTHALMOLOGY
df = self.create_test_df(
upids=["RXA1001", "RXA1001", "RXA1001", "RXA1001", "RXA1001"],
drug_names=["ADALIMUMAB"] * 5,
additional_detail_1=[
"RHEUMATOLOGY",
"RHEUMATOLOGY",
"RHEUMATOLOGY",
"DERMATOLOGY",
"GASTROENTEROLOGY",
],
)
result = department_identification(df, paths=test_paths)
# RHEUMATOLOGY appears 3 times, should win
for _, row in result.iterrows():
assert row["Directory"] == "RHEUMATOLOGY"
assert row["Directory_Source"] == "CALCULATED_MOST_FREQ"
def test_level3_ignores_invalid_directories_in_frequency(self, test_paths: PathConfig):
"""Level 3: Invalid directories should be ignored in frequency calculation."""
# ETANERCEPT only valid for RHEUMATOLOGY and DERMATOLOGY
# Even if GASTROENTEROLOGY appears more often, it should be ignored
df = self.create_test_df(
upids=["RXA1001", "RXA1001", "RXA1001", "RXA1001"],
drug_names=["ETANERCEPT"] * 4,
additional_detail_1=[
"GASTROENTEROLOGY", # Invalid for ETANERCEPT
"GASTROENTEROLOGY", # Invalid for ETANERCEPT
"GASTROENTEROLOGY", # Invalid for ETANERCEPT
"RHEUMATOLOGY", # Valid
],
)
result = department_identification(df, paths=test_paths)
# RHEUMATOLOGY should win as it's the only valid directory
for _, row in result.iterrows():
assert row["Directory"] == "RHEUMATOLOGY"
def test_level4_upid_inference(self, test_paths: PathConfig):
"""Level 4: UPID inference when no valid directory found from extraction."""
# Same UPID, one drug has directory (RITUXIMAB → CLINICAL HAEMATOLOGY)
# Other drug (ADALIMUMAB) has no extractable directory
# Note: ADALIMUMAB cannot use CLINICAL HAEMATOLOGY as it's not valid for it
# So this tests the case where UPID_INFERENCE may not help if the inferred
# directory isn't valid for the drug
# Better test: Two different patients, one has known directory
# Actually, UPID_INFERENCE doesn't check validity - it just uses most frequent
df = pd.DataFrame({
"UPID": ["RXA1001", "RXA1001"],
"Drug Name": ["RITUXIMAB", "UNKNOWN_DRUG"], # UNKNOWN has no mapping
"Provider Code": ["RXA", "RXA"],
"PersonKey": [1001, 1001],
"Treatment Function Code": [np.nan, np.nan],
"Additional Detail 1": [np.nan, np.nan],
"Additional Description 1": [np.nan, np.nan],
"Additional Detail 2": [np.nan, np.nan],
"Additional Description 2": [np.nan, np.nan],
"Additional Detail 3": [np.nan, np.nan],
"Additional Description 3": [np.nan, np.nan],
"Additional Detail 4": [np.nan, np.nan],
"Additional Description 4": [np.nan, np.nan],
"Additional Detail 5": [np.nan, np.nan],
"Additional Description 5": [np.nan, np.nan],
"NCDR Treatment Function Name": [np.nan, np.nan],
"Treatment Function Desc": [np.nan, np.nan],
})
result = department_identification(df, paths=test_paths)
# RITUXIMAB gets CLINICAL HAEMATOLOGY (single valid dir)
rituximab = result[result["Drug Name"] == "RITUXIMAB"]
assert rituximab.iloc[0]["Directory"] == "CLINICAL HAEMATOLOGY"
assert rituximab.iloc[0]["Directory_Source"] == "SINGLE_VALID_DIR"
# UNKNOWN_DRUG should inherit CLINICAL HAEMATOLOGY via UPID_INFERENCE
unknown = result[result["Drug Name"] == "UNKNOWN_DRUG"]
assert unknown.iloc[0]["Directory"] == "CLINICAL HAEMATOLOGY"
assert unknown.iloc[0]["Directory_Source"] == "UPID_INFERENCE"
def test_level5_undefined_when_no_fallback_available(self, test_paths: PathConfig):
"""Level 5: UNDEFINED when all fallback levels fail."""
# Unknown drug, no additional detail, alone in UPID
df = self.create_test_df(
upids=["RXZ9999"], # Unique UPID with no other records
drug_names=["NONEXISTENT_DRUG"],
)
result = department_identification(df, paths=test_paths)
assert result.iloc[0]["Directory"] == "Undefined"
assert result.iloc[0]["Directory_Source"] == "UNDEFINED"
class TestDirectoryAssignmentTreatmentFunctionCode:
"""Tests for Treatment Function Code extraction in directory assignment."""
@staticmethod
def create_tfc_test_df(
upids: list,
drug_names: list,
treatment_codes: list,
) -> pd.DataFrame:
"""Create test DataFrame with Treatment Function Codes."""
n = len(upids)
return pd.DataFrame({
"UPID": upids,
"Drug Name": drug_names,
"Provider Code": ["RXA"] * n,
"PersonKey": list(range(1001, 1001 + n)),
"Treatment Function Code": treatment_codes,
"Additional Detail 1": [np.nan] * n,
"Additional Description 1": [np.nan] * n,
"Additional Detail 2": [np.nan] * n,
"Additional Description 2": [np.nan] * n,
"Additional Detail 3": [np.nan] * n,
"Additional Description 3": [np.nan] * n,
"Additional Detail 4": [np.nan] * n,
"Additional Description 4": [np.nan] * n,
"Additional Detail 5": [np.nan] * n,
"Additional Description 5": [np.nan] * n,
"NCDR Treatment Function Name": [np.nan] * n,
"Treatment Function Desc": [np.nan] * n,
})
def test_tfc_410_maps_to_rheumatology(self, test_paths: PathConfig):
"""Treatment Function Code 410 should map to RHEUMATOLOGY."""
df = self.create_tfc_test_df(
upids=["RXA1001"],
drug_names=["ADALIMUMAB"], # Valid for RHEUMATOLOGY
treatment_codes=[410],
)
result = department_identification(df, paths=test_paths)
assert result.iloc[0]["Directory"] == "RHEUMATOLOGY"
def test_tfc_330_maps_to_dermatology(self, test_paths: PathConfig):
"""Treatment Function Code 330 should map to DERMATOLOGY."""
df = self.create_tfc_test_df(
upids=["RXA1001"],
drug_names=["ADALIMUMAB"], # Valid for DERMATOLOGY
treatment_codes=[330],
)
result = department_identification(df, paths=test_paths)
assert result.iloc[0]["Directory"] == "DERMATOLOGY"
def test_tfc_invalid_code_ignored(self, test_paths: PathConfig):
"""Invalid Treatment Function Code should result in no extraction."""
df = self.create_tfc_test_df(
upids=["RXA1001"],
drug_names=["ADALIMUMAB"],
treatment_codes=[999], # Invalid code
)
result = department_identification(df, paths=test_paths)
# Should fall through to UNDEFINED since code doesn't map to valid directory
assert result.iloc[0]["Directory"] == "Undefined"
assert result.iloc[0]["Directory_Source"] == "UNDEFINED"
def test_tfc_with_nan_treated_as_zero(self, test_paths: PathConfig):
"""NaN Treatment Function Code should be treated as 0 (invalid)."""
df = self.create_tfc_test_df(
upids=["RXA1001"],
drug_names=["UNKNOWN_DRUG"],
treatment_codes=[np.nan],
)
result = department_identification(df, paths=test_paths)
# Should fall through to UNDEFINED
assert result.iloc[0]["Directory"] == "Undefined"
class TestDirectoryAssignmentMultiplePatients:
"""Tests for directory assignment with multiple patients."""
@staticmethod
def create_multi_patient_df(
data: list[tuple], # [(upid, drug, additional_detail)]
) -> pd.DataFrame:
"""Create test DataFrame for multiple patients."""
n = len(data)
return pd.DataFrame({
"UPID": [d[0] for d in data],
"Drug Name": [d[1] for d in data],
"Provider Code": ["RXA"] * n,
"PersonKey": list(range(1001, 1001 + n)),
"Treatment Function Code": [np.nan] * n,
"Additional Detail 1": [d[2] if len(d) > 2 else np.nan for d in data],
"Additional Description 1": [np.nan] * n,
"Additional Detail 2": [np.nan] * n,
"Additional Description 2": [np.nan] * n,
"Additional Detail 3": [np.nan] * n,
"Additional Description 3": [np.nan] * n,
"Additional Detail 4": [np.nan] * n,
"Additional Description 4": [np.nan] * n,
"Additional Detail 5": [np.nan] * n,
"Additional Description 5": [np.nan] * n,
"NCDR Treatment Function Name": [np.nan] * n,
"Treatment Function Desc": [np.nan] * n,
})
def test_different_patients_get_different_directories(self, test_paths: PathConfig):
"""Different patients should get directories based on their own data."""
data = [
("RXA1001", "ADALIMUMAB", "DERMATOLOGY"),
("RXA1002", "ADALIMUMAB", "RHEUMATOLOGY"),
]
df = self.create_multi_patient_df(data)
result = department_identification(df, paths=test_paths)
patient1 = result[result["UPID"] == "RXA1001"]
patient2 = result[result["UPID"] == "RXA1002"]
assert patient1.iloc[0]["Directory"] == "DERMATOLOGY"
assert patient2.iloc[0]["Directory"] == "RHEUMATOLOGY"
def test_upid_inference_does_not_cross_patients(self, test_paths: PathConfig):
"""UPID inference should not apply directories from other patients."""
data = [
("RXA1001", "RITUXIMAB", np.nan), # Gets CLINICAL HAEMATOLOGY (single dir)
("RXA1002", "UNKNOWN_DRUG", np.nan), # Should NOT inherit from RXA1001
]
df = self.create_multi_patient_df(data)
result = department_identification(df, paths=test_paths)
patient1 = result[result["UPID"] == "RXA1001"]
patient2 = result[result["UPID"] == "RXA1002"]
assert patient1.iloc[0]["Directory"] == "CLINICAL HAEMATOLOGY"
# Patient 2 should be UNDEFINED, not inherit from patient 1
assert patient2.iloc[0]["Directory"] == "Undefined"
assert patient2.iloc[0]["Directory_Source"] == "UNDEFINED"
def test_same_drug_different_patients_independent(self, test_paths: PathConfig):
"""Same drug for different patients should be processed independently."""
data = [
("RXA1001", "ETANERCEPT", "DERMATOLOGY"),
("RXA1001", "ETANERCEPT", "DERMATOLOGY"),
("RXA1002", "ETANERCEPT", "RHEUMATOLOGY"),
("RXA1002", "ETANERCEPT", "RHEUMATOLOGY"),
]
df = self.create_multi_patient_df(data)
result = department_identification(df, paths=test_paths)
patient1 = result[result["UPID"] == "RXA1001"]
patient2 = result[result["UPID"] == "RXA1002"]
# Each patient should get their most frequent directory
for _, row in patient1.iterrows():
assert row["Directory"] == "DERMATOLOGY"
for _, row in patient2.iterrows():
assert row["Directory"] == "RHEUMATOLOGY"
class TestDirectoryAssignmentExtractionPatterns:
"""Tests for directory extraction patterns from text fields."""
@staticmethod
def create_extraction_df(additional_detail: str, drug: str = "ADALIMUMAB") -> pd.DataFrame:
"""Create a minimal DataFrame for testing extraction patterns."""
return pd.DataFrame({
"UPID": ["RXA1001"],
"Drug Name": [drug],
"Provider Code": ["RXA"],
"PersonKey": [1001],
"Treatment Function Code": [np.nan],
"Additional Detail 1": [additional_detail],
"Additional Description 1": [np.nan],
"Additional Detail 2": [np.nan],
"Additional Description 2": [np.nan],
"Additional Detail 3": [np.nan],
"Additional Description 3": [np.nan],
"Additional Detail 4": [np.nan],
"Additional Description 4": [np.nan],
"Additional Detail 5": [np.nan],
"Additional Description 5": [np.nan],
"NCDR Treatment Function Name": [np.nan],
"Treatment Function Desc": [np.nan],
})
def test_extraction_case_insensitive(self, test_paths: PathConfig):
"""Directory extraction should be case insensitive."""
df = self.create_extraction_df("dermatology clinic")
result = department_identification(df, paths=test_paths)
assert result.iloc[0]["Directory"] == "DERMATOLOGY"
def test_extraction_with_surrounding_text(self, test_paths: PathConfig):
"""Directory should be extracted from surrounding text."""
df = self.create_extraction_df("Referral to RHEUMATOLOGY department for assessment")
result = department_identification(df, paths=test_paths)
assert result.iloc[0]["Directory"] == "RHEUMATOLOGY"
def test_extraction_word_boundary(self, test_paths: PathConfig):
"""Directory extraction should respect word boundaries."""
# Test that partial matches don't occur - "RHEUM" should not match "RHEUMATOLOGY"
# Using ADALIMUMAB which is valid for RHEUMATOLOGY
df = self.create_extraction_df("RHEUMATOLOGY clinic")
result = department_identification(df, paths=test_paths)
# RHEUMATOLOGY should be extracted correctly
assert result.iloc[0]["Directory"] == "RHEUMATOLOGY"
def test_extraction_multiple_directories_first_wins(self, test_paths: PathConfig):
"""When multiple directories present, first valid one should be used."""
# Note: The actual behavior depends on the regex - typically first match
df = self.create_extraction_df("RHEUMATOLOGY and DERMATOLOGY referral")
result = department_identification(df, paths=test_paths)
# First directory in the text should be extracted
assert result.iloc[0]["Directory"] in ["RHEUMATOLOGY", "DERMATOLOGY"]
def test_extraction_from_additional_description(self, test_paths: PathConfig):
"""Directory can be extracted from Additional Description columns too."""
df = pd.DataFrame({
"UPID": ["RXA1001"],
"Drug Name": ["ADALIMUMAB"],
"Provider Code": ["RXA"],
"PersonKey": [1001],
"Treatment Function Code": [np.nan],
"Additional Detail 1": [np.nan],
"Additional Description 1": ["GASTROENTEROLOGY ward"],
"Additional Detail 2": [np.nan],
"Additional Description 2": [np.nan],
"Additional Detail 3": [np.nan],
"Additional Description 3": [np.nan],
"Additional Detail 4": [np.nan],
"Additional Description 4": [np.nan],
"Additional Detail 5": [np.nan],
"Additional Description 5": [np.nan],
"NCDR Treatment Function Name": [np.nan],
"Treatment Function Desc": [np.nan],
})
result = department_identification(df, paths=test_paths)
# The function processes Additional Detail 1 first, then Description 1, etc.
# But the final Primary_Directory comes from Additional Detail 1 specifically
# So this test may not extract from Description 1 directly
# Let's verify the actual behavior
# In the code, additional_detail_columns includes both Detail and Description
# but Primary_Source comes specifically from Additional Detail 1
# The extraction happens on all columns but Primary_Source only from Detail 1
# So with Detail 1 as NaN, Primary_Source will be NaN
# This may result in UNDEFINED
assert result.iloc[0]["Directory"] in ["GASTROENTEROLOGY", "Undefined"]
+446
View File
@@ -0,0 +1,446 @@
"""
Large dataset performance tests for the Patient Pathway Analysis tool.
This module tests the system's ability to handle realistic workloads:
1. Full dataset analysis (all drugs, trusts, directories)
2. Memory usage under load
3. Scalability characteristics
Run with: python -m pytest tests/test_large_dataset_performance.py -v
"""
import gc
import time
import tracemalloc
from datetime import date
from pathlib import Path
import pytest
# Mark all tests in this module as large dataset tests
pytestmark = pytest.mark.largedata
class TestLargeDatasetPerformance:
"""Performance tests with full dataset."""
@pytest.fixture(autouse=True)
def setup_paths(self):
"""Set up paths and verify data exists."""
from core import default_paths
from data_processing import get_loader
# Check if database exists
db_path = default_paths.data_dir / "pathways.db"
if not db_path.exists():
pytest.skip("SQLite database not found")
self.paths = default_paths
self.loader = get_loader('sqlite')
# Load data once
result = self.loader.load()
if result is None or result.df is None or len(result.df) == 0:
pytest.skip("No data available in database")
self.df = result.df
self.row_count = result.row_count
def test_data_load_time_acceptable(self):
"""Data loading should complete in under 5 seconds."""
from data_processing import get_loader
gc.collect()
start = time.perf_counter()
loader = get_loader('sqlite')
result = loader.load()
elapsed = time.perf_counter() - start
assert result is not None, "Data loading failed"
assert result.row_count > 0, "No data loaded"
# Allow 5 seconds for data loading
assert elapsed < 5.0, f"Data loading took {elapsed:.2f}s (target: <5s)"
def test_analysis_pipeline_completes(self):
"""Full analysis pipeline should complete without error."""
from analysis.pathway_analyzer import generate_icicle_chart
import pandas as pd
# Get available filters from actual data
trusts = self.df['Provider Code'].unique().tolist()[:20]
drugs = self.df['Drug Name'].dropna().unique().tolist()[:10]
directories = self.df['Directory'].dropna().unique().tolist()
# Load org codes for trust name mapping
org_codes = pd.read_csv(self.paths.org_codes_csv, index_col=1)
trust_names = []
for t in trusts:
if t in org_codes.index:
trust_names.append(org_codes.loc[t, 'Name'])
if not trust_names:
trust_names = org_codes['Name'].tolist()[:20]
# Run analysis with reasonable filter
ice_df, title = generate_icicle_chart(
df=self.df,
start_date="2020-01-01",
end_date="2025-01-01",
last_seen_date="2020-01-01",
trust_filter=trust_names,
drug_filter=drugs,
directory_filter=directories,
minimum_num_patients=1,
title="Large Dataset Test",
paths=self.paths,
)
# Should produce some results
assert ice_df is not None, "Analysis produced no results"
assert len(ice_df) > 0, "Analysis produced empty results"
def test_analysis_pipeline_time_acceptable(self):
"""Analysis pipeline should complete in under 60 seconds."""
from analysis.pathway_analyzer import generate_icicle_chart
import pandas as pd
# Get available filters from actual data
trusts = self.df['Provider Code'].unique().tolist()[:20]
drugs = self.df['Drug Name'].dropna().unique().tolist()[:10]
directories = self.df['Directory'].dropna().unique().tolist()
# Load org codes for trust name mapping
org_codes = pd.read_csv(self.paths.org_codes_csv, index_col=1)
trust_names = []
for t in trusts:
if t in org_codes.index:
trust_names.append(org_codes.loc[t, 'Name'])
if not trust_names:
trust_names = org_codes['Name'].tolist()[:20]
gc.collect()
start = time.perf_counter()
ice_df, title = generate_icicle_chart(
df=self.df,
start_date="2020-01-01",
end_date="2025-01-01",
last_seen_date="2020-01-01",
trust_filter=trust_names,
drug_filter=drugs,
directory_filter=directories,
minimum_num_patients=1,
title="Performance Test",
paths=self.paths,
)
elapsed = time.perf_counter() - start
# Allow 60 seconds for full analysis (observed ~19s with 440K rows)
assert elapsed < 60.0, f"Analysis took {elapsed:.2f}s (target: <60s)"
print(f"\n Analysis completed in {elapsed:.2f}s with {len(ice_df) if ice_df is not None else 0} result rows")
def test_memory_usage_acceptable(self):
"""Memory usage should not exceed 500MB during analysis."""
from analysis.pathway_analyzer import generate_icicle_chart
import pandas as pd
# Get available filters from actual data
trusts = self.df['Provider Code'].unique().tolist()[:15]
drugs = self.df['Drug Name'].dropna().unique().tolist()[:5]
directories = self.df['Directory'].dropna().unique().tolist()
# Load org codes for trust name mapping
org_codes = pd.read_csv(self.paths.org_codes_csv, index_col=1)
trust_names = []
for t in trusts:
if t in org_codes.index:
trust_names.append(org_codes.loc[t, 'Name'])
if not trust_names:
trust_names = org_codes['Name'].tolist()[:15]
gc.collect()
tracemalloc.start()
ice_df, title = generate_icicle_chart(
df=self.df,
start_date="2020-01-01",
end_date="2025-01-01",
last_seen_date="2020-01-01",
trust_filter=trust_names,
drug_filter=drugs,
directory_filter=directories,
minimum_num_patients=1,
title="Memory Test",
paths=self.paths,
)
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
peak_mb = peak / 1024 / 1024
# Allow 500MB peak memory
assert peak_mb < 500, f"Peak memory {peak_mb:.1f}MB exceeds 500MB limit"
print(f"\n Peak memory usage: {peak_mb:.1f}MB")
def test_figure_creation_scales(self):
"""Figure creation time should scale linearly with result size."""
from visualization.plotly_generator import create_icicle_figure
import pandas as pd
import numpy as np
# Test with different sizes
sizes = [100, 500, 1000, 2000]
times = []
for n_rows in sizes:
sample_data = {
'parents': ['N&WICS'] * n_rows,
'ids': [f'N&WICS - Test{i}' for i in range(n_rows)],
'labels': [f'Test{i}' for i in range(n_rows)],
'value': np.random.randint(1, 100, n_rows),
'colour': np.random.random(n_rows),
'cost': np.random.randint(1000, 100000, n_rows),
'costpp': np.random.randint(100, 10000, n_rows),
'cost_pp_pa': [str(np.random.randint(100, 10000)) for _ in range(n_rows)],
'First seen': pd.to_datetime(['2024-01-01'] * n_rows),
'Last seen': pd.to_datetime(['2024-12-31'] * n_rows),
'First seen (Parent)': ['2024-01-01'] * n_rows,
'Last seen (Parent)': ['2024-12-31'] * n_rows,
'average_spacing': ['Test spacing'] * n_rows,
'avg_days': pd.to_timedelta([100] * n_rows, unit='D'),
}
sample_df = pd.DataFrame(sample_data)
gc.collect()
start = time.perf_counter()
fig = create_icicle_figure(sample_df, f"Scale Test {n_rows}")
elapsed = time.perf_counter() - start
times.append(elapsed)
# Check that time scaling is roughly linear (not exponential)
# If time doubles when size doubles, it's linear
# We allow some variance, so check that 10x data doesn't take more than 20x time
time_ratio = times[-1] / times[0]
size_ratio = sizes[-1] / sizes[0]
# Allow 3x the expected linear scaling
max_allowed_ratio = size_ratio * 3
assert time_ratio < max_allowed_ratio, (
f"Figure creation doesn't scale well: "
f"{sizes[-1]} rows took {times[-1]:.3f}s vs {sizes[0]} rows at {times[0]:.3f}s "
f"(ratio {time_ratio:.1f}x, expected <{max_allowed_ratio:.1f}x)"
)
print(f"\n Figure scaling: {sizes[0]} rows: {times[0]*1000:.1f}ms, "
f"{sizes[-1]} rows: {times[-1]*1000:.1f}ms (ratio: {time_ratio:.1f}x)")
class TestDataVolumeStress:
"""Stress tests to verify system handles various data volumes."""
@pytest.fixture(autouse=True)
def setup_paths(self):
"""Set up paths and verify data exists."""
from core import default_paths
from data_processing import get_loader
# Check if database exists
db_path = default_paths.data_dir / "pathways.db"
if not db_path.exists():
pytest.skip("SQLite database not found")
self.paths = default_paths
self.loader = get_loader('sqlite')
# Load data once
result = self.loader.load()
if result is None or result.df is None or len(result.df) == 0:
pytest.skip("No data available in database")
self.df = result.df
def test_handles_all_drugs(self):
"""Analysis can handle filtering by all drugs."""
from analysis.pathway_analyzer import prepare_data
import pandas as pd
all_drugs = self.df['Drug Name'].dropna().unique().tolist()
# Load org codes
org_codes = pd.read_csv(self.paths.org_codes_csv, index_col=1)
trust_names = org_codes['Name'].tolist()[:5]
result = prepare_data(
df=self.df,
trust_filter=trust_names,
drug_filter=all_drugs,
directory_filter=self.df['Directory'].dropna().unique().tolist(),
paths=self.paths,
)
# Should complete without error (returns tuple)
assert result is not None
assert len(result) == 3 # (df, org_codes, directory_df)
def test_handles_all_trusts(self):
"""Analysis can handle filtering by all trusts."""
from analysis.pathway_analyzer import prepare_data
import pandas as pd
# Load org codes
org_codes = pd.read_csv(self.paths.org_codes_csv, index_col=1)
all_trust_names = org_codes['Name'].tolist()
result = prepare_data(
df=self.df,
trust_filter=all_trust_names,
drug_filter=['ADALIMUMAB', 'ETANERCEPT'],
directory_filter=self.df['Directory'].dropna().unique().tolist(),
paths=self.paths,
)
# Should complete without error (returns tuple)
assert result is not None
assert len(result) == 3 # (df, org_codes, directory_df)
def test_handles_wide_date_range(self):
"""Analysis can handle a wide date range via generate_icicle_chart."""
from analysis.pathway_analyzer import generate_icicle_chart
import pandas as pd
# Load org codes
org_codes = pd.read_csv(self.paths.org_codes_csv, index_col=1)
trust_names = org_codes['Name'].tolist()[:10]
# Use very wide date range via full pipeline
ice_df, title = generate_icicle_chart(
df=self.df,
start_date="2010-01-01",
end_date="2030-01-01",
last_seen_date="2010-01-01",
trust_filter=trust_names,
drug_filter=self.df['Drug Name'].dropna().unique().tolist()[:5],
directory_filter=self.df['Directory'].dropna().unique().tolist(),
minimum_num_patients=1,
title="Wide Date Range Test",
paths=self.paths,
)
# Should complete without error
assert ice_df is not None or ice_df is None # Just verifying no exception
def test_handles_minimum_patient_threshold(self):
"""Analysis correctly applies minimum patient threshold."""
from analysis.pathway_analyzer import generate_icicle_chart
import pandas as pd
# Load org codes
org_codes = pd.read_csv(self.paths.org_codes_csv, index_col=1)
trust_names = org_codes['Name'].tolist()[:10]
# Run with minimum 50 patients
ice_df_50, _ = generate_icicle_chart(
df=self.df,
start_date="2020-01-01",
end_date="2025-01-01",
last_seen_date="2020-01-01",
trust_filter=trust_names,
drug_filter=self.df['Drug Name'].dropna().unique().tolist()[:5],
directory_filter=self.df['Directory'].dropna().unique().tolist(),
minimum_num_patients=50,
title="Threshold Test 50",
paths=self.paths,
)
# Run with minimum 1 patient
ice_df_1, _ = generate_icicle_chart(
df=self.df,
start_date="2020-01-01",
end_date="2025-01-01",
last_seen_date="2020-01-01",
trust_filter=trust_names,
drug_filter=self.df['Drug Name'].dropna().unique().tolist()[:5],
directory_filter=self.df['Directory'].dropna().unique().tolist(),
minimum_num_patients=1,
title="Threshold Test 1",
paths=self.paths,
)
# Higher threshold should produce fewer or equal results
len_50 = len(ice_df_50) if ice_df_50 is not None else 0
len_1 = len(ice_df_1) if ice_df_1 is not None else 0
assert len_50 <= len_1, (
f"Higher minimum threshold should produce fewer results: "
f"min=50 gave {len_50} rows, min=1 gave {len_1} rows"
)
class TestConcurrentOperations:
"""Tests for handling multiple operations."""
@pytest.fixture(autouse=True)
def setup_paths(self):
"""Set up paths and verify data exists."""
from core import default_paths
from data_processing import get_loader
# Check if database exists
db_path = default_paths.data_dir / "pathways.db"
if not db_path.exists():
pytest.skip("SQLite database not found")
self.paths = default_paths
def test_multiple_data_loads(self):
"""Multiple data loads should not cause issues."""
from data_processing import get_loader
results = []
for i in range(3):
loader = get_loader('sqlite')
result = loader.load()
if result is not None:
results.append(result.row_count)
# All loads should return same row count
assert len(set(results)) == 1, f"Inconsistent row counts: {results}"
def test_sequential_analyses(self):
"""Multiple sequential analyses should complete."""
from analysis.pathway_analyzer import generate_icicle_chart
from data_processing import get_loader
import pandas as pd
# Load data
loader = get_loader('sqlite')
result = loader.load()
if result is None or result.df is None:
pytest.skip("No data available")
df = result.df
# Load org codes
org_codes = pd.read_csv(self.paths.org_codes_csv, index_col=1)
trust_names = org_codes['Name'].tolist()[:5]
# Run multiple analyses
for i in range(3):
ice_df, title = generate_icicle_chart(
df=df,
start_date="2020-01-01",
end_date="2025-01-01",
last_seen_date="2020-01-01",
trust_filter=trust_names,
drug_filter=['ADALIMUMAB'],
directory_filter=df['Directory'].dropna().unique().tolist(),
minimum_num_patients=1,
title=f"Sequential Test {i+1}",
paths=self.paths,
)
# Each should complete
assert ice_df is not None or ice_df is None # Just check no error
+373
View File
@@ -0,0 +1,373 @@
"""
Tests for core/models.py - AnalysisFilters dataclass.
Tests cover:
- Basic instantiation
- validate() method for filter validation
- Property accessors (has_trust_filter, etc.)
- title property (custom vs auto-generated)
- summary() method
"""
from datetime import date
from pathlib import Path
import pytest
from core.models import AnalysisFilters
class TestAnalysisFiltersBasic:
"""Test basic AnalysisFilters instantiation and access."""
def test_create_with_required_dates(self, sample_date_range):
"""Should be able to create AnalysisFilters with just dates."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
)
assert filters.start_date == start
assert filters.end_date == end
assert filters.last_seen_date == last_seen
def test_default_lists_are_empty(self, sample_date_range):
"""Default filter lists should be empty."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
)
assert filters.trusts == []
assert filters.drugs == []
assert filters.directories == []
def test_default_minimum_patients_is_zero(self, sample_date_range):
"""Default minimum_patients should be 0."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
)
assert filters.minimum_patients == 0
def test_default_custom_title_is_empty(self, sample_date_range):
"""Default custom_title should be empty string."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
)
assert filters.custom_title == ""
class TestAnalysisFiltersValidate:
"""Test validate() method."""
def test_validate_passes_valid_config(self, sample_date_range):
"""validate() should return empty list for valid configuration."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
)
errors = filters.validate()
assert errors == []
def test_validate_fails_when_end_before_start(self):
"""validate() should fail when end_date is before start_date."""
filters = AnalysisFilters(
start_date=date(2024, 12, 31), # Later
end_date=date(2024, 1, 1), # Earlier
last_seen_date=date(2024, 6, 1),
)
errors = filters.validate()
assert len(errors) >= 1
assert any("cannot be before start date" in e for e in errors)
def test_validate_fails_when_last_seen_after_end(self):
"""validate() should fail when last_seen_date is after end_date."""
filters = AnalysisFilters(
start_date=date(2024, 1, 1),
end_date=date(2024, 6, 1),
last_seen_date=date(2024, 12, 31), # After end_date
)
errors = filters.validate()
assert len(errors) >= 1
assert any("would exclude all patients" in e for e in errors)
def test_validate_fails_when_minimum_patients_negative(self, sample_date_range):
"""validate() should fail when minimum_patients is negative."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
minimum_patients=-1,
)
errors = filters.validate()
assert len(errors) >= 1
assert any("cannot be negative" in e for e in errors)
def test_validate_fails_when_output_dir_missing(self, sample_date_range, temp_dir: Path):
"""validate() should fail when output_dir doesn't exist."""
start, end, last_seen = sample_date_range
nonexistent_dir = temp_dir / "nonexistent"
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
output_dir=nonexistent_dir,
)
errors = filters.validate()
assert len(errors) >= 1
assert any("does not exist" in e for e in errors)
def test_validate_passes_when_output_dir_exists(self, sample_date_range, temp_dir: Path):
"""validate() should pass when output_dir exists."""
start, end, last_seen = sample_date_range
output_dir = temp_dir / "output"
output_dir.mkdir()
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
output_dir=output_dir,
)
errors = filters.validate()
assert errors == []
def test_validate_multiple_errors(self):
"""validate() should report all errors, not just the first."""
filters = AnalysisFilters(
start_date=date(2024, 12, 31), # End before start
end_date=date(2024, 1, 1),
last_seen_date=date(2024, 6, 1),
minimum_patients=-5, # Negative
)
errors = filters.validate()
assert len(errors) >= 2
class TestAnalysisFiltersHasFilters:
"""Test has_*_filter properties."""
def test_has_trust_filter_false_when_empty(self, sample_date_range):
"""has_trust_filter should be False when trusts list is empty."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
)
assert filters.has_trust_filter is False
def test_has_trust_filter_true_when_populated(self, sample_date_range, sample_trusts):
"""has_trust_filter should be True when trusts list has items."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
trusts=sample_trusts,
)
assert filters.has_trust_filter is True
def test_has_drug_filter_false_when_empty(self, sample_date_range):
"""has_drug_filter should be False when drugs list is empty."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
)
assert filters.has_drug_filter is False
def test_has_drug_filter_true_when_populated(self, sample_date_range, sample_drugs):
"""has_drug_filter should be True when drugs list has items."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
drugs=sample_drugs,
)
assert filters.has_drug_filter is True
def test_has_directory_filter_false_when_empty(self, sample_date_range):
"""has_directory_filter should be False when directories list is empty."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
)
assert filters.has_directory_filter is False
def test_has_directory_filter_true_when_populated(self, sample_date_range, sample_directories):
"""has_directory_filter should be True when directories list has items."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
directories=sample_directories,
)
assert filters.has_directory_filter is True
class TestAnalysisFiltersTitle:
"""Test title property."""
def test_title_returns_custom_when_set(self, sample_date_range):
"""title should return custom_title when set."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
custom_title="My Custom Analysis",
)
assert filters.title == "My Custom Analysis"
def test_title_auto_generates_when_not_set(self, sample_date_range):
"""title should auto-generate from dates when custom_title is empty."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
)
assert "2024-01-01" in filters.title
assert "2024-12-31" in filters.title
def test_title_auto_generated_includes_dates(self):
"""Auto-generated title should include start and end dates."""
filters = AnalysisFilters(
start_date=date(2023, 6, 15),
end_date=date(2024, 3, 20),
last_seen_date=date(2024, 1, 1),
)
assert "2023-06-15" in filters.title
assert "2024-03-20" in filters.title
class TestAnalysisFiltersSummary:
"""Test summary() method."""
def test_summary_returns_string(self, sample_date_range):
"""summary() should return a string."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
)
summary = filters.summary()
assert isinstance(summary, str)
def test_summary_includes_date_range(self, sample_date_range):
"""summary() should include date range information."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
)
summary = filters.summary()
assert "Date range" in summary
assert "2024-01-01" in summary or str(start) in summary
def test_summary_includes_minimum_patients(self, sample_date_range):
"""summary() should include minimum patients value."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
minimum_patients=10,
)
summary = filters.summary()
assert "Minimum patients" in summary
assert "10" in summary
def test_summary_shows_all_when_no_filters(self, sample_date_range):
"""summary() should show 'All' when filter lists are empty."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
)
summary = filters.summary()
assert "Trusts: All" in summary
assert "Drugs: All" in summary
assert "Directories: All" in summary
def test_summary_shows_count_when_filters_set(
self, sample_date_range, sample_trusts, sample_drugs, sample_directories
):
"""summary() should show count when filter lists are populated."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
trusts=sample_trusts,
drugs=sample_drugs,
directories=sample_directories,
)
summary = filters.summary()
assert "3 selected" in summary # trusts count
assert "4 selected" in summary # drugs count
def test_summary_includes_custom_title_when_set(self, sample_date_range):
"""summary() should include custom title when set."""
start, end, last_seen = sample_date_range
filters = AnalysisFilters(
start_date=start,
end_date=end,
last_seen_date=last_seen,
custom_title="Special Analysis",
)
summary = filters.summary()
assert "Custom title" in summary
assert "Special Analysis" in summary
+351
View File
@@ -0,0 +1,351 @@
"""
Test to verify that the refactored analysis pipeline produces matching output.
This test compares the output of the refactored generate_icicle_chart() function
from analysis/pathway_analyzer.py with expected output characteristics.
Since the original generate_graph() function calls figure() directly without
returning data, we verify the refactored pipeline by:
1. Running the pipeline with known test data
2. Verifying the output DataFrame has correct structure
3. Verifying statistical calculations are reasonable
"""
import pytest
import pandas as pd
import numpy as np
from datetime import datetime
from pathlib import Path
# Skip if we can't import the modules
try:
from analysis.pathway_analyzer import (
generate_icicle_chart,
prepare_data,
calculate_statistics,
build_hierarchy,
prepare_chart_data,
)
from core import default_paths
HAS_MODULES = True
except ImportError:
HAS_MODULES = False
# Standard test filters (matching sample data)
TEST_TRUST_FILTER = [
'MANCHESTER UNIVERSITY NHS FOUNDATION TRUST', # R0A code
'BARTS HEALTH NHS TRUST', # R1H code
]
TEST_DRUG_FILTER = ['ADALIMUMAB', 'ETANERCEPT', 'INFLIXIMAB']
TEST_DIRECTORY_FILTER = ['Rheumatology', 'Dermatology', 'Gastroenterology']
@pytest.fixture
def sample_intervention_data():
"""
Create sample intervention data similar to what comes from the data loader.
The data mimics the structure expected by generate_icicle_chart():
- UPID: Unique patient identifier (Provider Code prefix + PersonKey)
- Drug Name: Standardized drug name
- Directory: Medical specialty
- Intervention Date: Date of treatment
- Price Actual: Cost of treatment
- Provider Code: NHS Trust code (will be mapped to name via org_codes.csv)
Uses real trust codes from org_codes.csv:
- R0A = MANCHESTER UNIVERSITY NHS FOUNDATION TRUST
- R1H = BARTS HEALTH NHS TRUST
"""
# Create data for a small number of patients with varied pathways
data = {
'UPID': [
# Patient 1: Trust1 (R0A), Rheumatology, Adalimumab only (5 treatments)
'R0A12345', 'R0A12345', 'R0A12345', 'R0A12345', 'R0A12345',
# Patient 2: Trust1 (R0A), Rheumatology, Adalimumab then Etanercept (4 treatments)
'R0A67890', 'R0A67890', 'R0A67890', 'R0A67890',
# Patient 3: Trust1 (R0A), Dermatology, Adalimumab only (3 treatments)
'R0A11111', 'R0A11111', 'R0A11111',
# Patient 4: Trust2 (R1H), Rheumatology, Etanercept only (6 treatments)
'R1H22222', 'R1H22222', 'R1H22222', 'R1H22222', 'R1H22222', 'R1H22222',
# Patient 5: Trust2 (R1H), Gastro, Infliximab only (4 treatments)
'R1H33333', 'R1H33333', 'R1H33333', 'R1H33333',
],
'Drug Name': [
'ADALIMUMAB', 'ADALIMUMAB', 'ADALIMUMAB', 'ADALIMUMAB', 'ADALIMUMAB',
'ADALIMUMAB', 'ADALIMUMAB', 'ETANERCEPT', 'ETANERCEPT',
'ADALIMUMAB', 'ADALIMUMAB', 'ADALIMUMAB',
'ETANERCEPT', 'ETANERCEPT', 'ETANERCEPT', 'ETANERCEPT', 'ETANERCEPT', 'ETANERCEPT',
'INFLIXIMAB', 'INFLIXIMAB', 'INFLIXIMAB', 'INFLIXIMAB',
],
'Directory': [
'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology',
'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology',
'Dermatology', 'Dermatology', 'Dermatology',
'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology',
'Gastroenterology', 'Gastroenterology', 'Gastroenterology', 'Gastroenterology',
],
'Intervention Date': [
# Patient 1 dates (every 2 weeks)
datetime(2023, 1, 1), datetime(2023, 1, 15), datetime(2023, 1, 29), datetime(2023, 2, 12), datetime(2023, 2, 26),
# Patient 2 dates (switch after 2 months)
datetime(2023, 1, 5), datetime(2023, 2, 5), datetime(2023, 3, 5), datetime(2023, 4, 5),
# Patient 3 dates
datetime(2023, 2, 1), datetime(2023, 3, 1), datetime(2023, 4, 1),
# Patient 4 dates (weekly for 6 weeks)
datetime(2023, 1, 1), datetime(2023, 1, 8), datetime(2023, 1, 15), datetime(2023, 1, 22), datetime(2023, 1, 29), datetime(2023, 2, 5),
# Patient 5 dates (every 4 weeks)
datetime(2023, 1, 10), datetime(2023, 2, 7), datetime(2023, 3, 7), datetime(2023, 4, 4),
],
'Price Actual': [
# Patient 1 costs
500.0, 500.0, 500.0, 500.0, 500.0,
# Patient 2 costs
500.0, 500.0, 600.0, 600.0,
# Patient 3 costs
500.0, 500.0, 500.0,
# Patient 4 costs
400.0, 400.0, 400.0, 400.0, 400.0, 400.0,
# Patient 5 costs
800.0, 800.0, 800.0, 800.0,
],
'Provider Code': [
# Trust codes (R0A = Manchester, R1H = Barts)
'R0A', 'R0A', 'R0A', 'R0A', 'R0A',
'R0A', 'R0A', 'R0A', 'R0A',
'R0A', 'R0A', 'R0A',
'R1H', 'R1H', 'R1H', 'R1H', 'R1H', 'R1H',
'R1H', 'R1H', 'R1H', 'R1H',
],
}
return pd.DataFrame(data)
@pytest.mark.skipif(not HAS_MODULES, reason="Required modules not available")
class TestOutputStructure:
"""Test that the refactored pipeline produces correct output structure."""
def test_ice_df_has_required_columns(self, sample_intervention_data):
"""Verify ice_df has all required columns for Plotly icicle chart."""
if default_paths.validate(): # Non-empty list means errors
pytest.skip("Reference data files not available")
df = sample_intervention_data.copy()
ice_df, title = generate_icicle_chart(
df=df,
start_date='2022-01-01',
end_date='2024-01-01',
last_seen_date='2022-06-01',
trust_filter=TEST_TRUST_FILTER,
drug_filter=TEST_DRUG_FILTER,
directory_filter=TEST_DIRECTORY_FILTER,
minimum_num_patients=1,
title="Test Output",
paths=default_paths,
)
if ice_df is None:
pytest.skip("No data matched filters (trust code mapping may not match)")
# Required columns for Plotly icicle chart
required_columns = ['parents', 'labels', 'ids', 'value', 'cost']
for col in required_columns:
assert col in ice_df.columns, f"Missing required column: {col}"
def test_ice_df_hierarchy_structure(self, sample_intervention_data):
"""Verify the ice_df hierarchy is valid (parents reference existing ids)."""
if default_paths.validate(): # Non-empty list means errors
pytest.skip("Reference data files not available")
df = sample_intervention_data.copy()
ice_df, title = generate_icicle_chart(
df=df,
start_date='2022-01-01',
end_date='2024-01-01',
last_seen_date='2022-06-01',
trust_filter=TEST_TRUST_FILTER,
drug_filter=TEST_DRUG_FILTER,
directory_filter=TEST_DIRECTORY_FILTER,
minimum_num_patients=1,
title="Test Output",
)
if ice_df is None:
pytest.skip("No data matched filters")
# Every parent should be in ids (except root which has empty parent)
ids_set = set(ice_df['ids'].unique())
for parent in ice_df['parents'].unique():
if parent != '': # Root has empty parent
assert parent in ids_set, f"Parent '{parent}' not found in ids"
def test_values_sum_correctly(self, sample_intervention_data):
"""Verify that child values sum to parent values (with branchvalues='total')."""
if default_paths.validate(): # Non-empty list means errors
pytest.skip("Reference data files not available")
df = sample_intervention_data.copy()
ice_df, title = generate_icicle_chart(
df=df,
start_date='2022-01-01',
end_date='2024-01-01',
last_seen_date='2022-06-01',
trust_filter=TEST_TRUST_FILTER,
drug_filter=TEST_DRUG_FILTER,
directory_filter=TEST_DIRECTORY_FILTER,
minimum_num_patients=1,
title="Test Output",
)
if ice_df is None:
pytest.skip("No data matched filters")
# Verify the structure is valid:
# - Root (N&WICS) should have the highest value
# - All child values should sum to at most their parent value
root_row = ice_df[ice_df['ids'] == 'N&WICS']
if len(root_row) > 0:
root_value = root_row['value'].iloc[0]
assert root_value > 0, "Root should have positive value"
# Check that children sum to parent value for nodes at same level
# Note: The icicle chart uses branchvalues='total' so children should sum to parent
# However, at pathway level, patients may appear in multiple pathway branches
for parent_id in ice_df['ids'].unique():
parent_row = ice_df[ice_df['ids'] == parent_id]
if len(parent_row) == 0:
continue
parent_value = parent_row['value'].iloc[0]
children = ice_df[ice_df['parents'] == parent_id]
if len(children) > 0:
children_sum = children['value'].sum()
# Children should sum to parent value in a properly constructed icicle chart
# Allow for small differences due to filtering at minimum_num_patients
assert children_sum <= parent_value, \
f"Children of '{parent_id}' sum to {children_sum}, exceeds parent {parent_value}"
@pytest.mark.skipif(not HAS_MODULES, reason="Required modules not available")
class TestPrepareData:
"""Test the prepare_data() function independently."""
def test_prepare_data_filters_correctly(self, sample_intervention_data):
"""Verify prepare_data applies filters correctly."""
if default_paths.validate(): # Non-empty list means errors
pytest.skip("Reference data files not available")
df = sample_intervention_data.copy()
# Filter to single drug
result = prepare_data(
df,
TEST_TRUST_FILTER,
['ADALIMUMAB'], # Only Adalimumab
TEST_DIRECTORY_FILTER
)
if result[0] is None:
pytest.skip("No data matched filters")
filtered_df, org_codes, directory_df = result
# Should only have Adalimumab rows
assert set(filtered_df['Drug Name'].unique()) == {'ADALIMUMAB'}
def test_prepare_data_creates_upid_treatment(self, sample_intervention_data):
"""Verify prepare_data creates UPIDTreatment column."""
if default_paths.validate(): # Non-empty list means errors
pytest.skip("Reference data files not available")
df = sample_intervention_data.copy()
result = prepare_data(
df,
TEST_TRUST_FILTER,
TEST_DRUG_FILTER,
TEST_DIRECTORY_FILTER
)
if result[0] is None:
pytest.skip("No data matched filters")
filtered_df, org_codes, directory_df = result
# UPIDTreatment should be UPID + Drug Name
assert 'UPIDTreatment' in filtered_df.columns
# Check first row
first_row = filtered_df.iloc[0]
expected = first_row['UPID'] + first_row['Drug Name']
assert first_row['UPIDTreatment'] == expected
@pytest.mark.skipif(not HAS_MODULES, reason="Required modules not available")
class TestCalculateStatistics:
"""Test the calculate_statistics() function independently."""
def test_date_filtering(self, sample_intervention_data):
"""Verify date filtering in calculate_statistics."""
if default_paths.validate(): # Non-empty list means errors
pytest.skip("Reference data files not available")
df = sample_intervention_data.copy()
df['UPIDTreatment'] = df['UPID'] + df['Drug Name']
# These dates should include all our sample data
start_date = '2022-01-01'
end_date = '2024-01-01'
last_seen_date = '2022-06-01'
result = calculate_statistics(df, start_date, end_date, last_seen_date, "Test")
if result[0] is None:
pytest.skip("No data matched date filters")
patient_info, date_df, title = result
# Should have patient info DataFrame
assert patient_info is not None
assert len(patient_info) > 0
@pytest.mark.skipif(not HAS_MODULES, reason="Required modules not available")
class TestMinimumPatientFilter:
"""Test that minimum_num_patients filter works correctly."""
def test_filters_small_pathways(self, sample_intervention_data):
"""Verify pathways with fewer patients than threshold are excluded."""
if default_paths.validate(): # Non-empty list means errors
pytest.skip("Reference data files not available")
df = sample_intervention_data.copy()
# With minimum 10, nothing should pass (we only have 5 patients)
ice_df, title = generate_icicle_chart(
df=df,
start_date='2022-01-01',
end_date='2024-01-01',
last_seen_date='2022-06-01',
trust_filter=TEST_TRUST_FILTER,
drug_filter=TEST_DRUG_FILTER,
directory_filter=TEST_DIRECTORY_FILTER,
minimum_num_patients=10, # Higher than our patient count
title="Test Output",
)
# Either None or empty DataFrame
if ice_df is not None:
# If filtered, should have very few or no patient pathways
patient_rows = ice_df[ice_df['value'] < 10]
# All remaining rows should have value >= 10
remaining = ice_df[ice_df['value'] >= 10]
# This may include aggregated rows
pass # Test passes if no error
if __name__ == '__main__':
pytest.main([__file__, '-v'])
+269
View File
@@ -0,0 +1,269 @@
"""
Test Plotly interactivity features in the visualization module.
Verifies that Plotly charts have the expected interactive capabilities:
1. Hover templates are properly configured
2. Icicle chart settings allow click-to-drill-down navigation
3. Layout settings support proper display of interactive features
Phase 4.7.2: Verify Plotly interactivity (zoom, pan, hover)
"""
import pytest
import pandas as pd
import numpy as np
from datetime import datetime
import plotly.graph_objects as go
# Import the visualization module
try:
from visualization.plotly_generator import create_icicle_figure, save_figure_html
HAS_VISUALIZATION = True
except ImportError:
HAS_VISUALIZATION = False
@pytest.fixture
def sample_chart_data():
"""
Create sample chart data (ice_df) for testing visualization.
This mimics the output of prepare_chart_data() from analysis/pathway_analyzer.py
"""
# Sample hierarchy data: Root -> Trust -> Directory -> Drug
data = {
'parents': [
'', # Root (N&WICS)
'N&WICS', # Trust 1
'N&WICS', # Trust 2
'Trust1', # Directory in Trust1
'Trust1', # Another Directory
'Trust2', # Directory in Trust2
'Trust1/Rheum', # Drug
'Trust1/Derm', # Drug
'Trust2/Rheum', # Drug
],
'ids': [
'N&WICS',
'Trust1',
'Trust2',
'Trust1/Rheum',
'Trust1/Derm',
'Trust2/Rheum',
'Trust1/Rheum/Adalimumab',
'Trust1/Derm/Adalimumab',
'Trust2/Rheum/Etanercept',
],
'labels': [
'Norfolk & Waveney ICS',
'Manchester University Trust',
'Barts Health Trust',
'Rheumatology',
'Dermatology',
'Rheumatology',
'Adalimumab',
'Adalimumab',
'Etanercept',
],
'value': [50, 30, 20, 20, 10, 20, 20, 10, 20],
'colour': [1.0, 0.6, 0.4, 0.4, 0.2, 0.4, 0.4, 0.2, 0.4],
'cost': [50000, 30000, 20000, 20000, 10000, 20000, 20000, 10000, 20000],
'costpp': [1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000],
'cost_pp_pa': [2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000],
'First seen': [
pd.Timestamp('2023-01-01')] * 9,
'Last seen': [
pd.Timestamp('2023-12-31')] * 9,
'First seen (Parent)': [
pd.Timestamp('2023-01-01')] * 9,
'Last seen (Parent)': [
pd.Timestamp('2023-12-31')] * 9,
'average_spacing': ['14 days'] * 9,
'avg_days': [pd.Timedelta('180 days')] * 9,
}
return pd.DataFrame(data)
@pytest.mark.skipif(not HAS_VISUALIZATION, reason="Visualization module not available")
class TestPlotlyFigureConfiguration:
"""Test that Plotly figures have correct interactive configuration."""
def test_figure_has_hovertemplate(self, sample_chart_data):
"""Verify the icicle chart has a hover template configured."""
fig = create_icicle_figure(sample_chart_data, "Test Title")
# Get the icicle trace
assert len(fig.data) > 0, "Figure should have at least one trace"
icicle_trace = fig.data[0]
assert icicle_trace.type == 'icicle', "First trace should be an icicle chart"
# Verify hovertemplate is set and contains expected placeholders
assert icicle_trace.hovertemplate is not None, "Hover template should be configured"
assert '%{label}' in icicle_trace.hovertemplate, "Hover should include label"
assert '%{customdata' in icicle_trace.hovertemplate, "Hover should include custom data"
def test_figure_has_texttemplate(self, sample_chart_data):
"""Verify the icicle chart has a text template for in-chart text."""
fig = create_icicle_figure(sample_chart_data, "Test Title")
icicle_trace = fig.data[0]
# Verify texttemplate is set
assert icicle_trace.texttemplate is not None, "Text template should be configured"
assert '%{label}' in icicle_trace.texttemplate, "Text should include label"
def test_figure_has_correct_branchvalues(self, sample_chart_data):
"""Verify branchvalues is set to 'total' for proper hierarchy summing."""
fig = create_icicle_figure(sample_chart_data, "Test Title")
icicle_trace = fig.data[0]
# branchvalues should be 'total' for proper hierarchy display
assert icicle_trace.branchvalues == 'total', \
"branchvalues should be 'total' for hierarchy summation"
def test_figure_has_maxdepth_for_drilldown(self, sample_chart_data):
"""Verify maxdepth is set to allow drill-down navigation."""
fig = create_icicle_figure(sample_chart_data, "Test Title")
icicle_trace = fig.data[0]
# maxdepth should be set to limit initial view depth
# Users can then click to drill into deeper levels
assert icicle_trace.maxdepth is not None, "maxdepth should be configured for drill-down"
assert icicle_trace.maxdepth >= 2, "maxdepth should be at least 2 to show hierarchy"
def test_figure_layout_has_hoverlabel(self, sample_chart_data):
"""Verify layout has hoverlabel configuration for readable tooltips."""
fig = create_icicle_figure(sample_chart_data, "Test Title")
# Check hoverlabel configuration
assert 'hoverlabel' in fig.layout, "Layout should have hoverlabel configuration"
# Plotly uses 'font' as a dict with 'size' attribute
assert fig.layout.hoverlabel.font is not None, "Hover label font should be configured"
assert fig.layout.hoverlabel.font.size is not None, "Hover label font size should be set"
assert fig.layout.hoverlabel.font.size >= 12, "Hover label should be readable (>=12px)"
def test_figure_has_proper_margins(self, sample_chart_data):
"""Verify layout has margins configured for proper display."""
fig = create_icicle_figure(sample_chart_data, "Test Title")
# Check margin configuration
assert fig.layout.margin is not None, "Margins should be configured"
assert fig.layout.margin.t >= 50, "Top margin should have room for title"
def test_figure_has_title(self, sample_chart_data):
"""Verify the figure has a title configured."""
fig = create_icicle_figure(sample_chart_data, "Test Analysis")
assert fig.layout.title is not None, "Figure should have a title"
assert "Test Analysis" in fig.layout.title.text, "Title should include custom text"
def test_figure_has_colorscale(self, sample_chart_data):
"""Verify the icicle chart has a colorscale for visual differentiation."""
fig = create_icicle_figure(sample_chart_data, "Test Title")
icicle_trace = fig.data[0]
# Check marker has colorscale
assert icicle_trace.marker is not None, "Marker should be configured"
assert icicle_trace.marker.colorscale is not None, "Colorscale should be set"
@pytest.mark.skipif(not HAS_VISUALIZATION, reason="Visualization module not available")
class TestPlotlyInteractiveFeatures:
"""Test that Plotly figures support expected interactive features."""
def test_figure_is_interactive_type(self, sample_chart_data):
"""Verify the figure is a go.Figure which supports interactivity."""
fig = create_icicle_figure(sample_chart_data, "Test Title")
assert isinstance(fig, go.Figure), "Should return a Plotly Figure object"
def test_figure_can_be_converted_to_html(self, sample_chart_data, tmp_path):
"""Verify the figure can be saved as interactive HTML."""
fig = create_icicle_figure(sample_chart_data, "Test Title")
# Save to temporary file
html_path = save_figure_html(fig, str(tmp_path), "test_chart", open_browser=False)
assert html_path.endswith('.html'), "Should save as HTML file"
# Verify the HTML file exists and contains Plotly data
with open(html_path, 'r', encoding='utf-8') as f:
html_content = f.read()
assert 'plotly' in html_content.lower(), "HTML should contain Plotly"
# Interactive HTML should include the plotly.js library
assert 'cdn.plot.ly' in html_content or 'plotly-' in html_content, \
"HTML should include Plotly.js for interactivity"
def test_figure_data_includes_ids_for_drilldown(self, sample_chart_data):
"""Verify figure data includes ids necessary for click-to-drill navigation."""
fig = create_icicle_figure(sample_chart_data, "Test Title")
icicle_trace = fig.data[0]
# ids are required for proper drill-down behavior in icicle charts
assert icicle_trace.ids is not None, "ids should be provided for drill-down"
assert len(icicle_trace.ids) > 0, "ids should not be empty"
def test_figure_data_includes_parents_for_hierarchy(self, sample_chart_data):
"""Verify figure data includes parents for hierarchy navigation."""
fig = create_icicle_figure(sample_chart_data, "Test Title")
icicle_trace = fig.data[0]
# parents are required for hierarchy structure
assert icicle_trace.parents is not None, "parents should be provided"
assert len(icicle_trace.parents) > 0, "parents should not be empty"
def test_figure_customdata_enables_rich_hover(self, sample_chart_data):
"""Verify customdata is provided for rich hover information."""
fig = create_icicle_figure(sample_chart_data, "Test Title")
icicle_trace = fig.data[0]
# customdata enables rich hover templates with additional info
assert icicle_trace.customdata is not None, "customdata should be provided"
# customdata should be a 2D array with multiple columns of data
assert len(icicle_trace.customdata) > 0, "customdata should have rows"
# Each row should have multiple data points for hover display
if hasattr(icicle_trace.customdata[0], '__len__'):
assert len(icicle_trace.customdata[0]) >= 5, \
"customdata should have multiple columns for rich hover"
@pytest.mark.skipif(not HAS_VISUALIZATION, reason="Visualization module not available")
class TestReflexCompatibility:
"""Test that figures are compatible with Reflex's rx.plotly() component."""
def test_figure_to_json_serializable(self, sample_chart_data):
"""Verify figure can be serialized to JSON (required for Reflex)."""
fig = create_icicle_figure(sample_chart_data, "Test Title")
# Reflex needs to serialize the figure to JSON for the frontend
try:
json_data = fig.to_json()
assert json_data is not None
assert len(json_data) > 0
except Exception as e:
pytest.fail(f"Figure should be JSON serializable: {e}")
def test_figure_to_dict(self, sample_chart_data):
"""Verify figure can be converted to dict (used by Reflex internally)."""
fig = create_icicle_figure(sample_chart_data, "Test Title")
# Reflex may use to_dict internally
fig_dict = fig.to_dict()
assert 'data' in fig_dict, "Figure dict should have data"
assert 'layout' in fig_dict, "Figure dict should have layout"
assert len(fig_dict['data']) > 0, "Data should not be empty"
if __name__ == '__main__':
pytest.main([__file__, '-v'])
+176
View File
@@ -0,0 +1,176 @@
"""
Test Phase 3.4.4: Measure directory assignment "Undefined" rate with real Snowflake data.
This test fetches HCD activity data from Snowflake, runs it through the directory
assignment pipeline, and measures what percentage of records end up with "Undefined"
directory vs. successfully assigned directories.
"""
import json
import pandas as pd
import sys
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from tools.data import patient_id, drug_names, department_identification
from core import default_paths
def load_snowflake_result(json_file: Path) -> pd.DataFrame:
"""Load Snowflake query result from JSON file and convert to DataFrame."""
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# The result is in format: [{"type": "text", "text": "..."}]
# where text contains JSON with {"columns": [...], "rows": [...]}
if isinstance(data, list) and len(data) > 0 and 'text' in data[0]:
records_text = data[0]['text']
result_obj = json.loads(records_text)
# Extract rows from the result object
if isinstance(result_obj, dict) and 'rows' in result_obj:
records = result_obj['rows']
else:
records = result_obj
else:
records = data
return pd.DataFrame(records)
def analyze_directory_sources(df: pd.DataFrame) -> dict:
"""Analyze the distribution of Directory_Source values."""
if 'Directory_Source' not in df.columns:
return {"error": "Directory_Source column not found"}
source_counts = df['Directory_Source'].value_counts()
total = len(df)
result = {
"total_records": total,
"source_distribution": {},
"undefined_rate": 0.0,
"assigned_rate": 0.0
}
for source, count in source_counts.items():
pct = (count / total) * 100
result["source_distribution"][source] = {
"count": int(count),
"percentage": round(pct, 2)
}
# Calculate undefined vs assigned rates
undefined_count = source_counts.get('UNDEFINED', 0)
result["undefined_rate"] = round((undefined_count / total) * 100, 2) if total > 0 else 0
result["assigned_rate"] = round(100 - result["undefined_rate"], 2)
return result
def analyze_by_drug(df: pd.DataFrame) -> dict:
"""Analyze undefined rate by drug."""
if 'Drug Name' not in df.columns or 'Directory_Source' not in df.columns:
return {"error": "Required columns not found"}
results = {}
for drug in df['Drug Name'].dropna().unique():
drug_df = df[df['Drug Name'] == drug]
total = len(drug_df)
undefined = len(drug_df[drug_df['Directory_Source'] == 'UNDEFINED'])
results[drug] = {
"total": total,
"undefined": undefined,
"undefined_rate": round((undefined / total) * 100, 2) if total > 0 else 0
}
return results
def main():
"""Main function to run the real data test."""
# Path to the Snowflake result file (updated 2026-02-04)
result_file = Path(r"C:\Users\charlwoodand\.claude\projects\C--Users-charlwoodand-Ralph-local-Tasks-Patient-pathway-analysis\2b846818-a586-47de-bfb9-a740bd07fc70\tool-results\mcp-snowflake-mcp-read_data-1770199331688.txt")
if not result_file.exists():
print(f"ERROR: Result file not found: {result_file}")
return
print("Loading Snowflake data...")
df = load_snowflake_result(result_file)
print(f"Loaded {len(df)} records")
print(f"Columns: {list(df.columns)}")
# Rename columns to match expected format for tools/data.py functions
column_mapping = {
'ProviderCode': 'Provider Code',
'PersonKey': 'PersonKey',
'DrugName': 'Drug Name',
'InterventionDate': 'Intervention Date',
'TreatmentFunctionCode': 'Treatment Function Code',
'AdditionalDetail1': 'Additional Detail 1',
'AdditionalDescription1': 'Additional Description 1',
'AdditionalDetail2': 'Additional Detail 2',
'AdditionalDescription2': 'Additional Description 2',
'PriceActual': 'Price Actual',
'OrganisationName': 'OrganisationName'
}
df = df.rename(columns=column_mapping)
print(f"Renamed columns: {list(df.columns)}")
# Step 1: Generate UPID
print("\nStep 1: Generating UPID...")
df = patient_id(df)
print(f"Sample UPIDs: {df['UPID'].head(5).tolist()}")
# Step 2: Standardize drug names
print("\nStep 2: Standardizing drug names...")
df = drug_names(df, default_paths)
print(f"Unique drugs after standardization: {df['Drug Name'].dropna().unique().tolist()}")
# Step 3: Run directory assignment
print("\nStep 3: Running directory assignment...")
df = department_identification(df, default_paths)
# Step 4: Analyze results
print("\n" + "="*60)
print("DIRECTORY ASSIGNMENT RESULTS")
print("="*60)
overall_stats = analyze_directory_sources(df)
print(f"\nTotal records processed: {overall_stats['total_records']}")
print(f"\nDirectory Source Distribution:")
for source, stats in sorted(overall_stats['source_distribution'].items(),
key=lambda x: -x[1]['count']):
print(f" {source}: {stats['count']:,} ({stats['percentage']:.1f}%)")
print(f"\n*** UNDEFINED RATE: {overall_stats['undefined_rate']:.1f}% ***")
print(f"*** ASSIGNED RATE: {overall_stats['assigned_rate']:.1f}% ***")
# Analyze by drug
print("\n" + "-"*60)
print("UNDEFINED RATE BY DRUG")
print("-"*60)
drug_stats = analyze_by_drug(df)
for drug, stats in sorted(drug_stats.items(), key=lambda x: -x[1]['undefined_rate']):
print(f" {drug}: {stats['undefined_rate']:.1f}% undefined ({stats['undefined']:,}/{stats['total']:,})")
# Show sample of directory assignments
print("\n" + "-"*60)
print("SAMPLE DIRECTORY ASSIGNMENTS")
print("-"*60)
sample_cols = ['UPID', 'Drug Name', 'Directory', 'Directory_Source']
available_cols = [c for c in sample_cols if c in df.columns]
print(df[available_cols].head(20).to_string())
return overall_stats, drug_stats
if __name__ == "__main__":
main()