feat: add pathway data architecture schema (Task 1.1)

Add three new tables to support pre-computed pathway data:
- pathway_date_filters: 6 pre-defined date filter combinations
- pathway_nodes: pre-computed pathway hierarchy with all visualization data
- pathway_refresh_log: tracks data refresh status

Includes:
- 9 indexes: 7 on pathway_nodes for efficient filtering by date_filter_id, level, trust, directory, drug, parent; 2 on pathway_refresh_log (started_at, status)
- Helper functions: create/drop/verify/get_counts for pathway tables
- clear_pathway_nodes() for selective or full data clearing
- get_pathway_refresh_status() for checking last refresh
- Integration with existing ALL_TABLES_SCHEMA and combined helpers
This commit is contained in:
Andrew Charlwood
2026-02-04 23:17:27 +00:00
parent 85b3c20341
commit 34396fef5e
2 changed files with 436 additions and 170 deletions
+299 -1
View File
@@ -116,6 +116,148 @@ CREATE INDEX IF NOT EXISTS idx_ref_drug_indication_clusters_indication ON ref_dr
"""
# =============================================================================
# Pathway Data Architecture Schemas
# =============================================================================
# DDL + seed data for `pathway_date_filters`.
# The script is idempotent: the table is created only if missing and the six
# seed rows are upserted. An upsert (rather than INSERT OR REPLACE) is used so
# that re-running the script refreshes the labels/flags of an existing row
# in place instead of deleting and re-inserting it, which would reset
# `created_at` on every run. Requires SQLite 3.24+ (ON CONFLICT ... DO UPDATE).
PATHWAY_DATE_FILTERS_SCHEMA = """
-- Stores the 6 pre-computed date filter combinations
-- Each combination represents a different initiated/last_seen date range
-- Used to efficiently query pre-computed pathway data
CREATE TABLE IF NOT EXISTS pathway_date_filters (
    id TEXT PRIMARY KEY, -- e.g., 'all_6mo', '1yr_12mo'
    initiated_label TEXT NOT NULL, -- e.g., 'All years', 'Last 1 year', 'Last 2 years'
    last_seen_label TEXT NOT NULL, -- e.g., 'Last 6 months', 'Last 12 months'
    initiated_years INTEGER, -- NULL for 'All', 1, or 2
    last_seen_months INTEGER NOT NULL, -- 6 or 12
    is_default INTEGER DEFAULT 0, -- 1 for 'all_6mo' (default selection)
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Pre-populate the 6 combinations.
-- Existing rows are updated in place so created_at is preserved across re-runs.
INSERT INTO pathway_date_filters (id, initiated_label, last_seen_label, initiated_years, last_seen_months, is_default) VALUES
    ('all_6mo', 'All years', 'Last 6 months', NULL, 6, 1),
    ('all_12mo', 'All years', 'Last 12 months', NULL, 12, 0),
    ('1yr_6mo', 'Last 1 year', 'Last 6 months', 1, 6, 0),
    ('1yr_12mo', 'Last 1 year', 'Last 12 months', 1, 12, 0),
    ('2yr_6mo', 'Last 2 years', 'Last 6 months', 2, 6, 0),
    ('2yr_12mo', 'Last 2 years', 'Last 12 months', 2, 12, 0)
ON CONFLICT(id) DO UPDATE SET
    initiated_label = excluded.initiated_label,
    last_seen_label = excluded.last_seen_label,
    initiated_years = excluded.initiated_years,
    last_seen_months = excluded.last_seen_months,
    is_default = excluded.is_default;
"""
# DDL for `pathway_nodes` and its seven secondary indexes.
# Each row is one node of the pre-computed pathway hierarchy for a single date
# filter combination; (date_filter_id, ids) is declared UNIQUE and
# date_filter_id references pathway_date_filters(id).
# NOTE(review): the idx_pathway_nodes_drug_seq comment says drug filtering uses
# LIKE '%DRUG%'. A leading-wildcard LIKE generally cannot be served by a B-tree
# index, so this index may not help those queries — confirm with
# EXPLAIN QUERY PLAN before relying on it.
# NOTE(review): idx_pathway_nodes_date_filter and idx_pathway_nodes_trust appear
# to be leading-column prefixes of other indexes defined here (the UNIQUE
# constraint's index and idx_pathway_nodes_filter_composite) and may be
# redundant — verify before dropping.
# NOTE(review): SQLite only enforces the FOREIGN KEY when the connection has
# foreign key enforcement switched on — TODO confirm callers enable it.
PATHWAY_NODES_SCHEMA = """
-- Main pathway nodes table (one set per date filter combination)
-- Stores pre-computed pathway hierarchy with all visualization data
-- Designed for fast filtering by date_filter_id + trust/directory/drug
CREATE TABLE IF NOT EXISTS pathway_nodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
-- Date filter combination this belongs to
date_filter_id TEXT NOT NULL,
-- Hierarchy structure (for icicle chart)
parents TEXT NOT NULL, -- Parent node identifier
ids TEXT NOT NULL, -- Unique node identifier (hierarchical path)
labels TEXT NOT NULL, -- Display label
level INTEGER NOT NULL, -- Hierarchy depth (0=root, 1=trust, 2=directory, 3+=drugs)
-- Patient counts (accurate for this date filter combination)
value INTEGER NOT NULL DEFAULT 0, -- Patient count
-- Cost metrics
cost REAL NOT NULL DEFAULT 0.0, -- Total cost
costpp REAL, -- Cost per patient
cost_pp_pa TEXT, -- Cost per patient per annum (formatted string)
-- Visualization
colour REAL NOT NULL DEFAULT 0.0, -- Color value (proportion of parent)
-- Date ranges (for this node)
first_seen TEXT, -- First intervention date (ISO format)
last_seen TEXT, -- Last intervention date (ISO format)
first_seen_parent TEXT, -- Earliest date in parent group
last_seen_parent TEXT, -- Latest date in parent group
-- Treatment statistics
average_spacing TEXT, -- Formatted treatment duration string
average_administered TEXT, -- JSON array of average doses per drug
avg_days REAL, -- Average treatment duration in days
-- Denormalized filter columns (for efficient WHERE clause filtering)
trust_name TEXT, -- Extracted trust name from ids
directory TEXT, -- Extracted directory from ids
drug_sequence TEXT, -- Pipe-separated drug sequence from pathway
-- Metadata
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
data_refresh_id TEXT, -- Links to pathway_refresh_log
-- Unique per date filter + pathway
UNIQUE(date_filter_id, ids),
FOREIGN KEY (date_filter_id) REFERENCES pathway_date_filters(id)
);
-- Indexes for efficient filtering
-- Primary filter: select by date_filter_id
CREATE INDEX IF NOT EXISTS idx_pathway_nodes_date_filter ON pathway_nodes(date_filter_id);
-- Level filter: often used with date_filter_id
CREATE INDEX IF NOT EXISTS idx_pathway_nodes_level ON pathway_nodes(date_filter_id, level);
-- Trust filter: for Trust dropdown filtering
CREATE INDEX IF NOT EXISTS idx_pathway_nodes_trust ON pathway_nodes(date_filter_id, trust_name);
-- Directory filter: for Directory dropdown filtering
CREATE INDEX IF NOT EXISTS idx_pathway_nodes_directory ON pathway_nodes(date_filter_id, directory);
-- Drug sequence filter: for drug filtering (uses LIKE '%DRUG%')
CREATE INDEX IF NOT EXISTS idx_pathway_nodes_drug_seq ON pathway_nodes(drug_sequence);
-- Parents filter: for finding children of a node
CREATE INDEX IF NOT EXISTS idx_pathway_nodes_parents ON pathway_nodes(date_filter_id, parents);
-- Composite index for common filter combination
CREATE INDEX IF NOT EXISTS idx_pathway_nodes_filter_composite
ON pathway_nodes(date_filter_id, trust_name, directory);
"""
# DDL for `pathway_refresh_log` and its two indexes (started_at DESC, status).
# Each row records one refresh run: its identifier, start/completion
# timestamps, status, row counts, error text and duration.
# NOTE(review): refresh_id is described as a unique identifier but carries no
# UNIQUE constraint, and status has no CHECK restricting it to the three
# documented values — confirm whether that is intentional.
PATHWAY_REFRESH_LOG_SCHEMA = """
-- Metadata table for tracking refresh status
-- Tracks when pathway data was last refreshed from Snowflake
CREATE TABLE IF NOT EXISTS pathway_refresh_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
refresh_id TEXT NOT NULL, -- Unique identifier for this refresh run
started_at TEXT NOT NULL, -- ISO timestamp when refresh started
completed_at TEXT, -- ISO timestamp when refresh completed (NULL if still running)
status TEXT DEFAULT 'running', -- 'running', 'completed', 'failed'
record_count INTEGER, -- Total pathway_nodes records created
date_filter_counts TEXT, -- JSON: {"all_6mo": 1234, "all_12mo": 1567, ...}
error_message TEXT, -- Error details if status='failed'
snowflake_query_date_from TEXT, -- Start date of Snowflake query
snowflake_query_date_to TEXT, -- End date of Snowflake query
processing_duration_seconds REAL, -- How long the refresh took
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
-- Index for finding latest refresh
CREATE INDEX IF NOT EXISTS idx_pathway_refresh_log_started ON pathway_refresh_log(started_at DESC);
-- Index for finding by status
CREATE INDEX IF NOT EXISTS idx_pathway_refresh_log_status ON pathway_refresh_log(status);
"""
# Combined pathway schema
# Concatenates the three pathway DDL scripts into one script. The date filter
# script comes first, ahead of pathway_nodes, whose foreign key references
# pathway_date_filters(id); the refresh log script comes last.
PATHWAY_TABLES_SCHEMA = f"""
-- Pathway Data Architecture Tables
-- Pre-computed pathway data for fast Reflex filtering
{PATHWAY_DATE_FILTERS_SCHEMA}
{PATHWAY_NODES_SCHEMA}
{PATHWAY_REFRESH_LOG_SCHEMA}
"""
# =============================================================================
# Fact Table Schemas
# =============================================================================
@@ -346,7 +488,7 @@ FACT_TABLES_SCHEMA = f"""
ALL_TABLES_SCHEMA = f"""
-- Complete Database Schema
-- Reference tables + Fact tables + Materialized views + File tracking
-- Reference tables + Fact tables + Materialized views + File tracking + Pathway tables
{REFERENCE_TABLES_SCHEMA}
@@ -355,6 +497,8 @@ ALL_TABLES_SCHEMA = f"""
{MATERIALIZED_VIEWS_SCHEMA}
{FILE_TRACKING_SCHEMA}
{PATHWAY_TABLES_SCHEMA}
"""
@@ -598,6 +742,157 @@ def verify_file_tracking_tables_exist(conn: sqlite3.Connection) -> list[str]:
return missing
# =============================================================================
# Pathway Table Helper Functions
# =============================================================================
def create_pathway_tables(conn: sqlite3.Connection) -> None:
    """
    Build the pathway data architecture tables on the given connection.

    Runs the combined PATHWAY_TABLES_SCHEMA script in a single
    ``executescript`` call, which sets up:
    - pathway_date_filters: 6 pre-defined date filter combinations
    - pathway_nodes: Pre-computed pathway hierarchy data
    - pathway_refresh_log: Refresh tracking metadata

    Args:
        conn: SQLite database connection.
    """
    logger.info("Creating pathway tables...")
    ddl_script = PATHWAY_TABLES_SCHEMA
    conn.executescript(ddl_script)
    logger.info("Pathway tables created successfully")
def drop_pathway_tables(conn: sqlite3.Connection) -> None:
    """
    Remove the three pathway tables if they are present.

    pathway_nodes is dropped first, then pathway_refresh_log, then
    pathway_date_filters.

    Args:
        conn: SQLite database connection.

    Warning:
        This will delete all pre-computed pathway data.
    """
    logger.warning("Dropping pathway tables...")
    teardown_script = """
