Files
HighCostDrugsDemo/pathways_app/pathways_app.py
T
2026-02-04 13:04:29 +00:00

2185 lines
80 KiB
Python

"""
NHS High-Cost Drug Patient Pathway Analysis Tool - Reflex Application.
This is the main Reflex application module containing state management
and page components for the pathway analysis tool.
"""
import reflex as rx
from datetime import date, timedelta
from typing import Optional
import pandas as pd
import numpy as np
from pathlib import Path
import plotly.graph_objects as go
import traceback
import os
from rxconfig import config
from pathways_app.components.layout import main_layout, content_area
# NHS Color constants
NHS_BLUE = "rgb(0, 94, 184)"
NHS_DARK_BLUE = "rgb(0, 48, 135)"
# Supported file extensions
SUPPORTED_EXTENSIONS = [".csv", ".parquet", ".pq"]
class State(rx.State):
"""
Application state for the NHS High-Cost Drug Patient Pathway Analysis Tool.
Manages all filter variables, reference data, and analysis state.
This corresponds to the AnalysisFilters dataclass in core/models.py
but is adapted for Reflex's reactive state system.
"""
# Date filter state
start_date: str = "" # ISO format YYYY-MM-DD
end_date: str = ""
last_seen_date: str = ""
# Selection filters (list of selected items)
selected_trusts: list[str] = []
selected_drugs: list[str] = []
selected_directories: list[str] = []
# Analysis parameters
minimum_patients: int = 0
custom_title: str = ""
# Reference data (available options loaded from CSV/SQLite)
available_trusts: list[str] = []
available_drugs: list[str] = []
available_directories: list[str] = []
# Drug default selections (Include=1 in include.csv)
default_drugs: list[str] = []
# Analysis state
analysis_running: bool = False
status_message: str = ""
error_message: str = ""
# Chart state - the Plotly figure
chart_data: go.Figure = go.Figure()
has_chart: bool = False
# Data source state
data_file_path: str = ""
data_source: str = "file" # "file", "sqlite", "snowflake"
data_loaded: bool = False
data_row_count: int = 0
# Snowflake connection state
snowflake_available: bool = False
snowflake_configured: bool = False
snowflake_connected: bool = False
# File upload state
uploaded_file_name: str = ""
uploaded_file_size: int = 0 # bytes
file_upload_error: str = ""
file_upload_success: bool = False
file_processing: bool = False
# SQLite database state
sqlite_available: bool = False
sqlite_row_count: int = 0
sqlite_patient_count: int = 0
# Search/filter state for selection pages
drug_search: str = ""
trust_search: str = ""
directory_search: str = ""
# Export state
last_export_path: str = ""
export_message: str = ""
export_error: str = ""
# Indication validation state
indication_validation_enabled: bool = True
indication_validation_running: bool = False
indication_validation_results: dict = {} # drug_name -> {total, matched, rate}
indication_validation_summary: str = ""
# Store the underlying data for export
_analysis_data: pd.DataFrame = pd.DataFrame()
def _set_default_dates(self):
"""Set default date values based on typical analysis period."""
today = date.today()
one_year_ago = today - timedelta(days=365)
self.start_date = one_year_ago.isoformat()
self.end_date = today.isoformat()
self.last_seen_date = one_year_ago.isoformat()
def load_reference_data(self):
"""
Load reference data from CSV files.
This loads the available drugs, trusts, and directories
that can be selected in the filters.
"""
data_dir = Path("data")
# Load drugs from include.csv
try:
drugs_df = pd.read_csv(data_dir / "include.csv")
self.available_drugs = sorted(drugs_df.iloc[:, 0].astype(str).tolist())
# Get default selections (Include=1)
if "Include" in drugs_df.columns:
self.default_drugs = drugs_df[drugs_df["Include"] == 1].iloc[:, 0].astype(str).tolist()
self.selected_drugs = self.default_drugs.copy()
self.status_message = f"Loaded {len(self.available_drugs)} drugs"
except Exception as e:
self.error_message = f"Failed to load drugs: {e}"
# Load trusts from defaultTrusts.csv
try:
trusts_df = pd.read_csv(data_dir / "defaultTrusts.csv")
self.available_trusts = sorted(trusts_df.iloc[:, 0].astype(str).tolist())
# By default, no trusts selected (include all)
self.selected_trusts = []
except Exception as e:
self.error_message = f"Failed to load trusts: {e}"
# Load directories from directory_list.csv
try:
dirs_df = pd.read_csv(data_dir / "directory_list.csv")
self.available_directories = sorted(dirs_df.iloc[:, 0].astype(str).tolist())
# By default, no directories selected (include all)
self.selected_directories = []
except Exception as e:
self.error_message = f"Failed to load directories: {e}"
# Set default dates
self._set_default_dates()
# Check Snowflake availability
try:
from data_processing.snowflake_connector import is_snowflake_available, is_snowflake_configured
self.snowflake_available = is_snowflake_available()
self.snowflake_configured = is_snowflake_configured()
except ImportError:
self.snowflake_available = False
self.snowflake_configured = False
# Check SQLite database status
self.check_sqlite_status()
# Auto-select best data source
if self.sqlite_available and self.sqlite_row_count > 0:
self.data_source = "sqlite"
elif self.snowflake_configured:
self.data_source = "snowflake"
else:
self.data_source = "file"
# Date setters
def set_start_date(self, value: str):
"""Set the start date for analysis."""
self.start_date = value
def set_end_date(self, value: str):
"""Set the end date for analysis."""
self.end_date = value
def set_last_seen_date(self, value: str):
"""Set the last seen date filter."""
self.last_seen_date = value
# Selection setters
def set_selected_trusts(self, trusts: list[str]):
"""Set the selected NHS trusts."""
self.selected_trusts = trusts
def toggle_trust(self, trust: str):
"""Toggle a trust selection."""
if trust in self.selected_trusts:
self.selected_trusts = [t for t in self.selected_trusts if t != trust]
else:
self.selected_trusts = self.selected_trusts + [trust]
def select_all_trusts(self):
"""Select all available trusts."""
self.selected_trusts = self.available_trusts.copy()
def clear_trusts(self):
"""Clear all trust selections."""
self.selected_trusts = []
def set_selected_drugs(self, drugs: list[str]):
"""Set the selected drugs."""
self.selected_drugs = drugs
def toggle_drug(self, drug: str):
"""Toggle a drug selection."""
if drug in self.selected_drugs:
self.selected_drugs = [d for d in self.selected_drugs if d != drug]
else:
self.selected_drugs = self.selected_drugs + [drug]
def select_all_drugs(self):
"""Select all available drugs."""
self.selected_drugs = self.available_drugs.copy()
def select_default_drugs(self):
"""Select only the default drugs (Include=1)."""
self.selected_drugs = self.default_drugs.copy()
def clear_drugs(self):
"""Clear all drug selections."""
self.selected_drugs = []
def set_selected_directories(self, directories: list[str]):
"""Set the selected directories."""
self.selected_directories = directories
def toggle_directory(self, directory: str):
"""Toggle a directory selection."""
if directory in self.selected_directories:
self.selected_directories = [d for d in self.selected_directories if d != directory]
else:
self.selected_directories = self.selected_directories + [directory]
def select_all_directories(self):
"""Select all available directories."""
self.selected_directories = self.available_directories.copy()
def clear_directories(self):
"""Clear all directory selections."""
self.selected_directories = []
# Analysis parameter setters
def set_minimum_patients(self, value: int):
"""Set the minimum patients threshold."""
self.minimum_patients = max(0, value)
def set_minimum_patients_from_input(self, value: str):
"""Set minimum patients threshold from string input."""
try:
self.minimum_patients = max(0, int(value)) if value else 0
except ValueError:
pass # Ignore invalid input
def set_minimum_patients_from_slider(self, values: list[float]):
"""Set minimum patients threshold from slider value (list)."""
if values:
self.minimum_patients = max(0, int(values[0]))
def set_custom_title(self, value: str):
"""Set a custom title for the analysis."""
self.custom_title = value
# Data source methods
def set_data_file_path(self, path: str):
"""Set the data file path for analysis."""
self.data_file_path = path
def set_data_source(self, source: str):
"""Set the data source type (file, sqlite, snowflake)."""
if source in ("file", "sqlite", "snowflake"):
self.data_source = source
# Status methods
def set_status(self, message: str):
"""Update the status message."""
self.status_message = message
def set_error(self, message: str):
"""Set an error message."""
self.error_message = message
def clear_error(self):
"""Clear the error message."""
self.error_message = ""
# File handling methods
async def handle_file_upload(self, files: list[rx.UploadFile]):
"""
Handle file upload for CSV/Parquet data files.
This accepts uploaded files and processes them for analysis.
"""
self.file_upload_error = ""
self.file_upload_success = False
if not files:
self.file_upload_error = "No file selected"
return
file = files[0] # Take first file only
file_name = file.filename
file_ext = Path(file_name).suffix.lower()
# Validate file extension
if file_ext not in SUPPORTED_EXTENSIONS:
self.file_upload_error = f"Unsupported file type: {file_ext}. Please upload CSV or Parquet files."
return
self.file_processing = True
self.status_message = f"Processing {file_name}..."
yield # Update UI
try:
# Read file content
file_content = await file.read()
file_size = len(file_content)
self.uploaded_file_size = file_size
# Save to uploads directory
upload_dir = Path("data/uploads")
upload_dir.mkdir(parents=True, exist_ok=True)
upload_path = upload_dir / file_name
with open(upload_path, "wb") as f:
f.write(file_content)
self.uploaded_file_name = file_name
self.data_file_path = str(upload_path)
self.data_source = "file"
self.file_upload_success = True
# Format file size for display
if file_size < 1024:
size_str = f"{file_size} bytes"
elif file_size < 1024 * 1024:
size_str = f"{file_size / 1024:.1f} KB"
else:
size_str = f"{file_size / (1024 * 1024):.1f} MB"
self.status_message = f"Uploaded {file_name} ({size_str})"
except Exception as e:
self.file_upload_error = f"Upload failed: {str(e)}"
self.file_upload_success = False
finally:
self.file_processing = False
def clear_uploaded_file(self):
"""Clear the uploaded file and reset file state."""
self.uploaded_file_name = ""
self.uploaded_file_size = 0
self.data_file_path = ""
self.file_upload_success = False
self.file_upload_error = ""
self.status_message = "File cleared"
def check_sqlite_status(self):
"""Check if SQLite database is available and get statistics."""
try:
from data_processing.database import default_db_manager
from data_processing.patient_data import get_patient_data_stats
if default_db_manager.exists:
stats = get_patient_data_stats(default_db_manager)
self.sqlite_available = stats.get("total_rows", 0) > 0
self.sqlite_row_count = stats.get("total_rows", 0)
self.sqlite_patient_count = stats.get("unique_patients", 0)
if self.sqlite_available:
self.status_message = f"SQLite database: {self.sqlite_row_count:,} rows, {self.sqlite_patient_count:,} patients"
else:
self.status_message = "SQLite database exists but has no data"
else:
self.sqlite_available = False
self.sqlite_row_count = 0
self.sqlite_patient_count = 0
self.status_message = "SQLite database not found"
except ImportError:
self.sqlite_available = False
self.status_message = "Data processing module not available"
except Exception as e:
self.sqlite_available = False
self.status_message = f"Error checking SQLite: {str(e)}"
def use_sqlite_source(self):
"""Set data source to SQLite database."""
self.data_source = "sqlite"
self.data_file_path = ""
self.status_message = "Using SQLite database as data source"
def use_file_source(self):
"""Set data source to uploaded file."""
if self.uploaded_file_name:
self.data_source = "file"
self.status_message = f"Using uploaded file: {self.uploaded_file_name}"
else:
self.status_message = "No file uploaded. Please upload a file first."
def use_snowflake_source(self):
"""Set data source to Snowflake (if available)."""
if self.snowflake_configured:
self.data_source = "snowflake"
self.status_message = "Using Snowflake as data source"
else:
self.status_message = "Snowflake is not configured. Check config/snowflake.toml"
@rx.var
def data_source_display(self) -> str:
"""Human-readable data source description."""
if self.data_source == "file":
if self.uploaded_file_name:
return f"File: {self.uploaded_file_name}"
return "File: No file selected"
elif self.data_source == "sqlite":
if self.sqlite_available:
return f"SQLite: {self.sqlite_row_count:,} rows"
return "SQLite: Not available"
elif self.data_source == "snowflake":
if self.snowflake_configured:
return "Snowflake: Ready"
return "Snowflake: Not configured"
return "Unknown"
@rx.var
def file_size_display(self) -> str:
"""Human-readable file size."""
if self.uploaded_file_size == 0:
return ""
if self.uploaded_file_size < 1024:
return f"{self.uploaded_file_size} bytes"
elif self.uploaded_file_size < 1024 * 1024:
return f"{self.uploaded_file_size / 1024:.1f} KB"
else:
return f"{self.uploaded_file_size / (1024 * 1024):.1f} MB"
# Validation
def validate_filters(self) -> list[str]:
"""
Validate the current filter configuration.
Returns a list of error messages (empty if valid).
"""
errors = []
# Check dates are set
if not self.start_date:
errors.append("Start date is required")
if not self.end_date:
errors.append("End date is required")
if not self.last_seen_date:
errors.append("Last seen date is required")
# Check date order
if self.start_date and self.end_date:
if self.end_date < self.start_date:
errors.append("End date cannot be before start date")
if self.last_seen_date and self.end_date:
if self.last_seen_date > self.end_date:
errors.append("Last seen date is after end date (would exclude all patients)")
# Check minimum patients
if self.minimum_patients < 0:
errors.append("Minimum patients cannot be negative")
# Check at least some drugs are selected (warning, not error)
# Empty selection means "include all"
return errors
@rx.var
def filter_summary(self) -> str:
"""Generate a summary of current filter settings."""
lines = []
if self.start_date and self.end_date:
lines.append(f"Date range: {self.start_date} to {self.end_date}")
if self.last_seen_date:
lines.append(f"Last seen after: {self.last_seen_date}")
lines.append(f"Minimum patients: {self.minimum_patients}")
if self.selected_trusts:
lines.append(f"Trusts: {len(self.selected_trusts)} selected")
else:
lines.append("Trusts: All")
if self.selected_drugs:
lines.append(f"Drugs: {len(self.selected_drugs)} selected")
else:
lines.append("Drugs: All")
if self.selected_directories:
lines.append(f"Directories: {len(self.selected_directories)} selected")
else:
lines.append("Directories: All")
return "\n".join(lines)
@rx.var
def display_title(self) -> str:
"""Generate the display title for the analysis."""
if self.custom_title:
return self.custom_title
if self.start_date and self.end_date:
return f"Patients initiated from {self.start_date} to {self.end_date}"
return "Patient Pathway Analysis"
@rx.var
def drug_selection_count(self) -> str:
"""Display count of selected drugs."""
return f"{len(self.selected_drugs)} of {len(self.available_drugs)} drugs selected"
@rx.var
def trust_selection_count(self) -> str:
"""Display count of selected trusts."""
if not self.selected_trusts:
return f"All {len(self.available_trusts)} trusts (none selected)"
return f"{len(self.selected_trusts)} of {len(self.available_trusts)} trusts selected"
@rx.var
def directory_selection_count(self) -> str:
"""Display count of selected directories."""
if not self.selected_directories:
return f"All {len(self.available_directories)} directories (none selected)"
return f"{len(self.selected_directories)} of {len(self.available_directories)} directories selected"
# Search setters
def set_drug_search(self, value: str):
"""Set the drug search filter text."""
self.drug_search = value
def set_trust_search(self, value: str):
"""Set the trust search filter text."""
self.trust_search = value
def set_directory_search(self, value: str):
"""Set the directory search filter text."""
self.directory_search = value
def clear_drug_search(self):
"""Clear the drug search filter."""
self.drug_search = ""
def clear_trust_search(self):
"""Clear the trust search filter."""
self.trust_search = ""
def clear_directory_search(self):
"""Clear the directory search filter."""
self.directory_search = ""
@rx.var
def filtered_drugs(self) -> list[str]:
"""Get the list of drugs filtered by search text."""
if not self.drug_search:
return self.available_drugs
search_lower = self.drug_search.lower()
return [d for d in self.available_drugs if search_lower in d.lower()]
@rx.var
def filtered_trusts(self) -> list[str]:
"""Get the list of trusts filtered by search text."""
if not self.trust_search:
return self.available_trusts
search_lower = self.trust_search.lower()
return [t for t in self.available_trusts if search_lower in t.lower()]
@rx.var
def filtered_directories(self) -> list[str]:
"""Get the list of directories filtered by search text."""
if not self.directory_search:
return self.available_directories
search_lower = self.directory_search.lower()
return [d for d in self.available_directories if search_lower in d.lower()]
@rx.var
def drug_search_result_count(self) -> str:
"""Display count of drugs matching search."""
total = len(self.available_drugs)
filtered = len(self.filtered_drugs)
if not self.drug_search:
return f"{total} drugs"
return f"Showing {filtered} of {total} drugs"
@rx.var
def trust_search_result_count(self) -> str:
"""Display count of trusts matching search."""
total = len(self.available_trusts)
filtered = len(self.filtered_trusts)
if not self.trust_search:
return f"{total} trusts"
return f"Showing {filtered} of {total} trusts"
@rx.var
def directory_search_result_count(self) -> str:
"""Display count of directories matching search."""
total = len(self.available_directories)
filtered = len(self.filtered_directories)
if not self.directory_search:
return f"{total} directories"
return f"Showing {filtered} of {total} directories"
# Analysis methods
def run_analysis(self):
"""
Run the patient pathway analysis with current filter settings.
This is an async generator that yields state updates for progress indication.
Uses the existing analysis pipeline from tools/dashboard_gui.py.
"""
# Validate filters first
errors = self.validate_filters()
if errors:
self.error_message = "Validation errors:\n" + "\n".join(errors)
return
self.analysis_running = True
self.error_message = ""
self.status_message = "Starting analysis..."
self.has_chart = False
yield # Update UI to show running state
try:
# Import analysis modules
from core import AnalysisFilters, PathConfig, default_paths
from data_processing.data_source import get_data
from tools.dashboard_gui import generate_graph
# Get the data using fallback chain (cache -> Snowflake -> SQLite -> file)
self.status_message = "Loading patient data..."
yield
# Build filter parameters
trusts = self.selected_trusts if self.selected_trusts else self.available_trusts
drugs = self.selected_drugs if self.selected_drugs else self.available_drugs
directories = self.selected_directories if self.selected_directories else self.available_directories
# Get data from the data source manager
result = get_data(
start_date=self.start_date,
end_date=self.end_date,
trusts=trusts,
drugs=drugs,
directories=directories,
)
if result.df is None or len(result.df) == 0:
self.error_message = "No data available. Please check your data source configuration."
self.analysis_running = False
return
self.data_source = result.source_type.value
self.data_row_count = len(result.df)
self.status_message = f"Loaded {self.data_row_count:,} rows from {self.data_source}"
yield
# Create AnalysisFilters object for generate_graph
self.status_message = "Processing pathways..."
yield
# Generate the chart data (without writing to file)
# We'll create the figure data directly instead of calling generate_graph
# which writes to file and opens browser
fig_data = self._generate_chart_data(
df=result.df,
trusts=trusts,
drugs=drugs,
directories=directories,
)
if fig_data is not None:
self.chart_data = fig_data
self.has_chart = True
self.status_message = f"Analysis complete! Showing {self.data_row_count:,} interventions."
else:
self.error_message = "No data found matching the selected filters."
self.has_chart = False
except Exception as e:
self.error_message = f"Analysis failed: {str(e)}\n\n{traceback.format_exc()}"
self.has_chart = False
finally:
self.analysis_running = False
yield # Final UI update
def _generate_chart_data(
self,
df: pd.DataFrame,
trusts: list[str],
drugs: list[str],
directories: list[str],
) -> Optional[go.Figure]:
"""
Generate Plotly chart data from processed DataFrame.
This replicates the core logic of generate_graph() and figure() but
returns the figure dict instead of writing to file and opening browser.
This is a workaround to avoid modifying generate_graph() internals
(which is deferred to Phase 5).
"""
from core import default_paths
# Use the org_codes mapping
org_codes = pd.read_csv(default_paths.org_codes_csv, index_col=1)
# Make a copy to avoid modifying original
df1 = df.copy()
# Create UPID + Treatment column for deduplication
df1["UPIDTreatment"] = df1["UPID"] + df1["Drug Name"]
# Map provider codes to names
df1["Provider Code"] = df1["Provider Code"].map(org_codes["Name"])
# Apply filters
df1 = df1[
(df1["Provider Code"].isin(trusts)) &
(df1["Drug Name"].isin(drugs)) &
(df1["Directory"].isin(directories))
]
if len(df1) == 0:
return None
# Apply date filters
df1 = df1[
(df1["Intervention Date"] >= self.start_date) &
(df1["Intervention Date"] <= self.end_date)
]
if len(df1) == 0:
return None
# Add indication validation columns (if enabled and Snowflake available)
df1 = self._add_indication_validation(df1)
# Store filtered data for CSV export (now includes indication columns)
self._analysis_data = df1.copy()
# Build a simplified hierarchy for the icicle chart
# Group by Trust -> Directory -> Drug to get patient counts
hierarchy_data = self._build_hierarchy(df1, org_codes)
if hierarchy_data.empty:
return None
# Apply minimum patients filter
hierarchy_data = hierarchy_data[hierarchy_data['value'] >= self.minimum_patients]
if hierarchy_data.empty:
return None
# Create the Plotly icicle figure
fig = go.Figure(go.Icicle(
labels=hierarchy_data['labels'].tolist(),
ids=hierarchy_data['ids'].tolist(),
parents=hierarchy_data['parents'].tolist(),
values=hierarchy_data['value'].tolist(),
branchvalues="total",
marker=dict(
colors=hierarchy_data['colour'].tolist() if 'colour' in hierarchy_data.columns else None,
colorscale='Viridis',
),
maxdepth=3,
texttemplate='<b>%{label}</b><br>Patients: %{value}',
hovertemplate='<b>%{label}</b><br>Patients: %{value}<extra></extra>',
))
# Set chart title
title_text = self.custom_title if self.custom_title else f"Patients initiated {self.start_date} to {self.end_date}"
fig.update_layout(
margin=dict(t=60, l=1, r=1, b=60),
title=f"Norfolk & Waveney ICS High-Cost Drug Patient Pathways - {title_text}",
title_x=0.5,
hoverlabel=dict(font_size=16),
)
# Return figure for rx.plotly()
return fig
def _build_hierarchy(self, df: pd.DataFrame, org_codes: pd.DataFrame) -> pd.DataFrame:
"""
Build a hierarchical dataframe for icicle chart.
Creates Trust -> Directory -> Drug hierarchy with patient counts.
"""
# Create directory mapping from UPID
directory_df = df[["UPID", "Directory"]].drop_duplicates("UPID").set_index("UPID")
# Get unique patients per drug
patient_drugs = df[["UPID", "Drug Name", "Provider Code", "Directory"]].drop_duplicates(subset=["UPID", "Drug Name"])
# Build hierarchy: Trust -> Directory -> Drug
rows = []
# Root node
total_patients = patient_drugs["UPID"].nunique()
rows.append({
'parents': '',
'ids': 'N&WICS',
'labels': 'N&WICS',
'value': total_patients,
'colour': 1.0,
})
# Trust level
trust_counts = patient_drugs.groupby("Provider Code")["UPID"].nunique().reset_index()
trust_counts.columns = ["trust", "count"]
for _, row in trust_counts.iterrows():
trust = row["trust"]
if pd.isna(trust):
continue
rows.append({
'parents': 'N&WICS',
'ids': f'N&WICS - {trust}',
'labels': trust,
'value': row["count"],
'colour': row["count"] / total_patients,
})
# Directory level (under each trust)
trust_dir_counts = patient_drugs.groupby(["Provider Code", "Directory"])["UPID"].nunique().reset_index()
trust_dir_counts.columns = ["trust", "directory", "count"]
for _, row in trust_dir_counts.iterrows():
trust = row["trust"]
directory = row["directory"]
if pd.isna(trust) or pd.isna(directory):
continue
trust_total = trust_counts[trust_counts["trust"] == trust]["count"].values
trust_total = trust_total[0] if len(trust_total) > 0 else 1
rows.append({
'parents': f'N&WICS - {trust}',
'ids': f'N&WICS - {trust} - {directory}',
'labels': directory,
'value': row["count"],
'colour': row["count"] / trust_total,
})
# Drug level (under each trust-directory)
trust_dir_drug_counts = patient_drugs.groupby(["Provider Code", "Directory", "Drug Name"])["UPID"].nunique().reset_index()
trust_dir_drug_counts.columns = ["trust", "directory", "drug", "count"]
for _, row in trust_dir_drug_counts.iterrows():
trust = row["trust"]
directory = row["directory"]
drug = row["drug"]
if pd.isna(trust) or pd.isna(directory) or pd.isna(drug):
continue
dir_total = trust_dir_counts[
(trust_dir_counts["trust"] == trust) &
(trust_dir_counts["directory"] == directory)
]["count"].values
dir_total = dir_total[0] if len(dir_total) > 0 else 1
rows.append({
'parents': f'N&WICS - {trust} - {directory}',
'ids': f'N&WICS - {trust} - {directory} - {drug}',
'labels': drug,
'value': row["count"],
'colour': row["count"] / dir_total,
})
return pd.DataFrame(rows)
def _add_indication_validation(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Add indication validation columns to the DataFrame.
Adds columns:
- Indication_Valid: Boolean indicating if patient has valid GP diagnosis
- Indication_Source: "GP_SNOMED" | "NONE" | "NOT_CHECKED"
- Indication_Cluster: The matched SNOMED cluster ID (if any)
This requires Snowflake connectivity for GP record lookups.
If Snowflake is not available, columns are added with "NOT_CHECKED" status.
"""
# Initialize columns with default values
df = df.copy()
df["Indication_Valid"] = False
df["Indication_Source"] = "NOT_CHECKED"
df["Indication_Cluster"] = ""
# Check if indication validation is enabled and Snowflake is available
if not self.indication_validation_enabled:
return df
try:
from data_processing.snowflake_connector import (
is_snowflake_available,
is_snowflake_configured,
get_connector,
)
from data_processing.diagnosis_lookup import (
get_drug_cluster_ids,
patient_has_indication,
)
if not is_snowflake_available() or not is_snowflake_configured():
# Snowflake not available - can't validate indications
self.indication_validation_summary = "Indication validation skipped (Snowflake not configured)"
return df
self.indication_validation_running = True
# Get unique patient-drug pairs
patient_drug_pairs = df[["UPID", "Drug Name"]].drop_duplicates()
total_pairs = len(patient_drug_pairs)
# Cache drug clusters to avoid repeated lookups
drug_clusters_cache = {}
# Track results for summary
validation_results = {} # drug -> {total, matched}
connector = get_connector()
for idx, (_, row) in enumerate(patient_drug_pairs.iterrows()):
upid = row["UPID"]
drug_name = row["Drug Name"]
# Get drug clusters (cached)
drug_upper = drug_name.upper() if drug_name else ""
if drug_upper not in drug_clusters_cache:
drug_clusters_cache[drug_upper] = get_drug_cluster_ids(drug_name)
cluster_ids = drug_clusters_cache[drug_upper]
# Initialize drug in results tracking
if drug_upper not in validation_results:
validation_results[drug_upper] = {"total": 0, "matched": 0, "name": drug_name}
validation_results[drug_upper]["total"] += 1
if not cluster_ids:
# No cluster mapping for this drug - mark as NONE
mask = (df["UPID"] == upid) & (df["Drug Name"] == drug_name)
df.loc[mask, "Indication_Source"] = "NONE"
continue
# Check patient indication in GP records
# Note: We use the UPID as patient identifier - this may need mapping to pseudonymised NHS number
# For now, assume UPID can be used directly or is already the pseudonymised ID
has_indication, matched_cluster, _, _ = patient_has_indication(
patient_pseudonym=upid,
cluster_ids=cluster_ids,
connector=connector,
)
# Update dataframe for this patient-drug combination
mask = (df["UPID"] == upid) & (df["Drug Name"] == drug_name)
df.loc[mask, "Indication_Valid"] = has_indication
df.loc[mask, "Indication_Source"] = "GP_SNOMED" if has_indication else "NONE"
if matched_cluster:
df.loc[mask, "Indication_Cluster"] = matched_cluster
if has_indication:
validation_results[drug_upper]["matched"] += 1
# Store validation results and create summary
self.indication_validation_results = {
drug: {
"drug_name": data["name"],
"total_patients": data["total"],
"patients_with_indication": data["matched"],
"match_rate": round(data["matched"] / data["total"] * 100, 1) if data["total"] > 0 else 0,
}
for drug, data in validation_results.items()
}
# Create summary text
total_patients = sum(d["total"] for d in validation_results.values())
matched_patients = sum(d["matched"] for d in validation_results.values())
overall_rate = round(matched_patients / total_patients * 100, 1) if total_patients > 0 else 0
self.indication_validation_summary = (
f"GP Indication Validation: {matched_patients}/{total_patients} "
f"({overall_rate}%) patients have valid GP diagnosis"
)
except Exception as e:
self.indication_validation_summary = f"Indication validation error: {str(e)}"
# Don't fail the whole analysis - just leave columns as NOT_CHECKED
finally:
self.indication_validation_running = False
return df
def toggle_indication_validation(self):
"""Toggle indication validation on/off."""
self.indication_validation_enabled = not self.indication_validation_enabled
@rx.var
def indication_validation_status(self) -> str:
"""Get human-readable indication validation status."""
if self.indication_validation_running:
return "Validating patient indications..."
if self.indication_validation_summary:
return self.indication_validation_summary
if self.indication_validation_enabled:
return "Enabled (will check GP records)"
return "Disabled"
@rx.var
def indication_results_list(self) -> list[dict]:
"""
Get indication validation results as a list for display.
Returns list of dicts with: drug_name, total_patients, patients_with_indication, match_rate
Sorted by match rate ascending (worst first) for easy identification of issues.
"""
if not self.indication_validation_results:
return []
results = []
for drug_key, data in self.indication_validation_results.items():
results.append({
"drug_name": data.get("drug_name", drug_key),
"total_patients": data.get("total_patients", 0),
"patients_with_indication": data.get("patients_with_indication", 0),
"match_rate": data.get("match_rate", 0),
})
# Sort by match rate ascending (lowest first to highlight issues)
results.sort(key=lambda x: x["match_rate"])
return results
@rx.var
def has_indication_results(self) -> bool:
"""Check if there are indication validation results to display."""
return len(self.indication_validation_results) > 0
def export_chart_html(self):
"""
Export the current chart as an interactive HTML file.
The file is saved to data/exports/ directory with a timestamped filename.
"""
if not self.has_chart:
self.export_error = "No chart to export. Please run analysis first."
return
self.export_error = ""
self.export_message = ""
try:
from datetime import datetime
# Create exports directory
export_dir = Path("data/exports")
export_dir.mkdir(parents=True, exist_ok=True)
# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"pathway_chart_{timestamp}.html"
filepath = export_dir / filename
# Export the chart to HTML
self.chart_data.write_html(
str(filepath),
include_plotlyjs=True,
full_html=True,
)
self.last_export_path = str(filepath)
self.export_message = f"Chart exported to {filename}"
except Exception as e:
self.export_error = f"Export failed: {str(e)}"
def export_data_csv(self):
"""
Export the underlying analysis data as a CSV file.
The file is saved to data/exports/ directory with a timestamped filename.
"""
if self._analysis_data is None or len(self._analysis_data) == 0:
self.export_error = "No data to export. Please run analysis first."
return
self.export_error = ""
self.export_message = ""
try:
from datetime import datetime
# Create exports directory
export_dir = Path("data/exports")
export_dir.mkdir(parents=True, exist_ok=True)
# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"pathway_data_{timestamp}.csv"
filepath = export_dir / filename
# Export the data to CSV
self._analysis_data.to_csv(filepath, index=False)
self.last_export_path = str(filepath)
self.export_message = f"Data exported to {filename}"
except Exception as e:
self.export_error = f"Export failed: {str(e)}"
def clear_export_messages(self):
"""Clear export status messages."""
self.export_message = ""
self.export_error = ""
# =============================================================================
# Page Components
# =============================================================================
def info_card(title: str, value: str, icon: str) -> rx.Component:
"""Create an info card showing a statistic."""
return rx.box(
rx.vstack(
rx.hstack(
rx.icon(icon, size=20, color=NHS_BLUE),
rx.text(title, size="2", color="gray"),
spacing="2",
align="center",
),
rx.text(value, size="5", weight="bold"),
spacing="1",
align="start",
),
padding="16px",
background="white",
border_radius="8px",
border="1px solid rgb(229, 231, 235)",
width="100%",
)
def date_input(label: str, value: rx.Var, on_change, help_text: str = "", input_id: str = "") -> rx.Component:
"""Create a labeled date input component with accessibility support."""
# Generate a unique ID if not provided
label_id = f"{input_id}-label" if input_id else ""
help_id = f"{input_id}-help" if input_id else ""
return rx.vstack(
rx.el.label(
label,
html_for=input_id,
font_size="14px",
font_weight="500",
color=NHS_DARK_BLUE,
),
rx.input(
type="date",
value=value,
on_change=on_change,
width="100%",
id=input_id,
aria_describedby=help_id if help_text else "",
),
rx.cond(
help_text != "",
rx.text(help_text, size="1", color="gray", id=help_id),
),
spacing="1",
align="start",
width="100%",
)
def data_source_selector() -> rx.Component:
"""Data source selector with file upload, SQLite, and Snowflake options."""
return rx.box(
rx.vstack(
rx.heading("Data Source", size="5", color=NHS_DARK_BLUE),
rx.text(
"Select where to load patient data from",
size="2",
color="gray",
),
rx.divider(margin_y="8px"),
# Current data source display
rx.hstack(
rx.text("Current source:", weight="medium"),
rx.badge(
State.data_source_display,
color_scheme=rx.cond(
State.data_source == "sqlite",
"green",
rx.cond(
State.data_source == "snowflake",
"blue",
"gray",
),
),
size="2",
),
spacing="2",
align="center",
),
rx.divider(margin_y="8px"),
# Data source options
rx.vstack(
# SQLite option
rx.box(
rx.hstack(
rx.icon("database", size=20, color=NHS_BLUE),
rx.vstack(
rx.hstack(
rx.text("SQLite Database", weight="medium"),
rx.cond(
State.sqlite_available,
rx.badge("Available", color_scheme="green", size="1"),
rx.badge("No data", color_scheme="gray", size="1"),
),
spacing="2",
),
rx.cond(
State.sqlite_available,
rx.text(
f"Contains pre-loaded patient data",
size="1",
color="gray",
),
rx.text(
"Run data migration to populate",
size="1",
color="gray",
),
),
spacing="1",
align="start",
),
rx.spacer(),
rx.button(
"Use SQLite",
on_click=State.use_sqlite_source,
variant=rx.cond(State.data_source == "sqlite", "solid", "outline"),
color_scheme="green",
size="2",
disabled=~State.sqlite_available,
),
spacing="3",
align="center",
width="100%",
),
padding="12px",
background=rx.cond(
State.data_source == "sqlite",
"rgba(0, 94, 184, 0.05)",
"transparent",
),
border_radius="6px",
border=rx.cond(
State.data_source == "sqlite",
"1px solid rgb(0, 94, 184)",
"1px solid transparent",
),
width="100%",
),
# File upload option
rx.box(
rx.vstack(
rx.hstack(
rx.icon("upload", size=20, color=NHS_BLUE),
rx.vstack(
rx.hstack(
rx.text("Upload File", weight="medium"),
rx.cond(
State.file_upload_success,
rx.badge(State.file_size_display, color_scheme="green", size="1"),
),
spacing="2",
),
rx.text(
"Upload CSV or Parquet file",
size="1",
color="gray",
),
spacing="1",
align="start",
),
rx.spacer(),
rx.cond(
State.file_upload_success,
rx.hstack(
rx.button(
"Use File",
on_click=State.use_file_source,
variant=rx.cond(State.data_source == "file", "solid", "outline"),
color_scheme="blue",
size="2",
),
rx.button(
rx.icon("x", size=14),
on_click=State.clear_uploaded_file,
variant="ghost",
color_scheme="red",
size="1",
),
spacing="1",
),
),
spacing="3",
align="center",
width="100%",
),
rx.cond(
State.file_upload_success,
rx.text(
State.uploaded_file_name,
size="2",
color=NHS_BLUE,
font_family="monospace",
),
rx.upload(
rx.vstack(
rx.cond(
State.file_processing,
rx.spinner(size="2"),
rx.icon("file-up", size=24, color="gray"),
),
rx.text(
"Drag & drop or click to browse",
size="2",
color="gray",
),
rx.text(
"Supports CSV, Parquet",
size="1",
color="gray",
),
spacing="2",
align="center",
padding="16px",
),
id="file_upload",
accept={
"text/csv": [".csv"],
"application/octet-stream": [".parquet", ".pq"],
},
max_files=1,
border="1px dashed rgb(200, 200, 200)",
border_radius="6px",
padding="4px",
width="100%",
on_drop=State.handle_file_upload(rx.upload_files(upload_id="file_upload")),
),
),
rx.cond(
State.file_upload_error != "",
rx.text(
State.file_upload_error,
size="2",
color="red",
),
),
spacing="2",
width="100%",
),
padding="12px",
background=rx.cond(
(State.data_source == "file") & State.file_upload_success,
"rgba(0, 94, 184, 0.05)",
"transparent",
),
border_radius="6px",
border=rx.cond(
(State.data_source == "file") & State.file_upload_success,
"1px solid rgb(0, 94, 184)",
"1px solid transparent",
),
width="100%",
),
# Snowflake option
rx.box(
rx.hstack(
rx.icon("cloud", size=20, color=NHS_BLUE),
rx.vstack(
rx.hstack(
rx.text("Snowflake", weight="medium"),
rx.cond(
State.snowflake_configured,
rx.badge("Configured", color_scheme="blue", size="1"),
rx.badge("Not configured", color_scheme="gray", size="1"),
),
spacing="2",
),
rx.text(
"Query live data from Snowflake",
size="1",
color="gray",
),
spacing="1",
align="start",
),
rx.spacer(),
rx.button(
"Use Snowflake",
on_click=State.use_snowflake_source,
variant=rx.cond(State.data_source == "snowflake", "solid", "outline"),
color_scheme="blue",
size="2",
disabled=~State.snowflake_configured,
),
spacing="3",
align="center",
width="100%",
),
padding="12px",
background=rx.cond(
State.data_source == "snowflake",
"rgba(0, 94, 184, 0.05)",
"transparent",
),
border_radius="6px",
border=rx.cond(
State.data_source == "snowflake",
"1px solid rgb(0, 94, 184)",
"1px solid transparent",
),
width="100%",
),
spacing="2",
width="100%",
),
spacing="3",
align="start",
width="100%",
),
padding="20px",
background="white",
border_radius="8px",
border="1px solid rgb(229, 231, 235)",
width="100%",
)
def filter_controls() -> rx.Component:
"""Filter controls section with date pickers, minimum patients, and custom title."""
return rx.box(
rx.vstack(
rx.heading("Analysis Settings", size="5", color=NHS_DARK_BLUE, id="analysis-settings-heading"),
# Date range row
rx.hstack(
date_input(
"Start Date",
State.start_date,
State.set_start_date,
"Include patients initiated from this date",
input_id="start-date",
),
date_input(
"End Date",
State.end_date,
State.set_end_date,
"Include patients initiated until this date",
input_id="end-date",
),
date_input(
"Last Seen After",
State.last_seen_date,
State.set_last_seen_date,
"Only include patients seen after this date",
input_id="last-seen-date",
),
spacing="4",
width="100%",
flex_wrap="wrap",
role="group",
aria_label="Date range filters",
),
rx.divider(margin_y="12px"),
# Additional settings row
rx.hstack(
# Minimum patients
rx.vstack(
rx.el.label(
"Minimum Patients",
html_for="min-patients",
font_size="14px",
font_weight="500",
color=NHS_DARK_BLUE,
),
rx.hstack(
rx.input(
type="number",
value=State.minimum_patients.to_string(),
on_change=State.set_minimum_patients_from_input,
min="0",
max="1000",
width="100px",
id="min-patients",
aria_describedby="min-patients-help",
),
rx.slider(
value=[State.minimum_patients],
on_change=State.set_minimum_patients_from_slider,
min=0,
max=100,
step=1,
width="150px",
aria_label="Minimum patients slider",
),
spacing="3",
align="center",
),
rx.text(
"Hide pathways with fewer patients",
size="1",
color="gray",
id="min-patients-help",
),
spacing="1",
align="start",
),
# Custom title
rx.vstack(
rx.el.label(
"Custom Title (Optional)",
html_for="custom-title",
font_size="14px",
font_weight="500",
color=NHS_DARK_BLUE,
),
rx.input(
placeholder="Leave empty for auto-generated title",
value=State.custom_title,
on_change=State.set_custom_title,
width="300px",
id="custom-title",
aria_describedby="custom-title-help",
),
rx.text(
"Override the default chart title",
size="1",
color="gray",
id="custom-title-help",
),
spacing="1",
align="start",
),
spacing="6",
width="100%",
flex_wrap="wrap",
align="start",
),
spacing="4",
align="start",
width="100%",
),
padding="20px",
background="white",
border_radius="8px",
border="1px solid rgb(229, 231, 235)",
width="100%",
role="region",
aria_labelledby="analysis-settings-heading",
)
def indication_result_row(result: dict) -> rx.Component:
"""Render a single row in the indication validation results table."""
match_rate = result["match_rate"]
# Color code: green for high match rates, amber for moderate, red for low
# Use .to(int) to cast Reflex Var for comparison (rx.foreach items are Vars)
rate_color = rx.cond(
match_rate.to(int) >= 80,
"green",
rx.cond(match_rate.to(int) >= 50, "orange", "red"),
)
return rx.table.row(
rx.table.cell(rx.text(result["drug_name"], weight="medium")),
rx.table.cell(result["total_patients"].to_string()),
rx.table.cell(result["patients_with_indication"].to_string()),
rx.table.cell(
rx.hstack(
rx.progress(
value=match_rate,
max=100,
width="60px",
height="8px",
color_scheme=rate_color,
),
rx.text(
match_rate.to_string() + "%",
size="2",
color=rate_color,
weight="medium",
),
spacing="2",
align="center",
)
),
)
def indication_validation_summary() -> rx.Component:
"""
Component to display indication validation results per drug.
Shows a collapsible section with a table of per-drug match rates,
helping users identify which drugs have good vs poor GP diagnosis coverage.
"""
return rx.cond(
State.has_indication_results,
rx.el.section(
rx.vstack(
# Header with overall summary
rx.hstack(
rx.hstack(
rx.icon("clipboard-check", size=20, color=NHS_DARK_BLUE, aria_hidden="true"),
rx.heading(
"GP Indication Validation Results",
size="5",
color=NHS_DARK_BLUE,
id="indication-results-heading",
),
spacing="2",
align="center",
),
rx.spacer(),
rx.badge(
State.indication_validation_summary,
color_scheme="blue",
size="2",
),
width="100%",
align="center",
),
rx.text(
"Shows the percentage of patients with valid GP diagnoses matching their prescribed drug's indication. "
"Lower rates may indicate prescribing for off-label use, data quality issues, or patients treated across multiple providers.",
size="2",
color="gray",
),
# Results table
rx.table.root(
rx.table.header(
rx.table.row(
rx.table.column_header_cell("Drug Name"),
rx.table.column_header_cell("Total Patients"),
rx.table.column_header_cell("With GP Indication"),
rx.table.column_header_cell("Match Rate"),
),
),
rx.table.body(
rx.foreach(State.indication_results_list, indication_result_row)
),
width="100%",
size="2",
),
# Legend
rx.hstack(
rx.text("Legend:", size="1", color="gray", weight="medium"),
rx.hstack(
rx.badge("80%+", color_scheme="green", size="1"),
rx.text("Good coverage", size="1", color="gray"),
spacing="1",
align="center",
),
rx.hstack(
rx.badge("50-79%", color_scheme="orange", size="1"),
rx.text("Moderate", size="1", color="gray"),
spacing="1",
align="center",
),
rx.hstack(
rx.badge("<50%", color_scheme="red", size="1"),
rx.text("Low coverage", size="1", color="gray"),
spacing="1",
align="center",
),
spacing="4",
flex_wrap="wrap",
),
spacing="3",
width="100%",
align="start",
),
padding="20px",
background="white",
border_radius="8px",
border="1px solid rgb(229, 231, 235)",
width="100%",
aria_labelledby="indication-results-heading",
),
)
def home_content() -> rx.Component:
"""Home page content with filter configuration and analysis controls."""
return rx.vstack(
# Hero section
rx.box(
rx.vstack(
rx.image(
src="/logo.png",
height="60px",
alt="NHS Logo",
),
rx.heading(
"Patient Pathway Analysis",
size="8",
color=NHS_DARK_BLUE,
),
rx.text(
"Analyze secondary care treatment pathways for high-cost drugs",
size="4",
color="gray",
),
spacing="3",
align="center",
),
padding="32px",
background="white",
border_radius="12px",
border="1px solid rgb(229, 231, 235)",
width="100%",
text_align="center",
),
# Status cards
rx.hstack(
info_card("Drugs Loaded", State.drug_selection_count, "pill"),
info_card("Trusts", State.trust_selection_count, "building"),
info_card("Directories", State.directory_selection_count, "folder"),
spacing="4",
width="100%",
flex_wrap="wrap",
),
# Data source selector
data_source_selector(),
# Filter controls (date pickers, minimum patients, custom title)
filter_controls(),
# Filter summary
rx.box(
rx.vstack(
rx.heading("Current Filter Settings", size="4", color=NHS_DARK_BLUE),
rx.text(
State.filter_summary,
white_space="pre-wrap",
font_family="monospace",
font_size="13px",
color="gray",
),
spacing="2",
align="start",
width="100%",
),
padding="20px",
background="white",
border_radius="8px",
border="1px solid rgb(229, 231, 235)",
width="100%",
),
# Action buttons
rx.hstack(
rx.button(
rx.icon("database", size=16, aria_hidden="true"),
"Load Reference Data",
on_click=State.load_reference_data,
color_scheme="blue",
size="3",
disabled=State.analysis_running,
aria_label="Load reference data from CSV files",
),
rx.button(
rx.cond(
State.analysis_running,
rx.hstack(
rx.spinner(size="1"),
rx.text("Running..."),
spacing="2",
align="center",
),
rx.hstack(
rx.icon("play", size=16, aria_hidden="true"),
rx.text("Run Analysis"),
spacing="2",
align="center",
),
),
on_click=State.run_analysis,
color_scheme="green",
size="3",
disabled=State.analysis_running,
aria_label="Run patient pathway analysis",
aria_busy=State.analysis_running,
),
spacing="3",
role="toolbar",
aria_label="Analysis actions",
),
# Messages with live regions for screen readers
rx.cond(
State.status_message != "",
rx.callout(
State.status_message,
icon="info",
color="blue",
role="status",
aria_live="polite",
),
),
rx.cond(
State.error_message != "",
rx.callout(
State.error_message,
icon="triangle-alert",
color="red",
role="alert",
aria_live="assertive",
),
),
# Chart display
rx.cond(
State.has_chart,
rx.el.section(
rx.vstack(
rx.hstack(
rx.heading("Patient Pathway Chart", size="5", color=NHS_DARK_BLUE, id="chart-heading"),
rx.spacer(),
rx.hstack(
rx.button(
rx.icon("download", size=14, aria_hidden="true"),
"Export HTML",
on_click=State.export_chart_html,
variant="outline",
size="2",
aria_label="Export chart as interactive HTML file",
),
rx.button(
rx.icon("file-spreadsheet", size=14, aria_hidden="true"),
"Export CSV",
on_click=State.export_data_csv,
variant="outline",
size="2",
aria_label="Export data as CSV spreadsheet",
),
spacing="2",
role="toolbar",
aria_label="Export options",
),
width="100%",
align="center",
),
rx.text(
"Click on sections to zoom in. Use the toolbar for additional options.",
size="2",
color="gray",
),
# Export messages
rx.cond(
State.export_message != "",
rx.callout(
State.export_message,
icon="check",
color="green",
role="status",
aria_live="polite",
),
),
rx.cond(
State.export_error != "",
rx.callout(
State.export_error,
icon="triangle-alert",
color="red",
role="alert",
),
),
rx.el.figure(
rx.plotly(data=State.chart_data),
aria_label="Interactive patient pathway icicle chart showing treatment hierarchy",
),
spacing="3",
width="100%",
),
padding="20px",
background="white",
border_radius="8px",
border="1px solid rgb(229, 231, 235)",
width="100%",
aria_labelledby="chart-heading",
),
),
# Indication validation results (shown after chart)
indication_validation_summary(),
spacing="5",
width="100%",
align="start",
)
def selection_page_content(
title: str,
description: str,
items: rx.Var,
selected_items: rx.Var,
toggle_handler,
select_all_handler,
clear_handler,
count_text: rx.Var,
search_value: rx.Var,
search_handler,
clear_search_handler,
search_result_text: rx.Var,
extra_buttons: list[rx.Component] = None,
page_id: str = "selection",
) -> rx.Component:
"""Generic selection page content for drugs, trusts, directories with search and accessibility."""
heading_id = f"{page_id}-heading"
search_id = f"{page_id}-search"
list_id = f"{page_id}-list"
buttons = [
rx.button(
"Select All",
on_click=select_all_handler,
variant="outline",
size="2",
aria_label=f"Select all {title.lower()}",
),
rx.button(
"Clear All",
on_click=clear_handler,
variant="outline",
size="2",
aria_label=f"Clear all {title.lower()} selections",
),
]
if extra_buttons:
buttons.extend(extra_buttons)
return rx.vstack(
# Header
rx.el.header(
rx.vstack(
rx.heading(title, size="6", color=NHS_DARK_BLUE, id=heading_id),
rx.text(description, color="gray"),
rx.el.div(
count_text,
font_weight="500",
color=NHS_BLUE,
aria_live="polite",
aria_atomic="true",
),
spacing="2",
align="start",
),
padding="20px",
background="white",
border_radius="8px",
border="1px solid rgb(229, 231, 235)",
width="100%",
),
# Search input
rx.box(
rx.hstack(
rx.icon("search", size=16, color="gray", aria_hidden="true"),
rx.input(
placeholder=f"Search {title.lower()}...",
value=search_value,
on_change=search_handler,
width="100%",
id=search_id,
aria_label=f"Search {title.lower()}",
aria_controls=list_id,
),
rx.cond(
search_value != "",
rx.button(
rx.icon("x", size=14, aria_hidden="true"),
on_click=clear_search_handler,
variant="ghost",
color_scheme="gray",
size="1",
aria_label="Clear search",
),
),
spacing="2",
align="center",
width="100%",
),
padding="12px 16px",
background="white",
border_radius="8px",
border="1px solid rgb(229, 231, 235)",
width="100%",
role="search",
),
# Action buttons and search result count
rx.hstack(
rx.hstack(*buttons, spacing="2", role="toolbar", aria_label="Selection actions"),
rx.spacer(),
rx.el.div(
search_result_text,
font_size="14px",
color="gray",
aria_live="polite",
),
spacing="3",
width="100%",
align="center",
),
# Selection grid
rx.box(
rx.vstack(
rx.foreach(
items,
lambda item: rx.box(
rx.checkbox(
item,
checked=selected_items.contains(item),
on_change=lambda: toggle_handler(item),
size="2",
),
padding="8px 12px",
background=rx.cond(
selected_items.contains(item),
"rgba(0, 94, 184, 0.1)",
"transparent",
),
border_radius="4px",
width="100%",
),
),
spacing="1",
width="100%",
max_height="500px",
overflow_y="auto",
id=list_id,
role="group",
aria_labelledby=heading_id,
),
padding="16px",
background="white",
border_radius="8px",
border="1px solid rgb(229, 231, 235)",
width="100%",
),
spacing="4",
width="100%",
align="start",
)
def drugs_content() -> rx.Component:
"""Drug selection page content."""
return selection_page_content(
title="Drug Selection",
description="Select which high-cost drugs to include in the analysis",
items=State.filtered_drugs,
selected_items=State.selected_drugs,
toggle_handler=State.toggle_drug,
select_all_handler=State.select_all_drugs,
clear_handler=State.clear_drugs,
count_text=State.drug_selection_count,
search_value=State.drug_search,
search_handler=State.set_drug_search,
clear_search_handler=State.clear_drug_search,
search_result_text=State.drug_search_result_count,
extra_buttons=[
rx.button(
"Select Defaults",
on_click=State.select_default_drugs,
variant="outline",
size="2",
aria_label="Select default drugs (Include=1)",
),
],
page_id="drugs",
)
def trusts_content() -> rx.Component:
"""Trust selection page content."""
return selection_page_content(
title="Trust Selection",
description="Select NHS trusts to include (leave empty for all trusts)",
items=State.filtered_trusts,
selected_items=State.selected_trusts,
toggle_handler=State.toggle_trust,
select_all_handler=State.select_all_trusts,
clear_handler=State.clear_trusts,
count_text=State.trust_selection_count,
search_value=State.trust_search,
search_handler=State.set_trust_search,
clear_search_handler=State.clear_trust_search,
search_result_text=State.trust_search_result_count,
page_id="trusts",
)
def directories_content() -> rx.Component:
"""Directory selection page content."""
return selection_page_content(
title="Directory Selection",
description="Select medical directories/specialties to include (leave empty for all)",
items=State.filtered_directories,
selected_items=State.selected_directories,
toggle_handler=State.toggle_directory,
select_all_handler=State.select_all_directories,
clear_handler=State.clear_directories,
count_text=State.directory_selection_count,
search_value=State.directory_search,
search_handler=State.set_directory_search,
clear_search_handler=State.clear_directory_search,
search_result_text=State.directory_search_result_count,
page_id="directories",
)
# =============================================================================
# Page Definitions
# =============================================================================
def index() -> rx.Component:
"""Home page."""
return main_layout(
content_area(home_content(), page_title="Home"),
current_page="home",
)
def drugs_page() -> rx.Component:
"""Drug selection page."""
return main_layout(
content_area(drugs_content(), page_title=""),
current_page="drugs",
)
def trusts_page() -> rx.Component:
"""Trust selection page."""
return main_layout(
content_area(trusts_content(), page_title=""),
current_page="trusts",
)
def directories_page() -> rx.Component:
"""Directory selection page."""
return main_layout(
content_area(directories_content(), page_title=""),
current_page="directories",
)
# =============================================================================
# App Configuration
# =============================================================================
app = rx.App(
theme=rx.theme(
accent_color="blue",
gray_color="slate",
radius="medium",
),
)
# Add pages
app.add_page(index, route="/", title="Home | NHS HCD Analysis")
app.add_page(drugs_page, route="/drugs", title="Drug Selection | NHS HCD Analysis")
app.add_page(trusts_page, route="/trusts", title="Trust Selection | NHS HCD Analysis")
app.add_page(directories_page, route="/directories", title="Directory Selection | NHS HCD Analysis")