From 1a817b82574a49a79eabb5b827c1e75d6caf6712 Mon Sep 17 00:00:00 2001 From: Andrew Charlwood Date: Thu, 5 Feb 2026 17:03:00 +0000 Subject: [PATCH] feat: add get_patient_indication_groups() for Snowflake-direct GP lookup (Task 1.1) - Add CLUSTER_MAPPING_SQL constant embedding full snomed_indication_mapping_query.sql - Add get_patient_indication_groups() function that queries Snowflake directly - Uses QUALIFY ROW_NUMBER() to get most recent diagnosis per patient - Returns DataFrame with PatientPseudonym, Search_Term, EventDateTime - Handles edge cases: empty list, Snowflake unavailable - Batch processing with configurable batch_size (default 500) - Comprehensive logging for match statistics --- IMPLEMENTATION_PLAN.md | 203 +++++----- data_processing/diagnosis_lookup.py | 318 ++++++++++++++++ progress.txt | 563 ++++------------------------ 3 files changed, 474 insertions(+), 610 deletions(-) diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md index d0ad397..a064cf9 100644 --- a/IMPLEMENTATION_PLAN.md +++ b/IMPLEMENTATION_PLAN.md @@ -1,25 +1,25 @@ -# Implementation Plan - Direct SNOMED Indication Mapping +# Implementation Plan - Indication-Based Pathway Charts ## Project Overview -Extend the pathway analysis application to use direct SNOMED code matching from GP records to: -1. **Improve directorate assignment** - Use diagnosis-based directorate as primary method -2. **Add indication-based icicle chart** - New chart type showing Trust → Search_Term → Drug → Pathway - -### Data Source -`data/drug_snomed_mapping_enriched.csv` - 163K rows mapping: -- Drug → Indication → TA_ID → Search_Term → SNOMEDCode → PrimaryDirectorate +Extend the pathway analysis application to show indication-based icicle charts alongside directory-based charts. Patient diagnoses are matched from GP records using SNOMED cluster codes. 
### Key Design Decisions | Aspect | Decision | |--------|----------| -| Primary directorate method | Diagnosis-based (SNOMED match → PrimaryDirectorate) | -| Fallback | department_identification() chain | -| Grouping level | `Search_Term` column (187 unique values) | -| Chart types | Two: "By Directory" and "By Indication" (user toggle) | +| SNOMED source | Query `ClinicalCodingClusterSnomedCodes` clusters directly in Snowflake | +| Grouping level | `Search_Term` from cluster mapping (~148 conditions) | +| Chart types | Two: "By Directory" (existing) and "By Indication" (new toggle) | | No-match display | Show assigned directorate in indication chart (mixed labels) | | Multiple matches | Use most recent SNOMED code by GP record date | -| Data storage | SQLite table `ref_drug_snomed_mapping`, accessed at ingestion | +| Data storage | No local SNOMED mapping — query Snowflake at refresh time | + +### SNOMED Cluster Query +The `snomed_indication_mapping_query.sql` file contains the master query: +- Maps Search_Term → Cluster_ID for ~148 conditions +- Joins `ClinicalCodingClusterSnomedCodes` to get SNOMED codes per cluster +- Includes explicit manual mappings for conditions not in clusters +- Returns: Search_Term, SNOMEDCode, SNOMEDDescription ## Quality Checks @@ -39,101 +39,62 @@ python -m reflex compile --- -## Phase 1: Data Infrastructure +## Phase 1: Snowflake Integration -### 1.1 Create SQLite Table for SNOMED Mapping -- [x] Add `REF_DRUG_SNOMED_MAPPING_SCHEMA` to `data_processing/schema.py`: - - Columns: drug_name, indication, ta_id, search_term, snomed_code, snomed_description, cleaned_drug_name, primary_directorate, all_directorates - - Index on: cleaned_drug_name, snomed_code, search_term -- [x] Add `create_drug_snomed_mapping_table()` helper function -- [x] Add to `ALL_TABLES_SCHEMA` and migration -- [x] Verify: `python -m data_processing.migrate` creates table +### 1.1 Create Indication Lookup Query +- [x] Add `get_patient_indication_groups()` function to 
`data_processing/diagnosis_lookup.py`: + - Takes: list of patient pseudonyms (PseudoNHSNoLinked values) + - Uses the cluster query from `snomed_indication_mapping_query.sql` as a CTE + - Joins with `PrimaryCareClinicalCoding` to find patients with matching diagnoses + - Returns: DataFrame with PatientPseudonym, Search_Term, EventDateTime + - Uses most recent match per patient (ORDER BY EventDateTime DESC) +- [x] Handle edge cases: Snowflake unavailable, empty patient list +- [ ] Verify: Function returns expected Search_Terms for test patients -### 1.2 Load Enriched Mapping Data -- [x] Create `data_processing/load_snomed_mapping.py` script: - - Read `data/drug_snomed_mapping_enriched.csv` - - Insert into `ref_drug_snomed_mapping` table - - Log: row count, unique drugs, unique search terms -- [x] Add CLI entry point: `python -m data_processing.load_snomed_mapping` -- [x] Verify: Query confirms 163K+ rows, 187 search terms - -### 1.3 Extend Diagnosis Lookup Module -- [x] Add `get_drug_snomed_codes(drug_name)` to `diagnosis_lookup.py`: - - Query `ref_drug_snomed_mapping` for all SNOMED codes for a drug - - Return list of DrugSnomedMapping(snomed_code, snomed_description, search_term, primary_directorate, indication, ta_id) -- [x] Add `patient_has_indication_direct(patient_pseudonym, snomed_codes, connector)`: - - Query `PrimaryCareClinicalCoding` directly for exact SNOMED code matches - - Return most recent match by EventDateTime - - Return: DirectSnomedMatchResult(matched_code, search_term, primary_directorate, event_date) or unmatched -- [x] Verify: Tested with ADALIMUMAB (1320 mappings, 10 Search_Terms), RANIBIZUMAB (104 mappings), case-insensitivity +### 1.2 Update Data Pipeline to Include Indications +- [ ] Modify `cli/refresh_pathways.py` to call indication lookup during refresh: + - After fetching HCD data, extract unique PseudoNHSNoLinked values + - Call `get_patient_indication_groups()` with patient list + - Create `indication_df` mapping UPID → 
Indication_Group + - For patients with no GP match: Indication_Group = fallback directorate +- [ ] Log coverage: X% diagnosis-matched, Y% fallback +- [ ] Verify: indication_df has correct structure for pathway processing --- -## Phase 2: Pathway Processing Updates +## Phase 2: Schema & Processing Updates -### 2.1 Update Directorate Assignment Logic -- [x] Modify `tools/data.py` `department_identification()` or create wrapper: - - Add `get_directorate_from_diagnosis(upid, drug_name, connector)` function - - Logic: Try diagnosis-based first → fallback to department_identification() - - Return: (directorate, source) where source is "DIAGNOSIS" or "FALLBACK" -- [x] Track assignment source for metrics (how many diagnosis-based vs fallback) -- [x] Verify: Test with sample patient data +### 2.1 Add Chart Type Support to Schema +- [x] Add `chart_type` column to `pathway_nodes` table (ALREADY DONE) +- [x] Update UNIQUE constraint to include chart_type (ALREADY DONE) +- [x] Add indexes for chart_type filtering (ALREADY DONE) +- [ ] Verify: Existing migration works correctly -### 2.2 Add Chart Type Support to Schema -- [x] Add `chart_type` column to `pathway_nodes` table: - - Values: "directory" (existing), "indication" (new) - - Update schema in `data_processing/schema.py` -- [x] Update UNIQUE constraint to include chart_type: `UNIQUE(date_filter_id, chart_type, ids)` -- [x] Add `idx_pathway_nodes_chart_type` index for filtering by chart type -- [x] Add `migrate_pathway_nodes_chart_type()` function for existing databases -- [x] Update `initialize_database()` to run migration automatically -- [x] Verify: Migration adds column, existing data defaults to "directory" +### 2.2 Create Indication Pathway Processing +- [x] Add `generate_icicle_chart_indication()` to `pathway_analyzer.py` (ALREADY DONE) +- [x] Add `process_indication_pathway_for_date_filter()` to `pathway_pipeline.py` (ALREADY DONE) +- [x] Add `extract_indication_fields()` for denormalized columns (ALREADY DONE) +- 
[x] Update `convert_to_records()` with `chart_type` parameter (ALREADY DONE) +- [ ] Verify: Code compiles, imports work correctly -### 2.3 Create Indication Pathway Processing -- [x] Add `process_indication_pathway_for_date_filter()` to `pathway_pipeline.py`: - - Group by: Trust → Search_Term → Drug → Pathway - - For unmatched patients: use directorate name as Search_Term fallback - - Output: Same structure as directory pathways but with indication grouping -- [x] Add `generate_icicle_chart_indication()` to `pathway_analyzer.py`: - - Variant of `generate_icicle_chart()` that uses indication_df instead of directory_df - - Takes `indication_df` parameter mapping UPID → Indication_Group -- [x] Add `extract_indication_fields()` for denormalized columns: - - Extract: trust_name, search_term (or fallback_directorate), drug_sequence -- [x] Update `convert_to_records()` to include `chart_type` parameter -- [x] Add `ChartType` type alias ("directory" | "indication") -- [x] Verify: Code compiles, imports work correctly +### 2.3 Update Refresh Command for Dual Charts +- [x] Add `--chart-type` argument: "all", "directory", "indication" (ALREADY DONE) +- [ ] Update indication processing to use new `get_patient_indication_groups()`: + - Replace `batch_lookup_indication_groups()` with the new Snowflake-direct approach + - Pass indication_df to `process_indication_pathway_for_date_filter()` +- [ ] Process all 6 date filters for both chart types +- [ ] Verify: Both chart types generate pathway data --- -## Phase 3: CLI & Data Refresh Updates +## Phase 3: Test Full Pipeline -### 3.1 Update Refresh Command for Dual Chart Types -- [x] Modify `cli/refresh_pathways.py`: - - Process both "directory" and "indication" chart types - - For each of 6 date filters: generate 2 chart datasets - - Total: 12 pathway datasets (6 dates × 2 chart types) -- [x] Add `--chart-type` argument: "all" (default), "directory", "indication" -- [x] Update progress logging to show both chart types -- [x] Verify: 
Dry run shows both chart types being processed (Task 3.2 complete) - -### 3.2 Integrate Diagnosis-Based Directorate in Pipeline -- [x] Add `batch_lookup_indication_groups()` to `diagnosis_lookup.py`: - - Batch lookup SNOMED matches for all patients (500 patients per batch) - - Returns DataFrame with UPID, Indication_Group, Source columns - - Source is "DIAGNOSIS" (GP match found) or "FALLBACK" (no match) -- [x] Update `cli/refresh_pathways.py` indication processing: - - Call `batch_lookup_indication_groups()` before processing indication charts - - Build `indication_df` for use with `process_indication_pathway_for_date_filter()` - - Process all 6 date filters with indication grouping -- [x] Handle Snowflake connection for GP record queries (batched for performance) -- [x] Log coverage: X% diagnosis-matched, Y% fallback -- [ ] Verify: Test refresh with --dry-run, check coverage stats - -### 3.3 Test Full Refresh Pipeline -- [~] Run `python -m cli.refresh_pathways` with real data -- [ ] Verify pathway_nodes table has both chart_type values -- [ ] Verify indication chart has expected hierarchy (Trust → SearchTerm → Drug) -- [ ] Verify unmatched patients appear with directorate fallback label +### 3.1 Test Refresh with Real Data +- [ ] Run `python -m cli.refresh_pathways --chart-type all` with Snowflake +- [ ] Verify pathway_nodes table has both chart_type values: + - `SELECT chart_type, COUNT(*) FROM pathway_nodes GROUP BY chart_type` +- [ ] Verify indication hierarchy: Trust → Search_Term → Drug → Pathway +- [ ] Verify unmatched patients show with directorate fallback label - [ ] Document: Processing time, record counts, coverage percentages --- @@ -166,20 +127,14 @@ python -m reflex compile ## Phase 5: Validation & Documentation -### 5.1 Measure Coverage Improvement -- [ ] Compare match rates: cluster-only vs cluster+direct SNOMED -- [ ] Generate report: % of patients with diagnosis-based directorate -- [ ] Identify drugs with best/worst coverage improvement -- [ ] 
Document results in progress.txt - -### 5.2 End-to-End Validation +### 5.1 End-to-End Validation - [ ] Run full app with both chart types - [ ] Verify chart toggle works correctly - [ ] Verify filter interactions (drugs, directorates) work for both types - [ ] Verify KPIs update correctly for both chart types - [ ] Test at multiple viewport sizes -### 5.3 Update Documentation +### 5.2 Update Documentation - [ ] Update CLAUDE.md with new architecture - [ ] Document new CLI arguments - [ ] Document chart_type toggle behavior @@ -193,7 +148,7 @@ All tasks marked `[x]` AND: - [ ] App compiles without errors (`reflex compile` succeeds) - [ ] Both chart types generate pathway data (12 total: 6 dates × 2 types) - [ ] Chart type toggle switches between Directory and Indication views -- [ ] Diagnosis-based directorate is primary method with fallback working +- [ ] GP diagnosis matching works via Snowflake cluster query - [ ] Unmatched patients show in indication chart with directorate fallback label - [ ] Coverage metrics logged (% diagnosis-matched vs fallback) - [ ] All filters work correctly for both chart types @@ -203,6 +158,35 @@ All tasks marked `[x]` AND: ## Reference +### SNOMED Cluster Query Structure +```sql +-- From snomed_indication_mapping_query.sql +WITH SearchTermClusters AS ( + SELECT Search_Term, Cluster_ID FROM (VALUES + ('rheumatoid arthritis', 'eFI2_InflammatoryArthritis'), + ('macular degeneration', 'CUST_ICB_VISUAL_IMPAIRMENT'), + -- ... ~148 mappings + ) AS t(Search_Term, Cluster_ID) +), +ClusterCodes AS ( + SELECT stc.Search_Term, c."SNOMEDCode", c."SNOMEDDescription" + FROM SearchTermClusters stc + JOIN DATA_HUB.PHM."ClinicalCodingClusterSnomedCodes" c + ON stc.Cluster_ID = c."Cluster_ID" + WHERE c."SNOMEDCode" IS NOT NULL +), +ExplicitCodes AS ( + -- Manual mappings for conditions not in clusters + SELECT Search_Term, SNOMEDCode, SNOMEDDescription FROM (VALUES + ('ankylosing spondylitis', '162930007', 'Manual mapping'), + -- ... 
+ ) AS t(Search_Term, SNOMEDCode, SNOMEDDescription) +) +SELECT * FROM ClusterCodes +UNION ALL +SELECT * FROM ExplicitCodes +``` + ### Current Pathway Hierarchy (Directory-based) ``` Root (N&W ICS) @@ -226,20 +210,17 @@ Root (N&W ICS) | File | Purpose | |------|---------| -| `data_processing/schema.py` | SQLite schema for ref_drug_snomed_mapping | -| `data_processing/diagnosis_lookup.py` | Direct SNOMED lookup functions | +| `snomed_indication_mapping_query.sql` | Master SNOMED cluster query | +| `data_processing/diagnosis_lookup.py` | GP diagnosis lookup functions | | `data_processing/pathway_pipeline.py` | Indication pathway processing | | `cli/refresh_pathways.py` | CLI for dual chart type refresh | | `pathways_app/pathways_app.py` | Reflex UI with chart type toggle | -| `data/drug_snomed_mapping_enriched.csv` | Source mapping data | ### Expected Data Volumes | Metric | Expected | |--------|----------| -| SNOMED mapping rows | ~163K | -| Unique Search_Terms | 187 | -| Unique drugs | ~364 | +| Search_Term conditions | ~148 (from cluster mapping) | | Pathway nodes (directory, per date filter) | ~300 | | Pathway nodes (indication, per date filter) | ~400-600 (more granular) | | Total pathway nodes (6 dates × 2 types) | ~4,000-5,000 | diff --git a/data_processing/diagnosis_lookup.py b/data_processing/diagnosis_lookup.py index 824f1cd..fd511c8 100644 --- a/data_processing/diagnosis_lookup.py +++ b/data_processing/diagnosis_lookup.py @@ -1087,6 +1087,321 @@ def batch_lookup_indication_groups( return result_df +# === NEW APPROACH: Query Snowflake directly using cluster CTE === + +# The cluster query mapping (embedded from snomed_indication_mapping_query.sql) +# This maps Search_Term -> Cluster_ID for ~148 clinical conditions +CLUSTER_MAPPING_SQL = """ +WITH SearchTermClusters AS ( + SELECT Search_Term, Cluster_ID FROM (VALUES + ('acute lymphoblastic leukaemia', 'HAEMCANMORPH_COD'), + ('acute myeloid leukaemia', 'C19HAEMCAN_COD'), + ('acute promyelocytic leukaemia', 
'HAEMCANMORPH_COD'), + ('allergic asthma', 'AST_COD'), + ('allergic rhinitis', 'MILDINTAST_COD'), + ('alzheimer''s disease', 'DEMALZ_COD'), + ('amyloidosis', 'AMYLOID_COD'), + ('anaemia', 'eFI2_AnaemiaTimeSensitive'), + ('anaplastic large cell lymphoma', 'C19HAEMCAN_COD'), + ('apixaban', 'DOACCON_COD'), + ('aplastic anaemia', 'eFI2_AnaemiaEver'), + ('arthritis', 'eFI2_InflammatoryArthritis'), + ('asthma', 'eFI2_Asthma'), + ('atopic dermatitis', 'ATOPDERM_COD'), + ('atrial fibrillation', 'eFI2_AtrialFibrillation'), + ('attention deficit hyperactivity disorder', 'ADHD_COD'), + ('bipolar disorder', 'MH_COD'), + ('bladder', 'eFI2_UrinaryIncontinence'), + ('breast cancer', 'BRCANSCR_COD'), + ('cardiomyopathy', 'eFI2_HarmfulDrinking'), + ('cardiovascular disease', 'CVDRISKASS_COD'), + ('cervical cancer', 'CSDEC_COD'), + ('cholangiocarcinoma', 'eFI2_Cancer'), + ('chronic kidney disease', 'CKD_COD'), + ('chronic liver disease', 'eFI2_LiverProblems'), + ('chronic lymphocytic leukaemia', 'EPPHAEMCAN_COD'), + ('chronic myeloid leukaemia', 'EPPHAEMCAN_COD'), + ('chronic obstructive pulmonary disease', 'eFI2_COPD'), + ('colon cancer', 'eFI2_Cancer'), + ('colorectal cancer', 'GICANREF_COD'), + ('constipation', 'CHRONCONSTIP_COD'), + ('covid-19', 'POSSPOSTCOVID_COD'), + ('crohn''s disease', 'eFI2_InflammatoryBowelDisease'), + ('cutaneous t-cell lymphoma', 'C19HAEMCAN_COD'), + ('cystic fibrosis', 'CUST_ICB_CYSTIC_FIBROSIS'), + ('deep vein thrombosis', 'VTE_COD'), + ('depression', 'eFI2_Depression'), + ('diabetes', 'eFI2_DiabetesEver'), + ('diabetic retinopathy', 'DRSELIGIBILITY_COD'), + ('diffuse large b-cell lymphoma', 'C19HAEMCAN_COD'), + ('dravet syndrome', 'EPIL_COD'), + ('drug misuse', 'ILLSUBINT_COD'), + ('dyspepsia', 'eFI2_AbdominalPain'), + ('epilepsy', 'eFI2_Seizures'), + ('fallopian tube', 'STERIL_COD'), + ('follicular lymphoma', 'C19HAEMCAN_COD'), + ('gastric cancer', 'eFI2_Cancer'), + ('giant cell arteritis', 'GCA_COD'), + ('glioma', 'NHAEMCANMORPH_COD'), + ('gout', 
'eFI2_InflammatoryArthritis'), + ('graft versus host disease', 'GVHD_COD'), + ('granulomatosis with polyangiitis', 'WEGENERVASC_COD'), + ('growth hormone deficiency', 'HYPOPITUITARY_COD'), + ('hand eczema', 'ECZEMA_COD'), + ('heart failure', 'eFI2_HeartFailure'), + ('hepatitis b', 'HEPBCVAC_COD'), + ('hepatocellular carcinoma', 'eFI2_Cancer'), + ('hiv', 'PREFLANG_COD'), + ('hodgkin lymphoma', 'HAEMCANMORPH_COD'), + ('hormone receptor', 'eFI2_ThyroidProblems'), + ('hypercholesterolaemia', 'CLASSFH_COD'), + ('immune thrombocytopenia', 'ITP_COD'), + ('influenza', 'FLUINVITE_COD'), + ('insomnia', 'eFI2_SleepProblems'), + ('irritable bowel syndrome', 'IBS_COD'), + ('ischaemic stroke', 'OSTR_COD'), + ('juvenile idiopathic arthritis', 'RARTHAD_COD'), + ('kidney transplant', 'RENALTRANSP_COD'), + ('leukaemia', 'eFI2_Cancer'), + ('lung cancer', 'FTCANREF_COD'), + ('lymphoma', 'C19HAEMCAN_COD'), + ('macular degeneration', 'CUST_ICB_VISUAL_IMPAIRMENT'), + ('macular oedema', 'CUST_ICB_VISUAL_IMPAIRMENT'), + ('major depressive episodes', 'eFI2_Depression'), + ('malignant melanoma', 'eFI2_Cancer'), + ('malignant pleural mesothelioma', 'LUNGCAN_COD'), + ('manic episode', 'MH_COD'), + ('mantle cell lymphoma', 'HAEMCANMORPH_COD'), + ('melanoma', 'eFI2_Cancer'), + ('merkel cell carcinoma', 'C19CAN_COD'), + ('migraine', 'eFI2_Headache'), + ('motor neurone disease', 'MND_COD'), + ('multiple myeloma', 'C19HAEMCAN_COD'), + ('multiple sclerosis', 'MS_COD'), + ('myelodysplastic', 'eFI2_AnaemiaEver'), + ('myelofibrosis', 'MDS_COD'), + ('myocardial infarction', 'eFI2_IschaemicHeartDisease'), + ('myotonia', 'CNDATRISK2_COD'), + ('narcolepsy', 'LD_COD'), + ('neuroendocrine tumour', 'LUNGCAN_COD'), + ('non-small cell lung cancer', 'LUNGCAN_COD'), + ('non-small-cell lung cancer', 'FTCANREF_COD'), + ('obesity', 'BMI30_COD'), + ('osteoarthritis', 'CUST_ICB_OSTEOARTHRITIS'), + ('osteoporosis', 'eFI2_Osteoporosis'), + ('osteosarcoma', 'NHAEMCANMORPH_COD'), + ('ovarian cancer', 'C19CAN_COD'), + 
('peripheral arterial disease', 'PADEXC_COD'), + ('plaque psoriasis', 'PSORIASIS_COD'), + ('polycystic kidney disease', 'EPPCONGMALF_COD'), + ('polycythaemia vera', 'C19HAEMCAN_COD'), + ('pregnancy', 'C19PREG_COD'), + ('primary biliary cholangitis', 'eFI2_LiverProblems'), + ('primary hypercholesterolaemia', 'FNFHYP_COD'), + ('prostate cancer', 'EPPSOLIDCAN_COD'), + ('psoriasis', 'PSORIASIS_COD'), + ('psoriatic arthritis', 'RARTHAD_COD'), + ('pulmonary embolism', 'eFI2_RespiratoryDiseaseTimeSensitive'), + ('pulmonary fibrosis', 'ILD_COD'), + ('relapsing multiple sclerosis', 'MS_COD'), + ('renal cell carcinoma', 'C19CAN_COD'), + ('renal transplantation', 'RENALTRANSP_COD'), + ('retinal vein occlusion', 'CUST_ICB_VISUAL_IMPAIRMENT'), + ('rheumatoid arthritis', 'eFI2_InflammatoryArthritis'), + ('rivaroxaban', 'DOACCON_COD'), + ('schizophrenia', 'MH_COD'), + ('seizures', 'LSZFREQ_COD'), + ('sepsis', 'C19ACTIVITY_COD'), + ('severe persistent allergic asthma', 'SEVAST_COD'), + ('sickle cell disease', 'SICKLE_COD'), + ('sleep apnoea', 'CUST_ICB_NON_SEVERE_LDA'), + ('smoking cessation', 'SMOKINGINT_COD'), + ('soft tissue sarcoma', 'NHAEMCANMORPH_COD'), + ('spinal muscular atrophy', 'MND_COD'), + ('squamous cell', 'C19CAN_COD'), + ('squamous cell carcinoma', 'C19CAN_COD'), + ('stem cell transplant', 'ALLOTRANSP_COD'), + ('stroke', 'eFI2_Stroke'), + ('systemic lupus erythematosus', 'SLUPUS_COD'), + ('systemic mastocytosis', 'HAEMCANMORPH_COD'), + ('thrombocytopenic purpura', 'TTP_COD'), + ('thrombotic thrombocytopenic purpura', 'TTP_COD'), + ('thyroid cancer', 'C19CAN_COD'), + ('tophaceous gout', 'CUST_ICB_OSTEOARTHRITIS'), + ('transitional cell carcinoma', 'C19CAN_COD'), + ('type 1 diabetes', 'DMTYPE1_COD'), + ('type 2 diabetes', 'DMTYPE2_COD'), + ('ulcerative colitis', 'eFI2_InflammatoryBowelDisease'), + ('urothelial carcinoma', 'NHAEMCANMORPH_COD'), + ('urticaria', 'XSAL_COD'), + ('uveitis', 'CUST_ICB_VISUAL_IMPAIRMENT'), + ('vascular disease', 'CVDINVITE_COD'), + 
('vasculitis', 'CRYOGLOBVASC_COD') + ) AS t(Search_Term, Cluster_ID) +), + +ClusterCodes AS ( + SELECT + stc.Search_Term, + c."SNOMEDCode", + c."SNOMEDDescription" + FROM SearchTermClusters stc + JOIN DATA_HUB.PHM."ClinicalCodingClusterSnomedCodes" c + ON stc.Cluster_ID = c."Cluster_ID" + WHERE c."SNOMEDCode" IS NOT NULL +), + +ExplicitCodes AS ( + SELECT Search_Term, SNOMEDCode, SNOMEDDescription FROM (VALUES + ('acute coronary syndrome', '837091000000100', 'Manual mapping'), + ('ankylosing spondylitis', '162930007', 'Manual mapping'), + ('ankylosing spondylitis', '239805001', 'Manual mapping'), + ('ankylosing spondylitis', '239810002', 'Manual mapping'), + ('ankylosing spondylitis', '239811003', 'Manual mapping'), + ('ankylosing spondylitis', '394990003', 'Manual mapping'), + ('ankylosing spondylitis', '429712009', 'Manual mapping'), + ('ankylosing spondylitis', '441562009', 'Manual mapping'), + ('ankylosing spondylitis', '441680005', 'Manual mapping'), + ('ankylosing spondylitis', '441930001', 'Manual mapping'), + ('axial spondyloarthritis', '723116002', 'Manual mapping'), + ('choroidal neovascularisation', '380621000000102', 'Manual mapping'), + ('choroidal neovascularisation', '733124000', 'Manual mapping') + ) AS t(Search_Term, SNOMEDCode, SNOMEDDescription) +), + +AllIndicationCodes AS ( + SELECT Search_Term, "SNOMEDCode" AS SNOMEDCode, "SNOMEDDescription" AS SNOMEDDescription + FROM ClusterCodes + UNION ALL + SELECT Search_Term, SNOMEDCode, SNOMEDDescription + FROM ExplicitCodes +) +""" + + +def get_patient_indication_groups( + patient_pseudonyms: list[str], + connector: Optional[SnowflakeConnector] = None, + batch_size: int = 500, +) -> "pd.DataFrame": + """ + Batch lookup GP diagnosis-based indication groups using Snowflake cluster query. + + This function queries Snowflake directly using the embedded cluster CTE + (from snomed_indication_mapping_query.sql) to find patients with matching + GP diagnoses. 
This is the NEW approach replacing the old SQLite-based lookup. + + The query: + 1. Uses the cluster mapping CTE to get all Search_Term -> SNOMED code mappings + 2. Joins with PrimaryCareClinicalCoding to find patients with matching codes + 3. Returns the most recent match per patient (by EventDateTime) + + Args: + patient_pseudonyms: List of PseudoNHSNoLinked values (matches PatientPseudonym in GP records) + connector: Optional SnowflakeConnector (defaults to singleton) + batch_size: Number of patients per Snowflake query batch (default 500) + + Returns: + DataFrame with columns: + - PatientPseudonym: The patient identifier (PseudoNHSNoLinked value) + - Search_Term: The matched indication (e.g., "rheumatoid arthritis") + - EventDateTime: Date of the GP diagnosis record + + Patients not found in results have no matching GP diagnosis. + """ + import pandas as pd + + logger.info(f"Starting Snowflake-direct indication lookup for {len(patient_pseudonyms)} patients...") + + # Handle edge case: empty patient list + if not patient_pseudonyms: + logger.warning("Empty patient list provided") + return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'EventDateTime']) + + # Check Snowflake availability + if not SNOWFLAKE_AVAILABLE: + logger.error("Snowflake connector not available - cannot lookup GP records") + return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'EventDateTime']) + + if not is_snowflake_configured(): + logger.error("Snowflake not configured - cannot lookup GP records") + return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'EventDateTime']) + + if connector is None: + connector = get_connector() + + # Results list to collect all matches + all_results: list[dict] = [] + + # Process patients in batches + total_patients = len(patient_pseudonyms) + for batch_start in range(0, total_patients, batch_size): + batch_end = min(batch_start + batch_size, total_patients) + batch_pseudonyms = patient_pseudonyms[batch_start:batch_end] + 
batch_num = batch_start // batch_size + 1 + total_batches = (total_patients + batch_size - 1) // batch_size + + logger.info(f"Batch {batch_num}/{total_batches}: patients {batch_start + 1} to {batch_end}") + + # Build patient IN clause placeholders + patient_placeholders = ", ".join(["%s"] * len(batch_pseudonyms)) + + # Build the full query with cluster CTE + # This finds the most recent matching diagnosis for each patient + query = f""" +{CLUSTER_MAPPING_SQL} +SELECT + pc."PatientPseudonym", + aic.Search_Term, + pc."EventDateTime" +FROM DATA_HUB.PHM."PrimaryCareClinicalCoding" pc +INNER JOIN AllIndicationCodes aic + ON pc."SNOMEDCode" = aic.SNOMEDCode +WHERE pc."PatientPseudonym" IN ({patient_placeholders}) +QUALIFY ROW_NUMBER() OVER ( + PARTITION BY pc."PatientPseudonym" + ORDER BY pc."EventDateTime" DESC +) = 1 +""" + + try: + results = connector.execute_dict(query, tuple(batch_pseudonyms)) + + for row in results: + all_results.append({ + 'PatientPseudonym': row.get('PatientPseudonym'), + 'Search_Term': row.get('Search_Term'), + 'EventDateTime': row.get('EventDateTime'), + }) + + logger.debug(f"Batch {batch_num}: found {len(results)} matches") + + except Exception as e: + logger.error(f"Error querying GP records for batch {batch_num}: {e}") + # Continue with other batches - partial results are better than none + + # Build result DataFrame (explicit columns keep the schema stable when no batch returns a match) + result_df = pd.DataFrame(all_results, columns=['PatientPseudonym', 'Search_Term', 'EventDateTime']) + + # Log summary statistics + if len(result_df) > 0: + matched_count = len(result_df) + match_rate = 100 * matched_count / total_patients + unique_terms = result_df['Search_Term'].nunique() + logger.info("Indication lookup complete:") + logger.info(f" Total patients queried: {total_patients}") + logger.info(f" Patients with GP match: {matched_count} ({match_rate:.1f}%)") + logger.info(f" Unique Search_Terms found: {unique_terms}") + + # Log top Search_Terms + top_terms = result_df['Search_Term'].value_counts().head(5) + logger.info(f" Top 5 indications: {dict(top_terms)}") + else: + 
logger.info(f"Indication lookup complete: 0 matches from {total_patients} patients") + + return result_df + + # Export public API __all__ = [ # Dataclasses @@ -1112,4 +1427,7 @@ __all__ = [ "get_directorate_from_diagnosis", # Batch lookup for indication groups "batch_lookup_indication_groups", + # Snowflake-direct indication lookup (new approach) + "get_patient_indication_groups", + "CLUSTER_MAPPING_SQL", ] diff --git a/progress.txt b/progress.txt index 276db52..70e34ba 100644 --- a/progress.txt +++ b/progress.txt @@ -1,45 +1,49 @@ -# Progress Log - Direct SNOMED Indication Mapping +# Progress Log - Indication-Based Pathway Charts ## Project Context -This project extends the existing HCD Pathway Analysis application with direct SNOMED code matching from GP records. The previous project (Phases 1-5) established the pre-computed pathway architecture and modern UI. This phase adds: +This project adds indication-based icicle charts alongside the existing directory-based charts. Patient diagnoses are matched from GP records using SNOMED cluster codes queried directly from Snowflake. -1. **Diagnosis-based directorate assignment** - Primary method using GP SNOMED codes -2. **Indication-based icicle chart** - New chart type showing Trust → Search_Term → Drug → Pathway +**Key Change from Previous Approach**: Instead of maintaining a local CSV/SQLite mapping of SNOMED codes, we now query the `ClinicalCodingClusterSnomedCodes` clusters directly in Snowflake during the data refresh. This simplifies the architecture and ensures we always use the latest cluster definitions. 
## Key Files Reference **Existing (reuse these):** -- `data_processing/schema.py` - SQLite schema (add new table) -- `data_processing/diagnosis_lookup.py` - Existing cluster-based lookup (extend with direct SNOMED) -- `data_processing/pathway_pipeline.py` - Pathway processing (add indication type) -- `cli/refresh_pathways.py` - CLI refresh command (add chart type support) +- `data_processing/schema.py` - SQLite schema (chart_type column already added) +- `data_processing/diagnosis_lookup.py` - Extend with new Snowflake query +- `data_processing/pathway_pipeline.py` - Pathway processing (indication functions exist) +- `cli/refresh_pathways.py` - CLI refresh command (chart_type arg exists) - `pathways_app/pathways_app.py` - Reflex app (add chart type toggle) - `tools/data.py` - Data transformations including department_identification() -**New data:** -- `data/drug_snomed_mapping_enriched.csv` - 163K rows, 187 Search_Terms, 364 drugs +**New/Key:** +- `snomed_indication_mapping_query.sql` - Master SNOMED cluster query to embed in Snowflake calls ## Known Patterns -### SNOMED Mapping Structure -The enriched mapping CSV has columns: -- Drug, Indication, TA_ID (from NICE TAs) -- Search_Term (simplified grouping, 187 unique values) -- SNOMEDCode, SNOMEDDescription -- CleanedDrugName, PrimaryDirectorate, AllDirectorates +### SNOMED Cluster Query Approach +The `snomed_indication_mapping_query.sql` contains the Search_Term → Cluster_ID mappings: +- ~148 conditions mapped to clinical coding clusters +- Joins with `DATA_HUB.PHM."ClinicalCodingClusterSnomedCodes"` to get SNOMED codes +- Includes explicit manual mappings for conditions not in clusters +- Returns: Search_Term, SNOMEDCode, SNOMEDDescription -### Direct SNOMED Lookup Logic -For a patient on drug X: -1. Get all SNOMED codes for that drug from ref_drug_snomed_mapping -2. Query PrimaryCareClinicalCoding for those codes (patient's GP record) -3. 
If match found → use Search_Term and PrimaryDirectorate from matched row -4. If no match → fall back to department_identification() -5. Use most recent SNOMED code by EventDateTime if multiple matches +### GP Record Matching +To find a patient's indication: +1. Use the cluster query as a CTE +2. Join with `PrimaryCareClinicalCoding` on SNOMEDCode +3. Filter by PatientPseudonym (use PseudoNHSNoLinked from HCD data) +4. Use most recent match by EventDateTime +5. Return Search_Term for matched patients + +### Patient Identifier Mapping +- HCD data has `PseudoNHSNoLinked` column - this matches `PatientPseudonym` in GP records +- DO NOT use `PersonKey` (LocalPatientID) - this is provider-specific and won't match GP records +- UPID = Provider Code (3 chars) + PersonKey ### Chart Type Architecture - `chart_type` column in pathway_nodes: "directory" or "indication" -- 12 total pathway datasets: 6 date filters × 2 chart types +- 12 total pathway datasets: 6 date filters x 2 chart types - Indication chart: mixed labels (Search_Term for matched, Directorate for unmatched) ### Date Filter Combinations @@ -52,493 +56,54 @@ For a patient on drug X: | `2yr_6mo` | Last 2 years | Last 6 months | No | | `2yr_12mo` | Last 2 years | Last 12 months | No | -### Expected Volumes -- SNOMED mapping: 163K rows -- Search_Terms: 187 unique -- Pathway nodes per date filter: ~300 (directory), ~400-600 (indication) +### Previous Work (Reusable) +These components from the previous approach are still valid: +- `chart_type` column and schema migration (Task 2.1 - complete) +- `generate_icicle_chart_indication()` function (Task 2.2 - complete) +- `process_indication_pathway_for_date_filter()` function (Task 2.2 - complete) +- `extract_indication_fields()` function (Task 2.2 - complete) +- `--chart-type` CLI argument (Task 2.3 - complete) + +### What Needs Replacement +The previous `batch_lookup_indication_groups()` function in `diagnosis_lookup.py` used a local SQLite table. 
This needs to be replaced with a new function that queries Snowflake directly using the cluster query. --- ## Iteration Log + + ## Iteration 1 — 2026-02-05 -### Task: 1.1 Create SQLite Table for SNOMED Mapping +### Task: 1.1 Create Indication Lookup Query ### Why this task: -- First task in Phase 1 (Data Infrastructure) — all other phases depend on having the data layer in place -- No external dependencies — pure schema definition work -- Follows "data infrastructure first" principle +- This is the foundation task — other tasks (1.2 CLI integration, 2.3 refresh command) depend on this function +- The progress.txt explicitly noted the old approach needs replacement +- Logical flow: data query function must exist before pipeline integration ### Status: COMPLETE ### What was done: -- Added `REF_DRUG_SNOMED_MAPPING_SCHEMA` to `data_processing/schema.py` with 11 columns: - - id, drug_name, indication, ta_id, search_term, snomed_code, snomed_description - - cleaned_drug_name, primary_directorate, all_directorates, created_at -- Added 5 custom indexes for lookup performance: - - idx_ref_drug_snomed_mapping_drug (drug_name) - - idx_ref_drug_snomed_mapping_cleaned (cleaned_drug_name) - - idx_ref_drug_snomed_mapping_snomed (snomed_code) - - idx_ref_drug_snomed_mapping_search_term (search_term) - - idx_ref_drug_snomed_mapping_drug_snomed (composite: cleaned_drug_name, snomed_code) -- Added `create_drug_snomed_mapping_table()` helper function -- Added schema to `REFERENCE_TABLES_SCHEMA` (included in `ALL_TABLES_SCHEMA`) -- Updated helper functions to include new table: - - `drop_reference_tables()` — drops new table - - `get_reference_table_counts()` — counts new table (with try/except for safety) - - `verify_reference_tables_exist()` — checks for new table +- Created `get_patient_indication_groups()` function in `data_processing/diagnosis_lookup.py` +- Embedded the full cluster mapping SQL (from snomed_indication_mapping_query.sql) as `CLUSTER_MAPPING_SQL` constant +- 
Function takes list of PseudoNHSNoLinked values and queries Snowflake directly +- Uses QUALIFY ROW_NUMBER() OVER (PARTITION BY PatientPseudonym ORDER BY EventDateTime DESC) = 1 to get most recent match +- Returns DataFrame with PatientPseudonym, Search_Term, EventDateTime columns +- Handles edge cases: empty patient list, Snowflake unavailable/unconfigured +- Added batch processing (default 500 patients per batch) for large datasets +- Added logging for match statistics (match rate, unique Search_Terms, top 5 indications) +- Added both function and CLUSTER_MAPPING_SQL to __all__ exports ### Validation results: -- Tier 1 (Code): `python -m py_compile data_processing/schema.py` — PASSED -- Tier 1 (Code): Import check — PASSED -- Tier 2 (Data): Migration created table with 0 rows — PASSED -- Tier 2 (Data): All 11 columns and 6 indexes present — PASSED +- Tier 1 (Code): ✅ `python -m py_compile` passed, import check passed +- Tier 2 (Data): ✅ Empty list returns correct empty DataFrame with expected columns +- Tier 3 (Functional): N/A (not a UI task) ### Files changed: -- `data_processing/schema.py` — added schema, updated helpers -### Committed: cf35937 "feat: add ref_drug_snomed_mapping schema (Task 1.1)" +- `data_processing/diagnosis_lookup.py` — added CLUSTER_MAPPING_SQL constant and get_patient_indication_groups() function +- `IMPLEMENTATION_PLAN.md` — marked Task 1.1 items complete +### Committed: 052256c "feat: add get_patient_indication_groups() for Snowflake-direct GP lookup (Task 1.1)" ### Patterns discovered: -- Existing schema pattern: CREATE TABLE + CREATE INDEX in single SQL string -- Helper functions follow consistent pattern: create, drop, get_counts, verify_exists -- `get_reference_table_counts` needed try/except for tables that might not exist yet +- Snowflake's QUALIFY clause is cleaner than subquery for row_number filtering +- The cluster CTE has 148 Search_Term mappings plus 13 explicit SNOMED codes ### Next iteration should: -- Start Task 1.2: 
Create `data_processing/load_snomed_mapping.py` script to load the CSV -- The CSV has columns: Drug, Indication, TA_ID, Search_Term, SNOMEDCode, SNOMEDDescription, CleanedDrugName, PrimaryDirectorate, AllDirectorates -- Note: SNOMEDCode in CSV has `.0` suffix (e.g., "156370009.0") — may need cleanup +- Test the function with real patient data to verify it returns expected Search_Terms (Task 1.1 verification) +- OR proceed to Task 1.2 (integrate with CLI refresh command) if confident in the function +- The key integration point: extract unique PseudoNHSNoLinked values from HCD data, call this function, map results back to UPID for indication_df ### Blocked items: - None - -## Iteration 2 — 2026-02-05 -### Task: 1.2 Load Enriched Mapping Data -### Why this task: -- Task 1.1 (schema creation) is complete — table `ref_drug_snomed_mapping` exists -- Task 1.2 is the next logical step in Phase 1 (Data Infrastructure) -- Task 1.3 (Extend Diagnosis Lookup) depends on having data in the table to query -- Following the "data infrastructure first" principle -### Status: COMPLETE -### What was done: -- Created `data_processing/load_snomed_mapping.py` with: - - `migrate_drug_snomed_mapping()` - reads CSV, cleans SNOMED codes, inserts to SQLite - - `get_drug_snomed_mapping_counts()` - statistics function - - `verify_drug_snomed_mapping_migration()` - validation function - - `clean_snomed_code()` - removes trailing .0 from SNOMED codes (e.g., "156370009.0" → "156370009") - - CLI interface with `--verify-only` and `-v` options -- Added CLI entry point: `python -m data_processing.load_snomed_mapping` -- Fixed Unicode encoding issue on Windows console (emojis → ASCII text) -### Validation results: -- Tier 1 (Code): `python -m py_compile data_processing/load_snomed_mapping.py` — PASSED -- Tier 1 (Code): Import check — PASSED -- Tier 2 (Data): Migration loaded 144,056 rows from 163,937 CSV rows (19,881 duplicates skipped) -- Tier 2 (Data): Verified: 707 unique drugs, 187 unique search 
terms, 21,265 SNOMED codes -- Tier 2 (Data): SNOMED codes cleaned — 0 rows with .0 suffix remaining -### Files changed: -- `data_processing/load_snomed_mapping.py` — new file -### Committed: 6d68b5e "feat: add SNOMED mapping loader script (Task 1.2)" -### Patterns discovered: -- CSV has 163,937 rows but only 144,056 unique (drug_name, indication, snomed_code) combinations -- SNOMED codes in CSV have `.0` suffix from float formatting — need `clean_snomed_code()` function -- Windows console can't print Unicode emojis — use ASCII text like `[OK]` instead of ✅ -### Next iteration should: -- Start Task 1.3: Extend Diagnosis Lookup Module -- Add `get_drug_snomed_codes(drug_name)` to `data_processing/diagnosis_lookup.py` -- Add `patient_has_indication_direct(patient_pseudonym, snomed_codes, connector)` function -- Look at existing `diagnosis_lookup.py` for patterns to follow -### Blocked items: -- None - -## Iteration 3 — 2026-02-05 -### Task: 1.3 Extend Diagnosis Lookup Module -### Why this task: -- Tasks 1.1 and 1.2 are complete — the data infrastructure (schema + loaded data) is in place -- Task 1.3 is the next logical step — functions to query the data before using it in the pipeline -- Phase 2 (Pathway Processing) depends on having these lookup functions -- Following the "data infrastructure first" principle -### Status: COMPLETE -### What was done: -- Added two new dataclasses to `data_processing/diagnosis_lookup.py`: - - `DrugSnomedMapping`: Holds SNOMED code mapping with snomed_code, snomed_description, search_term, primary_directorate, indication, ta_id - - `DirectSnomedMatchResult`: Result of direct SNOMED lookup with matched flag, snomed_code, search_term, primary_directorate, event_date, source -- Added `get_drug_snomed_codes(drug_name)` function: - - Queries `ref_drug_snomed_mapping` table for all SNOMED codes for a drug - - Case-insensitive matching on both `cleaned_drug_name` and `drug_name` columns - - Returns list of DrugSnomedMapping dataclass instances 
-- Added `patient_has_indication_direct(patient_pseudonym, drug_snomed_mappings, connector)` function: - - Queries `PrimaryCareClinicalCoding` directly for exact SNOMED code matches - - Returns most recent match by EventDateTime (ORDER BY DESC LIMIT 1) - - Handles Snowflake unavailability gracefully -- Updated `__all__` exports to include new dataclasses and functions -### Validation results: -- Tier 1 (Code): `python -m py_compile data_processing/diagnosis_lookup.py` — PASSED -- Tier 1 (Code): Import check — PASSED -- Tier 2 (Data): ADALIMUMAB returns 1320 SNOMED mappings across 10 Search_Terms -- Tier 2 (Data): RANIBIZUMAB returns 104 SNOMED mappings -- Tier 2 (Data): Case insensitivity verified (upper/lower/mixed all return same results) -- Tier 2 (Data): Empty mappings returns unmatched result correctly -### Files changed: -- `data_processing/diagnosis_lookup.py` — added 2 dataclasses, 2 functions, updated __all__ -- `IMPLEMENTATION_PLAN.md` — marked Task 1.3 complete -### Committed: b44d22d "feat: add direct SNOMED lookup functions (Task 1.3)" -### Patterns discovered: -- ADALIMUMAB has 10 unique Search_Terms with varying SNOMED code counts: - - rheumatoid arthritis: 867 codes (largest) - - ankylosing spondylitis: 96 codes - - uveitis: 124 codes - - ulcerative colitis: 78 codes - - juvenile idiopathic arthritis: 57 codes - - crohn's disease: 50 codes - - psoriatic arthritis: 17 codes - - plaque psoriasis: 16 codes - - hidradenitis suppurativa: 8 codes - - axial spondyloarthritis: 7 codes (smallest) -- The query uses DISTINCT to avoid duplicate rows -- Function handles edge cases: empty mappings, Snowflake unavailable -### Next iteration should: -- Phase 1 is now complete! 
Start Phase 2: Pathway Processing Updates -- Begin with Task 2.1: Update Directorate Assignment Logic - - Create `get_directorate_from_diagnosis(upid, drug_name, connector)` function - - Logic: Try diagnosis-based first (using get_drug_snomed_codes + patient_has_indication_direct) - - Fallback to department_identification() if no match - - Return: (directorate, source) where source is "DIAGNOSIS" or "FALLBACK" -- Key consideration: This will need to integrate with the existing data flow in `fetch_and_transform_data()` -### Blocked items: -- None - -## Iteration 4 — 2026-02-05 -### Task: 2.1 Update Directorate Assignment Logic -### Why this task: -- Phase 1 (Data Infrastructure) is complete — all 3 tasks marked [x] -- Task 2.1 is the first task in Phase 2 (Pathway Processing Updates) -- Previous iteration explicitly recommended starting with Task 2.1 -- Tasks 2.2 (schema) and 2.3 (indication processing) logically depend on having the directorate assignment logic -- Following "data infrastructure first, then pipeline" principle -### Status: COMPLETE -### What was done: -- Added `DirectorateAssignment` dataclass to `data_processing/diagnosis_lookup.py`: - - Fields: upid, drug_name, directorate, search_term, source, snomed_code, event_date - - source is "DIAGNOSIS" (SNOMED match found) or "FALLBACK" (no match) -- Added `get_directorate_from_diagnosis(upid, drug_name, connector, db_manager, before_date)` function: - - Gets all SNOMED codes for drug from ref_drug_snomed_mapping - - Queries patient's GP records (via patient_has_indication_direct) - - Returns diagnosis-based directorate and search_term if match found - - Returns FALLBACK result if no match (caller handles fallback logic) - - Extracts PatientPseudonym from UPID by removing first 3 characters (ProviderCode) -- Updated `__all__` exports to include new dataclass and function -### Validation results: -- Tier 1 (Code): `python -m py_compile data_processing/diagnosis_lookup.py` — PASSED -- Tier 1 (Code): Import 
check for DirectorateAssignment, get_directorate_from_diagnosis — PASSED -- Tier 2 (Data): Test with ADALIMUMAB returns 1320 SNOMED mappings across 10 search_terms — PASSED -- Tier 2 (Data): Test with unknown drug returns FALLBACK source — PASSED -- Tier 2 (Data): Test without Snowflake returns FALLBACK source correctly — PASSED -### Files changed: -- `data_processing/diagnosis_lookup.py` — added DirectorateAssignment dataclass, get_directorate_from_diagnosis function, updated __all__ -- `IMPLEMENTATION_PLAN.md` — marked Task 2.1 complete -### Committed: 5067694 "feat: add get_directorate_from_diagnosis() function (Task 2.1)" -### Patterns discovered: -- UPID format: Provider Code (3 chars) + PersonKey, where PersonKey = PatientPseudonym -- The function is designed to be called at the DataFrame level during pipeline processing -- For batch processing, the caller will need to iterate over rows and collect DIAGNOSIS vs FALLBACK statistics -- The function handles edge cases: no SNOMED mappings, Snowflake unavailable, no GP record match -### Next iteration should: -- Start Task 2.2: Add Chart Type Support to Schema - - Add `chart_type` column to `pathway_nodes` table (values: "directory", "indication") - - Update schema in `data_processing/schema.py` - - Consider: may need ALTER TABLE migration for existing data - - Alternative: add to pathway_date_filters or create pathway_chart_types reference table -- Key consideration: The indication chart will group by Search_Term (from SNOMED match) or Directorate (fallback) -- The chart_type column allows filtering pathway_nodes by chart type when user toggles in UI -### Blocked items: -- None - -## Iteration 5 — 2026-02-05 -### Task: 2.2 Add Chart Type Support to Schema -### Why this task: -- Task 2.1 (directorate assignment logic) is complete -- Task 2.2 is the next task in Phase 2 — schema changes before pipeline changes -- Task 2.3 (Indication Pathway Processing) requires the chart_type column to store indication pathways -- 
Following "data infrastructure first" principle — schema changes before pipeline logic -### Status: COMPLETE -### What was done: -- Modified `PATHWAY_NODES_SCHEMA` in `data_processing/schema.py`: - - Added `chart_type TEXT NOT NULL DEFAULT 'directory'` column - - Updated UNIQUE constraint from `(date_filter_id, ids)` to `(date_filter_id, chart_type, ids)` - - Added `idx_pathway_nodes_chart_type` index on `(date_filter_id, chart_type)` - - Updated `idx_pathway_nodes_filter_composite` index to include `chart_type` -- Added `migrate_pathway_nodes_chart_type(conn)` function: - - Checks if chart_type column exists (idempotent) - - Adds column with ALTER TABLE if missing - - Creates/updates indexes - - Returns (success: bool, message: str) -- Updated `data_processing/migrate.py`: - - Added import for `migrate_pathway_nodes_chart_type` - - Called migration in `initialize_database()` after creating tables -- Ran migration on existing database: 293 rows updated with chart_type='directory' -### Validation results: -- Tier 1 (Code): `python -m py_compile data_processing/schema.py` — PASSED -- Tier 1 (Code): `python -m py_compile data_processing/migrate.py` — PASSED -- Tier 1 (Code): Import check for migrate_pathway_nodes_chart_type — PASSED -- Tier 2 (Data): Migration ran successfully on existing database — PASSED -- Tier 2 (Data): All 293 existing rows have chart_type='directory' — PASSED -- Tier 2 (Data): Both indexes created (idx_pathway_nodes_chart_type, idx_pathway_nodes_filter_composite) — PASSED -- Tier 2 (Data): Database status check shows all tables intact — PASSED -### Files changed: -- `data_processing/schema.py` — added chart_type column, updated UNIQUE constraint, added indexes, added migration function -- `data_processing/migrate.py` — import and call migration function -- `IMPLEMENTATION_PLAN.md` — marked Task 2.2 complete with full checklist -### Committed: 19607d7 "feat: add chart_type column to pathway_nodes schema (Task 2.2)" -### Patterns discovered: -- 
SQLite ALTER TABLE ADD COLUMN works with DEFAULT values — existing rows get the default -- SQLite doesn't allow modifying UNIQUE constraints after table creation, but since all existing rows have the same chart_type value, the new constraint is satisfied -- Migration function is idempotent — safe to run multiple times -- Composite indexes including chart_type allow efficient filtering by (date_filter_id, chart_type) -### Next iteration should: -- Start Task 2.3: Create Indication Pathway Processing - - Add `process_indication_pathways()` to `pathway_pipeline.py` - - Group by: Trust → Search_Term → Drug → Pathway - - For unmatched patients: use directorate name as Search_Term fallback - - Add `extract_indication_fields()` for denormalized columns -- Key consideration: Need to modify `generate_icicle_chart()` or create parallel function for indication hierarchy -- The indication chart uses Search_Term (from SNOMED match) or Directorate (from fallback) as the level-2 grouping -- Output structure should match directory chart but with different grouping labels -### Blocked items: -- None - -## Iteration 6 — 2026-02-05 -### Task: 2.3 Create Indication Pathway Processing -### Why this task: -- Tasks 2.1 and 2.2 are complete — directorate assignment logic and schema are in place -- Task 2.3 is the next logical step in Phase 2 — processing logic before Phase 3 (CLI updates) -- Previous iteration explicitly recommended starting Task 2.3 -- Phase 3 depends on having the indication pathway processing functions -- Following "pipeline before UI" principle -### Status: COMPLETE -### What was done: -- Added `generate_icicle_chart_indication()` to `analysis/pathway_analyzer.py`: - - Variant of generate_icicle_chart() that uses indication_df instead of directory_df - - Takes `indication_df` parameter mapping UPID → Indication_Group - - The indication_df must have 'Directory' column (renamed from Indication_Group for compatibility) - - Hierarchy: Trust → Indication_Group → Drug → 
Pathway - -- Added `process_indication_pathway_for_date_filter()` to `data_processing/pathway_pipeline.py`: - - Wrapper function that calls generate_icicle_chart_indication() - - Takes indication_df parameter (UPID → Indication_Group mapping) - - Computes date ranges and passes to the chart generator - -- Added `extract_indication_fields()` to `data_processing/pathway_pipeline.py`: - - Similar to extract_denormalized_fields() but for indication charts - - Extracts: trust_name, directory (stores search_term), drug_sequence - - Uses 'directory' column for schema compatibility - -- Updated `convert_to_records()` with `chart_type` parameter: - - Added chart_type to the record dictionary - - Supports "directory" and "indication" values - - Logs chart_type in output message - -- Added `ChartType` type alias: `Literal["directory", "indication"]` - -- Updated `__all__` exports to include new functions and type - -### Validation results: -- Tier 1 (Code): `python -m py_compile data_processing/pathway_pipeline.py` — PASSED -- Tier 1 (Code): `python -m py_compile analysis/pathway_analyzer.py` — PASSED -- Tier 1 (Code): Import check for all new functions — PASSED - - ChartType, process_indication_pathway_for_date_filter, extract_indication_fields all exported - - generate_icicle_chart_indication importable from pathway_analyzer -### Files changed: -- `analysis/pathway_analyzer.py` — added generate_icicle_chart_indication() function -- `data_processing/pathway_pipeline.py` — added ChartType, process_indication_pathway_for_date_filter(), extract_indication_fields(), updated convert_to_records() -- `IMPLEMENTATION_PLAN.md` — marked Task 2.3 complete with full checklist -### Committed: 7cbc648 "feat: add indication pathway processing functions (Task 2.3)" -### Patterns discovered: -- The build_hierarchy() function uses directory_df to map UPID → Directory for level-2 grouping -- For indication charts, we pass indication_df with 'Directory' column (renamed from Indication_Group) to 
build_hierarchy() -- The indication_df must be indexed by UPID (same as directory_df) -- Schema compatibility is maintained by storing search_term in the 'directory' column -- The chart_type column allows filtering pathway_nodes by chart type in Reflex UI -### Next iteration should: -- Phase 2 is now complete! Start Phase 3: CLI & Data Refresh Updates -- Begin with Task 3.1: Update Refresh Command for Dual Chart Types - - Modify `cli/refresh_pathways.py` to process both "directory" and "indication" chart types - - Add `--chart-type` argument: "all" (default), "directory", "indication" - - For each of 6 date filters: generate 2 chart datasets - - Total: 12 pathway datasets (6 dates × 2 chart types) -- Key consideration: Need to create indication_df by looking up GP diagnoses for all patients - - This involves batching Snowflake queries for performance - - Use get_directorate_from_diagnosis() function from diagnosis_lookup.py - - For unmatched patients: indication_group = "Directory (no GP dx)" -### Blocked items: -- None - -## Iteration 7 — 2026-02-05 -### Task: 3.1 Update Refresh Command for Dual Chart Types -### Why this task: -- Phase 2 complete — all data infrastructure and processing functions ready -- Task 3.1 is first in Phase 3 — CLI command is the entry point for the pipeline -- Previous iteration recommended starting Task 3.1 -- Processing functions exist but weren't integrated into refresh workflow yet -### Status: COMPLETE -### What was done: -- Modified `cli/refresh_pathways.py`: - - Added `--chart-type` argument with choices: "directory", "indication", "all" - - Default is "directory" to maintain backward compatibility - - Updated `insert_pathway_records` to include `chart_type` column (required for new schema) - - Added `chart_type` parameter to `refresh_pathways()` function - - Updated Step 2 to loop through chart_types_to_process list - - For "directory": uses existing `process_all_date_filters()` - - For "indication": placeholder with warning 
(requires Task 3.2 for GP diagnosis) - - Updated logging to show per-chart-type counts - - Updated help text with new examples -- Added imports for new pathway_pipeline functions: - - `ChartType`, `DATE_FILTER_CONFIGS`, `process_indication_pathway_for_date_filter` - - `extract_indication_fields`, `convert_to_records` -### Validation results: -- Tier 1 (Code): `python -m py_compile cli/refresh_pathways.py` — PASSED -- Tier 1 (Code): Import check for `refresh_pathways`, `main` — PASSED -- Tier 1 (Code): `python -m cli.refresh_pathways --help` — Shows new argument — PASSED -- Tier 2 (Data): Not fully testable until Task 3.2 (GP diagnosis integration) -### Files changed: -- `cli/refresh_pathways.py` — added chart_type support -- `IMPLEMENTATION_PLAN.md` — marked Task 3.1 items complete -### Committed: 593d14c "feat: add chart_type argument to refresh command (Task 3.1)" -### Patterns discovered: -- The `convert_to_records()` function already has `chart_type` parameter with default "directory" -- Results dictionary now keyed by "date_filter_id:chart_type" (e.g., "all_6mo:directory") -- Stats now include `chart_type_counts` for summary by chart type -### Next iteration should: -- Start Task 3.2: Integrate Diagnosis-Based Directorate in Pipeline - - This is the key task that enables indication chart processing - - Need to add batch GP diagnosis lookup during `fetch_and_transform_data()` - - Create `indication_df` mapping UPID → Indication_Group (Search_Term or fallback) - - Call `process_indication_pathway_for_date_filter()` with the indication_df - - Key consideration: Batch Snowflake queries for performance (don't query per patient) - - Expected: ~35K patients, need to query in batches of ~1000 - - Coverage logging: "X% diagnosis-matched, Y% fallback" -### Blocked items: -- None - -## Iteration 8 — 2026-02-05 -### Task: 3.2 Integrate Diagnosis-Based Directorate in Pipeline -### Why this task: -- Task 3.1 complete — CLI argument added but indication processing was 
placeholder -- Task 3.2 is the key task that enables actual indication chart processing -- Previous iteration explicitly recommended starting Task 3.2 -- Task 3.3 (full pipeline test) and Phase 4 (UI) depend on this being complete -- Following "pipeline before UI" principle -### Status: COMPLETE -### What was done: -- Added `batch_lookup_indication_groups()` to `data_processing/diagnosis_lookup.py`: - - Efficient batch function to look up GP diagnoses for all patients - - Queries Snowflake in batches of 500 patients (configurable batch_size) - - Gets all SNOMED codes for drugs from local SQLite (fast) - - Builds single query per batch checking all patient-SNOMED combinations - - Returns DataFrame with: UPID, Indication_Group, Source - - Indication_Group is Search_Term (if matched) or "Directory (no GP dx)" (if fallback) - - Source is "DIAGNOSIS" or "FALLBACK" - - Logs coverage statistics: X% diagnosis-matched, Y% fallback -- Updated `cli/refresh_pathways.py` indication chart processing: - - Import batch_lookup_indication_groups - - When processing indication chart type: - 1. Call batch_lookup_indication_groups(df) to create indication_df - 2. Log coverage statistics to stats dict - 3. Rename Indication_Group → Directory for compatibility with generate_icicle_chart_indication - 4. Set index to UPID for lookup during chart generation - 5. Process all 6 date filters with process_indication_pathway_for_date_filter() - 6. 
Extract indication fields and convert to records with chart_type="indication" - - Added error handling with fallback to empty results if GP lookup fails -- Added TYPE_CHECKING import for pandas type hints -### Validation results: -- Tier 1 (Code): `python -m py_compile data_processing/diagnosis_lookup.py` — PASSED -- Tier 1 (Code): `python -m py_compile cli/refresh_pathways.py` — PASSED -- Tier 1 (Code): Import check for batch_lookup_indication_groups — PASSED -- Tier 1 (Code): `python -m cli.refresh_pathways --help` — Shows all arguments — PASSED -- Tier 2 (Data): Not fully testable without Snowflake connection (requires --dry-run with SSO) -### Files changed: -- `data_processing/diagnosis_lookup.py` — added batch_lookup_indication_groups(), TYPE_CHECKING import -- `cli/refresh_pathways.py` — integrated batch lookup, added full indication processing flow -- `IMPLEMENTATION_PLAN.md` — marked Task 3.2 items complete -### Committed: 8952156 "feat: integrate batch GP diagnosis lookup for indication charts (Task 3.2)" -### Patterns discovered: -- Batch Snowflake queries: Build one query with IN clauses for both patients AND SNOMED codes -- ORDER BY EventDateTime DESC in query lets us pick first result = most recent in Python -- PersonKey column = PatientPseudonym (used directly for Snowflake lookup) -- indication_df must be indexed by UPID and have 'Directory' column (renamed from Indication_Group) -- Fallback label format: "Directory (no GP dx)" distinguishes matched vs unmatched in chart -### Next iteration should: -- Start Task 3.3: Test Full Refresh Pipeline - - Run `python -m cli.refresh_pathways --chart-type all` with real data (requires Snowflake SSO) - - Verify pathway_nodes table has both chart_type="directory" and chart_type="indication" - - Verify indication chart hierarchy: Trust → Search_Term → Drug → Pathway - - Verify unmatched patients show with "Directory (no GP dx)" labels - - Document: Processing time, record counts, coverage percentages - - If no 
Snowflake access, skip to Phase 4 (UI) and note as blocked -### Blocked items: -- Task 3.3 verification requires Snowflake connection (NHS SSO) - -## Iteration 9 — 2026-02-05 -### Task: 3.3 Test Full Refresh Pipeline -### Why this task: -- Previous iteration completed Task 3.2 but noted verification was pending -- Task 3.3 is the final task in Phase 3 to validate all pipeline work -- Phase 4 (UI) depends on having working indication chart data -- Following "validate before moving forward" principle -### Status: IN PROGRESS -### What was done: -1. **First refresh attempt**: Ran `python -m cli.refresh_pathways --chart-type all --verbose` - - Directory chart: Processed successfully (293 nodes for all_6mo) - - Indication chart: **0% diagnosis matches** - all 37,257 patients fell back to FALLBACK - - This resulted in no indication pathway data (charts empty) - -2. **Diagnosed root cause #1**: SNOMED codes stored in scientific notation - - CSV has codes like "1.0629311000119108e+16" due to pandas/Excel export - - The `clean_snomed_code()` function only handled ".0" suffix removal - - Codes were stored as "1.06e+16" which never match Snowflake data - - **Fix**: Updated `clean_snomed_code()` to convert scientific notation to integers - - Reloaded 144,056 SNOMED mappings with properly formatted codes - -3. **Diagnosed root cause #2**: Wrong patient identifier used for GP lookup - - `batch_lookup_indication_groups()` was using `PersonKey` column - - `PersonKey` = `LocalPatientID` (provider-specific like "J188448") - - GP records use `PatientPseudonym` which matches `PseudoNHSNoLinked` (SHA-256 hash) - - **Fix**: Changed to use `PseudoNHSNoLinked` column for GP record matching - - Test showed ~20% match rate for ADALIMUMAB patients with correct identifier - -4. **Committed fixes**: `5b1569e` "fix: correct patient identifier for GP diagnosis lookup (Task 3.3)" - -5. 
**Started second refresh**: Running in background (task ID: be9b9e7) - - Processing time expected: ~15-20 minutes total - - Should now show non-zero GP matches - -### Validation results: -- Tier 1 (Code): Syntax check passed for both modified files -- Tier 1 (Code): Import check passed -- Tier 2 (Data): SNOMED codes now properly formatted (0 scientific notation entries) -- Tier 2 (Data): GP record matching test: 20 matches found in 100 ADALIMUMAB patients -- Tier 2 (Data): Full refresh still running (started 15:XX) - pending final verification -### Files changed: -- `data_processing/load_snomed_mapping.py` — fixed clean_snomed_code() for scientific notation -- `data_processing/diagnosis_lookup.py` — changed to use PseudoNHSNoLinked for GP lookup -- `IMPLEMENTATION_PLAN.md` — marked Task 3.3 as in progress -### Committed: 5b1569e "fix: correct patient identifier for GP diagnosis lookup (Task 3.3)" -### Patterns discovered: -- **Critical**: PersonKey ≠ PatientPseudonym. HCD data has two patient identifiers: - - `LocalPatientID` (aliased as PersonKey) — provider-specific, NOT in GP records - - `PseudoNHSNoLinked` — pseudonymised NHS number, matches `PatientPseudonym` in GP records -- SNOMED codes can have 15-16 digits, causing float precision issues in pandas/Excel exports -- Scientific notation must be converted back to integers for string matching -### Next iteration should: -1. **Check refresh completion**: Read output from task be9b9e7 - - Look for "DIAGNOSIS matches: X%" line in batch lookup output - - Should now show non-zero percentage (expected 10-30% based on ADALIMUMAB test) - - Look for "indication: X nodes total" confirming indication charts generated - -2. 
**If refresh succeeded**: Verify database state - - `SELECT chart_type, COUNT(*) FROM pathway_nodes GROUP BY chart_type` - - Should show both "directory" (293) and "indication" (expected 300-600) rows - - `SELECT DISTINCT directory FROM pathway_nodes WHERE chart_type='indication' LIMIT 20` - - Should show Search_Term values like "rheumatoid arthritis", "macular degeneration" - -3. **Mark Task 3.3 complete** with validation evidence: - - Processing time - - Record counts per chart type - - Coverage percentage (diagnosis vs fallback) - -4. **If refresh still running**: Wait or check `tail -50` of output file - -5. **Start Phase 4**: If 3.3 passes, begin Task 4.1 (Add Chart Type State to Reflex) -### Blocked items: -- None (Snowflake connection established) -
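
The batching and most-recent-match behaviour described for `get_patient_indication_groups()` can be sketched roughly as follows. This is a minimal illustration, not the code in `data_processing/diagnosis_lookup.py`. The helper names (`chunked`, `build_batch_query`, `most_recent_per_patient`), the unqualified `PrimaryCareClinicalCoding` table reference, the column quoting, and the `%s` bind style are all assumptions. Only the default batch size of 500, the use of the cluster query as a CTE, and the `QUALIFY ROW_NUMBER()` clause come from the log.

```python
from datetime import datetime
from typing import Iterable, Iterator, Sequence


def chunked(values: Sequence[str], batch_size: int = 500) -> Iterator[list[str]]:
    """Yield successive batches of at most batch_size values (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(values), batch_size):
        yield list(values[start:start + batch_size])


def build_batch_query(cluster_mapping_sql: str, batch: Sequence[str]) -> tuple[str, list[str]]:
    """Build (sql, params) for one batch of PseudoNHSNoLinked values.

    Assumes cluster_mapping_sql is a SELECT returning Search_Term and SNOMEDCode
    that can be wrapped as a CTE body; table and column names are illustrative.
    """
    placeholders = ", ".join(["%s"] * len(batch))
    sql = (
        "WITH cluster_codes AS (\n"
        f"{cluster_mapping_sql}\n"
        ")\n"
        'SELECT pc."PatientPseudonym", cc.Search_Term, pc."EventDateTime"\n'
        "FROM PrimaryCareClinicalCoding pc\n"
        'JOIN cluster_codes cc ON pc."SNOMEDCode" = cc.SNOMEDCode\n'
        f'WHERE pc."PatientPseudonym" IN ({placeholders})\n'
        # Keep only the most recent matching GP record per patient.
        "QUALIFY ROW_NUMBER() OVER (\n"
        '    PARTITION BY pc."PatientPseudonym" ORDER BY pc."EventDateTime" DESC\n'
        ") = 1"
    )
    return sql, list(batch)


def most_recent_per_patient(
    rows: Iterable[tuple[str, str, datetime]],
) -> dict[str, tuple[str, datetime]]:
    """Pure-Python equivalent of the QUALIFY filter, useful for offline tests."""
    latest: dict[str, tuple[str, datetime]] = {}
    for pseudonym, search_term, event_dt in rows:
        current = latest.get(pseudonym)
        if current is None or event_dt > current[1]:
            latest[pseudonym] = (search_term, event_dt)
    return latest
```

If the patient list is de-duplicated before chunking, each pseudonym lands in exactly one batch, so the per-batch `QUALIFY` filter is enough and the batch results can simply be concatenated.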