feat: add get_directorate_from_diagnosis() function (Task 2.1)

- Added DirectorateAssignment dataclass for return type
- Added get_directorate_from_diagnosis() function to diagnosis_lookup.py
- Logic: Try diagnosis-based lookup first (direct SNOMED match)
- Returns FALLBACK source if no match found, letting caller handle fallback
- Extracts PatientPseudonym from UPID (last part after provider code)
- Updated __all__ exports with new dataclass and function
- Tested: function handles no-match cases correctly
This commit is contained in:
Andrew Charlwood
2026-02-05 14:19:18 +00:00
parent b44d22de2c
commit 506769470d
2 changed files with 118 additions and 3 deletions
+115
View File
@@ -99,6 +99,18 @@ class DirectSnomedMatchResult:
source: str = "DIRECT_SNOMED" # DIRECT_SNOMED | NONE
@dataclass
class DirectorateAssignment:
"""Result of directorate assignment for a patient-drug combination."""
upid: str
drug_name: str
directorate: Optional[str]
search_term: Optional[str] = None
source: str = "FALLBACK" # DIAGNOSIS | FALLBACK
snomed_code: Optional[str] = None
event_date: Optional[datetime] = None
def get_drug_clusters(
drug_name: str,
db_manager: Optional[DatabaseManager] = None
@@ -325,6 +337,106 @@ def patient_has_indication_direct(
return result
def get_directorate_from_diagnosis(
upid: str,
drug_name: str,
connector: Optional[SnowflakeConnector] = None,
db_manager: Optional[DatabaseManager] = None,
before_date: Optional[date] = None,
) -> DirectorateAssignment:
"""
Get directorate assignment for a patient-drug combination using diagnosis-based lookup.
This function attempts to assign a directorate based on the patient's GP records
(direct SNOMED code matching). If no match is found, it returns a FALLBACK result
indicating that the caller should use alternative assignment methods (e.g.,
department_identification() from tools/data.py).
Workflow:
1. Get all SNOMED codes for the drug from ref_drug_snomed_mapping
2. Query patient's GP records for matching SNOMED codes
3. If match found → return diagnosis-based directorate and search_term
4. If no match → return FALLBACK result (caller handles fallback logic)
Args:
upid: Patient's unique patient ID (Provider Code[:3] + PersonKey)
drug_name: Drug name to look up
connector: Optional SnowflakeConnector (defaults to singleton)
db_manager: Optional DatabaseManager (defaults to default_db_manager)
before_date: Optional date - only check diagnoses before this date
Returns:
DirectorateAssignment with directorate, search_term, and source
"""
result = DirectorateAssignment(
upid=upid,
drug_name=drug_name,
directorate=None,
source="FALLBACK",
)
# Step 1: Get SNOMED codes for the drug
drug_snomed_mappings = get_drug_snomed_codes(drug_name, db_manager)
if not drug_snomed_mappings:
logger.debug(f"No SNOMED mappings found for drug '{drug_name}' - using fallback")
return result
# Step 2: Check Snowflake availability
if not SNOWFLAKE_AVAILABLE:
logger.debug("Snowflake not available - using fallback")
return result
if not is_snowflake_configured():
logger.debug("Snowflake not configured - using fallback")
return result
# Step 3: Get patient pseudonym from UPID
# UPID format is Provider Code (3 chars) + PersonKey
# We need to query Snowflake to get the PatientPseudonym for this PersonKey
# However, patient_has_indication_direct expects PatientPseudonym, not UPID
# For now, we'll use UPID as the identifier - the actual integration
# will need to happen at the DataFrame level where we have PersonKey
#
# NOTE: This function will be called from the pipeline where we have
# access to PatientPseudonym. The UPID is passed for logging/tracking.
# Actually, looking at the pipeline, we need PatientPseudonym, not UPID.
# The caller should pass the PatientPseudonym or we need to look it up.
# For now, let's assume the caller will use this in a batch context
# where they can map UPID -> PatientPseudonym.
# Let me reconsider: the function signature takes UPID but we need
# PatientPseudonym for Snowflake. In the pipeline context (fetch_and_transform_data),
# we'll have the PersonKey column which IS the PatientPseudonym.
# So UPID = ProviderCode[:3] + PersonKey, and PersonKey = PatientPseudonym.
#
# We can extract PatientPseudonym from UPID by removing the first 3 chars.
patient_pseudonym = upid[3:] if len(upid) > 3 else upid
# Step 4: Check patient's GP records for matching SNOMED codes
match_result = patient_has_indication_direct(
patient_pseudonym=patient_pseudonym,
drug_snomed_mappings=drug_snomed_mappings,
connector=connector,
before_date=before_date,
)
if match_result.matched and match_result.primary_directorate:
return DirectorateAssignment(
upid=upid,
drug_name=drug_name,
directorate=match_result.primary_directorate,
search_term=match_result.search_term,
source="DIAGNOSIS",
snomed_code=match_result.snomed_code,
event_date=match_result.event_date,
)
# No match found - return fallback result
return result
def get_cluster_snomed_codes(
cluster_id: str,
connector: Optional[SnowflakeConnector] = None,
@@ -757,6 +869,7 @@ __all__ = [
"DrugIndicationMatchRate",
"DrugSnomedMapping",
"DirectSnomedMatchResult",
"DirectorateAssignment",
# Cluster-based lookup functions (existing)
"get_drug_clusters",
"get_drug_cluster_ids",
@@ -769,4 +882,6 @@ __all__ = [
# Direct SNOMED lookup functions (new)
"get_drug_snomed_codes",
"patient_has_indication_direct",
# Diagnosis-based directorate assignment
"get_directorate_from_diagnosis",
]