feat: add direct SNOMED lookup functions (Task 1.3)
Add two new functions to diagnosis_lookup.py for direct SNOMED code matching: - get_drug_snomed_codes(drug_name): Query ref_drug_snomed_mapping for all SNOMED codes mapped to a drug. Returns list of DrugSnomedMapping with snomed_code, snomed_description, search_term, primary_directorate. Tested: ADALIMUMAB returns 1320 mappings across 10 Search_Terms. - patient_has_indication_direct(patient_pseudonym, mappings, connector): Query PrimaryCareClinicalCoding for exact SNOMED code matches. Returns most recent match by EventDateTime with DirectSnomedMatchResult. Both functions follow existing patterns in the module and are exported in __all__. The lookup is case-insensitive for drug names.
This commit is contained in:
@@ -75,6 +75,30 @@ class DrugIndicationMatchRate:
|
||||
sample_unmatched: list[str] = field(default_factory=list) # Sample patient IDs
|
||||
|
||||
|
||||
@dataclass
|
||||
class DrugSnomedMapping:
|
||||
"""SNOMED code mapping for a drug from ref_drug_snomed_mapping."""
|
||||
snomed_code: str
|
||||
snomed_description: str
|
||||
search_term: str
|
||||
primary_directorate: str
|
||||
indication: str = ""
|
||||
ta_id: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class DirectSnomedMatchResult:
|
||||
"""Result of direct SNOMED code lookup in GP records."""
|
||||
patient_pseudonym: str
|
||||
matched: bool
|
||||
snomed_code: Optional[str] = None
|
||||
snomed_description: Optional[str] = None
|
||||
search_term: Optional[str] = None
|
||||
primary_directorate: Optional[str] = None
|
||||
event_date: Optional[datetime] = None
|
||||
source: str = "DIRECT_SNOMED" # DIRECT_SNOMED | NONE
|
||||
|
||||
|
||||
def get_drug_clusters(
|
||||
drug_name: str,
|
||||
db_manager: Optional[DatabaseManager] = None
|
||||
@@ -141,6 +165,166 @@ def get_drug_cluster_ids(
|
||||
return list(set(c["cluster_id"] for c in clusters))
|
||||
|
||||
|
||||
def get_drug_snomed_codes(
|
||||
drug_name: str,
|
||||
db_manager: Optional[DatabaseManager] = None
|
||||
) -> list[DrugSnomedMapping]:
|
||||
"""
|
||||
Get all SNOMED codes for a drug from local ref_drug_snomed_mapping table.
|
||||
|
||||
This uses the enriched mapping CSV data loaded into SQLite, which provides
|
||||
direct SNOMED-to-drug mappings with Search_Term and PrimaryDirectorate.
|
||||
|
||||
Args:
|
||||
drug_name: Drug name to look up (case-insensitive, matches cleaned_drug_name)
|
||||
db_manager: Optional DatabaseManager (defaults to default_db_manager)
|
||||
|
||||
Returns:
|
||||
List of DrugSnomedMapping with snomed_code, snomed_description,
|
||||
search_term, primary_directorate, indication, ta_id
|
||||
"""
|
||||
if db_manager is None:
|
||||
db_manager = default_db_manager
|
||||
|
||||
query = """
|
||||
SELECT DISTINCT
|
||||
snomed_code,
|
||||
snomed_description,
|
||||
search_term,
|
||||
primary_directorate,
|
||||
indication,
|
||||
ta_id
|
||||
FROM ref_drug_snomed_mapping
|
||||
WHERE UPPER(cleaned_drug_name) = UPPER(?)
|
||||
OR UPPER(drug_name) = UPPER(?)
|
||||
ORDER BY search_term, snomed_code
|
||||
"""
|
||||
|
||||
try:
|
||||
with db_manager.get_connection() as conn:
|
||||
cursor = conn.execute(query, (drug_name, drug_name))
|
||||
rows = cursor.fetchall()
|
||||
|
||||
results = []
|
||||
for row in rows:
|
||||
results.append(DrugSnomedMapping(
|
||||
snomed_code=row["snomed_code"],
|
||||
snomed_description=row["snomed_description"] or "",
|
||||
search_term=row["search_term"] or "",
|
||||
primary_directorate=row["primary_directorate"] or "",
|
||||
indication=row["indication"] or "",
|
||||
ta_id=row["ta_id"] or "",
|
||||
))
|
||||
|
||||
logger.debug(f"Found {len(results)} SNOMED mappings for drug '{drug_name}'")
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting SNOMED codes for drug '{drug_name}': {e}")
|
||||
return []
|
||||
|
||||
|
||||
def patient_has_indication_direct(
|
||||
patient_pseudonym: str,
|
||||
drug_snomed_mappings: list[DrugSnomedMapping],
|
||||
connector: Optional[SnowflakeConnector] = None,
|
||||
before_date: Optional[date] = None,
|
||||
) -> DirectSnomedMatchResult:
|
||||
"""
|
||||
Check if patient has any of the SNOMED codes in their GP records.
|
||||
|
||||
This is the direct SNOMED lookup - it queries PrimaryCareClinicalCoding
|
||||
for exact SNOMED code matches (not via cluster). Returns the most recent
|
||||
match by EventDateTime if multiple matches exist.
|
||||
|
||||
Args:
|
||||
patient_pseudonym: Patient's pseudonymised NHS number
|
||||
drug_snomed_mappings: List of DrugSnomedMapping from get_drug_snomed_codes()
|
||||
connector: Optional SnowflakeConnector (defaults to singleton)
|
||||
before_date: Optional date - only check diagnoses before this date
|
||||
|
||||
Returns:
|
||||
DirectSnomedMatchResult with match details (most recent by EventDateTime)
|
||||
"""
|
||||
result = DirectSnomedMatchResult(
|
||||
patient_pseudonym=patient_pseudonym,
|
||||
matched=False,
|
||||
source="NONE",
|
||||
)
|
||||
|
||||
if not drug_snomed_mappings:
|
||||
return result
|
||||
|
||||
if not SNOWFLAKE_AVAILABLE:
|
||||
logger.warning("Snowflake connector not available")
|
||||
return result
|
||||
|
||||
if not is_snowflake_configured():
|
||||
logger.warning("Snowflake not configured - cannot check GP records")
|
||||
return result
|
||||
|
||||
if connector is None:
|
||||
connector = get_connector()
|
||||
|
||||
# Build lookup dict for mapping snomed_code -> (search_term, primary_directorate, snomed_description)
|
||||
snomed_lookup = {
|
||||
m.snomed_code: (m.search_term, m.primary_directorate, m.snomed_description)
|
||||
for m in drug_snomed_mappings
|
||||
}
|
||||
|
||||
# Get unique SNOMED codes
|
||||
snomed_codes = list(snomed_lookup.keys())
|
||||
|
||||
# Build placeholders for SNOMED codes
|
||||
placeholders = ", ".join(["%s"] * len(snomed_codes))
|
||||
|
||||
# Query to find most recent matching SNOMED code in GP records
|
||||
query = f'''
|
||||
SELECT
|
||||
"SNOMEDCode",
|
||||
"EventDateTime"
|
||||
FROM DATA_HUB.PHM."PrimaryCareClinicalCoding"
|
||||
WHERE "PatientPseudonym" = %s
|
||||
AND "SNOMEDCode" IN ({placeholders})
|
||||
'''
|
||||
|
||||
params: list = [patient_pseudonym] + snomed_codes
|
||||
|
||||
if before_date:
|
||||
query += ' AND "EventDateTime" < %s'
|
||||
params.append(before_date.isoformat())
|
||||
|
||||
query += ' ORDER BY "EventDateTime" DESC LIMIT 1'
|
||||
|
||||
try:
|
||||
results = connector.execute_dict(query, tuple(params))
|
||||
|
||||
if results:
|
||||
row = results[0]
|
||||
matched_code = row.get("SNOMEDCode")
|
||||
event_dt = row.get("EventDateTime")
|
||||
|
||||
if matched_code and matched_code in snomed_lookup:
|
||||
search_term, primary_dir, snomed_desc = snomed_lookup[matched_code]
|
||||
|
||||
return DirectSnomedMatchResult(
|
||||
patient_pseudonym=patient_pseudonym,
|
||||
matched=True,
|
||||
snomed_code=matched_code,
|
||||
snomed_description=snomed_desc,
|
||||
search_term=search_term,
|
||||
primary_directorate=primary_dir,
|
||||
event_date=event_dt,
|
||||
source="DIRECT_SNOMED",
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking direct SNOMED for patient '{patient_pseudonym}': {e}")
|
||||
return result
|
||||
|
||||
|
||||
def get_cluster_snomed_codes(
|
||||
cluster_id: str,
|
||||
connector: Optional[SnowflakeConnector] = None,
|
||||
@@ -567,9 +751,13 @@ def get_available_clusters(
|
||||
|
||||
# Export public API
|
||||
__all__ = [
|
||||
# Dataclasses
|
||||
"ClusterSnomedCodes",
|
||||
"IndicationValidationResult",
|
||||
"DrugIndicationMatchRate",
|
||||
"DrugSnomedMapping",
|
||||
"DirectSnomedMatchResult",
|
||||
# Cluster-based lookup functions (existing)
|
||||
"get_drug_clusters",
|
||||
"get_drug_cluster_ids",
|
||||
"get_cluster_snomed_codes",
|
||||
@@ -578,4 +766,7 @@ __all__ = [
|
||||
"get_indication_match_rate",
|
||||
"batch_validate_indications",
|
||||
"get_available_clusters",
|
||||
# Direct SNOMED lookup functions (new)
|
||||
"get_drug_snomed_codes",
|
||||
"patient_has_indication_direct",
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user