From ad10b374cb4caf9b2d7915e5128ef107dd55f6a4 Mon Sep 17 00:00:00 2001 From: Andrew Charlwood Date: Thu, 5 Feb 2026 17:06:34 +0000 Subject: [PATCH] feat: integrate Snowflake-direct indication lookup into CLI refresh (Task 1.2, 2.3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace batch_lookup_indication_groups() with get_patient_indication_groups() for indication chart processing. The new approach: - Extracts unique PseudoNHSNoLinked values from HCD data - Queries Snowflake directly using the cluster CTE - Builds indication_df mapping UPID → Search_Term (matched) or Directory (fallback) - Logs coverage statistics (diagnosis % vs fallback %) This completes the integration of the new Snowflake-direct GP lookup approach. --- IMPLEMENTATION_PLAN.md | 8 ++-- cli/refresh_pathways.py | 84 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 79 insertions(+), 13 deletions(-) diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md index a064cf9..8104904 100644 --- a/IMPLEMENTATION_PLAN.md +++ b/IMPLEMENTATION_PLAN.md @@ -52,12 +52,12 @@ python -m reflex compile - [ ] Verify: Function returns expected Search_Terms for test patients ### 1.2 Update Data Pipeline to Include Indications -- [ ] Modify `cli/refresh_pathways.py` to call indication lookup during refresh: +- [x] Modify `cli/refresh_pathways.py` to call indication lookup during refresh: - After fetching HCD data, extract unique PseudoNHSNoLinked values - Call `get_patient_indication_groups()` with patient list - Create `indication_df` mapping UPID → Indication_Group - For patients with no GP match: Indication_Group = fallback directorate -- [ ] Log coverage: X% diagnosis-matched, Y% fallback +- [x] Log coverage: X% diagnosis-matched, Y% fallback - [ ] Verify: indication_df has correct structure for pathway processing --- @@ -79,10 +79,10 @@ python -m reflex compile ### 2.3 Update Refresh Command for Dual Charts - [x] Add `--chart-type` argument: "all", "directory", "indication" (ALREADY DONE) -- [ ] Update indication processing to use new `get_patient_indication_groups()`: +- [x] Update indication processing to use new `get_patient_indication_groups()`: - Replace `batch_lookup_indication_groups()` with the new Snowflake-direct approach - Pass indication_df to `process_indication_pathway_for_date_filter()` -- [ ] Process all 6 date filters for both chart types +- [x] Process all 6 date filters for both chart types (existing loop already handles this) - [ ] Verify: Both chart types generate pathway data --- diff --git a/cli/refresh_pathways.py b/cli/refresh_pathways.py index a16a933..f248ce1 100644 --- a/cli/refresh_pathways.py +++ b/cli/refresh_pathways.py @@ -48,7 +48,7 @@ from data_processing.pathway_pipeline import ( extract_indication_fields, convert_to_records, ) -from data_processing.diagnosis_lookup import batch_lookup_indication_groups +from data_processing.diagnosis_lookup import get_patient_indication_groups logger = get_logger(__name__) @@ -360,10 +360,10 @@ def refresh_pathways( elif current_chart_type == "indication": # For indication charts, we need to look up GP diagnoses for all patients - # This creates indication_df mapping UPID -> Indication_Group - logger.info("Building indication groups from GP diagnosis lookups...") + # using the new Snowflake-direct approach via get_patient_indication_groups() + logger.info("Building indication groups from GP diagnosis lookups (Snowflake-direct)...") - # Get Snowflake connector for GP lookups + # Check Snowflake availability from data_processing.snowflake_connector import get_connector, is_snowflake_available if not is_snowflake_available(): @@ -373,28 +373,94 @@ def refresh_pathways( continue try: + import pandas as pd connector = get_connector() - # Batch lookup indication groups for all patients - indication_df = batch_lookup_indication_groups( - df=df, + # Step 1: Extract unique PseudoNHSNoLinked values from df + # This is the patient identifier that matches PatientPseudonym in GP records + if 'PseudoNHSNoLinked' not in df.columns: + logger.error("DataFrame missing 'PseudoNHSNoLinked' column - cannot lookup GP records") + for config in DATE_FILTER_CONFIGS: + results[f"{config.id}:indication"] = [] + continue + + # Get unique patient pseudonyms and their corresponding UPID/Directory + patient_lookup = df[['UPID', 'PseudoNHSNoLinked', 'Directory']].drop_duplicates( + subset=['PseudoNHSNoLinked'] + ).copy() + patient_pseudonyms = patient_lookup['PseudoNHSNoLinked'].dropna().unique().tolist() + + logger.info(f"Looking up GP diagnoses for {len(patient_pseudonyms)} unique patients...") + + # Step 2: Call the new Snowflake-direct indication lookup + gp_matches_df = get_patient_indication_groups( + patient_pseudonyms=patient_pseudonyms, connector=connector, batch_size=500, ) + # Step 3: Build indication_df mapping UPID -> Indication_Group + # For matched patients: Indication_Group = Search_Term + # For unmatched patients: Indication_Group = Directory + " (no GP dx)" + + if gp_matches_df.empty: + logger.warning("No GP matches found - all patients will use fallback directory") + # All patients use fallback + indication_records = [] + for _, row in patient_lookup.iterrows(): + indication_records.append({ + 'UPID': row['UPID'], + 'Indication_Group': str(row['Directory']) + " (no GP dx)", + 'Source': 'FALLBACK', + }) + indication_df = pd.DataFrame(indication_records) + else: + # Create lookup dict: PseudoNHSNoLinked -> Search_Term + match_lookup = dict(zip( + gp_matches_df['PatientPseudonym'], + gp_matches_df['Search_Term'] + )) + + # Build indication records for each unique patient + indication_records = [] + for _, row in patient_lookup.iterrows(): + pseudo = row['PseudoNHSNoLinked'] + upid = row['UPID'] + directory = row['Directory'] + + if pseudo in match_lookup: + indication_records.append({ + 'UPID': upid, + 'Indication_Group': match_lookup[pseudo], + 'Source': 'DIAGNOSIS', + }) + else: + indication_records.append({ + 'UPID': upid, + 'Indication_Group': str(directory) + " (no GP dx)", + 'Source': 'FALLBACK', + }) + + indication_df = pd.DataFrame(indication_records) + # Log coverage statistics if not indication_df.empty: diagnosis_count = (indication_df['Source'] == 'DIAGNOSIS').sum() fallback_count = (indication_df['Source'] == 'FALLBACK').sum() total = len(indication_df) stats["diagnosis_coverage"] = { - "diagnosis": diagnosis_count, - "fallback": fallback_count, + "diagnosis": int(diagnosis_count), + "fallback": int(fallback_count), "total": total, "diagnosis_pct": round(100 * diagnosis_count / total, 1) if total > 0 else 0, } logger.info(f"Indication coverage: {diagnosis_count}/{total} ({stats['diagnosis_coverage']['diagnosis_pct']}%) diagnosis-matched") + # Log top indication groups + top_indications = indication_df[indication_df['Source'] == 'DIAGNOSIS']['Indication_Group'].value_counts().head(5) + if len(top_indications) > 0: + logger.info(f"Top 5 indications: {dict(top_indications)}") + # Rename column for compatibility with generate_icicle_chart_indication # It expects indication_df to have 'Directory' column (mapped from Indication_Group) indication_df_for_chart = indication_df[['UPID', 'Indication_Group']].copy()