feat: add indication pathway processing functions (Task 2.3)

- Add generate_icicle_chart_indication() to pathway_analyzer.py
  - Variant that uses indication_df instead of directory_df
  - Groups by Trust → Search_Term → Drug → Pathway
  - Accepts indication_df mapping UPID → Indication_Group

- Add process_indication_pathway_for_date_filter() to pathway_pipeline.py
  - Processes indication-based pathway for a single date filter
  - Uses generate_icicle_chart_indication() for hierarchy building

- Add extract_indication_fields() to pathway_pipeline.py
  - Extracts trust_name, search_term, drug_sequence from ids column
  - Similar to extract_denormalized_fields() but for indication charts

- Update convert_to_records() with chart_type parameter
  - Includes chart_type column in output records
  - Supports "directory" and "indication" values

- Add ChartType type alias (Literal["directory", "indication"])

- Update __all__ exports with new functions
This commit is contained in:
Andrew Charlwood
2026-02-05 14:32:28 +00:00
parent aabe4bf45d
commit 7cbc648c6d
3 changed files with 352 additions and 6 deletions
+156 -3
View File
@@ -15,18 +15,22 @@ The pipeline integrates with:
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Optional
from typing import Optional, Literal
import json
import numpy as np
import pandas as pd
from core import PathConfig, default_paths
from core.logging_config import get_logger
from analysis.pathway_analyzer import generate_icicle_chart
from analysis.pathway_analyzer import generate_icicle_chart, generate_icicle_chart_indication
from tools.data import patient_id, drug_names, department_identification
logger = get_logger(__name__)
# Type alias for chart types
ChartType = Literal["directory", "indication"]
@dataclass
class DateFilterConfig:
@@ -238,6 +242,74 @@ def process_pathway_for_date_filter(
return ice_df
def process_indication_pathway_for_date_filter(
df: pd.DataFrame,
indication_df: pd.DataFrame,
config: DateFilterConfig,
trust_filter: list[str],
drug_filter: list[str],
directory_filter: list[str],
minimum_patients: int = 5,
max_date: Optional[date] = None,
paths: Optional[PathConfig] = None,
) -> Optional[pd.DataFrame]:
"""
Process indication-based pathway data for a single date filter configuration.
This is similar to process_pathway_for_date_filter() but uses indication-based
grouping (Search_Term from GP diagnosis) instead of directory grouping.
Hierarchy: Trust → Indication_Group → Drug → Pathway
Args:
df: Transformed DataFrame from fetch_and_transform_data()
indication_df: DataFrame with UPID → Indication_Group mapping
Must have columns: UPID, Indication_Group
Indication_Group is either Search_Term or "Directory (no GP dx)"
config: DateFilterConfig specifying the date filter combination
trust_filter: List of trust names to include
drug_filter: List of drug names to include
directory_filter: List of directories to include
minimum_patients: Minimum patients to include a pathway
max_date: Reference date for computing date ranges
paths: PathConfig for file paths
Returns:
DataFrame with pathway hierarchy (ice_df) or None if no data
"""
if paths is None:
paths = default_paths
# Compute actual date ranges for this filter config
start_date, end_date, last_seen_date = compute_date_ranges(config, max_date)
logger.info(f"Processing indication pathway for {config.id}")
logger.info(f" Date range: {start_date} to {end_date}")
logger.info(f" Last seen after: {last_seen_date}")
# Use the indication-aware pathway analyzer
ice_df, title = generate_icicle_chart_indication(
df=df,
indication_df=indication_df,
start_date=start_date,
end_date=end_date,
last_seen_date=last_seen_date,
trust_filter=trust_filter,
drug_filter=drug_filter,
directory_filter=directory_filter,
minimum_num_patients=minimum_patients,
title="",
paths=paths,
)
if ice_df is None or len(ice_df) == 0:
logger.warning(f"No indication pathway data for filter {config.id}")
return None
logger.info(f"Generated {len(ice_df)} indication pathway nodes for {config.id}")
return ice_df
def extract_denormalized_fields(ice_df: pd.DataFrame) -> pd.DataFrame:
"""
Extract denormalized filter columns from the ids column.
@@ -299,10 +371,79 @@ def extract_denormalized_fields(ice_df: pd.DataFrame) -> pd.DataFrame:
return df
def extract_indication_fields(ice_df: pd.DataFrame) -> pd.DataFrame:
"""
Extract denormalized filter columns from the ids column for indication charts.
Similar to extract_denormalized_fields() but for indication-based charts where
the level-2 grouping is Search_Term (or fallback directorate) instead of Directory.
The ids column contains hierarchical paths like:
- "N&WICS" (root)
- "N&WICS - NNUH" (trust level)
- "N&WICS - NNUH - rheumatoid arthritis" (search_term level - matched patient)
- "N&WICS - NNUH - RHEUMATOLOGY (no GP dx)" (fallback level - unmatched patient)
- "N&WICS - NNUH - rheumatoid arthritis - ADALIMUMAB" (first drug)
- "N&WICS - NNUH - rheumatoid arthritis - ADALIMUMAB - ETANERCEPT" (pathway)
This function extracts:
- trust_name: The trust component (level 1)
- search_term: The Search_Term or fallback directorate (level 2)
- drug_sequence: Pipe-separated drugs (level 3+)
Note: For indication charts, 'directory' column contains the search_term
to maintain schema compatibility with the pathway_nodes table.
Args:
ice_df: DataFrame from generate_icicle_chart() with indication grouping
Returns:
DataFrame with added columns: trust_name, directory (=search_term), drug_sequence
"""
df = ice_df.copy()
def extract_components(ids_str: str) -> tuple[str, str, str]:
"""Extract trust, search_term, and drug sequence from ids string."""
if not ids_str or pd.isna(ids_str):
return ("", "", "")
parts = ids_str.split(" - ")
# Level 0: Root (e.g., "N&WICS")
if len(parts) <= 1:
return ("", "", "")
# Level 1+: Trust is always parts[1]
trust_name = parts[1] if len(parts) > 1 else ""
# Level 2+: Search_term (or fallback) is parts[2]
search_term = parts[2] if len(parts) > 2 else ""
# Level 3+: Drugs are parts[3:]
drugs = parts[3:] if len(parts) > 3 else []
drug_sequence = "|".join(drugs) if drugs else ""
return (trust_name, search_term, drug_sequence)
# Apply extraction to all rows
extracted = df['ids'].apply(extract_components)
df['trust_name'] = extracted.apply(lambda x: x[0])
# Use 'directory' column to store search_term for schema compatibility
df['directory'] = extracted.apply(lambda x: x[1])
df['drug_sequence'] = extracted.apply(lambda x: x[2])
logger.info(f"Extracted indication fields for {len(df)} nodes")
logger.info(f" Unique trusts: {df['trust_name'].nunique()}")
logger.info(f" Unique search_terms: {df['directory'].nunique()}")
return df
def convert_to_records(
ice_df: pd.DataFrame,
date_filter_id: str,
refresh_id: Optional[str] = None,
chart_type: ChartType = "directory",
) -> list[dict]:
"""
Convert ice_df to a list of dictionaries for SQLite insertion.
@@ -318,12 +459,14 @@ def convert_to_records(
- avg_days: From ice_df['avg_days']
- trust_name, directory, drug_sequence: Denormalized fields
- date_filter_id: The filter combination ID
- chart_type: "directory" or "indication"
- data_refresh_id: Optional refresh tracking ID
Args:
ice_df: DataFrame from generate_icicle_chart() with denormalized fields
date_filter_id: The date filter combination ID (e.g., 'all_6mo')
refresh_id: Optional refresh tracking ID
chart_type: Chart type - "directory" (default) or "indication"
Returns:
List of dictionaries ready for SQLite insertion
@@ -379,6 +522,7 @@ def convert_to_records(
record = {
'date_filter_id': date_filter_id,
'chart_type': chart_type,
'parents': str(row.get('parents', '')) if pd.notna(row.get('parents')) else '',
'ids': str(row.get('ids', '')) if pd.notna(row.get('ids')) else '',
'labels': str(row.get('labels', '')) if pd.notna(row.get('labels')) else '',
@@ -402,7 +546,7 @@ def convert_to_records(
}
records.append(record)
logger.info(f"Converted {len(records)} pathway nodes to records for {date_filter_id}")
logger.info(f"Converted {len(records)} pathway nodes to records for {date_filter_id} ({chart_type})")
return records
@@ -478,12 +622,21 @@ def process_all_date_filters(
# Export public API
__all__ = [
# Types
"ChartType",
# Data classes
"DateFilterConfig",
"DATE_FILTER_CONFIGS",
# Core functions
"compute_date_ranges",
"fetch_and_transform_data",
# Directory chart processing
"process_pathway_for_date_filter",
"extract_denormalized_fields",
# Indication chart processing
"process_indication_pathway_for_date_filter",
"extract_indication_fields",
# Common utilities
"convert_to_records",
"process_all_date_filters",
]