From b71748fa7de62712f7e932d025032fdcd3502e44 Mon Sep 17 00:00:00 2001 From: Andrew Charlwood Date: Fri, 6 Feb 2026 13:02:34 +0000 Subject: [PATCH] feat: add shared pathway query functions for Dash data access (Task 1.1) Extract load_data() and load_pathway_data() logic from Reflex AppState into standalone functions in src/data_processing/pathway_queries.py. Create thin dash_app/data/queries.py wrapper with DB_PATH resolution. --- IMPLEMENTATION_PLAN.md | 8 +- dash_app/data/queries.py | 37 ++++ progress.txt | 43 ++++- src/data_processing/pathway_queries.py | 248 +++++++++++++++++++++++++ 4 files changed, 331 insertions(+), 5 deletions(-) create mode 100644 dash_app/data/queries.py create mode 100644 src/data_processing/pathway_queries.py diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md index 3f99497..9877277 100644 --- a/IMPLEMENTATION_PLAN.md +++ b/IMPLEMENTATION_PLAN.md @@ -98,12 +98,12 @@ Drawer selection → update_drug_selection → app-state store → load_pathway_ ## Phase 1: Data Access Layer ### 1.1 Create shared data access functions -- [ ] Add query functions to `src/data_processing/database.py` (or a new `src/data_processing/pathway_queries.py` if database.py is already large): +- [x] Add query functions to `src/data_processing/pathway_queries.py`: - `load_initial_data(db_path) -> dict` — extracted from `AppState.load_data()` (pathways_app.py lines 407-488): returns `{"available_drugs": [...], "available_directorates": [...], "available_indications": [...], "total_records": int, "last_updated": str}` - - `load_pathway_data(db_path, filter_id, chart_type, selected_drugs=None, selected_directorates=None) -> dict` — extracted from `AppState.load_pathway_data()` (lines 490-642): returns `{"nodes": [...], "unique_patients": int, "total_drugs": int, "total_cost": float, "last_updated": str}` + - `load_pathway_nodes(db_path, filter_id, chart_type, selected_drugs=None, selected_directorates=None) -> dict` — extracted from `AppState.load_pathway_data()` 
(lines 490-642): returns `{"nodes": [...], "unique_patients": int, "total_drugs": int, "total_cost": float, "last_updated": str}` - These are plain Python functions that accept `db_path` as a parameter (no Reflex state objects) -- [ ] Create thin `dash_app/data/queries.py` that imports and calls the shared functions with the correct `db_path` -- [ ] Return plain dicts/lists — JSON-serializable for dcc.Store +- [x] Create thin `dash_app/data/queries.py` that imports and calls the shared functions with the correct `db_path` +- [x] Return plain dicts/lists — JSON-serializable for dcc.Store - **Checkpoint**: `python -c "from dash_app.data.queries import load_initial_data; print(load_initial_data())"` returns valid data ### 1.2 Build directorate card tree from DimSearchTerm.csv diff --git a/dash_app/data/queries.py b/dash_app/data/queries.py new file mode 100644 index 0000000..358f884 --- /dev/null +++ b/dash_app/data/queries.py @@ -0,0 +1,37 @@ +""" +Thin wrapper around shared pathway query functions. + +Resolves the database path relative to this file's location and delegates +to the shared functions in src/data_processing/pathway_queries.py. 
+""" + +from pathlib import Path +from typing import Optional + +from data_processing.pathway_queries import ( + load_initial_data as _load_initial_data, + load_pathway_nodes as _load_pathway_nodes, +) + +DB_PATH = Path(__file__).resolve().parents[2] / "data" / "pathways.db" + + +def load_initial_data() -> dict: + """Load reference data (drugs, directorates, indications, refresh info).""" + return _load_initial_data(DB_PATH) + + +def load_pathway_data( + filter_id: str = "all_6mo", + chart_type: str = "directory", + selected_drugs: Optional[list[str]] = None, + selected_directorates: Optional[list[str]] = None, +) -> dict: + """Load pre-computed pathway nodes with optional filters.""" + return _load_pathway_nodes( + DB_PATH, + filter_id=filter_id, + chart_type=chart_type, + selected_drugs=selected_drugs, + selected_directorates=selected_directorates, + ) diff --git a/progress.txt b/progress.txt index af1b90d..2702249 100644 --- a/progress.txt +++ b/progress.txt @@ -91,7 +91,7 @@ Migrating the HCD Analysis frontend from Reflex to Dash (Plotly) + Dash Mantine - `dash_app/components/__init__.py` — empty package - `dash_app/callbacks/__init__.py` — empty package - `dash_app/utils/__init__.py` — empty package -### Committed: (see below) +### Committed: 1c3ece6 "feat: create dash_app skeleton with nhs.css and MantineProvider (Phase 0)" ### Patterns discovered: - Dash 4.0.0 and DMC 2.5.1 installed (plan said 2.x and 0.14.x). The API is compatible — `MantineProvider`, `Drawer`, `Accordion`, `Chip` all available. DMC 2.x is based on Mantine v7. - `dmc.MantineProvider(children=[...])` works for wrapping the layout in both DMC versions. 
@@ -104,3 +104,44 @@ Migrating the HCD Analysis frontend from Reflex to Dash (Plotly) + Dash Mantine - Verify with `python -c "from dash_app.data.queries import load_initial_data; print(load_initial_data())"` ### Blocked items: - None + +## Iteration 2 — 2026-02-06 +### Task: Phase 1 — Task 1.1 (Create shared data access functions) +### Why this task: +- Phase 0 complete in iteration 1; Phase 1 is next in dependency order +- progress.txt from iteration 1 explicitly recommended this task +- All UI components and callbacks (Phases 2-5) depend on having data access working +### Status: COMPLETE +### What was done: +- Created `src/data_processing/pathway_queries.py` with two shared functions: + - `load_initial_data(db_path)` — returns available drugs (42), directorates (14), indications (32), total_records, last_updated + - `load_pathway_nodes(db_path, filter_id, chart_type, selected_drugs, selected_directorates)` — returns nodes list, unique_patients, total_drugs, total_cost, last_updated +- Both functions extracted directly from AppState methods in pathways_app.py (lines 407-642), with Reflex `self.*` references replaced by function parameters +- All return values are plain dicts/lists — JSON-serializable for dcc.Store +- Created thin wrapper `dash_app/data/queries.py` that resolves DB_PATH and delegates to shared functions +- Used separate file (pathway_queries.py) rather than adding to database.py because database.py is connection management (240 lines), queries are a distinct concern +### Validation results: +- Tier 1 (Code): `python -c "from dash_app.data.queries import load_initial_data"` — OK (requires uv run for .pth file) +- Tier 1 (App starts): `from dash_app.app import app` — OK, layout type is MantineProvider +- Tier 3 (Functional): + - `load_initial_data()`: 42 drugs, 14 directorates, 32 indications, last_updated=2026-02-06T00:08:55 + - `load_pathway_data("all_6mo", "directory")`: 293 nodes, 11,118 patients, 39 drugs, £130.5M cost + - 
`load_pathway_data("all_6mo", "indication")`: 438 nodes, 11,252 patients + - `load_pathway_data("all_6mo", "directory", selected_drugs=["ADALIMUMAB"])`: 70 nodes (drug filter works) +### Files changed: +- `src/data_processing/pathway_queries.py` — NEW: shared query functions +- `dash_app/data/queries.py` — NEW: thin Dash wrapper with DB_PATH resolution +- `IMPLEMENTATION_PLAN.md` — Task 1.1 marked [x] +### Committed: (pending) +### Patterns discovered: +- `src/` is on sys.path only when using `uv run` (via .pth file created by setup_dev.py). Running `python` directly won't find `data_processing` module. Always use `uv run python` for testing. +- `total_records` from `pathway_refresh_log` returns 0 — the refresh log's `source_row_count` field appears empty despite `completed_at` having a value. This is cosmetic — the KPI can use `unique_patients` from chart-data instead. +- Drug filtering correctly includes nodes with NULL drug_sequence (root, trust, directory levels) alongside matching drug nodes. Root node patient count becomes 0 when drug filter is active — this matches Reflex behavior. +### Next iteration should: +- Start Task 1.2 — Build directorate card tree from DimSearchTerm.csv +- Create `dash_app/data/card_browser.py` with `build_directorate_tree()` and `get_all_drugs()` +- Read `data/DimSearchTerm.csv` to understand the data format +- Import SEARCH_TERM_MERGE_MAP from `data_processing.diagnosis_lookup` for asthma normalization +- Remember: drug fragments in CleanedDrugName are UPPERCASE substrings, not exact matches +### Blocked items: +- None diff --git a/src/data_processing/pathway_queries.py b/src/data_processing/pathway_queries.py new file mode 100644 index 0000000..bf1941d --- /dev/null +++ b/src/data_processing/pathway_queries.py @@ -0,0 +1,248 @@ +""" +Shared query functions for pathway node data. 
# Shared query functions for pathway node data (module notes).
#
# These functions extract the data loading logic from the Reflex AppState
# into standalone functions that accept db_path as a parameter and return
# plain JSON-serializable dicts/lists. Both Reflex and Dash can call these.
#
# All queries are read-only SELECTs against pathways.db.

