feat: return ALL GP matches with code_frequency in get_patient_indication_groups (Task 1.1)
- Replace QUALIFY ROW_NUMBER()=1 with GROUP BY + COUNT(*) to return all matching Search_Terms per patient instead of just the most recent - Add earliest_hcd_date parameter to restrict GP codes to HCD data window - Return code_frequency column (count of matching SNOMED codes per Search_Term) for use as tiebreaker in drug-aware indication matching - Update empty DataFrame returns to match new column format
This commit is contained in:
@@ -1400,49 +1400,57 @@ def get_patient_indication_groups(
|
||||
patient_pseudonyms: list[str],
|
||||
connector: Optional[SnowflakeConnector] = None,
|
||||
batch_size: int = 500,
|
||||
earliest_hcd_date: Optional[str] = None,
|
||||
) -> "pd.DataFrame":
|
||||
"""
|
||||
Batch lookup GP diagnosis-based indication groups using Snowflake cluster query.
|
||||
|
||||
This function queries Snowflake directly using the embedded cluster CTE
|
||||
(from snomed_indication_mapping_query.sql) to find patients with matching
|
||||
GP diagnoses. This is the NEW approach replacing the old SQLite-based lookup.
|
||||
Returns ALL matching Search_Terms per patient with code_frequency (count of
|
||||
matching SNOMED codes). This enables drug-aware indication matching where
|
||||
each drug is cross-referenced against the patient's GP diagnoses.
|
||||
|
||||
The query:
|
||||
1. Uses the cluster mapping CTE to get all Search_Term -> SNOMED code mappings
|
||||
2. Joins with PrimaryCareClinicalCoding to find patients with matching codes
|
||||
3. Returns the most recent match per patient (by EventDateTime)
|
||||
3. Groups by patient + Search_Term and counts matching codes (code_frequency)
|
||||
4. Optionally restricts to GP codes from earliest_hcd_date onwards
|
||||
|
||||
Args:
|
||||
patient_pseudonyms: List of PseudoNHSNoLinked values (matches PatientPseudonym in GP records)
|
||||
connector: Optional SnowflakeConnector (defaults to singleton)
|
||||
batch_size: Number of patients per Snowflake query batch (default 500)
|
||||
earliest_hcd_date: Optional ISO date string (YYYY-MM-DD). If provided, only
|
||||
counts GP codes from this date onwards. Should be MIN(Intervention Date)
|
||||
from the HCD DataFrame to restrict to the HCD data window.
|
||||
|
||||
Returns:
|
||||
DataFrame with columns:
|
||||
- PatientPseudonym: The patient identifier (PseudoNHSNoLinked value)
|
||||
- Search_Term: The matched indication (e.g., "rheumatoid arthritis")
|
||||
- EventDateTime: Date of the GP diagnosis record
|
||||
- code_frequency: Count of matching SNOMED codes for this Search_Term
|
||||
|
||||
Multiple rows per patient (one per matched Search_Term).
|
||||
Patients not found in results have no matching GP diagnosis.
|
||||
"""
|
||||
import pandas as pd
|
||||
|
||||
logger.info(f"Starting Snowflake-direct indication lookup for {len(patient_pseudonyms)} patients...")
|
||||
if earliest_hcd_date:
|
||||
logger.info(f" Restricting GP codes to >= {earliest_hcd_date}")
|
||||
|
||||
# Handle edge case: empty patient list
|
||||
if not patient_pseudonyms:
|
||||
logger.warning("Empty patient list provided")
|
||||
return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'EventDateTime'])
|
||||
return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'code_frequency'])
|
||||
|
||||
# Check Snowflake availability
|
||||
if not SNOWFLAKE_AVAILABLE:
|
||||
logger.error("Snowflake connector not available - cannot lookup GP records")
|
||||
return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'EventDateTime'])
|
||||
return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'code_frequency'])
|
||||
|
||||
if not is_snowflake_configured():
|
||||
logger.error("Snowflake not configured - cannot lookup GP records")
|
||||
return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'EventDateTime'])
|
||||
return pd.DataFrame(columns=['PatientPseudonym', 'Search_Term', 'code_frequency'])
|
||||
|
||||
if connector is None:
|
||||
connector = get_connector()
|
||||
@@ -1463,36 +1471,43 @@ def get_patient_indication_groups(
|
||||
# Build patient IN clause placeholders
|
||||
patient_placeholders = ", ".join(["%s"] * len(batch_pseudonyms))
|
||||
|
||||
# Build WHERE clause with optional date filter
|
||||
date_filter = ""
|
||||
if earliest_hcd_date:
|
||||
date_filter = f"\n AND pc.\"EventDateTime\" >= %s"
|
||||
|
||||
# Build the full query with cluster CTE
|
||||
# This finds the most recent matching diagnosis for each patient
|
||||
# Note: Column names must be aliased to ensure consistent casing in results
|
||||
# Returns ALL matching Search_Terms per patient with code_frequency
|
||||
# code_frequency = COUNT of matching SNOMED codes per Search_Term per patient
|
||||
query = f"""
|
||||
{CLUSTER_MAPPING_SQL}
|
||||
SELECT
|
||||
pc."PatientPseudonym" AS "PatientPseudonym",
|
||||
aic.Search_Term AS "Search_Term",
|
||||
pc."EventDateTime" AS "EventDateTime"
|
||||
COUNT(*) AS "code_frequency"
|
||||
FROM DATA_HUB.PHM."PrimaryCareClinicalCoding" pc
|
||||
INNER JOIN AllIndicationCodes aic
|
||||
ON pc."SNOMEDCode" = aic.SNOMEDCode
|
||||
WHERE pc."PatientPseudonym" IN ({patient_placeholders})
|
||||
QUALIFY ROW_NUMBER() OVER (
|
||||
PARTITION BY pc."PatientPseudonym"
|
||||
ORDER BY pc."EventDateTime" DESC
|
||||
) = 1
|
||||
WHERE pc."PatientPseudonym" IN ({patient_placeholders}){date_filter}
|
||||
GROUP BY pc."PatientPseudonym", aic.Search_Term
|
||||
"""
|
||||
|
||||
# Build params: patient pseudonyms + optional date
|
||||
params = list(batch_pseudonyms)
|
||||
if earliest_hcd_date:
|
||||
params.append(earliest_hcd_date)
|
||||
|
||||
try:
|
||||
results = connector.execute_dict(query, tuple(batch_pseudonyms))
|
||||
results = connector.execute_dict(query, tuple(params))
|
||||
|
||||
for row in results:
|
||||
all_results.append({
|
||||
'PatientPseudonym': row.get('PatientPseudonym'),
|
||||
'Search_Term': row.get('Search_Term'),
|
||||
'EventDateTime': row.get('EventDateTime'),
|
||||
'code_frequency': row.get('code_frequency', 0),
|
||||
})
|
||||
|
||||
logger.debug(f"Batch {batch_num}: found {len(results)} matches")
|
||||
logger.debug(f"Batch {batch_num}: found {len(results)} patient-indication matches")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error querying GP records for batch {batch_num}: {e}")
|
||||
@@ -1503,12 +1518,15 @@ QUALIFY ROW_NUMBER() OVER (
|
||||
|
||||
# Log summary statistics
|
||||
if len(result_df) > 0:
|
||||
matched_count = len(result_df)
|
||||
match_rate = 100 * matched_count / total_patients
|
||||
unique_patients = result_df['PatientPseudonym'].nunique()
|
||||
total_rows = len(result_df)
|
||||
match_rate = 100 * unique_patients / total_patients
|
||||
unique_terms = result_df['Search_Term'].nunique()
|
||||
avg_indications = total_rows / unique_patients if unique_patients > 0 else 0
|
||||
logger.info(f"Indication lookup complete:")
|
||||
logger.info(f" Total patients queried: {total_patients}")
|
||||
logger.info(f" Patients with GP match: {matched_count} ({match_rate:.1f}%)")
|
||||
logger.info(f" Patients with GP match: {unique_patients} ({match_rate:.1f}%)")
|
||||
logger.info(f" Total patient-indication rows: {total_rows} (avg {avg_indications:.1f} per patient)")
|
||||
logger.info(f" Unique Search_Terms found: {unique_terms}")
|
||||
|
||||
# Log top Search_Terms
|
||||
|
||||
Reference in New Issue
Block a user