From 920570b4378f5931245b917351da3edb9d0e92f6 Mon Sep 17 00:00:00 2001 From: Andrew Charlwood Date: Thu, 5 Feb 2026 23:11:01 +0000 Subject: [PATCH] feat: integrate drug-aware indication matching into refresh pipeline (Task 3.1) Replace old per-patient indication matching in refresh_pathways.py with drug-aware matching via assign_drug_indications(). Each drug is now cross-referenced against both the patient's GP diagnoses AND the DimSearchTerm.csv drug mapping. GP codes restricted to HCD data window via earliest_hcd_date parameter. --- IMPLEMENTATION_PLAN.md | 6 +- cli/refresh_pathways.py | 137 ++++++++++++---------------------------- progress.txt | 46 ++++++++++++++ 3 files changed, 91 insertions(+), 98 deletions(-) diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md index 68b2f2b..2647963 100644 --- a/IMPLEMENTATION_PLAN.md +++ b/IMPLEMENTATION_PLAN.md @@ -143,14 +143,14 @@ Only assign a drug to an indication if BOTH conditions are met. If a patient's d ## Phase 3: Pipeline Integration ### 3.1 Update `refresh_pathways.py` indication processing -- [ ] In the `elif current_chart_type == "indication":` block: +- [x] In the `elif current_chart_type == "indication":` block: 1. Call `get_patient_indication_groups()` as before (but now returns ALL matches) 2. Load drug mapping: `drug_mapping = load_drug_indication_mapping()` 3. Call `assign_drug_indications(df, gp_matches_df, drug_mapping)` 4. Use modified_df (with indication-aware UPIDs) for pathway processing 5. 
Use indication_df for the indication mapping -- [ ] Pass modified_df (not original df) to `process_indication_pathway_for_date_filter()` -- [ ] Verify: Pipeline compiles, `python -m py_compile cli/refresh_pathways.py` +- [x] Pass modified_df (not original df) to `process_indication_pathway_for_date_filter()` +- [x] Verify: Pipeline compiles, `python -m py_compile cli/refresh_pathways.py` ### 3.2 Test with dry run - [ ] Run `python -m cli.refresh_pathways --chart-type indication --dry-run -v` diff --git a/cli/refresh_pathways.py b/cli/refresh_pathways.py index 041b5cb..2942754 100644 --- a/cli/refresh_pathways.py +++ b/cli/refresh_pathways.py @@ -48,7 +48,11 @@ from data_processing.pathway_pipeline import ( extract_indication_fields, convert_to_records, ) -from data_processing.diagnosis_lookup import get_patient_indication_groups +from data_processing.diagnosis_lookup import ( + assign_drug_indications, + get_patient_indication_groups, + load_drug_indication_mapping, +) logger = get_logger(__name__) @@ -359,9 +363,11 @@ def refresh_pathways( results[f"{filter_id}:directory"] = records elif current_chart_type == "indication": - # For indication charts, we need to look up GP diagnoses for all patients - # using the new Snowflake-direct approach via get_patient_indication_groups() - logger.info("Building indication groups from GP diagnosis lookups (Snowflake-direct)...") + # For indication charts, use drug-aware matching: + # 1. Get ALL GP diagnosis matches per patient (with code_frequency) + # 2. Cross-reference with drug-to-Search_Term mapping from DimSearchTerm.csv + # 3. 
Assign each drug to its matched indication via modified UPIDs + logger.info("Building drug-aware indication groups...") # Check Snowflake availability from data_processing.snowflake_connector import get_connector, is_snowflake_available @@ -376,115 +382,60 @@ def refresh_pathways( import pandas as pd connector = get_connector() - # Step 1: Extract unique PseudoNHSNoLinked values from df - # This is the patient identifier that matches PatientPseudonym in GP records if 'PseudoNHSNoLinked' not in df.columns: logger.error("DataFrame missing 'PseudoNHSNoLinked' column - cannot lookup GP records") for config in DATE_FILTER_CONFIGS: results[f"{config.id}:indication"] = [] continue - # Get unique patient pseudonyms for GP lookup (avoid redundant queries) - patient_pseudonyms = df['PseudoNHSNoLinked'].dropna().unique().tolist() + # Step 1: Load drug-to-Search_Term mapping from DimSearchTerm.csv + _, search_term_to_fragments = load_drug_indication_mapping() + logger.info(f"Loaded drug mapping: {len(search_term_to_fragments)} Search_Terms") + # Step 2: Get ALL GP diagnosis matches per patient (with code_frequency) + patient_pseudonyms = df['PseudoNHSNoLinked'].dropna().unique().tolist() logger.info(f"Looking up GP diagnoses for {len(patient_pseudonyms)} unique patients...") - # Step 2: Call the new Snowflake-direct indication lookup + # Restrict GP codes to HCD data window (reduces noise from old diagnoses) + earliest_hcd_date = df['Intervention Date'].min() + if pd.notna(earliest_hcd_date): + earliest_hcd_date_str = pd.Timestamp(earliest_hcd_date).strftime('%Y-%m-%d') + logger.info(f"Restricting GP codes to HCD window: >= {earliest_hcd_date_str}") + else: + earliest_hcd_date_str = None + gp_matches_df = get_patient_indication_groups( patient_pseudonyms=patient_pseudonyms, connector=connector, batch_size=500, + earliest_hcd_date=earliest_hcd_date_str, ) - # Step 3: Build indication_df mapping UPID -> Indication_Group - # For matched patients: Indication_Group = Search_Term - 
# For unmatched patients: Indication_Group = Directory + " (no GP dx)" - # - # IMPORTANT: We need ALL unique UPIDs, not just unique PseudoNHSNoLinked. - # A patient can have multiple UPIDs if they visited multiple providers. + # Step 3: Assign drug-aware indications using cross-referencing + # This replaces the old per-patient approach with per-drug matching + modified_df, indication_df = assign_drug_indications( + df=df, + gp_matches_df=gp_matches_df, + search_term_to_fragments=search_term_to_fragments, + ) - # Get all unique UPID records with their PseudoNHSNoLinked and Directory - upid_lookup = df[['UPID', 'PseudoNHSNoLinked', 'Directory']].drop_duplicates( - subset=['UPID'] - ).copy() + logger.info(f"Drug-aware indication matching complete. " + f"Modified UPIDs: {modified_df['UPID'].nunique()}, " + f"Indication groups: {len(indication_df)}") - if gp_matches_df.empty: - logger.warning("No GP matches found - all patients will use fallback directory") - # All patients use fallback - indication_records = [] - for _, row in upid_lookup.iterrows(): - directory = row['Directory'] - indication_records.append({ - 'UPID': row['UPID'], - 'Indication_Group': str(directory) + " (no GP dx)" if pd.notna(directory) else "UNKNOWN (no GP dx)", - 'Source': 'FALLBACK', - }) - indication_df = pd.DataFrame(indication_records) + if indication_df.empty: + logger.warning("Empty indication_df - skipping indication charts") + for config in DATE_FILTER_CONFIGS: + results[f"{config.id}:indication"] = [] else: - # Create lookup dict: PseudoNHSNoLinked -> Search_Term - match_lookup = dict(zip( - gp_matches_df['PatientPseudonym'], - gp_matches_df['Search_Term'] - )) - - # Build indication records for each unique UPID - indication_records = [] - for _, row in upid_lookup.iterrows(): - pseudo = row['PseudoNHSNoLinked'] - upid = row['UPID'] - directory = row['Directory'] - - if pd.notna(pseudo) and pseudo in match_lookup: - indication_records.append({ - 'UPID': upid, - 'Indication_Group': 
match_lookup[pseudo], - 'Source': 'DIAGNOSIS', - }) - else: - # Use fallback: Directory + " (no GP dx)" - fallback_label = str(directory) + " (no GP dx)" if pd.notna(directory) else "UNKNOWN (no GP dx)" - indication_records.append({ - 'UPID': upid, - 'Indication_Group': fallback_label, - 'Source': 'FALLBACK', - }) - - indication_df = pd.DataFrame(indication_records) - - # Log coverage statistics - if not indication_df.empty: - diagnosis_count = (indication_df['Source'] == 'DIAGNOSIS').sum() - fallback_count = (indication_df['Source'] == 'FALLBACK').sum() - total = len(indication_df) - stats["diagnosis_coverage"] = { - "diagnosis": int(diagnosis_count), - "fallback": int(fallback_count), - "total": total, - "diagnosis_pct": round(100 * diagnosis_count / total, 1) if total > 0 else 0, - } - logger.info(f"Indication coverage: {diagnosis_count}/{total} ({stats['diagnosis_coverage']['diagnosis_pct']}%) diagnosis-matched") - - # Log top indication groups - top_indications = indication_df[indication_df['Source'] == 'DIAGNOSIS']['Indication_Group'].value_counts().head(5) - if len(top_indications) > 0: - logger.info(f"Top 5 indications: {dict(top_indications)}") - - # Rename column for compatibility with generate_icicle_chart_indication - # It expects indication_df to have 'Directory' column (mapped from Indication_Group) - indication_df_for_chart = indication_df[['UPID', 'Indication_Group']].copy() - indication_df_for_chart = indication_df_for_chart.rename(columns={'Indication_Group': 'Directory'}) - # Ensure unique UPID index (build_hierarchy requires uniquely valued Index) - # Keep first occurrence - DIAGNOSIS entries should come before FALLBACK - indication_df_for_chart = indication_df_for_chart.drop_duplicates(subset=['UPID'], keep='first') - indication_df_for_chart = indication_df_for_chart.set_index('UPID') - - # Process each date filter with indication grouping + # Process each date filter with drug-aware indication grouping + # Use modified_df (with 
indication-aware UPIDs) instead of original df for config in DATE_FILTER_CONFIGS: logger.info(f"Processing indication pathway for {config.id}") ice_df = process_indication_pathway_for_date_filter( - df=df, - indication_df=indication_df_for_chart, + df=modified_df, + indication_df=indication_df, config=config, trust_filter=trust_filter, drug_filter=drug_filter, @@ -506,10 +457,6 @@ def refresh_pathways( results[f"{config.id}:indication"] = records logger.info(f"Completed {config.id}:indication: {len(records)} nodes") - else: - logger.warning("Empty indication_df - skipping indication charts") - for config in DATE_FILTER_CONFIGS: - results[f"{config.id}:indication"] = [] except Exception as e: logger.error(f"Error processing indication charts: {e}") diff --git a/progress.txt b/progress.txt index 96fd0ed..d31820a 100644 --- a/progress.txt +++ b/progress.txt @@ -253,3 +253,49 @@ This project extends the indication-based pathway charts (Phase 1-5 complete) wi - Can verify with py_compile; full Snowflake test via --dry-run ### Blocked items: - None + +## Iteration 5 — 2026-02-05 +### Task: 3.1 — Update refresh_pathways.py indication processing to use assign_drug_indications() +### Why this task: +- All Phase 1 & 2 dependencies complete (query returns all matches, drug mapping loaded, assign_drug_indications() exists) +- Task 3.1 is the pipeline integration step — wires the new drug-aware matching into the actual refresh pipeline +- Must be done before Task 3.2 (dry run test) which validates the integrated pipeline +### Status: COMPLETE +### What was done: +- Updated imports at top of `cli/refresh_pathways.py`: + - Added `assign_drug_indications` and `load_drug_indication_mapping` from `data_processing.diagnosis_lookup` +- Replaced the entire indication processing block (old ~90 lines → new ~60 lines): + - **Old approach**: `dict(zip(gp_matches_df['PatientPseudonym'], gp_matches_df['Search_Term']))` — only kept LAST match per patient, no drug awareness + - **New 
approach**: + 1. `load_drug_indication_mapping()` → `search_term_to_fragments` + 2. Compute `earliest_hcd_date` from `df['Intervention Date'].min()` as ISO string + 3. `get_patient_indication_groups(earliest_hcd_date=earliest_hcd_date_str)` → all GP matches with code_frequency + 4. `assign_drug_indications(df, gp_matches_df, search_term_to_fragments)` → `(modified_df, indication_df)` + 5. Pass `modified_df` (not original `df`) to `process_indication_pathway_for_date_filter()` + 6. `indication_df` already indexed by modified UPID with 'Directory' column — directly compatible +- Removed: old `match_lookup`, `upid_lookup`, manual `indication_records` building, `indication_df_for_chart` renaming +- Kept: Snowflake availability check, PseudoNHSNoLinked column check, error handling, date filter loop +### Validation results: +- Tier 1 (Code): py_compile PASSED, individual imports PASSED, full module import PASSED +- Tier 2 (Data): N/A — requires live Snowflake for dry run test (Task 3.2) +- Tier 3 (Functional): N/A — no UI changes +### Files changed: +- cli/refresh_pathways.py (updated imports, replaced indication processing block) +- IMPLEMENTATION_PLAN.md (marked 3.1 subtasks [x]) +### Committed: [pending] +### Patterns discovered: +- `assign_drug_indications()` returns `indication_df` already indexed by modified UPID with 'Directory' column — no need for intermediate renaming/reindexing steps that the old code required +- `earliest_hcd_date` must be converted via `pd.Timestamp(...).strftime('%Y-%m-%d')` because `df['Intervention Date'].min()` may return a Timestamp or string depending on data source +- The old code had a "stats['diagnosis_coverage']" tracking block — this is now handled internally by `assign_drug_indications()` logging. If stats tracking in the return dict is needed later, can add it back. 
+### Next iteration should: +- Work on Task 3.2: Run `python -m cli.refresh_pathways --chart-type indication --dry-run -v` + - This requires a live Snowflake connection + - Verify: modified UPIDs appear in logs, match rates logged, pathway nodes generated + - If dry run passes, move to Phase 4 (full refresh + validation) +- Key things to check in dry run output: + - "Drug-aware indication matching complete" log message with "Modified UPIDs" and "Indication groups" counts (match/fallback coverage is expected from `assign_drug_indications()`'s own logging, not from this message) + - "Modified UPIDs" count should be HIGHER than unique patient count (patients with multiple drugs for different indications) + - Pathway node counts for indication charts should be in same ballpark as before (~300 per date filter) + - No errors in indication pathway processing +### Blocked items: +- None