Pipeline: Deliver

Purpose

The DELIVER pipeline is the final stage of the hindcast run. It reads the per-fold walk_forward_preds.parquet files produced by the walk-forward CV loop, aggregates them to three ADM levels, and converts internal kg/ha values to client delivery units (bu/ac for grains, lbs/ac for cotton). It then attaches conformal CI bands and benchmark columns, validates every row through a strict Pydantic schema, and writes three QUBE-format CSVs under run_dir/delivery/. Forecast delivery is a separate code path (run_forecast._deliver_forecast) and is not handled here.

Inputs

| Artefact | Path | Role |
| --- | --- | --- |
| postprocessed/national.parquet | run_dir/postprocessed/ | Guard check — must exist before delivering |
| Per-fold walk_forward_preds.parquet | preds/{experiment_key}/{fold_label}/ | Source of per-county predictions |
| conformal/{mode}.parquet | run_dir/conformal/ | CI half-width calibration sidecar |
| NASS county panel | loaded internally | Full-universe benchmark columns (ADM0 only) |
| config_resolved.yaml | run_dir/ | bushel_weight_lbs, ci_levels, delivery.* settings |

Outputs

| Artefact | Path | Notes |
| --- | --- | --- |
| Treefera_{experiment_key}_ADM0_Hindcast_{YYYYMMDD}.csv | run_dir/delivery/ | National aggregation; includes nass_actual_area_weighted_all + nass_actual_prod_div_area_all |
| Treefera_{experiment_key}_ADM1_Hindcast_{YYYYMMDD}.csv | run_dir/delivery/ | State-level area-weighted aggregation |
| Treefera_{experiment_key}_ADM2_Hindcast_{YYYYMMDD}.csv | run_dir/delivery/ | County pass-through, no spatial aggregation |

{YYYYMMDD} is the wall-clock date at pipeline execution time (run_deliver.py:63).

Step-by-step

1. Entry — deliver_experiment (run_deliver.py:40)

Called by run_hindcast.run() as step 11 (after EVALUATE). Also callable directly from the CLI for incremental re-runs.

Two guard checks (run_deliver.py:49–54):

  • result.has_postprocessed — raises FileNotFoundError if postprocessed/national.parquet is absent.
  • result.has_walk_forward_preds — raises FileNotFoundError if no fold predictions are present.

2. Column-order computation

build_delivery_column_order(ci_columns) (schemas.py:58) assembles the canonical CSV column sequence: _DELIVERY_PREFIX columns + CI columns (50/68/80/90/95 percentile pairs) + _DELIVERY_SUFFIX. This order is invariant across all three ADM levels.
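A minimal sketch of that assembly, assuming illustrative prefix/suffix contents — only the shape (prefix columns, then lower/upper pairs per CI level, then suffix) comes from schemas.py:58; the specific column names in _DELIVERY_PREFIX and _DELIVERY_SUFFIX here are guesses:

```python
from typing import Sequence

# Hypothetical contents — the real lists live in delivery/schemas.py.
_DELIVERY_PREFIX = ["year", "init_date", "geo_identifier", "mean"]
_DELIVERY_SUFFIX = ["weather_correction_bu_ac"]

def build_delivery_column_order(ci_levels: Sequence[int]) -> list[str]:
    """Prefix columns, then a lower/upper pair per CI level, then suffix."""
    ci_columns = [
        col for pct in ci_levels for col in (f"lower_{pct}", f"upper_{pct}")
    ]
    return [*_DELIVERY_PREFIX, *ci_columns, *_DELIVERY_SUFFIX]

order = build_delivery_column_order([50, 68, 80, 90, 95])
```

Because the same list is passed to every ADM level, column position is stable across the three CSVs.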

3. Wide → long transformation per ADM level

For each level in ("ADM0", "ADM1", "ADM2") (run_deliver.py:69):

walk_forward_preds_to_delivery_rows(result, level, ci_levels, mode="hindcast") (conversions.py:182):

  1. compute_hindcast_preds(result) (conversions.py:117) — concatenates all non-production-fold walk_forward_preds.parquet files; fills missing area_harvested_ha via a 3-year trailing median from the NASS panel.
  2. _aggregate_to_level(df, value_cols, level, ...) (conversions.py:55) — ADM2 passes county rows through directly; ADM1 and ADM0 call aggregate_weighted_frame with area_harvested_ha as the weight column. ADM0 additionally joins two full-universe NASS benchmark series (nass_actual_area_weighted_all, nass_actual_prod_div_area_all).
  3. Unit conversion (conversions.py:208):

     scale = kg_ha_to_bu_acre(1.0, bushel_weight_lbs)  # lib/unit_utils.py:33

     The formula (unit_utils.py:33) is kg_ha × HA_PER_ACRE / (bushel_weight_lbs × KG_PER_LB), where HA_PER_ACRE = 0.404686 and KG_PER_LB = 0.453592. For cotton, bushel_weight_lbs = 1, yielding lbs/ac. The weather_correction_bu_ac column (the detrended signal, sim_yield_kg_ha_detrended) uses the same scale and is permanently written in bu/ac — it is explicitly excluded from the reverse conversion in export.py:56.
  4. Yield values are clamped to [lo, hi] from CommodityConfig.yield_range via clip_yield_to_delivery_range (unit_utils.py:93); out-of-range rows emit a logger.warning before clipping.
  5. _build_hw_records_from_calibration (conversions.py:73) — calls primary_calibration(result, ci_levels) to load the saved CalibrationResult, then iterates each (year, init_date) pair to obtain conformal half-widths. Half-widths are re-centred on each row's mean to produce lower_{pct} / upper_{pct} band pairs.
  6. Each row is instantiated as a DeliveryRow (schemas.py:109) with extra="forbid". If any cfg.reference_data spec produces an f"{spec.name}_in_season" column whose name is not declared in DeliveryRow, Pydantic raises a ValidationError immediately rather than silently dropping the column.
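The conversion and clamp steps can be sketched with the constants given above. The formula and constants come from the text (unit_utils.py:33, unit_utils.py:93); the function bodies are re-implementations for illustration, not the real helpers:

```python
import logging

HA_PER_ACRE = 0.404686   # hectares per acre
KG_PER_LB = 0.453592     # kilograms per pound

def kg_ha_to_bu_acre(value_kg_ha: float, bushel_weight_lbs: float) -> float:
    # kg/ha × ha/acre gives kg/acre; dividing by lbs-per-bushel × kg-per-lb
    # converts kg to bushels. With bushel_weight_lbs = 1 the result is lbs/ac.
    return value_kg_ha * HA_PER_ACRE / (bushel_weight_lbs * KG_PER_LB)

def clip_yield_to_delivery_range(value: float, lo: float, hi: float) -> float:
    # Out-of-range values are logged, then clamped (mirrors unit_utils.py:93).
    if not lo <= value <= hi:
        logging.warning("yield %.2f outside [%s, %s]; clipping", value, lo, hi)
    return min(max(value, lo), hi)

# Corn (56 lb/bu): roughly 6,278 kg/ha corresponds to about 100 bu/ac.
corn_bu_ac = kg_ha_to_bu_acre(6_278.0, 56.0)
```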

4. HindcastDelivery document validation

HindcastDelivery(rows=level_rows, generated_date=today_iso) (schemas.py:227) runs two additional validators:

  • _validate_no_duplicate_keys — rejects repeated (year, init_date, geo_identifier) tuples.
  • _validate_fold_consistency — asserts that all (year, geo_identifier) groups carry the same count of init_date entries.
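An illustrative re-implementation of the two document-level checks, assuming rows are plain dicts — the field names follow the text, but the bodies are assumptions, not the Pydantic validators themselves:

```python
from collections import Counter

def validate_no_duplicate_keys(rows: list[dict]) -> None:
    # Each (year, init_date, geo_identifier) tuple must appear exactly once.
    keys = [(r["year"], r["init_date"], r["geo_identifier"]) for r in rows]
    dupes = [k for k, n in Counter(keys).items() if n > 1]
    if dupes:
        raise ValueError(f"duplicate (year, init_date, geo_identifier): {dupes}")

def validate_fold_consistency(rows: list[dict]) -> None:
    # Every (year, geo_identifier) group must carry the same number of
    # init_date entries, i.e. no partially delivered fold.
    counts = Counter((r["year"], r["geo_identifier"]) for r in rows)
    if len(set(counts.values())) > 1:
        raise ValueError("uneven init_date counts across (year, geo) groups")
```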

5. Serialise and write

delivery_to_dataframe(level_delivery, column_order=column_order) (conversions.py:406) produces a Polars DataFrame in the canonical column order. .pipe(apply_delivery_post_transforms, enforce_narrowing=..., drop_frozen=...) applies optional CI narrowing and frozen-tail pruning. Finally level_df.write_csv(str(out_path), float_precision=3) (run_deliver.py:91).
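The observable output format can be mimicked with the stdlib — the real code uses Polars' write_csv with float_precision=3; this sketch only reproduces the canonical column order and 3-decimal float rendering:

```python
import csv
import io

def write_delivery_csv(rows: list[dict], column_order: list[str]) -> str:
    """Serialise rows in the canonical column order, floats to 3 decimals."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=column_order)
    writer.writeheader()
    for row in rows:
        writer.writerow({
            col: f"{val:.3f}" if isinstance(val, float) else val
            for col, val in row.items() if col in column_order
        })
    return buf.getvalue()
```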

Mermaid Flow

flowchart TD
    A["deliver_experiment(run_dir)\nrun_deliver.py:40"]
    B["ExperimentResult.from_run_dir\n+ guard checks"]
    C["build_delivery_column_order\nschemas.py:58"]
    D["For each level ADM0/ADM1/ADM2"]
    E["walk_forward_preds_to_delivery_rows\nconversions.py:182\nmode=hindcast"]
    F["compute_hindcast_preds\nconversions.py:117\n→ concat CV folds"]
    G["_aggregate_to_level\nconversions.py:55\nADM2: pass-through\nADM1/ADM0: area-weighted"]
    H["kg_ha → bu/ac\nunit_utils.py:33\n+ yield_range clamp"]
    I["_build_hw_records_from_calibration\n→ lower_N / upper_N bands"]
    J["DeliveryRow (Pydantic)\nextra=forbid\nCI ordering validator"]
    K["HindcastDelivery\nduplicates + fold-consistency check"]
    L["delivery_to_dataframe\n+ apply_delivery_post_transforms"]
    M["write_csv float_precision=3\n→ delivery/Treefera_*_{ADM}_Hindcast_*.csv"]

    A --> B
    B --> C
    C --> D
    D --> E
    E --> F
    F --> G
    G --> H
    H --> I
    I --> J
    J --> K
    K --> L
    L --> M

Invariants

CI ordering invariant (schemas.py:176): _validate_ci_ordering enforces lower_95 ≤ lower_90 ≤ … ≤ lower_50 ≤ mean ≤ upper_50 ≤ … ≤ upper_95 for every present (non-None) band. Absent bands are skipped. Violation raises ValueError at row construction time — cannot be violated silently.
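A sketch of that ordering check, assuming bands are stored as optional dict entries — the real validator is a Pydantic model validator, but the logic it must implement is the chain described above, with None bands skipped:

```python
def validate_ci_ordering(row: dict, ci_levels=(50, 68, 80, 90, 95)) -> None:
    # Build the chain lower_95 … lower_50, mean, upper_50 … upper_95:
    # widest to narrowest on the lower side, narrowest to widest on the upper.
    chain = [row.get(f"lower_{p}") for p in sorted(ci_levels, reverse=True)]
    chain.append(row["mean"])
    chain += [row.get(f"upper_{p}") for p in sorted(ci_levels)]
    present = [v for v in chain if v is not None]  # absent bands are skipped
    if any(a > b for a, b in zip(present, present[1:])):
        raise ValueError(f"CI bands out of order around mean={row['mean']}")
```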

init_date calendar year invariant (schemas.py:197): _validate_init_date_year requires init_date calendar year to lie within [year - LONG_RANGE_HORIZON_YEARS, year + 1] where LONG_RANGE_HORIZON_YEARS = 10. Long-range stubs (up to 10 years ahead) pass; a forecast issued more than one calendar year after its target season is rejected.
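The window check reduces to a single range comparison; this standalone sketch mirrors the rule stated above (the real validator lives on the Pydantic row model):

```python
from datetime import date

LONG_RANGE_HORIZON_YEARS = 10

def validate_init_date_year(year: int, init_date: date) -> None:
    # init_date may precede the target season by up to 10 calendar years
    # (long-range stubs) but may not trail it by more than one year.
    if not (year - LONG_RANGE_HORIZON_YEARS) <= init_date.year <= (year + 1):
        raise ValueError(
            f"init_date {init_date} outside allowed window for season {year}"
        )
```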

Unit conversion boundary: all kg/ha columns are converted before writing. The weather_correction_bu_ac column is permanently in bu/ac and is excluded from the reverse conversion in the export path (export.py:56).

extra="forbid" row strictness (schemas.py:109): any unknown column name in the assembled dict raises ValidationError at construction, not at write time.

ADM0-only benchmark columns: nass_actual_area_weighted_all and nass_actual_prod_div_area_all are populated only at ADM0 (conversions.py:252). ADM1 and ADM2 rows carry None for these fields.

Failure Modes

  • Missing POSTPROCESS artefact: result.has_postprocessed = False → FileNotFoundError at entry. Fix: run cli run postprocess.
  • Upward import layering issue: delivery/conversions.py:50 imports primary_calibration from stages/run_meta_models.py — an upward dependency tracked as tech-debt in BOUNDED_CONTEXTS.md. Non-breaking at runtime.
  • Empty level rows (run_deliver.py:78): if level_rows is empty for a level, that CSV is silently skipped (not an error). Can happen if all counties have NaN area_harvested_ha with no trailing history.
  • ValidationError on unknown benchmark column: a new cfg.reference_data spec whose name does not match a DeliveryRow field will raise immediately. Add the field to DeliveryRow before adding the spec.
  • CI ordering violation: conformal calibration that produces upper_N < mean (sign error) will raise at row construction. The bug identified in project MEMORY.md (project_sbc_tmi_bug.md) is upstream of this check.

Cross-references

  • delivery.md — full module-level detail for schemas.py, conversions.py, export.py
  • stages.md — run_deliver.py function signatures and call sequence
  • forecast.md — _deliver_forecast handles the parallel forecast CSV path
  • evaluate.md — reads the same walk_forward_preds.parquet for metrics
  • dashboard.md — reads Treefera_*_ADM0_Hindcast_*.csv directly

PRs

No dedicated PR for the deliver pipeline in the tracked window. PR #369 extended delivery/schemas.py (+26 lines) to accept init_date calendar years up to 10 years before the target season_year, enabling long-range forecast rows to pass the _validate_init_date_year check.