adc1dbfc58
Tested full refresh pipeline end-to-end with real Snowflake data: - Fixed trust filter to read Name column from defaultTrusts.csv - Fixed Decimal type handling in calculate_cost_per_patient_per_annum - Fixed array handling in convert_to_records for average_administered - Added required reference CSV files to data/ directory - Configured Snowflake connection (account, warehouse, user) Results: - Snowflake fetch: 656,695 records in ~7s - Transformations: 519,848 records after UPID/drug/directory - Pathway nodes: 293 for all_6mo (8 trusts, 14 directories) - Total processing time: ~6.2 minutes
490 lines
17 KiB
Python
490 lines
17 KiB
Python
"""
|
|
Pathway data processing pipeline.
|
|
|
|
This module provides functions to:
|
|
1. Fetch and transform raw intervention data from Snowflake
|
|
2. Process data for each of the 6 date filter combinations
|
|
3. Extract denormalized fields from hierarchical path strings
|
|
4. Convert processed data to records for SQLite storage
|
|
|
|
The pipeline integrates with:
|
|
- analysis/pathway_analyzer.py: generate_icicle_chart() for pathway processing
|
|
- data_processing/snowflake_connector.py: fetch_activity_data() for data retrieval
|
|
- tools/data.py: patient_id(), drug_names(), department_identification()
|
|
"""
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import date, timedelta
|
|
from typing import Optional
|
|
import json
|
|
|
|
import pandas as pd
|
|
|
|
from core import PathConfig, default_paths
|
|
from core.logging_config import get_logger
|
|
from analysis.pathway_analyzer import generate_icicle_chart
|
|
from tools.data import patient_id, drug_names, department_identification
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class DateFilterConfig:
|
|
"""Configuration for a date filter combination."""
|
|
|
|
id: str # e.g., 'all_6mo', '1yr_12mo'
|
|
initiated_years: Optional[int] # None for 'All', 1, or 2
|
|
last_seen_months: int # 6 or 12
|
|
|
|
|
|
# Pre-defined date filter configurations matching pathway_date_filters table
|
|
DATE_FILTER_CONFIGS = [
|
|
DateFilterConfig(id="all_6mo", initiated_years=None, last_seen_months=6),
|
|
DateFilterConfig(id="all_12mo", initiated_years=None, last_seen_months=12),
|
|
DateFilterConfig(id="1yr_6mo", initiated_years=1, last_seen_months=6),
|
|
DateFilterConfig(id="1yr_12mo", initiated_years=1, last_seen_months=12),
|
|
DateFilterConfig(id="2yr_6mo", initiated_years=2, last_seen_months=6),
|
|
DateFilterConfig(id="2yr_12mo", initiated_years=2, last_seen_months=12),
|
|
]
|
|
|
|
|
|
def compute_date_ranges(
|
|
config: DateFilterConfig,
|
|
max_date: Optional[date] = None,
|
|
) -> tuple[str, str, str]:
|
|
"""
|
|
Compute actual date strings from a date filter configuration.
|
|
|
|
Args:
|
|
config: DateFilterConfig with initiated_years and last_seen_months
|
|
max_date: Reference date (defaults to today)
|
|
|
|
Returns:
|
|
Tuple of (start_date, end_date, last_seen_date) as ISO format strings
|
|
- start_date: Start of initiated filter period
|
|
- end_date: End of initiated filter period (usually max_date)
|
|
- last_seen_date: Date threshold for last_seen filter
|
|
"""
|
|
if max_date is None:
|
|
max_date = date.today()
|
|
|
|
# Calculate end_date (always max_date)
|
|
end_date = max_date
|
|
|
|
# Calculate start_date based on initiated_years
|
|
if config.initiated_years is None:
|
|
# "All years" - use a very old date
|
|
start_date = date(2000, 1, 1)
|
|
else:
|
|
# Last N years from max_date
|
|
start_date = max_date.replace(year=max_date.year - config.initiated_years)
|
|
|
|
# Calculate last_seen_date based on last_seen_months
|
|
# Patients must have been seen within the last N months
|
|
last_seen_date = max_date - timedelta(days=config.last_seen_months * 30)
|
|
|
|
return (
|
|
start_date.isoformat(),
|
|
end_date.isoformat(),
|
|
last_seen_date.isoformat(),
|
|
)
|
|
|
|
|
|
def fetch_and_transform_data(
|
|
start_date: Optional[date] = None,
|
|
end_date: Optional[date] = None,
|
|
provider_codes: Optional[list[str]] = None,
|
|
paths: Optional[PathConfig] = None,
|
|
) -> pd.DataFrame:
|
|
"""
|
|
Fetch data from Snowflake and apply standard transformations.
|
|
|
|
This function:
|
|
1. Fetches raw intervention data from Snowflake
|
|
2. Applies UPID generation (Provider Code[:3] + PersonKey)
|
|
3. Standardizes drug names via drugnames.csv mapping
|
|
4. Assigns directories using the 5-level fallback logic
|
|
|
|
Args:
|
|
start_date: Optional start date filter for Snowflake query
|
|
end_date: Optional end date filter for Snowflake query
|
|
provider_codes: Optional list of provider codes to filter
|
|
paths: PathConfig for file paths (uses default if None)
|
|
|
|
Returns:
|
|
DataFrame with columns: UPID, Drug Name, Directory, Intervention Date,
|
|
Price Actual, Provider Code, PersonKey, OrganisationName, etc.
|
|
|
|
Raises:
|
|
ImportError: If snowflake-connector-python is not installed
|
|
SnowflakeConnectionError: If connection fails
|
|
"""
|
|
if paths is None:
|
|
paths = default_paths
|
|
|
|
# Import here to avoid circular imports and handle optional dependency
|
|
from data_processing.snowflake_connector import get_connector, is_snowflake_available
|
|
|
|
if not is_snowflake_available():
|
|
raise ImportError(
|
|
"snowflake-connector-python is not installed. "
|
|
"Install it with: pip install snowflake-connector-python"
|
|
)
|
|
|
|
logger.info("Fetching activity data from Snowflake...")
|
|
|
|
connector = get_connector()
|
|
raw_data = connector.fetch_activity_data(
|
|
start_date=start_date,
|
|
end_date=end_date,
|
|
provider_codes=provider_codes,
|
|
max_rows=0, # No limit
|
|
)
|
|
|
|
if not raw_data:
|
|
logger.warning("No data returned from Snowflake")
|
|
return pd.DataFrame()
|
|
|
|
logger.info(f"Fetched {len(raw_data)} records from Snowflake")
|
|
|
|
# Convert to DataFrame
|
|
df = pd.DataFrame(raw_data)
|
|
|
|
# Apply transformations in the standard order
|
|
logger.info("Applying data transformations...")
|
|
|
|
# 1. Generate UPID
|
|
df = patient_id(df)
|
|
logger.info(f"Generated UPID for {df['UPID'].nunique()} unique patients")
|
|
|
|
# 2. Standardize drug names
|
|
df = drug_names(df, paths)
|
|
# Remove rows where drug name mapping failed (NaN)
|
|
before_count = len(df)
|
|
df = df.dropna(subset=['Drug Name'])
|
|
after_count = len(df)
|
|
if before_count != after_count:
|
|
logger.info(f"Removed {before_count - after_count} rows with unmapped drug names")
|
|
|
|
# 3. Assign directories
|
|
df = department_identification(df, paths)
|
|
logger.info(f"Assigned directories to {len(df)} records")
|
|
|
|
# Ensure Intervention Date is datetime
|
|
df['Intervention Date'] = pd.to_datetime(df['Intervention Date'])
|
|
|
|
logger.info(f"Data transformation complete. Final record count: {len(df)}")
|
|
return df
|
|
|
|
|
|
def process_pathway_for_date_filter(
|
|
df: pd.DataFrame,
|
|
config: DateFilterConfig,
|
|
trust_filter: list[str],
|
|
drug_filter: list[str],
|
|
directory_filter: list[str],
|
|
minimum_patients: int = 5,
|
|
max_date: Optional[date] = None,
|
|
paths: Optional[PathConfig] = None,
|
|
) -> Optional[pd.DataFrame]:
|
|
"""
|
|
Process pathway data for a single date filter configuration.
|
|
|
|
Uses the existing generate_icicle_chart() function from pathway_analyzer.py
|
|
to build the pathway hierarchy with treatment statistics.
|
|
|
|
Args:
|
|
df: Transformed DataFrame from fetch_and_transform_data()
|
|
config: DateFilterConfig specifying the date filter combination
|
|
trust_filter: List of trust names to include
|
|
drug_filter: List of drug names to include
|
|
directory_filter: List of directories to include
|
|
minimum_patients: Minimum patients to include a pathway
|
|
max_date: Reference date for computing date ranges
|
|
paths: PathConfig for file paths
|
|
|
|
Returns:
|
|
DataFrame with pathway hierarchy (ice_df) or None if no data
|
|
"""
|
|
if paths is None:
|
|
paths = default_paths
|
|
|
|
# Compute actual date ranges for this filter config
|
|
start_date, end_date, last_seen_date = compute_date_ranges(config, max_date)
|
|
|
|
logger.info(f"Processing pathway for {config.id}")
|
|
logger.info(f" Date range: {start_date} to {end_date}")
|
|
logger.info(f" Last seen after: {last_seen_date}")
|
|
|
|
# Use the existing pathway analyzer
|
|
ice_df, title = generate_icicle_chart(
|
|
df=df,
|
|
start_date=start_date,
|
|
end_date=end_date,
|
|
last_seen_date=last_seen_date,
|
|
trust_filter=trust_filter,
|
|
drug_filter=drug_filter,
|
|
directory_filter=directory_filter,
|
|
minimum_num_patients=minimum_patients,
|
|
title="",
|
|
paths=paths,
|
|
)
|
|
|
|
if ice_df is None or len(ice_df) == 0:
|
|
logger.warning(f"No pathway data for filter {config.id}")
|
|
return None
|
|
|
|
logger.info(f"Generated {len(ice_df)} pathway nodes for {config.id}")
|
|
return ice_df
|
|
|
|
|
|
def extract_denormalized_fields(ice_df: pd.DataFrame) -> pd.DataFrame:
|
|
"""
|
|
Extract denormalized filter columns from the ids column.
|
|
|
|
The ids column contains hierarchical paths like:
|
|
- "N&WICS" (root)
|
|
- "N&WICS - NNUH" (trust level)
|
|
- "N&WICS - NNUH - OPHTHALMOLOGY" (directory level)
|
|
- "N&WICS - NNUH - OPHTHALMOLOGY - RANIBIZUMAB" (first drug)
|
|
- "N&WICS - NNUH - OPHTHALMOLOGY - RANIBIZUMAB - AFLIBERCEPT" (pathway)
|
|
|
|
This function extracts:
|
|
- trust_name: The trust component (level 1)
|
|
- directory: The directory component (level 2)
|
|
- drug_sequence: Pipe-separated drugs (level 3+)
|
|
|
|
Args:
|
|
ice_df: DataFrame from generate_icicle_chart()
|
|
|
|
Returns:
|
|
DataFrame with added columns: trust_name, directory, drug_sequence
|
|
"""
|
|
df = ice_df.copy()
|
|
|
|
# Split ids by " - " delimiter
|
|
def extract_components(ids_str: str) -> tuple[str, str, str]:
|
|
"""Extract trust, directory, and drug sequence from ids string."""
|
|
if not ids_str or pd.isna(ids_str):
|
|
return ("", "", "")
|
|
|
|
parts = ids_str.split(" - ")
|
|
|
|
# Level 0: Root (e.g., "N&WICS")
|
|
if len(parts) <= 1:
|
|
return ("", "", "")
|
|
|
|
# Level 1+: Trust is always parts[1]
|
|
trust_name = parts[1] if len(parts) > 1 else ""
|
|
|
|
# Level 2+: Directory is parts[2]
|
|
directory = parts[2] if len(parts) > 2 else ""
|
|
|
|
# Level 3+: Drugs are parts[3:]
|
|
drugs = parts[3:] if len(parts) > 3 else []
|
|
drug_sequence = "|".join(drugs) if drugs else ""
|
|
|
|
return (trust_name, directory, drug_sequence)
|
|
|
|
# Apply extraction to all rows
|
|
extracted = df['ids'].apply(extract_components)
|
|
df['trust_name'] = extracted.apply(lambda x: x[0])
|
|
df['directory'] = extracted.apply(lambda x: x[1])
|
|
df['drug_sequence'] = extracted.apply(lambda x: x[2])
|
|
|
|
logger.info(f"Extracted denormalized fields for {len(df)} nodes")
|
|
logger.info(f" Unique trusts: {df['trust_name'].nunique()}")
|
|
logger.info(f" Unique directories: {df['directory'].nunique()}")
|
|
|
|
return df
|
|
|
|
|
|
def convert_to_records(
|
|
ice_df: pd.DataFrame,
|
|
date_filter_id: str,
|
|
refresh_id: Optional[str] = None,
|
|
) -> list[dict]:
|
|
"""
|
|
Convert ice_df to a list of dictionaries for SQLite insertion.
|
|
|
|
Maps ice_df columns to pathway_nodes table schema:
|
|
- parents, ids, labels: Direct mapping
|
|
- level: From ice_df['level']
|
|
- value, cost, costpp, colour: Direct mapping
|
|
- cost_pp_pa: From ice_df['cost_pp_pa']
|
|
- first_seen, last_seen, first_seen_parent, last_seen_parent: Date columns
|
|
- average_spacing: From ice_df['average_spacing']
|
|
- average_administered: JSON serialization of list
|
|
- avg_days: From ice_df['avg_days']
|
|
- trust_name, directory, drug_sequence: Denormalized fields
|
|
- date_filter_id: The filter combination ID
|
|
- data_refresh_id: Optional refresh tracking ID
|
|
|
|
Args:
|
|
ice_df: DataFrame from generate_icicle_chart() with denormalized fields
|
|
date_filter_id: The date filter combination ID (e.g., 'all_6mo')
|
|
refresh_id: Optional refresh tracking ID
|
|
|
|
Returns:
|
|
List of dictionaries ready for SQLite insertion
|
|
"""
|
|
records = []
|
|
|
|
for _, row in ice_df.iterrows():
|
|
# Handle date formatting
|
|
first_seen = None
|
|
last_seen = None
|
|
first_seen_parent = None
|
|
last_seen_parent = None
|
|
|
|
if pd.notna(row.get('First seen')):
|
|
if hasattr(row['First seen'], 'isoformat'):
|
|
first_seen = row['First seen'].isoformat()
|
|
else:
|
|
first_seen = str(row['First seen'])
|
|
|
|
if pd.notna(row.get('Last seen')):
|
|
if hasattr(row['Last seen'], 'isoformat'):
|
|
last_seen = row['Last seen'].isoformat()
|
|
else:
|
|
last_seen = str(row['Last seen'])
|
|
|
|
if pd.notna(row.get('First seen (Parent)')):
|
|
first_seen_parent = str(row['First seen (Parent)'])
|
|
|
|
if pd.notna(row.get('Last seen (Parent)')):
|
|
last_seen_parent = str(row['Last seen (Parent)'])
|
|
|
|
# Handle average_administered (could be list, ndarray, or None)
|
|
average_administered = None
|
|
val = row.get('average_administered')
|
|
if val is not None:
|
|
# Check for scalar None-like values
|
|
try:
|
|
if pd.isna(val):
|
|
average_administered = None
|
|
elif isinstance(val, (list, np.ndarray)):
|
|
average_administered = json.dumps(list(val) if hasattr(val, 'tolist') else val)
|
|
else:
|
|
average_administered = str(val)
|
|
except (ValueError, TypeError):
|
|
# pd.isna raises ValueError for arrays with >1 element
|
|
# In that case, val is an array/list, so convert to JSON
|
|
if hasattr(val, 'tolist'):
|
|
average_administered = json.dumps(val.tolist())
|
|
elif isinstance(val, list):
|
|
average_administered = json.dumps(val)
|
|
else:
|
|
average_administered = str(val)
|
|
|
|
record = {
|
|
'date_filter_id': date_filter_id,
|
|
'parents': str(row.get('parents', '')) if pd.notna(row.get('parents')) else '',
|
|
'ids': str(row.get('ids', '')) if pd.notna(row.get('ids')) else '',
|
|
'labels': str(row.get('labels', '')) if pd.notna(row.get('labels')) else '',
|
|
'level': int(row.get('level', 0)) if pd.notna(row.get('level')) else 0,
|
|
'value': int(row.get('value', 0)) if pd.notna(row.get('value')) else 0,
|
|
'cost': float(row.get('cost', 0)) if pd.notna(row.get('cost')) else 0.0,
|
|
'costpp': float(row.get('costpp')) if pd.notna(row.get('costpp')) else None,
|
|
'cost_pp_pa': str(row.get('cost_pp_pa', '')) if pd.notna(row.get('cost_pp_pa')) else None,
|
|
'colour': float(row.get('colour', 0)) if pd.notna(row.get('colour')) else 0.0,
|
|
'first_seen': first_seen,
|
|
'last_seen': last_seen,
|
|
'first_seen_parent': first_seen_parent,
|
|
'last_seen_parent': last_seen_parent,
|
|
'average_spacing': str(row.get('average_spacing', '')) if pd.notna(row.get('average_spacing')) else None,
|
|
'average_administered': average_administered,
|
|
'avg_days': float(row['avg_days'].total_seconds() / 86400) if pd.notna(row.get('avg_days')) and hasattr(row.get('avg_days'), 'total_seconds') else (float(row.get('avg_days')) if pd.notna(row.get('avg_days')) else None),
|
|
'trust_name': row.get('trust_name', '') if pd.notna(row.get('trust_name')) else None,
|
|
'directory': row.get('directory', '') if pd.notna(row.get('directory')) else None,
|
|
'drug_sequence': row.get('drug_sequence', '') if pd.notna(row.get('drug_sequence')) else None,
|
|
'data_refresh_id': refresh_id,
|
|
}
|
|
records.append(record)
|
|
|
|
logger.info(f"Converted {len(records)} pathway nodes to records for {date_filter_id}")
|
|
return records
|
|
|
|
|
|
def process_all_date_filters(
|
|
df: pd.DataFrame,
|
|
trust_filter: list[str],
|
|
drug_filter: list[str],
|
|
directory_filter: list[str],
|
|
minimum_patients: int = 5,
|
|
max_date: Optional[date] = None,
|
|
refresh_id: Optional[str] = None,
|
|
paths: Optional[PathConfig] = None,
|
|
) -> dict[str, list[dict]]:
|
|
"""
|
|
Process pathway data for all 6 date filter combinations.
|
|
|
|
This is a convenience function that processes all DATE_FILTER_CONFIGS
|
|
and returns a dictionary of records ready for SQLite insertion.
|
|
|
|
Args:
|
|
df: Transformed DataFrame from fetch_and_transform_data()
|
|
trust_filter: List of trust names to include
|
|
drug_filter: List of drug names to include
|
|
directory_filter: List of directories to include
|
|
minimum_patients: Minimum patients to include a pathway
|
|
max_date: Reference date for computing date ranges
|
|
refresh_id: Optional refresh tracking ID
|
|
paths: PathConfig for file paths
|
|
|
|
Returns:
|
|
Dictionary mapping date_filter_id to list of record dicts
|
|
e.g., {"all_6mo": [...], "all_12mo": [...], ...}
|
|
"""
|
|
if paths is None:
|
|
paths = default_paths
|
|
|
|
results = {}
|
|
|
|
for config in DATE_FILTER_CONFIGS:
|
|
logger.info(f"Processing date filter: {config.id}")
|
|
|
|
# Process pathway for this date filter
|
|
ice_df = process_pathway_for_date_filter(
|
|
df=df,
|
|
config=config,
|
|
trust_filter=trust_filter,
|
|
drug_filter=drug_filter,
|
|
directory_filter=directory_filter,
|
|
minimum_patients=minimum_patients,
|
|
max_date=max_date,
|
|
paths=paths,
|
|
)
|
|
|
|
if ice_df is None:
|
|
logger.warning(f"Skipping {config.id} - no data")
|
|
results[config.id] = []
|
|
continue
|
|
|
|
# Extract denormalized fields
|
|
ice_df = extract_denormalized_fields(ice_df)
|
|
|
|
# Convert to records
|
|
records = convert_to_records(ice_df, config.id, refresh_id)
|
|
results[config.id] = records
|
|
|
|
logger.info(f"Completed {config.id}: {len(records)} nodes")
|
|
|
|
total_records = sum(len(r) for r in results.values())
|
|
logger.info(f"Total pathway nodes across all filters: {total_records}")
|
|
|
|
return results
|
|
|
|
|
|
# Export public API
|
|
__all__ = [
|
|
"DateFilterConfig",
|
|
"DATE_FILTER_CONFIGS",
|
|
"compute_date_ranges",
|
|
"fetch_and_transform_data",
|
|
"process_pathway_for_date_filter",
|
|
"extract_denormalized_fields",
|
|
"convert_to_records",
|
|
"process_all_date_filters",
|
|
]
|