DROP TABLE IF EXISTS pathway_nodes;
DROP TABLE IF EXISTS pathway_refresh_log;
DROP TABLE IF EXISTS pathway_date_filters;
"""
    conn.executescript(teardown_script)
    logger.info("Pathway tables dropped")
def get_pathway_table_counts(conn: sqlite3.Connection) -> dict[str, int]:
    """
    Report how many rows each pathway table currently holds.

    A table whose count query raises ``sqlite3.OperationalError`` (for
    example because it has not been created yet) is reported as 0.

    Args:
        conn: SQLite database connection.

    Returns:
        Dictionary mapping table name to row count.
    """

    def _row_count(table_name: str) -> int:
        # Table names come from the fixed tuple below, never from user input.
        try:
            first_row = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()
        except sqlite3.OperationalError:
            # Table doesn't exist yet
            return 0
        return first_row[0] if first_row else 0

    pathway_tables = ("pathway_date_filters", "pathway_nodes", "pathway_refresh_log")
    return {name: _row_count(name) for name in pathway_tables}
def verify_pathway_tables_exist(conn: sqlite3.Connection) -> list[str]:
    """
    Check sqlite_master for each of the three pathway tables.

    Args:
        conn: SQLite database connection.

    Returns:
        Names of the pathway tables that were not found, in the order
        pathway_date_filters, pathway_nodes, pathway_refresh_log.
        An empty list means every table is present.
    """
    lookup_sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
    expected = ("pathway_date_filters", "pathway_nodes", "pathway_refresh_log")
    return [
        table_name
        for table_name in expected
        if conn.execute(lookup_sql, (table_name,)).fetchone() is None
    ]
def clear_pathway_nodes(conn: sqlite3.Connection, date_filter_id: str | None = None) -> int:
"""
Clear pathway nodes, optionally for a specific date filter.
Args:
conn: SQLite database connection.
date_filter_id: If provided, only clear nodes for this date filter.
If None, clear all pathway nodes.
Returns:
Number of rows deleted.
"""
if date_filter_id:
cursor = conn.execute(
"DELETE FROM pathway_nodes WHERE date_filter_id = ?",
(date_filter_id,)
)
else:
cursor = conn.execute("DELETE FROM pathway_nodes")
deleted_count = cursor.rowcount
conn.commit()
logger.info(f"Cleared {deleted_count} pathway nodes")
return deleted_count
def get_pathway_refresh_status(conn: sqlite3.Connection) -> dict | None:
"""
Get the status of the most recent pathway refresh.
Args:
conn: SQLite database connection.
Returns:
Dictionary with refresh status, or None if no refresh has been done.
"""
try:
cursor = conn.execute("""
SELECT refresh_id, started_at, completed_at, status, record_count,
date_filter_counts, error_message, processing_duration_seconds
FROM pathway_refresh_log
ORDER BY started_at DESC
LIMIT 1
""")
row = cursor.fetchone()
if row:
return {
"refresh_id": row[0],
"started_at": row[1],
"completed_at": row[2],
"status": row[3],
"record_count": row[4],
"date_filter_counts": row[5],
"error_message": row[6],
"processing_duration_seconds": row[7],
}
return None
except sqlite3.OperationalError:
# Table doesn't exist yet
return None
# =============================================================================
# Combined Helper Functions
# =============================================================================
@@ -625,6 +920,7 @@ def drop_all_tables(conn: sqlite3.Connection) -> None:
This will delete all data. Use with extreme caution.
"""
logger.warning("Dropping all tables...")
drop_pathway_tables(conn)
drop_file_tracking_tables(conn)
drop_fact_tables(conn)
drop_reference_tables(conn)
@@ -645,6 +941,7 @@ def get_all_table_counts(conn: sqlite3.Connection) -> dict[str, int]:
counts.update(get_reference_table_counts(conn))
counts.update(get_fact_table_counts(conn))
counts.update(get_file_tracking_counts(conn))
counts.update(get_pathway_table_counts(conn))
return counts
@@ -662,4 +959,5 @@ def verify_all_tables_exist(conn: sqlite3.Connection) -> list[str]:
missing.extend(verify_reference_tables_exist(conn))
missing.extend(verify_fact_tables_exist(conn))
missing.extend(verify_file_tracking_tables_exist(conn))
missing.extend(verify_pathway_tables_exist(conn))
return missing