""" Test to verify that the refactored analysis pipeline produces matching output. This test compares the output of the refactored generate_icicle_chart() function from analysis/pathway_analyzer.py with expected output characteristics. Since the original generate_graph() function calls figure() directly without returning data, we verify the refactored pipeline by: 1. Running the pipeline with known test data 2. Verifying the output DataFrame has correct structure 3. Verifying statistical calculations are reasonable """ import pytest import pandas as pd import numpy as np from datetime import datetime from pathlib import Path # Skip if we can't import the modules try: from analysis.pathway_analyzer import ( generate_icicle_chart, prepare_data, calculate_statistics, build_hierarchy, prepare_chart_data, ) from core import default_paths HAS_MODULES = True except ImportError: HAS_MODULES = False # Standard test filters (matching sample data) TEST_TRUST_FILTER = [ 'MANCHESTER UNIVERSITY NHS FOUNDATION TRUST', # R0A code 'BARTS HEALTH NHS TRUST', # R1H code ] TEST_DRUG_FILTER = ['ADALIMUMAB', 'ETANERCEPT', 'INFLIXIMAB'] TEST_DIRECTORY_FILTER = ['Rheumatology', 'Dermatology', 'Gastroenterology'] @pytest.fixture def sample_intervention_data(): """ Create sample intervention data similar to what comes from the data loader. The data mimics the structure expected by generate_icicle_chart(): - UPID: Unique patient identifier (Provider Code prefix + PersonKey) - Drug Name: Standardized drug name - Directory: Medical specialty - Intervention Date: Date of treatment - Price Actual: Cost of treatment - Provider Code: NHS Trust code (will be mapped to name via org_codes.csv) Uses real trust codes from org_codes.csv: - R0A = MANCHESTER UNIVERSITY NHS FOUNDATION TRUST - R1H = BARTS HEALTH NHS TRUST """ # Create data for a small number of patients with varied pathways data = { 'UPID': [ # Patient 1: Trust1 (R0A), Rheumatology, Adalimumab only (5 treatments) 'R0A12345', 'R0A12345', 'R0A12345', 'R0A12345', 'R0A12345', # Patient 2: Trust1 (R0A), Rheumatology, Adalimumab then Etanercept (4 treatments) 'R0A67890', 'R0A67890', 'R0A67890', 'R0A67890', # Patient 3: Trust1 (R0A), Dermatology, Adalimumab only (3 treatments) 'R0A11111', 'R0A11111', 'R0A11111', # Patient 4: Trust2 (R1H), Rheumatology, Etanercept only (6 treatments) 'R1H22222', 'R1H22222', 'R1H22222', 'R1H22222', 'R1H22222', 'R1H22222', # Patient 5: Trust2 (R1H), Gastro, Infliximab only (4 treatments) 'R1H33333', 'R1H33333', 'R1H33333', 'R1H33333', ], 'Drug Name': [ 'ADALIMUMAB', 'ADALIMUMAB', 'ADALIMUMAB', 'ADALIMUMAB', 'ADALIMUMAB', 'ADALIMUMAB', 'ADALIMUMAB', 'ETANERCEPT', 'ETANERCEPT', 'ADALIMUMAB', 'ADALIMUMAB', 'ADALIMUMAB', 'ETANERCEPT', 'ETANERCEPT', 'ETANERCEPT', 'ETANERCEPT', 'ETANERCEPT', 'ETANERCEPT', 'INFLIXIMAB', 'INFLIXIMAB', 'INFLIXIMAB', 'INFLIXIMAB', ], 'Directory': [ 'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology', 'Dermatology', 'Dermatology', 'Dermatology', 'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology', 'Rheumatology', 'Gastroenterology', 'Gastroenterology', 'Gastroenterology', 'Gastroenterology', ], 'Intervention Date': [ # Patient 1 dates (every 2 weeks) datetime(2023, 1, 1), datetime(2023, 1, 15), datetime(2023, 1, 29), datetime(2023, 2, 12), datetime(2023, 2, 26), # Patient 2 dates (switch after 2 months) datetime(2023, 1, 5), datetime(2023, 2, 5), datetime(2023, 3, 5), datetime(2023, 4, 5), # Patient 3 dates datetime(2023, 2, 1), datetime(2023, 3, 1), datetime(2023, 4, 1), # Patient 4 dates (weekly for 6 weeks) datetime(2023, 1, 1), datetime(2023, 1, 8), datetime(2023, 1, 15), datetime(2023, 1, 22), datetime(2023, 1, 29), datetime(2023, 2, 5), # Patient 5 dates (every 4 weeks) datetime(2023, 1, 10), datetime(2023, 2, 7), datetime(2023, 3, 7), datetime(2023, 4, 4), ], 'Price Actual': [ # Patient 1 costs 500.0, 500.0, 500.0, 500.0, 500.0, # Patient 2 costs 500.0, 500.0, 600.0, 600.0, # Patient 3 costs 500.0, 500.0, 500.0, # Patient 4 costs 400.0, 400.0, 400.0, 400.0, 400.0, 400.0, # Patient 5 costs 800.0, 800.0, 800.0, 800.0, ], 'Provider Code': [ # Trust codes (R0A = Manchester, R1H = Barts) 'R0A', 'R0A', 'R0A', 'R0A', 'R0A', 'R0A', 'R0A', 'R0A', 'R0A', 'R0A', 'R0A', 'R0A', 'R1H', 'R1H', 'R1H', 'R1H', 'R1H', 'R1H', 'R1H', 'R1H', 'R1H', 'R1H', ], } return pd.DataFrame(data) @pytest.mark.skipif(not HAS_MODULES, reason="Required modules not available") class TestOutputStructure: """Test that the refactored pipeline produces correct output structure.""" def test_ice_df_has_required_columns(self, sample_intervention_data): """Verify ice_df has all required columns for Plotly icicle chart.""" if default_paths.validate(): # Non-empty list means errors pytest.skip("Reference data files not available") df = sample_intervention_data.copy() ice_df, title = generate_icicle_chart( df=df, start_date='2022-01-01', end_date='2024-01-01', last_seen_date='2022-06-01', trust_filter=TEST_TRUST_FILTER, drug_filter=TEST_DRUG_FILTER, directory_filter=TEST_DIRECTORY_FILTER, minimum_num_patients=1, title="Test Output", paths=default_paths, ) if ice_df is None: pytest.skip("No data matched filters (trust code mapping may not match)") # Required columns for Plotly icicle chart required_columns = ['parents', 'labels', 'ids', 'value', 'cost'] for col in required_columns: assert col in ice_df.columns, f"Missing required column: {col}" def test_ice_df_hierarchy_structure(self, sample_intervention_data): """Verify the ice_df hierarchy is valid (parents reference existing ids).""" if default_paths.validate(): # Non-empty list means errors pytest.skip("Reference data files not available") df = sample_intervention_data.copy() ice_df, title = generate_icicle_chart( df=df, start_date='2022-01-01', end_date='2024-01-01', last_seen_date='2022-06-01', trust_filter=TEST_TRUST_FILTER, drug_filter=TEST_DRUG_FILTER, directory_filter=TEST_DIRECTORY_FILTER, minimum_num_patients=1, title="Test Output", ) if ice_df is None: pytest.skip("No data matched filters") # Every parent should be in ids (except root which has empty parent) ids_set = set(ice_df['ids'].unique()) for parent in ice_df['parents'].unique(): if parent != '': # Root has empty parent assert parent in ids_set, f"Parent '{parent}' not found in ids" def test_values_sum_correctly(self, sample_intervention_data): """Verify that child values sum to parent values (with branchvalues='total').""" if default_paths.validate(): # Non-empty list means errors pytest.skip("Reference data files not available") df = sample_intervention_data.copy() ice_df, title = generate_icicle_chart( df=df, start_date='2022-01-01', end_date='2024-01-01', last_seen_date='2022-06-01', trust_filter=TEST_TRUST_FILTER, drug_filter=TEST_DRUG_FILTER, directory_filter=TEST_DIRECTORY_FILTER, minimum_num_patients=1, title="Test Output", ) if ice_df is None: pytest.skip("No data matched filters") # Verify the structure is valid: # - Root (N&WICS) should have the highest value # - All child values should sum to at most their parent value root_row = ice_df[ice_df['ids'] == 'N&WICS'] if len(root_row) > 0: root_value = root_row['value'].iloc[0] assert root_value > 0, "Root should have positive value" # Check that children sum to parent value for nodes at same level # Note: The icicle chart uses branchvalues='total' so children should sum to parent # However, at pathway level, patients may appear in multiple pathway branches for parent_id in ice_df['ids'].unique(): parent_row = ice_df[ice_df['ids'] == parent_id] if len(parent_row) == 0: continue parent_value = parent_row['value'].iloc[0] children = ice_df[ice_df['parents'] == parent_id] if len(children) > 0: children_sum = children['value'].sum() # Children should sum to parent value in a properly constructed icicle chart # Allow for small differences due to filtering at minimum_num_patients assert children_sum <= parent_value, \ f"Children of '{parent_id}' sum to {children_sum}, exceeds parent {parent_value}" @pytest.mark.skipif(not HAS_MODULES, reason="Required modules not available") class TestPrepareData: """Test the prepare_data() function independently.""" def test_prepare_data_filters_correctly(self, sample_intervention_data): """Verify prepare_data applies filters correctly.""" if default_paths.validate(): # Non-empty list means errors pytest.skip("Reference data files not available") df = sample_intervention_data.copy() # Filter to single drug result = prepare_data( df, TEST_TRUST_FILTER, ['ADALIMUMAB'], # Only Adalimumab TEST_DIRECTORY_FILTER ) if result[0] is None: pytest.skip("No data matched filters") filtered_df, org_codes, directory_df = result # Should only have Adalimumab rows assert set(filtered_df['Drug Name'].unique()) == {'ADALIMUMAB'} def test_prepare_data_creates_upid_treatment(self, sample_intervention_data): """Verify prepare_data creates UPIDTreatment column.""" if default_paths.validate(): # Non-empty list means errors pytest.skip("Reference data files not available") df = sample_intervention_data.copy() result = prepare_data( df, TEST_TRUST_FILTER, TEST_DRUG_FILTER, TEST_DIRECTORY_FILTER ) if result[0] is None: pytest.skip("No data matched filters") filtered_df, org_codes, directory_df = result # UPIDTreatment should be UPID + Drug Name assert 'UPIDTreatment' in filtered_df.columns # Check first row first_row = filtered_df.iloc[0] expected = first_row['UPID'] + first_row['Drug Name'] assert first_row['UPIDTreatment'] == expected @pytest.mark.skipif(not HAS_MODULES, reason="Required modules not available") class TestCalculateStatistics: """Test the calculate_statistics() function independently.""" def test_date_filtering(self, sample_intervention_data): """Verify date filtering in calculate_statistics.""" if default_paths.validate(): # Non-empty list means errors pytest.skip("Reference data files not available") df = sample_intervention_data.copy() df['UPIDTreatment'] = df['UPID'] + df['Drug Name'] # These dates should include all our sample data start_date = '2022-01-01' end_date = '2024-01-01' last_seen_date = '2022-06-01' result = calculate_statistics(df, start_date, end_date, last_seen_date, "Test") if result[0] is None: pytest.skip("No data matched date filters") patient_info, date_df, title = result # Should have patient info DataFrame assert patient_info is not None assert len(patient_info) > 0 @pytest.mark.skipif(not HAS_MODULES, reason="Required modules not available") class TestMinimumPatientFilter: """Test that minimum_num_patients filter works correctly.""" def test_filters_small_pathways(self, sample_intervention_data): """Verify pathways with fewer patients than threshold are excluded.""" if default_paths.validate(): # Non-empty list means errors pytest.skip("Reference data files not available") df = sample_intervention_data.copy() # With minimum 10, nothing should pass (we only have 5 patients) ice_df, title = generate_icicle_chart( df=df, start_date='2022-01-01', end_date='2024-01-01', last_seen_date='2022-06-01', trust_filter=TEST_TRUST_FILTER, drug_filter=TEST_DRUG_FILTER, directory_filter=TEST_DIRECTORY_FILTER, minimum_num_patients=10, # Higher than our patient count title="Test Output", ) # Either None or empty DataFrame if ice_df is not None: # If filtered, should have very few or no patient pathways patient_rows = ice_df[ice_df['value'] < 10] # All remaining rows should have value >= 10 remaining = ice_df[ice_df['value'] >= 10] # This may include aggregated rows pass # Test passes if no error if __name__ == '__main__': pytest.main([__file__, '-v'])