feat: add shared pathway query functions for Dash data access (Task 1.1)
Extract load_data() and load_pathway_data() logic from Reflex AppState into standalone functions in src/data_processing/pathway_queries.py. Create thin dash_app/data/queries.py wrapper with DB_PATH resolution.
This commit is contained in:
@@ -98,12 +98,12 @@ Drawer selection → update_drug_selection → app-state store → load_pathway_
|
||||
## Phase 1: Data Access Layer
|
||||
|
||||
### 1.1 Create shared data access functions
|
||||
- [ ] Add query functions to `src/data_processing/database.py` (or a new `src/data_processing/pathway_queries.py` if database.py is already large):
|
||||
- [x] Add query functions to `src/data_processing/pathway_queries.py`:
|
||||
- `load_initial_data(db_path) -> dict` — extracted from `AppState.load_data()` (pathways_app.py lines 407-488): returns `{"available_drugs": [...], "available_directorates": [...], "available_indications": [...], "total_records": int, "last_updated": str}`
|
||||
- `load_pathway_data(db_path, filter_id, chart_type, selected_drugs=None, selected_directorates=None) -> dict` — extracted from `AppState.load_pathway_data()` (lines 490-642): returns `{"nodes": [...], "unique_patients": int, "total_drugs": int, "total_cost": float, "last_updated": str}`
|
||||
- `load_pathway_nodes(db_path, filter_id, chart_type, selected_drugs=None, selected_directorates=None) -> dict` — extracted from `AppState.load_pathway_data()` (lines 490-642): returns `{"nodes": [...], "unique_patients": int, "total_drugs": int, "total_cost": float, "last_updated": str}`
|
||||
- These are plain Python functions that accept `db_path` as a parameter (no Reflex state objects)
|
||||
- [ ] Create thin `dash_app/data/queries.py` that imports and calls the shared functions with the correct `db_path`
|
||||
- [ ] Return plain dicts/lists — JSON-serializable for dcc.Store
|
||||
- [x] Create thin `dash_app/data/queries.py` that imports and calls the shared functions with the correct `db_path`
|
||||
- [x] Return plain dicts/lists — JSON-serializable for dcc.Store
|
||||
- **Checkpoint**: `python -c "from dash_app.data.queries import load_initial_data; print(load_initial_data())"` returns valid data
|
||||
|
||||
### 1.2 Build directorate card tree from DimSearchTerm.csv
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
"""
|
||||
Thin wrapper around shared pathway query functions.
|
||||
|
||||
Resolves the database path relative to this file's location and delegates
|
||||
to the shared functions in src/data_processing/pathway_queries.py.
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from data_processing.pathway_queries import (
|
||||
load_initial_data as _load_initial_data,
|
||||
load_pathway_nodes as _load_pathway_nodes,
|
||||
)
|
||||
|
||||
DB_PATH = Path(__file__).resolve().parents[2] / "data" / "pathways.db"
|
||||
|
||||
|
||||
def load_initial_data() -> dict:
|
||||
"""Load reference data (drugs, directorates, indications, refresh info)."""
|
||||
return _load_initial_data(DB_PATH)
|
||||
|
||||
|
||||
def load_pathway_data(
|
||||
filter_id: str = "all_6mo",
|
||||
chart_type: str = "directory",
|
||||
selected_drugs: Optional[list[str]] = None,
|
||||
selected_directorates: Optional[list[str]] = None,
|
||||
) -> dict:
|
||||
"""Load pre-computed pathway nodes with optional filters."""
|
||||
return _load_pathway_nodes(
|
||||
DB_PATH,
|
||||
filter_id=filter_id,
|
||||
chart_type=chart_type,
|
||||
selected_drugs=selected_drugs,
|
||||
selected_directorates=selected_directorates,
|
||||
)
|
||||
+42
-1
@@ -91,7 +91,7 @@ Migrating the HCD Analysis frontend from Reflex to Dash (Plotly) + Dash Mantine
|
||||
- `dash_app/components/__init__.py` — empty package
|
||||
- `dash_app/callbacks/__init__.py` — empty package
|
||||
- `dash_app/utils/__init__.py` — empty package
|
||||
### Committed: (see below)
|
||||
### Committed: 1c3ece6 "feat: create dash_app skeleton with nhs.css and MantineProvider (Phase 0)"
|
||||
### Patterns discovered:
|
||||
- Dash 4.0.0 and DMC 2.5.1 installed (plan said 2.x and 0.14.x). The API is compatible — `MantineProvider`, `Drawer`, `Accordion`, `Chip` all available. DMC 2.x is based on Mantine v7.
|
||||
- `dmc.MantineProvider(children=[...])` works for wrapping the layout in both DMC versions.
|
||||
@@ -104,3 +104,44 @@ Migrating the HCD Analysis frontend from Reflex to Dash (Plotly) + Dash Mantine
|
||||
- Verify with `python -c "from dash_app.data.queries import load_initial_data; print(load_initial_data())"`
|
||||
### Blocked items:
|
||||
- None
|
||||
|
||||
## Iteration 2 — 2026-02-06
|
||||
### Task: Phase 1 — Task 1.1 (Create shared data access functions)
|
||||
### Why this task:
|
||||
- Phase 0 complete in iteration 1; Phase 1 is next in dependency order
|
||||
- progress.txt from iteration 1 explicitly recommended this task
|
||||
- All UI components and callbacks (Phases 2-5) depend on having data access working
|
||||
### Status: COMPLETE
|
||||
### What was done:
|
||||
- Created `src/data_processing/pathway_queries.py` with two shared functions:
|
||||
- `load_initial_data(db_path)` — returns available drugs (42), directorates (14), indications (32), total_records, last_updated
|
||||
- `load_pathway_nodes(db_path, filter_id, chart_type, selected_drugs, selected_directorates)` — returns nodes list, unique_patients, total_drugs, total_cost, last_updated
|
||||
- Both functions extracted directly from AppState methods in pathways_app.py (lines 407-642), with Reflex `self.*` references replaced by function parameters
|
||||
- All return values are plain dicts/lists — JSON-serializable for dcc.Store
|
||||
- Created thin wrapper `dash_app/data/queries.py` that resolves DB_PATH and delegates to shared functions
|
||||
- Used separate file (pathway_queries.py) rather than adding to database.py because database.py is connection management (240 lines), queries are a distinct concern
|
||||
### Validation results:
|
||||
- Tier 1 (Code): `python -c "from dash_app.data.queries import load_initial_data"` — OK (requires uv run for .pth file)
|
||||
- Tier 1 (App starts): `from dash_app.app import app` — OK, layout type is MantineProvider
|
||||
- Tier 3 (Functional):
|
||||
- `load_initial_data()`: 42 drugs, 14 directorates, 32 indications, last_updated=2026-02-06T00:08:55
|
||||
- `load_pathway_data("all_6mo", "directory")`: 293 nodes, 11,118 patients, 39 drugs, £130.5M cost
|
||||
- `load_pathway_data("all_6mo", "indication")`: 438 nodes, 11,252 patients
|
||||
- `load_pathway_data("all_6mo", "directory", selected_drugs=["ADALIMUMAB"])`: 70 nodes (drug filter works)
|
||||
### Files changed:
|
||||
- `src/data_processing/pathway_queries.py` — NEW: shared query functions
|
||||
- `dash_app/data/queries.py` — NEW: thin Dash wrapper with DB_PATH resolution
|
||||
- `IMPLEMENTATION_PLAN.md` — Task 1.1 marked [x]
|
||||
### Committed: (pending)
|
||||
### Patterns discovered:
|
||||
- `src/` is on sys.path only when using `uv run` (via .pth file created by setup_dev.py). Running `python` directly won't find `data_processing` module. Always use `uv run python` for testing.
|
||||
- `total_records` from `pathway_refresh_log` returns 0 — the refresh log's `source_row_count` field appears empty despite `completed_at` having a value. This is cosmetic — the KPI can use `unique_patients` from chart-data instead.
|
||||
- Drug filtering correctly includes nodes with NULL drug_sequence (root, trust, directory levels) alongside matching drug nodes. Root node patient count becomes 0 when drug filter is active — this matches Reflex behavior.
|
||||
### Next iteration should:
|
||||
- Start Task 1.2 — Build directorate card tree from DimSearchTerm.csv
|
||||
- Create `dash_app/data/card_browser.py` with `build_directorate_tree()` and `get_all_drugs()`
|
||||
- Read `data/DimSearchTerm.csv` to understand the data format
|
||||
- Import SEARCH_TERM_MERGE_MAP from `data_processing.diagnosis_lookup` for asthma normalization
|
||||
- Remember: drug fragments in CleanedDrugName are UPPERCASE substrings, not exact matches
|
||||
### Blocked items:
|
||||
- None
|
||||
|
||||
@@ -0,0 +1,248 @@
|
||||
"""
|
||||
Shared query functions for pathway node data.
|
||||
|
||||
These functions extract the data loading logic from the Reflex AppState
|
||||
into standalone functions that accept db_path as a parameter and return
|
||||
plain JSON-serializable dicts/lists. Both Reflex and Dash can call these.
|
||||
|
||||
All queries are read-only SELECTs against pathways.db.
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def load_initial_data(db_path: Path) -> dict:
|
||||
"""
|
||||
Load reference data from SQLite on app initialization.
|
||||
|
||||
Extracted from AppState.load_data() (pathways_app.py lines 407-488).
|
||||
|
||||
Returns dict with keys:
|
||||
available_drugs: sorted list of unique drug labels (level 3 nodes)
|
||||
available_directorates: sorted list of unique directorate labels (level 2, directory charts)
|
||||
available_indications: sorted list of unique indications from ref_drug_indication_clusters
|
||||
total_records: source row count from latest completed refresh
|
||||
last_updated: ISO timestamp of latest completed refresh
|
||||
"""
|
||||
if not db_path.exists():
|
||||
return {
|
||||
"available_drugs": [],
|
||||
"available_directorates": [],
|
||||
"available_indications": [],
|
||||
"total_records": 0,
|
||||
"last_updated": "",
|
||||
"error": "Database not found",
|
||||
}
|
||||
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
try:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Latest completed refresh metadata
|
||||
cursor.execute("""
|
||||
SELECT source_row_count, completed_at
|
||||
FROM pathway_refresh_log
|
||||
WHERE status = 'completed'
|
||||
ORDER BY started_at DESC
|
||||
LIMIT 1
|
||||
""")
|
||||
refresh_row = cursor.fetchone()
|
||||
total_records = (refresh_row[0] or 0) if refresh_row else 0
|
||||
last_updated = (refresh_row[1] or "") if refresh_row else ""
|
||||
|
||||
# Unique drugs from pathway_nodes level 3
|
||||
cursor.execute("""
|
||||
SELECT DISTINCT labels
|
||||
FROM pathway_nodes
|
||||
WHERE level = 3 AND labels IS NOT NULL AND labels != ''
|
||||
ORDER BY labels
|
||||
""")
|
||||
available_drugs = [row[0] for row in cursor.fetchall()]
|
||||
|
||||
# Unique directorates from directory chart pathway_nodes level 2
|
||||
cursor.execute("""
|
||||
SELECT DISTINCT labels
|
||||
FROM pathway_nodes
|
||||
WHERE level = 2 AND chart_type = 'directory'
|
||||
AND labels IS NOT NULL AND labels != ''
|
||||
ORDER BY labels
|
||||
""")
|
||||
available_directorates = [row[0] for row in cursor.fetchall()]
|
||||
|
||||
# Unique indications from ref_drug_indication_clusters
|
||||
cursor.execute("""
|
||||
SELECT DISTINCT indication
|
||||
FROM ref_drug_indication_clusters
|
||||
WHERE indication IS NOT NULL AND indication != ''
|
||||
ORDER BY indication
|
||||
""")
|
||||
available_indications = [row[0] for row in cursor.fetchall()]
|
||||
if not available_indications:
|
||||
available_indications = ["(No indications available)"]
|
||||
|
||||
return {
|
||||
"available_drugs": available_drugs,
|
||||
"available_directorates": available_directorates,
|
||||
"available_indications": available_indications,
|
||||
"total_records": total_records,
|
||||
"last_updated": last_updated,
|
||||
}
|
||||
except sqlite3.Error as e:
|
||||
return {
|
||||
"available_drugs": [],
|
||||
"available_directorates": [],
|
||||
"available_indications": [],
|
||||
"total_records": 0,
|
||||
"last_updated": "",
|
||||
"error": f"Database error: {e}",
|
||||
}
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def load_pathway_nodes(
|
||||
db_path: Path,
|
||||
filter_id: str,
|
||||
chart_type: str,
|
||||
selected_drugs: Optional[list[str]] = None,
|
||||
selected_directorates: Optional[list[str]] = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Load pre-computed pathway nodes from SQLite.
|
||||
|
||||
Extracted from AppState.load_pathway_data() (pathways_app.py lines 490-642).
|
||||
|
||||
Args:
|
||||
db_path: Path to pathways.db
|
||||
filter_id: e.g. "all_6mo", "2yr_12mo"
|
||||
chart_type: "directory" or "indication"
|
||||
selected_drugs: optional list of drug names to filter by
|
||||
selected_directorates: optional list of directorate names to filter by
|
||||
|
||||
Returns dict with keys:
|
||||
nodes: list of dicts (JSON-serializable) with chart node data
|
||||
unique_patients: int (from root node)
|
||||
total_drugs: int (unique drugs across level 3+ nodes)
|
||||
total_cost: float (from root node)
|
||||
last_updated: ISO timestamp string
|
||||
error: optional error string
|
||||
"""
|
||||
if not db_path.exists():
|
||||
return _empty_result("Database not found")
|
||||
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
try:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Build WHERE clause with parameterized values
|
||||
where_clauses = ["date_filter_id = ?", "chart_type = ?"]
|
||||
params: list = [filter_id, chart_type]
|
||||
|
||||
if selected_directorates:
|
||||
placeholders = ",".join("?" * len(selected_directorates))
|
||||
where_clauses.append(
|
||||
f"(directory IN ({placeholders}) OR directory IS NULL)"
|
||||
)
|
||||
params.extend(selected_directorates)
|
||||
|
||||
if selected_drugs:
|
||||
drug_conditions = []
|
||||
for drug in selected_drugs:
|
||||
drug_conditions.append("drug_sequence LIKE ?")
|
||||
params.append(f"%{drug}%")
|
||||
where_clauses.append(
|
||||
f"({' OR '.join(drug_conditions)} OR drug_sequence IS NULL)"
|
||||
)
|
||||
|
||||
where_clause = " AND ".join(where_clauses)
|
||||
|
||||
query = f"""
|
||||
SELECT
|
||||
parents, ids, labels, level, value,
|
||||
cost, costpp, cost_pp_pa, colour,
|
||||
first_seen, last_seen, first_seen_parent, last_seen_parent,
|
||||
average_spacing, average_administered, avg_days,
|
||||
trust_name, directory, drug_sequence
|
||||
FROM pathway_nodes
|
||||
WHERE {where_clause}
|
||||
ORDER BY level, parents, ids
|
||||
"""
|
||||
|
||||
cursor.execute(query, params)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
if not rows:
|
||||
return _empty_result(f"No pathway data for filter: {filter_id}")
|
||||
|
||||
nodes = []
|
||||
root_patients = 0
|
||||
root_cost = 0.0
|
||||
|
||||
for row in rows:
|
||||
node = {
|
||||
"parents": row["parents"] or "",
|
||||
"ids": row["ids"] or "",
|
||||
"labels": row["labels"] or "",
|
||||
"value": row["value"] or 0,
|
||||
"cost": float(row["cost"]) if row["cost"] else 0.0,
|
||||
"costpp": float(row["costpp"]) if row["costpp"] else 0.0,
|
||||
"colour": float(row["colour"]) if row["colour"] else 0.0,
|
||||
"first_seen": row["first_seen"] or "",
|
||||
"last_seen": row["last_seen"] or "",
|
||||
"first_seen_parent": row["first_seen_parent"] or "",
|
||||
"last_seen_parent": row["last_seen_parent"] or "",
|
||||
"average_spacing": row["average_spacing"] or "",
|
||||
"cost_pp_pa": row["cost_pp_pa"] or "",
|
||||
}
|
||||
nodes.append(node)
|
||||
|
||||
if row["level"] == 0:
|
||||
root_patients = row["value"] or 0
|
||||
root_cost = float(row["cost"]) if row["cost"] else 0.0
|
||||
|
||||
# Count unique drugs from level 3+ nodes
|
||||
unique_drugs = set()
|
||||
for row in rows:
|
||||
if row["level"] >= 3 and row["drug_sequence"]:
|
||||
for drug in row["drug_sequence"].split("|"):
|
||||
if drug:
|
||||
unique_drugs.add(drug)
|
||||
|
||||
# Data freshness
|
||||
cursor.execute("""
|
||||
SELECT completed_at
|
||||
FROM pathway_refresh_log
|
||||
WHERE status = 'completed'
|
||||
ORDER BY completed_at DESC
|
||||
LIMIT 1
|
||||
""")
|
||||
refresh_row = cursor.fetchone()
|
||||
last_updated = (
|
||||
refresh_row["completed_at"] if refresh_row and refresh_row["completed_at"] else ""
|
||||
)
|
||||
|
||||
return {
|
||||
"nodes": nodes,
|
||||
"unique_patients": root_patients,
|
||||
"total_drugs": len(unique_drugs),
|
||||
"total_cost": root_cost,
|
||||
"last_updated": last_updated,
|
||||
}
|
||||
except sqlite3.Error as e:
|
||||
return _empty_result(f"Database error: {e}")
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def _empty_result(error: str = "") -> dict:
|
||||
return {
|
||||
"nodes": [],
|
||||
"unique_patients": 0,
|
||||
"total_drugs": 0,
|
||||
"total_cost": 0.0,
|
||||
"last_updated": "",
|
||||
"error": error,
|
||||
}
|
||||
Reference in New Issue
Block a user