feat: add 5 trust-comparison query functions for Phase 10.6

New per-trust-within-directorate queries:
- get_trust_market_share: drugs by trust within a directorate
- get_trust_cost_waterfall: cost per patient by trust
- get_trust_dosing: drug dosing intervals by trust
- get_trust_heatmap: trust x drug matrix for one directorate
- get_trust_durations: drug durations by trust

Also verified existing get_pathway_costs(directory=X) works for
directorate-scoped Cost Effectiveness (no new function needed).

Thin wrappers added in dash_app/data/queries.py.
This commit is contained in:
Andrew Charlwood
2026-02-06 22:04:43 +00:00
parent e123599312
commit 9d4e32910d
3 changed files with 309 additions and 4 deletions
+252
View File
@@ -812,3 +812,255 @@ def get_treatment_durations(
return []
finally:
conn.close()
# ---------------------------------------------------------------------------
# Trust Comparison query functions (Phase 10)
#
# These functions provide per-trust breakdowns within a single directorate.
# Unlike Phase 9 functions which aggregate across trusts, these preserve
# per-trust detail for comparing drugs across trusts within a directorate.
# ---------------------------------------------------------------------------
def get_trust_market_share(
db_path: Path,
date_filter_id: str,
chart_type: str,
directory: str,
) -> list[dict]:
"""Drug market share broken down by trust within a single directorate.
Returns list of dicts: [{trust_name, drug, patients, proportion, cost, cost_pp_pa}]
Sorted by trust total patients desc, then drug patients desc within each trust.
"""
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
query = """
SELECT labels AS drug, trust_name, value AS patients, cost, cost_pp_pa
FROM pathway_nodes
WHERE date_filter_id = ? AND chart_type = ? AND level = 3
AND directory = ?
ORDER BY trust_name, value DESC
"""
rows = conn.execute(query, (date_filter_id, chart_type, directory)).fetchall()
# Compute proportions within each trust
trust_totals = {}
for r in rows:
trust = r["trust_name"] or ""
trust_totals[trust] = trust_totals.get(trust, 0) + (r["patients"] or 0)
result = []
for r in rows:
trust = r["trust_name"] or ""
patients = r["patients"] or 0
cost = float(r["cost"]) if r["cost"] else 0.0
total = trust_totals.get(trust, 1)
result.append({
"trust_name": trust,
"drug": r["drug"],
"patients": patients,
"proportion": round(patients / total, 4) if total else 0,
"cost": round(cost, 2),
"cost_pp_pa": _safe_float(r["cost_pp_pa"]),
})
result.sort(key=lambda x: (-trust_totals.get(x["trust_name"], 0), -x["patients"]))
return result
except sqlite3.Error:
return []
finally:
conn.close()
def get_trust_cost_waterfall(
db_path: Path,
date_filter_id: str,
chart_type: str,
directory: str,
) -> list[dict]:
"""Cost per patient by trust within a single directorate.
Returns list of dicts: [{trust_name, patients, total_cost, cost_pp}]
Sorted by cost_pp desc.
"""
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
query = """
SELECT trust_name, SUM(value) AS patients, SUM(cost) AS total_cost
FROM pathway_nodes
WHERE date_filter_id = ? AND chart_type = ? AND level = 3
AND directory = ?
GROUP BY trust_name
HAVING patients > 0
ORDER BY total_cost DESC
"""
rows = conn.execute(query, (date_filter_id, chart_type, directory)).fetchall()
result = []
for r in rows:
patients = r["patients"] or 0
total_cost = float(r["total_cost"]) if r["total_cost"] else 0.0
result.append({
"trust_name": r["trust_name"] or "",
"patients": patients,
"total_cost": round(total_cost, 2),
"cost_pp": round(total_cost / patients, 2) if patients else 0,
})
result.sort(key=lambda x: -x["cost_pp"])
return result
except sqlite3.Error:
return []
finally:
conn.close()
def get_trust_dosing(
db_path: Path,
date_filter_id: str,
chart_type: str,
directory: str,
) -> list[dict]:
"""Drug dosing intervals by trust within a single directorate.
Returns list of dicts:
[{drug, trust_name, weekly_interval, dose_count, total_weeks, patients}]
"""
from data_processing.parsing import parse_average_spacing
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
query = """
SELECT labels AS drug, trust_name, value AS patients, average_spacing
FROM pathway_nodes
WHERE date_filter_id = ? AND chart_type = ? AND level = 3
AND directory = ?
AND average_spacing IS NOT NULL AND average_spacing != ''
ORDER BY labels, trust_name
"""
rows = conn.execute(query, (date_filter_id, chart_type, directory)).fetchall()
result = []
for r in rows:
parsed = parse_average_spacing(r["average_spacing"])
for entry in parsed:
result.append({
"drug": entry["drug_name"],
"trust_name": r["trust_name"] or "",
"weekly_interval": entry["weekly_interval"],
"dose_count": entry["dose_count"],
"total_weeks": entry["total_weeks"],
"patients": r["patients"] or 0,
})
return result
except sqlite3.Error:
return []
finally:
conn.close()
def get_trust_heatmap(
db_path: Path,
date_filter_id: str,
chart_type: str,
directory: str,
) -> dict:
"""Trust x drug matrix for a single directorate.
Returns dict with:
trusts: sorted list of trust names (rows)
drugs: sorted list of drug names (columns)
matrix: {trust_name: {drug: {patients, cost, cost_pp_pa}}}
"""
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
query = """
SELECT labels AS drug, trust_name, value AS patients, cost, cost_pp_pa
FROM pathway_nodes
WHERE date_filter_id = ? AND chart_type = ? AND level = 3
AND directory = ?
ORDER BY trust_name, labels
"""
rows = conn.execute(query, (date_filter_id, chart_type, directory)).fetchall()
matrix = {}
trust_totals = {}
drug_totals = {}
for r in rows:
trust = r["trust_name"] or ""
drug = r["drug"] or ""
patients = r["patients"] or 0
cost = float(r["cost"]) if r["cost"] else 0.0
if trust not in matrix:
matrix[trust] = {}
matrix[trust][drug] = {
"patients": patients,
"cost": round(cost, 2),
"cost_pp_pa": _safe_float(r["cost_pp_pa"]),
}
trust_totals[trust] = trust_totals.get(trust, 0) + patients
drug_totals[drug] = drug_totals.get(drug, 0) + patients
trusts = sorted(trust_totals, key=lambda x: -trust_totals[x])
drugs = sorted(drug_totals, key=lambda x: -drug_totals[x])
return {"trusts": trusts, "drugs": drugs, "matrix": matrix}
except sqlite3.Error:
return {"trusts": [], "drugs": [], "matrix": {}}
finally:
conn.close()
def get_trust_durations(
db_path: Path,
date_filter_id: str,
chart_type: str,
directory: str,
) -> list[dict]:
"""Drug treatment durations by trust within a single directorate.
Returns list of dicts: [{drug, trust_name, avg_days, patients}]
Sorted by drug name, then trust. No aggregation — each row is one
trust+drug combination.
"""
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
query = """
SELECT labels AS drug, trust_name, value AS patients, avg_days
FROM pathway_nodes
WHERE date_filter_id = ? AND chart_type = ? AND level = 3
AND directory = ?
AND avg_days IS NOT NULL
ORDER BY labels, trust_name
"""
rows = conn.execute(query, (date_filter_id, chart_type, directory)).fetchall()
result = []
for r in rows:
patients = r["patients"] or 0
days = _safe_float(r["avg_days"])
if patients == 0 or days == 0:
continue
result.append({
"drug": r["drug"],
"trust_name": r["trust_name"] or "",
"avg_days": round(days, 1),
"patients": patients,
})
return result
except sqlite3.Error:
return []
finally:
conn.close()