diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md index f76819b..5df08f9 100644 --- a/IMPLEMENTATION_PLAN.md +++ b/IMPLEMENTATION_PLAN.md @@ -90,13 +90,18 @@ python -m reflex compile - [x] Verify: Migration adds column, existing data defaults to "directory" ### 2.3 Create Indication Pathway Processing -- [ ] Add `process_indication_pathways()` to `pathway_pipeline.py`: +- [x] Add `process_indication_pathway_for_date_filter()` to `pathway_pipeline.py`: - Group by: Trust → Search_Term → Drug → Pathway - For unmatched patients: use directorate name as Search_Term fallback - Output: Same structure as directory pathways but with indication grouping -- [ ] Add `extract_indication_fields()` for denormalized columns: +- [x] Add `generate_icicle_chart_indication()` to `pathway_analyzer.py`: + - Variant of `generate_icicle_chart()` that uses indication_df instead of directory_df + - Takes `indication_df` parameter mapping UPID → Indication_Group +- [x] Add `extract_indication_fields()` for denormalized columns: - Extract: trust_name, search_term (or fallback_directorate), drug_sequence -- [ ] Verify: Process sample data, check hierarchy structure +- [x] Update `convert_to_records()` to include `chart_type` parameter +- [x] Add `ChartType` type alias ("directory" | "indication") +- [x] Verify: Code compiles, imports work correctly --- diff --git a/analysis/pathway_analyzer.py b/analysis/pathway_analyzer.py index 8ffc34b..053f7e6 100644 --- a/analysis/pathway_analyzer.py +++ b/analysis/pathway_analyzer.py @@ -749,3 +749,191 @@ def generate_icicle_chart( ice_df = prepare_chart_data(ice_df, minimum_num_patients) return ice_df, final_title + + +def generate_icicle_chart_indication( + df: pd.DataFrame, + indication_df: pd.DataFrame, + start_date: str, + end_date: str, + last_seen_date: str, + trust_filter: list[str], + drug_filter: list[str], + directory_filter: list[str], + minimum_num_patients: int, + title: str = "", + paths: Optional[PathConfig] = None, +) -> tuple[pd.DataFrame, str]: + """ + Generate icicle chart data with indication-based grouping. + + This is a variant of generate_icicle_chart() that groups by Search_Term + (from GP diagnosis match) instead of Directory. For patients without + a GP diagnosis match, the fallback directorate is used with a "(no GP dx)" + suffix to distinguish them. + + Hierarchy: Trust → Indication_Group → Drug → Pathway + + Args: + df: DataFrame with processed patient intervention data + indication_df: DataFrame mapping UPID → Indication_Group + Must have 'UPID' as index and 'Indication_Group' column + Values are either Search_Term or "Directory (no GP dx)" + start_date: Start date for patient initiation filter + end_date: End date for patient initiation filter + last_seen_date: Filter for patients last seen after this date + trust_filter: List of trust names to include + drug_filter: List of drug names to include + directory_filter: List of directories to include + minimum_num_patients: Minimum number of patients to include a pathway + title: Chart title (auto-generated if empty) + paths: PathConfig for file paths (uses default if None) + + Returns: + Tuple of (ice_df for chart, final_title) or (None, "") if no data + """ + if paths is None: + paths = default_paths + + # Prepare data - use standard prepare_data function + result = prepare_data(df, trust_filter, drug_filter, directory_filter, paths) + if result[0] is None: + return None, "" + filtered_df, org_codes, directory_df = result + + # For indication charts, we replace directory_df with indication_df + # First, ensure indication_df has the correct format (UPID as index) + if indication_df is not None and not indication_df.empty: + if 'UPID' in indication_df.columns: + indication_df = indication_df.set_index('UPID') + # Rename column for compatibility with build_hierarchy() + if 'Indication_Group' in indication_df.columns: + indication_df = indication_df.rename(columns={'Indication_Group': 'Directory'}) + elif 'indication_group' in indication_df.columns: + indication_df = indication_df.rename(columns={'indication_group': 'Directory'}) + else: + # Fall back to directory if no indication data provided + logger.warning("No indication data provided, falling back to directory grouping") + indication_df = directory_df + + cost_df = filtered_df[["UPID", "Price Actual"]] + total_costs = pd.DataFrame(cost_df.groupby("UPID").sum()) + total_costs.rename(columns={"Price Actual": "Total cost"}, inplace=True) + + result = calculate_statistics(filtered_df, start_date, end_date, last_seen_date, title) + if result[0] is None: + return None, "" + patient_info, date_df, final_title = result + + df_drug_freq = ( + filtered_df.groupby("UPID") + .agg({"Drug Name": lambda x: list(x)}) + .reset_index() + .set_index("UPID") + ) + df_drug_cost = ( + filtered_df.groupby("UPID") + .agg({"Price Actual": lambda x: list(x)}) + .reset_index() + .set_index("UPID") + ) + df_drug_freq["Price Actual"] = df_drug_freq.index.map(df_drug_cost["Price Actual"]) + df_drug_freq["Drug Name"] = df_drug_freq["Drug Name"].apply(_count_list_values) + df_drug_freq["Drug cost total"] = df_drug_freq.apply(lambda x: _sum_list_values(x), axis=1) + + df1_unique = _drop_duplicate_treatments(filtered_df, True) + df_drugs = ( + df1_unique.groupby("UPID") + .agg({"Drug Name": lambda x: list(x)}) + .reset_index() + .set_index("UPID") + ) + df_dates = ( + df1_unique.groupby("UPID") + .agg({"Intervention Date": lambda x: list(x)}) + .reset_index() + .set_index("UPID") + ) + + df_dates_unwrapped = pd.DataFrame( + df_dates["Intervention Date"].values.tolist(), index=df_dates.index + ).add_prefix("date_") + df_drugs_unwrapped = pd.DataFrame( + df_drugs["Drug Name"].values.tolist(), index=df_drugs.index + ).add_prefix("drug_") + + start_dates = ( + filtered_df[["UPIDTreatment", "Intervention Date"]] + .sort_values(by=["Intervention Date"], ascending=True) + .drop_duplicates(subset="UPIDTreatment") + .set_index("UPIDTreatment") + ) + end_dates = ( + filtered_df[["UPIDTreatment", "Intervention Date"]] + .sort_values(by=["Intervention Date"], ascending=False) + .drop_duplicates(subset="UPIDTreatment") + .set_index("UPIDTreatment") + ) + + df_drugs_unwrapped["start_dates"] = df_drugs_unwrapped.apply( + lambda x: _start_date_drug(start_dates, x), axis=1 + ) + df_start_dates_unwrapped = pd.DataFrame( + df_drugs_unwrapped["start_dates"].values.tolist(), index=df_drugs_unwrapped.index + ).add_prefix("start_date_") + df_drugs_unwrapped.drop(["start_dates"], inplace=True, axis=1) + + df_drugs_unwrapped["end_dates"] = df_drugs_unwrapped.apply( + lambda x: _start_date_drug(end_dates, x), axis=1 + ) + df_end_dates_unwrapped_2 = pd.DataFrame( + df_drugs_unwrapped["end_dates"].values.tolist(), index=df_drugs_unwrapped.index + ).add_prefix("end_date_") + df_drugs_unwrapped.drop(["end_dates"], inplace=True, axis=1) + + df_drugs_unwrapped = pd.merge( + df_drugs_unwrapped, df_start_dates_unwrapped, left_index=True, right_index=True + ) + df_drugs_unwrapped = pd.merge( + df_drugs_unwrapped, df_end_dates_unwrapped_2, left_index=True, right_index=True + ) + + df_freq_for_merge = pd.DataFrame( + df_drug_freq["Drug Name"].values.tolist(), index=df_drugs_unwrapped.index + ).add_prefix("freq_") + df_drugs_unwrapped = pd.merge( + df_drugs_unwrapped, df_freq_for_merge, left_index=True, right_index=True + ) + df_drugs_unwrapped["frequency"] = df_drugs_unwrapped.apply( + lambda x: _drug_frequency_average(x), axis=1 + ) + + df_spacing_unwrapped = pd.DataFrame( + df_drugs_unwrapped["frequency"].values.tolist(), index=df_drugs_unwrapped.index + ).add_prefix("spacing_") + df_drugs_unwrapped = pd.merge( + df_drugs_unwrapped, df_spacing_unwrapped, left_index=True, right_index=True + ) + + df_cost_unwrapped = pd.DataFrame( + df_drug_freq["Drug cost total"].values.tolist(), index=df_drugs_unwrapped.index + ).add_prefix("total_cost_drug_") + df_drugs_unwrapped = pd.merge( + df_drugs_unwrapped, df_cost_unwrapped, left_index=True, right_index=True + ) + df_drugs_unwrapped.drop(["frequency"], inplace=True, axis=1) + + # Build hierarchy with indication_df instead of directory_df + ice_df = build_hierarchy( + patient_info, + date_df, + filtered_df, + org_codes, + indication_df, # Use indication mapping instead of directory + total_costs, + df_drugs_unwrapped, + ) + + ice_df = prepare_chart_data(ice_df, minimum_num_patients) + + return ice_df, final_title diff --git a/data_processing/pathway_pipeline.py b/data_processing/pathway_pipeline.py index 436bae5..476f742 100644 --- a/data_processing/pathway_pipeline.py +++ b/data_processing/pathway_pipeline.py @@ -15,18 +15,22 @@ The pipeline integrates with: from dataclasses import dataclass from datetime import date, timedelta -from typing import Optional +from typing import Optional, Literal import json +import numpy as np import pandas as pd from core import PathConfig, default_paths from core.logging_config import get_logger -from analysis.pathway_analyzer import generate_icicle_chart +from analysis.pathway_analyzer import generate_icicle_chart, generate_icicle_chart_indication from tools.data import patient_id, drug_names, department_identification logger = get_logger(__name__) +# Type alias for chart types +ChartType = Literal["directory", "indication"] + @dataclass class DateFilterConfig: @@ -238,6 +242,74 @@ def process_pathway_for_date_filter( return ice_df +def process_indication_pathway_for_date_filter( + df: pd.DataFrame, + indication_df: pd.DataFrame, + config: DateFilterConfig, + trust_filter: list[str], + drug_filter: list[str], + directory_filter: list[str], + minimum_patients: int = 5, + max_date: Optional[date] = None, + paths: Optional[PathConfig] = None, +) -> Optional[pd.DataFrame]: + """ + Process indication-based pathway data for a single date filter configuration. + + This is similar to process_pathway_for_date_filter() but uses indication-based + grouping (Search_Term from GP diagnosis) instead of directory grouping. + + Hierarchy: Trust → Indication_Group → Drug → Pathway + + Args: + df: Transformed DataFrame from fetch_and_transform_data() + indication_df: DataFrame with UPID → Indication_Group mapping + Must have columns: UPID, Indication_Group + Indication_Group is either Search_Term or "Directory (no GP dx)" + config: DateFilterConfig specifying the date filter combination + trust_filter: List of trust names to include + drug_filter: List of drug names to include + directory_filter: List of directories to include + minimum_patients: Minimum patients to include a pathway + max_date: Reference date for computing date ranges + paths: PathConfig for file paths + + Returns: + DataFrame with pathway hierarchy (ice_df) or None if no data + """ + if paths is None: + paths = default_paths + + # Compute actual date ranges for this filter config + start_date, end_date, last_seen_date = compute_date_ranges(config, max_date) + + logger.info(f"Processing indication pathway for {config.id}") + logger.info(f" Date range: {start_date} to {end_date}") + logger.info(f" Last seen after: {last_seen_date}") + + # Use the indication-aware pathway analyzer + ice_df, title = generate_icicle_chart_indication( + df=df, + indication_df=indication_df, + start_date=start_date, + end_date=end_date, + last_seen_date=last_seen_date, + trust_filter=trust_filter, + drug_filter=drug_filter, + directory_filter=directory_filter, + minimum_num_patients=minimum_patients, + title="", + paths=paths, + ) + + if ice_df is None or len(ice_df) == 0: + logger.warning(f"No indication pathway data for filter {config.id}") + return None + + logger.info(f"Generated {len(ice_df)} indication pathway nodes for {config.id}") + return ice_df + + def extract_denormalized_fields(ice_df: pd.DataFrame) -> pd.DataFrame: """ Extract denormalized filter columns from the ids column. @@ -299,10 +371,79 @@ def extract_denormalized_fields(ice_df: pd.DataFrame) -> pd.DataFrame: return df +def extract_indication_fields(ice_df: pd.DataFrame) -> pd.DataFrame: + """ + Extract denormalized filter columns from the ids column for indication charts. + + Similar to extract_denormalized_fields() but for indication-based charts where + the level-2 grouping is Search_Term (or fallback directorate) instead of Directory. + + The ids column contains hierarchical paths like: + - "N&WICS" (root) + - "N&WICS - NNUH" (trust level) + - "N&WICS - NNUH - rheumatoid arthritis" (search_term level - matched patient) + - "N&WICS - NNUH - RHEUMATOLOGY (no GP dx)" (fallback level - unmatched patient) + - "N&WICS - NNUH - rheumatoid arthritis - ADALIMUMAB" (first drug) + - "N&WICS - NNUH - rheumatoid arthritis - ADALIMUMAB - ETANERCEPT" (pathway) + + This function extracts: + - trust_name: The trust component (level 1) + - search_term: The Search_Term or fallback directorate (level 2) + - drug_sequence: Pipe-separated drugs (level 3+) + + Note: For indication charts, 'directory' column contains the search_term + to maintain schema compatibility with the pathway_nodes table. + + Args: + ice_df: DataFrame from generate_icicle_chart() with indication grouping + + Returns: + DataFrame with added columns: trust_name, directory (=search_term), drug_sequence + """ + df = ice_df.copy() + + def extract_components(ids_str: str) -> tuple[str, str, str]: + """Extract trust, search_term, and drug sequence from ids string.""" + if not ids_str or pd.isna(ids_str): + return ("", "", "") + + parts = ids_str.split(" - ") + + # Level 0: Root (e.g., "N&WICS") + if len(parts) <= 1: + return ("", "", "") + + # Level 1+: Trust is always parts[1] + trust_name = parts[1] if len(parts) > 1 else "" + + # Level 2+: Search_term (or fallback) is parts[2] + search_term = parts[2] if len(parts) > 2 else "" + + # Level 3+: Drugs are parts[3:] + drugs = parts[3:] if len(parts) > 3 else [] + drug_sequence = "|".join(drugs) if drugs else "" + + return (trust_name, search_term, drug_sequence) + + # Apply extraction to all rows + extracted = df['ids'].apply(extract_components) + df['trust_name'] = extracted.apply(lambda x: x[0]) + # Use 'directory' column to store search_term for schema compatibility + df['directory'] = extracted.apply(lambda x: x[1]) + df['drug_sequence'] = extracted.apply(lambda x: x[2]) + + logger.info(f"Extracted indication fields for {len(df)} nodes") + logger.info(f" Unique trusts: {df['trust_name'].nunique()}") + logger.info(f" Unique search_terms: {df['directory'].nunique()}") + + return df + + def convert_to_records( ice_df: pd.DataFrame, date_filter_id: str, refresh_id: Optional[str] = None, + chart_type: ChartType = "directory", ) -> list[dict]: """ Convert ice_df to a list of dictionaries for SQLite insertion. @@ -318,12 +459,14 @@ def convert_to_records( - avg_days: From ice_df['avg_days'] - trust_name, directory, drug_sequence: Denormalized fields - date_filter_id: The filter combination ID + - chart_type: "directory" or "indication" - data_refresh_id: Optional refresh tracking ID Args: ice_df: DataFrame from generate_icicle_chart() with denormalized fields date_filter_id: The date filter combination ID (e.g., 'all_6mo') refresh_id: Optional refresh tracking ID + chart_type: Chart type - "directory" (default) or "indication" Returns: List of dictionaries ready for SQLite insertion @@ -379,6 +522,7 @@ def convert_to_records( record = { 'date_filter_id': date_filter_id, + 'chart_type': chart_type, 'parents': str(row.get('parents', '')) if pd.notna(row.get('parents')) else '', 'ids': str(row.get('ids', '')) if pd.notna(row.get('ids')) else '', 'labels': str(row.get('labels', '')) if pd.notna(row.get('labels')) else '', @@ -402,7 +546,7 @@ def convert_to_records( } records.append(record) - logger.info(f"Converted {len(records)} pathway nodes to records for {date_filter_id}") + logger.info(f"Converted {len(records)} pathway nodes to records for {date_filter_id} ({chart_type})") return records @@ -478,12 +622,21 @@ def process_all_date_filters( # Export public API __all__ = [ + # Types + "ChartType", + # Data classes "DateFilterConfig", "DATE_FILTER_CONFIGS", + # Core functions "compute_date_ranges", "fetch_and_transform_data", + # Directory chart processing "process_pathway_for_date_filter", "extract_denormalized_fields", + # Indication chart processing + "process_indication_pathway_for_date_filter", + "extract_indication_fields", + # Common utilities "convert_to_records", "process_all_date_filters", ]