import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Optional, Union

# Single definition of "latest completed refresh" so that every function in
# this module reports the same run. Ordering by completed_at (not started_at)
# picks the run that finished most recently even when runs overlap.
_LATEST_REFRESH_SQL = """
    SELECT source_row_count, completed_at
    FROM pathway_refresh_log
    WHERE status = 'completed'
    ORDER BY completed_at DESC
    LIMIT 1
"""


def _latest_completed_refresh(cursor: sqlite3.Cursor) -> tuple[int, str]:
    """Return (source_row_count, completed_at) of the latest completed refresh.

    Missing rows and NULL columns collapse to ``0`` / ``""``. Positional
    indexing is used so this works with tuple rows and ``sqlite3.Row`` alike.
    """
    cursor.execute(_LATEST_REFRESH_SQL)
    row = cursor.fetchone()
    if row is None:
        return 0, ""
    return row[0] or 0, row[1] or ""


def _first_column(cursor: sqlite3.Cursor, sql: str) -> list:
    """Execute *sql* and return the first column of every result row."""
    cursor.execute(sql)
    return [row[0] for row in cursor.fetchall()]


def _escape_like(term: str) -> str:
    r"""Escape LIKE wildcards in *term* for use with ``ESCAPE '\'``."""
    # Backslash first, otherwise the escapes added below get escaped again.
    return term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def _to_float(value) -> float:
    """Convert a possibly NULL/empty numeric column to float (falsy -> 0.0)."""
    return float(value) if value else 0.0


