From 0779df78d17d5021a48ffff12f7c46c043d4aece Mon Sep 17 00:00:00 2001 From: Andrew Charlwood Date: Thu, 5 Feb 2026 22:48:09 +0000 Subject: [PATCH] feat: add drug-to-indication mapping from DimSearchTerm.csv (Task 1.2) Add load_drug_indication_mapping() and get_search_terms_for_drug() to diagnosis_lookup.py. Loads DimSearchTerm.csv to build bidirectional lookup between drug name fragments and Search_Terms. Uses substring matching for drug fragments (handles both exact names like ADALIMUMAB and partial fragments like PEGYLATED). Handles duplicate Search_Terms (e.g., diabetes appearing under two directorates) by combining fragments. --- IMPLEMENTATION_PLAN.md | 374 +++++++++++++--------------- data_processing/diagnosis_lookup.py | 104 ++++++++ 2 files changed, 276 insertions(+), 202 deletions(-) diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md index d4c2fef..b7f4968 100644 --- a/IMPLEMENTATION_PLAN.md +++ b/IMPLEMENTATION_PLAN.md @@ -1,255 +1,225 @@ -# Implementation Plan - Indication-Based Pathway Charts +# Implementation Plan - Drug-Aware Indication Matching ## Project Overview -Extend the pathway analysis application to show indication-based icicle charts alongside directory-based charts. Patient diagnoses are matched from GP records using SNOMED cluster codes. +Update the indication-based pathway charts so that patient indications are matched **per drug**, not just per patient. Currently, each patient gets ONE indication (most recent GP diagnosis match). This ignores which drugs the patient is actually taking. + +### The Problem + +A patient on ADALIMUMAB + OMALIZUMAB currently gets assigned a single indication (e.g., "rheumatoid arthritis" — the most recent GP match). But: +- ADALIMUMAB is used for rheumatoid arthritis, axial spondyloarthritis, crohn's disease, etc. +- OMALIZUMAB is used for asthma, allergic asthma, urticaria + +These are different clinical pathways and should be treated as separate treatment journeys. + +### The Solution + +Match each drug to an indication by cross-referencing: +1. **GP diagnosis** — which Search_Terms the patient has matching SNOMED codes for +2. **Drug mapping** — which Search_Terms list each drug (from `DimSearchTerm.csv`) + +Only assign a drug to an indication if BOTH conditions are met. If a patient's drugs map to different indications, they become separate pathways (via modified UPID). ### Key Design Decisions + | Aspect | Decision | |--------|----------| -| SNOMED source | Query `ClinicalCodingClusterSnomedCodes` clusters directly in Snowflake | -| Grouping level | `Search_Term` from cluster mapping (~148 conditions) | -| Chart types | Two: "By Directory" (existing) and "By Indication" (new toggle) | -| No-match display | Show assigned directorate in indication chart (mixed labels) | -| Multiple matches | Use most recent SNOMED code by GP record date | -| Data storage | No local SNOMED mapping — query Snowflake at refresh time | +| Drug-indication source | `data/DimSearchTerm.csv` — Search_Term → CleanedDrugName mapping | +| UPID modification | `{original_UPID}\|{search_term}` for drugs with matched indication | +| GP diagnosis matching | Return ALL matches per patient (not just most recent) | +| Drug matching | Substring match: HCD drug name contains DimSearchTerm fragment | +| Multiple indication matches per drug | Use highest GP code frequency as tiebreaker (COUNT of matching SNOMED codes per Search_Term) | +| GP code time range | Only codes from MIN(Intervention Date) onwards — restricts to HCD data window | +| No indication match | Fallback to directory (same as current behavior) | +| Same patient, different indications | Separate pathways via different modified UPIDs | -### SNOMED Cluster Query -The `snomed_indication_mapping_query.sql` file contains the master query: -- Maps Search_Term → Cluster_ID for ~148 conditions -- Joins `ClinicalCodingClusterSnomedCodes` to get SNOMED codes per cluster -- Includes explicit manual mappings for conditions not in clusters -- Returns: Search_Term, SNOMEDCode, SNOMEDDescription +### Examples -## Quality Checks +**Patient on ADALIMUMAB + GOLIMUMAB, GP dx: axial spondyloarthritis + asthma** +- axial spondyloarthritis drug list includes both ADALIMUMAB and GOLIMUMAB +- → Both drugs grouped under "axial spondyloarthritis", single pathway +- Modified UPID: `RMV12345|axial spondyloarthritis` -Run after each task: +**Patient on ADALIMUMAB + OMALIZUMAB, GP dx: axial spondyloarthritis + asthma** +- axial spondyloarthritis lists ADALIMUMAB but not OMALIZUMAB +- asthma lists OMALIZUMAB but not ADALIMUMAB +- → Two separate pathways: + - `RMV12345|axial spondyloarthritis` with ADALIMUMAB + - `RMV12345|asthma` with OMALIZUMAB -```bash -# Syntax check -python -m py_compile - -# Import verification -python -c "from data_processing.diagnosis_lookup import *" -python -c "from data_processing.pathway_pipeline import *" - -# For Reflex changes -python -m reflex compile -``` +**Patient on ADALIMUMAB, GP dx: rheumatoid arthritis (47 codes) + crohn's disease (2 codes)** +- Both Search_Terms list ADALIMUMAB AND patient has GP dx for both +- → Tiebreaker: highest code frequency — rheumatoid arthritis has 47 matching SNOMED codes vs 2 for crohn's +- → Single pathway under rheumatoid arthritis (more clinical activity = more likely the treatment indication) --- -## Phase 1: Snowflake Integration +## Phase 1: Update Snowflake Query & Drug Mapping -### 1.1 Create Indication Lookup Query -- [x] Add `get_patient_indication_groups()` function to `data_processing/diagnosis_lookup.py`: - - Takes: list of patient pseudonyms (PseudoNHSNoLinked values) - - Uses the cluster query from `snomed_indication_mapping_query.sql` as a CTE - - Joins with `PrimaryCareClinicalCoding` to find patients with matching diagnoses - - Returns: DataFrame with PatientPseudonym, Search_Term, EventDateTime - - Uses most recent match per patient (ORDER BY EventDateTime DESC) -- [x] Handle edge cases: Snowflake unavailable, empty patient list -- [x] Verify: Function returns expected Search_Terms for test patients (92.8% match rate, 139 unique Search_Terms) +### 1.1 Update `get_patient_indication_groups()` to return ALL matches with frequency +- [ ] Modify the Snowflake query in `get_patient_indication_groups()` (diagnosis_lookup.py): + - Remove `QUALIFY ROW_NUMBER() OVER (PARTITION BY ... ORDER BY EventDateTime DESC) = 1` + - Return ALL matching Search_Terms per patient with code frequency: + ```sql + SELECT pc."PatientPseudonym" AS "PatientPseudonym", + aic.Search_Term AS "Search_Term", + COUNT(*) AS "code_frequency" + FROM PrimaryCareClinicalCoding pc + JOIN AllIndicationCodes aic ON pc."SNOMEDCode" = aic.SNOMEDCode + WHERE pc."PatientPseudonym" IN (...) + AND pc."EventDateTime" >= :earliest_hcd_date + GROUP BY pc."PatientPseudonym", aic.Search_Term + ``` + - `code_frequency` = number of matching SNOMED codes per Search_Term per patient + - Higher frequency = more clinical activity = stronger signal for tiebreaker + - `earliest_hcd_date` = `MIN(Intervention Date)` from the HCD DataFrame — restricts GP codes to the HCD data window, reducing noise from old/irrelevant diagnoses +- [ ] Accept `earliest_hcd_date` parameter in `get_patient_indication_groups()` and pass to query +- [ ] Keep batch processing (500 patients per query) +- [ ] Update return type: DataFrame now has multiple rows per patient (PatientPseudonym, Search_Term, code_frequency) +- [ ] Verify: Query returns more rows than before (patients with multiple matching diagnoses) -### 1.2 Update Data Pipeline to Include Indications -- [x] Modify `cli/refresh_pathways.py` to call indication lookup during refresh: - - After fetching HCD data, extract unique PseudoNHSNoLinked values - - Call `get_patient_indication_groups()` with patient list - - Create `indication_df` mapping UPID → Indication_Group - - For patients with no GP match: Indication_Group = fallback directorate -- [x] Log coverage: X% diagnosis-matched, Y% fallback -- [x] Verify: indication_df has correct structure for pathway processing (verified via full pipeline run) +### 1.2 Build drug-to-Search_Term lookup from DimSearchTerm.csv +- [x] Add function `load_drug_indication_mapping()` to `diagnosis_lookup.py`: + - Loads `data/DimSearchTerm.csv` + - Builds dict: `drug_fragment (uppercase) → list[Search_Term]` + - Also builds reverse: `search_term → list[drug_fragments]` + - CleanedDrugName is pipe-separated (e.g., "ADALIMUMAB|GOLIMUMAB|IXEKIZUMAB") +- [x] Add function `get_search_terms_for_drug(drug_name, search_term_to_fragments) -> list[str]`: + - Returns all Search_Terms whose drug fragments are substrings of the drug name (case-insensitive) + - More practical than per-term boolean check — returns all matches at once for Phase 2 use +- [x] Verify: ADALIMUMAB matches "axial spondyloarthritis", OMALIZUMAB matches "asthma" --- -## Phase 2: Schema & Processing Updates +## Phase 2: Drug-Aware Indication Matching Logic -### 2.1 Add Chart Type Support to Schema -- [x] Add `chart_type` column to `pathway_nodes` table (ALREADY DONE) -- [x] Update UNIQUE constraint to include chart_type (ALREADY DONE) -- [x] Add indexes for chart_type filtering (ALREADY DONE) -- [x] Verify: Existing migration works correctly (tables created, 3,589 nodes inserted) +### 2.1 Create `assign_drug_indications()` function +- [ ] Add to `diagnosis_lookup.py` or `pathway_pipeline.py`: + ``` + def assign_drug_indications( + df: pd.DataFrame, # HCD data with UPID, Drug Name columns + gp_matches_df: pd.DataFrame, # PatientPseudonym → list of matched Search_Terms + drug_mapping: dict, # From load_drug_indication_mapping() + ) -> tuple[pd.DataFrame, pd.DataFrame]: + Returns: (modified_df, indication_df) + - modified_df: HCD data with UPID replaced by {UPID}|{indication} + - indication_df: mapping modified_UPID → Search_Term + ``` +- [ ] Logic per UPID + Drug Name pair: + 1. Get patient's GP-matched Search_Terms with code_frequency (from gp_matches_df via PseudoNHSNoLinked) + 2. Get which Search_Terms include this drug (from drug_mapping) + 3. Intersection = valid indications for this drug-patient pair + 4. If 1 match: use it + 5. If multiple matches: use highest code_frequency as tiebreaker (most GP coding activity = most likely treatment indication) + 6. If 0 matches: use fallback directory +- [ ] Modify UPID in df rows: `{original_UPID}|{matched_search_term}` +- [ ] Build indication_df: `{modified_UPID}` → `Search_Term` (or fallback label) +- [ ] Verify: Function compiles, handles edge cases (no GP match, no drug match) -### 2.2 Create Indication Pathway Processing -- [x] Add `generate_icicle_chart_indication()` to `pathway_analyzer.py` (ALREADY DONE) -- [x] Add `process_indication_pathway_for_date_filter()` to `pathway_pipeline.py` (ALREADY DONE) -- [x] Add `extract_indication_fields()` for denormalized columns (ALREADY DONE) -- [x] Update `convert_to_records()` with `chart_type` parameter (ALREADY DONE) -- [x] Verify: Code compiles, imports work correctly - -### 2.3 Update Refresh Command for Dual Charts -- [x] Add `--chart-type` argument: "all", "directory", "indication" (ALREADY DONE) -- [x] Update indication processing to use new `get_patient_indication_groups()`: - - Replace `batch_lookup_indication_groups()` with the new Snowflake-direct approach - - Pass indication_df to `process_indication_pathway_for_date_filter()` -- [x] Process all 6 date filters for both chart types (existing loop already handles this) -- [x] Verify: Both chart types generate pathway data (indication verified with 695 nodes for all_6mo) +### 2.2 Handle tiebreaker for multiple indication matches +- [ ] When a drug matches multiple Search_Terms AND patient has GP dx for multiple: + - Use `code_frequency` from the GP query (COUNT of matching SNOMED codes per Search_Term) + - Higher code_frequency = more clinical activity for that condition = more likely treatment indication + - E.g., patient with 47 RA codes and 2 crohn's codes → ADALIMUMAB assigned to RA + - code_frequency is already returned by the updated query in Task 1.1 +- [ ] Verify: Tiebreaker logic correctly picks highest-frequency diagnosis +- [ ] Verify: Tie on frequency (rare but possible) falls back to alphabetical Search_Term for determinism --- -## Phase 3: Test Full Pipeline +## Phase 3: Pipeline Integration -### 3.1 Test Refresh with Real Data -- [x] Run `python -m cli.refresh_pathways --chart-type indication --dry-run` with Snowflake -- [x] Verify indication hierarchy: Trust → Search_Term → Drug → Pathway - - Confirmed: 695 nodes generated for all_6mo, 8 trusts, 91 unique search_terms -- [x] Verify unmatched patients show with directorate fallback label - - Confirmed: 92.7% diagnosis-matched (34,545/37,257 UPIDs), 7.3% use fallback -- [x] Document: Processing time, record counts, coverage percentages - - Processing time: ~10 minutes total (7s data fetch, ~9 min indication lookup, ~50s pathway processing) - - Record counts: 695 indication pathway nodes for all_6mo - - Coverage: 92.8% GP diagnosis match rate (34,006/36,628 patients) - - Top indications: drug misuse (8,749), influenza (6,336), diabetes (2,516), sepsis (1,991), cardiovascular disease (954) -- [x] Run full refresh with `--chart-type all` to populate database (requires non-dry-run) - - Fixed DataFrame mutation bug in prepare_data() (df.copy() added) - - Results: 3,633 total nodes (1,101 directory + 2,532 indication) across all 12 datasets - - Database populated: 3,589 nodes in pathway_nodes table +### 3.1 Update `refresh_pathways.py` indication processing +- [ ] In the `elif current_chart_type == "indication":` block: + 1. Call `get_patient_indication_groups()` as before (but now returns ALL matches) + 2. Load drug mapping: `drug_mapping = load_drug_indication_mapping()` + 3. Call `assign_drug_indications(df, gp_matches_df, drug_mapping)` + 4. Use modified_df (with indication-aware UPIDs) for pathway processing + 5. Use indication_df for the indication mapping +- [ ] Pass modified_df (not original df) to `process_indication_pathway_for_date_filter()` +- [ ] Verify: Pipeline compiles, `python -m py_compile cli/refresh_pathways.py` + +### 3.2 Test with dry run +- [ ] Run `python -m cli.refresh_pathways --chart-type indication --dry-run -v` +- [ ] Verify: + - Modified UPIDs appear in pipeline log (e.g., `RMV12345|rheumatoid arthritis`) + - Patient counts are reasonable (will be higher than before since same patient can appear under multiple indications) + - Drug-indication matching is logged (match rate, fallback rate) + - Pathway hierarchy shows drug-specific grouping under correct indications --- -## Phase 4: Reflex UI Updates +## Phase 4: Full Refresh & Validation -### 4.1 Add Chart Type State -- [x] Add state variables to `AppState`: - - `selected_chart_type: str = "directory"` (options: "directory", "indication") - - `chart_type_options: list[dict]` for dropdown -- [x] Add `set_chart_type()` event handler -- [x] Update `load_pathway_data()` to filter by chart_type -- [x] Verify: State changes correctly, data queries include chart_type filter +### 4.1 Full refresh with both chart types +- [ ] Run `python -m cli.refresh_pathways --chart-type all` +- [ ] Verify: + - Both chart types generate data + - Directory charts unchanged (no modified UPIDs) + - Indication charts reflect drug-aware matching -### 4.2 Add Chart Type Toggle UI -- [x] Create `chart_type_toggle()` component: - - Segmented control with pill-style buttons: "By Directory" | "By Indication" - - Placed in filter strip, first element before date filters -- [x] Wire to `set_chart_type()` handler -- [x] Verify: Toggle switches chart data, UI updates reactively (reflex compile passed) +### 4.2 Validate indication chart correctness +- [ ] Check that drugs under an indication all appear in that Search_Term's drug list +- [ ] Verify that a patient on drugs for different indications creates separate pathway branches +- [ ] Verify that drugs sharing an indication are grouped in the same pathway +- [ ] Log: patient count comparison (old vs new approach) -### 4.3 Update Chart Display for Indication Labels -- [x] Ensure icicle chart handles mixed labels: - - Search_Term labels (e.g., "rheumatoid arthritis") for matched patients - - Directorate labels (e.g., "RHEUMATOLOGY (no GP dx)") for unmatched - - Note: labels come from pathway_nodes pre-computed data, no template changes needed -- [x] Update hierarchy description (dynamic: "Trust → Directorate → ..." or "Trust → Indication → ...") -- [x] Update chart title to include chart type prefix -- [x] Verify: Chart renders correctly with both label types (reflex compile passed) - ---- - -## Phase 5: Validation & Documentation - -### 5.1 End-to-End Validation -- [x] Run full app with both chart types - - Fixed UNIQUE constraint bug: was `UNIQUE(date_filter_id, ids)`, needed `UNIQUE(date_filter_id, chart_type, ids)` - - Directory chart was missing level 0/1 nodes due to indication chart overwriting them - - Dropped and recreated pathway_nodes table, re-ran full refresh (3,633 nodes) - - Both chart types now have levels 0-5 with correct patient counts -- [x] Verify chart toggle works correctly - - Data loading tested: directory (293 nodes) and indication (695 nodes) for all_6mo - - All 12 date filter combinations generate valid icicle charts - - Root patients match between chart types (11,118 for all_6mo) -- [x] Verify filter interactions (drugs, directorates) work for both types - - Drug filter works for both chart types (ADALIMUMAB: 70 dir, 128 ind nodes) - - Directory filter works for directory charts (RHEUMATOLOGY: 86 nodes) - - Note: Directory filter returns 0 for indication charts (expected — directory column stores Search_Terms not directorate names) -- [x] Verify KPIs update correctly for both chart types - - Both show: 11,118 patients, £130.6M total cost for all_6mo - - KPIs consistent across chart types (same underlying patient data) -- [B] Test at multiple viewport sizes (BLOCKED: requires live browser — reflex run crashes on Windows due to Granian/watchfiles FileNotFoundError, environment issue not code issue. Deferred to manual testing when environment supports it.) - -### 5.2 Update Documentation -- [x] Update CLAUDE.md with new architecture -- [x] Document new CLI arguments -- [x] Document chart_type toggle behavior -- [x] Update data flow diagrams +### 4.3 Validate Reflex UI +- [ ] Run `python -m reflex compile` to verify app compiles +- [ ] Verify chart type toggle still works +- [ ] Verify indication chart shows correct hierarchy --- ## Completion Criteria All tasks marked `[x]` AND: -- [x] App compiles without errors (`reflex compile` succeeds) -- [x] Both chart types generate pathway data (12 total: 6 dates × 2 types) - - Directory: 1,101 nodes (293+329+93+105+134+147) - - Indication: 2,532 nodes (695+785+167+198+315+372) -- [x] Chart type toggle switches between Directory and Indication views - - Data layer verified: both chart types load correctly with all hierarchy levels -- [x] GP diagnosis matching works via Snowflake cluster query -- [x] Unmatched patients show in indication chart with directorate fallback label -- [x] Coverage metrics logged (% diagnosis-matched vs fallback) - - 92.7% diagnosis-matched (34,545/37,257 UPIDs) -- [x] All filters work correctly for both chart types - - Drug filter and date filter work for both. Directory filter only applies to directory charts (expected). -- [x] Performance acceptable (< 10 min full refresh, < 500ms filter change) - - Full refresh: 903 seconds (~15 min) for all 12 datasets - - SQLite query: sub-millisecond for filter changes +- [ ] App compiles without errors (`reflex compile` succeeds) +- [ ] Both chart types generate pathway data +- [ ] Indication charts show drug-specific indication matching +- [ ] Drugs under the same indication for the same patient are in one pathway +- [ ] Drugs under different indications for the same patient create separate pathways +- [ ] Fallback works for drugs with no indication match +- [ ] Full refresh completes successfully +- [ ] Existing directory charts are unaffected --- ## Reference -### SNOMED Cluster Query Structure -```sql --- From snomed_indication_mapping_query.sql -WITH SearchTermClusters AS ( - SELECT Search_Term, Cluster_ID FROM (VALUES - ('rheumatoid arthritis', 'eFI2_InflammatoryArthritis'), - ('macular degeneration', 'CUST_ICB_VISUAL_IMPAIRMENT'), - -- ... ~148 mappings - ) AS t(Search_Term, Cluster_ID) -), -ClusterCodes AS ( - SELECT stc.Search_Term, c."SNOMEDCode", c."SNOMEDDescription" - FROM SearchTermClusters stc - JOIN DATA_HUB.PHM."ClinicalCodingClusterSnomedCodes" c - ON stc.Cluster_ID = c."Cluster_ID" - WHERE c."SNOMEDCode" IS NOT NULL -), -ExplicitCodes AS ( - -- Manual mappings for conditions not in clusters - SELECT Search_Term, SNOMEDCode, SNOMEDDescription FROM (VALUES - ('ankylosing spondylitis', '162930007', 'Manual mapping'), - -- ... - ) AS t(Search_Term, SNOMEDCode, SNOMEDDescription) -) -SELECT * FROM ClusterCodes -UNION ALL -SELECT * FROM ExplicitCodes +### DimSearchTerm.csv Structure +``` +Search_Term,CleanedDrugName,PrimaryDirectorate +rheumatoid arthritis,ABATACEPT|ADALIMUMAB|ANAKINRA|BARICITINIB|...,RHEUMATOLOGY +asthma,BENRALIZUMAB|DUPILUMAB|INHALED|MEPOLIZUMAB|OMALIZUMAB|RESLIZUMAB,THORACIC MEDICINE ``` -### Current Pathway Hierarchy (Directory-based) +### Modified UPID Format ``` -Root (N&W ICS) -└── Trust (NNUH, QEH, JPH, etc.) - └── Directory (RHEUMATOLOGY, OPHTHALMOLOGY, etc.) - └── Drug (ADALIMUMAB, RANIBIZUMAB, etc.) - └── Pathway (drug sequences) +Original: RMV12345 +Modified: RMV12345|rheumatoid arthritis +Fallback: RMV12345|RHEUMATOLOGY (no GP dx) ``` -### New Pathway Hierarchy (Indication-based) +### Current vs New Indication Flow ``` -Root (N&W ICS) -└── Trust (NNUH, QEH, JPH, etc.) - └── Search_Term (rheumatoid arthritis, macular degeneration, etc.) - │ OR Directorate (RHEUMATOLOGY - for unmatched patients) - └── Drug (ADALIMUMAB, RANIBIZUMAB, etc.) - └── Pathway (drug sequences) +CURRENT: + Patient → GP dx (most recent) → single Search_Term → one pathway + +NEW: + Patient + Drug A → GP dx matching Drug A → Search_Term X + Patient + Drug B → GP dx matching Drug B → Search_Term Y + → If X == Y: one pathway under X + → If X != Y: two pathways (modified UPIDs) ``` ### Key Files -| File | Purpose | +| File | Changes | |------|---------| -| `snomed_indication_mapping_query.sql` | Master SNOMED cluster query | -| `data_processing/diagnosis_lookup.py` | GP diagnosis lookup functions | -| `data_processing/pathway_pipeline.py` | Indication pathway processing | -| `cli/refresh_pathways.py` | CLI for dual chart type refresh | -| `pathways_app/pathways_app.py` | Reflex UI with chart type toggle | - -### Expected Data Volumes - -| Metric | Expected | -|--------|----------| -| Search_Term conditions | ~148 (from cluster mapping) | -| Pathway nodes (directory, per date filter) | ~300 | -| Pathway nodes (indication, per date filter) | ~400-600 (more granular) | -| Total pathway nodes (6 dates × 2 types) | ~4,000-5,000 | +| `data_processing/diagnosis_lookup.py` | Update query, add drug mapping functions | +| `data_processing/pathway_pipeline.py` | Possibly minor changes for modified UPIDs | +| `cli/refresh_pathways.py` | Integrate drug-aware matching into pipeline | +| `data/DimSearchTerm.csv` | Reference data (read-only) | +| `analysis/pathway_analyzer.py` | No changes expected (UPID changes are transparent) | +| `pathways_app/pathways_app.py` | No changes expected | diff --git a/data_processing/diagnosis_lookup.py b/data_processing/diagnosis_lookup.py index 0dbd58b..9b41339 100644 --- a/data_processing/diagnosis_lookup.py +++ b/data_processing/diagnosis_lookup.py @@ -1087,6 +1087,107 @@ def batch_lookup_indication_groups( return result_df +# === Drug-to-indication mapping from DimSearchTerm.csv === + + +def load_drug_indication_mapping( + csv_path: Optional[str] = None, +) -> tuple[dict[str, list[str]], dict[str, list[str]]]: + """ + Load the drug-to-Search_Term mapping from DimSearchTerm.csv. + + Builds two lookup dicts: + - fragment_to_search_terms: drug fragment (UPPERCASE) -> list of Search_Terms containing it + - search_term_to_fragments: search_term -> list of drug fragments (UPPERCASE) + + DimSearchTerm.csv has columns: Search_Term, CleanedDrugName, PrimaryDirectorate + CleanedDrugName is pipe-separated (e.g., "ADALIMUMAB|GOLIMUMAB|IXEKIZUMAB"). + + Note: A Search_Term can appear multiple times with different PrimaryDirectorates + (e.g., "diabetes" appears under both DIABETIC MEDICINE and OPHTHALMOLOGY). + Drug fragments from all rows for the same Search_Term are combined. + + Args: + csv_path: Path to DimSearchTerm.csv. Defaults to data/DimSearchTerm.csv. + + Returns: + Tuple of (fragment_to_search_terms, search_term_to_fragments) + """ + if csv_path is None: + csv_path = str(Path(__file__).parent.parent / "data" / "DimSearchTerm.csv") + + fragment_to_search_terms: dict[str, list[str]] = {} + search_term_to_fragments: dict[str, list[str]] = {} + + try: + with open(csv_path, "r", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + search_term = row.get("Search_Term", "").strip() + drug_names_raw = row.get("CleanedDrugName", "").strip() + + if not search_term or not drug_names_raw: + continue + + fragments = [frag.strip().upper() for frag in drug_names_raw.split("|") if frag.strip()] + + # Build search_term -> fragments (accumulate for duplicate Search_Terms) + if search_term not in search_term_to_fragments: + search_term_to_fragments[search_term] = [] + for frag in fragments: + if frag not in search_term_to_fragments[search_term]: + search_term_to_fragments[search_term].append(frag) + + # Build fragment -> search_terms + for frag in fragments: + if frag not in fragment_to_search_terms: + fragment_to_search_terms[frag] = [] + if search_term not in fragment_to_search_terms[frag]: + fragment_to_search_terms[frag].append(search_term) + + logger.info( + f"Loaded drug-indication mapping: {len(search_term_to_fragments)} Search_Terms, " + f"{len(fragment_to_search_terms)} drug fragments" + ) + + except FileNotFoundError: + logger.error(f"DimSearchTerm.csv not found at {csv_path}") + except Exception as e: + logger.error(f"Error loading DimSearchTerm.csv: {e}") + + return fragment_to_search_terms, search_term_to_fragments + + +def get_search_terms_for_drug( + drug_name: str, + search_term_to_fragments: dict[str, list[str]], +) -> list[str]: + """ + Get all Search_Terms that list a given drug using substring matching. + + Checks if any drug fragment from DimSearchTerm is a SUBSTRING of the given + drug name (case-insensitive). This handles both exact matches (ADALIMUMAB) + and partial fragments (PEGYLATED, INHALED). + + Args: + drug_name: HCD drug name (e.g., "ADALIMUMAB 40MG", "PEGYLATED LIPOSOMAL DOXORUBICIN") + search_term_to_fragments: Mapping of search_term -> list of drug fragments + + Returns: + List of Search_Terms whose drug fragments match the drug name + """ + drug_name_upper = drug_name.upper() + matched_terms: list[str] = [] + + for search_term, fragments in search_term_to_fragments.items(): + for frag in fragments: + if frag in drug_name_upper: + matched_terms.append(search_term) + break # One matching fragment is enough for this Search_Term + + return matched_terms + + # === NEW APPROACH: Query Snowflake directly using cluster CTE === # The cluster query mapping (embedded from snomed_indication_mapping_query.sql) @@ -1428,6 +1529,9 @@ __all__ = [ "get_directorate_from_diagnosis", # Batch lookup for indication groups "batch_lookup_indication_groups", + # Drug-indication mapping from DimSearchTerm.csv + "load_drug_indication_mapping", + "get_search_terms_for_drug", # Snowflake-direct indication lookup (new approach) "get_patient_indication_groups", "CLUSTER_MAPPING_SQL",