
Source: Delivery (Client-facing CSV)

Overview

The delivery subsystem is the final client-facing layer of the commodity hindcast pipeline. It consumes wide rolling-prediction DataFrames produced by the walk-forward loop, aggregates them to one of three ADM levels (ADM0 national / ADM1 state / ADM2 county), converts internal kg/ha values to delivery units (bu/ac for grains, lbs/ac for cotton), attaches conformal-interval bands and benchmark columns, validates every row through a strict Pydantic schema (DeliveryRow), and writes the result as a QUBE-format CSV. A second export path (export.py) ingests the already-written CSVs, joins them to geometry.parquet for warehouse UUID resolution, converts back to kg/ha for the model-ingestion table, and hands off to the shared model_output_export_util pipeline.

All unit conversion at the delivery boundary is performed exclusively here (DESIGN.md line 116); kg/ha columns never appear in the output CSVs.

Modules

schemas.py (284 lines)

Purpose: Defines the row-level and document-level Pydantic models that act as the typed contract for every value written to a delivery CSV.

Public classes:

  • DeliveryRow (schemas.py:109) — frozen Pydantic model for a single CSV row. extra="forbid" is load-bearing: if a cfg.reference_data spec produces an f"{spec.name}_in_season" column whose name does not match any declared field, Pydantic raises ValidationError immediately rather than silently discarding the benchmark column.

  • HindcastDelivery (schemas.py:227) — frozen container holding rows: list[DeliveryRow] plus generated_date: str. Validates structural integrity across all rows.

Public function:

  • build_delivery_column_order(ci_columns) (schemas.py:58) — returns a tuple[str, ...] giving the canonical CSV column sequence: _DELIVERY_PREFIX + CI columns + _DELIVERY_SUFFIX. Defaults to the full CI superset (50 / 68 / 80 / 90 / 95 percentiles). Called by run_deliver.py when writing the final CSV.
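A minimal usage sketch, assuming the helper can be called with no arguments to get the documented default and with an explicit tuple for a partial CI subset:

```python
from delivery.schemas import build_delivery_column_order

# Full superset (the documented default) versus an explicit 90 % band only.
all_cols = build_delivery_column_order()                          # assumes ci_columns has a default
band_90_cols = build_delivery_column_order(("lower_90", "upper_90"))

assert band_90_cols.index("lower_90") < band_90_cols.index("upper_90")
```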

DeliveryRow field inventory:

| Field | Type | Notes |
| --- | --- | --- |
| commodity | str | upper-cased commodity name |
| year | int | harvest season year |
| init_date | str | ISO-8601 date string (YYYY-MM-DD) |
| geo_identifier | str | default "adm0:usa" |
| variable | str | default "yield_bu_acre" |
| model | str | default "commodity_hindcast" |
| mean | float | area-weighted predicted yield in delivery units |
| weather_correction_bu_ac | float | detrended yield signal (always in bu/ac; not converted by export) |
| nass_actual | float \| None | area-weighted observed NASS yield from predicted counties |
| nass_actual_area_weighted_all | float \| None | full-universe NASS survey yield (ADM0 only) |
| nass_actual_prod_div_area_all | float \| None | full-universe production ÷ area (ADM0 only) |
| wasde_in_season | float \| None | as-of-init_date WASDE estimate |
| conab_final_in_season | float \| None | as-of-init_date CONAB final estimate |
| conab_lev_in_season | float \| None | as-of-init_date CONAB levantamento estimate |
| lower_95 … upper_95 | float \| None | conformal CI bands at the 50/68/80/90/95 % levels |

Validators (non-obvious invariants):

  • _validate_ci_ordering (schemas.py:176) — enforces lower_95 ≤ lower_90 ≤ … ≤ lower_50 ≤ mean ≤ upper_50 ≤ … ≤ upper_95 for every present (non-None) band. Absent bands are skipped, so partial CI subsets still pass. Violation raises ValueError at row construction time — the CI band ordering invariant cannot be violated silently.

  • _validate_init_date_year (schemas.py:197) — requires the init_date calendar year to lie within [year - LONG_RANGE_HORIZON_YEARS, year + 1], where LONG_RANGE_HORIZON_YEARS = 10 (schemas.py:106). An init_date more than one calendar year after the season it predicts is rejected; forecasts issued up to 10 years before their season (long-range stubs) are accepted.

  • _validate_no_duplicate_keys (schemas.py:248) — rejects any HindcastDelivery with repeated (year, init_date, geo_identifier) tuples.

  • _validate_fold_consistency (schemas.py:260) — asserts all (year, geo_identifier) groups carry the same count of init_date entries; a mismatch indicates a truncated fold.
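
A condensed sketch of the row-level invariants, assuming Pydantic v2 model_validator semantics; only a subset of fields is declared and the error messages are illustrative, not the real ones:

```python
from pydantic import BaseModel, ConfigDict, model_validator

_CI_PCTS = (95, 90, 80, 68, 50)  # outermost band first

class DeliveryRowSketch(BaseModel):
    """Illustrative subset of DeliveryRow; the real model declares every CSV field."""
    model_config = ConfigDict(frozen=True, extra="forbid")

    year: int
    init_date: str  # ISO-8601 YYYY-MM-DD
    mean: float
    lower_95: float | None = None
    lower_90: float | None = None
    lower_80: float | None = None
    lower_68: float | None = None
    lower_50: float | None = None
    upper_50: float | None = None
    upper_68: float | None = None
    upper_80: float | None = None
    upper_90: float | None = None
    upper_95: float | None = None

    @model_validator(mode="after")
    def _validate_ci_ordering(self):
        # lower_95 <= ... <= lower_50 <= mean <= upper_50 <= ... <= upper_95,
        # skipping any band that is absent (None).
        ordered = [getattr(self, f"lower_{p}") for p in _CI_PCTS]
        ordered.append(self.mean)
        ordered.extend(getattr(self, f"upper_{p}") for p in reversed(_CI_PCTS))
        present = [v for v in ordered if v is not None]
        if any(a > b for a, b in zip(present, present[1:])):
            raise ValueError("CI bands are not monotonically ordered around the mean")
        return self

    @model_validator(mode="after")
    def _validate_init_date_year(self):
        # init_date year must fall within [year - 10, year + 1].
        init_year = int(self.init_date[:4])
        if not (self.year - 10 <= init_year <= self.year + 1):
            raise ValueError("init_date is outside the allowed horizon for this season")
        return self
```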

conversions.py (419 lines)

Purpose: Converts wide rolling-prediction DataFrames (one row per county × year × init_date, values in kg/ha) into lists of DeliveryRow objects at the requested ADM level, performing area-weighted aggregation, unit conversion, benchmark attachment, and CI half-width computation.

Public functions:

  • walk_forward_preds_to_delivery_rows(result, *, level, ci_levels, mode, walk_forward_preds) (conversions.py:182) — top-level entry point. Accepts mode="hindcast" (reads CV folds) or mode="forecast" (reads the production fold or an explicit DataFrame). Returns list[DeliveryRow].

  • delivery_to_dataframe(delivery, *, column_order) (conversions.py:406) — serialises a HindcastDelivery to a Polars DataFrame in the canonical column order returned by build_delivery_column_order.

  • compute_hindcast_preds(result) (conversions.py:117) — loads and concatenates all non-production-fold walk_forward_preds parquets; fills missing area_harvested_ha via 3-year trailing median from the full NASS panel using impute_missing_area. Returns None when no non-production folds exist.

  • compute_forecast_preds(result, *, walk_forward_preds) (conversions.py:162) — returns forecast-mode rolling predictions from either the supplied DataFrame or the production slice's path. Returns None when neither source has rows.
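
A hedged sketch of how a run_deliver-style caller might chain these calls; the level and ci_levels values, the generated_date, and the output filename are illustrative placeholders, not the real configuration:

```python
import polars as pl

from delivery.conversions import delivery_to_dataframe, walk_forward_preds_to_delivery_rows
from delivery.schemas import HindcastDelivery, build_delivery_column_order

result = ...  # ExperimentResult for the run, loaded by the calling stage

rows = walk_forward_preds_to_delivery_rows(
    result,
    level="adm0",                           # illustrative; real values come from config
    ci_levels=(0.5, 0.68, 0.8, 0.9, 0.95),  # illustrative CI levels
    mode="hindcast",                        # or "forecast" with an explicit DataFrame
    walk_forward_preds=None,
)
delivery = HindcastDelivery(rows=rows, generated_date="2024-01-01")
df: pl.DataFrame = delivery_to_dataframe(delivery, column_order=build_delivery_column_order())
df.write_csv("Treefera_CORN_ADM0_Hindcast_20240101.csv")  # hypothetical output path
```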

Private helpers:

  • _aggregate_to_level(df, value_cols, *, level, group_cols, area_col) (conversions.py:55) — wraps lib/geo/aggregation.aggregate_weighted_frame; ADM2 mode skips aggregation and returns a per-county slice directly.

  • _build_hw_records_from_calibration(result, agg, *, ci_levels, bushel_weight_lbs) (conversions.py:73) — calls primary_calibration(result, ci_levels) (imported from stages/run_meta_models.py) to retrieve the saved CalibrationResult, then iterates each unique (year, init_date) in agg to obtain conformal half-widths via CalibrationResult.predict_interval. Half-widths are returned in bu/ac; the _hw_{pct} columns are subsequently re-centred on each row's mean to produce lower_{pct} / upper_{pct} bands.
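
The re-centring step amounts to building a symmetric band around each aggregated mean. A minimal Polars sketch, with the _hw_{pct} column names taken from the description above and everything else illustrative:

```python
import polars as pl

def recenter_half_widths(
    agg: pl.DataFrame, ci_pcts: tuple[int, ...] = (50, 68, 80, 90, 95)
) -> pl.DataFrame:
    """Turn per-row half-width columns (bu/ac) into lower_/upper_ bands around `mean`."""
    exprs = []
    for pct in ci_pcts:
        hw = pl.col(f"_hw_{pct}")
        exprs.append((pl.col("mean") - hw).alias(f"lower_{pct}"))
        exprs.append((pl.col("mean") + hw).alias(f"upper_{pct}"))
    return agg.with_columns(exprs).drop([f"_hw_{p}" for p in ci_pcts])
```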

ADM aggregation rules:

  • ADM2: no spatial aggregation; each county row is passed through directly (conversions.py:225).
  • ADM1: aggregate_weighted_frame collapses county rows to state level using area_harvested_ha as the weight column; group keys are ["year", "init_date"] (conversions.py:231).
  • ADM0: same as ADM1 but also joins two additional NASS full-universe benchmark series (nass_actual_area_weighted_all, nass_actual_prod_div_area_all) which are only meaningful at national level (conversions.py:252).
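
Conceptually the ADM1/ADM0 collapse is an area-weighted mean over county rows. A minimal Polars sketch of that idea (the real logic lives in lib/geo/aggregation.aggregate_weighted_frame and handles more cases):

```python
import polars as pl

def weighted_mean_by_group(
    df: pl.DataFrame,
    value_cols: list[str],
    weight_col: str = "area_harvested_ha",
    group_cols: tuple[str, ...] = ("year", "init_date"),
) -> pl.DataFrame:
    """Area-weighted mean of each value column within (year, init_date) groups."""
    aggs = [
        ((pl.col(c) * pl.col(weight_col)).sum() / pl.col(weight_col).sum()).alias(c)
        for c in value_cols
    ]
    return df.group_by(list(group_cols)).agg(aggs)
```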

Unit handling:

All internal sim_yield_kg_ha and reference values arrive in kg/ha. Conversion to delivery units is a single multiplicative scale:

scale = kg_ha_to_bu_acre(1.0, bushel_weight_lbs)  # conversions.py:208

This calls lib/unit_utils.kg_ha_to_bu_acre (unit_utils.py:33):

kg_ha * HA_PER_ACRE / (bushel_weight_lbs * KG_PER_LB)

with HA_PER_ACRE = 0.404686, KG_PER_LB = 0.453592 (unit_utils.py:24–25). For cotton, bushel_weight_lbs = 1 effectively yields lbs/ac. The weather_correction_bu_ac column (detrended signal, sim_yield_kg_ha_detrended) is also converted via the same scale and written in bu/ac permanently — it is explicitly excluded from the reverse conversion in export.py (export.py:56–72).
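
As a worked example, assuming bushel_weight_lbs = 56 (the standard corn test weight):

```python
HA_PER_ACRE = 0.404686
KG_PER_LB = 0.453592

bushel_weight_lbs = 56.0  # corn; each commodity carries its own test weight in config
scale = HA_PER_ACRE / (bushel_weight_lbs * KG_PER_LB)  # ~0.01593 bu/ac per kg/ha
print(11_000 * scale)  # an 11,000 kg/ha prediction is written as ~175.2 bu/ac
```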

After conversion, all delivery-unit columns are clamped to [lo, hi] from CommodityConfig.yield_range via lib/unit_utils.clip_yield_to_delivery_range (unit_utils.py:93); out-of-range rows emit logger.warning before clipping.
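
A minimal sketch of the clamp-with-warning behaviour under the assumption that yield_range is a simple (lo, hi) pair; the real helper is clip_yield_to_delivery_range in lib/unit_utils.py:

```python
import logging

import polars as pl

logger = logging.getLogger(__name__)

def clip_to_delivery_range(df: pl.DataFrame, col: str, lo: float, hi: float) -> pl.DataFrame:
    """Warn about out-of-range rows in `col`, then clamp them into [lo, hi]."""
    n_out = df.filter((pl.col(col) < lo) | (pl.col(col) > hi)).height
    if n_out:
        logger.warning("%d %s values outside [%s, %s]; clipping", n_out, col, lo, hi)
    return df.with_columns(pl.col(col).clip(lo, hi).alias(col))
```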

export.py (407 lines)

Purpose: Reads the already-written hindcast/forecast delivery CSVs, converts them to kg/ha for warehouse ingest, joins geo_identifier to geometry.parquet to obtain warehouse geo_id UUIDs, and delegates to the shared model_output_export_util pipeline.

Public functions:

  • delivery_csv_to_export_dataframe(csv_path, *, run_dir, bushel_weight_lbs, geo_map) (export.py:137) — transforms one QUBE delivery CSV to a wide kg/ha export DataFrame without writing to disk. Joins via normalised geo keys; raises ValueError if any geo_identifier fails resolution.

  • delivery_csv_to_hindcast_export_parquet(csv_path, *, run_dir, bushel_weight_lbs, out_path) (export.py:198) — calls delivery_csv_to_export_dataframe then writes to a staging parquet at out_path.

  • forecast_delivery_csvs_to_export_parquet(csv_paths, *, run_dir, bushel_weight_lbs, out_path) (export.py:219) — concatenates multiple forecast delivery CSVs (typically one per ADM level: ADM0 + ADM1 + ADM2) into one staging parquet.

  • find_commodity_hindcast_files(output_dir, config) (export.py:249) — builds per-source staging parquet paths keyed by "hindcast" and/or "forecast". For hindcast, picks the most-recently-modified Treefera_*_Hindcast_*.csv under run_dir/delivery/. For forecast, picks all CSVs under the lexicographically latest init_date directory across all season years.

  • run_export_pipeline(output_dir, config, pipeline_run_id, model_output_path) (export.py:329) — delegates to treefera_market_insights.model_output_export_util.pipeline.run_export_pipeline with file_finder=find_commodity_hindcast_files.

  • run_export_if_configured(*, run_root, export, export_config) (export.py:349) — validates PIPELINE_RUN_ID and MODEL_INGESTION_PATH env vars, loads ModelConfig.from_yaml, then calls run_export_pipeline. Early-exits when export=False.

  • run_export(*, run_root, export, export_config) (export.py:378) — preflight-gated wrapper around run_export_if_configured; loads ExperimentResult and calls run_preflight(preflight_paths_for_export(config)) before delegating.
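
A condensed sketch of the gating described for run_export_if_configured; whether missing env vars raise or merely skip, and the exact error text, are assumptions here:

```python
import os

def run_export_if_configured_sketch(*, run_root, export: bool, export_config) -> None:
    """Skip entirely unless export is requested and the warehouse env vars are present."""
    if not export:
        return  # export disabled for this run
    pipeline_run_id = os.environ.get("PIPELINE_RUN_ID")
    model_ingestion_path = os.environ.get("MODEL_INGESTION_PATH")
    if not pipeline_run_id or not model_ingestion_path:
        raise RuntimeError("PIPELINE_RUN_ID and MODEL_INGESTION_PATH must be set for export")
    # ModelConfig.from_yaml(export_config) and run_export_pipeline(...) would follow here.
```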

S3 path handling:

All paths are threaded as cloudpathlib.AnyPath (export.py:29). _resolve_geometry_parquet_path (export.py:98) probes candidates in priority order: config_resolved.yaml → data_root/geometry.parquet, then INPUT_DATA_DIR/geometry.parquet, then run_dir/geometry.parquet. This follows the S3-safe AnyPath pattern: .as_posix() is never called on cloud paths; existence is checked via p.exists(). Staging files are written to tempfile.NamedTemporaryFile paths (local), never under data_root directly.
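
A minimal sketch of the candidate-probe pattern, assuming the three locations listed above and a hypothetical helper name:

```python
from cloudpathlib import AnyPath

def resolve_geometry_parquet(data_root: AnyPath, input_data_dir: AnyPath, run_dir: AnyPath) -> AnyPath:
    """Return the first geometry.parquet that exists, in priority order."""
    candidates = (
        data_root / "geometry.parquet",       # data_root from config_resolved.yaml
        input_data_dir / "geometry.parquet",  # INPUT_DATA_DIR fallback
        run_dir / "geometry.parquet",         # run-local fallback
    )
    for p in candidates:
        if p.exists():  # works for local paths and s3:// URIs alike
            return p
    raise FileNotFoundError("geometry.parquet not found in any candidate location")
```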

Unit inversion in export:

The columns in _BU_AC_TO_KG_HA_COLS (export.py:56) are converted back to kg/ha via bu_acre_to_kg_ha (unit_utils.py:38) using the bushel_weight_lbs read from config_resolved.yaml. weather_correction_bu_ac is deliberately absent from this list; it remains in bu/ac in the warehouse table (export.py:56, comment).
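
A minimal sketch of the inversion; the column list here is illustrative (the real one is _BU_AC_TO_KG_HA_COLS in export.py) and deliberately omits weather_correction_bu_ac:

```python
import polars as pl

HA_PER_ACRE = 0.404686
KG_PER_LB = 0.453592

# Illustrative column list; the real one is _BU_AC_TO_KG_HA_COLS in export.py.
BU_AC_COLS = ["mean", "nass_actual", "lower_90", "upper_90"]

def bu_ac_to_kg_ha_columns(df: pl.DataFrame, bushel_weight_lbs: float) -> pl.DataFrame:
    """Invert the delivery-unit scaling; weather_correction_bu_ac is intentionally not listed."""
    scale = bushel_weight_lbs * KG_PER_LB / HA_PER_ACRE  # inverse of the delivery scale
    return df.with_columns([(pl.col(c) * scale).alias(c) for c in BU_AC_COLS])
```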

geo_normalise.py (30 lines)

Purpose: Provides the canonical key normalisation for the delivery side of the geometry.parquet join.

Public function:

  • normalize_geo_identifier_for_commodity_export_join(ident) (geo_normalise.py:21) — strips, ASCII-folds via export_geometry_mapping.ascii_fold_to_ascii, then lower-cases the entire identifier. This aligns delivery geo_identifier strings (already lower-cased by walk_forward_preds_to_delivery_rows via str.to_lowercase()) with the geometry map keys produced by load_identifier_geo_mapping.

Why it exists: The shared normalize_identifier_for_export_join in export_geometry_mapping preserves segment-prefix casing, so a delivery key such as adm0:usa would not match a geometry key stored as ADM0:USA. This module applies a simpler full-lowercase rule that is correct for the delivery context, where both sides are already lower-cased.
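
A behavioural sketch of the rule, using a unicodedata-based stand-in for export_geometry_mapping.ascii_fold_to_ascii:

```python
import unicodedata

def normalize_geo_identifier_sketch(ident: str) -> str:
    """Strip, ASCII-fold, then lower-case the whole identifier."""
    folded = (
        unicodedata.normalize("NFKD", ident.strip())
        .encode("ascii", "ignore")
        .decode("ascii")
    )
    return folded.lower()

assert normalize_geo_identifier_sketch(" ADM1:Brasil:São Paulo ") == "adm1:brasil:sao paulo"
```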

__init__.py (7 lines)

Namespace-only; no symbol re-exports. Callers import directly from delivery.schemas or delivery.conversions.

Cross-references

  • BOUNDED_CONTEXTS — Context 7 (Delivery) and Anti-corruption layer table.
  • lib/unit_utils.py — kg_ha_to_bu_acre, bu_acre_to_kg_ha, clip_yield_to_delivery_range.
  • lib/geo/aggregation.py — aggregate_weighted_frame, AggregationLevel.
  • stages/run_meta_models.py — primary_calibration (imported by conversions.py; layering caveat below).
  • run/preflight.py — preflight_paths_for_export, run_preflight (imported by export.py; layering caveat below).
  • diagnostics/runners.py — yield_asof_array_from_releases (used to construct as-of in-season benchmarks).
  • lib/reference_data/nass_benchmarks.py — nass_national_survey_yield_area_weighted_kg_ha, nass_national_prod_div_area_kg_ha.

Relationships

Consumes:

  • walk_forward_preds.parquet (per fold, from the Experiment & Modelling context)
  • CalibrationResult sidecar (via primary_calibration, from the Post-processing context)
  • NASS and reference-data loaders (via lib/reference_data/)
  • geometry.parquet (for the geo_id UUID join in the export path)

Produces:

  • run_dir/delivery/Treefera_{commodity}_ADM{n}_Hindcast_{YYYYMMDD}.csv
  • run_dir/forecast/{season_year}/{init_date}/delivery/Treefera_{commodity}_ADM{n}_Forecast_{YYYYMMDD}.csv
  • Staging parquets (temp files) for the model-ingestion export

Called by:

  • stages/run_deliver.py — invokes walk_forward_preds_to_delivery_rows for each configured ADM level, assembles a HindcastDelivery, serialises via delivery_to_dataframe, and writes the CSV.
  • stages/run_forecast.py — calls walk_forward_preds_to_delivery_rows with mode="forecast" and an explicit per-init walk_forward_preds DataFrame.
  • cli.py run export — invokes run_export.

Layering caveats (tracked tech-debt)

Upward import — conformal helpers: delivery/conversions.py:50 imports primary_calibration directly from stages/run_meta_models.py. This creates an upward edge from the Delivery bounded context into the Experiment orchestration layer (Stages sit above Delivery in the intended DAG). The correct home for primary_calibration and its helpers is lib/ (e.g. lib/conformal/). Tracked in BOUNDED_CONTEXTS.md Anti-corruption layer table and Open question 2.

Upward import — preflight helpers: delivery/export.py:39 imports preflight_paths_for_export and run_preflight from run/preflight.py. The run/ package is an orchestration-level module; Delivery should not depend on it directly. Same recommended fix: migrate preflight_paths_for_export to lib/. Tracked in the same BOUNDED_CONTEXTS.md table.

Both imports are non-breaking at runtime, but they violate the single-direction import rule stated in DESIGN.md line 49 and make it harder to test Delivery in isolation.