Source: Delivery (Client-facing CSV)¶
Overview¶
The delivery subsystem is the final client-facing layer of the commodity hindcast pipeline. It consumes wide rolling-prediction DataFrames produced by the walk-forward loop, aggregates them to one of three ADM levels (ADM0 national / ADM1 state / ADM2 county), converts internal kg/ha values to delivery units (bu/ac for grains, lbs/ac for cotton), attaches conformal-interval bands and benchmark columns, validates every row through a strict Pydantic schema (`DeliveryRow`), and writes the result as a QUBE-format CSV. A second export path (`export.py`) ingests the already-written CSVs, joins them to `geometry.parquet` for warehouse UUID resolution, converts back to kg/ha for the model-ingestion table, and hands off to the shared `model_output_export_util` pipeline.
All unit conversion at the delivery boundary is performed exclusively here (DESIGN.md line 116); kg/ha columns never appear in the output CSVs.
Modules¶
schemas.py (284 lines)¶
Purpose: Defines the row-level and document-level Pydantic models that act as the typed contract for every value written to a delivery CSV.
Public classes:
- `DeliveryRow` (schemas.py:109) — frozen Pydantic model for a single CSV row. `extra="forbid"` is load-bearing: if a `cfg.reference_data` spec produces an `f"{spec.name}_in_season"` column whose name does not match any declared field, Pydantic raises `ValidationError` immediately rather than silently discarding the benchmark column.
- `HindcastDelivery` (schemas.py:227) — frozen container holding `rows: list[DeliveryRow]` plus `generated_date: str`. Validates structural integrity across all rows.
Public function:
`build_delivery_column_order(ci_columns)` (schemas.py:58) — returns a `tuple[str, ...]` giving the canonical CSV column sequence: `_DELIVERY_PREFIX` + CI columns + `_DELIVERY_SUFFIX`. Defaults to the full CI superset (50/68/80/90/95 percentiles). Called by `run_deliver.py` when writing the final CSV.
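The prefix + CI + suffix construction can be sketched as follows. The contents of the prefix and suffix tuples here are illustrative assumptions drawn from the field inventory below, not the actual `_DELIVERY_PREFIX`/`_DELIVERY_SUFFIX` values in schemas.py:

```python
# Hypothetical sketch of build_delivery_column_order; the exact prefix and
# suffix contents are assumptions, not copied from schemas.py.
_DELIVERY_PREFIX = ("commodity", "year", "init_date", "geo_identifier",
                    "variable", "model", "mean", "weather_correction_bu_ac")
_DELIVERY_SUFFIX = ("nass_actual",)

# Full CI superset: lower/upper bands at the 50/68/80/90/95 % levels.
_FULL_CI_SUPERSET = tuple(
    f"{side}_{pct}" for pct in (50, 68, 80, 90, 95) for side in ("lower", "upper")
)

def build_delivery_column_order(ci_columns=None):
    """Canonical CSV column sequence: prefix + CI bands + suffix."""
    ci = tuple(ci_columns) if ci_columns is not None else _FULL_CI_SUPERSET
    return _DELIVERY_PREFIX + ci + _DELIVERY_SUFFIX
```

Passing an explicit `ci_columns` subset yields a narrower CSV while keeping the prefix and suffix stable.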
`DeliveryRow` field inventory:

| Field | Type | Notes |
|---|---|---|
| `commodity` | `str` | upper-cased commodity name |
| `year` | `int` | harvest season year |
| `init_date` | `str` | ISO-8601 date string (YYYY-MM-DD) |
| `geo_identifier` | `str` | default `"adm0:usa"` |
| `variable` | `str` | default `"yield_bu_acre"` |
| `model` | `str` | default `"commodity_hindcast"` |
| `mean` | `float` | area-weighted predicted yield in delivery units |
| `weather_correction_bu_ac` | `float` | detrended yield signal (always in bu/ac; not converted by export) |
| `nass_actual` | `float \| None` | area-weighted observed NASS yield from predicted counties |
| `nass_actual_area_weighted_all` | `float \| None` | full-universe NASS survey yield (ADM0 only) |
| `nass_actual_prod_div_area_all` | `float \| None` | full-universe production ÷ area (ADM0 only) |
| `wasde_in_season` | `float \| None` | as-of-init_date WASDE estimate |
| `conab_final_in_season` | `float \| None` | as-of-init_date CONAB final estimate |
| `conab_lev_in_season` | `float \| None` | as-of-init_date CONAB levantamento estimate |
| `lower_95` … `upper_95` | `float \| None` | conformal CI bands at the 50/68/80/90/95 % levels |
Validators (non-obvious invariants):
- `_validate_ci_ordering` (schemas.py:176) — enforces `lower_95 ≤ lower_90 ≤ … ≤ lower_50 ≤ mean ≤ upper_50 ≤ … ≤ upper_95` for every present (non-None) band. Absent bands are skipped, so partial CI subsets still pass. A violation raises `ValueError` at row construction time, so the CI band ordering invariant cannot be broken silently.
- `_validate_init_date_year` (schemas.py:197) — requires the `init_date` calendar year to lie within `[year - LONG_RANGE_HORIZON_YEARS, year + 1]`, where `LONG_RANGE_HORIZON_YEARS = 10` (schemas.py:106). A forecast issued more than one calendar year after the season it predicts is rejected; forecasts up to 10 years ahead (long-range stubs) are accepted.
- `_validate_no_duplicate_keys` (schemas.py:248) — rejects any `HindcastDelivery` with repeated `(year, init_date, geo_identifier)` tuples.
- `_validate_fold_consistency` (schemas.py:260) — asserts that all `(year, geo_identifier)` groups carry the same count of `init_date` entries; a mismatch indicates a truncated fold.
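The CI-ordering invariant can be illustrated as a standalone check. This is a sketch of the logic described above, not the actual Pydantic validator on `DeliveryRow`:

```python
# Sketch of the CI-ordering invariant; the real implementation is a
# Pydantic validator (_validate_ci_ordering) on the frozen DeliveryRow model.
CI_LEVELS = (95, 90, 80, 68, 50)

def check_ci_ordering(row):
    """Raise ValueError unless lower_95 <= ... <= lower_50 <= mean
    <= upper_50 <= ... <= upper_95 for every present (non-None) band."""
    chain = (
        [f"lower_{p}" for p in CI_LEVELS]              # widest lower bound first
        + ["mean"]
        + [f"upper_{p}" for p in reversed(CI_LEVELS)]  # narrowest upper bound first
    )
    # Absent bands are skipped, so partial CI subsets still pass.
    present = [(name, row[name]) for name in chain if row.get(name) is not None]
    for (a_name, a), (b_name, b) in zip(present, present[1:]):
        if a > b:
            raise ValueError(f"CI ordering violated: {a_name}={a} > {b_name}={b}")
```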
conversions.py (419 lines)¶
Purpose: Converts wide rolling-prediction DataFrames (one row per county × year × init_date, values in kg/ha) into lists of DeliveryRow objects at the requested ADM level, performing area-weighted aggregation, unit conversion, benchmark attachment, and CI half-width computation.
Public functions:
- `walk_forward_preds_to_delivery_rows(result, *, level, ci_levels, mode, walk_forward_preds)` (conversions.py:182) — top-level entry point. Accepts `mode="hindcast"` (reads CV folds) or `mode="forecast"` (reads the production fold or an explicit DataFrame). Returns `list[DeliveryRow]`.
- `delivery_to_dataframe(delivery, *, column_order)` (conversions.py:406) — serialises a `HindcastDelivery` to a Polars DataFrame in the canonical column order returned by `build_delivery_column_order`.
- `compute_hindcast_preds(result)` (conversions.py:117) — loads and concatenates all non-production-fold `walk_forward_preds` parquets; fills missing `area_harvested_ha` via a 3-year trailing median from the full NASS panel using `impute_missing_area`. Returns `None` when no non-production folds exist.
- `compute_forecast_preds(result, *, walk_forward_preds)` (conversions.py:162) — returns forecast-mode rolling predictions from either the supplied DataFrame or the production slice's path. Returns `None` when neither source has rows.
Private helpers:
- `_aggregate_to_level(df, value_cols, *, level, group_cols, area_col)` (conversions.py:55) — wraps `lib/geo/aggregation.aggregate_weighted_frame`; ADM2 mode skips aggregation and returns a per-county slice directly.
- `_build_hw_records_from_calibration(result, agg, *, ci_levels, bushel_weight_lbs)` (conversions.py:73) — calls `primary_calibration(result, ci_levels)` (imported from `stages/run_meta_models.py`) to retrieve the saved `CalibrationResult`, then iterates each unique `(year, init_date)` in `agg` to obtain conformal half-widths via `CalibrationResult.predict_interval`. Half-widths are returned in bu/ac; the `_hw_{pct}` columns are subsequently re-centred on each row's mean to produce `lower_{pct}`/`upper_{pct}` bands.
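The re-centring step described above is a symmetric expansion around the row mean. A minimal sketch (the function name and dict shapes are illustrative, not the real helper's signature):

```python
# Sketch of half-width re-centring: conformal half-widths (bu/ac, one per
# CI percentage) become lower/upper bands centred on each row's mean.
def recentre_half_widths(mean, half_widths):
    """half_widths maps pct -> half-width; returns lower_{pct}/upper_{pct}."""
    bands = {}
    for pct, hw in half_widths.items():
        bands[f"lower_{pct}"] = mean - hw
        bands[f"upper_{pct}"] = mean + hw
    return bands
```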
ADM aggregation rules:
- ADM2: no spatial aggregation; each county row is passed through directly (conversions.py:225).
- ADM1: `aggregate_weighted_frame` collapses county rows to state level using `area_harvested_ha` as the weight column; group keys are `["year", "init_date"]` (conversions.py:231).
- ADM0: same as ADM1, but also joins two additional NASS full-universe benchmark series (`nass_actual_area_weighted_all`, `nass_actual_prod_div_area_all`) which are only meaningful at national level (conversions.py:252).
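The ADM1/ADM0 collapse is an area-weighted mean per `(year, init_date)` group. A plain-Python sketch of that rule (the real code operates on a Polars frame via `aggregate_weighted_frame`; column names follow the text above):

```python
# Minimal sketch of the area-weighted collapse used at ADM1/ADM0.
from collections import defaultdict

def aggregate_weighted(rows, value_col="sim_yield_kg_ha",
                       weight_col="area_harvested_ha",
                       group_keys=("year", "init_date")):
    """Collapse county rows to one value per group via area-weighted mean."""
    sums = defaultdict(lambda: [0.0, 0.0])  # key -> [weighted value sum, weight sum]
    for r in rows:
        key = tuple(r[k] for k in group_keys)
        sums[key][0] += r[value_col] * r[weight_col]
        sums[key][1] += r[weight_col]
    return {key: wv / w for key, (wv, w) in sums.items() if w > 0}
```

Two counties with yields of 10 000 and 8 000 kg/ha on 300 and 100 ha collapse to (10 000·300 + 8 000·100) / 400 = 9 500 kg/ha.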
Unit handling:
All internal `sim_yield_kg_ha` and reference values arrive in kg/ha. Conversion to delivery units is a single multiplicative scale, applied via `lib/unit_utils.kg_ha_to_bu_acre` (unit_utils.py:33) with `HA_PER_ACRE = 0.404686` and `KG_PER_LB = 0.453592` (unit_utils.py:24–25). For cotton, `bushel_weight_lbs = 1` effectively yields lbs/ac. The `weather_correction_bu_ac` column (detrended signal, `sim_yield_kg_ha_detrended`) is converted via the same scale and written in bu/ac permanently — it is explicitly excluded from the reverse conversion in export.py (export.py:56–72).
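A sketch of the scale implied by the constants above. The formula is derived here from dimensional analysis (kg/ha → lb/ha → lb/acre → bu/acre), not copied from unit_utils.py:

```python
# Derived sketch of the delivery-boundary conversion and its inverse;
# the real functions are lib/unit_utils.kg_ha_to_bu_acre / bu_acre_to_kg_ha.
HA_PER_ACRE = 0.404686
KG_PER_LB = 0.453592

def kg_ha_to_bu_acre(value_kg_ha, bushel_weight_lbs):
    # kg/ha -> lb/ha (divide by KG_PER_LB) -> lb/acre (multiply by HA_PER_ACRE)
    # -> bu/acre (divide by bushel weight). bushel_weight_lbs=1 yields lbs/ac.
    return value_kg_ha * HA_PER_ACRE / (KG_PER_LB * bushel_weight_lbs)

def bu_acre_to_kg_ha(value_bu_ac, bushel_weight_lbs):
    # Exact inverse, used on the export path.
    return value_bu_ac * KG_PER_LB * bushel_weight_lbs / HA_PER_ACRE
```

With a 56-lb corn bushel the scale works out to roughly 0.0159 bu/ac per kg/ha, i.e. about 62.8 kg/ha per bu/ac.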
After conversion, all delivery-unit columns are clamped to `[lo, hi]` from `CommodityConfig.yield_range` via `lib/unit_utils.clip_yield_to_delivery_range` (unit_utils.py:93); out-of-range rows emit a `logger.warning` before clipping.
export.py (407 lines)¶
Purpose: Reads the already-written hindcast/forecast delivery CSVs, converts them to kg/ha for warehouse ingest, joins geo_identifier to geometry.parquet to obtain warehouse geo_id UUIDs, and delegates to the shared model_output_export_util pipeline.
Public functions:
- `delivery_csv_to_export_dataframe(csv_path, *, run_dir, bushel_weight_lbs, geo_map)` (export.py:137) — transforms one QUBE delivery CSV to a wide kg/ha export DataFrame without writing to disk. Joins via normalised geo keys; raises `ValueError` if any `geo_identifier` fails resolution.
- `delivery_csv_to_hindcast_export_parquet(csv_path, *, run_dir, bushel_weight_lbs, out_path)` (export.py:198) — calls `delivery_csv_to_export_dataframe`, then writes a staging parquet at `out_path`.
- `forecast_delivery_csvs_to_export_parquet(csv_paths, *, run_dir, bushel_weight_lbs, out_path)` (export.py:219) — concatenates multiple forecast delivery CSVs (typically one per ADM level: ADM0 + ADM1 + ADM2) into one staging parquet.
- `find_commodity_hindcast_files(output_dir, config)` (export.py:249) — builds per-source staging parquet paths keyed by `"hindcast"` and/or `"forecast"`. For hindcast, picks the most recently modified `Treefera_*_Hindcast_*.csv` under `run_dir/delivery/`. For forecast, picks all CSVs under the lexicographically latest `init_date` directory across all season years.
- `run_export_pipeline(output_dir, config, pipeline_run_id, model_output_path)` (export.py:329) — delegates to `treefera_market_insights.model_output_export_util.pipeline.run_export_pipeline` with `file_finder=find_commodity_hindcast_files`.
- `run_export_if_configured(*, run_root, export, export_config)` (export.py:349) — validates the `PIPELINE_RUN_ID` and `MODEL_INGESTION_PATH` env vars, loads `ModelConfig.from_yaml`, then calls `run_export_pipeline`. Early-exits when `export=False`.
- `run_export(*, run_root, export, export_config)` (export.py:378) — preflight-gated wrapper around `run_export_if_configured`; loads `ExperimentResult` and calls `run_preflight(preflight_paths_for_export(config))` before delegating.
S3 path handling:
All paths are threaded as `cloudpathlib.AnyPath` (export.py:29). `_resolve_geometry_parquet_path` (export.py:98) probes candidates in priority order: `config_resolved.yaml` → `data_root/geometry.parquet`, then `INPUT_DATA_DIR/geometry.parquet`, then `run_dir/geometry.parquet`. This follows the S3-safe `AnyPath` pattern: `.as_posix()` is never called on cloud paths; existence is checked via `p.exists()`. Staging files are written to `tempfile.NamedTemporaryFile` paths (local), never under `data_root` directly.
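The probing pattern can be sketched as a first-match scan. This uses `pathlib.Path` for a self-contained example; in the codebase the candidates are `cloudpathlib.AnyPath` objects, which expose the same `exists()` interface for `s3://` URIs:

```python
# Sketch of the candidate-probing pattern behind _resolve_geometry_parquet_path.
# Shown with pathlib.Path; AnyPath candidates behave identically here because
# only .exists() is called (never .as_posix() on a cloud path).
from pathlib import Path

def resolve_first_existing(candidates):
    """Return the first candidate path that exists, in priority order."""
    for p in candidates:
        if p.exists():
            return p
    raise FileNotFoundError(f"geometry.parquet not found in any of: {candidates}")
```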
Unit inversion in export:
The columns in `_BU_AC_TO_KG_HA_COLS` (export.py:56) are converted back to kg/ha via `bu_acre_to_kg_ha` (unit_utils.py:38) using the `bushel_weight_lbs` read from `config_resolved.yaml`. `weather_correction_bu_ac` is deliberately absent from this list; it remains in bu/ac in the warehouse table (export.py:56, comment).
geo_normalise.py (30 lines)¶
Purpose: Provides the canonical key normalisation for the delivery side of the `geometry.parquet` join.
Public function:
`normalize_geo_identifier_for_commodity_export_join(ident)` (geo_normalise.py:21) — strips, ASCII-folds via `export_geometry_mapping.ascii_fold_to_ascii`, then lower-cases the entire identifier. This aligns delivery `geo_identifier` strings (already lower-cased by `walk_forward_preds_to_delivery_rows` via `str.to_lowercase()`) with the geometry map keys produced by `load_identifier_geo_mapping`.
Why it exists: The shared `normalize_identifier_for_export_join` in `export_geometry_mapping` preserves segment-prefix casing, leaving `adm0:usa` unexpandable to `ADM0:USA`. This module applies a simpler full-lowercase rule that is correct for the delivery context, where both sides are already lower-cased.
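An approximation of the full-lowercase rule, for illustration. The ASCII fold here uses `unicodedata` rather than the shared `ascii_fold_to_ascii` helper, so edge-case behaviour may differ:

```python
# Sketch of the delivery-side geo-key normalisation: strip, ASCII-fold,
# then lower-case the whole identifier (no segment-prefix casing preserved).
import unicodedata

def normalize_geo_identifier(ident):
    stripped = ident.strip()
    # Decompose accents, then drop the non-ASCII combining marks.
    folded = (unicodedata.normalize("NFKD", stripped)
              .encode("ascii", "ignore")
              .decode("ascii"))
    return folded.lower()
```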
`__init__.py` (7 lines)¶
Namespace-only; no symbol re-exports. Callers import directly from `delivery.schemas` or `delivery.conversions`.
Cross-references¶
- BOUNDED_CONTEXTS — Context 7 (Delivery) and Anti-corruption layer table.
- `lib/unit_utils.py` — `kg_ha_to_bu_acre`, `bu_acre_to_kg_ha`, `clip_yield_to_delivery_range`.
- `lib/geo/aggregation.py` — `aggregate_weighted_frame`, `AggregationLevel`.
- `stages/run_meta_models.py` — `primary_calibration` (imported by `conversions.py`; layering caveat below).
- `run/preflight.py` — `preflight_paths_for_export`, `run_preflight` (imported by `export.py`; layering caveat below).
- `diagnostics/runners.py` — `yield_asof_array_from_releases` (used to construct as-of in-season benchmarks).
- `lib/reference_data/nass_benchmarks.py` — `nass_national_survey_yield_area_weighted_kg_ha`, `nass_national_prod_div_area_kg_ha`.
Relationships¶
Consumes:
- `walk_forward_preds.parquet` (per fold, from the Experiment & Modelling context)
- `CalibrationResult` sidecar (via `primary_calibration`, from the Post-processing context)
- NASS and reference-data loaders (via `lib/reference_data/`)
- `geometry.parquet` (for the `geo_id` UUID join in the export path)
Produces:
- `run_dir/delivery/Treefera_{commodity}_ADM{n}_Hindcast_{YYYYMMDD}.csv`
- `run_dir/forecast/{season_year}/{init_date}/delivery/Treefera_{commodity}_ADM{n}_Forecast_{YYYYMMDD}.csv`
- Staging parquets (temp files) for model-ingestion export
Called by:
- `stages/run_deliver.py` — invokes `walk_forward_preds_to_delivery_rows` for each configured ADM level, assembles a `HindcastDelivery`, serialises via `delivery_to_dataframe`, and writes the CSV.
- `stages/run_forecast.py` — calls `walk_forward_preds_to_delivery_rows` with `mode="forecast"` and an explicit per-init `walk_forward_preds` DataFrame.
- `cli.py run export` — invokes `run_export`.
Layering caveats (tracked tech-debt)¶
Upward import — conformal helpers:
`delivery/conversions.py:50` imports `primary_calibration` directly from `stages/run_meta_models.py`. This creates an upward edge from the Delivery bounded context into the Experiment orchestration layer (Stages sit above Delivery in the intended DAG). The correct home for `primary_calibration` and its helpers is `lib/` (e.g. `lib/conformal/`). Tracked in BOUNDED_CONTEXTS.md Anti-corruption layer table and Open question 2.
Upward import — preflight helpers:
`delivery/export.py:39` imports `preflight_paths_for_export` and `run_preflight` from `run/preflight.py`. The `run/` package is an orchestration-level module; Delivery should not depend on it directly. The recommended fix is the same: migrate `preflight_paths_for_export` to `lib/`. Tracked in the same BOUNDED_CONTEXTS.md table.
Both violations are non-breaking at runtime, but they break the single-direction import rule stated in DESIGN.md line 49 and make it harder to test Delivery in isolation.