diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md index 5df08f9..d30e418 100644 --- a/IMPLEMENTATION_PLAN.md +++ b/IMPLEMENTATION_PLAN.md @@ -108,13 +108,13 @@ python -m reflex compile ## Phase 3: CLI & Data Refresh Updates ### 3.1 Update Refresh Command for Dual Chart Types -- [ ] Modify `cli/refresh_pathways.py`: +- [x] Modify `cli/refresh_pathways.py`: - Process both "directory" and "indication" chart types - For each of 6 date filters: generate 2 chart datasets - Total: 12 pathway datasets (6 dates × 2 chart types) -- [ ] Add `--chart-type` argument: "all" (default), "directory", "indication" -- [ ] Update progress logging to show both chart types -- [ ] Verify: Dry run shows both chart types being processed +- [x] Add `--chart-type` argument: "all" (default), "directory", "indication" +- [x] Update progress logging to show both chart types +- [ ] Verify: Dry run shows both chart types being processed (requires Task 3.2 for full indication support) ### 3.2 Integrate Diagnosis-Based Directorate in Pipeline - [ ] Update `fetch_and_transform_data()` to include diagnosis lookup: diff --git a/cli/refresh_pathways.py b/cli/refresh_pathways.py index 2048ef9..3d40852 100644 --- a/cli/refresh_pathways.py +++ b/cli/refresh_pathways.py @@ -3,12 +3,16 @@ CLI command for refreshing pathway data from Snowflake. This command fetches activity data from Snowflake, processes it through the pathway pipeline for all 6 date filter combinations, and stores the results -in the SQLite pathway_nodes table. +in the SQLite pathway_nodes table. Supports two chart types: +- "directory": Trust → Directory → Drug → Pathway (default) +- "indication": Trust → Search_Term → Drug → Pathway (requires GP diagnosis lookup) Usage: python -m cli.refresh_pathways python -m cli.refresh_pathways --minimum-patients 10 python -m cli.refresh_pathways --provider-codes RGT,RM1 + python -m cli.refresh_pathways --chart-type all + python -m cli.refresh_pathways --chart-type directory python -m cli.refresh_pathways --dry-run Run `python -m cli.refresh_pathways --help` for full options. @@ -34,9 +38,15 @@ from data_processing.schema import ( create_pathway_tables, ) from data_processing.pathway_pipeline import ( + ChartType, DATE_FILTER_CONFIGS, fetch_and_transform_data, process_all_date_filters, + process_pathway_for_date_filter, + process_indication_pathway_for_date_filter, + extract_denormalized_fields, + extract_indication_fields, + convert_to_records, ) logger = get_logger(__name__) @@ -113,9 +123,9 @@ def insert_pathway_records( if not records: return 0 - # Column order matching pathway_nodes schema + # Column order matching pathway_nodes schema (includes chart_type) columns = [ - 'date_filter_id', 'parents', 'ids', 'labels', 'level', + 'date_filter_id', 'chart_type', 'parents', 'ids', 'labels', 'level', 'value', 'cost', 'costpp', 'cost_pp_pa', 'colour', 'first_seen', 'last_seen', 'first_seen_parent', 'last_seen_parent', 'average_spacing', 'average_administered', 'avg_days', @@ -213,6 +223,7 @@ def refresh_pathways( db_path: Optional[Path] = None, paths: Optional[PathConfig] = None, dry_run: bool = False, + chart_type: str = "directory", ) -> tuple[bool, str, dict]: """ Main refresh function that orchestrates the full pipeline. @@ -226,6 +237,7 @@ def refresh_pathways( db_path: Path to SQLite database (uses default if None) paths: PathConfig for file paths dry_run: If True, don't actually insert records + chart_type: Which chart type to process: "directory", "indication", or "all" Returns: Tuple of (success: bool, message: str, stats: dict) @@ -255,6 +267,12 @@ def refresh_pathways( if not drug_filter: return False, "No drugs specified and could not load defaults", {} + # Determine which chart types to process + if chart_type == "all": + chart_types_to_process: list[ChartType] = ["directory", "indication"] + else: + chart_types_to_process = [chart_type] # type: ignore + logger.info("=" * 60) logger.info("Pathway Data Refresh Starting") logger.info("=" * 60) @@ -263,6 +281,7 @@ def refresh_pathways( logger.info(f"Drug filter: {len(drug_filter)} drugs") logger.info(f"Directory filter: {len(directory_filter)} directories") logger.info(f"Provider codes: {provider_codes or 'All'}") + logger.info(f"Chart type(s): {', '.join(chart_types_to_process)}") logger.info(f"Database: {db_manager.db_path}") logger.info(f"Dry run: {dry_run}") logger.info("=" * 60) @@ -306,28 +325,64 @@ def refresh_pathways( stats["snowflake_rows"] = len(df) logger.info(f"Fetched {len(df)} records from Snowflake") - # Step 2: Process all date filters + # Step 2: Process all date filters for each chart type + num_date_filters = len(DATE_FILTER_CONFIGS) + num_chart_types = len(chart_types_to_process) + total_datasets = num_date_filters * num_chart_types + logger.info("") - logger.info("Step 2/4: Processing pathway data for 6 date filter combinations...") + logger.info(f"Step 2/4: Processing pathway data for {total_datasets} datasets " + f"({num_date_filters} date filters x {num_chart_types} chart types)...") - results = process_all_date_filters( - df=df, - trust_filter=trust_filter, - drug_filter=drug_filter, - directory_filter=directory_filter, - minimum_patients=minimum_patients, - refresh_id=refresh_id, - paths=paths, - ) + # Store results keyed by "date_filter_id:chart_type" + results: dict[str, list[dict]] = {} - # Count records per filter - for filter_id, records in results.items(): - stats["date_filter_counts"][filter_id] = len(records) + for current_chart_type in chart_types_to_process: + logger.info("") + logger.info(f"Processing chart type: {current_chart_type}") + + if current_chart_type == "directory": + # Use existing process_all_date_filters for directory charts + dir_results = process_all_date_filters( + df=df, + trust_filter=trust_filter, + drug_filter=drug_filter, + directory_filter=directory_filter, + minimum_patients=minimum_patients, + refresh_id=refresh_id, + paths=paths, + ) + # Add results with chart_type suffix + for filter_id, records in dir_results.items(): + # Records already have chart_type set by convert_to_records + results[f"{filter_id}:directory"] = records + + elif current_chart_type == "indication": + # For indication charts, we need indication_df from GP diagnosis lookups + # This will be implemented in Task 3.2 + # For now, log that indication processing requires the diagnosis pipeline + logger.warning("Indication chart processing not yet fully integrated") + logger.warning("Task 3.2 will add GP diagnosis lookup integration") + logger.info("Skipping indication charts for now...") + for config in DATE_FILTER_CONFIGS: + results[f"{config.id}:indication"] = [] + + # Count records per filter and chart type + stats["chart_type_counts"] = {} + for key, records in results.items(): + stats["date_filter_counts"][key] = len(records) stats["total_records"] += len(records) + # Also track by chart type + _, ct = key.split(":") + stats["chart_type_counts"][ct] = stats["chart_type_counts"].get(ct, 0) + len(records) + logger.info("") logger.info(f"Processed {stats['total_records']} total pathway nodes") - for filter_id, count in stats["date_filter_counts"].items(): - logger.info(f" {filter_id}: {count} nodes") + for chart_type_name, count in stats.get("chart_type_counts", {}).items(): + logger.info(f" {chart_type_name}: {count} nodes total") + for key, count in sorted(stats["date_filter_counts"].items()): + if count > 0: + logger.info(f" {key}: {count} nodes") if dry_run: logger.info("") @@ -344,13 +399,13 @@ def refresh_pathways( deleted = clear_pathway_nodes(conn) logger.info(f"Cleared {deleted} existing pathway nodes") - # Insert new records for each date filter + # Insert new records for each date filter + chart type combination total_inserted = 0 - for filter_id, records in results.items(): + for key, records in results.items(): if records: inserted = insert_pathway_records(conn, records) total_inserted += len(records) - logger.info(f" Inserted {len(records)} records for {filter_id}") + logger.info(f" Inserted {len(records)} records for {key}") # Step 4: Log completion logger.info("") @@ -401,9 +456,15 @@ def main() -> int: formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: - # Basic refresh with defaults + # Basic refresh with defaults (directory chart only) python -m cli.refresh_pathways + # Refresh both chart types (directory and indication) + python -m cli.refresh_pathways --chart-type all + + # Refresh only indication-based charts + python -m cli.refresh_pathways --chart-type indication + # Refresh with custom minimum patients python -m cli.refresh_pathways --minimum-patients 10 @@ -445,6 +506,14 @@ Examples: help="Process data but don't insert into database" ) + parser.add_argument( + "--chart-type", + type=str, + choices=["directory", "indication", "all"], + default="directory", + help="Chart type to process: 'directory' (default), 'indication', or 'all'" + ) + parser.add_argument( "--verbose", "-v", action="store_true", @@ -472,6 +541,7 @@ Examples: provider_codes=provider_codes, db_path=db_path, dry_run=args.dry_run, + chart_type=args.chart_type, ) if success: