""" NHS High-Cost Drug Patient Pathway Analysis Tool - Reflex Application. This is the main Reflex application module containing state management and page components for the pathway analysis tool. """ import reflex as rx from datetime import date, timedelta from typing import Optional import pandas as pd import numpy as np from pathlib import Path import plotly.graph_objects as go import traceback import os from rxconfig import config from pathways_app.components.layout import main_layout, content_area # NHS Color constants NHS_BLUE = "rgb(0, 94, 184)" NHS_DARK_BLUE = "rgb(0, 48, 135)" # Supported file extensions SUPPORTED_EXTENSIONS = [".csv", ".parquet", ".pq"] class State(rx.State): """ Application state for the NHS High-Cost Drug Patient Pathway Analysis Tool. Manages all filter variables, reference data, and analysis state. This corresponds to the AnalysisFilters dataclass in core/models.py but is adapted for Reflex's reactive state system. """ # Date filter state start_date: str = "" # ISO format YYYY-MM-DD end_date: str = "" last_seen_date: str = "" # Selection filters (list of selected items) selected_trusts: list[str] = [] selected_drugs: list[str] = [] selected_directories: list[str] = [] # Analysis parameters minimum_patients: int = 0 custom_title: str = "" # Reference data (available options loaded from CSV/SQLite) available_trusts: list[str] = [] available_drugs: list[str] = [] available_directories: list[str] = [] # Drug default selections (Include=1 in include.csv) default_drugs: list[str] = [] # Analysis state analysis_running: bool = False status_message: str = "" error_message: str = "" # Chart state - the Plotly figure chart_data: go.Figure = go.Figure() has_chart: bool = False # Data source state data_file_path: str = "" data_source: str = "file" # "file", "sqlite", "snowflake" data_loaded: bool = False data_row_count: int = 0 # Snowflake connection state snowflake_available: bool = False snowflake_configured: bool = False snowflake_connected: bool = False # File upload state uploaded_file_name: str = "" uploaded_file_size: int = 0 # bytes file_upload_error: str = "" file_upload_success: bool = False file_processing: bool = False # SQLite database state sqlite_available: bool = False sqlite_row_count: int = 0 sqlite_patient_count: int = 0 # Search/filter state for selection pages drug_search: str = "" trust_search: str = "" directory_search: str = "" # Export state last_export_path: str = "" export_message: str = "" export_error: str = "" # Indication validation state indication_validation_enabled: bool = True indication_validation_running: bool = False indication_validation_results: dict = {} # drug_name -> {total, matched, rate} indication_validation_summary: str = "" # Store the underlying data for export _analysis_data: pd.DataFrame = pd.DataFrame() def _set_default_dates(self): """Set default date values based on typical analysis period.""" today = date.today() one_year_ago = today - timedelta(days=365) self.start_date = one_year_ago.isoformat() self.end_date = today.isoformat() self.last_seen_date = one_year_ago.isoformat() def load_reference_data(self): """ Load reference data from CSV files. This loads the available drugs, trusts, and directories that can be selected in the filters. """ data_dir = Path("data") # Load drugs from include.csv try: drugs_df = pd.read_csv(data_dir / "include.csv") self.available_drugs = sorted(drugs_df.iloc[:, 0].astype(str).tolist()) # Get default selections (Include=1) if "Include" in drugs_df.columns: self.default_drugs = drugs_df[drugs_df["Include"] == 1].iloc[:, 0].astype(str).tolist() self.selected_drugs = self.default_drugs.copy() self.status_message = f"Loaded {len(self.available_drugs)} drugs" except Exception as e: self.error_message = f"Failed to load drugs: {e}" # Load trusts from defaultTrusts.csv try: trusts_df = pd.read_csv(data_dir / "defaultTrusts.csv") self.available_trusts = sorted(trusts_df.iloc[:, 0].astype(str).tolist()) # By default, no trusts selected (include all) self.selected_trusts = [] except Exception as e: self.error_message = f"Failed to load trusts: {e}" # Load directories from directory_list.csv try: dirs_df = pd.read_csv(data_dir / "directory_list.csv") self.available_directories = sorted(dirs_df.iloc[:, 0].astype(str).tolist()) # By default, no directories selected (include all) self.selected_directories = [] except Exception as e: self.error_message = f"Failed to load directories: {e}" # Set default dates self._set_default_dates() # Check Snowflake availability try: from data_processing.snowflake_connector import is_snowflake_available, is_snowflake_configured self.snowflake_available = is_snowflake_available() self.snowflake_configured = is_snowflake_configured() except ImportError: self.snowflake_available = False self.snowflake_configured = False # Check SQLite database status self.check_sqlite_status() # Auto-select best data source if self.sqlite_available and self.sqlite_row_count > 0: self.data_source = "sqlite" elif self.snowflake_configured: self.data_source = "snowflake" else: self.data_source = "file" # Date setters def set_start_date(self, value: str): """Set the start date for analysis.""" self.start_date = value def set_end_date(self, value: str): """Set the end date for analysis.""" self.end_date = value def set_last_seen_date(self, value: str): """Set the last seen date filter.""" self.last_seen_date = value # Selection setters def set_selected_trusts(self, trusts: list[str]): """Set the selected NHS trusts.""" self.selected_trusts = trusts def toggle_trust(self, trust: str): """Toggle a trust selection.""" if trust in self.selected_trusts: self.selected_trusts = [t for t in self.selected_trusts if t != trust] else: self.selected_trusts = self.selected_trusts + [trust] def select_all_trusts(self): """Select all available trusts.""" self.selected_trusts = self.available_trusts.copy() def clear_trusts(self): """Clear all trust selections.""" self.selected_trusts = [] def set_selected_drugs(self, drugs: list[str]): """Set the selected drugs.""" self.selected_drugs = drugs def toggle_drug(self, drug: str): """Toggle a drug selection.""" if drug in self.selected_drugs: self.selected_drugs = [d for d in self.selected_drugs if d != drug] else: self.selected_drugs = self.selected_drugs + [drug] def select_all_drugs(self): """Select all available drugs.""" self.selected_drugs = self.available_drugs.copy() def select_default_drugs(self): """Select only the default drugs (Include=1).""" self.selected_drugs = self.default_drugs.copy() def clear_drugs(self): """Clear all drug selections.""" self.selected_drugs = [] def set_selected_directories(self, directories: list[str]): """Set the selected directories.""" self.selected_directories = directories def toggle_directory(self, directory: str): """Toggle a directory selection.""" if directory in self.selected_directories: self.selected_directories = [d for d in self.selected_directories if d != directory] else: self.selected_directories = self.selected_directories + [directory] def select_all_directories(self): """Select all available directories.""" self.selected_directories = self.available_directories.copy() def clear_directories(self): """Clear all directory selections.""" self.selected_directories = [] # Analysis parameter setters def set_minimum_patients(self, value: int): """Set the minimum patients threshold.""" self.minimum_patients = max(0, value) def set_minimum_patients_from_input(self, value: str): """Set minimum patients threshold from string input.""" try: self.minimum_patients = max(0, int(value)) if value else 0 except ValueError: pass # Ignore invalid input def set_minimum_patients_from_slider(self, values: list[float]): """Set minimum patients threshold from slider value (list).""" if values: self.minimum_patients = max(0, int(values[0])) def set_custom_title(self, value: str): """Set a custom title for the analysis.""" self.custom_title = value # Data source methods def set_data_file_path(self, path: str): """Set the data file path for analysis.""" self.data_file_path = path def set_data_source(self, source: str): """Set the data source type (file, sqlite, snowflake).""" if source in ("file", "sqlite", "snowflake"): self.data_source = source # Status methods def set_status(self, message: str): """Update the status message.""" self.status_message = message def set_error(self, message: str): """Set an error message.""" self.error_message = message def clear_error(self): """Clear the error message.""" self.error_message = "" # File handling methods async def handle_file_upload(self, files: list[rx.UploadFile]): """ Handle file upload for CSV/Parquet data files. This accepts uploaded files and processes them for analysis. """ self.file_upload_error = "" self.file_upload_success = False if not files: self.file_upload_error = "No file selected" return file = files[0] # Take first file only file_name = file.filename file_ext = Path(file_name).suffix.lower() # Validate file extension if file_ext not in SUPPORTED_EXTENSIONS: self.file_upload_error = f"Unsupported file type: {file_ext}. Please upload CSV or Parquet files." return self.file_processing = True self.status_message = f"Processing {file_name}..." yield # Update UI try: # Read file content file_content = await file.read() file_size = len(file_content) self.uploaded_file_size = file_size # Save to uploads directory upload_dir = Path("data/uploads") upload_dir.mkdir(parents=True, exist_ok=True) upload_path = upload_dir / file_name with open(upload_path, "wb") as f: f.write(file_content) self.uploaded_file_name = file_name self.data_file_path = str(upload_path) self.data_source = "file" self.file_upload_success = True # Format file size for display if file_size < 1024: size_str = f"{file_size} bytes" elif file_size < 1024 * 1024: size_str = f"{file_size / 1024:.1f} KB" else: size_str = f"{file_size / (1024 * 1024):.1f} MB" self.status_message = f"Uploaded {file_name} ({size_str})" except Exception as e: self.file_upload_error = f"Upload failed: {str(e)}" self.file_upload_success = False finally: self.file_processing = False def clear_uploaded_file(self): """Clear the uploaded file and reset file state.""" self.uploaded_file_name = "" self.uploaded_file_size = 0 self.data_file_path = "" self.file_upload_success = False self.file_upload_error = "" self.status_message = "File cleared" def check_sqlite_status(self): """Check if SQLite database is available and get statistics.""" try: from data_processing.database import default_db_manager from data_processing.patient_data import get_patient_data_stats if default_db_manager.exists: stats = get_patient_data_stats(default_db_manager) self.sqlite_available = stats.get("total_rows", 0) > 0 self.sqlite_row_count = stats.get("total_rows", 0) self.sqlite_patient_count = stats.get("unique_patients", 0) if self.sqlite_available: self.status_message = f"SQLite database: {self.sqlite_row_count:,} rows, {self.sqlite_patient_count:,} patients" else: self.status_message = "SQLite database exists but has no data" else: self.sqlite_available = False self.sqlite_row_count = 0 self.sqlite_patient_count = 0 self.status_message = "SQLite database not found" except ImportError: self.sqlite_available = False self.status_message = "Data processing module not available" except Exception as e: self.sqlite_available = False self.status_message = f"Error checking SQLite: {str(e)}" def use_sqlite_source(self): """Set data source to SQLite database.""" self.data_source = "sqlite" self.data_file_path = "" self.status_message = "Using SQLite database as data source" def use_file_source(self): """Set data source to uploaded file.""" if self.uploaded_file_name: self.data_source = "file" self.status_message = f"Using uploaded file: {self.uploaded_file_name}" else: self.status_message = "No file uploaded. Please upload a file first." def use_snowflake_source(self): """Set data source to Snowflake (if available).""" if self.snowflake_configured: self.data_source = "snowflake" self.status_message = "Using Snowflake as data source" else: self.status_message = "Snowflake is not configured. Check config/snowflake.toml" @rx.var def data_source_display(self) -> str: """Human-readable data source description.""" if self.data_source == "file": if self.uploaded_file_name: return f"File: {self.uploaded_file_name}" return "File: No file selected" elif self.data_source == "sqlite": if self.sqlite_available: return f"SQLite: {self.sqlite_row_count:,} rows" return "SQLite: Not available" elif self.data_source == "snowflake": if self.snowflake_configured: return "Snowflake: Ready" return "Snowflake: Not configured" return "Unknown" @rx.var def file_size_display(self) -> str: """Human-readable file size.""" if self.uploaded_file_size == 0: return "" if self.uploaded_file_size < 1024: return f"{self.uploaded_file_size} bytes" elif self.uploaded_file_size < 1024 * 1024: return f"{self.uploaded_file_size / 1024:.1f} KB" else: return f"{self.uploaded_file_size / (1024 * 1024):.1f} MB" # Validation def validate_filters(self) -> list[str]: """ Validate the current filter configuration. Returns a list of error messages (empty if valid). """ errors = [] # Check dates are set if not self.start_date: errors.append("Start date is required") if not self.end_date: errors.append("End date is required") if not self.last_seen_date: errors.append("Last seen date is required") # Check date order if self.start_date and self.end_date: if self.end_date < self.start_date: errors.append("End date cannot be before start date") if self.last_seen_date and self.end_date: if self.last_seen_date > self.end_date: errors.append("Last seen date is after end date (would exclude all patients)") # Check minimum patients if self.minimum_patients < 0: errors.append("Minimum patients cannot be negative") # Check at least some drugs are selected (warning, not error) # Empty selection means "include all" return errors @rx.var def filter_summary(self) -> str: """Generate a summary of current filter settings.""" lines = [] if self.start_date and self.end_date: lines.append(f"Date range: {self.start_date} to {self.end_date}") if self.last_seen_date: lines.append(f"Last seen after: {self.last_seen_date}") lines.append(f"Minimum patients: {self.minimum_patients}") if self.selected_trusts: lines.append(f"Trusts: {len(self.selected_trusts)} selected") else: lines.append("Trusts: All") if self.selected_drugs: lines.append(f"Drugs: {len(self.selected_drugs)} selected") else: lines.append("Drugs: All") if self.selected_directories: lines.append(f"Directories: {len(self.selected_directories)} selected") else: lines.append("Directories: All") return "\n".join(lines) @rx.var def display_title(self) -> str: """Generate the display title for the analysis.""" if self.custom_title: return self.custom_title if self.start_date and self.end_date: return f"Patients initiated from {self.start_date} to {self.end_date}" return "Patient Pathway Analysis" @rx.var def drug_selection_count(self) -> str: """Display count of selected drugs.""" return f"{len(self.selected_drugs)} of {len(self.available_drugs)} drugs selected" @rx.var def trust_selection_count(self) -> str: """Display count of selected trusts.""" if not self.selected_trusts: return f"All {len(self.available_trusts)} trusts (none selected)" return f"{len(self.selected_trusts)} of {len(self.available_trusts)} trusts selected" @rx.var def directory_selection_count(self) -> str: """Display count of selected directories.""" if not self.selected_directories: return f"All {len(self.available_directories)} directories (none selected)" return f"{len(self.selected_directories)} of {len(self.available_directories)} directories selected" # Search setters def set_drug_search(self, value: str): """Set the drug search filter text.""" self.drug_search = value def set_trust_search(self, value: str): """Set the trust search filter text.""" self.trust_search = value def set_directory_search(self, value: str): """Set the directory search filter text.""" self.directory_search = value def clear_drug_search(self): """Clear the drug search filter.""" self.drug_search = "" def clear_trust_search(self): """Clear the trust search filter.""" self.trust_search = "" def clear_directory_search(self): """Clear the directory search filter.""" self.directory_search = "" @rx.var def filtered_drugs(self) -> list[str]: """Get the list of drugs filtered by search text.""" if not self.drug_search: return self.available_drugs search_lower = self.drug_search.lower() return [d for d in self.available_drugs if search_lower in d.lower()] @rx.var def filtered_trusts(self) -> list[str]: """Get the list of trusts filtered by search text.""" if not self.trust_search: return self.available_trusts search_lower = self.trust_search.lower() return [t for t in self.available_trusts if search_lower in t.lower()] @rx.var def filtered_directories(self) -> list[str]: """Get the list of directories filtered by search text.""" if not self.directory_search: return self.available_directories search_lower = self.directory_search.lower() return [d for d in self.available_directories if search_lower in d.lower()] @rx.var def drug_search_result_count(self) -> str: """Display count of drugs matching search.""" total = len(self.available_drugs) filtered = len(self.filtered_drugs) if not self.drug_search: return f"{total} drugs" return f"Showing {filtered} of {total} drugs" @rx.var def trust_search_result_count(self) -> str: """Display count of trusts matching search.""" total = len(self.available_trusts) filtered = len(self.filtered_trusts) if not self.trust_search: return f"{total} trusts" return f"Showing {filtered} of {total} trusts" @rx.var def directory_search_result_count(self) -> str: """Display count of directories matching search.""" total = len(self.available_directories) filtered = len(self.filtered_directories) if not self.directory_search: return f"{total} directories" return f"Showing {filtered} of {total} directories" # Analysis methods def run_analysis(self): """ Run the patient pathway analysis with current filter settings. This is an async generator that yields state updates for progress indication. Uses the existing analysis pipeline from tools/dashboard_gui.py. """ # Validate filters first errors = self.validate_filters() if errors: self.error_message = "Validation errors:\n" + "\n".join(errors) return self.analysis_running = True self.error_message = "" self.status_message = "Starting analysis..." self.has_chart = False yield # Update UI to show running state try: # Import analysis modules from core import AnalysisFilters, PathConfig, default_paths from data_processing.data_source import get_data from tools.dashboard_gui import generate_graph # Get the data using fallback chain (cache -> Snowflake -> SQLite -> file) self.status_message = "Loading patient data..." yield # Build filter parameters trusts = self.selected_trusts if self.selected_trusts else self.available_trusts drugs = self.selected_drugs if self.selected_drugs else self.available_drugs directories = self.selected_directories if self.selected_directories else self.available_directories # Get data from the data source manager result = get_data( start_date=self.start_date, end_date=self.end_date, trusts=trusts, drugs=drugs, directories=directories, ) if result.df is None or len(result.df) == 0: self.error_message = "No data available. Please check your data source configuration." self.analysis_running = False return self.data_source = result.source_type.value self.data_row_count = len(result.df) self.status_message = f"Loaded {self.data_row_count:,} rows from {self.data_source}" yield # Create AnalysisFilters object for generate_graph self.status_message = "Processing pathways..." yield # Generate the chart data (without writing to file) # We'll create the figure data directly instead of calling generate_graph # which writes to file and opens browser fig_data = self._generate_chart_data( df=result.df, trusts=trusts, drugs=drugs, directories=directories, ) if fig_data is not None: self.chart_data = fig_data self.has_chart = True self.status_message = f"Analysis complete! Showing {self.data_row_count:,} interventions." else: self.error_message = "No data found matching the selected filters." self.has_chart = False except Exception as e: self.error_message = f"Analysis failed: {str(e)}\n\n{traceback.format_exc()}" self.has_chart = False finally: self.analysis_running = False yield # Final UI update def _generate_chart_data( self, df: pd.DataFrame, trusts: list[str], drugs: list[str], directories: list[str], ) -> Optional[go.Figure]: """ Generate Plotly chart data from processed DataFrame. This replicates the core logic of generate_graph() and figure() but returns the figure dict instead of writing to file and opening browser. This is a workaround to avoid modifying generate_graph() internals (which is deferred to Phase 5). """ from core import default_paths # Use the org_codes mapping org_codes = pd.read_csv(default_paths.org_codes_csv, index_col=1) # Make a copy to avoid modifying original df1 = df.copy() # Create UPID + Treatment column for deduplication df1["UPIDTreatment"] = df1["UPID"] + df1["Drug Name"] # Map provider codes to names df1["Provider Code"] = df1["Provider Code"].map(org_codes["Name"]) # Apply filters df1 = df1[ (df1["Provider Code"].isin(trusts)) & (df1["Drug Name"].isin(drugs)) & (df1["Directory"].isin(directories)) ] if len(df1) == 0: return None # Apply date filters df1 = df1[ (df1["Intervention Date"] >= self.start_date) & (df1["Intervention Date"] <= self.end_date) ] if len(df1) == 0: return None # Add indication validation columns (if enabled and Snowflake available) df1 = self._add_indication_validation(df1) # Store filtered data for CSV export (now includes indication columns) self._analysis_data = df1.copy() # Build a simplified hierarchy for the icicle chart # Group by Trust -> Directory -> Drug to get patient counts hierarchy_data = self._build_hierarchy(df1, org_codes) if hierarchy_data.empty: return None # Apply minimum patients filter hierarchy_data = hierarchy_data[hierarchy_data['value'] >= self.minimum_patients] if hierarchy_data.empty: return None # Create the Plotly icicle figure fig = go.Figure(go.Icicle( labels=hierarchy_data['labels'].tolist(), ids=hierarchy_data['ids'].tolist(), parents=hierarchy_data['parents'].tolist(), values=hierarchy_data['value'].tolist(), branchvalues="total", marker=dict( colors=hierarchy_data['colour'].tolist() if 'colour' in hierarchy_data.columns else None, colorscale='Viridis', ), maxdepth=3, texttemplate='%{label}
Patients: %{value}', hovertemplate='%{label}
Patients: %{value}', )) # Set chart title title_text = self.custom_title if self.custom_title else f"Patients initiated {self.start_date} to {self.end_date}" fig.update_layout( margin=dict(t=60, l=1, r=1, b=60), title=f"Norfolk & Waveney ICS High-Cost Drug Patient Pathways - {title_text}", title_x=0.5, hoverlabel=dict(font_size=16), ) # Return figure for rx.plotly() return fig def _build_hierarchy(self, df: pd.DataFrame, org_codes: pd.DataFrame) -> pd.DataFrame: """ Build a hierarchical dataframe for icicle chart. Creates Trust -> Directory -> Drug hierarchy with patient counts. """ # Create directory mapping from UPID directory_df = df[["UPID", "Directory"]].drop_duplicates("UPID").set_index("UPID") # Get unique patients per drug patient_drugs = df[["UPID", "Drug Name", "Provider Code", "Directory"]].drop_duplicates(subset=["UPID", "Drug Name"]) # Build hierarchy: Trust -> Directory -> Drug rows = [] # Root node total_patients = patient_drugs["UPID"].nunique() rows.append({ 'parents': '', 'ids': 'N&WICS', 'labels': 'N&WICS', 'value': total_patients, 'colour': 1.0, }) # Trust level trust_counts = patient_drugs.groupby("Provider Code")["UPID"].nunique().reset_index() trust_counts.columns = ["trust", "count"] for _, row in trust_counts.iterrows(): trust = row["trust"] if pd.isna(trust): continue rows.append({ 'parents': 'N&WICS', 'ids': f'N&WICS - {trust}', 'labels': trust, 'value': row["count"], 'colour': row["count"] / total_patients, }) # Directory level (under each trust) trust_dir_counts = patient_drugs.groupby(["Provider Code", "Directory"])["UPID"].nunique().reset_index() trust_dir_counts.columns = ["trust", "directory", "count"] for _, row in trust_dir_counts.iterrows(): trust = row["trust"] directory = row["directory"] if pd.isna(trust) or pd.isna(directory): continue trust_total = trust_counts[trust_counts["trust"] == trust]["count"].values trust_total = trust_total[0] if len(trust_total) > 0 else 1 rows.append({ 'parents': f'N&WICS - {trust}', 'ids': f'N&WICS - {trust} - {directory}', 'labels': directory, 'value': row["count"], 'colour': row["count"] / trust_total, }) # Drug level (under each trust-directory) trust_dir_drug_counts = patient_drugs.groupby(["Provider Code", "Directory", "Drug Name"])["UPID"].nunique().reset_index() trust_dir_drug_counts.columns = ["trust", "directory", "drug", "count"] for _, row in trust_dir_drug_counts.iterrows(): trust = row["trust"] directory = row["directory"] drug = row["drug"] if pd.isna(trust) or pd.isna(directory) or pd.isna(drug): continue dir_total = trust_dir_counts[ (trust_dir_counts["trust"] == trust) & (trust_dir_counts["directory"] == directory) ]["count"].values dir_total = dir_total[0] if len(dir_total) > 0 else 1 rows.append({ 'parents': f'N&WICS - {trust} - {directory}', 'ids': f'N&WICS - {trust} - {directory} - {drug}', 'labels': drug, 'value': row["count"], 'colour': row["count"] / dir_total, }) return pd.DataFrame(rows) def _add_indication_validation(self, df: pd.DataFrame) -> pd.DataFrame: """ Add indication validation columns to the DataFrame. Adds columns: - Indication_Valid: Boolean indicating if patient has valid GP diagnosis - Indication_Source: "GP_SNOMED" | "NONE" | "NOT_CHECKED" - Indication_Cluster: The matched SNOMED cluster ID (if any) This requires Snowflake connectivity for GP record lookups. If Snowflake is not available, columns are added with "NOT_CHECKED" status. """ # Initialize columns with default values df = df.copy() df["Indication_Valid"] = False df["Indication_Source"] = "NOT_CHECKED" df["Indication_Cluster"] = "" # Check if indication validation is enabled and Snowflake is available if not self.indication_validation_enabled: return df try: from data_processing.snowflake_connector import ( is_snowflake_available, is_snowflake_configured, get_connector, ) from data_processing.diagnosis_lookup import ( get_drug_cluster_ids, patient_has_indication, ) if not is_snowflake_available() or not is_snowflake_configured(): # Snowflake not available - can't validate indications self.indication_validation_summary = "Indication validation skipped (Snowflake not configured)" return df self.indication_validation_running = True # Get unique patient-drug pairs patient_drug_pairs = df[["UPID", "Drug Name"]].drop_duplicates() total_pairs = len(patient_drug_pairs) # Cache drug clusters to avoid repeated lookups drug_clusters_cache = {} # Track results for summary validation_results = {} # drug -> {total, matched} connector = get_connector() for idx, (_, row) in enumerate(patient_drug_pairs.iterrows()): upid = row["UPID"] drug_name = row["Drug Name"] # Get drug clusters (cached) drug_upper = drug_name.upper() if drug_name else "" if drug_upper not in drug_clusters_cache: drug_clusters_cache[drug_upper] = get_drug_cluster_ids(drug_name) cluster_ids = drug_clusters_cache[drug_upper] # Initialize drug in results tracking if drug_upper not in validation_results: validation_results[drug_upper] = {"total": 0, "matched": 0, "name": drug_name} validation_results[drug_upper]["total"] += 1 if not cluster_ids: # No cluster mapping for this drug - mark as NONE mask = (df["UPID"] == upid) & (df["Drug Name"] == drug_name) df.loc[mask, "Indication_Source"] = "NONE" continue # Check patient indication in GP records # Note: We use the UPID as patient identifier - this may need mapping to pseudonymised NHS number # For now, assume UPID can be used directly or is already the pseudonymised ID has_indication, matched_cluster, _, _ = patient_has_indication( patient_pseudonym=upid, cluster_ids=cluster_ids, connector=connector, ) # Update dataframe for this patient-drug combination mask = (df["UPID"] == upid) & (df["Drug Name"] == drug_name) df.loc[mask, "Indication_Valid"] = has_indication df.loc[mask, "Indication_Source"] = "GP_SNOMED" if has_indication else "NONE" if matched_cluster: df.loc[mask, "Indication_Cluster"] = matched_cluster if has_indication: validation_results[drug_upper]["matched"] += 1 # Store validation results and create summary self.indication_validation_results = { drug: { "drug_name": data["name"], "total_patients": data["total"], "patients_with_indication": data["matched"], "match_rate": round(data["matched"] / data["total"] * 100, 1) if data["total"] > 0 else 0, } for drug, data in validation_results.items() } # Create summary text total_patients = sum(d["total"] for d in validation_results.values()) matched_patients = sum(d["matched"] for d in validation_results.values()) overall_rate = round(matched_patients / total_patients * 100, 1) if total_patients > 0 else 0 self.indication_validation_summary = ( f"GP Indication Validation: {matched_patients}/{total_patients} " f"({overall_rate}%) patients have valid GP diagnosis" ) except Exception as e: self.indication_validation_summary = f"Indication validation error: {str(e)}" # Don't fail the whole analysis - just leave columns as NOT_CHECKED finally: self.indication_validation_running = False return df def toggle_indication_validation(self): """Toggle indication validation on/off.""" self.indication_validation_enabled = not self.indication_validation_enabled @rx.var def indication_validation_status(self) -> str: """Get human-readable indication validation status.""" if self.indication_validation_running: return "Validating patient indications..." if self.indication_validation_summary: return self.indication_validation_summary if self.indication_validation_enabled: return "Enabled (will check GP records)" return "Disabled" @rx.var def indication_results_list(self) -> list[dict]: """ Get indication validation results as a list for display. Returns list of dicts with: drug_name, total_patients, patients_with_indication, match_rate Sorted by match rate ascending (worst first) for easy identification of issues. """ if not self.indication_validation_results: return [] results = [] for drug_key, data in self.indication_validation_results.items(): results.append({ "drug_name": data.get("drug_name", drug_key), "total_patients": data.get("total_patients", 0), "patients_with_indication": data.get("patients_with_indication", 0), "match_rate": data.get("match_rate", 0), }) # Sort by match rate ascending (lowest first to highlight issues) results.sort(key=lambda x: x["match_rate"]) return results @rx.var def has_indication_results(self) -> bool: """Check if there are indication validation results to display.""" return len(self.indication_validation_results) > 0 def export_chart_html(self): """ Export the current chart as an interactive HTML file. The file is saved to data/exports/ directory with a timestamped filename. """ if not self.has_chart: self.export_error = "No chart to export. Please run analysis first." return self.export_error = "" self.export_message = "" try: from datetime import datetime # Create exports directory export_dir = Path("data/exports") export_dir.mkdir(parents=True, exist_ok=True) # Generate filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"pathway_chart_{timestamp}.html" filepath = export_dir / filename # Export the chart to HTML self.chart_data.write_html( str(filepath), include_plotlyjs=True, full_html=True, ) self.last_export_path = str(filepath) self.export_message = f"Chart exported to {filename}" except Exception as e: self.export_error = f"Export failed: {str(e)}" def export_data_csv(self): """ Export the underlying analysis data as a CSV file. The file is saved to data/exports/ directory with a timestamped filename. """ if self._analysis_data is None or len(self._analysis_data) == 0: self.export_error = "No data to export. Please run analysis first." return self.export_error = "" self.export_message = "" try: from datetime import datetime # Create exports directory export_dir = Path("data/exports") export_dir.mkdir(parents=True, exist_ok=True) # Generate filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"pathway_data_{timestamp}.csv" filepath = export_dir / filename # Export the data to CSV self._analysis_data.to_csv(filepath, index=False) self.last_export_path = str(filepath) self.export_message = f"Data exported to {filename}" except Exception as e: self.export_error = f"Export failed: {str(e)}" def clear_export_messages(self): """Clear export status messages.""" self.export_message = "" self.export_error = "" # ============================================================================= # Page Components # ============================================================================= def info_card(title: str, value: str, icon: str) -> rx.Component: """Create an info card showing a statistic.""" return rx.box( rx.vstack( rx.hstack( rx.icon(icon, size=20, color=NHS_BLUE), rx.text(title, size="2", color="gray"), spacing="2", align="center", ), rx.text(value, size="5", weight="bold"), spacing="1", align="start", ), padding="16px", background="white", border_radius="8px", border="1px solid rgb(229, 231, 235)", width="100%", ) def date_input(label: str, value: rx.Var, on_change, help_text: str = "", input_id: str = "") -> rx.Component: """Create a labeled date input component with accessibility support.""" # Generate a unique ID if not provided label_id = f"{input_id}-label" if input_id else "" help_id = f"{input_id}-help" if input_id else "" return rx.vstack( rx.el.label( label, html_for=input_id, font_size="14px", font_weight="500", color=NHS_DARK_BLUE, ), rx.input( type="date", value=value, on_change=on_change, width="100%", id=input_id, aria_describedby=help_id if help_text else "", ), rx.cond( help_text != "", rx.text(help_text, size="1", color="gray", id=help_id), ), spacing="1", align="start", width="100%", ) def data_source_selector() -> rx.Component: """Data source selector with file upload, SQLite, and Snowflake options.""" return rx.box( rx.vstack( rx.heading("Data Source", size="5", color=NHS_DARK_BLUE), rx.text( "Select where to load patient data from", size="2", color="gray", ), rx.divider(margin_y="8px"), # Current data source display rx.hstack( rx.text("Current source:", weight="medium"), rx.badge( State.data_source_display, color_scheme=rx.cond( State.data_source == "sqlite", "green", rx.cond( State.data_source == "snowflake", "blue", "gray", ), ), size="2", ), spacing="2", align="center", ), rx.divider(margin_y="8px"), # Data source options rx.vstack( # SQLite option rx.box( rx.hstack( rx.icon("database", size=20, color=NHS_BLUE), rx.vstack( rx.hstack( rx.text("SQLite Database", weight="medium"), rx.cond( State.sqlite_available, rx.badge("Available", color_scheme="green", size="1"), rx.badge("No data", color_scheme="gray", size="1"), ), spacing="2", ), rx.cond( State.sqlite_available, rx.text( f"Contains pre-loaded patient data", size="1", color="gray", ), rx.text( "Run data migration to populate", size="1", color="gray", ), ), spacing="1", align="start", ), rx.spacer(), rx.button( "Use SQLite", on_click=State.use_sqlite_source, variant=rx.cond(State.data_source == "sqlite", "solid", "outline"), color_scheme="green", size="2", disabled=~State.sqlite_available, ), spacing="3", align="center", width="100%", ), padding="12px", background=rx.cond( State.data_source == "sqlite", "rgba(0, 94, 184, 0.05)", "transparent", ), border_radius="6px", border=rx.cond( State.data_source == "sqlite", "1px solid rgb(0, 94, 184)", "1px solid transparent", ), width="100%", ), # File upload option rx.box( rx.vstack( rx.hstack( rx.icon("upload", size=20, color=NHS_BLUE), rx.vstack( rx.hstack( rx.text("Upload File", weight="medium"), rx.cond( State.file_upload_success, rx.badge(State.file_size_display, color_scheme="green", size="1"), ), spacing="2", ), rx.text( "Upload CSV or Parquet file", size="1", color="gray", ), spacing="1", align="start", ), rx.spacer(), rx.cond( State.file_upload_success, rx.hstack( rx.button( "Use File", on_click=State.use_file_source, variant=rx.cond(State.data_source == "file", "solid", "outline"), color_scheme="blue", size="2", ), rx.button( rx.icon("x", size=14), on_click=State.clear_uploaded_file, variant="ghost", color_scheme="red", size="1", ), spacing="1", ), ), spacing="3", align="center", width="100%", ), rx.cond( State.file_upload_success, rx.text( State.uploaded_file_name, size="2", color=NHS_BLUE, font_family="monospace", ), rx.upload( rx.vstack( rx.cond( State.file_processing, rx.spinner(size="2"), rx.icon("file-up", size=24, color="gray"), ), rx.text( "Drag & drop or click to browse", size="2", color="gray", ), rx.text( "Supports CSV, Parquet", size="1", color="gray", ), spacing="2", align="center", padding="16px", ), id="file_upload", accept={ "text/csv": [".csv"], "application/octet-stream": [".parquet", ".pq"], }, max_files=1, border="1px dashed rgb(200, 200, 200)", border_radius="6px", padding="4px", width="100%", on_drop=State.handle_file_upload(rx.upload_files(upload_id="file_upload")), ), ), rx.cond( State.file_upload_error != "", rx.text( State.file_upload_error, size="2", color="red", ), ), spacing="2", width="100%", ), padding="12px", background=rx.cond( (State.data_source == "file") & State.file_upload_success, "rgba(0, 94, 184, 0.05)", "transparent", ), border_radius="6px", border=rx.cond( (State.data_source == "file") & State.file_upload_success, "1px solid rgb(0, 94, 184)", "1px solid transparent", ), width="100%", ), # Snowflake option rx.box( rx.hstack( rx.icon("cloud", size=20, color=NHS_BLUE), rx.vstack( rx.hstack( rx.text("Snowflake", weight="medium"), rx.cond( State.snowflake_configured, rx.badge("Configured", color_scheme="blue", size="1"), rx.badge("Not configured", color_scheme="gray", size="1"), ), spacing="2", ), rx.text( "Query live data from Snowflake", size="1", color="gray", ), spacing="1", align="start", ), rx.spacer(), rx.button( "Use Snowflake", on_click=State.use_snowflake_source, variant=rx.cond(State.data_source == "snowflake", "solid", "outline"), color_scheme="blue", size="2", disabled=~State.snowflake_configured, ), spacing="3", align="center", width="100%", ), padding="12px", background=rx.cond( State.data_source == "snowflake", "rgba(0, 94, 184, 0.05)", "transparent", ), border_radius="6px", border=rx.cond( State.data_source == "snowflake", "1px solid rgb(0, 94, 184)", "1px solid transparent", ), width="100%", ), spacing="2", width="100%", ), spacing="3", align="start", width="100%", ), padding="20px", background="white", border_radius="8px", border="1px solid rgb(229, 231, 235)", width="100%", ) def filter_controls() -> rx.Component: """Filter controls section with date pickers, minimum patients, and custom title.""" return rx.box( rx.vstack( rx.heading("Analysis Settings", size="5", color=NHS_DARK_BLUE, id="analysis-settings-heading"), # Date range row rx.hstack( date_input( "Start Date", State.start_date, State.set_start_date, "Include patients initiated from this date", input_id="start-date", ), date_input( "End Date", State.end_date, State.set_end_date, "Include patients initiated until this date", input_id="end-date", ), date_input( "Last Seen After", State.last_seen_date, State.set_last_seen_date, "Only include patients seen after this date", input_id="last-seen-date", ), spacing="4", width="100%", flex_wrap="wrap", role="group", aria_label="Date range filters", ), rx.divider(margin_y="12px"), # Additional settings row rx.hstack( # Minimum patients rx.vstack( rx.el.label( "Minimum Patients", html_for="min-patients", font_size="14px", font_weight="500", color=NHS_DARK_BLUE, ), rx.hstack( rx.input( type="number", value=State.minimum_patients.to_string(), on_change=State.set_minimum_patients_from_input, min="0", max="1000", width="100px", id="min-patients", aria_describedby="min-patients-help", ), rx.slider( value=[State.minimum_patients], on_change=State.set_minimum_patients_from_slider, min=0, max=100, step=1, width="150px", aria_label="Minimum patients slider", ), spacing="3", align="center", ), rx.text( "Hide pathways with fewer patients", size="1", color="gray", id="min-patients-help", ), spacing="1", align="start", ), # Custom title rx.vstack( rx.el.label( "Custom Title (Optional)", html_for="custom-title", font_size="14px", font_weight="500", color=NHS_DARK_BLUE, ), rx.input( placeholder="Leave empty for auto-generated title", value=State.custom_title, on_change=State.set_custom_title, width="300px", id="custom-title", aria_describedby="custom-title-help", ), rx.text( "Override the default chart title", size="1", color="gray", id="custom-title-help", ), spacing="1", align="start", ), spacing="6", width="100%", flex_wrap="wrap", align="start", ), spacing="4", align="start", width="100%", ), padding="20px", background="white", border_radius="8px", border="1px solid rgb(229, 231, 235)", width="100%", role="region", aria_labelledby="analysis-settings-heading", ) def indication_result_row(result: dict) -> rx.Component: """Render a single row in the indication validation results table.""" match_rate = result["match_rate"] # Color code: green for high match rates, amber for moderate, red for low # Use .to(int) to cast Reflex Var for comparison (rx.foreach items are Vars) rate_color = rx.cond( match_rate.to(int) >= 80, "green", rx.cond(match_rate.to(int) >= 50, "orange", "red"), ) return rx.table.row( rx.table.cell(rx.text(result["drug_name"], weight="medium")), rx.table.cell(result["total_patients"].to_string()), rx.table.cell(result["patients_with_indication"].to_string()), rx.table.cell( rx.hstack( rx.progress( value=match_rate, max=100, width="60px", height="8px", color_scheme=rate_color, ), rx.text( match_rate.to_string() + "%", size="2", color=rate_color, weight="medium", ), spacing="2", align="center", ) ), ) def indication_validation_summary() -> rx.Component: """ Component to display indication validation results per drug. Shows a collapsible section with a table of per-drug match rates, helping users identify which drugs have good vs poor GP diagnosis coverage. """ return rx.cond( State.has_indication_results, rx.el.section( rx.vstack( # Header with overall summary rx.hstack( rx.hstack( rx.icon("clipboard-check", size=20, color=NHS_DARK_BLUE, aria_hidden="true"), rx.heading( "GP Indication Validation Results", size="5", color=NHS_DARK_BLUE, id="indication-results-heading", ), spacing="2", align="center", ), rx.spacer(), rx.badge( State.indication_validation_summary, color_scheme="blue", size="2", ), width="100%", align="center", ), rx.text( "Shows the percentage of patients with valid GP diagnoses matching their prescribed drug's indication. " "Lower rates may indicate prescribing for off-label use, data quality issues, or patients treated across multiple providers.", size="2", color="gray", ), # Results table rx.table.root( rx.table.header( rx.table.row( rx.table.column_header_cell("Drug Name"), rx.table.column_header_cell("Total Patients"), rx.table.column_header_cell("With GP Indication"), rx.table.column_header_cell("Match Rate"), ), ), rx.table.body( rx.foreach(State.indication_results_list, indication_result_row) ), width="100%", size="2", ), # Legend rx.hstack( rx.text("Legend:", size="1", color="gray", weight="medium"), rx.hstack( rx.badge("80%+", color_scheme="green", size="1"), rx.text("Good coverage", size="1", color="gray"), spacing="1", align="center", ), rx.hstack( rx.badge("50-79%", color_scheme="orange", size="1"), rx.text("Moderate", size="1", color="gray"), spacing="1", align="center", ), rx.hstack( rx.badge("<50%", color_scheme="red", size="1"), rx.text("Low coverage", size="1", color="gray"), spacing="1", align="center", ), spacing="4", flex_wrap="wrap", ), spacing="3", width="100%", align="start", ), padding="20px", background="white", border_radius="8px", border="1px solid rgb(229, 231, 235)", width="100%", aria_labelledby="indication-results-heading", ), ) def home_content() -> rx.Component: """Home page content with filter configuration and analysis controls.""" return rx.vstack( # Hero section rx.box( rx.vstack( rx.image( src="/logo.png", height="60px", alt="NHS Logo", ), rx.heading( "Patient Pathway Analysis", size="8", color=NHS_DARK_BLUE, ), rx.text( "Analyze secondary care treatment pathways for high-cost drugs", size="4", color="gray", ), spacing="3", align="center", ), padding="32px", background="white", border_radius="12px", border="1px solid rgb(229, 231, 235)", width="100%", text_align="center", ), # Status cards rx.hstack( info_card("Drugs Loaded", State.drug_selection_count, "pill"), info_card("Trusts", State.trust_selection_count, "building"), info_card("Directories", State.directory_selection_count, "folder"), spacing="4", width="100%", flex_wrap="wrap", ), # Data source selector data_source_selector(), # Filter controls (date pickers, minimum patients, custom title) filter_controls(), # Filter summary rx.box( rx.vstack( rx.heading("Current Filter Settings", size="4", color=NHS_DARK_BLUE), rx.text( State.filter_summary, white_space="pre-wrap", font_family="monospace", font_size="13px", color="gray", ), spacing="2", align="start", width="100%", ), padding="20px", background="white", border_radius="8px", border="1px solid rgb(229, 231, 235)", width="100%", ), # Action buttons rx.hstack( rx.button( rx.icon("database", size=16, aria_hidden="true"), "Load Reference Data", on_click=State.load_reference_data, color_scheme="blue", size="3", disabled=State.analysis_running, aria_label="Load reference data from CSV files", ), rx.button( rx.cond( State.analysis_running, rx.hstack( rx.spinner(size="1"), rx.text("Running..."), spacing="2", align="center", ), rx.hstack( rx.icon("play", size=16, aria_hidden="true"), rx.text("Run Analysis"), spacing="2", align="center", ), ), on_click=State.run_analysis, color_scheme="green", size="3", disabled=State.analysis_running, aria_label="Run patient pathway analysis", aria_busy=State.analysis_running, ), spacing="3", role="toolbar", aria_label="Analysis actions", ), # Messages with live regions for screen readers rx.cond( State.status_message != "", rx.callout( State.status_message, icon="info", color="blue", role="status", aria_live="polite", ), ), rx.cond( State.error_message != "", rx.callout( State.error_message, icon="triangle-alert", color="red", role="alert", aria_live="assertive", ), ), # Chart display rx.cond( State.has_chart, rx.el.section( rx.vstack( rx.hstack( rx.heading("Patient Pathway Chart", size="5", color=NHS_DARK_BLUE, id="chart-heading"), rx.spacer(), rx.hstack( rx.button( rx.icon("download", size=14, aria_hidden="true"), "Export HTML", on_click=State.export_chart_html, variant="outline", size="2", aria_label="Export chart as interactive HTML file", ), rx.button( rx.icon("file-spreadsheet", size=14, aria_hidden="true"), "Export CSV", on_click=State.export_data_csv, variant="outline", size="2", aria_label="Export data as CSV spreadsheet", ), spacing="2", role="toolbar", aria_label="Export options", ), width="100%", align="center", ), rx.text( "Click on sections to zoom in. Use the toolbar for additional options.", size="2", color="gray", ), # Export messages rx.cond( State.export_message != "", rx.callout( State.export_message, icon="check", color="green", role="status", aria_live="polite", ), ), rx.cond( State.export_error != "", rx.callout( State.export_error, icon="triangle-alert", color="red", role="alert", ), ), rx.el.figure( rx.plotly(data=State.chart_data), aria_label="Interactive patient pathway icicle chart showing treatment hierarchy", ), spacing="3", width="100%", ), padding="20px", background="white", border_radius="8px", border="1px solid rgb(229, 231, 235)", width="100%", aria_labelledby="chart-heading", ), ), # Indication validation results (shown after chart) indication_validation_summary(), spacing="5", width="100%", align="start", ) def selection_page_content( title: str, description: str, items: rx.Var, selected_items: rx.Var, toggle_handler, select_all_handler, clear_handler, count_text: rx.Var, search_value: rx.Var, search_handler, clear_search_handler, search_result_text: rx.Var, extra_buttons: list[rx.Component] = None, page_id: str = "selection", ) -> rx.Component: """Generic selection page content for drugs, trusts, directories with search and accessibility.""" heading_id = f"{page_id}-heading" search_id = f"{page_id}-search" list_id = f"{page_id}-list" buttons = [ rx.button( "Select All", on_click=select_all_handler, variant="outline", size="2", aria_label=f"Select all {title.lower()}", ), rx.button( "Clear All", on_click=clear_handler, variant="outline", size="2", aria_label=f"Clear all {title.lower()} selections", ), ] if extra_buttons: buttons.extend(extra_buttons) return rx.vstack( # Header rx.el.header( rx.vstack( rx.heading(title, size="6", color=NHS_DARK_BLUE, id=heading_id), rx.text(description, color="gray"), rx.el.div( count_text, font_weight="500", color=NHS_BLUE, aria_live="polite", aria_atomic="true", ), spacing="2", align="start", ), padding="20px", background="white", border_radius="8px", border="1px solid rgb(229, 231, 235)", width="100%", ), # Search input rx.box( rx.hstack( rx.icon("search", size=16, color="gray", aria_hidden="true"), rx.input( placeholder=f"Search {title.lower()}...", value=search_value, on_change=search_handler, width="100%", id=search_id, aria_label=f"Search {title.lower()}", aria_controls=list_id, ), rx.cond( search_value != "", rx.button( rx.icon("x", size=14, aria_hidden="true"), on_click=clear_search_handler, variant="ghost", color_scheme="gray", size="1", aria_label="Clear search", ), ), spacing="2", align="center", width="100%", ), padding="12px 16px", background="white", border_radius="8px", border="1px solid rgb(229, 231, 235)", width="100%", role="search", ), # Action buttons and search result count rx.hstack( rx.hstack(*buttons, spacing="2", role="toolbar", aria_label="Selection actions"), rx.spacer(), rx.el.div( search_result_text, font_size="14px", color="gray", aria_live="polite", ), spacing="3", width="100%", align="center", ), # Selection grid rx.box( rx.vstack( rx.foreach( items, lambda item: rx.box( rx.checkbox( item, checked=selected_items.contains(item), on_change=lambda: toggle_handler(item), size="2", ), padding="8px 12px", background=rx.cond( selected_items.contains(item), "rgba(0, 94, 184, 0.1)", "transparent", ), border_radius="4px", width="100%", ), ), spacing="1", width="100%", max_height="500px", overflow_y="auto", id=list_id, role="group", aria_labelledby=heading_id, ), padding="16px", background="white", border_radius="8px", border="1px solid rgb(229, 231, 235)", width="100%", ), spacing="4", width="100%", align="start", ) def drugs_content() -> rx.Component: """Drug selection page content.""" return selection_page_content( title="Drug Selection", description="Select which high-cost drugs to include in the analysis", items=State.filtered_drugs, selected_items=State.selected_drugs, toggle_handler=State.toggle_drug, select_all_handler=State.select_all_drugs, clear_handler=State.clear_drugs, count_text=State.drug_selection_count, search_value=State.drug_search, search_handler=State.set_drug_search, clear_search_handler=State.clear_drug_search, search_result_text=State.drug_search_result_count, extra_buttons=[ rx.button( "Select Defaults", on_click=State.select_default_drugs, variant="outline", size="2", aria_label="Select default drugs (Include=1)", ), ], page_id="drugs", ) def trusts_content() -> rx.Component: """Trust selection page content.""" return selection_page_content( title="Trust Selection", description="Select NHS trusts to include (leave empty for all trusts)", items=State.filtered_trusts, selected_items=State.selected_trusts, toggle_handler=State.toggle_trust, select_all_handler=State.select_all_trusts, clear_handler=State.clear_trusts, count_text=State.trust_selection_count, search_value=State.trust_search, search_handler=State.set_trust_search, clear_search_handler=State.clear_trust_search, search_result_text=State.trust_search_result_count, page_id="trusts", ) def directories_content() -> rx.Component: """Directory selection page content.""" return selection_page_content( title="Directory Selection", description="Select medical directories/specialties to include (leave empty for all)", items=State.filtered_directories, selected_items=State.selected_directories, toggle_handler=State.toggle_directory, select_all_handler=State.select_all_directories, clear_handler=State.clear_directories, count_text=State.directory_selection_count, search_value=State.directory_search, search_handler=State.set_directory_search, clear_search_handler=State.clear_directory_search, search_result_text=State.directory_search_result_count, page_id="directories", ) # ============================================================================= # Page Definitions # ============================================================================= def index() -> rx.Component: """Home page.""" return main_layout( content_area(home_content(), page_title="Home"), current_page="home", ) def drugs_page() -> rx.Component: """Drug selection page.""" return main_layout( content_area(drugs_content(), page_title=""), current_page="drugs", ) def trusts_page() -> rx.Component: """Trust selection page.""" return main_layout( content_area(trusts_content(), page_title=""), current_page="trusts", ) def directories_page() -> rx.Component: """Directory selection page.""" return main_layout( content_area(directories_content(), page_title=""), current_page="directories", ) # ============================================================================= # App Configuration # ============================================================================= app = rx.App( theme=rx.theme( accent_color="blue", gray_color="slate", radius="medium", ), ) # Add pages app.add_page(index, route="/", title="Home | NHS HCD Analysis") app.add_page(drugs_page, route="/drugs", title="Drug Selection | NHS HCD Analysis") app.add_page(trusts_page, route="/trusts", title="Trust Selection | NHS HCD Analysis") app.add_page(directories_page, route="/directories", title="Directory Selection | NHS HCD Analysis")