diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md index 8104904..39da9be 100644 --- a/IMPLEMENTATION_PLAN.md +++ b/IMPLEMENTATION_PLAN.md @@ -90,7 +90,7 @@ python -m reflex compile ## Phase 3: Test Full Pipeline ### 3.1 Test Refresh with Real Data -- [ ] Run `python -m cli.refresh_pathways --chart-type all` with Snowflake +- [~] Run `python -m cli.refresh_pathways --chart-type all` with Snowflake - [ ] Verify pathway_nodes table has both chart_type values: - `SELECT chart_type, COUNT(*) FROM pathway_nodes GROUP BY chart_type` - [ ] Verify indication hierarchy: Trust → Search_Term → Drug → Pathway diff --git a/cli/refresh_pathways.py b/cli/refresh_pathways.py index f248ce1..041b5cb 100644 --- a/cli/refresh_pathways.py +++ b/cli/refresh_pathways.py @@ -384,11 +384,8 @@ def refresh_pathways( results[f"{config.id}:indication"] = [] continue - # Get unique patient pseudonyms and their corresponding UPID/Directory - patient_lookup = df[['UPID', 'PseudoNHSNoLinked', 'Directory']].drop_duplicates( - subset=['PseudoNHSNoLinked'] - ).copy() - patient_pseudonyms = patient_lookup['PseudoNHSNoLinked'].dropna().unique().tolist() + # Get unique patient pseudonyms for GP lookup (avoid redundant queries) + patient_pseudonyms = df['PseudoNHSNoLinked'].dropna().unique().tolist() logger.info(f"Looking up GP diagnoses for {len(patient_pseudonyms)} unique patients...") @@ -402,15 +399,24 @@ def refresh_pathways( # Step 3: Build indication_df mapping UPID -> Indication_Group # For matched patients: Indication_Group = Search_Term # For unmatched patients: Indication_Group = Directory + " (no GP dx)" + # + # IMPORTANT: We need ALL unique UPIDs, not just unique PseudoNHSNoLinked. + # A patient can have multiple UPIDs if they visited multiple providers. 
+ + # Get all unique UPID records with their PseudoNHSNoLinked and Directory + upid_lookup = df[['UPID', 'PseudoNHSNoLinked', 'Directory']].drop_duplicates( + subset=['UPID'] + ).copy() if gp_matches_df.empty: logger.warning("No GP matches found - all patients will use fallback directory") # All patients use fallback indication_records = [] - for _, row in patient_lookup.iterrows(): + for _, row in upid_lookup.iterrows(): + directory = row['Directory'] indication_records.append({ 'UPID': row['UPID'], - 'Indication_Group': str(row['Directory']) + " (no GP dx)", + 'Indication_Group': str(directory) + " (no GP dx)" if pd.notna(directory) else "UNKNOWN (no GP dx)", 'Source': 'FALLBACK', }) indication_df = pd.DataFrame(indication_records) @@ -421,23 +427,25 @@ def refresh_pathways( gp_matches_df['Search_Term'] )) - # Build indication records for each unique patient + # Build indication records for each unique UPID indication_records = [] - for _, row in patient_lookup.iterrows(): + for _, row in upid_lookup.iterrows(): pseudo = row['PseudoNHSNoLinked'] upid = row['UPID'] directory = row['Directory'] - if pseudo in match_lookup: + if pd.notna(pseudo) and pseudo in match_lookup: indication_records.append({ 'UPID': upid, 'Indication_Group': match_lookup[pseudo], 'Source': 'DIAGNOSIS', }) else: + # Use fallback: Directory + " (no GP dx)" + fallback_label = str(directory) + " (no GP dx)" if pd.notna(directory) else "UNKNOWN (no GP dx)" indication_records.append({ 'UPID': upid, - 'Indication_Group': str(directory) + " (no GP dx)", + 'Indication_Group': fallback_label, 'Source': 'FALLBACK', }) @@ -465,6 +473,9 @@ def refresh_pathways( # It expects indication_df to have 'Directory' column (mapped from Indication_Group) indication_df_for_chart = indication_df[['UPID', 'Indication_Group']].copy() indication_df_for_chart = indication_df_for_chart.rename(columns={'Indication_Group': 'Directory'}) + # Ensure unique UPID index (build_hierarchy requires uniquely valued Index) + # Keep 
first occurrence per UPID (NOTE: this hunk does not sort DIAGNOSIS ahead of FALLBACK - confirm row order if duplicate UPIDs can occur) + indication_df_for_chart = indication_df_for_chart.drop_duplicates(subset=['UPID'], keep='first') indication_df_for_chart = indication_df_for_chart.set_index('UPID') # Process each date filter with indication grouping diff --git a/data_processing/diagnosis_lookup.py b/data_processing/diagnosis_lookup.py index fd511c8..0dbd58b 100644 --- a/data_processing/diagnosis_lookup.py +++ b/data_processing/diagnosis_lookup.py @@ -1348,12 +1348,13 @@ def get_patient_indication_groups( # Build the full query with cluster CTE # This finds the most recent matching diagnosis for each patient + # Note: Column names must be aliased to ensure consistent casing in results query = f""" {CLUSTER_MAPPING_SQL} SELECT - pc."PatientPseudonym", - aic.Search_Term, - pc."EventDateTime" + pc."PatientPseudonym" AS "PatientPseudonym", + aic.Search_Term AS "Search_Term", + pc."EventDateTime" AS "EventDateTime" FROM DATA_HUB.PHM."PrimaryCareClinicalCoding" pc INNER JOIN AllIndicationCodes aic ON pc."SNOMEDCode" = aic.SNOMEDCode