diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md
index bca0321..b0c0612 100644
--- a/IMPLEMENTATION_PLAN.md
+++ b/IMPLEMENTATION_PLAN.md
@@ -103,15 +103,20 @@ cd pathways_app && timeout 60 python -m reflex run 2>&1 | head -30
## Phase 3: Reflex Integration
### 3.1 Update AppState
-- [ ] Replace date picker state with dropdown state:
+- [x] Replace date picker state with dropdown state:
- `selected_initiated: str = "all"` ("all", "1yr", "2yr")
- `selected_last_seen: str = "6mo"` ("6mo", "12mo")
-- [ ] Add `date_filter_id` computed property: `f"{selected_initiated}_{selected_last_seen}"`
-- [ ] Rewrite `load_pathway_data()` to query `pathway_nodes` table:
+ - Added `initiated_options` and `last_seen_options` for dropdown rendering
+ - Added `set_initiated_filter()` and `set_last_seen_filter()` event handlers
+- [x] Add `date_filter_id` computed property: `f"{selected_initiated}_{selected_last_seen}"`
+- [x] Rewrite `load_pathway_data()` to query `pathway_nodes` table:
- Base filter: `WHERE date_filter_id = ?`
- Trust/directory/drug filters on denormalized columns
-- [ ] Add `recalculate_parent_totals()` for filtered hierarchies
-- [ ] Update KPI calculations from root node data
+ - Updated all filter handlers to call `load_pathway_data()` instead of `apply_filters()`
+- [x] Add `recalculate_parent_totals()` for filtered hierarchies
+- [x] Update KPI calculations from root node data
+ - KPIs now extracted from root node (level 0) in pathway_nodes
+ - `unique_patients`, `total_cost`, `total_drugs` updated from query results
### 3.2 Update Icicle Figure
- [ ] Update `icicle_figure` computed property to use all pathway_nodes columns
diff --git a/pathways_app/pathways_app.py b/pathways_app/pathways_app.py
index 5526fe1..e4eee2b 100644
--- a/pathways_app/pathways_app.py
+++ b/pathways_app/pathways_app.py
@@ -1,2168 +1,2277 @@
"""
-NHS High-Cost Drug Patient Pathway Analysis Tool - Reflex Application.
+HCD Analysis v2 - Redesigned Reflex Application.
-This is the main Reflex application module containing state management
-and page components for the pathway analysis tool.
+Single-page dashboard with reactive filtering and real-time chart updates.
+Design reference: DESIGN_SYSTEM.md
"""
-import reflex as rx
-from datetime import date, timedelta
-from typing import Optional
-import pandas as pd
-import numpy as np
+from datetime import datetime, timedelta
from pathlib import Path
+from typing import Any
+
import plotly.graph_objects as go
-import traceback
-import os
+import reflex as rx
-from rxconfig import config
-from pathways_app.components.layout import main_layout, content_area
+from pathways_app.styles import (
+ Colors,
+ Typography,
+ Spacing,
+ Radii,
+ Shadows,
+ Transitions,
+ TOP_BAR_HEIGHT,
+ PAGE_MAX_WIDTH,
+ PAGE_PADDING,
+ card_style,
+ input_style,
+ text_h1,
+ text_h3,
+ text_caption,
+ button_ghost_style,
+ kpi_card_style,
+ kpi_value_style,
+ kpi_label_style,
+)
-# NHS Color constants
-NHS_BLUE = "rgb(0, 94, 184)"
-NHS_DARK_BLUE = "rgb(0, 48, 135)"
+# =============================================================================
+# State
+# =============================================================================
-# Supported file extensions
-SUPPORTED_EXTENSIONS = [".csv", ".parquet", ".pq"]
-
-
-class State(rx.State):
+class AppState(rx.State):
"""
- Application state for the NHS High-Cost Drug Patient Pathway Analysis Tool.
+ Application state for HCD Analysis v2.
- Manages all filter variables, reference data, and analysis state.
- This corresponds to the AnalysisFilters dataclass in core/models.py
- but is adapted for Reflex's reactive state system.
+ This is a minimal placeholder state for the app skeleton.
+ Will be expanded in Phase 3 with full filter state and data management.
"""
- # Date filter state
- start_date: str = "" # ISO format YYYY-MM-DD
- end_date: str = ""
- last_seen_date: str = ""
+ # =========================================================================
+ # Data State Variables
+ # =========================================================================
- # Selection filters (list of selected items)
- selected_trusts: list[str] = []
- selected_drugs: list[str] = []
- selected_directories: list[str] = []
-
- # Analysis parameters
- minimum_patients: int = 0
- custom_title: str = ""
-
- # Reference data (available options loaded from CSV/SQLite)
- available_trusts: list[str] = []
- available_drugs: list[str] = []
- available_directories: list[str] = []
-
- # Drug default selections (Include=1 in include.csv)
- default_drugs: list[str] = []
-
- # Analysis state
- analysis_running: bool = False
- status_message: str = ""
+ # Data loading status
+ data_loaded: bool = False
+ total_records: int = 0
+ chart_loading: bool = False
error_message: str = ""
- # Chart state - the Plotly figure
- chart_data: go.Figure = go.Figure()
- has_chart: bool = False
+ # Data freshness tracking
+ last_updated: str = "" # ISO format timestamp of last data load
- # Data source state
- data_file_path: str = ""
- data_source: str = "file" # "file", "sqlite", "snowflake"
- data_loaded: bool = False
- data_row_count: int = 0
+ # Raw data storage - list of dicts (Reflex-friendly)
+ # Each dict represents a patient record with keys like:
+ # UPID, Drug Name, Intervention Date, Price Actual, Directory, etc.
+ raw_data: list[dict[str, Any]] = []
- # Snowflake connection state
- snowflake_available: bool = False
- snowflake_configured: bool = False
- snowflake_connected: bool = False
+ # Latest date in dataset (detected on load, used for "to" date defaults)
+ latest_date_in_data: str = ""
- # File upload state
- uploaded_file_name: str = ""
- uploaded_file_size: int = 0 # bytes
- file_upload_error: str = ""
- file_upload_success: bool = False
- file_processing: bool = False
+ # =========================================================================
+ # UI State Variables
+ # =========================================================================
- # SQLite database state
- sqlite_available: bool = False
- sqlite_row_count: int = 0
- sqlite_patient_count: int = 0
+ # Placeholder for current chart type (for top bar tabs)
+ current_chart: str = "icicle"
- # Search/filter state for selection pages
+ # =========================================================================
+ # Filter State Variables
+ # =========================================================================
+
+ # Date filter dropdowns (replaces date pickers)
+ # These map to pathway_date_filters table: all_6mo, all_12mo, 1yr_6mo, etc.
+ selected_initiated: str = "all" # "all", "1yr", "2yr"
+ selected_last_seen: str = "6mo" # "6mo", "12mo"
+
+ # Available options for date filter dropdowns
+ initiated_options: list[dict[str, str]] = [
+ {"value": "all", "label": "All years"},
+ {"value": "1yr", "label": "Last 1 year"},
+ {"value": "2yr", "label": "Last 2 years"},
+ ]
+ last_seen_options: list[dict[str, str]] = [
+ {"value": "6mo", "label": "Last 6 months"},
+ {"value": "12mo", "label": "Last 12 months"},
+ ]
+
+ # Legacy date filter state (kept for backwards compatibility, will be removed)
+ # Filter toggle state
+ initiated_filter_enabled: bool = False
+ last_seen_filter_enabled: bool = True
+
+ # Date filter values (ISO format strings YYYY-MM-DD)
+ # Initiated filter: Defaults empty (filter is OFF by default)
+ initiated_from_date: str = ""
+ initiated_to_date: str = ""
+
+ # Last Seen filter: Defaults to last 6 months (filter is ON by default)
+ # These will be updated on data load to use actual latest date
+ last_seen_from_date: str = (datetime.now() - timedelta(days=180)).strftime("%Y-%m-%d")
+ last_seen_to_date: str = datetime.now().strftime("%Y-%m-%d")
+
+ # Available options for dropdowns (populated from data in Phase 3)
+ available_drugs: list[str] = ["Drug A", "Drug B", "Drug C", "Drug D", "Drug E"]
+ available_indications: list[str] = ["Indication 1", "Indication 2", "Indication 3"]
+ available_directorates: list[str] = ["Medical", "Surgical", "Oncology", "Rheumatology"]
+
+ # Selected items (empty = all)
+ selected_drugs: list[str] = []
+ selected_indications: list[str] = []
+ selected_directorates: list[str] = []
+
+ # Search text for dropdowns
drug_search: str = ""
- trust_search: str = ""
- directory_search: str = ""
+ indication_search: str = ""
+ directorate_search: str = ""
- # Export state
- last_export_path: str = ""
- export_message: str = ""
- export_error: str = ""
+ # Dropdown visibility state
+ drug_dropdown_open: bool = False
+ indication_dropdown_open: bool = False
+ directorate_dropdown_open: bool = False
- # Indication validation state
- indication_validation_enabled: bool = True
- indication_validation_running: bool = False
- indication_validation_results: dict = {} # drug_name -> {total, matched, rate}
- indication_validation_summary: str = ""
+ # Event handlers for filter toggles
+ def toggle_initiated_filter(self):
+ """Toggle initiated date filter on/off."""
+ self.initiated_filter_enabled = not self.initiated_filter_enabled
+ if self.data_loaded:
+ self.apply_filters()
- # Store the underlying data for export
- _analysis_data: pd.DataFrame = pd.DataFrame()
+ def toggle_last_seen_filter(self):
+ """Toggle last seen date filter on/off."""
+ self.last_seen_filter_enabled = not self.last_seen_filter_enabled
+ if self.data_loaded:
+ self.apply_filters()
- def _set_default_dates(self):
- """Set default date values based on typical analysis period."""
- today = date.today()
- one_year_ago = today - timedelta(days=365)
+ # Event handlers for date changes
+ def set_initiated_from(self, value: str):
+ """Set initiated from date."""
+ self.initiated_from_date = value
+ if self.data_loaded:
+ self.apply_filters()
- self.start_date = one_year_ago.isoformat()
- self.end_date = today.isoformat()
- self.last_seen_date = one_year_ago.isoformat()
+ def set_initiated_to(self, value: str):
+ """Set initiated to date."""
+ self.initiated_to_date = value
+ if self.data_loaded:
+ self.apply_filters()
- def load_reference_data(self):
+ def set_last_seen_from(self, value: str):
+ """Set last seen from date."""
+ self.last_seen_from_date = value
+ if self.data_loaded:
+ self.apply_filters()
+
+ def set_last_seen_to(self, value: str):
+ """Set last seen to date."""
+ self.last_seen_to_date = value
+ if self.data_loaded:
+ self.apply_filters()
+
+ # Event handlers for date filter dropdowns (new pathway_nodes approach)
+ def set_initiated_filter(self, value: str):
+ """Set initiated filter dropdown value."""
+ self.selected_initiated = value
+ if self.data_loaded:
+ self.load_pathway_data()
+
+ def set_last_seen_filter(self, value: str):
+ """Set last seen filter dropdown value."""
+ self.selected_last_seen = value
+ if self.data_loaded:
+ self.load_pathway_data()
+
+ # Computed property for date filter ID
+ @rx.var
+ def date_filter_id(self) -> str:
"""
- Load reference data from CSV files.
+ Compute the date_filter_id from selected_initiated and selected_last_seen.
- This loads the available drugs, trusts, and directories
- that can be selected in the filters.
+ Returns IDs like: all_6mo, all_12mo, 1yr_6mo, 1yr_12mo, 2yr_6mo, 2yr_12mo
+ These match the pathway_date_filters table.
"""
- data_dir = Path("data")
+ return f"{self.selected_initiated}_{self.selected_last_seen}"
- # Load drugs from include.csv
- try:
- drugs_df = pd.read_csv(data_dir / "include.csv")
- self.available_drugs = sorted(drugs_df.iloc[:, 0].astype(str).tolist())
- # Get default selections (Include=1)
- if "Include" in drugs_df.columns:
- self.default_drugs = drugs_df[drugs_df["Include"] == 1].iloc[:, 0].astype(str).tolist()
- self.selected_drugs = self.default_drugs.copy()
- self.status_message = f"Loaded {len(self.available_drugs)} drugs"
- except Exception as e:
- self.error_message = f"Failed to load drugs: {e}"
+ # Event handlers for search
+ def set_drug_search(self, value: str):
+ """Update drug search text."""
+ self.drug_search = value
- # Load trusts from defaultTrusts.csv
- try:
- trusts_df = pd.read_csv(data_dir / "defaultTrusts.csv")
- self.available_trusts = sorted(trusts_df.iloc[:, 0].astype(str).tolist())
- # By default, no trusts selected (include all)
- self.selected_trusts = []
- except Exception as e:
- self.error_message = f"Failed to load trusts: {e}"
+ def set_indication_search(self, value: str):
+ """Update indication search text."""
+ self.indication_search = value
- # Load directories from directory_list.csv
- try:
- dirs_df = pd.read_csv(data_dir / "directory_list.csv")
- self.available_directories = sorted(dirs_df.iloc[:, 0].astype(str).tolist())
- # By default, no directories selected (include all)
- self.selected_directories = []
- except Exception as e:
- self.error_message = f"Failed to load directories: {e}"
+ def set_directorate_search(self, value: str):
+ """Update directorate search text."""
+ self.directorate_search = value
- # Set default dates
- self._set_default_dates()
+ # Event handlers for dropdown visibility
+ def toggle_drug_dropdown(self):
+ """Toggle drug dropdown visibility."""
+ self.drug_dropdown_open = not self.drug_dropdown_open
+ # Close other dropdowns
+ self.indication_dropdown_open = False
+ self.directorate_dropdown_open = False
- # Check Snowflake availability
- try:
- from data_processing.snowflake_connector import is_snowflake_available, is_snowflake_configured
- self.snowflake_available = is_snowflake_available()
- self.snowflake_configured = is_snowflake_configured()
- except ImportError:
- self.snowflake_available = False
- self.snowflake_configured = False
+ def toggle_indication_dropdown(self):
+ """Toggle indication dropdown visibility."""
+ self.indication_dropdown_open = not self.indication_dropdown_open
+ # Close other dropdowns
+ self.drug_dropdown_open = False
+ self.directorate_dropdown_open = False
- # Check SQLite database status
- self.check_sqlite_status()
+ def toggle_directorate_dropdown(self):
+ """Toggle directorate dropdown visibility."""
+ self.directorate_dropdown_open = not self.directorate_dropdown_open
+ # Close other dropdowns
+ self.drug_dropdown_open = False
+ self.indication_dropdown_open = False
- # Auto-select best data source
- if self.sqlite_available and self.sqlite_row_count > 0:
- self.data_source = "sqlite"
- elif self.snowflake_configured:
- self.data_source = "snowflake"
- else:
- self.data_source = "file"
-
- # Date setters
- def set_start_date(self, value: str):
- """Set the start date for analysis."""
- self.start_date = value
-
- def set_end_date(self, value: str):
- """Set the end date for analysis."""
- self.end_date = value
-
- def set_last_seen_date(self, value: str):
- """Set the last seen date filter."""
- self.last_seen_date = value
-
- # Selection setters
- def set_selected_trusts(self, trusts: list[str]):
- """Set the selected NHS trusts."""
- self.selected_trusts = trusts
-
- def toggle_trust(self, trust: str):
- """Toggle a trust selection."""
- if trust in self.selected_trusts:
- self.selected_trusts = [t for t in self.selected_trusts if t != trust]
- else:
- self.selected_trusts = self.selected_trusts + [trust]
-
- def select_all_trusts(self):
- """Select all available trusts."""
- self.selected_trusts = self.available_trusts.copy()
-
- def clear_trusts(self):
- """Clear all trust selections."""
- self.selected_trusts = []
-
- def set_selected_drugs(self, drugs: list[str]):
- """Set the selected drugs."""
- self.selected_drugs = drugs
+ def close_all_dropdowns(self):
+ """Close all dropdowns."""
+ self.drug_dropdown_open = False
+ self.indication_dropdown_open = False
+ self.directorate_dropdown_open = False
+ # Event handlers for item selection
def toggle_drug(self, drug: str):
"""Toggle a drug selection."""
if drug in self.selected_drugs:
self.selected_drugs = [d for d in self.selected_drugs if d != drug]
else:
self.selected_drugs = self.selected_drugs + [drug]
+ if self.data_loaded:
+ self.load_pathway_data()
+ def toggle_indication(self, indication: str):
+ """Toggle an indication selection."""
+ if indication in self.selected_indications:
+ self.selected_indications = [i for i in self.selected_indications if i != indication]
+ else:
+ self.selected_indications = self.selected_indications + [indication]
+ # Note: Indication filter not yet implemented at database level
+ # Will be added when indication-based filtering is required
+
+ def toggle_directorate(self, directorate: str):
+ """Toggle a directorate selection."""
+ if directorate in self.selected_directorates:
+ self.selected_directorates = [d for d in self.selected_directorates if d != directorate]
+ else:
+ self.selected_directorates = self.selected_directorates + [directorate]
+ if self.data_loaded:
+ self.load_pathway_data()
+
+ # Select/clear all handlers
def select_all_drugs(self):
"""Select all available drugs."""
self.selected_drugs = self.available_drugs.copy()
+ if self.data_loaded:
+ self.load_pathway_data()
- def select_default_drugs(self):
- """Select only the default drugs (Include=1)."""
- self.selected_drugs = self.default_drugs.copy()
-
- def clear_drugs(self):
+ def clear_all_drugs(self):
"""Clear all drug selections."""
self.selected_drugs = []
+ if self.data_loaded:
+ self.load_pathway_data()
- def set_selected_directories(self, directories: list[str]):
- """Set the selected directories."""
- self.selected_directories = directories
+ def select_all_indications(self):
+ """Select all available indications."""
+ self.selected_indications = self.available_indications.copy()
+ # Note: Indication filter not yet implemented at database level
- def toggle_directory(self, directory: str):
- """Toggle a directory selection."""
- if directory in self.selected_directories:
- self.selected_directories = [d for d in self.selected_directories if d != directory]
- else:
- self.selected_directories = self.selected_directories + [directory]
+ def clear_all_indications(self):
+ """Clear all indication selections."""
+ self.selected_indications = []
+ # Note: Indication filter not yet implemented at database level
- def select_all_directories(self):
- """Select all available directories."""
- self.selected_directories = self.available_directories.copy()
+ def select_all_directorates(self):
+ """Select all available directorates."""
+ self.selected_directorates = self.available_directorates.copy()
+ if self.data_loaded:
+ self.load_pathway_data()
- def clear_directories(self):
- """Clear all directory selections."""
- self.selected_directories = []
-
- # Analysis parameter setters
- def set_minimum_patients(self, value: int):
- """Set the minimum patients threshold."""
- self.minimum_patients = max(0, value)
-
- def set_minimum_patients_from_input(self, value: str):
- """Set minimum patients threshold from string input."""
- try:
- self.minimum_patients = max(0, int(value)) if value else 0
- except ValueError:
- pass # Ignore invalid input
-
- def set_minimum_patients_from_slider(self, values: list[float]):
- """Set minimum patients threshold from slider value (list)."""
- if values:
- self.minimum_patients = max(0, int(values[0]))
-
- def set_custom_title(self, value: str):
- """Set a custom title for the analysis."""
- self.custom_title = value
-
- # Data source methods
- def set_data_file_path(self, path: str):
- """Set the data file path for analysis."""
- self.data_file_path = path
-
- def set_data_source(self, source: str):
- """Set the data source type (file, sqlite, snowflake)."""
- if source in ("file", "sqlite", "snowflake"):
- self.data_source = source
-
- # Status methods
- def set_status(self, message: str):
- """Update the status message."""
- self.status_message = message
-
- def set_error(self, message: str):
- """Set an error message."""
- self.error_message = message
-
- def clear_error(self):
- """Clear the error message."""
- self.error_message = ""
-
- # File handling methods
- async def handle_file_upload(self, files: list[rx.UploadFile]):
- """
- Handle file upload for CSV/Parquet data files.
-
- This accepts uploaded files and processes them for analysis.
- """
- self.file_upload_error = ""
- self.file_upload_success = False
-
- if not files:
- self.file_upload_error = "No file selected"
- return
-
- file = files[0] # Take first file only
- file_name = file.filename
- file_ext = Path(file_name).suffix.lower()
-
- # Validate file extension
- if file_ext not in SUPPORTED_EXTENSIONS:
- self.file_upload_error = f"Unsupported file type: {file_ext}. Please upload CSV or Parquet files."
- return
-
- self.file_processing = True
- self.status_message = f"Processing {file_name}..."
- yield # Update UI
-
- try:
- # Read file content
- file_content = await file.read()
- file_size = len(file_content)
- self.uploaded_file_size = file_size
-
- # Save to uploads directory
- upload_dir = Path("data/uploads")
- upload_dir.mkdir(parents=True, exist_ok=True)
-
- upload_path = upload_dir / file_name
- with open(upload_path, "wb") as f:
- f.write(file_content)
-
- self.uploaded_file_name = file_name
- self.data_file_path = str(upload_path)
- self.data_source = "file"
- self.file_upload_success = True
-
- # Format file size for display
- if file_size < 1024:
- size_str = f"{file_size} bytes"
- elif file_size < 1024 * 1024:
- size_str = f"{file_size / 1024:.1f} KB"
- else:
- size_str = f"{file_size / (1024 * 1024):.1f} MB"
-
- self.status_message = f"Uploaded {file_name} ({size_str})"
-
- except Exception as e:
- self.file_upload_error = f"Upload failed: {str(e)}"
- self.file_upload_success = False
-
- finally:
- self.file_processing = False
-
- def clear_uploaded_file(self):
- """Clear the uploaded file and reset file state."""
- self.uploaded_file_name = ""
- self.uploaded_file_size = 0
- self.data_file_path = ""
- self.file_upload_success = False
- self.file_upload_error = ""
- self.status_message = "File cleared"
-
- def check_sqlite_status(self):
- """Check if SQLite database is available and get statistics."""
- try:
- from data_processing.database import default_db_manager
- from data_processing.patient_data import get_patient_data_stats
-
- if default_db_manager.exists:
- stats = get_patient_data_stats(default_db_manager)
- self.sqlite_available = stats.get("total_rows", 0) > 0
- self.sqlite_row_count = stats.get("total_rows", 0)
- self.sqlite_patient_count = stats.get("unique_patients", 0)
-
- if self.sqlite_available:
- self.status_message = f"SQLite database: {self.sqlite_row_count:,} rows, {self.sqlite_patient_count:,} patients"
- else:
- self.status_message = "SQLite database exists but has no data"
- else:
- self.sqlite_available = False
- self.sqlite_row_count = 0
- self.sqlite_patient_count = 0
- self.status_message = "SQLite database not found"
- except ImportError:
- self.sqlite_available = False
- self.status_message = "Data processing module not available"
- except Exception as e:
- self.sqlite_available = False
- self.status_message = f"Error checking SQLite: {str(e)}"
-
- def use_sqlite_source(self):
- """Set data source to SQLite database."""
- self.data_source = "sqlite"
- self.data_file_path = ""
- self.status_message = "Using SQLite database as data source"
-
- def use_file_source(self):
- """Set data source to uploaded file."""
- if self.uploaded_file_name:
- self.data_source = "file"
- self.status_message = f"Using uploaded file: {self.uploaded_file_name}"
- else:
- self.status_message = "No file uploaded. Please upload a file first."
-
- def use_snowflake_source(self):
- """Set data source to Snowflake (if available)."""
- if self.snowflake_configured:
- self.data_source = "snowflake"
- self.status_message = "Using Snowflake as data source"
- else:
- self.status_message = "Snowflake is not configured. Check config/snowflake.toml"
-
- @rx.var
- def data_source_display(self) -> str:
- """Human-readable data source description."""
- if self.data_source == "file":
- if self.uploaded_file_name:
- return f"File: {self.uploaded_file_name}"
- return "File: No file selected"
- elif self.data_source == "sqlite":
- if self.sqlite_available:
- return f"SQLite: {self.sqlite_row_count:,} rows"
- return "SQLite: Not available"
- elif self.data_source == "snowflake":
- if self.snowflake_configured:
- return "Snowflake: Ready"
- return "Snowflake: Not configured"
- return "Unknown"
-
- @rx.var
- def file_size_display(self) -> str:
- """Human-readable file size."""
- if self.uploaded_file_size == 0:
- return ""
- if self.uploaded_file_size < 1024:
- return f"{self.uploaded_file_size} bytes"
- elif self.uploaded_file_size < 1024 * 1024:
- return f"{self.uploaded_file_size / 1024:.1f} KB"
- else:
- return f"{self.uploaded_file_size / (1024 * 1024):.1f} MB"
-
- # Validation
- def validate_filters(self) -> list[str]:
- """
- Validate the current filter configuration.
-
- Returns a list of error messages (empty if valid).
- """
- errors = []
-
- # Check dates are set
- if not self.start_date:
- errors.append("Start date is required")
- if not self.end_date:
- errors.append("End date is required")
- if not self.last_seen_date:
- errors.append("Last seen date is required")
-
- # Check date order
- if self.start_date and self.end_date:
- if self.end_date < self.start_date:
- errors.append("End date cannot be before start date")
-
- if self.last_seen_date and self.end_date:
- if self.last_seen_date > self.end_date:
- errors.append("Last seen date is after end date (would exclude all patients)")
-
- # Check minimum patients
- if self.minimum_patients < 0:
- errors.append("Minimum patients cannot be negative")
-
- # Check at least some drugs are selected (warning, not error)
- # Empty selection means "include all"
-
- return errors
-
- @rx.var
- def filter_summary(self) -> str:
- """Generate a summary of current filter settings."""
- lines = []
-
- if self.start_date and self.end_date:
- lines.append(f"Date range: {self.start_date} to {self.end_date}")
- if self.last_seen_date:
- lines.append(f"Last seen after: {self.last_seen_date}")
- lines.append(f"Minimum patients: {self.minimum_patients}")
-
- if self.selected_trusts:
- lines.append(f"Trusts: {len(self.selected_trusts)} selected")
- else:
- lines.append("Trusts: All")
-
- if self.selected_drugs:
- lines.append(f"Drugs: {len(self.selected_drugs)} selected")
- else:
- lines.append("Drugs: All")
-
- if self.selected_directories:
- lines.append(f"Directories: {len(self.selected_directories)} selected")
- else:
- lines.append("Directories: All")
-
- return "\n".join(lines)
-
- @rx.var
- def display_title(self) -> str:
- """Generate the display title for the analysis."""
- if self.custom_title:
- return self.custom_title
- if self.start_date and self.end_date:
- return f"Patients initiated from {self.start_date} to {self.end_date}"
- return "Patient Pathway Analysis"
-
- @rx.var
- def drug_selection_count(self) -> str:
- """Display count of selected drugs."""
- return f"{len(self.selected_drugs)} of {len(self.available_drugs)} drugs selected"
-
- @rx.var
- def trust_selection_count(self) -> str:
- """Display count of selected trusts."""
- if not self.selected_trusts:
- return f"All {len(self.available_trusts)} trusts (none selected)"
- return f"{len(self.selected_trusts)} of {len(self.available_trusts)} trusts selected"
-
- @rx.var
- def directory_selection_count(self) -> str:
- """Display count of selected directories."""
- if not self.selected_directories:
- return f"All {len(self.available_directories)} directories (none selected)"
- return f"{len(self.selected_directories)} of {len(self.available_directories)} directories selected"
-
- # Search setters
- def set_drug_search(self, value: str):
- """Set the drug search filter text."""
- self.drug_search = value
-
- def set_trust_search(self, value: str):
- """Set the trust search filter text."""
- self.trust_search = value
-
- def set_directory_search(self, value: str):
- """Set the directory search filter text."""
- self.directory_search = value
-
- def clear_drug_search(self):
- """Clear the drug search filter."""
- self.drug_search = ""
-
- def clear_trust_search(self):
- """Clear the trust search filter."""
- self.trust_search = ""
-
- def clear_directory_search(self):
- """Clear the directory search filter."""
- self.directory_search = ""
+ def clear_all_directorates(self):
+ """Clear all directorate selections."""
+ self.selected_directorates = []
+ if self.data_loaded:
+ self.load_pathway_data()
+ # Computed vars for filtered options based on search
@rx.var
def filtered_drugs(self) -> list[str]:
- """Get the list of drugs filtered by search text."""
+ """Return drugs filtered by search text."""
if not self.drug_search:
return self.available_drugs
search_lower = self.drug_search.lower()
return [d for d in self.available_drugs if search_lower in d.lower()]
@rx.var
- def filtered_trusts(self) -> list[str]:
- """Get the list of trusts filtered by search text."""
- if not self.trust_search:
- return self.available_trusts
- search_lower = self.trust_search.lower()
- return [t for t in self.available_trusts if search_lower in t.lower()]
+ def filtered_indications(self) -> list[str]:
+ """Return indications filtered by search text."""
+ if not self.indication_search:
+ return self.available_indications
+ search_lower = self.indication_search.lower()
+ return [i for i in self.available_indications if search_lower in i.lower()]
@rx.var
- def filtered_directories(self) -> list[str]:
- """Get the list of directories filtered by search text."""
- if not self.directory_search:
- return self.available_directories
- search_lower = self.directory_search.lower()
- return [d for d in self.available_directories if search_lower in d.lower()]
+ def filtered_directorates(self) -> list[str]:
+ """Return directorates filtered by search text."""
+ if not self.directorate_search:
+ return self.available_directorates
+ search_lower = self.directorate_search.lower()
+ return [d for d in self.available_directorates if search_lower in d.lower()]
+ # Computed vars for selection counts
@rx.var
- def drug_search_result_count(self) -> str:
- """Display count of drugs matching search."""
+ def drug_selection_text(self) -> str:
+ """Display text for drug selection count."""
+ count = len(self.selected_drugs)
total = len(self.available_drugs)
- filtered = len(self.filtered_drugs)
- if not self.drug_search:
- return f"{total} drugs"
- return f"Showing {filtered} of {total} drugs"
+ if count == 0:
+ return f"All {total} drugs"
+ return f"{count} of {total} selected"
@rx.var
- def trust_search_result_count(self) -> str:
- """Display count of trusts matching search."""
- total = len(self.available_trusts)
- filtered = len(self.filtered_trusts)
- if not self.trust_search:
- return f"{total} trusts"
- return f"Showing {filtered} of {total} trusts"
+ def indication_selection_text(self) -> str:
+ """Display text for indication selection count."""
+ count = len(self.selected_indications)
+ total = len(self.available_indications)
+ if count == 0:
+ return f"All {total} indications"
+ return f"{count} of {total} selected"
@rx.var
- def directory_search_result_count(self) -> str:
- """Display count of directories matching search."""
- total = len(self.available_directories)
- filtered = len(self.filtered_directories)
- if not self.directory_search:
- return f"{total} directories"
- return f"Showing {filtered} of {total} directories"
+ def directorate_selection_text(self) -> str:
+ """Display text for directorate selection count."""
+ count = len(self.selected_directorates)
+ total = len(self.available_directorates)
+ if count == 0:
+ return f"All {total} directorates"
+ return f"{count} of {total} selected"
- # Analysis methods
- def run_analysis(self):
- """
- Run the patient pathway analysis with current filter settings.
+ # =========================================================================
+ # KPI State Variables
+ # =========================================================================
- This is an async generator that yields state updates for progress indication.
- Uses the existing analysis pipeline from tools/dashboard_gui.py.
+ # Placeholder KPI values (will be computed from filtered data in Phase 3)
+ # For now, these are static placeholders that demonstrate reactivity
+ unique_patients: int = 0
+ total_drugs: int = 0
+ total_cost: float = 0.0
+ indication_match_rate: float = 0.0
+
+ # Computed KPI display values
+ @rx.var
+ def unique_patients_display(self) -> str:
+ """Format unique patients count for display."""
+ if self.unique_patients == 0:
+ return "—"
+ return f"{self.unique_patients:,}"
+
+ @rx.var
+ def total_drugs_display(self) -> str:
+ """Format total drugs count for display."""
+ if self.total_drugs == 0:
+ return "—"
+ return f"{self.total_drugs:,}"
+
+ @rx.var
+ def total_cost_display(self) -> str:
+ """Format total cost for display."""
+ if self.total_cost == 0.0:
+ return "—"
+ # Format as £X.XM or £X.XK depending on magnitude
+ if self.total_cost >= 1_000_000:
+ return f"£{self.total_cost / 1_000_000:.1f}M"
+ if self.total_cost >= 1_000:
+ return f"£{self.total_cost / 1_000:.1f}K"
+ return f"£{self.total_cost:,.0f}"
+
+ @rx.var
+ def match_rate_display(self) -> str:
+ """Format indication match rate for display."""
+ if self.indication_match_rate == 0.0:
+ return "—"
+ return f"{self.indication_match_rate:.0f}%"
+
+ @rx.var
+ def last_updated_display(self) -> str:
+ """Format last updated timestamp for display in top bar."""
+ if not self.last_updated:
+ return "Never"
+ try:
+ # Parse ISO format timestamp
+ dt = datetime.fromisoformat(self.last_updated)
+ now = datetime.now()
+ diff = now - dt
+
+ if diff.days == 0:
+ if diff.seconds < 60:
+ return "Just now"
+ if diff.seconds < 3600:
+ mins = diff.seconds // 60
+ return f"{mins}m ago"
+ hours = diff.seconds // 3600
+ return f"{hours}h ago"
+ if diff.days == 1:
+ return "Yesterday"
+ if diff.days < 7:
+ return f"{diff.days}d ago"
+ return dt.strftime("%d %b %Y")
+ except (ValueError, TypeError):
+ return "Unknown"
+
+ # =========================================================================
+ # Filter Logic Methods
+ # =========================================================================
+
+ def apply_filters(self):
"""
- # Validate filters first
- errors = self.validate_filters()
- if errors:
- self.error_message = "Validation errors:\n" + "\n".join(errors)
+ Apply current filter state to data and update KPI values.
+
+ This method queries the SQLite database with the current filter settings:
+ - Initiated date filter: filters patients whose FIRST intervention date is within range
+ - Last Seen date filter: filters patients whose LAST intervention date is within range
+ - Drug filter: filters by selected drugs (empty = all)
+ - Directorate filter: filters by selected directorates (empty = all)
+
+ Note: Indication filter is not implemented at the database level since indications
+ are derived from drug mappings, not stored directly in fact_interventions.
+
+ Updates: unique_patients, total_drugs, total_cost, and filtered_record_count
+ """
+ import sqlite3
+
+ db_path = Path("data/pathways.db")
+
+ if not db_path.exists():
+ self.error_message = "Unable to connect to database. Please ensure data has been loaded."
return
- self.analysis_running = True
- self.error_message = ""
- self.status_message = "Starting analysis..."
- self.has_chart = False
- yield # Update UI to show running state
+ try:
+ conn = sqlite3.connect(str(db_path))
+ cursor = conn.cursor()
+
+ # Build the filter query dynamically
+ # We use a CTE to compute first_seen and last_seen dates per patient,
+ # then filter based on those dates if date filters are enabled
+
+ where_clauses = []
+ params = []
+
+ # Drug filter (if any drugs selected)
+ if self.selected_drugs:
+ placeholders = ",".join("?" * len(self.selected_drugs))
+ where_clauses.append(f"drug_name_std IN ({placeholders})")
+ params.extend(self.selected_drugs)
+
+ # Directorate filter (if any directorates selected)
+ if self.selected_directorates:
+ placeholders = ",".join("?" * len(self.selected_directorates))
+ where_clauses.append(f"directory IN ({placeholders})")
+ params.extend(self.selected_directorates)
+
+ # Build WHERE clause for base data filtering
+ base_where = ""
+ if where_clauses:
+ base_where = "WHERE " + " AND ".join(where_clauses)
+
+ # Date filter logic:
+ # - "Initiated" filters patients whose FIRST intervention is within the date range
+ # - "Last Seen" filters patients whose LAST intervention is within the date range
+ # We need to use a subquery to compute patient-level date ranges
+
+ having_clauses = []
+ having_params = []
+
+ # Initiated filter (when enabled)
+ if self.initiated_filter_enabled and self.initiated_from_date:
+ having_clauses.append("first_seen_date >= ?")
+ having_params.append(self.initiated_from_date)
+ if self.initiated_filter_enabled and self.initiated_to_date:
+ having_clauses.append("first_seen_date <= ?")
+ having_params.append(self.initiated_to_date)
+
+ # Last Seen filter (when enabled)
+ if self.last_seen_filter_enabled and self.last_seen_from_date:
+ having_clauses.append("last_seen_date >= ?")
+ having_params.append(self.last_seen_from_date)
+ if self.last_seen_filter_enabled and self.last_seen_to_date:
+ having_clauses.append("last_seen_date <= ?")
+ having_params.append(self.last_seen_to_date)
+
+ having_clause = ""
+ if having_clauses:
+ having_clause = "HAVING " + " AND ".join(having_clauses)
+
+ # Query to get filtered patient UPIDs
+ # This computes per-patient first/last seen dates and filters accordingly
+ patient_filter_query = f"""
+ WITH patient_dates AS (
+ SELECT
+ upid,
+ MIN(intervention_date) as first_seen_date,
+ MAX(intervention_date) as last_seen_date
+ FROM fact_interventions
+ {base_where}
+ GROUP BY upid
+ {having_clause}
+ )
+ SELECT upid FROM patient_dates
+ """
+
+ # Now get KPI values for filtered patients
+ kpi_query = f"""
+ WITH filtered_patients AS (
+ {patient_filter_query}
+ )
+ SELECT
+ COUNT(DISTINCT f.upid) as unique_patients,
+ COUNT(DISTINCT f.drug_name_std) as unique_drugs,
+ COALESCE(SUM(f.price_actual), 0) as total_cost,
+ COUNT(*) as record_count
+ FROM fact_interventions f
+ INNER JOIN filtered_patients fp ON f.upid = fp.upid
+ {base_where.replace('WHERE', 'AND') if base_where else ''}
+ """
+
+ # Combine all params: base params for CTE, having params, then base params again for final join
+ all_params = params + having_params
+ if where_clauses:
+ all_params.extend(params) # For the AND conditions in the final query
+
+ cursor.execute(kpi_query, all_params)
+ result = cursor.fetchone()
+
+ if result:
+ self.unique_patients = result[0] or 0
+ self.total_drugs = result[1] or 0
+ self.total_cost = float(result[2]) if result[2] else 0.0
+ # Note: filtered_record_count could be stored if needed
+
+ conn.close()
+ self.error_message = ""
+
+ # Update chart data with new filtered results
+ self.prepare_chart_data()
+
+ except sqlite3.Error as e:
+ self.error_message = f"Unable to filter data. Database error: {str(e)}"
+ except Exception as e:
+ self.error_message = f"An unexpected error occurred while filtering. Details: {str(e)}"
+
+ # =========================================================================
+ # Data Loading Methods
+ # =========================================================================
+
+ def load_data(self):
+ """
+ Load data from SQLite database on app initialization.
+
+ This method:
+ 1. Connects to the SQLite database (data/pathways.db)
+ 2. Loads available drugs, indications, directorates from actual data
+ 3. Detects the latest date in the dataset for "to" date defaults
+ 4. Updates total_records, last_updated, and data_loaded state
+ """
+ import sqlite3
+
+ db_path = Path("data/pathways.db")
+
+ if not db_path.exists():
+ self.error_message = "Database not found. Please ensure the data has been loaded (data/pathways.db)."
+ return
try:
- # Import analysis modules
- from core import AnalysisFilters, PathConfig, default_paths
- from data_processing.data_source import get_data
- from tools.dashboard_gui import generate_graph
+ conn = sqlite3.connect(str(db_path))
+ cursor = conn.cursor()
- # Get the data using fallback chain (cache -> Snowflake -> SQLite -> file)
- self.status_message = "Loading patient data..."
- yield
+ # Get total records
+ cursor.execute("SELECT COUNT(*) FROM fact_interventions")
+ self.total_records = cursor.fetchone()[0]
- # Build filter parameters
- trusts = self.selected_trusts if self.selected_trusts else self.available_trusts
- drugs = self.selected_drugs if self.selected_drugs else self.available_drugs
- directories = self.selected_directories if self.selected_directories else self.available_directories
-
- # Get data from the data source manager
- result = get_data(
- start_date=self.start_date,
- end_date=self.end_date,
- trusts=trusts,
- drugs=drugs,
- directories=directories,
- )
-
- if result.df is None or len(result.df) == 0:
- self.error_message = "No data available. Please check your data source configuration."
- self.analysis_running = False
+ if self.total_records == 0:
+ self.error_message = "The database is empty. No patient records found."
+ conn.close()
return
- self.data_source = result.source_type.value
- self.data_row_count = len(result.df)
- self.status_message = f"Loaded {self.data_row_count:,} rows from {self.data_source}"
- yield
+ # Get available drugs (distinct, sorted)
+ cursor.execute("""
+ SELECT DISTINCT drug_name_std
+ FROM fact_interventions
+ WHERE drug_name_std IS NOT NULL AND drug_name_std != ''
+ ORDER BY drug_name_std
+ """)
+ self.available_drugs = [row[0] for row in cursor.fetchall()]
- # Create AnalysisFilters object for generate_graph
- self.status_message = "Processing pathways..."
- yield
+ # Get available directories (distinct, sorted)
+ cursor.execute("""
+ SELECT DISTINCT directory
+ FROM fact_interventions
+ WHERE directory IS NOT NULL AND directory != ''
+ ORDER BY directory
+ """)
+ self.available_directorates = [row[0] for row in cursor.fetchall()]
- # Generate the chart data (without writing to file)
- # We'll create the figure data directly instead of calling generate_graph
- # which writes to file and opens browser
- fig_data = self._generate_chart_data(
- df=result.df,
- trusts=trusts,
- drugs=drugs,
- directories=directories,
- )
+ # Get available indications from ref_drug_indication_clusters
+ cursor.execute("""
+ SELECT DISTINCT indication
+ FROM ref_drug_indication_clusters
+ WHERE indication IS NOT NULL AND indication != ''
+ ORDER BY indication
+ """)
+ self.available_indications = [row[0] for row in cursor.fetchall()]
- if fig_data is not None:
- self.chart_data = fig_data
- self.has_chart = True
- self.status_message = f"Analysis complete! Showing {self.data_row_count:,} interventions."
- else:
- self.error_message = "No data found matching the selected filters."
- self.has_chart = False
+ # If no indications in reference table, use placeholder
+ if not self.available_indications:
+ self.available_indications = ["(No indications available)"]
+ # Get date range from data
+ cursor.execute("""
+ SELECT MIN(intervention_date), MAX(intervention_date)
+ FROM fact_interventions
+ """)
+ date_range = cursor.fetchone()
+ min_date, max_date = date_range
+
+ # Update latest_date_in_data and set "to" date defaults
+ if max_date:
+ self.latest_date_in_data = max_date
+ self.last_seen_to_date = max_date
+ self.initiated_to_date = max_date
+
+ # Set "from" date for last_seen filter (6 months before max_date)
+ max_dt = datetime.strptime(max_date, "%Y-%m-%d")
+ six_months_ago = max_dt - timedelta(days=180)
+ self.last_seen_from_date = six_months_ago.strftime("%Y-%m-%d")
+
+ # Get unique patient count for KPIs
+ cursor.execute("SELECT COUNT(DISTINCT upid) FROM fact_interventions")
+ self.unique_patients = cursor.fetchone()[0]
+
+ # Get unique drug count
+ self.total_drugs = len(self.available_drugs)
+
+ # Get total cost
+ cursor.execute("SELECT SUM(price_actual) FROM fact_interventions")
+ total_cost_result = cursor.fetchone()[0]
+ self.total_cost = float(total_cost_result) if total_cost_result else 0.0
+
+ conn.close()
+
+ # Set data_loaded and last_updated
+ self.data_loaded = True
+ self.last_updated = datetime.now().isoformat()
+ self.error_message = ""
+
+ # Load pre-computed pathway data for the default date filter
+ # This replaces apply_filters() which used dynamic calculation
+ self.load_pathway_data()
+
+ except sqlite3.Error as e:
+ self.error_message = f"Unable to load data. Database error: {str(e)}"
+ self.data_loaded = False
except Exception as e:
- self.error_message = f"Analysis failed: {str(e)}\n\n{traceback.format_exc()}"
- self.has_chart = False
+ self.error_message = f"Failed to load data. Please check the database file. Details: {str(e)}"
+ self.data_loaded = False
- finally:
- self.analysis_running = False
-
- yield # Final UI update
-
- def _generate_chart_data(
- self,
- df: pd.DataFrame,
- trusts: list[str],
- drugs: list[str],
- directories: list[str],
- ) -> Optional[go.Figure]:
+ def load_pathway_data(self):
"""
- Generate Plotly chart data from processed DataFrame.
+ Load pre-computed pathway data from pathway_nodes table.
- This replicates the core logic of generate_graph() and figure() but
- returns the figure dict instead of writing to file and opening browser.
- This is a workaround to avoid modifying generate_graph() internals
- (which is deferred to Phase 5).
+ This method queries the pathway_nodes table using the current date_filter_id
+ and applies trust/directory/drug filters. It replaces the dynamic calculation
+ approach with pre-computed data for faster performance.
+
+ Filters:
+ - date_filter_id: Computed from selected_initiated + selected_last_seen
+ - trust filter: Uses denormalized trust_name column
+ - directory filter: Uses denormalized directory column
+ - drug filter: Uses drug_sequence column with LIKE patterns
"""
- from core import default_paths
+ import sqlite3
- # Use the org_codes mapping
- org_codes = pd.read_csv(default_paths.org_codes_csv, index_col=1)
+ db_path = Path("data/pathways.db")
- # Make a copy to avoid modifying original
- df1 = df.copy()
+ if not db_path.exists():
+ self.error_message = "Database not found. Please ensure the data has been loaded (data/pathways.db)."
+ return
- # Create UPID + Treatment column for deduplication
- df1["UPIDTreatment"] = df1["UPID"] + df1["Drug Name"]
-
- # Map provider codes to names
- df1["Provider Code"] = df1["Provider Code"].map(org_codes["Name"])
-
- # Apply filters
- df1 = df1[
- (df1["Provider Code"].isin(trusts)) &
- (df1["Drug Name"].isin(drugs)) &
- (df1["Directory"].isin(directories))
- ]
-
- if len(df1) == 0:
- return None
-
- # Apply date filters
- df1 = df1[
- (df1["Intervention Date"] >= self.start_date) &
- (df1["Intervention Date"] <= self.end_date)
- ]
-
- if len(df1) == 0:
- return None
-
- # Add indication validation columns (if enabled and Snowflake available)
- df1 = self._add_indication_validation(df1)
-
- # Store filtered data for CSV export (now includes indication columns)
- self._analysis_data = df1.copy()
-
- # Build a simplified hierarchy for the icicle chart
- # Group by Trust -> Directory -> Drug to get patient counts
- hierarchy_data = self._build_hierarchy(df1, org_codes)
-
- if hierarchy_data.empty:
- return None
-
- # Apply minimum patients filter
- hierarchy_data = hierarchy_data[hierarchy_data['value'] >= self.minimum_patients]
-
- if hierarchy_data.empty:
- return None
-
- # Create the Plotly icicle figure
- fig = go.Figure(go.Icicle(
- labels=hierarchy_data['labels'].tolist(),
- ids=hierarchy_data['ids'].tolist(),
- parents=hierarchy_data['parents'].tolist(),
- values=hierarchy_data['value'].tolist(),
- branchvalues="total",
- marker=dict(
- colors=hierarchy_data['colour'].tolist() if 'colour' in hierarchy_data.columns else None,
- colorscale='Viridis',
- ),
- maxdepth=3,
- texttemplate='%{label}
Patients: %{value}',
- hovertemplate='%{label}
Patients: %{value}',
- ))
-
- # Set chart title
- title_text = self.custom_title if self.custom_title else f"Patients initiated {self.start_date} to {self.end_date}"
-
- fig.update_layout(
- margin=dict(t=60, l=1, r=1, b=60),
- title=f"Norfolk & Waveney ICS High-Cost Drug Patient Pathways - {title_text}",
- title_x=0.5,
- hoverlabel=dict(font_size=16),
- )
-
- # Return figure for rx.plotly()
- return fig
-
- def _build_hierarchy(self, df: pd.DataFrame, org_codes: pd.DataFrame) -> pd.DataFrame:
- """
- Build a hierarchical dataframe for icicle chart.
-
- Creates Trust -> Directory -> Drug hierarchy with patient counts.
- """
- # Create directory mapping from UPID
- directory_df = df[["UPID", "Directory"]].drop_duplicates("UPID").set_index("UPID")
-
- # Get unique patients per drug
- patient_drugs = df[["UPID", "Drug Name", "Provider Code", "Directory"]].drop_duplicates(subset=["UPID", "Drug Name"])
-
- # Build hierarchy: Trust -> Directory -> Drug
- rows = []
-
- # Root node
- total_patients = patient_drugs["UPID"].nunique()
- rows.append({
- 'parents': '',
- 'ids': 'N&WICS',
- 'labels': 'N&WICS',
- 'value': total_patients,
- 'colour': 1.0,
- })
-
- # Trust level
- trust_counts = patient_drugs.groupby("Provider Code")["UPID"].nunique().reset_index()
- trust_counts.columns = ["trust", "count"]
-
- for _, row in trust_counts.iterrows():
- trust = row["trust"]
- if pd.isna(trust):
- continue
- rows.append({
- 'parents': 'N&WICS',
- 'ids': f'N&WICS - {trust}',
- 'labels': trust,
- 'value': row["count"],
- 'colour': row["count"] / total_patients,
- })
-
- # Directory level (under each trust)
- trust_dir_counts = patient_drugs.groupby(["Provider Code", "Directory"])["UPID"].nunique().reset_index()
- trust_dir_counts.columns = ["trust", "directory", "count"]
-
- for _, row in trust_dir_counts.iterrows():
- trust = row["trust"]
- directory = row["directory"]
- if pd.isna(trust) or pd.isna(directory):
- continue
- trust_total = trust_counts[trust_counts["trust"] == trust]["count"].values
- trust_total = trust_total[0] if len(trust_total) > 0 else 1
- rows.append({
- 'parents': f'N&WICS - {trust}',
- 'ids': f'N&WICS - {trust} - {directory}',
- 'labels': directory,
- 'value': row["count"],
- 'colour': row["count"] / trust_total,
- })
-
- # Drug level (under each trust-directory)
- trust_dir_drug_counts = patient_drugs.groupby(["Provider Code", "Directory", "Drug Name"])["UPID"].nunique().reset_index()
- trust_dir_drug_counts.columns = ["trust", "directory", "drug", "count"]
-
- for _, row in trust_dir_drug_counts.iterrows():
- trust = row["trust"]
- directory = row["directory"]
- drug = row["drug"]
- if pd.isna(trust) or pd.isna(directory) or pd.isna(drug):
- continue
- dir_total = trust_dir_counts[
- (trust_dir_counts["trust"] == trust) &
- (trust_dir_counts["directory"] == directory)
- ]["count"].values
- dir_total = dir_total[0] if len(dir_total) > 0 else 1
- rows.append({
- 'parents': f'N&WICS - {trust} - {directory}',
- 'ids': f'N&WICS - {trust} - {directory} - {drug}',
- 'labels': drug,
- 'value': row["count"],
- 'colour': row["count"] / dir_total,
- })
-
- return pd.DataFrame(rows)
-
- def _add_indication_validation(self, df: pd.DataFrame) -> pd.DataFrame:
- """
- Add indication validation columns to the DataFrame.
-
- Adds columns:
- - Indication_Valid: Boolean indicating if patient has valid GP diagnosis
- - Indication_Source: "GP_SNOMED" | "NONE" | "NOT_CHECKED"
- - Indication_Cluster: The matched SNOMED cluster ID (if any)
-
- This requires Snowflake connectivity for GP record lookups.
- If Snowflake is not available, columns are added with "NOT_CHECKED" status.
- """
- # Initialize columns with default values
- df = df.copy()
- df["Indication_Valid"] = False
- df["Indication_Source"] = "NOT_CHECKED"
- df["Indication_Cluster"] = ""
-
- # Check if indication validation is enabled and Snowflake is available
- if not self.indication_validation_enabled:
- return df
+ self.chart_loading = True
try:
- from data_processing.snowflake_connector import (
- is_snowflake_available,
- is_snowflake_configured,
- get_connector,
- )
- from data_processing.diagnosis_lookup import (
- get_drug_cluster_ids,
- patient_has_indication,
- )
+ conn = sqlite3.connect(str(db_path))
+ conn.row_factory = sqlite3.Row # Enable column access by name
+ cursor = conn.cursor()
- if not is_snowflake_available() or not is_snowflake_configured():
- # Snowflake not available - can't validate indications
- self.indication_validation_summary = "Indication validation skipped (Snowflake not configured)"
- return df
+ # Build the date filter ID
+ filter_id = f"{self.selected_initiated}_{self.selected_last_seen}"
- self.indication_validation_running = True
+ # Build WHERE clause for filters
+ where_clauses = ["date_filter_id = ?"]
+ params = [filter_id]
- # Get unique patient-drug pairs
- patient_drug_pairs = df[["UPID", "Drug Name"]].drop_duplicates()
- total_pairs = len(patient_drug_pairs)
+ # Trust filter (if any directorates selected, they map to trust_name)
+ # Note: In the schema, trust_name is extracted from the hierarchy
+ # For now, we filter at the directory level since that's what users select
- # Cache drug clusters to avoid repeated lookups
- drug_clusters_cache = {}
+ # Directory filter (if any directorates selected)
+ if self.selected_directorates:
+ placeholders = ",".join("?" * len(self.selected_directorates))
+ where_clauses.append(f"(directory IN ({placeholders}) OR directory IS NULL)")
+ params.extend(self.selected_directorates)
- # Track results for summary
- validation_results = {} # drug -> {total, matched}
- connector = get_connector()
+ # Drug filter (if any drugs selected)
+ # Drug names appear in the drug_sequence column, use LIKE for matching
+ if self.selected_drugs:
+ drug_conditions = []
+ for drug in self.selected_drugs:
+ drug_conditions.append("drug_sequence LIKE ?")
+ params.append(f"%{drug}%")
+ where_clauses.append(f"({' OR '.join(drug_conditions)} OR drug_sequence IS NULL)")
- for idx, (_, row) in enumerate(patient_drug_pairs.iterrows()):
- upid = row["UPID"]
- drug_name = row["Drug Name"]
+ where_clause = " AND ".join(where_clauses)
- # Get drug clusters (cached)
- drug_upper = drug_name.upper() if drug_name else ""
- if drug_upper not in drug_clusters_cache:
- drug_clusters_cache[drug_upper] = get_drug_cluster_ids(drug_name)
+ # Query pathway nodes for the selected date filter
+ query = f"""
+ SELECT
+ parents, ids, labels, level, value,
+ cost, costpp, cost_pp_pa, colour,
+ first_seen, last_seen, first_seen_parent, last_seen_parent,
+ average_spacing, average_administered, avg_days,
+ trust_name, directory, drug_sequence
+ FROM pathway_nodes
+ WHERE {where_clause}
+ ORDER BY level, parents, ids
+ """
- cluster_ids = drug_clusters_cache[drug_upper]
+ cursor.execute(query, params)
+ rows = cursor.fetchall()
- # Initialize drug in results tracking
- if drug_upper not in validation_results:
- validation_results[drug_upper] = {"total": 0, "matched": 0, "name": drug_name}
+ if not rows:
+ # No data for this filter combination
+ self.chart_data = []
+ self.unique_patients = 0
+ self.total_drugs = 0
+ self.total_cost = 0.0
+ self.chart_loading = False
+ self.error_message = f"No pathway data found for filter: {filter_id}"
+ conn.close()
+ return
- validation_results[drug_upper]["total"] += 1
+ # Convert rows to chart_data format
+ chart_data = []
+ root_patients = 0
+ root_cost = 0.0
- if not cluster_ids:
- # No cluster mapping for this drug - mark as NONE
- mask = (df["UPID"] == upid) & (df["Drug Name"] == drug_name)
- df.loc[mask, "Indication_Source"] = "NONE"
+ for row in rows:
+ node = {
+ "parents": row["parents"] or "",
+ "ids": row["ids"] or "",
+ "labels": row["labels"] or "",
+ "value": row["value"] or 0,
+ "cost": float(row["cost"]) if row["cost"] else 0.0,
+ "costpp": float(row["costpp"]) if row["costpp"] else 0.0,
+ "colour": float(row["colour"]) if row["colour"] else 0.0,
+ # Additional fields for hover template
+ "first_seen": row["first_seen"] or "",
+ "last_seen": row["last_seen"] or "",
+ "first_seen_parent": row["first_seen_parent"] or "",
+ "last_seen_parent": row["last_seen_parent"] or "",
+ "average_spacing": row["average_spacing"] or "",
+ "cost_pp_pa": row["cost_pp_pa"] or "",
+ }
+ chart_data.append(node)
+
+ # Track root node for KPIs (level 0)
+ if row["level"] == 0:
+ root_patients = row["value"] or 0
+ root_cost = float(row["cost"]) if row["cost"] else 0.0
+
+ self.chart_data = chart_data
+
+ # Update KPIs from root node
+ self.unique_patients = root_patients
+ self.total_cost = root_cost
+
+ # Count unique drugs from level 3+ nodes
+ drug_nodes = [r for r in rows if r["level"] >= 3]
+ unique_drugs = set()
+ for r in drug_nodes:
+ if r["drug_sequence"]:
+ # drug_sequence is pipe-separated
+ for drug in r["drug_sequence"].split("|"):
+ if drug:
+ unique_drugs.add(drug)
+ self.total_drugs = len(unique_drugs)
+
+ # Get data freshness from pathway_refresh_log
+ cursor.execute("""
+ SELECT completed_at
+ FROM pathway_refresh_log
+ WHERE status = 'completed'
+ ORDER BY completed_at DESC
+ LIMIT 1
+ """)
+ refresh_row = cursor.fetchone()
+ if refresh_row and refresh_row["completed_at"]:
+ self.last_updated = refresh_row["completed_at"]
+
+ # Update chart title
+ self.chart_title = self._generate_pathway_chart_title()
+
+ conn.close()
+ self.chart_loading = False
+ self.error_message = ""
+
+ except sqlite3.Error as e:
+ self.error_message = f"Unable to load pathway data. Database error: {str(e)}"
+ self.chart_data = []
+ self.chart_loading = False
+ except Exception as e:
+ self.error_message = f"Failed to load pathway data. Details: {str(e)}"
+ self.chart_data = []
+ self.chart_loading = False
+
+ def _generate_pathway_chart_title(self) -> str:
+ """Generate chart title based on current pathway filter state."""
+ parts = []
+
+ # Date filter info
+ initiated_label = "All years"
+ if self.selected_initiated == "1yr":
+ initiated_label = "Last 1 year"
+ elif self.selected_initiated == "2yr":
+ initiated_label = "Last 2 years"
+
+ last_seen_label = "Last 6 months" if self.selected_last_seen == "6mo" else "Last 12 months"
+ parts.append(f"{initiated_label} / {last_seen_label}")
+
+ # Drug selection info
+ if self.selected_drugs:
+ if len(self.selected_drugs) <= 3:
+ parts.append(", ".join(self.selected_drugs))
+ else:
+ parts.append(f"{len(self.selected_drugs)} drugs selected")
+
+ # Directorate selection info
+ if self.selected_directorates:
+ if len(self.selected_directorates) <= 2:
+ parts.append(", ".join(self.selected_directorates))
+ else:
+ parts.append(f"{len(self.selected_directorates)} directorates")
+
+ if parts:
+ return " | ".join(parts)
+ return "All Patients"
+
+ def recalculate_parent_totals(self):
+ """
+ Recalculate parent node totals after filtering.
+
+ When trust/directory/drug filters are applied, the parent nodes
+ (root, trust, directory) need their totals recalculated to sum
+ only the visible children.
+
+ This method walks up the hierarchy and recalculates values.
+ """
+ if not self.chart_data:
+ return
+
+ # Build parent-child relationships
+ children_by_parent = {}
+ nodes_by_id = {}
+
+ for node in self.chart_data:
+ node_id = node["ids"]
+ parent_id = node["parents"]
+ nodes_by_id[node_id] = node
+
+ if parent_id not in children_by_parent:
+ children_by_parent[parent_id] = []
+ children_by_parent[parent_id].append(node)
+
+ # Recalculate from bottom up
+ # Find all leaf nodes (nodes with no children)
+ all_ids = set(nodes_by_id.keys())
+ parent_ids = set(children_by_parent.keys())
+ leaf_ids = all_ids - parent_ids
+
+ # Walk up from leaves, recalculating parent totals
+ processed = set()
+ to_process = list(leaf_ids)
+
+ while to_process:
+ node_id = to_process.pop(0)
+ if node_id in processed or node_id not in nodes_by_id:
+ continue
+
+ node = nodes_by_id[node_id]
+ parent_id = node["parents"]
+
+ if parent_id and parent_id in nodes_by_id:
+ parent = nodes_by_id[parent_id]
+ # Sum children's values
+ children = children_by_parent.get(parent_id, [])
+ parent["value"] = sum(c["value"] for c in children)
+ parent["cost"] = sum(c["cost"] for c in children)
+
+ # Recalculate colour (proportion of grandparent)
+ grandparent_id = parent["parents"]
+ if grandparent_id and grandparent_id in nodes_by_id:
+ grandparent_value = nodes_by_id[grandparent_id]["value"]
+ if grandparent_value > 0:
+ parent["colour"] = parent["value"] / grandparent_value
+
+ # Queue parent for processing
+ if parent_id not in processed:
+ to_process.append(parent_id)
+
+ processed.add(node_id)
+
+ # Update the chart_data list
+ self.chart_data = list(nodes_by_id.values())
+
+ # Update KPIs from root
+ for node in self.chart_data:
+ if node["parents"] == "": # Root node
+ self.unique_patients = node["value"]
+ self.total_cost = node["cost"]
+ break
+
+ # =========================================================================
+ # Chart Data Preparation Methods
+ # =========================================================================
+
+ # Chart data stored as list of dicts for Reflex serialization
+ # Structure: [{"parents": str, "ids": str, "labels": str, "value": int, "cost": float, "colour": float}, ...]
+ chart_data: list[dict[str, Any]] = []
+ chart_title: str = ""
+
+ def prepare_chart_data(self):
+ """
+ Prepare hierarchical data for Plotly icicle chart.
+
+ This method queries the filtered patient data and transforms it into
+ a hierarchical structure: Root → Trust → Directory → Drug
+
+ The chart data is stored in self.chart_data as a list of dicts with:
+ - parents: Parent node identifier
+ - ids: Unique node identifier (hierarchical path)
+ - labels: Display label
+ - value: Patient count
+ - cost: Total cost
+ - colour: Color value (proportion of parent)
+
+ Updates: chart_data, chart_title, chart_loading
+ """
+ import sqlite3
+
+ db_path = Path("data/pathways.db")
+
+ if not db_path.exists():
+ self.error_message = "Unable to generate chart. Database not found."
+ self.chart_data = []
+ return
+
+ self.chart_loading = True
+
+ try:
+ conn = sqlite3.connect(str(db_path))
+ cursor = conn.cursor()
+
+ # Build WHERE clause for filters
+ where_clauses = []
+ params = []
+
+ # Drug filter (if any drugs selected)
+ if self.selected_drugs:
+ placeholders = ",".join("?" * len(self.selected_drugs))
+ where_clauses.append(f"drug_name_std IN ({placeholders})")
+ params.extend(self.selected_drugs)
+
+ # Directorate filter (if any directorates selected)
+ if self.selected_directorates:
+ placeholders = ",".join("?" * len(self.selected_directorates))
+ where_clauses.append(f"directory IN ({placeholders})")
+ params.extend(self.selected_directorates)
+
+ base_where = ""
+ if where_clauses:
+ base_where = "WHERE " + " AND ".join(where_clauses)
+
+ # Build date filter HAVING clauses for patient-level filtering
+ having_clauses = []
+ having_params = []
+
+ if self.initiated_filter_enabled and self.initiated_from_date:
+ having_clauses.append("first_seen >= ?")
+ having_params.append(self.initiated_from_date)
+ if self.initiated_filter_enabled and self.initiated_to_date:
+ having_clauses.append("first_seen <= ?")
+ having_params.append(self.initiated_to_date)
+
+ if self.last_seen_filter_enabled and self.last_seen_from_date:
+ having_clauses.append("last_seen >= ?")
+ having_params.append(self.last_seen_from_date)
+ if self.last_seen_filter_enabled and self.last_seen_to_date:
+ having_clauses.append("last_seen <= ?")
+ having_params.append(self.last_seen_to_date)
+
+ having_clause = ""
+ if having_clauses:
+ having_clause = "HAVING " + " AND ".join(having_clauses)
+
+ # Query to get aggregated data by Trust -> Directory -> Drug
+ # fact_interventions already has org_name, use it directly
+ chart_query = f"""
+ WITH filtered_patients AS (
+ SELECT upid
+ FROM (
+ SELECT
+ upid,
+ MIN(intervention_date) as first_seen,
+ MAX(intervention_date) as last_seen
+ FROM fact_interventions
+ {base_where}
+ GROUP BY upid
+ {having_clause}
+ )
+ ),
+ patient_records AS (
+ SELECT
+ f.upid,
+ COALESCE(f.org_name, f.provider_code) as trust_name,
+ f.directory,
+ f.drug_name_std,
+ f.price_actual
+ FROM fact_interventions f
+ INNER JOIN filtered_patients fp ON f.upid = fp.upid
+ {base_where.replace('WHERE', 'AND') if base_where else ''}
+ )
+ SELECT
+ trust_name,
+ directory,
+ drug_name_std,
+ COUNT(DISTINCT upid) as patient_count,
+ COALESCE(SUM(price_actual), 0) as total_cost
+ FROM patient_records
+ GROUP BY trust_name, directory, drug_name_std
+ ORDER BY trust_name, directory, drug_name_std
+ """
+
+ all_params = params + having_params
+ if where_clauses:
+ all_params.extend(params)
+
+ cursor.execute(chart_query, all_params)
+ rows = cursor.fetchall()
+
+ conn.close()
+
+ # Build hierarchical chart data
+ chart_data = []
+ hierarchy_totals = {} # Track totals for calculating color values
+
+ # Root node
+ root_id = "N&WICS"
+ chart_data.append({
+ "parents": "",
+ "ids": root_id,
+ "labels": "Norfolk & Waveney ICS",
+ "value": 0,
+ "cost": 0.0,
+ "colour": 1.0,
+ })
+
+ # Process rows to build hierarchy
+ trust_totals = {}
+ directory_totals = {}
+ drug_data = []
+
+ for row in rows:
+ trust_name, directory, drug_name, patient_count, cost = row
+
+ if not trust_name or not directory or not drug_name:
continue
- # Check patient indication in GP records
- # Note: We use the UPID as patient identifier - this may need mapping to pseudonymised NHS number
- # For now, assume UPID can be used directly or is already the pseudonymised ID
- has_indication, matched_cluster, _, _ = patient_has_indication(
- patient_pseudonym=upid,
- cluster_ids=cluster_ids,
- connector=connector,
- )
+ # Trust level
+ trust_id = f"{root_id} - {trust_name}"
+ if trust_id not in trust_totals:
+ trust_totals[trust_id] = {"value": 0, "cost": 0.0, "label": trust_name}
+ trust_totals[trust_id]["value"] += patient_count
+ trust_totals[trust_id]["cost"] += cost
- # Update dataframe for this patient-drug combination
- mask = (df["UPID"] == upid) & (df["Drug Name"] == drug_name)
- df.loc[mask, "Indication_Valid"] = has_indication
- df.loc[mask, "Indication_Source"] = "GP_SNOMED" if has_indication else "NONE"
- if matched_cluster:
- df.loc[mask, "Indication_Cluster"] = matched_cluster
+ # Directory level
+ dir_id = f"{trust_id} - {directory}"
+ if dir_id not in directory_totals:
+ directory_totals[dir_id] = {
+ "value": 0,
+ "cost": 0.0,
+ "label": directory,
+ "parent": trust_id,
+ }
+ directory_totals[dir_id]["value"] += patient_count
+ directory_totals[dir_id]["cost"] += cost
- if has_indication:
- validation_results[drug_upper]["matched"] += 1
+ # Drug level (leaf)
+ drug_id = f"{dir_id} - {drug_name}"
+ drug_data.append({
+ "ids": drug_id,
+ "labels": drug_name,
+ "parent": dir_id,
+ "value": patient_count,
+ "cost": float(cost),
+ })
- # Store validation results and create summary
- self.indication_validation_results = {
- drug: {
- "drug_name": data["name"],
- "total_patients": data["total"],
- "patients_with_indication": data["matched"],
- "match_rate": round(data["matched"] / data["total"] * 100, 1) if data["total"] > 0 else 0,
- }
- for drug, data in validation_results.items()
- }
+ # Calculate root total
+ root_total = sum(t["value"] for t in trust_totals.values())
+ root_cost = sum(t["cost"] for t in trust_totals.values())
+ chart_data[0]["value"] = root_total
+ chart_data[0]["cost"] = root_cost
- # Create summary text
- total_patients = sum(d["total"] for d in validation_results.values())
- matched_patients = sum(d["matched"] for d in validation_results.values())
- overall_rate = round(matched_patients / total_patients * 100, 1) if total_patients > 0 else 0
+ # Add trust nodes with color proportions
+ for trust_id, data in trust_totals.items():
+ colour = data["value"] / root_total if root_total > 0 else 0
+ chart_data.append({
+ "parents": root_id,
+ "ids": trust_id,
+ "labels": data["label"],
+ "value": data["value"],
+ "cost": data["cost"],
+ "colour": colour,
+ })
- self.indication_validation_summary = (
- f"GP Indication Validation: {matched_patients}/{total_patients} "
- f"({overall_rate}%) patients have valid GP diagnosis"
+ # Add directory nodes with color proportions
+ for dir_id, data in directory_totals.items():
+ parent_total = trust_totals[data["parent"]]["value"]
+ colour = data["value"] / parent_total if parent_total > 0 else 0
+ chart_data.append({
+ "parents": data["parent"],
+ "ids": dir_id,
+ "labels": data["label"],
+ "value": data["value"],
+ "cost": data["cost"],
+ "colour": colour,
+ })
+
+ # Add drug nodes with color proportions
+ for drug in drug_data:
+ parent_dir = drug["parent"]
+ parent_total = directory_totals[parent_dir]["value"]
+ colour = drug["value"] / parent_total if parent_total > 0 else 0
+ chart_data.append({
+ "parents": parent_dir,
+ "ids": drug["ids"],
+ "labels": drug["labels"],
+ "value": drug["value"],
+ "cost": drug["cost"],
+ "colour": colour,
+ })
+
+ self.chart_data = chart_data
+ self.chart_title = self._generate_chart_title()
+ self.chart_loading = False
+ self.error_message = ""
+
+ except sqlite3.Error as e:
+ self.error_message = f"Unable to generate chart. Database error: {str(e)}"
+ self.chart_data = []
+ self.chart_loading = False
+ except Exception as e:
+ self.error_message = f"Unable to generate chart. Details: {str(e)}"
+ self.chart_data = []
+ self.chart_loading = False
+
+ def _generate_chart_title(self) -> str:
+ """Generate chart title based on current filter state."""
+ parts = []
+
+ # Date range info
+ if self.last_seen_filter_enabled:
+ parts.append(f"Last seen: {self.last_seen_from_date} to {self.last_seen_to_date}")
+ elif self.initiated_filter_enabled:
+ parts.append(f"Initiated: {self.initiated_from_date} to {self.initiated_to_date}")
+
+ # Drug selection info
+ if self.selected_drugs:
+ if len(self.selected_drugs) <= 3:
+ parts.append(", ".join(self.selected_drugs))
+ else:
+ parts.append(f"{len(self.selected_drugs)} drugs selected")
+
+ # Directorate selection info
+ if self.selected_directorates:
+ if len(self.selected_directorates) <= 2:
+ parts.append(", ".join(self.selected_directorates))
+ else:
+ parts.append(f"{len(self.selected_directorates)} directorates")
+
+ if parts:
+ return " | ".join(parts)
+ return "All Patients"
+
+ # =========================================================================
+ # Plotly Chart Generation
+ # =========================================================================
+
+ @rx.var
+ def icicle_figure(self) -> go.Figure:
+ """
+ Generate Plotly icicle chart from chart_data.
+
+ This computed property creates a go.Figure with the hierarchical icicle chart
+ using data from prepare_chart_data(). The chart displays patient pathways:
+ Root → Trust → Directory → Drug
+
+ Colors use a custom NHS-inspired blue gradient colorscale.
+ Hover displays patient count, cost, and percentage of parent.
+
+ Returns:
+ Plotly Figure object ready for rx.plotly() component
+ """
+ # Return empty figure if no data
+ if not self.chart_data:
+ return go.Figure()
+
+ # Extract lists from chart_data
+ parents = [d.get("parents", "") for d in self.chart_data]
+ ids = [d.get("ids", "") for d in self.chart_data]
+ labels = [d.get("labels", "") for d in self.chart_data]
+ values = [d.get("value", 0) for d in self.chart_data]
+ costs = [d.get("cost", 0.0) for d in self.chart_data]
+ colours = [d.get("colour", 0.0) for d in self.chart_data]
+
+ # NHS-inspired blue gradient colorscale (from design system)
+ # Heritage Blue → Primary Blue → Vibrant Blue → Sky Blue → Pale Blue
+ colorscale = [
+ [0.0, "#003087"], # Heritage Blue
+ [0.25, "#0066CC"], # Primary Blue
+ [0.5, "#1E88E5"], # Vibrant Blue
+ [0.75, "#4FC3F7"], # Sky Blue
+ [1.0, "#E3F2FD"], # Pale Blue
+ ]
+
+ # Create the icicle chart
+ fig = go.Figure(
+ go.Icicle(
+ labels=labels,
+ ids=ids,
+ parents=parents,
+ values=values,
+ branchvalues="total",
+ marker=dict(
+ colors=colours,
+ colorscale=colorscale,
+ line=dict(width=1, color="#FFFFFF"),
+ ),
+ maxdepth=3,
+ # Custom data for hover template
+ customdata=list(zip(values, colours, costs)),
+ # Text shown on chart segments
+ texttemplate="%{label}
%{value:,} patients",
+ # Hover text with full details
+ hovertemplate=(
+ "%{label}
"
+ "Patients: %{customdata[0]:,} (%{customdata[1]:.1%} of parent)
"
+ "Total Cost: £%{customdata[2]:,.0f}"
+ ""
+ ),
+ textfont=dict(
+ family="Inter, system-ui, sans-serif",
+ size=12,
+ ),
)
+ )
- except Exception as e:
- self.indication_validation_summary = f"Indication validation error: {str(e)}"
- # Don't fail the whole analysis - just leave columns as NOT_CHECKED
-
- finally:
- self.indication_validation_running = False
-
- return df
-
- def toggle_indication_validation(self):
- """Toggle indication validation on/off."""
- self.indication_validation_enabled = not self.indication_validation_enabled
-
- @rx.var
- def indication_validation_status(self) -> str:
- """Get human-readable indication validation status."""
- if self.indication_validation_running:
- return "Validating patient indications..."
- if self.indication_validation_summary:
- return self.indication_validation_summary
- if self.indication_validation_enabled:
- return "Enabled (will check GP records)"
- return "Disabled"
-
- @rx.var
- def indication_results_list(self) -> list[dict]:
- """
- Get indication validation results as a list for display.
-
- Returns list of dicts with: drug_name, total_patients, patients_with_indication, match_rate
- Sorted by match rate ascending (worst first) for easy identification of issues.
- """
- if not self.indication_validation_results:
- return []
-
- results = []
- for drug_key, data in self.indication_validation_results.items():
- results.append({
- "drug_name": data.get("drug_name", drug_key),
- "total_patients": data.get("total_patients", 0),
- "patients_with_indication": data.get("patients_with_indication", 0),
- "match_rate": data.get("match_rate", 0),
- })
-
- # Sort by match rate ascending (lowest first to highlight issues)
- results.sort(key=lambda x: x["match_rate"])
- return results
-
- @rx.var
- def has_indication_results(self) -> bool:
- """Check if there are indication validation results to display."""
- return len(self.indication_validation_results) > 0
-
- def export_chart_html(self):
- """
- Export the current chart as an interactive HTML file.
-
- The file is saved to data/exports/ directory with a timestamped filename.
- """
- if not self.has_chart:
- self.export_error = "No chart to export. Please run analysis first."
- return
-
- self.export_error = ""
- self.export_message = ""
-
- try:
- from datetime import datetime
-
- # Create exports directory
- export_dir = Path("data/exports")
- export_dir.mkdir(parents=True, exist_ok=True)
-
- # Generate filename with timestamp
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = f"pathway_chart_{timestamp}.html"
- filepath = export_dir / filename
-
- # Export the chart to HTML
- self.chart_data.write_html(
- str(filepath),
- include_plotlyjs=True,
- full_html=True,
- )
-
- self.last_export_path = str(filepath)
- self.export_message = f"Chart exported to {filename}"
-
- except Exception as e:
- self.export_error = f"Export failed: {str(e)}"
-
- def export_data_csv(self):
- """
- Export the underlying analysis data as a CSV file.
-
- The file is saved to data/exports/ directory with a timestamped filename.
- """
- if self._analysis_data is None or len(self._analysis_data) == 0:
- self.export_error = "No data to export. Please run analysis first."
- return
-
- self.export_error = ""
- self.export_message = ""
-
- try:
- from datetime import datetime
-
- # Create exports directory
- export_dir = Path("data/exports")
- export_dir.mkdir(parents=True, exist_ok=True)
-
- # Generate filename with timestamp
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = f"pathway_data_{timestamp}.csv"
- filepath = export_dir / filename
-
- # Export the data to CSV
- self._analysis_data.to_csv(filepath, index=False)
-
- self.last_export_path = str(filepath)
- self.export_message = f"Data exported to {filename}"
-
- except Exception as e:
- self.export_error = f"Export failed: {str(e)}"
-
- def clear_export_messages(self):
- """Clear export status messages."""
- self.export_message = ""
- self.export_error = ""
-
-
-# =============================================================================
-# Page Components
-# =============================================================================
-
-def info_card(title: str, value: str, icon: str) -> rx.Component:
- """Create an info card showing a statistic."""
- return rx.box(
- rx.vstack(
- rx.hstack(
- rx.icon(icon, size=20, color=NHS_BLUE),
- rx.text(title, size="2", color="gray"),
- spacing="2",
- align="center",
+ # Configure layout
+ fig.update_layout(
+ title=dict(
+ text=f"Patient Pathways — {self.chart_title}",
+ font=dict(
+ family="Inter, system-ui, sans-serif",
+ size=18,
+ color="#1E293B", # Slate 900
+ ),
+ x=0.5,
+ xanchor="center",
),
- rx.text(value, size="5", weight="bold"),
- spacing="1",
- align="start",
- ),
- padding="16px",
- background="white",
- border_radius="8px",
- border="1px solid rgb(229, 231, 235)",
- width="100%",
- )
+ margin=dict(t=60, l=10, r=10, b=30),
+ hoverlabel=dict(
+ bgcolor="#FFFFFF",
+ bordercolor="#CBD5E1", # Slate 300
+ font=dict(
+ family="Inter, system-ui, sans-serif",
+ size=13,
+ color="#1E293B", # Slate 900
+ ),
+ ),
+ paper_bgcolor="rgba(0,0,0,0)", # Transparent background
+ plot_bgcolor="rgba(0,0,0,0)",
+ # Responsive sizing - height set but width auto
+ height=600,
+ # Enable interactivity
+ clickmode="event+select",
+ )
+
+ # Disable sort to maintain hierarchy order
+ fig.update_traces(sort=False)
+
+ return fig
-def date_input(label: str, value: rx.Var, on_change, help_text: str = "", input_id: str = "") -> rx.Component:
- """Create a labeled date input component with accessibility support."""
- # Generate a unique ID if not provided
- label_id = f"{input_id}-label" if input_id else ""
- help_id = f"{input_id}-help" if input_id else ""
+# =============================================================================
+# Layout Components
+# =============================================================================
+def date_range_picker(
+ label: str,
+ enabled: rx.Var[bool],
+ toggle_handler,
+ from_value: rx.Var[str],
+ to_value: rx.Var[str],
+ on_from_change,
+ on_to_change,
+) -> rx.Component:
+ """
+ Date range picker with enable/disable checkbox.
+
+ Uses debounced inputs (300ms) to prevent excessive filter updates.
+
+ Args:
+ label: Label for the date range (e.g., "Initiated", "Last Seen")
+ enabled: Whether the filter is active
+ toggle_handler: Event handler to toggle enabled state
+ from_value: Current "from" date value
+ to_value: Current "to" date value
+ on_from_change: Handler for from date change
+ on_to_change: Handler for to date change
+ """
return rx.vstack(
- rx.el.label(
- label,
- html_for=input_id,
- font_size="14px",
- font_weight="500",
- color=NHS_DARK_BLUE,
- ),
- rx.input(
- type="date",
- value=value,
- on_change=on_change,
- width="100%",
- id=input_id,
- aria_describedby=help_id if help_text else "",
- ),
- rx.cond(
- help_text != "",
- rx.text(help_text, size="1", color="gray", id=help_id),
- ),
- spacing="1",
- align="start",
- width="100%",
- )
-
-
-def data_source_selector() -> rx.Component:
- """Data source selector with file upload, SQLite, and Snowflake options."""
- return rx.box(
- rx.vstack(
- rx.heading("Data Source", size="5", color=NHS_DARK_BLUE),
- rx.text(
- "Select where to load patient data from",
+ # Header with checkbox
+ rx.hstack(
+ rx.checkbox(
+ checked=enabled,
+ on_change=toggle_handler,
size="2",
- color="gray",
),
- rx.divider(margin_y="8px"),
- # Current data source display
- rx.hstack(
- rx.text("Current source:", weight="medium"),
- rx.badge(
- State.data_source_display,
- color_scheme=rx.cond(
- State.data_source == "sqlite",
- "green",
- rx.cond(
- State.data_source == "snowflake",
- "blue",
- "gray",
- ),
- ),
- size="2",
- ),
- spacing="2",
- align="center",
+ rx.text(
+ label,
+ font_size=Typography.H3_SIZE,
+ font_weight=Typography.H3_WEIGHT,
+ color=Colors.SLATE_900,
+ font_family=Typography.FONT_FAMILY,
),
- rx.divider(margin_y="8px"),
- # Data source options
+ align="center",
+ spacing="2",
+ ),
+ # Date inputs (debounced 300ms)
+ rx.hstack(
rx.vstack(
- # SQLite option
- rx.box(
- rx.hstack(
- rx.icon("database", size=20, color=NHS_BLUE),
- rx.vstack(
- rx.hstack(
- rx.text("SQLite Database", weight="medium"),
- rx.cond(
- State.sqlite_available,
- rx.badge("Available", color_scheme="green", size="1"),
- rx.badge("No data", color_scheme="gray", size="1"),
- ),
- spacing="2",
- ),
- rx.cond(
- State.sqlite_available,
- rx.text(
- f"Contains pre-loaded patient data",
- size="1",
- color="gray",
- ),
- rx.text(
- "Run data migration to populate",
- size="1",
- color="gray",
- ),
- ),
- spacing="1",
- align="start",
- ),
- rx.spacer(),
- rx.button(
- "Use SQLite",
- on_click=State.use_sqlite_source,
- variant=rx.cond(State.data_source == "sqlite", "solid", "outline"),
- color_scheme="green",
- size="2",
- disabled=~State.sqlite_available,
- ),
- spacing="3",
- align="center",
- width="100%",
- ),
- padding="12px",
- background=rx.cond(
- State.data_source == "sqlite",
- "rgba(0, 94, 184, 0.05)",
- "transparent",
- ),
- border_radius="6px",
- border=rx.cond(
- State.data_source == "sqlite",
- "1px solid rgb(0, 94, 184)",
- "1px solid transparent",
- ),
- width="100%",
+ rx.text(
+ "From",
+ **text_caption(),
),
- # File upload option
- rx.box(
- rx.vstack(
- rx.hstack(
- rx.icon("upload", size=20, color=NHS_BLUE),
- rx.vstack(
- rx.hstack(
- rx.text("Upload File", weight="medium"),
- rx.cond(
- State.file_upload_success,
- rx.badge(State.file_size_display, color_scheme="green", size="1"),
- ),
- spacing="2",
- ),
- rx.text(
- "Upload CSV or Parquet file",
- size="1",
- color="gray",
- ),
- spacing="1",
- align="start",
- ),
- rx.spacer(),
- rx.cond(
- State.file_upload_success,
- rx.hstack(
- rx.button(
- "Use File",
- on_click=State.use_file_source,
- variant=rx.cond(State.data_source == "file", "solid", "outline"),
- color_scheme="blue",
- size="2",
- ),
- rx.button(
- rx.icon("x", size=14),
- on_click=State.clear_uploaded_file,
- variant="ghost",
- color_scheme="red",
- size="1",
- ),
- spacing="1",
- ),
- ),
- spacing="3",
- align="center",
- width="100%",
- ),
- rx.cond(
- State.file_upload_success,
- rx.text(
- State.uploaded_file_name,
- size="2",
- color=NHS_BLUE,
- font_family="monospace",
- ),
- rx.upload(
- rx.vstack(
- rx.cond(
- State.file_processing,
- rx.spinner(size="2"),
- rx.icon("file-up", size=24, color="gray"),
- ),
- rx.text(
- "Drag & drop or click to browse",
- size="2",
- color="gray",
- ),
- rx.text(
- "Supports CSV, Parquet",
- size="1",
- color="gray",
- ),
- spacing="2",
- align="center",
- padding="16px",
- ),
- id="file_upload",
- accept={
- "text/csv": [".csv"],
- "application/octet-stream": [".parquet", ".pq"],
- },
- max_files=1,
- border="1px dashed rgb(200, 200, 200)",
- border_radius="6px",
- padding="4px",
- width="100%",
- on_drop=State.handle_file_upload(rx.upload_files(upload_id="file_upload")),
- ),
- ),
- rx.cond(
- State.file_upload_error != "",
- rx.text(
- State.file_upload_error,
- size="2",
- color="red",
- ),
- ),
- spacing="2",
- width="100%",
+ rx.debounce_input(
+ rx.input(
+ type="date",
+ value=from_value,
+ on_change=on_from_change,
+ disabled=~enabled,
+ **input_style(),
+ width="140px",
+ opacity=rx.cond(enabled, "1", "0.5"),
),
- padding="12px",
- background=rx.cond(
- (State.data_source == "file") & State.file_upload_success,
- "rgba(0, 94, 184, 0.05)",
- "transparent",
- ),
- border_radius="6px",
- border=rx.cond(
- (State.data_source == "file") & State.file_upload_success,
- "1px solid rgb(0, 94, 184)",
- "1px solid transparent",
- ),
- width="100%",
+ debounce_timeout=300,
),
- # Snowflake option
- rx.box(
- rx.hstack(
- rx.icon("cloud", size=20, color=NHS_BLUE),
- rx.vstack(
- rx.hstack(
- rx.text("Snowflake", weight="medium"),
- rx.cond(
- State.snowflake_configured,
- rx.badge("Configured", color_scheme="blue", size="1"),
- rx.badge("Not configured", color_scheme="gray", size="1"),
- ),
- spacing="2",
- ),
- rx.text(
- "Query live data from Snowflake",
- size="1",
- color="gray",
- ),
- spacing="1",
- align="start",
- ),
- rx.spacer(),
- rx.button(
- "Use Snowflake",
- on_click=State.use_snowflake_source,
- variant=rx.cond(State.data_source == "snowflake", "solid", "outline"),
- color_scheme="blue",
- size="2",
- disabled=~State.snowflake_configured,
- ),
- spacing="3",
- align="center",
- width="100%",
- ),
- padding="12px",
- background=rx.cond(
- State.data_source == "snowflake",
- "rgba(0, 94, 184, 0.05)",
- "transparent",
- ),
- border_radius="6px",
- border=rx.cond(
- State.data_source == "snowflake",
- "1px solid rgb(0, 94, 184)",
- "1px solid transparent",
- ),
- width="100%",
+ spacing="1",
+ align="start",
+ ),
+ rx.vstack(
+ rx.text(
+ "To",
+ **text_caption(),
),
- spacing="2",
- width="100%",
+ rx.debounce_input(
+ rx.input(
+ type="date",
+ value=to_value,
+ on_change=on_to_change,
+ disabled=~enabled,
+ **input_style(),
+ width="140px",
+ opacity=rx.cond(enabled, "1", "0.5"),
+ ),
+ debounce_timeout=300,
+ ),
+ spacing="1",
+ align="start",
),
spacing="3",
+ align="end",
+ ),
+ spacing="2",
+ align="start",
+ )
+
+
+def searchable_dropdown(
+ label: str,
+ selection_text: rx.Var[str],
+ is_open: rx.Var[bool],
+ toggle_handler,
+ search_value: rx.Var[str],
+ on_search_change,
+ filtered_items: rx.Var[list[str]],
+ selected_items: rx.Var[list[str]],
+ toggle_item_handler,
+ select_all_handler,
+ clear_all_handler,
+) -> rx.Component:
+ """
+ Searchable multi-select dropdown component.
+
+ Uses debounced search input (300ms) for smooth filtering.
+
+ Args:
+ label: Label for the dropdown
+ selection_text: Text showing selection count
+ is_open: Whether dropdown is expanded
+ toggle_handler: Handler to toggle dropdown open/close
+ search_value: Current search text
+ on_search_change: Handler for search input change
+ filtered_items: Items filtered by search
+ selected_items: Currently selected items
+ toggle_item_handler: Handler to toggle item selection
+ select_all_handler: Handler to select all
+ clear_all_handler: Handler to clear selection
+ """
+ return rx.box(
+ rx.vstack(
+ # Label
+ rx.text(
+ label,
+ font_size=Typography.CAPTION_SIZE,
+ font_weight=Typography.CAPTION_WEIGHT,
+ color=Colors.SLATE_700,
+ font_family=Typography.FONT_FAMILY,
+ ),
+ # Dropdown trigger button
+ rx.box(
+ rx.hstack(
+ rx.text(
+ selection_text,
+ font_size=Typography.BODY_SIZE,
+ color=Colors.SLATE_900,
+ font_family=Typography.FONT_FAMILY,
+ flex="1",
+ ),
+ rx.icon(
+ rx.cond(is_open, "chevron-up", "chevron-down"),
+ size=16,
+ color=Colors.SLATE_500,
+ ),
+ justify="between",
+ align="center",
+ width="100%",
+ ),
+ **input_style(),
+ display="flex",
+ align_items="center",
+ cursor="pointer",
+ on_click=toggle_handler,
+ width="100%",
+ ),
+ # Dropdown panel
+ rx.cond(
+ is_open,
+ rx.box(
+ rx.vstack(
+ # Search input (debounced 300ms)
+ rx.hstack(
+ rx.icon("search", size=14, color=Colors.SLATE_500),
+ rx.debounce_input(
+ rx.input(
+ placeholder="Search...",
+ value=search_value,
+ on_change=on_search_change,
+ variant="soft",
+ size="2",
+ width="100%",
+ ),
+ debounce_timeout=300,
+ ),
+ spacing="2",
+ align="center",
+ width="100%",
+ padding=Spacing.SM,
+ background_color=Colors.SLATE_100,
+ border_radius=Radii.SM,
+ ),
+ # Action buttons
+ rx.hstack(
+ rx.button(
+ "Select All",
+ on_click=select_all_handler,
+ variant="ghost",
+ size="1",
+ color_scheme="blue",
+ ),
+ rx.button(
+ "Clear",
+ on_click=clear_all_handler,
+ variant="ghost",
+ size="1",
+ color_scheme="gray",
+ ),
+ spacing="2",
+ ),
+ # Items list
+ rx.box(
+ rx.foreach(
+ filtered_items,
+ lambda item: rx.box(
+ rx.checkbox(
+ item,
+ checked=selected_items.contains(item),
+ on_change=lambda: toggle_item_handler(item),
+ size="2",
+ ),
+ padding_y=Spacing.XS,
+ padding_x=Spacing.SM,
+ border_radius=Radii.SM,
+ background_color=rx.cond(
+ selected_items.contains(item),
+ Colors.PALE,
+ "transparent",
+ ),
+ _hover={
+ "background_color": Colors.SLATE_100,
+ },
+ width="100%",
+ ),
+ ),
+ max_height="200px",
+ overflow_y="auto",
+ width="100%",
+ ),
+ spacing="2",
+ align="start",
+ width="100%",
+ padding=Spacing.SM,
+ ),
+ position="absolute",
+ top="100%",
+ left="0",
+ right="0",
+ background_color=Colors.WHITE,
+ border=f"1px solid {Colors.SLATE_300}",
+ border_radius=Radii.MD,
+ box_shadow=Shadows.LG,
+ z_index="50",
+ margin_top=Spacing.XS,
+ ),
+ ),
+ spacing="1",
align="start",
width="100%",
),
- padding="20px",
- background="white",
- border_radius="8px",
- border="1px solid rgb(229, 231, 235)",
+ position="relative",
width="100%",
)
-def filter_controls() -> rx.Component:
- """Filter controls section with date pickers, minimum patients, and custom title."""
+def chart_tab(label: str, chart_type: str, is_active: bool = False) -> rx.Component:
+ """
+ Individual chart type tab/pill for top bar navigation.
+
+ Active state: White background with Heritage Blue text
+ Inactive state: Transparent with white text, hover shows Vibrant Blue background
+ """
+ return rx.box(
+ rx.text(
+ label,
+ font_size=Typography.BODY_SMALL_SIZE,
+ font_weight="500",
+ color=Colors.HERITAGE_BLUE if is_active else Colors.WHITE,
+ font_family=Typography.FONT_FAMILY,
+ ),
+ background_color=Colors.WHITE if is_active else "transparent",
+ padding_x=Spacing.LG,
+ padding_y=Spacing.SM,
+ border_radius=Radii.FULL,
+ cursor="pointer",
+ transition=f"background-color {Transitions.COLOR}",
+ _hover={
+ "background_color": Colors.WHITE if is_active else "rgba(255,255,255,0.15)",
+ },
+ # Future: on_click handler to switch chart type
+ )
+
+
+def top_bar() -> rx.Component:
+ """
+ Top navigation bar component.
+
+ Contains: Logo + App Name | Chart Type Tabs | Data Freshness Indicator
+ Fixed height: 64px (from design system)
+ Heritage Blue background with white text.
+ """
+ return rx.box(
+ rx.hstack(
+ # Left: Logo and App Title
+ rx.hstack(
+ rx.image(
+ src="/logo.png",
+ height="36px",
+ alt="NHS Logo",
+ ),
+ rx.text(
+ "HCD Analysis",
+ font_size=Typography.H2_SIZE,
+ font_weight=Typography.H2_WEIGHT,
+ color=Colors.WHITE,
+ font_family=Typography.FONT_FAMILY,
+ letter_spacing="-0.01em",
+ ),
+ align="center",
+ spacing="3",
+ ),
+ # Center: Chart Type Tabs
+ rx.hstack(
+ chart_tab("Icicle", "icicle", is_active=True),
+ chart_tab("Sankey", "sankey", is_active=False),
+ chart_tab("Timeline", "timeline", is_active=False),
+ spacing="2",
+ align="center",
+ background_color="rgba(255,255,255,0.1)",
+ padding=Spacing.XS,
+ border_radius=Radii.FULL,
+ ),
+ # Right: Data Freshness Indicator
+ rx.hstack(
+ rx.icon(
+ "database",
+ size=16,
+ color=Colors.SKY,
+ ),
+ rx.vstack(
+ rx.text(
+ rx.cond(
+ AppState.data_loaded,
+ AppState.total_records.to_string() + " records",
+ "Loading data...",
+ ),
+ font_size=Typography.CAPTION_SIZE,
+ font_weight="500",
+ color=Colors.WHITE,
+ font_family=Typography.FONT_FAMILY,
+ ),
+ rx.text(
+ rx.cond(
+ AppState.data_loaded,
+ "Refreshed: " + AppState.last_updated_display,
+ "Connecting...",
+ ),
+ font_size="11px",
+ color=Colors.WHITE,
+ opacity="0.7",
+ font_family=Typography.FONT_FAMILY,
+ ),
+ spacing="0",
+ align="end",
+ ),
+ spacing="2",
+ align="center",
+ ),
+ justify="between",
+ align="center",
+ width="100%",
+ max_width=PAGE_MAX_WIDTH,
+ margin_x="auto",
+ padding_x=Spacing.XL,
+ ),
+ background_color=Colors.HERITAGE_BLUE,
+ height=TOP_BAR_HEIGHT,
+ width="100%",
+ display="flex",
+ align_items="center",
+ position="sticky",
+ top="0",
+ z_index="100",
+ box_shadow=Shadows.MD,
+ )
+
+
+def filter_section() -> rx.Component:
+ """
+ Filter section component.
+
+ Contains:
+ - Two date range pickers: Initiated (default OFF), Last Seen (default ON)
+ - Three searchable multi-select dropdowns: Drugs, Indications, Directorates
+
+ Layout: Two rows
+ - Row 1: Date pickers side by side
+ - Row 2: Three dropdowns in a grid
+ """
return rx.box(
rx.vstack(
- rx.heading("Analysis Settings", size="5", color=NHS_DARK_BLUE, id="analysis-settings-heading"),
- # Date range row
+ # Header
+ rx.text(
+ "Filters",
+ **text_h1(),
+ ),
+ # Row 1: Date range pickers
rx.hstack(
- date_input(
- "Start Date",
- State.start_date,
- State.set_start_date,
- "Include patients initiated from this date",
- input_id="start-date",
+ date_range_picker(
+ label="Initiated",
+ enabled=AppState.initiated_filter_enabled,
+ toggle_handler=AppState.toggle_initiated_filter,
+ from_value=AppState.initiated_from_date,
+ to_value=AppState.initiated_to_date,
+ on_from_change=AppState.set_initiated_from,
+ on_to_change=AppState.set_initiated_to,
),
- date_input(
- "End Date",
- State.end_date,
- State.set_end_date,
- "Include patients initiated until this date",
- input_id="end-date",
+ rx.divider(orientation="vertical", size="3"),
+ date_range_picker(
+ label="Last Seen",
+ enabled=AppState.last_seen_filter_enabled,
+ toggle_handler=AppState.toggle_last_seen_filter,
+ from_value=AppState.last_seen_from_date,
+ to_value=AppState.last_seen_to_date,
+ on_from_change=AppState.set_last_seen_from,
+ on_to_change=AppState.set_last_seen_to,
),
- date_input(
- "Last Seen After",
- State.last_seen_date,
- State.set_last_seen_date,
- "Only include patients seen after this date",
- input_id="last-seen-date",
+ spacing="5",
+ align="start",
+ flex_wrap="wrap",
+ ),
+ # Divider
+ rx.divider(size="4"),
+ # Row 2: Searchable dropdowns
+ rx.hstack(
+ rx.box(
+ searchable_dropdown(
+ label="Drugs",
+ selection_text=AppState.drug_selection_text,
+ is_open=AppState.drug_dropdown_open,
+ toggle_handler=AppState.toggle_drug_dropdown,
+ search_value=AppState.drug_search,
+ on_search_change=AppState.set_drug_search,
+ filtered_items=AppState.filtered_drugs,
+ selected_items=AppState.selected_drugs,
+ toggle_item_handler=AppState.toggle_drug,
+ select_all_handler=AppState.select_all_drugs,
+ clear_all_handler=AppState.clear_all_drugs,
+ ),
+ flex="1",
+ min_width="200px",
+ ),
+ rx.box(
+ searchable_dropdown(
+ label="Indications",
+ selection_text=AppState.indication_selection_text,
+ is_open=AppState.indication_dropdown_open,
+ toggle_handler=AppState.toggle_indication_dropdown,
+ search_value=AppState.indication_search,
+ on_search_change=AppState.set_indication_search,
+ filtered_items=AppState.filtered_indications,
+ selected_items=AppState.selected_indications,
+ toggle_item_handler=AppState.toggle_indication,
+ select_all_handler=AppState.select_all_indications,
+ clear_all_handler=AppState.clear_all_indications,
+ ),
+ flex="1",
+ min_width="200px",
+ ),
+ rx.box(
+ searchable_dropdown(
+ label="Directorates",
+ selection_text=AppState.directorate_selection_text,
+ is_open=AppState.directorate_dropdown_open,
+ toggle_handler=AppState.toggle_directorate_dropdown,
+ search_value=AppState.directorate_search,
+ on_search_change=AppState.set_directorate_search,
+ filtered_items=AppState.filtered_directorates,
+ selected_items=AppState.selected_directorates,
+ toggle_item_handler=AppState.toggle_directorate,
+ select_all_handler=AppState.select_all_directorates,
+ clear_all_handler=AppState.clear_all_directorates,
+ ),
+ flex="1",
+ min_width="200px",
),
spacing="4",
width="100%",
flex_wrap="wrap",
- role="group",
- aria_label="Date range filters",
- ),
- rx.divider(margin_y="12px"),
- # Additional settings row
- rx.hstack(
- # Minimum patients
- rx.vstack(
- rx.el.label(
- "Minimum Patients",
- html_for="min-patients",
- font_size="14px",
- font_weight="500",
- color=NHS_DARK_BLUE,
- ),
- rx.hstack(
- rx.input(
- type="number",
- value=State.minimum_patients.to_string(),
- on_change=State.set_minimum_patients_from_input,
- min="0",
- max="1000",
- width="100px",
- id="min-patients",
- aria_describedby="min-patients-help",
- ),
- rx.slider(
- value=[State.minimum_patients],
- on_change=State.set_minimum_patients_from_slider,
- min=0,
- max=100,
- step=1,
- width="150px",
- aria_label="Minimum patients slider",
- ),
- spacing="3",
- align="center",
- ),
- rx.text(
- "Hide pathways with fewer patients",
- size="1",
- color="gray",
- id="min-patients-help",
- ),
- spacing="1",
- align="start",
- ),
- # Custom title
- rx.vstack(
- rx.el.label(
- "Custom Title (Optional)",
- html_for="custom-title",
- font_size="14px",
- font_weight="500",
- color=NHS_DARK_BLUE,
- ),
- rx.input(
- placeholder="Leave empty for auto-generated title",
- value=State.custom_title,
- on_change=State.set_custom_title,
- width="300px",
- id="custom-title",
- aria_describedby="custom-title-help",
- ),
- rx.text(
- "Override the default chart title",
- size="1",
- color="gray",
- id="custom-title-help",
- ),
- spacing="1",
- align="start",
- ),
- spacing="6",
- width="100%",
- flex_wrap="wrap",
- align="start",
),
spacing="4",
+ width="100%",
align="start",
- width="100%",
),
- padding="20px",
- background="white",
- border_radius="8px",
- border="1px solid rgb(229, 231, 235)",
+ **card_style(),
width="100%",
- role="region",
- aria_labelledby="analysis-settings-heading",
)
-def indication_result_row(result: dict) -> rx.Component:
- """Render a single row in the indication validation results table."""
- match_rate = result["match_rate"]
- # Color code: green for high match rates, amber for moderate, red for low
- # Use .to(int) to cast Reflex Var for comparison (rx.foreach items are Vars)
- rate_color = rx.cond(
- match_rate.to(int) >= 80,
- "green",
- rx.cond(match_rate.to(int) >= 50, "orange", "red"),
- )
- return rx.table.row(
- rx.table.cell(rx.text(result["drug_name"], weight="medium")),
- rx.table.cell(result["total_patients"].to_string()),
- rx.table.cell(result["patients_with_indication"].to_string()),
- rx.table.cell(
- rx.hstack(
- rx.progress(
- value=match_rate,
- max=100,
- width="60px",
- height="8px",
- color_scheme=rate_color,
- ),
- rx.text(
- match_rate.to_string() + "%",
- size="2",
- color=rate_color,
- weight="medium",
- ),
- spacing="2",
- align="center",
- )
- ),
- )
-
-
-def indication_validation_summary() -> rx.Component:
- """
- Component to display indication validation results per drug.
-
- Shows a collapsible section with a table of per-drug match rates,
- helping users identify which drugs have good vs poor GP diagnosis coverage.
- """
- return rx.cond(
- State.has_indication_results,
- rx.el.section(
- rx.vstack(
- # Header with overall summary
- rx.hstack(
- rx.hstack(
- rx.icon("clipboard-check", size=20, color=NHS_DARK_BLUE, aria_hidden="true"),
- rx.heading(
- "GP Indication Validation Results",
- size="5",
- color=NHS_DARK_BLUE,
- id="indication-results-heading",
- ),
- spacing="2",
- align="center",
- ),
- rx.spacer(),
- rx.badge(
- State.indication_validation_summary,
- color_scheme="blue",
- size="2",
- ),
- width="100%",
- align="center",
- ),
- rx.text(
- "Shows the percentage of patients with valid GP diagnoses matching their prescribed drug's indication. "
- "Lower rates may indicate prescribing for off-label use, data quality issues, or patients treated across multiple providers.",
- size="2",
- color="gray",
- ),
- # Results table
- rx.table.root(
- rx.table.header(
- rx.table.row(
- rx.table.column_header_cell("Drug Name"),
- rx.table.column_header_cell("Total Patients"),
- rx.table.column_header_cell("With GP Indication"),
- rx.table.column_header_cell("Match Rate"),
- ),
- ),
- rx.table.body(
- rx.foreach(State.indication_results_list, indication_result_row)
- ),
- width="100%",
- size="2",
- ),
- # Legend
- rx.hstack(
- rx.text("Legend:", size="1", color="gray", weight="medium"),
- rx.hstack(
- rx.badge("80%+", color_scheme="green", size="1"),
- rx.text("Good coverage", size="1", color="gray"),
- spacing="1",
- align="center",
- ),
- rx.hstack(
- rx.badge("50-79%", color_scheme="orange", size="1"),
- rx.text("Moderate", size="1", color="gray"),
- spacing="1",
- align="center",
- ),
- rx.hstack(
- rx.badge("<50%", color_scheme="red", size="1"),
- rx.text("Low coverage", size="1", color="gray"),
- spacing="1",
- align="center",
- ),
- spacing="4",
- flex_wrap="wrap",
- ),
- spacing="3",
- width="100%",
- align="start",
- ),
- padding="20px",
- background="white",
- border_radius="8px",
- border="1px solid rgb(229, 231, 235)",
- width="100%",
- aria_labelledby="indication-results-heading",
- ),
- )
-
-
-def home_content() -> rx.Component:
- """Home page content with filter configuration and analysis controls."""
- return rx.vstack(
- # Hero section
- rx.box(
- rx.vstack(
- rx.image(
- src="/logo.png",
- height="60px",
- alt="NHS Logo",
- ),
- rx.heading(
- "Patient Pathway Analysis",
- size="8",
- color=NHS_DARK_BLUE,
- ),
- rx.text(
- "Analyze secondary care treatment pathways for high-cost drugs",
- size="4",
- color="gray",
- ),
- spacing="3",
- align="center",
- ),
- padding="32px",
- background="white",
- border_radius="12px",
- border="1px solid rgb(229, 231, 235)",
- width="100%",
- text_align="center",
- ),
- # Status cards
- rx.hstack(
- info_card("Drugs Loaded", State.drug_selection_count, "pill"),
- info_card("Trusts", State.trust_selection_count, "building"),
- info_card("Directories", State.directory_selection_count, "folder"),
- spacing="4",
- width="100%",
- flex_wrap="wrap",
- ),
- # Data source selector
- data_source_selector(),
- # Filter controls (date pickers, minimum patients, custom title)
- filter_controls(),
- # Filter summary
- rx.box(
- rx.vstack(
- rx.heading("Current Filter Settings", size="4", color=NHS_DARK_BLUE),
- rx.text(
- State.filter_summary,
- white_space="pre-wrap",
- font_family="monospace",
- font_size="13px",
- color="gray",
- ),
- spacing="2",
- align="start",
- width="100%",
- ),
- padding="20px",
- background="white",
- border_radius="8px",
- border="1px solid rgb(229, 231, 235)",
- width="100%",
- ),
- # Action buttons
- rx.hstack(
- rx.button(
- rx.icon("database", size=16, aria_hidden="true"),
- "Load Reference Data",
- on_click=State.load_reference_data,
- color_scheme="blue",
- size="3",
- disabled=State.analysis_running,
- aria_label="Load reference data from CSV files",
- ),
- rx.button(
- rx.cond(
- State.analysis_running,
- rx.hstack(
- rx.spinner(size="1"),
- rx.text("Running..."),
- spacing="2",
- align="center",
- ),
- rx.hstack(
- rx.icon("play", size=16, aria_hidden="true"),
- rx.text("Run Analysis"),
- spacing="2",
- align="center",
- ),
- ),
- on_click=State.run_analysis,
- color_scheme="green",
- size="3",
- disabled=State.analysis_running,
- aria_label="Run patient pathway analysis",
- aria_busy=State.analysis_running,
- ),
- spacing="3",
- role="toolbar",
- aria_label="Analysis actions",
- ),
- # Messages with live regions for screen readers
- rx.cond(
- State.status_message != "",
- rx.callout(
- State.status_message,
- icon="info",
- color="blue",
- role="status",
- aria_live="polite",
- ),
- ),
- rx.cond(
- State.error_message != "",
- rx.callout(
- State.error_message,
- icon="triangle-alert",
- color="red",
- role="alert",
- aria_live="assertive",
- ),
- ),
- # Chart display
- rx.cond(
- State.has_chart,
- rx.el.section(
- rx.vstack(
- rx.hstack(
- rx.heading("Patient Pathway Chart", size="5", color=NHS_DARK_BLUE, id="chart-heading"),
- rx.spacer(),
- rx.hstack(
- rx.button(
- rx.icon("download", size=14, aria_hidden="true"),
- "Export HTML",
- on_click=State.export_chart_html,
- variant="outline",
- size="2",
- aria_label="Export chart as interactive HTML file",
- ),
- rx.button(
- rx.icon("file-spreadsheet", size=14, aria_hidden="true"),
- "Export CSV",
- on_click=State.export_data_csv,
- variant="outline",
- size="2",
- aria_label="Export data as CSV spreadsheet",
- ),
- spacing="2",
- role="toolbar",
- aria_label="Export options",
- ),
- width="100%",
- align="center",
- ),
- rx.text(
- "Click on sections to zoom in. Use the toolbar for additional options.",
- size="2",
- color="gray",
- ),
- # Export messages
- rx.cond(
- State.export_message != "",
- rx.callout(
- State.export_message,
- icon="check",
- color="green",
- role="status",
- aria_live="polite",
- ),
- ),
- rx.cond(
- State.export_error != "",
- rx.callout(
- State.export_error,
- icon="triangle-alert",
- color="red",
- role="alert",
- ),
- ),
- rx.el.figure(
- rx.plotly(data=State.chart_data),
- aria_label="Interactive patient pathway icicle chart showing treatment hierarchy",
- ),
- spacing="3",
- width="100%",
- ),
- padding="20px",
- background="white",
- border_radius="8px",
- border="1px solid rgb(229, 231, 235)",
- width="100%",
- aria_labelledby="chart-heading",
- ),
- ),
- # Indication validation results (shown after chart)
- indication_validation_summary(),
- spacing="5",
- width="100%",
- align="start",
- )
-
-
-def selection_page_content(
- title: str,
- description: str,
- items: rx.Var,
- selected_items: rx.Var,
- toggle_handler,
- select_all_handler,
- clear_handler,
- count_text: rx.Var,
- search_value: rx.Var,
- search_handler,
- clear_search_handler,
- search_result_text: rx.Var,
- extra_buttons: list[rx.Component] = None,
- page_id: str = "selection",
+def kpi_card(
+ value: rx.Var[str],
+ label: str,
+ icon_name: str = None,
+ highlight: bool = False,
) -> rx.Component:
- """Generic selection page content for drugs, trusts, directories with search and accessibility."""
- heading_id = f"{page_id}-heading"
- search_id = f"{page_id}-search"
- list_id = f"{page_id}-list"
+ """
+ KPI card component displaying a metric value with label.
- buttons = [
- rx.button(
- "Select All",
- on_click=select_all_handler,
- variant="outline",
- size="2",
- aria_label=f"Select all {title.lower()}",
- ),
- rx.button(
- "Clear All",
- on_click=clear_handler,
- variant="outline",
- size="2",
- aria_label=f"Clear all {title.lower()} selections",
- ),
- ]
- if extra_buttons:
- buttons.extend(extra_buttons)
+ Args:
+ value: The display value (should be a formatted string from computed var)
+ label: Label describing the metric
+ icon_name: Optional Lucide icon name to display
+ highlight: If True, uses Pale Blue background tint
- return rx.vstack(
- # Header
- rx.el.header(
- rx.vstack(
- rx.heading(title, size="6", color=NHS_DARK_BLUE, id=heading_id),
- rx.text(description, color="gray"),
- rx.el.div(
- count_text,
- font_weight="500",
- color=NHS_BLUE,
- aria_live="polite",
- aria_atomic="true",
- ),
- spacing="2",
- align="start",
+ Design specs from DESIGN_SYSTEM.md:
+ - Large mono number: 32-48px, Slate 900
+ - Label: Caption size, Slate 500
+ - Background: White or Pale Blue tint
+ """
+ # Build content - icon only if provided
+ content_items = []
+ if icon_name:
+ content_items.append(
+ rx.icon(
+ icon_name,
+ size=20,
+ color=Colors.PRIMARY,
+ )
+ )
+
+ return rx.box(
+ rx.vstack(
+ # Optional icon
+ rx.icon(
+ icon_name if icon_name else "activity",
+ size=20,
+ color=Colors.PRIMARY,
+ ) if icon_name else rx.fragment(),
+ # Value
+ rx.text(
+ value,
+ **kpi_value_style(),
),
- padding="20px",
- background="white",
- border_radius="8px",
- border="1px solid rgb(229, 231, 235)",
- width="100%",
- ),
- # Search input
- rx.box(
- rx.hstack(
- rx.icon("search", size=16, color="gray", aria_hidden="true"),
- rx.input(
- placeholder=f"Search {title.lower()}...",
- value=search_value,
- on_change=search_handler,
- width="100%",
- id=search_id,
- aria_label=f"Search {title.lower()}",
- aria_controls=list_id,
- ),
- rx.cond(
- search_value != "",
- rx.button(
- rx.icon("x", size=14, aria_hidden="true"),
- on_click=clear_search_handler,
- variant="ghost",
- color_scheme="gray",
- size="1",
- aria_label="Clear search",
- ),
- ),
- spacing="2",
- align="center",
- width="100%",
+ # Label
+ rx.text(
+ label,
+ **kpi_label_style(),
),
- padding="12px 16px",
- background="white",
- border_radius="8px",
- border="1px solid rgb(229, 231, 235)",
- width="100%",
- role="search",
- ),
- # Action buttons and search result count
- rx.hstack(
- rx.hstack(*buttons, spacing="2", role="toolbar", aria_label="Selection actions"),
- rx.spacer(),
- rx.el.div(
- search_result_text,
- font_size="14px",
- color="gray",
- aria_live="polite",
- ),
- spacing="3",
- width="100%",
+ spacing="1",
align="center",
),
- # Selection grid
- rx.box(
- rx.vstack(
- rx.foreach(
- items,
- lambda item: rx.box(
- rx.checkbox(
- item,
- checked=selected_items.contains(item),
- on_change=lambda: toggle_handler(item),
- size="2",
- ),
- padding="8px 12px",
- background=rx.cond(
- selected_items.contains(item),
- "rgba(0, 94, 184, 0.1)",
- "transparent",
- ),
- border_radius="4px",
- width="100%",
- ),
- ),
- spacing="1",
- width="100%",
- max_height="500px",
- overflow_y="auto",
- id=list_id,
- role="group",
- aria_labelledby=heading_id,
- ),
- padding="16px",
- background="white",
- border_radius="8px",
- border="1px solid rgb(229, 231, 235)",
- width="100%",
+ # Apply card styling manually to allow background_color override
+ background_color=Colors.PALE if highlight else Colors.WHITE,
+ border=f"1px solid {Colors.SLATE_300}",
+ border_radius=Radii.LG,
+ padding=Spacing.XL,
+ box_shadow=Shadows.SM,
+ text_align="center",
+ min_width="180px",
+ flex="1",
+ transition=f"box-shadow {Transitions.SHADOW}, transform {Transitions.TRANSFORM}",
+ _hover={
+ "box_shadow": Shadows.MD,
+ "transform": "translateY(-2px)",
+ },
+ )
+
+
+def kpi_row() -> rx.Component:
+ """
+ KPI metrics row component with responsive grid layout.
+
+ Contains:
+ - Unique Patients: COUNT(DISTINCT patient_id)
+ - Total Drugs: Count of selected/filtered drugs
+ - Total Cost: Sum of costs in filtered data
+ - Match Rate: Indication match percentage
+
+ Layout: Responsive flex row that wraps on smaller screens.
+ KPIs update reactively when filters change (Phase 3).
+ """
+ return rx.hstack(
+ # Unique Patients KPI - highlighted as primary metric
+ kpi_card(
+ value=AppState.unique_patients_display,
+ label="Unique Patients",
+ icon_name="users",
+ highlight=True,
+ ),
+ # Total Drugs KPI
+ kpi_card(
+ value=AppState.total_drugs_display,
+ label="Drug Types",
+ icon_name="pill",
+ highlight=False,
+ ),
+ # Total Cost KPI
+ kpi_card(
+ value=AppState.total_cost_display,
+ label="Total Cost",
+ icon_name="pound-sterling",
+ highlight=False,
+ ),
+ # Indication Match Rate KPI
+ kpi_card(
+ value=AppState.match_rate_display,
+ label="Indication Match",
+ icon_name="circle-check",
+ highlight=False,
),
spacing="4",
width="100%",
- align="start",
+ flex_wrap="wrap",
+ align="stretch",
)
-def drugs_content() -> rx.Component:
- """Drug selection page content."""
- return selection_page_content(
- title="Drug Selection",
- description="Select which high-cost drugs to include in the analysis",
- items=State.filtered_drugs,
- selected_items=State.selected_drugs,
- toggle_handler=State.toggle_drug,
- select_all_handler=State.select_all_drugs,
- clear_handler=State.clear_drugs,
- count_text=State.drug_selection_count,
- search_value=State.drug_search,
- search_handler=State.set_drug_search,
- clear_search_handler=State.clear_drug_search,
- search_result_text=State.drug_search_result_count,
- extra_buttons=[
- rx.button(
- "Select Defaults",
- on_click=State.select_default_drugs,
- variant="outline",
- size="2",
- aria_label="Select default drugs (Include=1)",
+def chart_loading_skeleton() -> rx.Component:
+ """
+ Loading skeleton for the chart area.
+
+ Displays animated pulsing bars to indicate loading state.
+ Uses design system colors and spacing.
+ """
+ return rx.box(
+ rx.vstack(
+ # Simulated chart bars at different heights
+ rx.hstack(
+ rx.box(
+ background_color=Colors.SLATE_300,
+ width="12%",
+ height="60%",
+ border_radius=Radii.SM,
+ animation="pulse 1.5s ease-in-out infinite",
+ ),
+ rx.box(
+ background_color=Colors.SLATE_300,
+ width="12%",
+ height="80%",
+ border_radius=Radii.SM,
+ animation="pulse 1.5s ease-in-out infinite",
+ animation_delay="0.1s",
+ ),
+ rx.box(
+ background_color=Colors.SLATE_300,
+ width="12%",
+ height="45%",
+ border_radius=Radii.SM,
+ animation="pulse 1.5s ease-in-out infinite",
+ animation_delay="0.2s",
+ ),
+ rx.box(
+ background_color=Colors.SLATE_300,
+ width="12%",
+ height="70%",
+ border_radius=Radii.SM,
+ animation="pulse 1.5s ease-in-out infinite",
+ animation_delay="0.3s",
+ ),
+ rx.box(
+ background_color=Colors.SLATE_300,
+ width="12%",
+ height="55%",
+ border_radius=Radii.SM,
+ animation="pulse 1.5s ease-in-out infinite",
+ animation_delay="0.4s",
+ ),
+ rx.box(
+ background_color=Colors.SLATE_300,
+ width="12%",
+ height="90%",
+ border_radius=Radii.SM,
+ animation="pulse 1.5s ease-in-out infinite",
+ animation_delay="0.5s",
+ ),
+ spacing="3",
+ align="end",
+ justify="center",
+ height="100%",
+ width="100%",
),
- ],
- page_id="drugs",
+ # Loading text
+ rx.hstack(
+ rx.spinner(size="2"),
+ rx.text(
+ "Generating chart...",
+ font_size=Typography.BODY_SIZE,
+ font_weight=Typography.BODY_WEIGHT,
+ color=Colors.SLATE_500,
+ font_family=Typography.FONT_FAMILY,
+ ),
+ spacing="2",
+ align="center",
+ ),
+ spacing="4",
+ align="center",
+ justify="center",
+ height="100%",
+ width="100%",
+ ),
+ background_color=Colors.SLATE_100,
+ border_radius=Radii.MD,
+ width="100%",
+ height="500px",
+ padding=Spacing.XL,
)
-def trusts_content() -> rx.Component:
- """Trust selection page content."""
- return selection_page_content(
- title="Trust Selection",
- description="Select NHS trusts to include (leave empty for all trusts)",
- items=State.filtered_trusts,
- selected_items=State.selected_trusts,
- toggle_handler=State.toggle_trust,
- select_all_handler=State.select_all_trusts,
- clear_handler=State.clear_trusts,
- count_text=State.trust_selection_count,
- search_value=State.trust_search,
- search_handler=State.set_trust_search,
- clear_search_handler=State.clear_trust_search,
- search_result_text=State.trust_search_result_count,
- page_id="trusts",
+def chart_error_state(error_message: rx.Var[str]) -> rx.Component:
+ """
+ Error state for the chart area.
+
+ Displays a friendly error message with an icon and the error details.
+ Provides guidance on how to resolve the issue.
+ """
+ return rx.box(
+ rx.center(
+ rx.vstack(
+ rx.icon(
+ "triangle-alert",
+ size=48,
+ color=Colors.WARNING,
+ ),
+ rx.text(
+ "Unable to Generate Chart",
+ font_size=Typography.H2_SIZE,
+ font_weight=Typography.H2_WEIGHT,
+ color=Colors.SLATE_900,
+ font_family=Typography.FONT_FAMILY,
+ ),
+ rx.text(
+ error_message,
+ font_size=Typography.BODY_SIZE,
+ font_weight=Typography.BODY_WEIGHT,
+ color=Colors.SLATE_700,
+ font_family=Typography.FONT_FAMILY,
+ text_align="center",
+ max_width="400px",
+ ),
+ rx.text(
+ "Try adjusting the filters or check the data source.",
+ font_size=Typography.CAPTION_SIZE,
+ font_weight=Typography.CAPTION_WEIGHT,
+ color=Colors.SLATE_500,
+ font_family=Typography.FONT_FAMILY,
+ ),
+ spacing="3",
+ align="center",
+ ),
+ width="100%",
+ height="100%",
+ ),
+ background_color=Colors.SLATE_100,
+ border_radius=Radii.MD,
+ width="100%",
+ height="500px",
+ padding=Spacing.XL,
)
-def directories_content() -> rx.Component:
- """Directory selection page content."""
- return selection_page_content(
- title="Directory Selection",
- description="Select medical directories/specialties to include (leave empty for all)",
- items=State.filtered_directories,
- selected_items=State.selected_directories,
- toggle_handler=State.toggle_directory,
- select_all_handler=State.select_all_directories,
- clear_handler=State.clear_directories,
- count_text=State.directory_selection_count,
- search_value=State.directory_search,
- search_handler=State.set_directory_search,
- clear_search_handler=State.clear_directory_search,
- search_result_text=State.directory_search_result_count,
- page_id="directories",
+def chart_empty_state() -> rx.Component:
+ """
+ Empty state for when no data matches the filters.
+
+ Displays a friendly message encouraging filter adjustment.
+ """
+ return rx.box(
+ rx.center(
+ rx.vstack(
+ rx.icon(
+ "search-x",
+ size=48,
+ color=Colors.SLATE_500,
+ ),
+ rx.text(
+ "No Data to Display",
+ font_size=Typography.H2_SIZE,
+ font_weight=Typography.H2_WEIGHT,
+ color=Colors.SLATE_900,
+ font_family=Typography.FONT_FAMILY,
+ ),
+ rx.text(
+ "No patient records match your current filter criteria.",
+ font_size=Typography.BODY_SIZE,
+ font_weight=Typography.BODY_WEIGHT,
+ color=Colors.SLATE_700,
+ font_family=Typography.FONT_FAMILY,
+ text_align="center",
+ ),
+ rx.text(
+ "Try widening your date range or selecting more drugs/indications.",
+ font_size=Typography.CAPTION_SIZE,
+ font_weight=Typography.CAPTION_WEIGHT,
+ color=Colors.SLATE_500,
+ font_family=Typography.FONT_FAMILY,
+ ),
+ spacing="3",
+ align="center",
+ ),
+ width="100%",
+ height="100%",
+ ),
+ background_color=Colors.SLATE_100,
+ border_radius=Radii.MD,
+ width="100%",
+ height="500px",
+ padding=Spacing.XL,
+ )
+
+
+def chart_display() -> rx.Component:
+ """
+ Plotly icicle chart display component.
+
+ Renders the interactive icicle chart from AppState.icicle_figure.
+ The figure is a computed property that updates reactively when
+ chart_data changes (which happens when filters change).
+
+ Uses rx.plotly() to render the Plotly figure object.
+ """
+ return rx.box(
+ rx.plotly(
+ data=AppState.icicle_figure,
+ width="100%",
+ height="600px",
+ ),
+ width="100%",
+ min_height="600px",
+ )
+
+
+def chart_section() -> rx.Component:
+ """
+ Main chart section component.
+
+ Contains: Plotly icicle chart with loading, error, empty, and ready states.
+
+ State handling:
+ - Loading: Shows skeleton animation when chart_loading is True
+ - Error: Shows error message when error_message is not empty
+ - Empty: Shows empty state when data_loaded but unique_patients is 0
+ - Ready: Shows interactive Plotly icicle chart
+
+ The chart updates reactively when filters change via the icicle_figure computed property.
+ """
+ return rx.box(
+ rx.vstack(
+ # Header row with title and chart type info
+ rx.hstack(
+ rx.text(
+ "Patient Pathway Visualization",
+ **text_h1(),
+ ),
+ rx.hstack(
+ rx.icon(
+ "info",
+ size=14,
+ color=Colors.SLATE_500,
+ ),
+ rx.text(
+ "Hierarchical view: Trust → Directorate → Drug → Patient Pathway",
+ font_size=Typography.CAPTION_SIZE,
+ font_weight=Typography.CAPTION_WEIGHT,
+ color=Colors.SLATE_500,
+ font_family=Typography.FONT_FAMILY,
+ ),
+ spacing="1",
+ align="center",
+ ),
+ justify="between",
+ align="center",
+ width="100%",
+ flex_wrap="wrap",
+ ),
+ # Chart container with state-based rendering
+ rx.cond(
+ # Priority 1: Loading state
+ AppState.chart_loading,
+ chart_loading_skeleton(),
+ # Not loading - check for error
+ rx.cond(
+ # Priority 2: Error state
+ AppState.error_message != "",
+ chart_error_state(AppState.error_message),
+ # No error - check if data loaded
+ rx.cond(
+ # Priority 3: Data loaded but empty
+ AppState.data_loaded & (AppState.unique_patients == 0),
+ chart_empty_state(),
+ # Priority 4: Ready state - show interactive Plotly chart
+ chart_display(),
+ ),
+ ),
+ ),
+ spacing="4",
+ width="100%",
+ align="start",
+ ),
+ **card_style(),
+ width="100%",
+ )
+
+
+def main_content() -> rx.Component:
+ """
+ Main content area below the top bar.
+
+ Layout: Filter Section → KPI Row → Chart Section
+ Max width constrained to PAGE_MAX_WIDTH, centered.
+ """
+ return rx.box(
+ rx.vstack(
+ filter_section(),
+ kpi_row(),
+ chart_section(),
+ spacing="5",
+ width="100%",
+ align="stretch",
+ ),
+ width="100%",
+ max_width=PAGE_MAX_WIDTH,
+ margin_x="auto",
+ padding=PAGE_PADDING,
+ padding_top=Spacing.XL,
+ )
+
+
+def page_layout() -> rx.Component:
+ """
+ Full page layout combining top bar and main content.
+
+ Structure:
+ - Sticky top bar (64px)
+ - Scrollable main content area
+ - White background
+ """
+ return rx.box(
+ rx.vstack(
+ top_bar(),
+ main_content(),
+ spacing="0",
+ width="100%",
+ min_height="100vh",
+ ),
+ background_color=Colors.WHITE,
+ font_family=Typography.FONT_FAMILY,
+ width="100%",
)
# =============================================================================
-# Page Definitions
+# Page Definition
# =============================================================================
def index() -> rx.Component:
- """Home page."""
- return main_layout(
- content_area(home_content(), page_title="Home"),
- current_page="home",
- )
-
-
-def drugs_page() -> rx.Component:
- """Drug selection page."""
- return main_layout(
- content_area(drugs_content(), page_title=""),
- current_page="drugs",
- )
-
-
-def trusts_page() -> rx.Component:
- """Trust selection page."""
- return main_layout(
- content_area(trusts_content(), page_title=""),
- current_page="trusts",
- )
-
-
-def directories_page() -> rx.Component:
- """Directory selection page."""
- return main_layout(
- content_area(directories_content(), page_title=""),
- current_page="directories",
- )
+ """Main page for HCD Analysis v2."""
+ return page_layout()
# =============================================================================
@@ -2175,10 +2284,11 @@ app = rx.App(
gray_color="slate",
radius="medium",
),
+ stylesheets=[
+ # Google Fonts - Inter (primary) and JetBrains Mono (monospace)
+ "https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&family=JetBrains+Mono:wght@400;500&display=swap",
+ ],
)
-# Add pages
-app.add_page(index, route="/", title="Home | NHS HCD Analysis")
-app.add_page(drugs_page, route="/drugs", title="Drug Selection | NHS HCD Analysis")
-app.add_page(trusts_page, route="/trusts", title="Trust Selection | NHS HCD Analysis")
-app.add_page(directories_page, route="/directories", title="Directory Selection | NHS HCD Analysis")
+# Register page with on_load handler to load data on app initialization
+app.add_page(index, route="/", title="HCD Analysis | Patient Pathways", on_load=AppState.load_data)