feat: integrate drug-aware indication matching into refresh pipeline (Task 3.1)

Replace old per-patient indication matching in refresh_pathways.py with
drug-aware matching via assign_drug_indications(). Each drug is now
cross-referenced against both the patient's GP diagnoses AND the
DimSearchTerm.csv drug mapping. GP codes restricted to HCD data window
via earliest_hcd_date parameter.
This commit is contained in:
Andrew Charlwood
2026-02-05 23:11:01 +00:00
parent d9891c8991
commit 920570b437
3 changed files with 91 additions and 98 deletions
+42 -95
View File
@@ -48,7 +48,11 @@ from data_processing.pathway_pipeline import (
extract_indication_fields,
convert_to_records,
)
from data_processing.diagnosis_lookup import get_patient_indication_groups
from data_processing.diagnosis_lookup import (
assign_drug_indications,
get_patient_indication_groups,
load_drug_indication_mapping,
)
logger = get_logger(__name__)
@@ -359,9 +363,11 @@ def refresh_pathways(
results[f"{filter_id}:directory"] = records
elif current_chart_type == "indication":
# For indication charts, we need to look up GP diagnoses for all patients
# using the new Snowflake-direct approach via get_patient_indication_groups()
logger.info("Building indication groups from GP diagnosis lookups (Snowflake-direct)...")
# For indication charts, use drug-aware matching:
# 1. Get ALL GP diagnosis matches per patient (with code_frequency)
# 2. Cross-reference with drug-to-Search_Term mapping from DimSearchTerm.csv
# 3. Assign each drug to its matched indication via modified UPIDs
logger.info("Building drug-aware indication groups...")
# Check Snowflake availability
from data_processing.snowflake_connector import get_connector, is_snowflake_available
@@ -376,115 +382,60 @@ def refresh_pathways(
import pandas as pd
connector = get_connector()
# Step 1: Extract unique PseudoNHSNoLinked values from df
# This is the patient identifier that matches PatientPseudonym in GP records
if 'PseudoNHSNoLinked' not in df.columns:
logger.error("DataFrame missing 'PseudoNHSNoLinked' column - cannot lookup GP records")
for config in DATE_FILTER_CONFIGS:
results[f"{config.id}:indication"] = []
continue
# Get unique patient pseudonyms for GP lookup (avoid redundant queries)
patient_pseudonyms = df['PseudoNHSNoLinked'].dropna().unique().tolist()
# Step 1: Load drug-to-Search_Term mapping from DimSearchTerm.csv
_, search_term_to_fragments = load_drug_indication_mapping()
logger.info(f"Loaded drug mapping: {len(search_term_to_fragments)} Search_Terms")
# Step 2: Get ALL GP diagnosis matches per patient (with code_frequency)
patient_pseudonyms = df['PseudoNHSNoLinked'].dropna().unique().tolist()
logger.info(f"Looking up GP diagnoses for {len(patient_pseudonyms)} unique patients...")
# Step 2: Call the new Snowflake-direct indication lookup
# Restrict GP codes to HCD data window (reduces noise from old diagnoses)
earliest_hcd_date = df['Intervention Date'].min()
if pd.notna(earliest_hcd_date):
earliest_hcd_date_str = pd.Timestamp(earliest_hcd_date).strftime('%Y-%m-%d')
logger.info(f"Restricting GP codes to HCD window: >= {earliest_hcd_date_str}")
else:
earliest_hcd_date_str = None
gp_matches_df = get_patient_indication_groups(
patient_pseudonyms=patient_pseudonyms,
connector=connector,
batch_size=500,
earliest_hcd_date=earliest_hcd_date_str,
)
# Step 3: Build indication_df mapping UPID -> Indication_Group
# For matched patients: Indication_Group = Search_Term
# For unmatched patients: Indication_Group = Directory + " (no GP dx)"
#
# IMPORTANT: We need ALL unique UPIDs, not just unique PseudoNHSNoLinked.
# A patient can have multiple UPIDs if they visited multiple providers.
# Step 3: Assign drug-aware indications using cross-referencing
# This replaces the old per-patient approach with per-drug matching
modified_df, indication_df = assign_drug_indications(
df=df,
gp_matches_df=gp_matches_df,
search_term_to_fragments=search_term_to_fragments,
)
# Get all unique UPID records with their PseudoNHSNoLinked and Directory
upid_lookup = df[['UPID', 'PseudoNHSNoLinked', 'Directory']].drop_duplicates(
subset=['UPID']
).copy()
logger.info(f"Drug-aware indication matching complete. "
f"Modified UPIDs: {modified_df['UPID'].nunique()}, "
f"Indication groups: {len(indication_df)}")
if gp_matches_df.empty:
logger.warning("No GP matches found - all patients will use fallback directory")
# All patients use fallback
indication_records = []
for _, row in upid_lookup.iterrows():
directory = row['Directory']
indication_records.append({
'UPID': row['UPID'],
'Indication_Group': str(directory) + " (no GP dx)" if pd.notna(directory) else "UNKNOWN (no GP dx)",
'Source': 'FALLBACK',
})
indication_df = pd.DataFrame(indication_records)
if indication_df.empty:
logger.warning("Empty indication_df - skipping indication charts")
for config in DATE_FILTER_CONFIGS:
results[f"{config.id}:indication"] = []
else:
# Create lookup dict: PseudoNHSNoLinked -> Search_Term
match_lookup = dict(zip(
gp_matches_df['PatientPseudonym'],
gp_matches_df['Search_Term']
))
# Build indication records for each unique UPID
indication_records = []
for _, row in upid_lookup.iterrows():
pseudo = row['PseudoNHSNoLinked']
upid = row['UPID']
directory = row['Directory']
if pd.notna(pseudo) and pseudo in match_lookup:
indication_records.append({
'UPID': upid,
'Indication_Group': match_lookup[pseudo],
'Source': 'DIAGNOSIS',
})
else:
# Use fallback: Directory + " (no GP dx)"
fallback_label = str(directory) + " (no GP dx)" if pd.notna(directory) else "UNKNOWN (no GP dx)"
indication_records.append({
'UPID': upid,
'Indication_Group': fallback_label,
'Source': 'FALLBACK',
})
indication_df = pd.DataFrame(indication_records)
# Log coverage statistics
if not indication_df.empty:
diagnosis_count = (indication_df['Source'] == 'DIAGNOSIS').sum()
fallback_count = (indication_df['Source'] == 'FALLBACK').sum()
total = len(indication_df)
stats["diagnosis_coverage"] = {
"diagnosis": int(diagnosis_count),
"fallback": int(fallback_count),
"total": total,
"diagnosis_pct": round(100 * diagnosis_count / total, 1) if total > 0 else 0,
}
logger.info(f"Indication coverage: {diagnosis_count}/{total} ({stats['diagnosis_coverage']['diagnosis_pct']}%) diagnosis-matched")
# Log top indication groups
top_indications = indication_df[indication_df['Source'] == 'DIAGNOSIS']['Indication_Group'].value_counts().head(5)
if len(top_indications) > 0:
logger.info(f"Top 5 indications: {dict(top_indications)}")
# Rename column for compatibility with generate_icicle_chart_indication
# It expects indication_df to have 'Directory' column (mapped from Indication_Group)
indication_df_for_chart = indication_df[['UPID', 'Indication_Group']].copy()
indication_df_for_chart = indication_df_for_chart.rename(columns={'Indication_Group': 'Directory'})
# Ensure unique UPID index (build_hierarchy requires uniquely valued Index)
# Keep first occurrence - DIAGNOSIS entries should come before FALLBACK
indication_df_for_chart = indication_df_for_chart.drop_duplicates(subset=['UPID'], keep='first')
indication_df_for_chart = indication_df_for_chart.set_index('UPID')
# Process each date filter with indication grouping
# Process each date filter with drug-aware indication grouping
# Use modified_df (with indication-aware UPIDs) instead of original df
for config in DATE_FILTER_CONFIGS:
logger.info(f"Processing indication pathway for {config.id}")
ice_df = process_indication_pathway_for_date_filter(
df=df,
indication_df=indication_df_for_chart,
df=modified_df,
indication_df=indication_df,
config=config,
trust_filter=trust_filter,
drug_filter=drug_filter,
@@ -506,10 +457,6 @@ def refresh_pathways(
results[f"{config.id}:indication"] = records
logger.info(f"Completed {config.id}:indication: {len(records)} nodes")
else:
logger.warning("Empty indication_df - skipping indication charts")
for config in DATE_FILTER_CONFIGS:
results[f"{config.id}:indication"] = []
except Exception as e:
logger.error(f"Error processing indication charts: {e}")