feat: add assign_drug_indications() for drug-aware indication matching (Task 2.1 + 2.2)
This commit is contained in:
@@ -1204,6 +1204,175 @@ def get_search_terms_for_drug(
|
||||
return matched_terms
|
||||
|
||||
|
||||
def assign_drug_indications(
    df: "pd.DataFrame",
    gp_matches_df: "pd.DataFrame",
    search_term_to_fragments: dict[str, list[str]],
) -> "tuple[pd.DataFrame, pd.DataFrame]":
    """
    Assign drug-aware indications by cross-referencing GP diagnoses with drug mappings.

    For each unique UPID + Drug Name pair in the HCD data:
    1. Look up the patient's GP-matched Search_Terms (via PseudoNHSNoLinked -> gp_matches_df)
    2. Look up which Search_Terms list this drug (via get_search_terms_for_drug)
    3. Intersect = valid indications for this drug-patient pair
    4. If 1 match: use it
    5. If multiple: pick highest code_frequency (most GP coding = most likely indication)
    6. If tied frequency: alphabetical Search_Term for determinism
    7. If 0 matches: fallback to "{Directory} (no GP dx)", or "UNKNOWN (no GP dx)"
       when Directory is missing

    Modifies UPID to include indication: "{UPID}|{search_term}" or "{UPID}|{Directory} (no GP dx)".
    This makes drugs under different indications appear as separate pathways.

    PseudoNHSNoLinked and Directory are taken from the first row seen for each
    (UPID, Drug Name) pair. If gp_matches_df holds several rows for the same
    patient and Search_Term, the last row's code_frequency wins.

    Args:
        df: HCD DataFrame with columns: UPID, Drug Name, PseudoNHSNoLinked, Directory.
            It is not mutated; a copy is returned.
        gp_matches_df: GP matches from get_patient_indication_groups() with columns:
            PatientPseudonym, Search_Term, code_frequency
            (multiple rows per patient, one per matched Search_Term).
            A missing code_frequency column, or a missing (NaN/NA) value in it,
            is treated as a frequency of 0.
        search_term_to_fragments: From load_drug_indication_mapping()[1].
            Maps search_term -> list of drug fragments (UPPERCASE).

    Returns:
        Tuple of (modified_df, indication_df):
        - modified_df: Copy of df with UPID replaced by "{UPID}|{indication}"
        - indication_df: DataFrame indexed by modified UPID with 'Directory' column
          containing the Search_Term (or fallback label) for pathway hierarchy.
          The index name and column are present even when there are no rows.

    Raises:
        KeyError: If df lacks any of the required columns.
    """
    import pandas as pd

    required_cols = ['UPID', 'Drug Name', 'PseudoNHSNoLinked', 'Directory']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise KeyError(
            f"assign_drug_indications: df is missing required columns: {missing_cols}"
        )

    modified_df = df.copy()

    # Build GP match lookup: PseudoNHSNoLinked -> {Search_Term: code_frequency}
    gp_lookup: dict[str, dict[str, int]] = {}
    if not gp_matches_df.empty:
        if 'code_frequency' in gp_matches_df.columns:
            frequencies = gp_matches_df['code_frequency']
        else:
            frequencies = [0] * len(gp_matches_df)
        for pseudo, term, freq in zip(
            gp_matches_df['PatientPseudonym'],
            gp_matches_df['Search_Term'],
            frequencies,
        ):
            # int(NaN) / int(pd.NA) raise, so a missing frequency counts as 0.
            gp_lookup.setdefault(pseudo, {})[term] = 0 if pd.isna(freq) else int(freq)

    logger.info(f"GP lookup built: {len(gp_lookup)} patients with GP matches")

    # Cache drug -> Search_Terms lookups to avoid recomputing per pair
    drug_search_terms_cache: dict[str, list[str]] = {}

    def get_drug_terms(drug_name: str) -> list[str]:
        if drug_name not in drug_search_terms_cache:
            drug_search_terms_cache[drug_name] = get_search_terms_for_drug(
                drug_name, search_term_to_fragments
            )
        return drug_search_terms_cache[drug_name]

    # Resolve the indication once per (UPID, Drug Name) pair, then apply it to
    # every row that shares the pair.
    # Key: (UPID, Drug Name) -> matched Search_Term or fallback label
    pair_indication: dict[tuple[str, str], str] = {}
    unique_pairs = modified_df[required_cols].drop_duplicates(
        subset=['UPID', 'Drug Name']
    )

    match_count = 0
    fallback_count = 0
    tiebreak_count = 0

    # Plain tuples come out in required_cols order.
    for upid, drug_name, pseudo, directory in unique_pairs.itertuples(
        index=False, name=None
    ):
        # Search_Terms this drug maps to
        drug_terms = get_drug_terms(drug_name)

        # Patient's GP-matched Search_Terms
        patient_gp_terms = gp_lookup.get(pseudo, {}) if pd.notna(pseudo) else {}

        # Intersect: Search_Terms that list this drug AND patient has GP dx for
        valid_indications = {
            term: patient_gp_terms[term]
            for term in drug_terms
            if term in patient_gp_terms
        }

        if valid_indications:
            # Highest code_frequency wins; alphabetical Search_Term breaks ties.
            # With a single candidate this simply returns that candidate.
            indication = min(
                valid_indications.items(),
                key=lambda item: (-item[1], item[0]),
            )[0]
            match_count += 1
            if len(valid_indications) > 1:
                tiebreak_count += 1
        else:
            # No intersection: fallback to directory
            if pd.notna(directory):
                indication = f"{directory} (no GP dx)"
            else:
                indication = "UNKNOWN (no GP dx)"
            fallback_count += 1

        pair_indication[(upid, drug_name)] = indication

    total_pairs = len(unique_pairs)
    logger.info("Drug-indication matching complete:")
    logger.info(f" Total UPID-Drug pairs: {total_pairs}")
    if total_pairs > 0:
        logger.info(
            f" Matched (GP dx + drug mapping): {match_count} "
            f"({100*match_count/total_pairs:.1f}%)"
        )
    else:
        logger.info(" Matched: 0")
    logger.info(f" Tiebreaker used: {tiebreak_count}")
    if total_pairs > 0:
        logger.info(
            f" Fallback (no match): {fallback_count} "
            f"({100*fallback_count/total_pairs:.1f}%)"
        )
    else:
        logger.info(" Fallback: 0")

    # Apply modified UPIDs to all rows. Iterating the two columns directly
    # avoids a row-wise DataFrame.apply, which is slow and returns a DataFrame
    # (not a Series) when the frame has zero rows.
    unknown_label = "UNKNOWN (no GP dx)"  # Shouldn't happen, but fallback
    modified_df['UPID'] = [
        f"{upid}|{pair_indication.get((upid, drug_name), unknown_label)}"
        for upid, drug_name in zip(modified_df['UPID'], modified_df['Drug Name'])
    ]

    # Build indication_df: modified UPID -> Search_Term/fallback label.
    # Several drugs of one patient can resolve to the same indication, so the
    # modified UPID is de-duplicated (first occurrence kept, insertion order).
    indication_by_upid: dict[str, str] = {}
    for (original_upid, _drug_name), indication in pair_indication.items():
        indication_by_upid.setdefault(f"{original_upid}|{indication}", indication)

    # Built with an explicit index/column so the schema holds for zero rows too.
    indication_df = pd.DataFrame(
        {'Directory': list(indication_by_upid.values())},
        index=pd.Index(list(indication_by_upid), name='UPID'),
    )

    logger.info(f" Unique modified UPIDs: {len(indication_df)}")

    # Log top indications
    if not indication_df.empty:
        top_terms = indication_df['Directory'].value_counts().head(5)
        logger.info(f" Top 5 indications: {dict(top_terms)}")

    return modified_df, indication_df
|
||||
|
||||
|
||||
# === NEW APPROACH: Query Snowflake directly using cluster CTE ===
|
||||
|
||||
# The cluster query mapping (embedded from snomed_indication_mapping_query.sql)
|
||||
@@ -1567,6 +1736,8 @@ __all__ = [
|
||||
"SEARCH_TERM_MERGE_MAP",
|
||||
"load_drug_indication_mapping",
|
||||
"get_search_terms_for_drug",
|
||||
# Drug-aware indication assignment
|
||||
"assign_drug_indications",
|
||||
# Snowflake-direct indication lookup (new approach)
|
||||
"get_patient_indication_groups",
|
||||
"CLUSTER_MAPPING_SQL",
|
||||
|
||||
Reference in New Issue
Block a user