diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md index 7217514..d376bf9 100644 --- a/IMPLEMENTATION_PLAN.md +++ b/IMPLEMENTATION_PLAN.md @@ -72,12 +72,12 @@ python -m reflex compile ## Phase 2: Pathway Processing Updates ### 2.1 Update Directorate Assignment Logic -- [ ] Modify `tools/data.py` `department_identification()` or create wrapper: +- [x] Modify `tools/data.py` `department_identification()` or create wrapper: - Add `get_directorate_from_diagnosis(upid, drug_name, connector)` function - Logic: Try diagnosis-based first → fallback to department_identification() - Return: (directorate, source) where source is "DIAGNOSIS" or "FALLBACK" -- [ ] Track assignment source for metrics (how many diagnosis-based vs fallback) -- [ ] Verify: Test with sample patient data +- [x] Track assignment source for metrics (how many diagnosis-based vs fallback) +- [x] Verify: Test with sample patient data ### 2.2 Add Chart Type Support to Schema - [ ] Add `chart_type` column to `pathway_nodes` table: diff --git a/data_processing/diagnosis_lookup.py b/data_processing/diagnosis_lookup.py index 39f4523..3ab7908 100644 --- a/data_processing/diagnosis_lookup.py +++ b/data_processing/diagnosis_lookup.py @@ -99,6 +99,18 @@ class DirectSnomedMatchResult: source: str = "DIRECT_SNOMED" # DIRECT_SNOMED | NONE +@dataclass +class DirectorateAssignment: + """Result of directorate assignment for a patient-drug combination.""" + upid: str + drug_name: str + directorate: Optional[str] + search_term: Optional[str] = None + source: str = "FALLBACK" # DIAGNOSIS | FALLBACK + snomed_code: Optional[str] = None + event_date: Optional[datetime] = None + + def get_drug_clusters( drug_name: str, db_manager: Optional[DatabaseManager] = None @@ -325,6 +337,106 @@ def patient_has_indication_direct( return result +def get_directorate_from_diagnosis( + upid: str, + drug_name: str, + connector: Optional[SnowflakeConnector] = None, + db_manager: Optional[DatabaseManager] = None, + before_date: Optional[date] = None, +) -> DirectorateAssignment: + """ + Get directorate assignment for a patient-drug combination using diagnosis-based lookup. + + This function attempts to assign a directorate based on the patient's GP records + (direct SNOMED code matching). If no match is found, it returns a FALLBACK result + indicating that the caller should use alternative assignment methods (e.g., + department_identification() from tools/data.py). + + Workflow: + 1. Get all SNOMED codes for the drug from ref_drug_snomed_mapping + 2. Query patient's GP records for matching SNOMED codes + 3. If match found → return diagnosis-based directorate and search_term + 4. If no match → return FALLBACK result (caller handles fallback logic) + + Args: + upid: Patient's unique patient ID (Provider Code[:3] + PersonKey) + drug_name: Drug name to look up + connector: Optional SnowflakeConnector (defaults to singleton) + db_manager: Optional DatabaseManager (defaults to default_db_manager) + before_date: Optional date - only check diagnoses before this date + + Returns: + DirectorateAssignment with directorate, search_term, and source + """ + result = DirectorateAssignment( + upid=upid, + drug_name=drug_name, + directorate=None, + source="FALLBACK", + ) + + # Step 1: Get SNOMED codes for the drug + drug_snomed_mappings = get_drug_snomed_codes(drug_name, db_manager) + + if not drug_snomed_mappings: + logger.debug(f"No SNOMED mappings found for drug '{drug_name}' - using fallback") + return result + + # Step 2: Check Snowflake availability + if not SNOWFLAKE_AVAILABLE: + logger.debug("Snowflake not available - using fallback") + return result + + if not is_snowflake_configured(): + logger.debug("Snowflake not configured - using fallback") + return result + + # Step 3: Get patient pseudonym from UPID + # UPID format is Provider Code (3 chars) + PersonKey + # We need to query Snowflake to get the PatientPseudonym for this PersonKey + # However, patient_has_indication_direct expects PatientPseudonym, not UPID + # For now, we'll use UPID as the identifier - the actual integration + # will need to happen at the DataFrame level where we have PersonKey + # + # NOTE: This function will be called from the pipeline where we have + # access to PatientPseudonym. The UPID is passed for logging/tracking. + + # Actually, looking at the pipeline, we need PatientPseudonym, not UPID. + # The caller should pass the PatientPseudonym or we need to look it up. + # For now, let's assume the caller will use this in a batch context + # where they can map UPID -> PatientPseudonym. + + # Let me reconsider: the function signature takes UPID but we need + # PatientPseudonym for Snowflake. In the pipeline context (fetch_and_transform_data), + # we'll have the PersonKey column which IS the PatientPseudonym. + # So UPID = ProviderCode[:3] + PersonKey, and PersonKey = PatientPseudonym. + # + # We can extract PatientPseudonym from UPID by removing the first 3 chars. + patient_pseudonym = upid[3:] if len(upid) > 3 else upid + + # Step 4: Check patient's GP records for matching SNOMED codes + match_result = patient_has_indication_direct( + patient_pseudonym=patient_pseudonym, + drug_snomed_mappings=drug_snomed_mappings, + connector=connector, + before_date=before_date, + ) + + if match_result.matched and match_result.primary_directorate: + return DirectorateAssignment( + upid=upid, + drug_name=drug_name, + directorate=match_result.primary_directorate, + search_term=match_result.search_term, + source="DIAGNOSIS", + snomed_code=match_result.snomed_code, + event_date=match_result.event_date, + ) + + # No match found - return fallback result + return result + + def get_cluster_snomed_codes( cluster_id: str, connector: Optional[SnowflakeConnector] = None, @@ -757,6 +869,7 @@ __all__ = [ "DrugIndicationMatchRate", "DrugSnomedMapping", "DirectSnomedMatchResult", + "DirectorateAssignment", # Cluster-based lookup functions (existing) "get_drug_clusters", "get_drug_cluster_ids", @@ -769,4 +882,6 @@ __all__ = [ # Direct SNOMED lookup functions (new) "get_drug_snomed_codes", "patient_has_indication_direct", + # Diagnosis-based directorate assignment + "get_directorate_from_diagnosis", ]