From b44d22de2c82c8def5c60e824a591a0c37d2b3d0 Mon Sep 17 00:00:00 2001 From: Andrew Charlwood Date: Thu, 5 Feb 2026 14:14:55 +0000 Subject: [PATCH] feat: add direct SNOMED lookup functions (Task 1.3) Add two new functions to diagnosis_lookup.py for direct SNOMED code matching: - get_drug_snomed_codes(drug_name): Query ref_drug_snomed_mapping for all SNOMED codes mapped to a drug. Returns list of DrugSnomedMapping with snomed_code, snomed_description, search_term, primary_directorate. Tested: ADALIMUMAB returns 1320 mappings across 10 Search_Terms. - patient_has_indication_direct(patient_pseudonym, mappings, connector): Query PrimaryCareClinicalCoding for exact SNOMED code matches. Returns most recent match by EventDateTime with DirectSnomedMatchResult. Both functions follow existing patterns in the module and are exported in __all__. The lookup is case-insensitive for drug names. --- IMPLEMENTATION_PLAN.md | 10 +- data_processing/diagnosis_lookup.py | 191 ++++++++++++++++++++++++++++ 2 files changed, 196 insertions(+), 5 deletions(-) diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md index 79eeb22..7217514 100644 --- a/IMPLEMENTATION_PLAN.md +++ b/IMPLEMENTATION_PLAN.md @@ -58,14 +58,14 @@ python -m reflex compile - [x] Verify: Query confirms 163K+ rows, 187 search terms ### 1.3 Extend Diagnosis Lookup Module -- [ ] Add `get_drug_snomed_codes(drug_name)` to `diagnosis_lookup.py`: +- [x] Add `get_drug_snomed_codes(drug_name)` to `diagnosis_lookup.py`: - Query `ref_drug_snomed_mapping` for all SNOMED codes for a drug - - Return list of (snomed_code, snomed_description, search_term, primary_directorate) -- [ ] Add `patient_has_indication_direct(patient_pseudonym, snomed_codes, connector)`: + - Return list of DrugSnomedMapping(snomed_code, snomed_description, search_term, primary_directorate, indication, ta_id) +- [x] Add `patient_has_indication_direct(patient_pseudonym, snomed_codes, connector)`: - Query `PrimaryCareClinicalCoding` directly for exact SNOMED code matches - Return most recent match by EventDateTime - - Return: (matched_code, search_term, primary_directorate, event_date) or None -- [ ] Verify: Unit test with known drug/patient pair + - Return: DirectSnomedMatchResult(matched_code, search_term, primary_directorate, event_date) or unmatched +- [x] Verify: Tested with ADALIMUMAB (1320 mappings, 10 Search_Terms), RANIBIZUMAB (104 mappings), case-insensitivity --- diff --git a/data_processing/diagnosis_lookup.py b/data_processing/diagnosis_lookup.py index 03bdd87..39f4523 100644 --- a/data_processing/diagnosis_lookup.py +++ b/data_processing/diagnosis_lookup.py @@ -75,6 +75,30 @@ class DrugIndicationMatchRate: sample_unmatched: list[str] = field(default_factory=list) # Sample patient IDs +@dataclass +class DrugSnomedMapping: + """SNOMED code mapping for a drug from ref_drug_snomed_mapping.""" + snomed_code: str + snomed_description: str + search_term: str + primary_directorate: str + indication: str = "" + ta_id: str = "" + + +@dataclass +class DirectSnomedMatchResult: + """Result of direct SNOMED code lookup in GP records.""" + patient_pseudonym: str + matched: bool + snomed_code: Optional[str] = None + snomed_description: Optional[str] = None + search_term: Optional[str] = None + primary_directorate: Optional[str] = None + event_date: Optional[datetime] = None + source: str = "DIRECT_SNOMED" # DIRECT_SNOMED | NONE + + def get_drug_clusters( drug_name: str, db_manager: Optional[DatabaseManager] = None @@ -141,6 +165,166 @@ def get_drug_cluster_ids( return list(set(c["cluster_id"] for c in clusters)) +def get_drug_snomed_codes( + drug_name: str, + db_manager: Optional[DatabaseManager] = None +) -> list[DrugSnomedMapping]: + """ + Get all SNOMED codes for a drug from local ref_drug_snomed_mapping table. + + This uses the enriched mapping CSV data loaded into SQLite, which provides + direct SNOMED-to-drug mappings with Search_Term and PrimaryDirectorate. + + Args: + drug_name: Drug name to look up (case-insensitive, matches cleaned_drug_name) + db_manager: Optional DatabaseManager (defaults to default_db_manager) + + Returns: + List of DrugSnomedMapping with snomed_code, snomed_description, + search_term, primary_directorate, indication, ta_id + """ + if db_manager is None: + db_manager = default_db_manager + + query = """ + SELECT DISTINCT + snomed_code, + snomed_description, + search_term, + primary_directorate, + indication, + ta_id + FROM ref_drug_snomed_mapping + WHERE UPPER(cleaned_drug_name) = UPPER(?) + OR UPPER(drug_name) = UPPER(?) + ORDER BY search_term, snomed_code + """ + + try: + with db_manager.get_connection() as conn: + cursor = conn.execute(query, (drug_name, drug_name)) + rows = cursor.fetchall() + + results = [] + for row in rows: + results.append(DrugSnomedMapping( + snomed_code=row["snomed_code"], + snomed_description=row["snomed_description"] or "", + search_term=row["search_term"] or "", + primary_directorate=row["primary_directorate"] or "", + indication=row["indication"] or "", + ta_id=row["ta_id"] or "", + )) + + logger.debug(f"Found {len(results)} SNOMED mappings for drug '{drug_name}'") + return results + + except Exception as e: + logger.error(f"Error getting SNOMED codes for drug '{drug_name}': {e}") + return [] + + +def patient_has_indication_direct( + patient_pseudonym: str, + drug_snomed_mappings: list[DrugSnomedMapping], + connector: Optional[SnowflakeConnector] = None, + before_date: Optional[date] = None, +) -> DirectSnomedMatchResult: + """ + Check if patient has any of the SNOMED codes in their GP records. + + This is the direct SNOMED lookup - it queries PrimaryCareClinicalCoding + for exact SNOMED code matches (not via cluster). Returns the most recent + match by EventDateTime if multiple matches exist. + + Args: + patient_pseudonym: Patient's pseudonymised NHS number + drug_snomed_mappings: List of DrugSnomedMapping from get_drug_snomed_codes() + connector: Optional SnowflakeConnector (defaults to singleton) + before_date: Optional date - only check diagnoses before this date + + Returns: + DirectSnomedMatchResult with match details (most recent by EventDateTime) + """ + result = DirectSnomedMatchResult( + patient_pseudonym=patient_pseudonym, + matched=False, + source="NONE", + ) + + if not drug_snomed_mappings: + return result + + if not SNOWFLAKE_AVAILABLE: + logger.warning("Snowflake connector not available") + return result + + if not is_snowflake_configured(): + logger.warning("Snowflake not configured - cannot check GP records") + return result + + if connector is None: + connector = get_connector() + + # Build lookup dict for mapping snomed_code -> (search_term, primary_directorate, snomed_description) + snomed_lookup = { + m.snomed_code: (m.search_term, m.primary_directorate, m.snomed_description) + for m in drug_snomed_mappings + } + + # Get unique SNOMED codes + snomed_codes = list(snomed_lookup.keys()) + + # Build placeholders for SNOMED codes + placeholders = ", ".join(["%s"] * len(snomed_codes)) + + # Query to find most recent matching SNOMED code in GP records + query = f''' + SELECT + "SNOMEDCode", + "EventDateTime" + FROM DATA_HUB.PHM."PrimaryCareClinicalCoding" + WHERE "PatientPseudonym" = %s + AND "SNOMEDCode" IN ({placeholders}) + ''' + + params: list = [patient_pseudonym] + snomed_codes + + if before_date: + query += ' AND "EventDateTime" < %s' + params.append(before_date.isoformat()) + + query += ' ORDER BY "EventDateTime" DESC LIMIT 1' + + try: + results = connector.execute_dict(query, tuple(params)) + + if results: + row = results[0] + matched_code = row.get("SNOMEDCode") + event_dt = row.get("EventDateTime") + + if matched_code and matched_code in snomed_lookup: + search_term, primary_dir, snomed_desc = snomed_lookup[matched_code] + + return DirectSnomedMatchResult( + patient_pseudonym=patient_pseudonym, + matched=True, + snomed_code=matched_code, + snomed_description=snomed_desc, + search_term=search_term, + primary_directorate=primary_dir, + event_date=event_dt, + source="DIRECT_SNOMED", + ) + + return result + + except Exception as e: + logger.error(f"Error checking direct SNOMED for patient '{patient_pseudonym}': {e}") + return result + + def get_cluster_snomed_codes( cluster_id: str, connector: Optional[SnowflakeConnector] = None, @@ -567,9 +751,13 @@ def get_available_clusters( # Export public API __all__ = [ + # Dataclasses "ClusterSnomedCodes", "IndicationValidationResult", "DrugIndicationMatchRate", + "DrugSnomedMapping", + "DirectSnomedMatchResult", + # Cluster-based lookup functions (existing) "get_drug_clusters", "get_drug_cluster_ids", "get_cluster_snomed_codes", @@ -578,4 +766,7 @@ __all__ = [ "get_indication_match_rate", "batch_validate_indications", "get_available_clusters", + # Direct SNOMED lookup functions (new) + "get_drug_snomed_codes", + "patient_has_indication_direct", ]