feat: implement chart data preparation (Task 4.1)

- Add prepare_chart_data() method for hierarchical chart data
- Build Trust → Directory → Drug hierarchy from filtered SQLite data
- Calculate patient counts and costs at each hierarchy level
- Compute color values (proportions) for visualization
- Generate dynamic chart title based on filter state
- Call prepare_chart_data() from apply_filters() for reactivity
- Mark Task 3.4 complete (KPIs implemented in apply_filters)
This commit is contained in:
Andrew Charlwood
2026-02-04 18:41:37 +00:00
parent cd15ab6cdf
commit 14f970d37b
2 changed files with 286 additions and 6 deletions
+11 -6
View File
@@ -129,17 +129,22 @@ cd pathways_app && timeout 60 python -m reflex run 2>&1 | head -30
- [x] Return filtered DataFrame
### 3.4 KPI Calculations
- [ ] Create computed properties for KPI values:
- [x] Create computed properties for KPI values:
- `unique_patients: int` — COUNT(DISTINCT patient_id) from filtered data
- (Future: drug count, total cost, indication match rate)
- [ ] Ensure KPIs update reactively when filters change
- `total_drugs: int` — COUNT(DISTINCT drug_name_std) from filtered data
- `total_cost: float` — SUM(price_actual) from filtered data
- (Blocked: indication_match_rate requires Snowflake GP data)
- [x] Ensure KPIs update reactively when filters change
- Note: KPIs implemented in apply_filters() method, called by all filter event handlers
## Phase 4: Interactive Chart
### 4.1 Chart Data Preparation
- [ ] Create `prepare_chart_data()` method that transforms filtered data for Plotly icicle
- [ ] Reuse/adapt logic from existing `pathway_analyzer.py`
- [ ] Return data structure compatible with `plotly.express.icicle()`
- [x] Create `prepare_chart_data()` method that transforms filtered data for Plotly icicle
- [x] Reuse/adapt logic from existing `pathway_analyzer.py` (simplified hierarchy: Trust → Directory → Drug)
- [x] Return data structure compatible with `go.Icicle()` (list of dicts with parents, ids, labels, value, cost, colour)
- [x] Generate chart_title based on current filter state
- [x] Call prepare_chart_data() from apply_filters() for reactive updates
### 4.2 Reactive Plotly Integration
- [ ] Create `generate_icicle_chart()` computed property that returns Plotly figure
+275
View File
@@ -509,6 +509,9 @@ class AppState(rx.State):
conn.close()
self.error_message = ""
# Update chart data with new filtered results
self.prepare_chart_data()
except sqlite3.Error as e:
self.error_message = f"Database error: {str(e)}"
except Exception as e:
@@ -628,6 +631,278 @@ class AppState(rx.State):
self.error_message = f"Failed to load data: {str(e)}"
self.data_loaded = False
# =========================================================================
# Chart Data Preparation Methods
# =========================================================================
# Chart data stored as list of dicts for Reflex serialization
# Structure: [{"parents": str, "ids": str, "labels": str, "value": int, "cost": float, "colour": float}, ...]
chart_data: list[dict[str, Any]] = []
chart_title: str = ""
def prepare_chart_data(self):
"""
Prepare hierarchical data for Plotly icicle chart.
This method queries the filtered patient data and transforms it into
a hierarchical structure: Root → Trust → Directory → Drug
The chart data is stored in self.chart_data as a list of dicts with:
- parents: Parent node identifier
- ids: Unique node identifier (hierarchical path)
- labels: Display label
- value: Patient count
- cost: Total cost
- colour: Color value (proportion of parent)
Updates: chart_data, chart_title, chart_loading
"""
import sqlite3
db_path = Path("data/pathways.db")
if not db_path.exists():
self.error_message = "Database not found."
self.chart_data = []
return
self.chart_loading = True
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
# Build WHERE clause for filters
where_clauses = []
params = []
# Drug filter (if any drugs selected)
if self.selected_drugs:
placeholders = ",".join("?" * len(self.selected_drugs))
where_clauses.append(f"drug_name_std IN ({placeholders})")
params.extend(self.selected_drugs)
# Directorate filter (if any directorates selected)
if self.selected_directorates:
placeholders = ",".join("?" * len(self.selected_directorates))
where_clauses.append(f"directory IN ({placeholders})")
params.extend(self.selected_directorates)
base_where = ""
if where_clauses:
base_where = "WHERE " + " AND ".join(where_clauses)
# Build date filter HAVING clauses for patient-level filtering
having_clauses = []
having_params = []
if self.initiated_filter_enabled and self.initiated_from_date:
having_clauses.append("first_seen >= ?")
having_params.append(self.initiated_from_date)
if self.initiated_filter_enabled and self.initiated_to_date:
having_clauses.append("first_seen <= ?")
having_params.append(self.initiated_to_date)
if self.last_seen_filter_enabled and self.last_seen_from_date:
having_clauses.append("last_seen >= ?")
having_params.append(self.last_seen_from_date)
if self.last_seen_filter_enabled and self.last_seen_to_date:
having_clauses.append("last_seen <= ?")
having_params.append(self.last_seen_to_date)
having_clause = ""
if having_clauses:
having_clause = "HAVING " + " AND ".join(having_clauses)
# Query to get aggregated data by Trust -> Directory -> Drug
# fact_interventions already has org_name, use it directly
chart_query = f"""
WITH filtered_patients AS (
SELECT upid
FROM (
SELECT
upid,
MIN(intervention_date) as first_seen,
MAX(intervention_date) as last_seen
FROM fact_interventions
{base_where}
GROUP BY upid
{having_clause}
)
),
patient_records AS (
SELECT
f.upid,
COALESCE(f.org_name, f.provider_code) as trust_name,
f.directory,
f.drug_name_std,
f.price_actual
FROM fact_interventions f
INNER JOIN filtered_patients fp ON f.upid = fp.upid
{base_where.replace('WHERE', 'AND') if base_where else ''}
)
SELECT
trust_name,
directory,
drug_name_std,
COUNT(DISTINCT upid) as patient_count,
COALESCE(SUM(price_actual), 0) as total_cost
FROM patient_records
GROUP BY trust_name, directory, drug_name_std
ORDER BY trust_name, directory, drug_name_std
"""
all_params = params + having_params
if where_clauses:
all_params.extend(params)
cursor.execute(chart_query, all_params)
rows = cursor.fetchall()
conn.close()
# Build hierarchical chart data
chart_data = []
hierarchy_totals = {} # Track totals for calculating color values
# Root node
root_id = "N&WICS"
chart_data.append({
"parents": "",
"ids": root_id,
"labels": "Norfolk & Waveney ICS",
"value": 0,
"cost": 0.0,
"colour": 1.0,
})
# Process rows to build hierarchy
trust_totals = {}
directory_totals = {}
drug_data = []
for row in rows:
trust_name, directory, drug_name, patient_count, cost = row
if not trust_name or not directory or not drug_name:
continue
# Trust level
trust_id = f"{root_id} - {trust_name}"
if trust_id not in trust_totals:
trust_totals[trust_id] = {"value": 0, "cost": 0.0, "label": trust_name}
trust_totals[trust_id]["value"] += patient_count
trust_totals[trust_id]["cost"] += cost
# Directory level
dir_id = f"{trust_id} - {directory}"
if dir_id not in directory_totals:
directory_totals[dir_id] = {
"value": 0,
"cost": 0.0,
"label": directory,
"parent": trust_id,
}
directory_totals[dir_id]["value"] += patient_count
directory_totals[dir_id]["cost"] += cost
# Drug level (leaf)
drug_id = f"{dir_id} - {drug_name}"
drug_data.append({
"ids": drug_id,
"labels": drug_name,
"parent": dir_id,
"value": patient_count,
"cost": float(cost),
})
# Calculate root total
root_total = sum(t["value"] for t in trust_totals.values())
root_cost = sum(t["cost"] for t in trust_totals.values())
chart_data[0]["value"] = root_total
chart_data[0]["cost"] = root_cost
# Add trust nodes with color proportions
for trust_id, data in trust_totals.items():
colour = data["value"] / root_total if root_total > 0 else 0
chart_data.append({
"parents": root_id,
"ids": trust_id,
"labels": data["label"],
"value": data["value"],
"cost": data["cost"],
"colour": colour,
})
# Add directory nodes with color proportions
for dir_id, data in directory_totals.items():
parent_total = trust_totals[data["parent"]]["value"]
colour = data["value"] / parent_total if parent_total > 0 else 0
chart_data.append({
"parents": data["parent"],
"ids": dir_id,
"labels": data["label"],
"value": data["value"],
"cost": data["cost"],
"colour": colour,
})
# Add drug nodes with color proportions
for drug in drug_data:
parent_dir = drug["parent"]
parent_total = directory_totals[parent_dir]["value"]
colour = drug["value"] / parent_total if parent_total > 0 else 0
chart_data.append({
"parents": parent_dir,
"ids": drug["ids"],
"labels": drug["labels"],
"value": drug["value"],
"cost": drug["cost"],
"colour": colour,
})
self.chart_data = chart_data
self.chart_title = self._generate_chart_title()
self.chart_loading = False
self.error_message = ""
except sqlite3.Error as e:
self.error_message = f"Chart data error: {str(e)}"
self.chart_data = []
self.chart_loading = False
except Exception as e:
self.error_message = f"Chart preparation failed: {str(e)}"
self.chart_data = []
self.chart_loading = False
def _generate_chart_title(self) -> str:
"""Generate chart title based on current filter state."""
parts = []
# Date range info
if self.last_seen_filter_enabled:
parts.append(f"Last seen: {self.last_seen_from_date} to {self.last_seen_to_date}")
elif self.initiated_filter_enabled:
parts.append(f"Initiated: {self.initiated_from_date} to {self.initiated_to_date}")
# Drug selection info
if self.selected_drugs:
if len(self.selected_drugs) <= 3:
parts.append(", ".join(self.selected_drugs))
else:
parts.append(f"{len(self.selected_drugs)} drugs selected")
# Directorate selection info
if self.selected_directorates:
if len(self.selected_directorates) <= 2:
parts.append(", ".join(self.selected_directorates))
else:
parts.append(f"{len(self.selected_directorates)} directorates")
if parts:
return " | ".join(parts)
return "All Patients"
# =============================================================================
# Layout Components