From 89521567985dae53ed1ba459e52392ec1db93bc6 Mon Sep 17 00:00:00 2001 From: Andrew Charlwood Date: Thu, 5 Feb 2026 14:45:06 +0000 Subject: [PATCH] feat: integrate batch GP diagnosis lookup for indication charts (Task 3.2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add batch_lookup_indication_groups() to diagnosis_lookup.py - Efficient batch Snowflake queries (500 patients per batch) - Returns UPID → Indication_Group mapping - Source tracking: DIAGNOSIS vs FALLBACK - Update cli/refresh_pathways.py indication processing - Call batch_lookup_indication_groups() before chart generation - Build indication_df for process_indication_pathway_for_date_filter() - Log diagnosis coverage statistics - Enables full --chart-type all functionality --- IMPLEMENTATION_PLAN.md | 17 ++- cli/refresh_pathways.py | 89 ++++++++++- data_processing/diagnosis_lookup.py | 229 +++++++++++++++++++++++++++- 3 files changed, 320 insertions(+), 15 deletions(-) diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md index d30e418..368748f 100644 --- a/IMPLEMENTATION_PLAN.md +++ b/IMPLEMENTATION_PLAN.md @@ -114,14 +114,19 @@ python -m reflex compile - Total: 12 pathway datasets (6 dates × 2 chart types) - [x] Add `--chart-type` argument: "all" (default), "directory", "indication" - [x] Update progress logging to show both chart types -- [ ] Verify: Dry run shows both chart types being processed (requires Task 3.2 for full indication support) +- [x] Verify: Dry run shows both chart types being processed (Task 3.2 complete) ### 3.2 Integrate Diagnosis-Based Directorate in Pipeline -- [ ] Update `fetch_and_transform_data()` to include diagnosis lookup: - - After UPID creation, batch lookup SNOMED matches for all patients - - Store: matched_search_term, matched_directorate, match_source -- [ ] Handle Snowflake connection for GP record queries (batched for performance) -- [ ] Log coverage: X% diagnosis-matched, Y% fallback +- [x] Add 
`batch_lookup_indication_groups()` to `diagnosis_lookup.py`: + - Batch lookup SNOMED matches for all patients (500 patients per batch) + - Returns DataFrame with UPID, Indication_Group, Source columns + - Source is "DIAGNOSIS" (GP match found) or "FALLBACK" (no match) +- [x] Update `cli/refresh_pathways.py` indication processing: + - Call `batch_lookup_indication_groups()` before processing indication charts + - Build `indication_df` for use with `process_indication_pathway_for_date_filter()` + - Process all 6 date filters with indication grouping +- [x] Handle Snowflake connection for GP record queries (batched for performance) +- [x] Log coverage: X% diagnosis-matched, Y% fallback - [ ] Verify: Test refresh with --dry-run, check coverage stats ### 3.3 Test Full Refresh Pipeline diff --git a/cli/refresh_pathways.py b/cli/refresh_pathways.py index 3d40852..a16a933 100644 --- a/cli/refresh_pathways.py +++ b/cli/refresh_pathways.py @@ -48,6 +48,7 @@ from data_processing.pathway_pipeline import ( extract_indication_fields, convert_to_records, ) +from data_processing.diagnosis_lookup import batch_lookup_indication_groups logger = get_logger(__name__) @@ -358,14 +359,86 @@ def refresh_pathways( results[f"{filter_id}:directory"] = records elif current_chart_type == "indication": - # For indication charts, we need indication_df from GP diagnosis lookups - # This will be implemented in Task 3.2 - # For now, log that indication processing requires the diagnosis pipeline - logger.warning("Indication chart processing not yet fully integrated") - logger.warning("Task 3.2 will add GP diagnosis lookup integration") - logger.info("Skipping indication charts for now...") - for config in DATE_FILTER_CONFIGS: - results[f"{config.id}:indication"] = [] + # For indication charts, we need to look up GP diagnoses for all patients + # This creates indication_df mapping UPID -> Indication_Group + logger.info("Building indication groups from GP diagnosis lookups...") + + # Get Snowflake 
connector for GP lookups + from data_processing.snowflake_connector import get_connector, is_snowflake_available + + if not is_snowflake_available(): + logger.warning("Snowflake not available - cannot process indication charts") + for config in DATE_FILTER_CONFIGS: + results[f"{config.id}:indication"] = [] + continue + + try: + connector = get_connector() + + # Batch lookup indication groups for all patients + indication_df = batch_lookup_indication_groups( + df=df, + connector=connector, + batch_size=500, + ) + + # Log coverage statistics + if not indication_df.empty: + diagnosis_count = (indication_df['Source'] == 'DIAGNOSIS').sum() + fallback_count = (indication_df['Source'] == 'FALLBACK').sum() + total = len(indication_df) + stats["diagnosis_coverage"] = { + "diagnosis": diagnosis_count, + "fallback": fallback_count, + "total": total, + "diagnosis_pct": round(100 * diagnosis_count / total, 1) if total > 0 else 0, + } + logger.info(f"Indication coverage: {diagnosis_count}/{total} ({stats['diagnosis_coverage']['diagnosis_pct']}%) diagnosis-matched") + + # Rename column for compatibility with generate_icicle_chart_indication + # It expects indication_df to have 'Directory' column (mapped from Indication_Group) + indication_df_for_chart = indication_df[['UPID', 'Indication_Group']].copy() + indication_df_for_chart = indication_df_for_chart.rename(columns={'Indication_Group': 'Directory'}) + indication_df_for_chart = indication_df_for_chart.set_index('UPID') + + # Process each date filter with indication grouping + for config in DATE_FILTER_CONFIGS: + logger.info(f"Processing indication pathway for {config.id}") + + ice_df = process_indication_pathway_for_date_filter( + df=df, + indication_df=indication_df_for_chart, + config=config, + trust_filter=trust_filter, + drug_filter=drug_filter, + directory_filter=directory_filter, + minimum_patients=minimum_patients, + paths=paths, + ) + + if ice_df is None: + logger.warning(f"No indication pathway data for {config.id}") 
def _fallback_indication_df(patients: "pd.DataFrame") -> "pd.DataFrame":
    """Build an all-FALLBACK result with exactly one row per UPID.

    Args:
        patients: DataFrame with at least 'UPID' and 'Directory' columns.

    Returns:
        DataFrame with columns UPID, Indication_Group, Source, where
        Indication_Group is "<Directory> (no GP dx)" taken from the first row
        seen for each UPID and Source is always "FALLBACK".
    """
    result_df = patients[['UPID', 'Directory']].drop_duplicates(subset=['UPID']).copy()
    result_df['Indication_Group'] = result_df['Directory'] + " (no GP dx)"
    result_df['Source'] = "FALLBACK"
    return result_df[['UPID', 'Indication_Group', 'Source']]


def batch_lookup_indication_groups(
    df: "pd.DataFrame",
    connector: Optional[SnowflakeConnector] = None,
    db_manager: Optional[DatabaseManager] = None,
    batch_size: int = 500,
) -> "pd.DataFrame":
    """
    Batch lookup GP diagnosis-based indication groups for a DataFrame of patients.

    This is the efficient batch version of get_directorate_from_diagnosis().
    Instead of querying Snowflake per patient, it batches the lookups for performance.

    Strategy:
    1. Get all unique (UPID, Drug Name, PersonKey, Directory) rows from DataFrame
    2. For each unique drug, get all SNOMED codes via get_drug_snomed_codes()
    3. Query GP records in batches of `batch_size` patients, checking every
       collected SNOMED code for every patient (the query is NOT restricted to
       the codes of the drug a given patient is actually on)
    4. Keep the most recent matching GP event per patient and map it to a
       search term, preferring a term registered for the patient's own drug
    5. Return indication_df mapping UPID → Indication_Group

    Every path returns at most one row per UPID. A batch whose query fails is
    logged and skipped, so its patients end up as FALLBACK.

    Args:
        df: DataFrame with columns: UPID, Drug Name, Directory, PersonKey
        connector: Optional SnowflakeConnector (defaults to get_connector())
        db_manager: Optional DatabaseManager (defaults to default_db_manager)
        batch_size: Number of patients per Snowflake query batch (must be >= 1)

    Returns:
        DataFrame with columns: UPID, Indication_Group, Source
        - Indication_Group: Search_Term (if matched) or "Directory (no GP dx)" (if not)
        - Source: "DIAGNOSIS" or "FALLBACK"

    Raises:
        ValueError: If batch_size is smaller than 1 and a Snowflake lookup is
            actually attempted.
    """
    import pandas as pd

    if db_manager is None:
        db_manager = default_db_manager

    logger.info(f"Starting batch indication lookup for {len(df)} records...")

    # Step 1: Get unique (UPID, Drug Name, PersonKey, Directory) combinations.
    # PersonKey is the value matched against "PatientPseudonym" in the GP table.
    if 'PersonKey' not in df.columns:
        logger.error("DataFrame missing 'PersonKey' column - cannot lookup GP records")
        return _fallback_indication_df(df)

    unique_pairs = df[['UPID', 'Drug Name', 'PersonKey', 'Directory']].drop_duplicates()
    logger.info(f"Found {len(unique_pairs)} unique patient-drug combinations")

    # Step 2: Collect the SNOMED codes of every drug present in the data.
    unique_drugs = unique_pairs['Drug Name'].unique()
    logger.info(f"Building SNOMED lookup for {len(unique_drugs)} unique drugs...")

    # snomed -> [(drug, search_term, primary_directorate), ...]
    # The dict keys double as the de-duplicated list of codes to query.
    snomed_to_drug_searchterm: dict[str, list[tuple[str, str, str]]] = {}
    for drug_name in unique_drugs:
        for mapping in get_drug_snomed_codes(drug_name, db_manager):
            snomed_to_drug_searchterm.setdefault(mapping.snomed_code, []).append(
                (drug_name, mapping.search_term, mapping.primary_directorate)
            )

    snomed_list = list(snomed_to_drug_searchterm)
    logger.info(f"Total SNOMED codes to check: {len(snomed_list)}")

    # Step 3: Check Snowflake availability
    if not SNOWFLAKE_AVAILABLE or not is_snowflake_configured():
        logger.warning("Snowflake not available - returning fallback for all patients")
        return _fallback_indication_df(unique_pairs)

    if connector is None:
        connector = get_connector()

    # Step 4: Query GP records for all patients in batches
    unique_patients = unique_pairs[['PersonKey', 'UPID', 'Directory']].drop_duplicates(subset=['PersonKey'])
    person_keys = unique_patients['PersonKey'].tolist()

    logger.info(f"Querying GP records for {len(person_keys)} unique patients in batches of {batch_size}...")

    if not snomed_list:
        logger.warning("No SNOMED codes to check - returning fallback for all patients")
        return _fallback_indication_df(unique_pairs)

    if batch_size < 1:
        # A negative step would make range() yield nothing and silently label
        # every patient FALLBACK; zero already raised ValueError inside range().
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    # PersonKey -> SNOMED code of that patient's most recent matching GP event
    gp_matches: dict[str, str] = {}

    # SNOMED IN clause is identical for every batch, so build it once.
    snomed_placeholders = ", ".join(["%s"] * len(snomed_list))
    snomed_params = tuple(snomed_list)

    for batch_start in range(0, len(person_keys), batch_size):
        batch_end = min(batch_start + batch_size, len(person_keys))
        batch_person_keys = person_keys[batch_start:batch_end]

        logger.info(f"Batch {batch_start//batch_size + 1}: patients {batch_start} to {batch_end}")

        patient_placeholders = ", ".join(["%s"] * len(batch_person_keys))

        # Only placeholder strings are interpolated; all values are bound as
        # parameters. NULLS LAST keeps undated events from sorting ahead of
        # dated ones (Snowflake's default for DESC is NULLS FIRST).
        query = f'''
            SELECT
                "PatientPseudonym",
                "SNOMEDCode",
                "EventDateTime"
            FROM DATA_HUB.PHM."PrimaryCareClinicalCoding"
            WHERE "PatientPseudonym" IN ({patient_placeholders})
              AND "SNOMEDCode" IN ({snomed_placeholders})
            ORDER BY "PatientPseudonym", "EventDateTime" DESC NULLS LAST
        '''

        params = tuple(batch_person_keys) + snomed_params

        try:
            for row in connector.execute_dict(query, params):
                person_key = row.get("PatientPseudonym")
                snomed_code = row.get("SNOMEDCode")
                # Rows arrive newest-first per patient, so the first row seen
                # for a patient is the one to keep.
                if person_key and snomed_code and person_key not in gp_matches:
                    gp_matches[person_key] = snomed_code
        except Exception as e:
            # Best effort: a failed batch leaves its patients unmatched.
            logger.error(f"Error querying GP records for batch: {e}")

    logger.info(f"Found GP matches for {len(gp_matches)} patients")

    # Step 5: Resolve one (Indication_Group, Source) per UPID.
    # A patient may appear on several drugs; the first row seen for a UPID wins.
    upid_to_match: dict[str, tuple[str, str]] = {}
    pair_rows = unique_pairs[['UPID', 'Drug Name', 'PersonKey', 'Directory']].itertuples(
        index=False, name=None
    )

    for upid, drug_name, person_key, directory in pair_rows:
        if upid in upid_to_match:
            continue

        candidates: list[tuple[str, str, str]] = []
        if person_key in gp_matches:
            candidates = snomed_to_drug_searchterm.get(gp_matches[person_key], [])

        if not candidates:
            # No GP match (or a code we hold no mapping for) - use fallback
            upid_to_match[upid] = (directory + " (no GP dx)", "FALLBACK")
            continue

        # Prefer the search term registered for this row's drug. The loop
        # variables must not be called `pd`: that name is the local pandas
        # module and rebinding it breaks pd.DataFrame() further down.
        search_term = next(
            (
                term
                for drug, term, _directorate in candidates
                if drug.upper() == drug_name.upper()
            ),
            None,
        )
        if search_term is None:
            # Matched code belongs to a different drug - use any mapping
            search_term = candidates[0][1]

        upid_to_match[upid] = (search_term, "DIAGNOSIS")

    # Explicit columns keep the frame well-formed even when there are no rows.
    result_df = pd.DataFrame(
        [
            {'UPID': upid, 'Indication_Group': indication_group, 'Source': source}
            for upid, (indication_group, source) in upid_to_match.items()
        ],
        columns=['UPID', 'Indication_Group', 'Source'],
    )

    # Log statistics
    total = len(result_df)
    diagnosis_count = int((result_df['Source'] == "DIAGNOSIS").sum())
    fallback_count = int((result_df['Source'] == "FALLBACK").sum())
    denominator = total or 1  # avoid ZeroDivisionError on an empty result

    logger.info("Indication lookup complete:")
    logger.info(f" Total unique patients: {total}")
    logger.info(f" DIAGNOSIS matches: {diagnosis_count} ({100*diagnosis_count/denominator:.1f}%)")
    logger.info(f" FALLBACK (no GP match): {fallback_count} ({100*fallback_count/denominator:.1f}%)")

    return result_df