feat: integrate batch GP diagnosis lookup for indication charts (Task 3.2)

- Add batch_lookup_indication_groups() to diagnosis_lookup.py
  - Efficient batch Snowflake queries (500 patients per batch)
  - Returns UPID → Indication_Group mapping
  - Source tracking: DIAGNOSIS vs FALLBACK
- Update cli/refresh_pathways.py indication processing
  - Call batch_lookup_indication_groups() before chart generation
  - Build indication_df for process_indication_pathway_for_date_filter()
  - Log diagnosis coverage statistics
- Enables full `--chart-type all` functionality
This commit is contained in:
Andrew Charlwood
2026-02-05 14:45:06 +00:00
parent 50b8548688
commit 8952156798
3 changed files with 320 additions and 15 deletions
+11 -6
View File
@@ -114,14 +114,19 @@ python -m reflex compile
- Total: 12 pathway datasets (6 dates × 2 chart types)
- [x] Add `--chart-type` argument: "all" (default), "directory", "indication"
- [x] Update progress logging to show both chart types
- [ ] Verify: Dry run shows both chart types being processed (requires Task 3.2 for full indication support)
- [x] Verify: Dry run shows both chart types being processed (Task 3.2 complete)
### 3.2 Integrate Diagnosis-Based Directorate in Pipeline
- [ ] Update `fetch_and_transform_data()` to include diagnosis lookup:
- After UPID creation, batch lookup SNOMED matches for all patients
- Store: matched_search_term, matched_directorate, match_source
- [ ] Handle Snowflake connection for GP record queries (batched for performance)
- [ ] Log coverage: X% diagnosis-matched, Y% fallback
- [x] Add `batch_lookup_indication_groups()` to `diagnosis_lookup.py`:
- Batch lookup SNOMED matches for all patients (500 patients per batch)
- Returns DataFrame with UPID, Indication_Group, Source columns
- Source is "DIAGNOSIS" (GP match found) or "FALLBACK" (no match)
- [x] Update `cli/refresh_pathways.py` indication processing:
- Call `batch_lookup_indication_groups()` before processing indication charts
- Build `indication_df` for use with `process_indication_pathway_for_date_filter()`
- Process all 6 date filters with indication grouping
- [x] Handle Snowflake connection for GP record queries (batched for performance)
- [x] Log coverage: X% diagnosis-matched, Y% fallback
- [ ] Verify: Test refresh with --dry-run, check coverage stats
### 3.3 Test Full Refresh Pipeline
+81 -8
View File
@@ -48,6 +48,7 @@ from data_processing.pathway_pipeline import (
extract_indication_fields,
convert_to_records,
)
from data_processing.diagnosis_lookup import batch_lookup_indication_groups
logger = get_logger(__name__)
@@ -358,14 +359,86 @@ def refresh_pathways(
results[f"{filter_id}:directory"] = records
elif current_chart_type == "indication":
# For indication charts, we need indication_df from GP diagnosis lookups
# This will be implemented in Task 3.2
# For now, log that indication processing requires the diagnosis pipeline
logger.warning("Indication chart processing not yet fully integrated")
logger.warning("Task 3.2 will add GP diagnosis lookup integration")
logger.info("Skipping indication charts for now...")
for config in DATE_FILTER_CONFIGS:
results[f"{config.id}:indication"] = []
# For indication charts, we need to look up GP diagnoses for all patients
# This creates indication_df mapping UPID -> Indication_Group
logger.info("Building indication groups from GP diagnosis lookups...")
# Get Snowflake connector for GP lookups
from data_processing.snowflake_connector import get_connector, is_snowflake_available
if not is_snowflake_available():
logger.warning("Snowflake not available - cannot process indication charts")
for config in DATE_FILTER_CONFIGS:
results[f"{config.id}:indication"] = []
continue
try:
connector = get_connector()
# Batch lookup indication groups for all patients
indication_df = batch_lookup_indication_groups(
df=df,
connector=connector,
batch_size=500,
)
# Log coverage statistics
if not indication_df.empty:
diagnosis_count = (indication_df['Source'] == 'DIAGNOSIS').sum()
fallback_count = (indication_df['Source'] == 'FALLBACK').sum()
total = len(indication_df)
stats["diagnosis_coverage"] = {
"diagnosis": diagnosis_count,
"fallback": fallback_count,
"total": total,
"diagnosis_pct": round(100 * diagnosis_count / total, 1) if total > 0 else 0,
}
logger.info(f"Indication coverage: {diagnosis_count}/{total} ({stats['diagnosis_coverage']['diagnosis_pct']}%) diagnosis-matched")
# Rename column for compatibility with generate_icicle_chart_indication
# It expects indication_df to have 'Directory' column (mapped from Indication_Group)
indication_df_for_chart = indication_df[['UPID', 'Indication_Group']].copy()
indication_df_for_chart = indication_df_for_chart.rename(columns={'Indication_Group': 'Directory'})
indication_df_for_chart = indication_df_for_chart.set_index('UPID')
# Process each date filter with indication grouping
for config in DATE_FILTER_CONFIGS:
logger.info(f"Processing indication pathway for {config.id}")
ice_df = process_indication_pathway_for_date_filter(
df=df,
indication_df=indication_df_for_chart,
config=config,
trust_filter=trust_filter,
drug_filter=drug_filter,
directory_filter=directory_filter,
minimum_patients=minimum_patients,
paths=paths,
)
if ice_df is None:
logger.warning(f"No indication pathway data for {config.id}")
results[f"{config.id}:indication"] = []
continue
# Extract denormalized fields (using indication variant)
ice_df = extract_indication_fields(ice_df)
# Convert to records with chart_type="indication"
records = convert_to_records(ice_df, config.id, refresh_id, chart_type="indication")
results[f"{config.id}:indication"] = records
logger.info(f"Completed {config.id}:indication: {len(records)} nodes")
else:
logger.warning("Empty indication_df - skipping indication charts")
for config in DATE_FILTER_CONFIGS:
results[f"{config.id}:indication"] = []
except Exception as e:
logger.error(f"Error processing indication charts: {e}")
logger.exception(e)
for config in DATE_FILTER_CONFIGS:
results[f"{config.id}:indication"] = []
# Count records per filter and chart type
stats["chart_type_counts"] = {}
+228 -1
View File
@@ -18,9 +18,12 @@ GP/Primary Care data (PrimaryCareClinicalCoding) as the authoritative source.
from dataclasses import dataclass, field
from datetime import date, datetime
from pathlib import Path
from typing import Optional, Callable, Any, cast
from typing import Optional, Callable, Any, cast, TYPE_CHECKING
import csv
if TYPE_CHECKING:
import pandas as pd
from core.logging_config import get_logger
from data_processing.database import DatabaseManager, default_db_manager
from data_processing.snowflake_connector import (
@@ -861,6 +864,228 @@ def get_available_clusters(
return []
# Suffix appended to a patient's Directory when no GP diagnosis is matched.
_NO_GP_DX_SUFFIX = " (no GP dx)"

# Column order of the DataFrame returned by batch_lookup_indication_groups().
_INDICATION_COLUMNS = ["UPID", "Indication_Group", "Source"]


def _fallback_label(directory: Any) -> Any:
    """Return the fallback Indication_Group label for a Directory value."""
    # Non-string (e.g. missing) directories pass through unchanged instead of
    # raising on concatenation, mirroring pandas' NaN + str behaviour.
    if isinstance(directory, str):
        return directory + _NO_GP_DX_SUFFIX
    return directory


def _fallback_indication_frame(patients: "pd.DataFrame") -> "pd.DataFrame":
    """Build an all-FALLBACK result holding exactly one row per UPID."""
    fallback = patients[["UPID", "Directory"]].drop_duplicates(subset=["UPID"]).copy()
    fallback["Indication_Group"] = fallback["Directory"].map(_fallback_label)
    fallback["Source"] = "FALLBACK"
    return fallback[_INDICATION_COLUMNS]


def batch_lookup_indication_groups(
    df: "pd.DataFrame",
    connector: Optional[SnowflakeConnector] = None,
    db_manager: Optional[DatabaseManager] = None,
    batch_size: int = 500,
) -> "pd.DataFrame":
    """
    Batch lookup GP diagnosis-based indication groups for a DataFrame of patients.

    This is the batch version of get_directorate_from_diagnosis(): instead of
    querying Snowflake per patient, patients are queried in groups of
    ``batch_size``.

    Strategy:
        1. Get all unique (UPID, Drug Name, PersonKey, Directory) rows.
        2. For each unique drug, collect its SNOMED codes via
           get_drug_snomed_codes().
        3. Query PrimaryCareClinicalCoding in batches for any of those codes and
           keep the most recent matching code per patient.
        4. Map each UPID to the search term of the matched code, preferring the
           search term registered for the drug on the UPID's first row.

    Every fallback path (missing PersonKey column, Snowflake unavailable, no
    SNOMED codes, failed batch, no GP match) labels the patient with their
    Directory plus " (no GP dx)".

    Args:
        df: DataFrame with columns: UPID, Drug Name, Directory, PersonKey
        connector: Optional SnowflakeConnector (defaults to get_connector())
        db_manager: Optional DatabaseManager (defaults to default_db_manager)
        batch_size: Number of patients per Snowflake query batch (must be >= 1)

    Returns:
        DataFrame with columns: UPID, Indication_Group, Source (one row per UPID)
        - Indication_Group: Search_Term (if matched) or "Directory (no GP dx)" (if not)
        - Source: "DIAGNOSIS" or "FALLBACK"

    Raises:
        ValueError: If batch_size is smaller than 1.
    """
    import pandas as pd

    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    if db_manager is None:
        db_manager = default_db_manager

    logger.info(f"Starting batch indication lookup for {len(df)} records...")

    # Step 1: Get unique (UPID, Drug Name, PersonKey, Directory) combinations.
    # PersonKey is the value compared against "PatientPseudonym" in Snowflake.
    if 'PersonKey' not in df.columns:
        logger.error("DataFrame missing 'PersonKey' column - cannot lookup GP records")
        return _fallback_indication_frame(df)

    unique_pairs = df[['UPID', 'Drug Name', 'PersonKey', 'Directory']].drop_duplicates()
    logger.info(f"Found {len(unique_pairs)} unique patient-drug combinations")

    # Step 2: Build snomed_code -> [(drug, search_term), ...] for every drug.
    unique_drugs = unique_pairs['Drug Name'].unique()
    logger.info(f"Building SNOMED lookup for {len(unique_drugs)} unique drugs...")

    snomed_to_drug_searchterm: dict[str, list[tuple[str, str]]] = {}
    for drug_name in unique_drugs:
        for mapping in get_drug_snomed_codes(drug_name, db_manager):
            snomed_to_drug_searchterm.setdefault(mapping.snomed_code, []).append(
                (drug_name, mapping.search_term)
            )

    snomed_list = list(snomed_to_drug_searchterm)
    logger.info(f"Total SNOMED codes to check: {len(snomed_list)}")

    # Step 3: Check Snowflake availability
    if not SNOWFLAKE_AVAILABLE or not is_snowflake_configured():
        logger.warning("Snowflake not available - returning fallback for all patients")
        return _fallback_indication_frame(unique_pairs)

    if not snomed_list:
        logger.warning("No SNOMED codes to check - returning fallback for all patients")
        return _fallback_indication_frame(unique_pairs)

    if connector is None:
        connector = get_connector()

    # Step 4: Query GP records for all patients in batches.
    # Rows without a PersonKey can never match a GP record, so they are not
    # sent to Snowflake as bind parameters.
    person_keys = unique_pairs['PersonKey'].dropna().unique().tolist()
    logger.info(f"Querying GP records for {len(person_keys)} unique patients in batches of {batch_size}...")

    # SNOMED IN clause and its parameters are identical for every batch.
    # NOTE(review): a very large SNOMED list may hit Snowflake's IN-list limits - confirm.
    snomed_placeholders = ", ".join(["%s"] * len(snomed_list))
    snomed_params = tuple(snomed_list)

    # PersonKey -> most recent matching SNOMED code.
    # NOTE(review): assumes "PatientPseudonym" comes back with the same type as
    # df['PersonKey']; otherwise lookups below silently miss - confirm.
    gp_matches: dict[Any, Any] = {}
    failed_batches = 0

    for batch_start in range(0, len(person_keys), batch_size):
        batch_end = min(batch_start + batch_size, len(person_keys))
        batch_person_keys = person_keys[batch_start:batch_end]
        batch_number = batch_start // batch_size + 1

        logger.info(f"Batch {batch_number}: patients {batch_start} to {batch_end}")

        patient_placeholders = ", ".join(["%s"] * len(batch_person_keys))

        # Only placeholders are interpolated; all values travel as bind params.
        # NULLS LAST: Snowflake sorts NULLs first under DESC by default, which
        # would let an undated record win as the "most recent" match.
        query = f'''
            SELECT
                "PatientPseudonym",
                "SNOMEDCode",
                "EventDateTime"
            FROM DATA_HUB.PHM."PrimaryCareClinicalCoding"
            WHERE "PatientPseudonym" IN ({patient_placeholders})
              AND "SNOMEDCode" IN ({snomed_placeholders})
            ORDER BY "PatientPseudonym", "EventDateTime" DESC NULLS LAST
        '''
        params = tuple(batch_person_keys) + snomed_params

        try:
            for row in connector.execute_dict(query, params):
                person_key = row.get("PatientPseudonym")
                snomed_code = row.get("SNOMEDCode")
                if person_key and snomed_code:
                    # First row per patient is the most recent (ORDER BY above).
                    gp_matches.setdefault(person_key, snomed_code)
        except Exception as e:
            # Best effort: a failed batch degrades its patients to FALLBACK
            # rather than aborting the whole refresh.
            failed_batches += 1
            logger.exception(f"Error querying GP records for batch {batch_number}: {e}")

    if failed_batches:
        logger.warning(
            f"{failed_batches} GP query batch(es) failed - affected patients are reported as FALLBACK"
        )
    logger.info(f"Found GP matches for {len(gp_matches)} patients")

    # Step 5: Resolve one (Indication_Group, Source) per UPID.
    # A patient may be on several drugs; the first row seen for a UPID decides.
    upid_to_match: dict[Any, tuple[Any, str]] = {}

    for upid, drug_name, person_key, directory in zip(
        unique_pairs['UPID'],
        unique_pairs['Drug Name'],
        unique_pairs['PersonKey'],
        unique_pairs['Directory'],
    ):
        if upid in upid_to_match:
            continue

        candidates = snomed_to_drug_searchterm.get(gp_matches.get(person_key))
        if not candidates:
            # No GP match, or a matched code that is not in the local mapping.
            upid_to_match[upid] = (_fallback_label(directory), "FALLBACK")
            continue

        # A SNOMED code may map to several drugs with different search terms:
        # prefer the one registered for this row's drug, else take the first.
        wanted_drug = drug_name.upper() if isinstance(drug_name, str) else None
        search_term = None
        for candidate_drug, candidate_term in candidates:
            if isinstance(candidate_drug, str) and candidate_drug.upper() == wanted_drug:
                search_term = candidate_term
                break
        if search_term is None:
            search_term = candidates[0][1]

        upid_to_match[upid] = (search_term, "DIAGNOSIS")

    result_df = pd.DataFrame(
        [(upid, group, source) for upid, (group, source) in upid_to_match.items()],
        columns=_INDICATION_COLUMNS,
    )

    # Log statistics
    total = len(result_df)
    diagnosis_count = int((result_df['Source'] == "DIAGNOSIS").sum())
    fallback_count = int((result_df['Source'] == "FALLBACK").sum())
    diagnosis_pct = 100 * diagnosis_count / total if total else 0.0
    fallback_pct = 100 * fallback_count / total if total else 0.0

    logger.info("Indication lookup complete:")
    logger.info(f"  Total unique patients: {total}")
    logger.info(f"  DIAGNOSIS matches: {diagnosis_count} ({diagnosis_pct:.1f}%)")
    logger.info(f"  FALLBACK (no GP match): {fallback_count} ({fallback_pct:.1f}%)")

    return result_df
# Export public API
__all__ = [
# Dataclasses
@@ -884,4 +1109,6 @@ __all__ = [
"patient_has_indication_direct",
# Diagnosis-based directorate assignment
"get_directorate_from_diagnosis",
# Batch lookup for indication groups
"batch_lookup_indication_groups",
]