def _node_from_row(row: sqlite3.Row) -> dict:
    """Convert one pathway_nodes row into a JSON-serializable chart node."""
    return {
        "parents": row["parents"] or "",
        "ids": row["ids"] or "",
        "labels": row["labels"] or "",
        "value": row["value"] or 0,
        "cost": _to_float(row["cost"]),
        "costpp": _to_float(row["costpp"]),
        "colour": _to_float(row["colour"]),
        "first_seen": row["first_seen"] or "",
        "last_seen": row["last_seen"] or "",
        "first_seen_parent": row["first_seen_parent"] or "",
        "last_seen_parent": row["last_seen_parent"] or "",
        "average_spacing": row["average_spacing"] or "",
        "cost_pp_pa": row["cost_pp_pa"] or "",
    }


def _empty_initial_data(error: str = "") -> dict:
    """Return the load_initial_data() payload used when nothing can be loaded."""
    return {
        "available_drugs": [],
        "available_directorates": [],
        "available_indications": [],
        "total_records": 0,
        "last_updated": "",
        "error": error,
    }


def load_initial_data(db_path: Union[str, Path]) -> dict:
    """
    Load reference data from SQLite on app initialization.

    Extracted from AppState.load_data() (pathways_app.py lines 407-488).

    Args:
        db_path: Path (or path string) to pathways.db

    Returns dict with keys:
        available_drugs: sorted list of unique drug labels (level 3 nodes)
        available_directorates: sorted list of unique directorate labels (level 2, directory charts)
        available_indications: sorted list of unique indications from
            ref_drug_indication_clusters; a single placeholder entry
            "(No indications available)" when the table yields none
        total_records: source row count from latest completed refresh
        last_updated: ISO timestamp of latest completed refresh
        error: only present on failure (missing database or sqlite3 error)
    """
    path = Path(db_path)
    if not path.exists():
        return _empty_initial_data("Database not found")

    try:
        # connect() lives inside the try so that a failure to open the file
        # is reported through the "error" key like any other database error.
        with closing(sqlite3.connect(str(path))) as conn:
            cursor = conn.cursor()

            total_records, last_updated = _latest_completed_refresh(cursor)

            # Unique drugs from pathway_nodes level 3
            available_drugs = _first_column(cursor, """
                SELECT DISTINCT labels
                FROM pathway_nodes
                WHERE level = 3 AND labels IS NOT NULL AND labels != ''
                ORDER BY labels
            """)

            # Unique directorates from directory chart pathway_nodes level 2
            available_directorates = _first_column(cursor, """
                SELECT DISTINCT labels
                FROM pathway_nodes
                WHERE level = 2 AND chart_type = 'directory'
                  AND labels IS NOT NULL AND labels != ''
                ORDER BY labels
            """)

            # Unique indications from ref_drug_indication_clusters
            available_indications = _first_column(cursor, """
                SELECT DISTINCT indication
                FROM ref_drug_indication_clusters
                WHERE indication IS NOT NULL AND indication != ''
                ORDER BY indication
            """)
            if not available_indications:
                available_indications = ["(No indications available)"]

            return {
                "available_drugs": available_drugs,
                "available_directorates": available_directorates,
                "available_indications": available_indications,
                "total_records": total_records,
                "last_updated": last_updated,
            }
    except sqlite3.Error as e:
        return _empty_initial_data(f"Database error: {e}")


