diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md index c93feea..68b2f2b 100644 --- a/IMPLEMENTATION_PLAN.md +++ b/IMPLEMENTATION_PLAN.md @@ -107,7 +107,7 @@ Only assign a drug to an indication if BOTH conditions are met. If a patient's d ## Phase 2: Drug-Aware Indication Matching Logic ### 2.1 Create `assign_drug_indications()` function -- [ ] Add to `diagnosis_lookup.py` or `pathway_pipeline.py`: +- [x] Add to `diagnosis_lookup.py` or `pathway_pipeline.py`: ``` def assign_drug_indications( df: pd.DataFrame, # HCD data with UPID, Drug Name columns @@ -118,25 +118,25 @@ Only assign a drug to an indication if BOTH conditions are met. If a patient's d - modified_df: HCD data with UPID replaced by {UPID}|{indication} - indication_df: mapping modified_UPID → Search_Term ``` -- [ ] Logic per UPID + Drug Name pair: +- [x] Logic per UPID + Drug Name pair: 1. Get patient's GP-matched Search_Terms with code_frequency (from gp_matches_df via PseudoNHSNoLinked) 2. Get which Search_Terms include this drug (from drug_mapping) 3. Intersection = valid indications for this drug-patient pair 4. If 1 match: use it 5. If multiple matches: use highest code_frequency as tiebreaker (most GP coding activity = most likely treatment indication) 6. If 0 matches: use fallback directory -- [ ] Modify UPID in df rows: `{original_UPID}|{matched_search_term}` -- [ ] Build indication_df: `{modified_UPID}` → `Search_Term` (or fallback label) -- [ ] Verify: Function compiles, handles edge cases (no GP match, no drug match) +- [x] Modify UPID in df rows: `{original_UPID}|{matched_search_term}` +- [x] Build indication_df: `{modified_UPID}` → `Search_Term` (or fallback label) +- [x] Verify: Function compiles, handles edge cases (no GP match, no drug match) ### 2.2 Handle tiebreaker for multiple indication matches -- [ ] When a drug matches multiple Search_Terms AND patient has GP dx for multiple: +- [x] When a drug matches multiple Search_Terms AND patient has GP dx for multiple: - Use `code_frequency` from the GP query (COUNT of matching SNOMED codes per Search_Term) - Higher code_frequency = more clinical activity for that condition = more likely treatment indication - E.g., patient with 47 RA codes and 2 crohn's codes → ADALIMUMAB assigned to RA - code_frequency is already returned by the updated query in Task 1.1 -- [ ] Verify: Tiebreaker logic correctly picks highest-frequency diagnosis -- [ ] Verify: Tie on frequency (rare but possible) falls back to alphabetical Search_Term for determinism +- [x] Verify: Tiebreaker logic correctly picks highest-frequency diagnosis +- [x] Verify: Tie on frequency (rare but possible) falls back to alphabetical Search_Term for determinism --- diff --git a/data_processing/diagnosis_lookup.py b/data_processing/diagnosis_lookup.py index 4b4fed8..f6fa933 100644 --- a/data_processing/diagnosis_lookup.py +++ b/data_processing/diagnosis_lookup.py @@ -1204,6 +1204,175 @@ def get_search_terms_for_drug( return matched_terms +def assign_drug_indications( + df: "pd.DataFrame", + gp_matches_df: "pd.DataFrame", + search_term_to_fragments: dict[str, list[str]], +) -> "tuple[pd.DataFrame, pd.DataFrame]": + """ + Assign drug-aware indications by cross-referencing GP diagnoses with drug mappings. + + For each UPID + Drug Name pair in the HCD data: + 1. Look up patient's GP-matched Search_Terms (via PseudoNHSNoLinked → gp_matches_df) + 2. Look up which Search_Terms list this drug (via search_term_to_fragments) + 3. Intersect = valid indications for this drug-patient pair + 4. If 1 match: use it + 5. If multiple: pick highest code_frequency (most GP coding = most likely indication) + 6. If tied frequency: alphabetical Search_Term for determinism + 7. If 0 matches: fallback to "{Directory} (no GP dx)" + + Modifies UPID to include indication: "{UPID}|{search_term}" or "{UPID}|{Directory} (no GP dx)". + This makes drugs under different indications appear as separate pathways. + + Args: + df: HCD DataFrame with columns: UPID, Drug Name, PseudoNHSNoLinked, Directory + gp_matches_df: GP matches from get_patient_indication_groups() with columns: + PatientPseudonym, Search_Term, code_frequency + (multiple rows per patient, one per matched Search_Term) + search_term_to_fragments: From load_drug_indication_mapping()[1]. + Maps search_term -> list of drug fragments (UPPERCASE). + + Returns: + Tuple of (modified_df, indication_df): + - modified_df: Copy of df with UPID replaced by "{UPID}|{indication}" + - indication_df: DataFrame indexed by modified UPID with 'Directory' column + containing the Search_Term (or fallback label) for pathway hierarchy + """ + import pandas as pd + + modified_df = df.copy() + + # Build GP match lookup: PseudoNHSNoLinked -> {Search_Term: code_frequency} + gp_lookup: dict[str, dict[str, int]] = {} + if not gp_matches_df.empty: + for _, row in gp_matches_df.iterrows(): + pseudo = row['PatientPseudonym'] + term = row['Search_Term'] + freq = int(row.get('code_frequency', 0)) + if pseudo not in gp_lookup: + gp_lookup[pseudo] = {} + gp_lookup[pseudo][term] = freq + + logger.info(f"GP lookup built: {len(gp_lookup)} patients with GP matches") + + # Cache drug -> Search_Terms lookups to avoid recomputing per row + drug_search_terms_cache: dict[str, list[str]] = {} + + def get_drug_terms(drug_name: str) -> list[str]: + if drug_name not in drug_search_terms_cache: + drug_search_terms_cache[drug_name] = get_search_terms_for_drug( + drug_name, search_term_to_fragments + ) + return drug_search_terms_cache[drug_name] + + # Process each row: determine indication for each UPID + Drug Name + # We work at the (UPID, Drug Name) pair level, then apply to all rows + # Key: (UPID, Drug Name) -> (matched_search_term_or_fallback, is_diagnosis) + pair_indication_cache: dict[tuple[str, str], tuple[str, bool]] = {} + + # Get unique (UPID, Drug Name, PseudoNHSNoLinked, Directory) combos + required_cols = ['UPID', 'Drug Name', 'PseudoNHSNoLinked', 'Directory'] + unique_pairs = modified_df[required_cols].drop_duplicates( + subset=['UPID', 'Drug Name'] + ) + + match_count = 0 + fallback_count = 0 + tiebreak_count = 0 + + for _, pair_row in unique_pairs.iterrows(): + upid = pair_row['UPID'] + drug_name = pair_row['Drug Name'] + pseudo = pair_row['PseudoNHSNoLinked'] + directory = pair_row['Directory'] + + # Get Search_Terms this drug maps to + drug_terms = get_drug_terms(drug_name) + + # Get patient's GP-matched Search_Terms + patient_gp_terms = gp_lookup.get(pseudo, {}) if pd.notna(pseudo) else {} + + # Intersect: Search_Terms that list this drug AND patient has GP dx for + valid_indications = { + term: patient_gp_terms[term] + for term in drug_terms + if term in patient_gp_terms + } + + if len(valid_indications) == 1: + matched_term = next(iter(valid_indications)) + pair_indication_cache[(upid, drug_name)] = (matched_term, True) + match_count += 1 + + elif len(valid_indications) > 1: + # Tiebreaker: highest code_frequency, then alphabetical + sorted_terms = sorted( + valid_indications.items(), + key=lambda x: (-x[1], x[0]), + ) + matched_term = sorted_terms[0][0] + pair_indication_cache[(upid, drug_name)] = (matched_term, True) + match_count += 1 + tiebreak_count += 1 + + else: + # No intersection: fallback to directory + if pd.notna(directory): + fallback_label = f"{directory} (no GP dx)" + else: + fallback_label = "UNKNOWN (no GP dx)" + pair_indication_cache[(upid, drug_name)] = (fallback_label, False) + fallback_count += 1 + + total_pairs = len(unique_pairs) + logger.info(f"Drug-indication matching complete:") + logger.info(f" Total UPID-Drug pairs: {total_pairs}") + logger.info(f" Matched (GP dx + drug mapping): {match_count} ({100*match_count/total_pairs:.1f}%)" if total_pairs > 0 else f" Matched: 0") + logger.info(f" Tiebreaker used: {tiebreak_count}") + logger.info(f" Fallback (no match): {fallback_count} ({100*fallback_count/total_pairs:.1f}%)" if total_pairs > 0 else f" Fallback: 0") + + # Apply modified UPIDs to all rows + # Build vectorized lookup: original UPID + Drug Name -> modified UPID + def build_modified_upid(row: "pd.Series") -> str: + upid = row['UPID'] + drug_name = row['Drug Name'] + key = (upid, drug_name) + if key in pair_indication_cache: + indication, _ = pair_indication_cache[key] + return f"{upid}|{indication}" + # Shouldn't happen, but fallback + return f"{upid}|UNKNOWN (no GP dx)" + + modified_df['UPID'] = modified_df.apply(build_modified_upid, axis=1) + + # Build indication_df: modified UPID -> Search_Term/fallback label + # This maps each unique modified UPID to its indication for the pathway hierarchy + indication_records: list[dict[str, str]] = [] + seen_upids: set[str] = set() + + for (original_upid, drug_name), (indication, is_diagnosis) in pair_indication_cache.items(): + modified_upid = f"{original_upid}|{indication}" + if modified_upid not in seen_upids: + indication_records.append({ + 'UPID': modified_upid, + 'Directory': indication, + }) + seen_upids.add(modified_upid) + + indication_df = pd.DataFrame(indication_records) + if not indication_df.empty: + indication_df = indication_df.set_index('UPID') + + logger.info(f" Unique modified UPIDs: {len(indication_df)}") + + # Log top indications + if not indication_df.empty: + top_terms = indication_df['Directory'].value_counts().head(5) + logger.info(f" Top 5 indications: {dict(top_terms)}") + + return modified_df, indication_df + + # === NEW APPROACH: Query Snowflake directly using cluster CTE === # The cluster query mapping (embedded from snomed_indication_mapping_query.sql) @@ -1567,6 +1736,8 @@ __all__ = [ "SEARCH_TERM_MERGE_MAP", "load_drug_indication_mapping", "get_search_terms_for_drug", + # Drug-aware indication assignment + "assign_drug_indications", # Snowflake-direct indication lookup (new approach) "get_patient_indication_groups", "CLUSTER_MAPPING_SQL",