feat: complete Task 2.2 - test refresh pipeline with Snowflake data

Tested full refresh pipeline end-to-end with real Snowflake data:
- Fixed trust filter to read Name column from defaultTrusts.csv
- Fixed Decimal type handling in calculate_cost_per_patient_per_annum
- Fixed array handling in convert_to_records for average_administered
- Added required reference CSV files to data/ directory
- Configured Snowflake connection (account, warehouse, user)
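
The Decimal fix listed above is not shown in this excerpt. As a rough sketch only, and assuming the failure came from fixed-point database values arriving as decimal.Decimal and then being combined with Python floats, the usual remedy is to convert before doing arithmetic. The helper name to_float and the figures below are hypothetical and are not taken from calculate_cost_per_patient_per_annum.

```python
from decimal import Decimal


def to_float(value):
    """Return value as a float; None passes through unchanged."""
    return None if value is None else float(value)


# Hypothetical figures, for illustration only.
total_cost = Decimal("1234.50")  # e.g. a fixed-point value from the database
days_treated = 182.5             # a float computed elsewhere in the pipeline

try:
    total_cost / days_treated    # Decimal and float cannot be mixed directly
except TypeError as exc:
    print(f"TypeError: {exc}")

# Converting first makes the arithmetic well defined.
cost_per_day = to_float(total_cost) / days_treated
print(round(cost_per_day, 4))
```

Converting to float gives up Decimal's exact arithmetic, which is normally acceptable for summary statistics but would not be for exact currency accounting.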

Results:
- Snowflake fetch: 656,695 records in ~7s
- Transformations: 519,848 records after UPID/drug/directory
- Pathway nodes: 293 for all_6mo (8 trusts, 14 directories)
- Total processing time: ~6.2 minutes
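
The average_administered fix in convert_to_records works around a pandas behaviour: pd.isna returns an element-wise boolean array when given a list or ndarray, and using that array in an `if` raises ValueError once it has more than one element. A minimal sketch of the same intent as a standalone helper follows. The name serialize_average_administered is hypothetical, and the sketch assumes missing values arrive only as None or float NaN (it does not cover pd.NA or NaT).

```python
import json
import math


def serialize_average_administered(val):
    """Return JSON text for list-like values, str() for scalars, None if missing."""
    # NumPy arrays and NumPy scalars both expose tolist(), which yields
    # plain Python objects that json.dumps can serialize.
    if hasattr(val, "tolist"):
        val = val.tolist()
    if val is None:
        return None
    if isinstance(val, (list, tuple)):
        return json.dumps(list(val))
    # Scalar NaN is treated as missing, like None.
    if isinstance(val, float) and math.isnan(val):
        return None
    return str(val)


print(serialize_average_administered([1.5, 2.0]))
print(serialize_average_administered(None))
```

Normalizing with tolist() first means the missing-value check only ever sees scalars, so no try/except is needed around it.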
Andrew Charlwood
2026-02-05 00:20:12 +00:00
parent 8b65dfd9a8
commit adc1dbfc58
12 changed files with 1708 additions and 21 deletions
+20 -7
@@ -355,14 +355,27 @@ def convert_to_records(
 if pd.notna(row.get('Last seen (Parent)')):
     last_seen_parent = str(row['Last seen (Parent)'])
-# Handle average_administered (could be list or None)
+# Handle average_administered (could be list, ndarray, or None)
 average_administered = None
-if pd.notna(row.get('average_administered')):
-    val = row['average_administered']
-    if isinstance(val, list):
-        average_administered = json.dumps(val)
-    else:
-        average_administered = str(val)
+val = row.get('average_administered')
+if val is not None:
+    # Check for scalar None-like values
+    try:
+        if pd.isna(val):
+            average_administered = None
+        elif isinstance(val, (list, np.ndarray)):
+            average_administered = json.dumps(list(val) if hasattr(val, 'tolist') else val)
+        else:
+            average_administered = str(val)
+    except (ValueError, TypeError):
+        # pd.isna raises ValueError for arrays with >1 element
+        # In that case, val is an array/list, so convert to JSON
+        if hasattr(val, 'tolist'):
+            average_administered = json.dumps(val.tolist())
+        elif isinstance(val, list):
+            average_administered = json.dumps(val)
+        else:
+            average_administered = str(val)
 record = {
     'date_filter_id': date_filter_id,