"""
Statistical calculation functions for patient pathway analysis.

This module contains functions for calculating:
- Drug frequency counts and averages
- Cost aggregations (total, per patient, per annum)
- Treatment duration calculations
- Dosing interval calculations

These functions are extracted from the analysis pipeline to enable:
- Independent testing
- Reuse across different analysis contexts
- Clearer separation of concerns
"""

from itertools import groupby
from typing import Optional

import numpy as np
import pandas as pd


def count_consecutive_values(values: list) -> list[int]:
    """
    Count occurrences of each distinct value, reported in sorted order.

    The input is sorted before grouping, so despite the name the result is
    the total count of every distinct value, not the length of runs in the
    original ordering. Used to count how many times each drug was
    administered.

    Args:
        values: List of mutually comparable values (typically drug names)

    Returns:
        List of counts, one per distinct value, ordered by sorted value

    Example:
        >>> count_consecutive_values(['A', 'A', 'B', 'A'])
        [3, 1]

        'A' appears 3 times and 'B' appears once.
    """
    return [len(list(group)) for _, group in groupby(sorted(values))]


def calculate_drug_costs(drug_counts: list[int], prices: list[float]) -> list[float]:
    """
    Calculate total cost for each drug based on counts and prices.

    Splits the price list into consecutive slices whose lengths are given by
    ``drug_counts`` and sums each slice.

    Args:
        drug_counts: List of administration counts per drug
            (from count_consecutive_values)
        prices: List of individual administration prices (Price Actual
            values), ordered so each drug's prices are contiguous

    Returns:
        List of total costs per drug

    Example:
        >>> calculate_drug_costs([3, 2], [100, 100, 100, 200, 200])
        [300.0, 400.0]

        Drug 1: 3 x 100 = 300, Drug 2: 2 x 200 = 400.
    """
    totals = []
    offset = 0
    for count in drug_counts:
        # Slicing past the end of ``prices`` yields a short/empty slice, so a
        # count that overruns the price list contributes only what exists.
        totals.append(float(sum(prices[offset:offset + count])))
        offset += count
    return totals


def calculate_dosing_frequency(
    freq: int,
    start_date: pd.Timestamp,
    end_date: pd.Timestamp,
) -> float:
    """
    Calculate average dosing interval in days.

    Computes the average number of days between administrations as the
    elapsed time divided by the number of gaps (``freq - 1``).

    Args:
        freq: Number of administrations
        start_date: First administration date
        end_date: Last administration date

    Returns:
        Average days between administrations; 0.0 when there is at most one
        dose or when the end date is not after the start date

    Example:
        >>> start = pd.Timestamp('2024-01-01')
        >>> end = pd.Timestamp('2024-01-22')
        >>> calculate_dosing_frequency(4, start, end)
        7.0

        21 days / (4 - 1) gaps = 7 days between doses.
    """
    if freq <= 1:
        return 0.0

    duration_days = (end_date - start_date) / np.timedelta64(1, "D")
    if duration_days <= 0:
        return 0.0

    return duration_days / (freq - 1)


def calculate_drug_frequency_row(row: pd.Series) -> list[float]:
    """
    Calculate average dosing frequency for each drug in a patient's treatment.

    Used with DataFrame.apply() on rows containing drug_*, freq_*,
    start_date_*, end_date_* columns. The number of drugs is taken from the
    number of index labels containing ``"drug_"``; the per-drug columns are
    then looked up by position suffix (``freq_0``, ``start_date_0``, ...).

    Args:
        row: Series with drug names, frequencies, start dates, and end dates

    Returns:
        List of average dosing intervals (days) for each drug; 0.0 for a drug
        with a missing frequency, at most one dose, or a missing date
    """
    drug_count = row.index.str.contains("drug_").sum()

    intervals = []
    for d in range(drug_count):
        freq = row.get(f"freq_{d}", 0)
        freq = 0 if freq is None or pd.isna(freq) else int(freq)

        start_date = row.get(f"start_date_{d}")
        end_date = row.get(f"end_date_{d}")

        if freq > 1 and pd.notna(start_date) and pd.notna(end_date):
            intervals.append(calculate_dosing_frequency(freq, start_date, end_date))
        else:
            intervals.append(0.0)

    return intervals


def calculate_cost_per_patient_per_annum(
    total_cost: float,
    days_treated: Optional[pd.Timedelta],
) -> Optional[float]:
    """
    Calculate annualized cost per patient.

    Normalizes costs to a per-year basis (365 days) to enable comparison
    across patients with different treatment durations.

    Args:
        total_cost: Total cost for the patient
        days_treated: Treatment duration as a timedelta; a plain number is
            also accepted and interpreted as a count of days

    Returns:
        Annualized cost, or None if days_treated is None, missing (NaT/NaN),
        zero, or negative

    Example:
        >>> calculate_cost_per_patient_per_annum(5000, pd.Timedelta(days=182.5))
        10000.0

        Half a year of treatment, so the annual cost is 2x.
    """
    if days_treated is None or pd.isna(days_treated):
        return None

    # Numbers must be checked by type: int and float both define
    # ``__truediv__``, so an attribute probe cannot tell them apart from
    # timedeltas, and dividing a number by ``np.timedelta64`` raises TypeError.
    if isinstance(days_treated, (int, float, np.integer, np.floating)):
        days = float(days_treated)
    else:
        days = days_treated / np.timedelta64(1, "D")

    if days <= 0:
        return None

    return total_cost / (days / 365)


def calculate_treatment_duration(
    first_seen: pd.Timestamp,
    last_seen: pd.Timestamp,
) -> pd.Timedelta:
    """
    Calculate treatment duration from first to last seen dates.

    Args:
        first_seen: Date of first treatment
        last_seen: Date of last treatment

    Returns:
        Duration as timedelta (``last_seen - first_seen``)
    """
    return last_seen - first_seen


def calculate_pathway_proportion(value: int, parent_value: int) -> float:
    """
    Calculate proportion of parent value for color scaling.

    Used to determine color intensity in the icicle chart based on what
    proportion of the parent category this pathway represents.

    Args:
        value: Patient count for this pathway
        parent_value: Total patient count for the parent category

    Returns:
        ``value / parent_value``, or 0.0 when parent_value is not positive
    """
    if parent_value <= 0:
        return 0.0
    return value / parent_value


def aggregate_patient_costs(df: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate total cost per patient (UPID).

    Args:
        df: DataFrame with UPID and Price Actual columns

    Returns:
        DataFrame indexed by UPID with Total cost column
    """
    totals = df[["UPID", "Price Actual"]].groupby("UPID").sum()
    return totals.rename(columns={"Price Actual": "Total cost"})


def aggregate_drug_frequencies(df: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate drug administration frequency per patient.

    Groups by UPID and returns counts of each drug's administrations.

    Args:
        df: DataFrame with UPID and Drug Name columns

    Returns:
        DataFrame indexed by UPID with Drug Name as list of counts
    """
    # groupby already leaves UPID as the index, so no reset/set round-trip
    # is needed.
    return df.groupby("UPID").agg(
        {"Drug Name": lambda names: count_consecutive_values(list(names))}
    )


def calculate_average_spacing_for_pathway(
    upid_drugs_df: pd.DataFrame,
    pathway_value: str,
) -> list[float]:
    """
    Calculate average dosing spacing for a treatment pathway.

    Selects the rows whose ``value`` column equals ``pathway_value`` and takes
    the mean of every ``spacing_*`` column.

    Args:
        upid_drugs_df: DataFrame with patient pathway data and spacing columns
        pathway_value: Pathway identifier string

    Returns:
        List of average spacing values (days), rounded to whole days, one per
        spacing column; 0.0 where the mean is missing; empty list when no row
        matches the pathway
    """
    spacing_cols = [col for col in upid_drugs_df.columns if col.startswith("spacing_")]
    pathway_data = upid_drugs_df[upid_drugs_df["value"] == pathway_value]

    if pathway_data.empty:
        return []

    averages = pathway_data[spacing_cols].mean()
    return [round(v, 0) if pd.notna(v) else 0.0 for v in averages.tolist()]


def format_treatment_statistics(
    drug_names: list[str],
    average_administered: list[float],
    average_spacing: list[float],
    average_cost: list[float],
) -> str:
    """
    Format drug treatment statistics into a readable string for chart display.

    Creates an HTML-formatted string with drug name, average administrations,
    dosing interval, and total treatment length.

    Args:
        drug_names: List of drug names in treatment sequence
        average_administered: Average number of administrations per drug
        average_spacing: Average days between doses per drug
        average_cost: Average cost per drug (accepted for interface
            compatibility; not currently included in the output)

    Returns:
        HTML-formatted string for chart hover text, one entry per drug
    """
    parts = []
    for i, drug_name in enumerate(drug_names):
        # Shorter statistic lists are tolerated: missing entries count as 0.
        admin_count = average_administered[i] if i < len(average_administered) else 0
        spacing_days = average_spacing[i] if i < len(average_spacing) else 0

        # Convert to weeks
        spacing_weeks = spacing_days / 7 if spacing_days > 0 else 0
        total_weeks = spacing_weeks * admin_count if admin_count > 0 else 0

        # Line breaks are HTML <br> tags because the result is rendered as
        # HTML hover text.
        parts.append(
            f"<br>{drug_name}<br>On average given "
            f"{round(admin_count, 1)} times with a "
            f"{round(spacing_weeks, 1)} weekly interval ("
            f"{round(total_weeks, 0)} weeks total treatment length)"
        )

    return "".join(parts)


def remove_nan_values(values: list) -> list:
    """
    Remove NaN values from a list.

    Used to clean up aggregated statistics that may contain 'nan' strings.
    An element is dropped when its string form, lower-cased, is ``"nan"``,
    which covers float NaN as well as the literal strings 'nan'/'NaN'.

    Args:
        values: List potentially containing NaN values or 'nan' strings

    Returns:
        Filtered list without NaN entries
    """
    return [x for x in values if str(x).lower() != "nan"]