def load_pathway_nodes(
    db_path: Union[str, Path],
    filter_id: str,
    chart_type: str,
    selected_drugs: Optional[list[str]] = None,
    selected_directorates: Optional[list[str]] = None,
) -> dict:
    """
    Load pre-computed pathway nodes from SQLite.

    Extracted from AppState.load_pathway_data() (pathways_app.py lines 490-642).

    Args:
        db_path: Path (or path string) to pathways.db
        filter_id: e.g. "all_6mo", "2yr_12mo"
        chart_type: "directory" or "indication"
        selected_drugs: optional list of drug names; a node is kept when its
            drug_sequence contains any of them as a substring, or is NULL
        selected_directorates: optional list of directorate names; a node is
            kept when its directory is one of them, or is NULL

    Returns dict with keys:
        nodes: list of dicts (JSON-serializable) with chart node data
        unique_patients: int (from root node, i.e. level 0)
        total_drugs: int (unique drugs across level 3+ nodes)
        total_cost: float (from root node)
        last_updated: ISO timestamp string of the latest completed refresh
        error: only present on failure or when no rows match
    """
    path = Path(db_path)
    if not path.exists():
        return _empty_result("Database not found")

    # Build WHERE clause with parameterized values
    where_clauses = ["date_filter_id = ?", "chart_type = ?"]
    params: list = [filter_id, chart_type]

    if selected_directorates:
        placeholders = ",".join("?" * len(selected_directorates))
        # NULL directory rows are the ancestors shared by every directorate.
        where_clauses.append(
            f"(directory IN ({placeholders}) OR directory IS NULL)"
        )
        params.extend(selected_directorates)

    if selected_drugs:
        # Substring match, with % and _ in the drug name treated literally.
        drug_conditions = " OR ".join(
            "drug_sequence LIKE ? ESCAPE '\\'" for _ in selected_drugs
        )
        params.extend(f"%{_escape_like(drug)}%" for drug in selected_drugs)
        # NULL drug_sequence rows are the ancestors above the drug level.
        where_clauses.append(f"({drug_conditions} OR drug_sequence IS NULL)")

    where_clause = " AND ".join(where_clauses)
    query = f"""
        SELECT
            parents, ids, labels, level, value,
            cost, costpp, cost_pp_pa, colour,
            first_seen, last_seen, first_seen_parent, last_seen_parent,
            average_spacing, average_administered, avg_days,
            trust_name, directory, drug_sequence
        FROM pathway_nodes
        WHERE {where_clause}
        ORDER BY level, parents, ids
    """

    try:
        with closing(sqlite3.connect(str(path))) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            cursor.execute(query, params)
            rows = cursor.fetchall()
            if not rows:
                return _empty_result(f"No pathway data for filter: {filter_id}")

            nodes = []
            root_patients = 0
            root_cost = 0.0
            unique_drugs = set()

            for row in rows:
                nodes.append(_node_from_row(row))

                level = row["level"]
                if level is None:
                    # A NULL level cannot be compared with an int; such a row
                    # is still returned as a node but feeds no KPI.
                    continue
                if level == 0:
                    root_patients = row["value"] or 0
                    root_cost = _to_float(row["cost"])
                elif level >= 3 and row["drug_sequence"]:
                    # drug_sequence is a "|"-separated list of drug names.
                    unique_drugs.update(
                        drug for drug in row["drug_sequence"].split("|") if drug
                    )

            # Data freshness
            _, last_updated = _latest_completed_refresh(cursor)

            return {
                "nodes": nodes,
                "unique_patients": root_patients,
                "total_drugs": len(unique_drugs),
                "total_cost": root_cost,
                "last_updated": last_updated,
            }
    except sqlite3.Error as e:
        return _empty_result(f"Database error: {e}")


def _empty_result(error: str = "") -> dict:
    """Return the load_pathway_nodes() payload used when nothing can be loaded."""
    return {
        "nodes": [],
        "unique_patients": 0,
        "total_drugs": 0,
        "total_cost": 0.0,
        "last_updated": "",
        "error": error,
    }