diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md index 387693b..c93feea 100644 --- a/IMPLEMENTATION_PLAN.md +++ b/IMPLEMENTATION_PLAN.md @@ -57,7 +57,7 @@ Only assign a drug to an indication if BOTH conditions are met. If a patient's d ## Phase 1: Update Snowflake Query & Drug Mapping ### 1.1 Update `get_patient_indication_groups()` to return ALL matches with frequency -- [ ] Modify the Snowflake query in `get_patient_indication_groups()` (diagnosis_lookup.py): +- [x] Modify the Snowflake query in `get_patient_indication_groups()` (diagnosis_lookup.py): - Remove `QUALIFY ROW_NUMBER() OVER (PARTITION BY ... ORDER BY EventDateTime DESC) = 1` - Return ALL matching Search_Terms per patient with code frequency: ```sql @@ -73,10 +73,10 @@ Only assign a drug to an indication if BOTH conditions are met. If a patient's d - `code_frequency` = number of matching SNOMED codes per Search_Term per patient - Higher frequency = more clinical activity = stronger signal for tiebreaker - `earliest_hcd_date` = `MIN(Intervention Date)` from the HCD DataFrame — restricts GP codes to the HCD data window, reducing noise from old/irrelevant diagnoses -- [ ] Accept `earliest_hcd_date` parameter in `get_patient_indication_groups()` and pass to query -- [ ] Keep batch processing (500 patients per query) -- [ ] Update return type: DataFrame now has multiple rows per patient (PatientPseudonym, Search_Term, code_frequency) -- [ ] Verify: Query returns more rows than before (patients with multiple matching diagnoses) +- [x] Accept `earliest_hcd_date` parameter in `get_patient_indication_groups()` and pass to query +- [x] Keep batch processing (500 patients per query) +- [x] Update return type: DataFrame now has multiple rows per patient (PatientPseudonym, Search_Term, code_frequency) +- [ ] Verify: Query returns more rows than before (patients with multiple matching diagnoses) *(requires live Snowflake — will be verified in Phase 3/4)* ### 1.2 Merge related asthma Search_Terms in CLUSTER_MAPPING_SQL - [x] In `CLUSTER_MAPPING_SQL` (diagnosis_lookup.py), merge these 3 Search_Terms into one `"asthma"` entry: diff --git a/data_processing/diagnosis_lookup.py b/data_processing/diagnosis_lookup.py index eb796b7..4b4fed8 100644 --- a/data_processing/diagnosis_lookup.py +++ b/data_processing/diagnosis_lookup.py @@ -1400,49 +1400,57 @@ def get_patient_indication_groups( patient_pseudonyms: list[str], connector: Optional[SnowflakeConnector] = None, batch_size: int = 500, + earliest_hcd_date: Optional[str] = None, ) -> "pd.DataFrame": """ Batch lookup GP diagnosis-based indication groups using Snowflake cluster query. - This function queries Snowflake directly using the embedded cluster CTE - (from snomed_indication_mapping_query.sql) to find patients with matching - GP diagnoses. This is the NEW approach replacing the old SQLite-based lookup. + Returns ALL matching Search_Terms per patient with code_frequency (count of + matching SNOMED codes). This enables drug-aware indication matching where + each drug is cross-referenced against the patient's GP diagnoses. The query: 1. Uses the cluster mapping CTE to get all Search_Term -> SNOMED code mappings 2. Joins with PrimaryCareClinicalCoding to find patients with matching codes - 3. Returns the most recent match per patient (by EventDateTime) + 3. Groups by patient + Search_Term and counts matching codes (code_frequency) + 4. Optionally restricts to GP codes from earliest_hcd_date onwards Args: patient_pseudonyms: List of PseudoNHSNoLinked values (matches PatientPseudonym in GP records) connector: Optional SnowflakeConnector (defaults to singleton) batch_size: Number of patients per Snowflake query batch (default 500) + earliest_hcd_date: Optional ISO date string (YYYY-MM-DD). If provided, only + counts GP codes from this date onwards. Should be MIN(Intervention Date) + from the HCD DataFrame to restrict to the HCD data window. Returns: DataFrame with columns: - PatientPseudonym: The patient identifier (PseudoNHSNoLinked value) - Search_Term: The matched indication (e.g., "rheumatoid arthritis") - - EventDateTime: Date of the GP diagnosis record + - code_frequency: Count of matching SNOMED codes for this Search_Term + Multiple rows per patient (one per matched Search_Term). Patients not found in results have no matching GP diagnosis. """ import pandas as pd logger.info(f"Starting Snowflake-direct indication lookup for {len(patient_pseudonyms)} patients...") + if earliest_hcd_date: + logger.info(f" Restricting GP codes to >= {earliest_hcd_date}") # Handle edge case: empty patient list if not patient_pseudonyms: logger.warning("Empty patient list provided") - return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'EventDateTime']) + return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'code_frequency']) # Check Snowflake availability if not SNOWFLAKE_AVAILABLE: logger.error("Snowflake connector not available - cannot lookup GP records") - return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'EventDateTime']) + return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'code_frequency']) if not is_snowflake_configured(): logger.error("Snowflake not configured - cannot lookup GP records") - return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'EventDateTime']) + return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'code_frequency']) if connector is None: connector = get_connector() @@ -1463,36 +1471,43 @@ def get_patient_indication_groups( # Build patient IN clause placeholders patient_placeholders = ", ".join(["%s"] * len(batch_pseudonyms)) + # Build WHERE clause with optional date filter + date_filter = "" + if earliest_hcd_date: + date_filter = f"\n AND pc.\"EventDateTime\" >= %s" + # Build the full query with cluster CTE - # This finds the most recent matching diagnosis for each patient - # Note: Column names must be aliased to ensure consistent casing in results + # Returns ALL matching Search_Terms per patient with code_frequency + # code_frequency = COUNT of matching SNOMED codes per Search_Term per patient query = f""" {CLUSTER_MAPPING_SQL} SELECT pc."PatientPseudonym" AS "PatientPseudonym", aic.Search_Term AS "Search_Term", - pc."EventDateTime" AS "EventDateTime" + COUNT(*) AS "code_frequency" FROM DATA_HUB.PHM."PrimaryCareClinicalCoding" pc INNER JOIN AllIndicationCodes aic ON pc."SNOMEDCode" = aic.SNOMEDCode -WHERE pc."PatientPseudonym" IN ({patient_placeholders}) -QUALIFY ROW_NUMBER() OVER ( - PARTITION BY pc."PatientPseudonym" - ORDER BY pc."EventDateTime" DESC -) = 1 +WHERE pc."PatientPseudonym" IN ({patient_placeholders}){date_filter} +GROUP BY pc."PatientPseudonym", aic.Search_Term """ + # Build params: patient pseudonyms + optional date + params = list(batch_pseudonyms) + if earliest_hcd_date: + params.append(earliest_hcd_date) + try: - results = connector.execute_dict(query, tuple(batch_pseudonyms)) + results = connector.execute_dict(query, tuple(params)) for row in results: all_results.append({ 'PatientPseudonym': row.get('PatientPseudonym'), 'Search_Term': row.get('Search_Term'), - 'EventDateTime': row.get('EventDateTime'), + 'code_frequency': row.get('code_frequency', 0), }) - logger.debug(f"Batch {batch_num}: found {len(results)} matches") + logger.debug(f"Batch {batch_num}: found {len(results)} patient-indication matches") except Exception as e: logger.error(f"Error querying GP records for batch {batch_num}: {e}") @@ -1503,12 +1518,15 @@ QUALIFY ROW_NUMBER() OVER ( # Log summary statistics if len(result_df) > 0: - matched_count = len(result_df) - match_rate = 100 * matched_count / total_patients + unique_patients = result_df['PatientPseudonym'].nunique() + total_rows = len(result_df) + match_rate = 100 * unique_patients / total_patients unique_terms = result_df['Search_Term'].nunique() + avg_indications = total_rows / unique_patients if unique_patients > 0 else 0 logger.info(f"Indication lookup complete:") logger.info(f" Total patients queried: {total_patients}") - logger.info(f" Patients with GP match: {matched_count} ({match_rate:.1f}%)") + logger.info(f" Patients with GP match: {unique_patients} ({match_rate:.1f}%)") + logger.info(f" Total patient-indication rows: {total_rows} (avg {avg_indications:.1f} per patient)") logger.info(f" Unique Search_Terms found: {unique_terms}") # Log top Search_Terms diff --git a/progress.txt b/progress.txt index 0e7c298..2b9bfe1 100644 --- a/progress.txt +++ b/progress.txt @@ -149,3 +149,47 @@ This project extends the indication-based pathway charts (Phase 1-5 complete) wi - This is independent of Task 1.1 if you mock the gp_matches_df input ### Blocked items: - None + +## Iteration 3 — 2026-02-05 +### Task: 1.1 — Update get_patient_indication_groups() to return ALL matches with code_frequency +### Why this task: +- Previous iteration recommended this as the next task +- This is the last remaining Phase 1 dependency — Tasks 2.1/2.2 need the GP matches data in the right format (multiple rows per patient with code_frequency) +- Task 1.1 changes the query fundamentals; better to do this before building assign_drug_indications() which consumes the output +### Status: COMPLETE +### What was done: +- Modified `get_patient_indication_groups()` in `diagnosis_lookup.py`: + 1. **Removed** `QUALIFY ROW_NUMBER() OVER (PARTITION BY pc."PatientPseudonym" ORDER BY pc."EventDateTime" DESC) = 1` + 2. **Replaced** with `GROUP BY pc."PatientPseudonym", aic.Search_Term` + `COUNT(*) AS "code_frequency"` + 3. **Added** `earliest_hcd_date: Optional[str] = None` parameter + 4. **Added** optional `AND pc."EventDateTime" >= %s` when earliest_hcd_date is provided + 5. **Updated** return columns from `(PatientPseudonym, Search_Term, EventDateTime)` to `(PatientPseudonym, Search_Term, code_frequency)` + 6. **Updated** all empty DataFrame returns to use new column names + 7. **Updated** logging to show multiple-rows-per-patient stats (avg indications per patient) + 8. **Updated** docstring to describe new behavior and parameters +- Backward compatible: `earliest_hcd_date` defaults to `None`, existing callers still work +- Note: caller in `refresh_pathways.py` (line 424-428) does `dict(zip(...))` which will only keep last match per patient with new multi-row format — this will be updated in Task 3.1 +### Validation results: +- Tier 1 (Code): py_compile PASSED, import check PASSED, function signature verified +- Tier 2 (Data): Empty DataFrame returns correct columns ['PatientPseudonym', 'Search_Term', 'code_frequency']; live Snowflake test deferred to Phase 3/4 +- Tier 3 (Functional): N/A (no UI changes) +### Files changed: +- data_processing/diagnosis_lookup.py (modified get_patient_indication_groups function) +- IMPLEMENTATION_PLAN.md (marked 1.1 subtasks [x]) +### Committed: [pending] +### Patterns discovered: +- The `earliest_hcd_date` parameter is passed as a string in ISO format (YYYY-MM-DD) via Snowflake %s placeholder — Snowflake handles string-to-timestamp comparison implicitly +- The GROUP BY approach naturally deduplicates SNOMED codes within the same Search_Term — a patient with the same SNOMED code recorded 5 times gets code_frequency=5 (reflecting clinical activity intensity) +- params list is built dynamically: `batch_pseudonyms + [earliest_hcd_date]` only when date filter is active +### Next iteration should: +- Work on Task 2.1: Create `assign_drug_indications()` function + - This is now unblocked since 1.1 is complete (return format is known) + - Input: HCD df, gp_matches_df (PatientPseudonym, Search_Term, code_frequency), drug_mapping from load_drug_indication_mapping() + - Output: (modified_df with UPID|search_term, indication_df mapping modified_UPID → Search_Term) + - Can be built and tested with mock data (no Snowflake needed) + - Key logic: for each UPID+Drug pair, intersect drug's Search_Terms with patient's GP matches, pick highest code_frequency as tiebreaker + - The function needs PseudoNHSNoLinked to look up GP matches, so the df must have that column + - Task 2.2 (tiebreaker logic) can be done within 2.1 or as a follow-up +- The final Phase 1 subtask (1.1 verify with live Snowflake) will be tested during Phase 3/4 integration +### Blocked items: +- Task 1.1 final subtask "Verify: Query returns more rows" requires live Snowflake — deferred to Phase 3/4