feat: add chart_type argument to refresh command (Task 3.1)

- Add --chart-type argument with choices: directory, indication, all
- Update insert_pathway_records to include chart_type column
- Update refresh_pathways to process multiple chart types
- Update logging to show chart type counts
- Indication chart processing is deferred to Task 3.2 (GP diagnosis integration); until then, indication charts are skipped with a warning and produce no records
This commit is contained in:
Andrew Charlwood
2026-02-05 14:38:57 +00:00
parent 0d15000aa0
commit 593d14c70f
2 changed files with 97 additions and 27 deletions
+93 -23
View File
@@ -3,12 +3,16 @@ CLI command for refreshing pathway data from Snowflake.
This command fetches activity data from Snowflake, processes it through the
pathway pipeline for all 6 date filter combinations, and stores the results
in the SQLite pathway_nodes table.
in the SQLite pathway_nodes table. Supports two chart types:
- "directory": Trust → Directory → Drug → Pathway (default)
- "indication": Trust → Search_Term → Drug → Pathway (requires GP diagnosis lookup)
Usage:
python -m cli.refresh_pathways
python -m cli.refresh_pathways --minimum-patients 10
python -m cli.refresh_pathways --provider-codes RGT,RM1
python -m cli.refresh_pathways --chart-type all
python -m cli.refresh_pathways --chart-type directory
python -m cli.refresh_pathways --dry-run
Run `python -m cli.refresh_pathways --help` for full options.
@@ -34,9 +38,15 @@ from data_processing.schema import (
create_pathway_tables,
)
from data_processing.pathway_pipeline import (
ChartType,
DATE_FILTER_CONFIGS,
fetch_and_transform_data,
process_all_date_filters,
process_pathway_for_date_filter,
process_indication_pathway_for_date_filter,
extract_denormalized_fields,
extract_indication_fields,
convert_to_records,
)
logger = get_logger(__name__)
@@ -113,9 +123,9 @@ def insert_pathway_records(
if not records:
return 0
# Column order matching pathway_nodes schema
# Column order matching pathway_nodes schema (includes chart_type)
columns = [
'date_filter_id', 'parents', 'ids', 'labels', 'level',
'date_filter_id', 'chart_type', 'parents', 'ids', 'labels', 'level',
'value', 'cost', 'costpp', 'cost_pp_pa', 'colour',
'first_seen', 'last_seen', 'first_seen_parent', 'last_seen_parent',
'average_spacing', 'average_administered', 'avg_days',
@@ -213,6 +223,7 @@ def refresh_pathways(
db_path: Optional[Path] = None,
paths: Optional[PathConfig] = None,
dry_run: bool = False,
chart_type: str = "directory",
) -> tuple[bool, str, dict]:
"""
Main refresh function that orchestrates the full pipeline.
@@ -226,6 +237,7 @@ def refresh_pathways(
db_path: Path to SQLite database (uses default if None)
paths: PathConfig for file paths
dry_run: If True, don't actually insert records
chart_type: Which chart type to process: "directory", "indication", or "all"
Returns:
Tuple of (success: bool, message: str, stats: dict)
@@ -255,6 +267,12 @@ def refresh_pathways(
if not drug_filter:
return False, "No drugs specified and could not load defaults", {}
# Determine which chart types to process
if chart_type == "all":
chart_types_to_process: list[ChartType] = ["directory", "indication"]
else:
chart_types_to_process = [chart_type] # type: ignore
logger.info("=" * 60)
logger.info("Pathway Data Refresh Starting")
logger.info("=" * 60)
@@ -263,6 +281,7 @@ def refresh_pathways(
logger.info(f"Drug filter: {len(drug_filter)} drugs")
logger.info(f"Directory filter: {len(directory_filter)} directories")
logger.info(f"Provider codes: {provider_codes or 'All'}")
logger.info(f"Chart type(s): {', '.join(chart_types_to_process)}")
logger.info(f"Database: {db_manager.db_path}")
logger.info(f"Dry run: {dry_run}")
logger.info("=" * 60)
@@ -306,28 +325,64 @@ def refresh_pathways(
stats["snowflake_rows"] = len(df)
logger.info(f"Fetched {len(df)} records from Snowflake")
# Step 2: Process all date filters
# Step 2: Process all date filters for each chart type
num_date_filters = len(DATE_FILTER_CONFIGS)
num_chart_types = len(chart_types_to_process)
total_datasets = num_date_filters * num_chart_types
logger.info("")
logger.info("Step 2/4: Processing pathway data for 6 date filter combinations...")
logger.info(f"Step 2/4: Processing pathway data for {total_datasets} datasets "
f"({num_date_filters} date filters x {num_chart_types} chart types)...")
results = process_all_date_filters(
df=df,
trust_filter=trust_filter,
drug_filter=drug_filter,
directory_filter=directory_filter,
minimum_patients=minimum_patients,
refresh_id=refresh_id,
paths=paths,
)
# Store results keyed by "date_filter_id:chart_type"
results: dict[str, list[dict]] = {}
# Count records per filter
for filter_id, records in results.items():
stats["date_filter_counts"][filter_id] = len(records)
for current_chart_type in chart_types_to_process:
logger.info("")
logger.info(f"Processing chart type: {current_chart_type}")
if current_chart_type == "directory":
# Use existing process_all_date_filters for directory charts
dir_results = process_all_date_filters(
df=df,
trust_filter=trust_filter,
drug_filter=drug_filter,
directory_filter=directory_filter,
minimum_patients=minimum_patients,
refresh_id=refresh_id,
paths=paths,
)
# Add results with chart_type suffix
for filter_id, records in dir_results.items():
# Records already have chart_type set by convert_to_records
results[f"{filter_id}:directory"] = records
elif current_chart_type == "indication":
# For indication charts, we need indication_df from GP diagnosis lookups
# This will be implemented in Task 3.2
# For now, log that indication processing requires the diagnosis pipeline
logger.warning("Indication chart processing not yet fully integrated")
logger.warning("Task 3.2 will add GP diagnosis lookup integration")
logger.info("Skipping indication charts for now...")
for config in DATE_FILTER_CONFIGS:
results[f"{config.id}:indication"] = []
# Count records per filter and chart type
stats["chart_type_counts"] = {}
for key, records in results.items():
stats["date_filter_counts"][key] = len(records)
stats["total_records"] += len(records)
# Also track by chart type
_, ct = key.split(":")
stats["chart_type_counts"][ct] = stats["chart_type_counts"].get(ct, 0) + len(records)
logger.info("")
logger.info(f"Processed {stats['total_records']} total pathway nodes")
for filter_id, count in stats["date_filter_counts"].items():
logger.info(f" {filter_id}: {count} nodes")
for chart_type_name, count in stats.get("chart_type_counts", {}).items():
logger.info(f" {chart_type_name}: {count} nodes total")
for key, count in sorted(stats["date_filter_counts"].items()):
if count > 0:
logger.info(f" {key}: {count} nodes")
if dry_run:
logger.info("")
@@ -344,13 +399,13 @@ def refresh_pathways(
deleted = clear_pathway_nodes(conn)
logger.info(f"Cleared {deleted} existing pathway nodes")
# Insert new records for each date filter
# Insert new records for each date filter + chart type combination
total_inserted = 0
for filter_id, records in results.items():
for key, records in results.items():
if records:
inserted = insert_pathway_records(conn, records)
total_inserted += len(records)
logger.info(f" Inserted {len(records)} records for {filter_id}")
logger.info(f" Inserted {len(records)} records for {key}")
# Step 4: Log completion
logger.info("")
@@ -401,9 +456,15 @@ def main() -> int:
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Basic refresh with defaults
# Basic refresh with defaults (directory chart only)
python -m cli.refresh_pathways
# Refresh both chart types (directory and indication)
python -m cli.refresh_pathways --chart-type all
# Refresh only indication-based charts
python -m cli.refresh_pathways --chart-type indication
# Refresh with custom minimum patients
python -m cli.refresh_pathways --minimum-patients 10
@@ -445,6 +506,14 @@ Examples:
help="Process data but don't insert into database"
)
parser.add_argument(
"--chart-type",
type=str,
choices=["directory", "indication", "all"],
default="directory",
help="Chart type to process: 'directory' (default), 'indication', or 'all'"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
@@ -472,6 +541,7 @@ Examples:
provider_codes=provider_codes,
db_path=db_path,
dry_run=args.dry_run,
chart_type=args.chart_type,
)
if success: