feat: add get_patient_indication_groups() for Snowflake-direct GP lookup (Task 1.1)

- Add CLUSTER_MAPPING_SQL constant embedding full snomed_indication_mapping_query.sql
- Add get_patient_indication_groups() function that queries Snowflake directly
- Uses QUALIFY ROW_NUMBER() to get most recent diagnosis per patient
- Returns DataFrame with PatientPseudonym, Search_Term, EventDateTime
- Handles edge cases: empty list, Snowflake unavailable
- Batch processing with configurable batch_size (default 500)
- Comprehensive logging for match statistics
This commit is contained in:
Andrew Charlwood
2026-02-05 17:03:00 +00:00
parent 99bab08402
commit 1a817b8257
3 changed files with 474 additions and 610 deletions
+318
View File
@@ -1087,6 +1087,321 @@ def batch_lookup_indication_groups(
return result_df
# === NEW APPROACH: Query Snowflake directly using cluster CTE ===
# The cluster query mapping (embedded from snomed_indication_mapping_query.sql)
# This maps Search_Term -> Cluster_ID for ~148 clinical conditions
CLUSTER_MAPPING_SQL = """
WITH SearchTermClusters AS (
SELECT Search_Term, Cluster_ID FROM (VALUES
('acute lymphoblastic leukaemia', 'HAEMCANMORPH_COD'),
('acute myeloid leukaemia', 'C19HAEMCAN_COD'),
('acute promyelocytic leukaemia', 'HAEMCANMORPH_COD'),
('allergic asthma', 'AST_COD'),
('allergic rhinitis', 'MILDINTAST_COD'),
('alzheimer''s disease', 'DEMALZ_COD'),
('amyloidosis', 'AMYLOID_COD'),
('anaemia', 'eFI2_AnaemiaTimeSensitive'),
('anaplastic large cell lymphoma', 'C19HAEMCAN_COD'),
('apixaban', 'DOACCON_COD'),
('aplastic anaemia', 'eFI2_AnaemiaEver'),
('arthritis', 'eFI2_InflammatoryArthritis'),
('asthma', 'eFI2_Asthma'),
('atopic dermatitis', 'ATOPDERM_COD'),
('atrial fibrillation', 'eFI2_AtrialFibrillation'),
('attention deficit hyperactivity disorder', 'ADHD_COD'),
('bipolar disorder', 'MH_COD'),
('bladder', 'eFI2_UrinaryIncontinence'),
('breast cancer', 'BRCANSCR_COD'),
('cardiomyopathy', 'eFI2_HarmfulDrinking'),
('cardiovascular disease', 'CVDRISKASS_COD'),
('cervical cancer', 'CSDEC_COD'),
('cholangiocarcinoma', 'eFI2_Cancer'),
('chronic kidney disease', 'CKD_COD'),
('chronic liver disease', 'eFI2_LiverProblems'),
('chronic lymphocytic leukaemia', 'EPPHAEMCAN_COD'),
('chronic myeloid leukaemia', 'EPPHAEMCAN_COD'),
('chronic obstructive pulmonary disease', 'eFI2_COPD'),
('colon cancer', 'eFI2_Cancer'),
('colorectal cancer', 'GICANREF_COD'),
('constipation', 'CHRONCONSTIP_COD'),
('covid-19', 'POSSPOSTCOVID_COD'),
('crohn''s disease', 'eFI2_InflammatoryBowelDisease'),
('cutaneous t-cell lymphoma', 'C19HAEMCAN_COD'),
('cystic fibrosis', 'CUST_ICB_CYSTIC_FIBROSIS'),
('deep vein thrombosis', 'VTE_COD'),
('depression', 'eFI2_Depression'),
('diabetes', 'eFI2_DiabetesEver'),
('diabetic retinopathy', 'DRSELIGIBILITY_COD'),
('diffuse large b-cell lymphoma', 'C19HAEMCAN_COD'),
('dravet syndrome', 'EPIL_COD'),
('drug misuse', 'ILLSUBINT_COD'),
('dyspepsia', 'eFI2_AbdominalPain'),
('epilepsy', 'eFI2_Seizures'),
('fallopian tube', 'STERIL_COD'),
('follicular lymphoma', 'C19HAEMCAN_COD'),
('gastric cancer', 'eFI2_Cancer'),
('giant cell arteritis', 'GCA_COD'),
('glioma', 'NHAEMCANMORPH_COD'),
('gout', 'eFI2_InflammatoryArthritis'),
('graft versus host disease', 'GVHD_COD'),
('granulomatosis with polyangiitis', 'WEGENERVASC_COD'),
('growth hormone deficiency', 'HYPOPITUITARY_COD'),
('hand eczema', 'ECZEMA_COD'),
('heart failure', 'eFI2_HeartFailure'),
('hepatitis b', 'HEPBCVAC_COD'),
('hepatocellular carcinoma', 'eFI2_Cancer'),
('hiv', 'PREFLANG_COD'),
('hodgkin lymphoma', 'HAEMCANMORPH_COD'),
('hormone receptor', 'eFI2_ThyroidProblems'),
('hypercholesterolaemia', 'CLASSFH_COD'),
('immune thrombocytopenia', 'ITP_COD'),
('influenza', 'FLUINVITE_COD'),
('insomnia', 'eFI2_SleepProblems'),
('irritable bowel syndrome', 'IBS_COD'),
('ischaemic stroke', 'OSTR_COD'),
('juvenile idiopathic arthritis', 'RARTHAD_COD'),
('kidney transplant', 'RENALTRANSP_COD'),
('leukaemia', 'eFI2_Cancer'),
('lung cancer', 'FTCANREF_COD'),
('lymphoma', 'C19HAEMCAN_COD'),
('macular degeneration', 'CUST_ICB_VISUAL_IMPAIRMENT'),
('macular oedema', 'CUST_ICB_VISUAL_IMPAIRMENT'),
('major depressive episodes', 'eFI2_Depression'),
('malignant melanoma', 'eFI2_Cancer'),
('malignant pleural mesothelioma', 'LUNGCAN_COD'),
('manic episode', 'MH_COD'),
('mantle cell lymphoma', 'HAEMCANMORPH_COD'),
('melanoma', 'eFI2_Cancer'),
('merkel cell carcinoma', 'C19CAN_COD'),
('migraine', 'eFI2_Headache'),
('motor neurone disease', 'MND_COD'),
('multiple myeloma', 'C19HAEMCAN_COD'),
('multiple sclerosis', 'MS_COD'),
('myelodysplastic', 'eFI2_AnaemiaEver'),
('myelofibrosis', 'MDS_COD'),
('myocardial infarction', 'eFI2_IschaemicHeartDisease'),
('myotonia', 'CNDATRISK2_COD'),
('narcolepsy', 'LD_COD'),
('neuroendocrine tumour', 'LUNGCAN_COD'),
('non-small cell lung cancer', 'LUNGCAN_COD'),
('non-small-cell lung cancer', 'FTCANREF_COD'),
('obesity', 'BMI30_COD'),
('osteoarthritis', 'CUST_ICB_OSTEOARTHRITIS'),
('osteoporosis', 'eFI2_Osteoporosis'),
('osteosarcoma', 'NHAEMCANMORPH_COD'),
('ovarian cancer', 'C19CAN_COD'),
('peripheral arterial disease', 'PADEXC_COD'),
('plaque psoriasis', 'PSORIASIS_COD'),
('polycystic kidney disease', 'EPPCONGMALF_COD'),
('polycythaemia vera', 'C19HAEMCAN_COD'),
('pregnancy', 'C19PREG_COD'),
('primary biliary cholangitis', 'eFI2_LiverProblems'),
('primary hypercholesterolaemia', 'FNFHYP_COD'),
('prostate cancer', 'EPPSOLIDCAN_COD'),
('psoriasis', 'PSORIASIS_COD'),
('psoriatic arthritis', 'RARTHAD_COD'),
('pulmonary embolism', 'eFI2_RespiratoryDiseaseTimeSensitive'),
('pulmonary fibrosis', 'ILD_COD'),
('relapsing multiple sclerosis', 'MS_COD'),
('renal cell carcinoma', 'C19CAN_COD'),
('renal transplantation', 'RENALTRANSP_COD'),
('retinal vein occlusion', 'CUST_ICB_VISUAL_IMPAIRMENT'),
('rheumatoid arthritis', 'eFI2_InflammatoryArthritis'),
('rivaroxaban', 'DOACCON_COD'),
('schizophrenia', 'MH_COD'),
('seizures', 'LSZFREQ_COD'),
('sepsis', 'C19ACTIVITY_COD'),
('severe persistent allergic asthma', 'SEVAST_COD'),
('sickle cell disease', 'SICKLE_COD'),
('sleep apnoea', 'CUST_ICB_NON_SEVERE_LDA'),
('smoking cessation', 'SMOKINGINT_COD'),
('soft tissue sarcoma', 'NHAEMCANMORPH_COD'),
('spinal muscular atrophy', 'MND_COD'),
('squamous cell', 'C19CAN_COD'),
('squamous cell carcinoma', 'C19CAN_COD'),
('stem cell transplant', 'ALLOTRANSP_COD'),
('stroke', 'eFI2_Stroke'),
('systemic lupus erythematosus', 'SLUPUS_COD'),
('systemic mastocytosis', 'HAEMCANMORPH_COD'),
('thrombocytopenic purpura', 'TTP_COD'),
('thrombotic thrombocytopenic purpura', 'TTP_COD'),
('thyroid cancer', 'C19CAN_COD'),
('tophaceous gout', 'CUST_ICB_OSTEOARTHRITIS'),
('transitional cell carcinoma', 'C19CAN_COD'),
('type 1 diabetes', 'DMTYPE1_COD'),
('type 2 diabetes', 'DMTYPE2_COD'),
('ulcerative colitis', 'eFI2_InflammatoryBowelDisease'),
('urothelial carcinoma', 'NHAEMCANMORPH_COD'),
('urticaria', 'XSAL_COD'),
('uveitis', 'CUST_ICB_VISUAL_IMPAIRMENT'),
('vascular disease', 'CVDINVITE_COD'),
('vasculitis', 'CRYOGLOBVASC_COD')
) AS t(Search_Term, Cluster_ID)
),
ClusterCodes AS (
SELECT
stc.Search_Term,
c."SNOMEDCode",
c."SNOMEDDescription"
FROM SearchTermClusters stc
JOIN DATA_HUB.PHM."ClinicalCodingClusterSnomedCodes" c
ON stc.Cluster_ID = c."Cluster_ID"
WHERE c."SNOMEDCode" IS NOT NULL
),
ExplicitCodes AS (
SELECT Search_Term, SNOMEDCode, SNOMEDDescription FROM (VALUES
('acute coronary syndrome', '837091000000100', 'Manual mapping'),
('ankylosing spondylitis', '162930007', 'Manual mapping'),
('ankylosing spondylitis', '239805001', 'Manual mapping'),
('ankylosing spondylitis', '239810002', 'Manual mapping'),
('ankylosing spondylitis', '239811003', 'Manual mapping'),
('ankylosing spondylitis', '394990003', 'Manual mapping'),
('ankylosing spondylitis', '429712009', 'Manual mapping'),
('ankylosing spondylitis', '441562009', 'Manual mapping'),
('ankylosing spondylitis', '441680005', 'Manual mapping'),
('ankylosing spondylitis', '441930001', 'Manual mapping'),
('axial spondyloarthritis', '723116002', 'Manual mapping'),
('choroidal neovascularisation', '380621000000102', 'Manual mapping'),
('choroidal neovascularisation', '733124000', 'Manual mapping')
) AS t(Search_Term, SNOMEDCode, SNOMEDDescription)
),
AllIndicationCodes AS (
SELECT Search_Term, "SNOMEDCode" AS SNOMEDCode, "SNOMEDDescription" AS SNOMEDDescription
FROM ClusterCodes
UNION ALL
SELECT Search_Term, SNOMEDCode, SNOMEDDescription
FROM ExplicitCodes
)
"""
def get_patient_indication_groups(
patient_pseudonyms: list[str],
connector: Optional[SnowflakeConnector] = None,
batch_size: int = 500,
) -> "pd.DataFrame":
"""
Batch lookup GP diagnosis-based indication groups using Snowflake cluster query.
This function queries Snowflake directly using the embedded cluster CTE
(from snomed_indication_mapping_query.sql) to find patients with matching
GP diagnoses. This is the NEW approach replacing the old SQLite-based lookup.
The query:
1. Uses the cluster mapping CTE to get all Search_Term -> SNOMED code mappings
2. Joins with PrimaryCareClinicalCoding to find patients with matching codes
3. Returns the most recent match per patient (by EventDateTime)
Args:
patient_pseudonyms: List of PseudoNHSNoLinked values (matches PatientPseudonym in GP records)
connector: Optional SnowflakeConnector (defaults to singleton)
batch_size: Number of patients per Snowflake query batch (default 500)
Returns:
DataFrame with columns:
- PatientPseudonym: The patient identifier (PseudoNHSNoLinked value)
- Search_Term: The matched indication (e.g., "rheumatoid arthritis")
- EventDateTime: Date of the GP diagnosis record
Patients not found in results have no matching GP diagnosis.
"""
import pandas as pd
logger.info(f"Starting Snowflake-direct indication lookup for {len(patient_pseudonyms)} patients...")
# Handle edge case: empty patient list
if not patient_pseudonyms:
logger.warning("Empty patient list provided")
return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'EventDateTime'])
# Check Snowflake availability
if not SNOWFLAKE_AVAILABLE:
logger.error("Snowflake connector not available - cannot lookup GP records")
return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'EventDateTime'])
if not is_snowflake_configured():
logger.error("Snowflake not configured - cannot lookup GP records")
return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'EventDateTime'])
if connector is None:
connector = get_connector()
# Results list to collect all matches
all_results: list[dict] = []
# Process patients in batches
total_patients = len(patient_pseudonyms)
for batch_start in range(0, total_patients, batch_size):
batch_end = min(batch_start + batch_size, total_patients)
batch_pseudonyms = patient_pseudonyms[batch_start:batch_end]
batch_num = batch_start // batch_size + 1
total_batches = (total_patients + batch_size - 1) // batch_size
logger.info(f"Batch {batch_num}/{total_batches}: patients {batch_start + 1} to {batch_end}")
# Build patient IN clause placeholders
patient_placeholders = ", ".join(["%s"] * len(batch_pseudonyms))
# Build the full query with cluster CTE
# This finds the most recent matching diagnosis for each patient
query = f"""
{CLUSTER_MAPPING_SQL}
SELECT
pc."PatientPseudonym",
aic.Search_Term,
pc."EventDateTime"
FROM DATA_HUB.PHM."PrimaryCareClinicalCoding" pc
INNER JOIN AllIndicationCodes aic
ON pc."SNOMEDCode" = aic.SNOMEDCode
WHERE pc."PatientPseudonym" IN ({patient_placeholders})
QUALIFY ROW_NUMBER() OVER (
PARTITION BY pc."PatientPseudonym"
ORDER BY pc."EventDateTime" DESC
) = 1
"""
try:
results = connector.execute_dict(query, tuple(batch_pseudonyms))
for row in results:
all_results.append({
'PatientPseudonym': row.get('PatientPseudonym'),
'Search_Term': row.get('Search_Term'),
'EventDateTime': row.get('EventDateTime'),
})
logger.debug(f"Batch {batch_num}: found {len(results)} matches")
except Exception as e:
logger.error(f"Error querying GP records for batch {batch_num}: {e}")
# Continue with other batches - partial results are better than none
# Build result DataFrame
result_df = pd.DataFrame(all_results)
# Log summary statistics
if len(result_df) > 0:
matched_count = len(result_df)
match_rate = 100 * matched_count / total_patients
unique_terms = result_df['Search_Term'].nunique()
logger.info(f"Indication lookup complete:")
logger.info(f" Total patients queried: {total_patients}")
logger.info(f" Patients with GP match: {matched_count} ({match_rate:.1f}%)")
logger.info(f" Unique Search_Terms found: {unique_terms}")
# Log top Search_Terms
top_terms = result_df['Search_Term'].value_counts().head(5)
logger.info(f" Top 5 indications: {dict(top_terms)}")
else:
logger.info(f"Indication lookup complete: 0 matches from {total_patients} patients")
return result_df
# Export public API
__all__ = [
# Dataclasses
@@ -1112,4 +1427,7 @@ __all__ = [
"get_directorate_from_diagnosis",
# Batch lookup for indication groups
"batch_lookup_indication_groups",
# Snowflake-direct indication lookup (new approach)
"get_patient_indication_groups",
"CLUSTER_MAPPING_SQL",
]