
Pipeline Stages

Overview

The stages/ package holds eight run_*.py modules that implement the commodity_hindcast pipeline as a linear DAG: six atomic stage modules plus two top-level orchestrators (run_hindcast, run_forecast) that compose them. Each module is independently runnable against a run_dir on disk; no in-memory state crosses a stage boundary (DESIGN.md:100).

Artefact contract (DESIGN.md:73): FIT → models/{experiment_key}/{label}/; PREDICT → preds/{experiment_key}/{label}/walk_forward_preds.parquet; POSTPROCESS → postprocessed/national.parquet; DELIVER → delivery/Treefera_{experiment_key}_{ADM}_Hindcast_{YYYYMMDD}.csv. Forecast artefacts are isolated under forecast/{season_year}/{init_date}/ (PR #369 — see §run_forecast.py below).

Modules

run_features.py

Purpose: Data preprocessing before training — column guard + state-name enrichment.

Function:

def preprocess_data(
    fit_data: pd.DataFrame,
    pred_data: pd.DataFrame,
    config: ExperimentConfig,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    ...

run_features.py:10

Inputs (consumed): fit_data and pred_data DataFrames loaded by the caller (typically run_hindcast._load_and_preprocess) from:
- {features_dir}/{experiment_key}/fit.parquet
- {features_dir}/{experiment_key}/pred.parquet

Outputs (written): None — pure in-memory transform. Returns the cleaned (fit_data, pred_data) tuple.

Inner call sequence:

  1. add_state_name(fit_data) / add_state_name(pred_data) — geo enrichment (run_features.py:13–14).
  2. fit_data.dropna(axis=1, how="all") — drop all-NaN columns; raises ValueError if any dropped column was a configured feature (run_features.py:20–45).
  3. pred_data = pred_data[list(fit_data.columns)] — align pred columns to fit schema (run_features.py:48).
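Steps 2 and 3 can be sketched in a few lines of pandas. This is a minimal illustration, not the module's code: guard_and_align is a hypothetical name, the configured features are assumed to arrive as a plain list of column names (the real function reads them from ExperimentConfig), and the add_state_name enrichment is left out.

```python
import pandas as pd


def guard_and_align(
    fit_data: pd.DataFrame,
    pred_data: pd.DataFrame,
    configured_features: list[str],
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Hypothetical sketch of the preprocess_data column guard (geo enrichment omitted)."""
    # Step 2: drop columns that are entirely NaN in the fit frame.
    cleaned = fit_data.dropna(axis=1, how="all")
    dropped = set(fit_data.columns) - set(cleaned.columns)
    # Refuse to silently lose a column the experiment config asks for as a feature.
    lost_features = sorted(dropped & set(configured_features))
    if lost_features:
        raise ValueError(f"All-NaN configured feature columns: {lost_features}")
    # Step 3: align pred columns (and their order) to the narrowed fit schema.
    aligned_pred = pred_data[list(cleaned.columns)]
    return cleaned, aligned_pred
```

In this sketch, dropping an all-NaN column is tolerated only for non-feature columns, and indexing pred_data by the fit columns raises a KeyError if pred_data is missing one of them.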

This module does NOT build features; that is the job of cli run features / features/run.py. It is a preprocessing guard called inside run_hindcast._load_and_preprocess.

run_fit.py

Purpose: Fit detrender + feature imputer + regressor for one fold. Persist artefacts. Return a HindcastSlice handle.

Function:

def train(
    train_data: pd.DataFrame,
    fold_label: str,
    config: ExperimentConfig,
) -> HindcastSlice:
    ...

run_fit.py:28

Inputs (consumed): train_data DataFrame (in memory, passed by the caller); artefact paths are resolved from config.

Outputs (written) under {models_dir}/{experiment_key}/{fold_label}/ (run_fit.py:63–65):
- detrender.pkl: fitted detrender
- feature_fill_values.parquet: median imputation fill values (one row per feature)
- regressor file(s) via model.save_model(...), e.g. model.ridge or model.xgboost

Inner call sequence (run_fit.py:45–149): build_detrender() → detrender.fit_transform(train_data) → detrender.save(...) → MedianImputer().fit(...) → write_dataframe(fill_values_df, ...) → feature_imputer.transform(...) → fit_aggregation_policy.fit_transform(...) → model.fit(X_train, y_train, ...) → model.save_model(...) → return HindcastSlice.from_config(config, fold_label).
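The imputer step is what makes feature_fill_values.parquet necessary: medians are learned on the training fold only and then reused unchanged at prediction time. A minimal sketch, assuming a pandas frame of numeric features; SketchMedianImputer is a hypothetical stand-in for MedianImputer, whose exact API is not documented here.

```python
import pandas as pd


class SketchMedianImputer:
    """Hypothetical stand-in for MedianImputer: learn per-feature medians on the
    training fold only, then reuse them at predict time."""

    def fit(self, X: pd.DataFrame) -> "SketchMedianImputer":
        # NaNs are skipped when computing each column's median.
        self.fill_values_ = X.median(numeric_only=True)
        return self

    def fill_values_frame(self) -> pd.DataFrame:
        # One row per feature, the layout described for feature_fill_values.parquet.
        return self.fill_values_.rename("fill_value").rename_axis("feature").reset_index()

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        # fillna with a Series fills each column by matching its name.
        return X.fillna(self.fill_values_)
```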

No prediction parquets are produced — that is the sole responsibility of run_predict.

run_predict.py

Purpose: Inference kernel: detrend → score → weather-correct → retrend. Split into pure-compute predict() and pure-persistence write_walk_forward_outputs().

Public surface (run_predict.py:381–386):
- predict(run_dir, *, season_year, init_date) -> tuple[pd.DataFrame, PredictionInputs]
- write_walk_forward_outputs(wide_prediction_frame, *, year_data, fold_preds_dir) -> None
- run_predict(run_dir, *, season_year, init_date) -> None

predict()

def predict(
    run_dir: AnyPath | str,
    *,
    season_year: int,
    init_date: date,
) -> tuple[pd.DataFrame, PredictionInputs]:
    ...

run_predict.py:282

Pure compute — no persistence. Loads model artefacts and the prediction slice, runs the kernel, returns the canonical walk_forward_preds-shaped frame plus the resolved PredictionInputs.

Four-step inverse pipeline (_predict, run_predict.py:255–279):

  1. Detrend: inputs.detrender.transform(inputs.pred_slice) removes the trend.
  2. Score: predict_kernel(model, pred_slice_detrended, ...) applies the regressor.
  3. Weather-correct: the weather-feature coefficients are embedded in the regression score.
  4. Retrend: predict_kernel calls detrender.inverse_transform(...), yielding sim_yield_kg_ha.
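The detrend/score/retrend round trip can be illustrated with a linear trend and a linear weather model. This is a sketch under explicit assumptions: LinearDetrender and predict_yield are hypothetical, the project's detrender comes from build_detrender() and need not be linear, and here the trend is fitted directly on training yields.

```python
from dataclasses import dataclass


@dataclass
class LinearDetrender:
    """Hypothetical stand-in for the project's detrender: trend(year) = a + b * year."""

    intercept: float = 0.0
    slope: float = 0.0

    def fit(self, years: list[int], yields: list[float]) -> "LinearDetrender":
        # Ordinary least squares on (year, yield) pairs.
        n = len(years)
        mean_x = sum(years) / n
        mean_y = sum(yields) / n
        sxx = sum((x - mean_x) ** 2 for x in years)
        sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(years, yields))
        self.slope = sxy / sxx
        self.intercept = mean_y - self.slope * mean_x
        return self

    def trend(self, year: int) -> float:
        return self.intercept + self.slope * year

    def transform(self, years: list[int], yields: list[float]) -> list[float]:
        # Detrend: yield -> anomaly relative to the fitted trend.
        return [y - self.trend(x) for x, y in zip(years, yields)]

    def inverse_transform(self, years: list[int], anomalies: list[float]) -> list[float]:
        # Retrend: anomaly -> yield on the original scale.
        return [a + self.trend(x) for x, a in zip(years, anomalies)]


def predict_yield(
    detrender: LinearDetrender,
    weather_coef: list[float],
    years: list[int],
    weather: list[list[float]],
) -> list[float]:
    # Score: a linear model on weather features predicts the detrended anomaly,
    # so the weather correction lives inside the regression score.
    anomalies = [sum(c * w for c, w in zip(weather_coef, row)) for row in weather]
    # Retrend the scored anomalies back to yield units.
    return detrender.inverse_transform(years, anomalies)
```

With a trend fitted on 100, 110 and 120 for 2000 to 2002, a 2003 weather anomaly of +3 retrends to 133.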

build_wide_prediction_frame(...) assembles the 9-column canonical frame (run_predict.py:313–318).

Fold resolution (run_predict.py:86–98): tries models/{experiment_key}/production/ first, then models/{experiment_key}/{season_year}/; raises FileNotFoundError if neither exists.
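A pathlib sketch of that resolution order, assuming the models directory sits at run_dir/models (consistent with the run-root layout listed for run_hindcast.run()). resolve_fold_dir is a hypothetical helper, not the module's function.

```python
from pathlib import Path


def resolve_fold_dir(run_dir: Path, experiment_key: str, season_year: int) -> Path:
    """Hypothetical sketch: prefer the production fold, then the season_year fold."""
    models_root = run_dir / "models" / experiment_key
    candidates = [models_root / "production", models_root / str(season_year)]
    for candidate in candidates:
        if candidate.is_dir():
            return candidate
    raise FileNotFoundError(
        f"No fitted fold for {experiment_key!r}; tried {[str(c) for c in candidates]}"
    )
```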

Forecast vs canonical pred.parquet routing (run_predict.py:197–233):

Forecast features land at run_dir/forecast/{season_year}/{init_date}/features/pred.parquet (path restructured by PR #369). Canonical features are at {features_dir}/{experiment_key}/pred.parquet. The routing check at run_predict.py:213 prefers the forecast parquet when it exists, otherwise falls back to canonical with grid-snap.
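The routing check reduces to an existence test. In this hypothetical sketch, the second return value records whether the caller still needs to grid-snap, and init_date is assumed to be rendered in ISO format (YYYY-MM-DD) inside the path; the real directory format is defined by ForecastSlice.

```python
from datetime import date
from pathlib import Path


def resolve_pred_parquet(
    run_dir: Path,
    features_dir: Path,
    experiment_key: str,
    season_year: int,
    init_date: date,
) -> tuple[Path, bool]:
    """Hypothetical sketch returning (path, needs_grid_snap)."""
    forecast_parquet = (
        run_dir / "forecast" / str(season_year) / init_date.isoformat()
        / "features" / "pred.parquet"
    )
    if forecast_parquet.exists():
        # Forecast features were built for exactly this (season_year, init_date).
        return forecast_parquet, False
    # Fall back to the canonical hindcast features; the caller grid-snaps.
    return features_dir / experiment_key / "pred.parquet", True
```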

write_walk_forward_outputs()

def write_walk_forward_outputs(
    wide_prediction_frame: pd.DataFrame,
    *,
    year_data: pd.DataFrame,
    fold_preds_dir: Path,
) -> None:
    ...

run_predict.py:322

Blind-overwrite semantics (run_predict.py:334): each call replaces any pre-existing file. Multi-init callers (walk-forward CV) MUST accumulate across init_dates in memory and call once per fold to avoid destroying K-1 init_dates on disk.
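The difference between the two calling patterns is easy to see with a dict standing in for the filesystem. All names here are hypothetical; write_blind only mimics the replace-on-write behaviour of write_walk_forward_outputs.

```python
def write_blind(disk: dict[str, list[dict]], path: str, rows: list[dict]) -> None:
    # Mimics blind-overwrite semantics: whatever was stored at `path` is replaced.
    disk[path] = list(rows)


def write_per_init(disk: dict[str, list[dict]], path: str,
                   preds_by_init: dict[str, list[dict]]) -> None:
    # Anti-pattern: one write per init_date; only the last init_date survives.
    for rows in preds_by_init.values():
        write_blind(disk, path, rows)


def write_per_fold(disk: dict[str, list[dict]], path: str,
                   preds_by_init: dict[str, list[dict]]) -> None:
    # Correct pattern: accumulate all K init_dates in memory, then write once per fold.
    accumulated = [row for rows in preds_by_init.values() for row in rows]
    write_blind(disk, path, accumulated)
```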

Outputs written under fold_preds_dir/:
- walk_forward_preds.parquet: canonical 9-column county × init_dates frame
- year_data.parquet: raw prediction slice for the init_date

run_predict() — single-init wrapper (run_predict.py:346)

Composes predict() + write_walk_forward_outputs(); used by CLI predict and run_forecast. NOT used by walk-forward CV — run.runner.run_walk_forward accumulates K init_dates in memory and calls write_walk_forward_outputs once per fold (run_predict.py:18–25). Writes under {preds_dir}/{experiment_key}/{fold_label}/.

run_meta_models.py

Purpose: Postprocess orchestrator — aggregate rolling preds to national, fit bias corrector, attach conformal CI columns.

Function:

def postprocess_experiment(
    run_dir: Path,
    *,
    included_geo_identifiers: frozenset[str],
) -> None:
    ...

run_meta_models.py:138

Inputs (consumed):
- Per-fold walk_forward_preds.parquet under run_dir/preds/{experiment_key}/{fold_label}/
- NASS county panel (loaded internally when bias_corrector.kind != "none")
- included_geo_identifiers: county set for the bias corrector

Outputs (written):
- run_dir/conformal/{mode}.parquet: one calibration sidecar per configured mode (run_meta_models.py:67)
- run_dir/postprocessed/national.parquet: one row per (year, init_date) with sim/obs/bias/CI columns (run_meta_models.py:193–196)
- Per-fold bias_corrector.pkl beside each fold (via bias_corrector.save(period.bias_corrector_path))

Inner call sequence (run_meta_models.py:158–196):

  1. fit_and_save_all_configured(result, ci_levels): fit and save every mode in config.postprocess.conformalise.
  2. primary_calibration(result, ci_levels): load the primary calibration (driven by forecast.residual_mode).
  3. For each fold (CV folds, then production if present; see _all_folds, run_meta_models.py:199–205):
     a. period.load_walk_forward_preds(): load the fold's predictions.
     b. aggregate_weighted_frame(...): area-weighted ADM0 aggregation.
     c. build_bias_corrector(config).fit(...).save(...): fit and persist the bias corrector.
     d. build_rows(national_by_init, bias_corrector, calibration, commodity): assemble national rows.
  4. write_dataframe(pd.DataFrame(national_rows), str(national_path)).
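The area-weighted ADM0 step amounts to Σ(yield × area) / Σ(area) per (year, init_date). A pandas sketch, assuming county rows carry year, init_date, sim_yield_kg_ha and area_harvested_ha columns; area_weighted_national is a hypothetical stand-in for aggregate_weighted_frame, whose arguments are elided above.

```python
import pandas as pd


def area_weighted_national(county_preds: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical sketch: area-weighted national yield per (year, init_date)."""
    # Per-county production proxy: yield times harvested area.
    weighted = county_preds.assign(
        _w=county_preds["sim_yield_kg_ha"] * county_preds["area_harvested_ha"]
    )
    grouped = weighted.groupby(["year", "init_date"], as_index=False)[
        ["_w", "area_harvested_ha"]
    ].sum()
    # Dividing summed production by summed area gives the area-weighted mean yield.
    grouped["sim_yield_kg_ha"] = grouped["_w"] / grouped["area_harvested_ha"]
    return grouped[["year", "init_date", "sim_yield_kg_ha"]]
```

Weighting by area keeps small counties with extreme yields from dominating the national figure.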

Calibration modes: config.postprocess.conformalise controls which sidecars are persisted; config.forecast.residual_mode selects which mode is applied at forecast time (run_meta_models.py:119–135).
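This page does not spell out how a calibration sidecar becomes CI columns. For orientation only, here is a generic symmetric split-conformal sketch that turns calibration residuals (obs minus sim) into an interval at a given level. It is an assumption about the general technique, not a description of any configured mode, and both function names are hypothetical.

```python
import math


def conformal_halfwidth(residuals: list[float], level: float) -> float:
    """Generic split-conformal half-width from calibration residuals (obs - sim)."""
    scores = sorted(abs(r) for r in residuals)
    n = len(scores)
    # Finite-sample corrected rank: ceil((n + 1) * level).
    k = math.ceil((n + 1) * level)
    if k > n:
        # Too few calibration points to support this level.
        return math.inf
    return scores[k - 1]


def attach_ci(sim: float, residuals: list[float], level: float) -> tuple[float, float]:
    # Symmetric interval around the point prediction.
    q = conformal_halfwidth(residuals, level)
    return sim - q, sim + q
```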

run_deliver.py

Purpose: Generate hindcast delivery CSVs at ADM0/ADM1/ADM2 from CV-fold rolling predictions.

Function:

def deliver_experiment(run_dir: AnyPath | Path | str) -> None:
    ...

run_deliver.py:40

Inputs (consumed):
- run_dir/postprocessed/national.parquet (via the result.has_postprocessed check)
- Per-fold walk_forward_preds.parquet (via the result.has_walk_forward_preds check)

Outputs (written) under run_dir/delivery/ (run_deliver.py:67):

Treefera_{experiment_key}_{ADM}_Hindcast_{YYYYMMDD}.csv

where ADM ∈ {ADM0, ADM1, ADM2}.

Inner call sequence: guard checks → for each ADM level: walk_forward_preds_to_delivery_rows(result, level, mode="hindcast") → HindcastDelivery(rows, generated_date) → delivery_to_dataframe(...).pipe(apply_delivery_post_transforms, ...) → level_df.write_csv(out_path, float_precision=3) (run_deliver.py:47–92).
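A stdlib sketch of the per-level loop and its output naming, assuming rows arrive already grouped by ADM level as (geo_id, year, value) tuples. The column names, write_hindcast_delivery, and the single national.parquet guard are hypothetical simplifications (the real guard also checks for walk-forward preds, and the real writer goes through delivery_to_dataframe); the three-decimal formatting mirrors float_precision=3.

```python
import csv
from datetime import date
from pathlib import Path

ADM_LEVELS = ("ADM0", "ADM1", "ADM2")


def delivery_filename(experiment_key: str, adm: str, generated: date) -> str:
    # Treefera_{experiment_key}_{ADM}_Hindcast_{YYYYMMDD}.csv
    return f"Treefera_{experiment_key}_{adm}_Hindcast_{generated.strftime('%Y%m%d')}.csv"


def write_hindcast_delivery(
    run_dir: Path,
    experiment_key: str,
    rows_by_level: dict[str, list[tuple[str, int, float]]],
    generated: date,
) -> list[Path]:
    # Guard: refuse to deliver before postprocess has produced its output.
    national = run_dir / "postprocessed" / "national.parquet"
    if not national.exists():
        raise FileNotFoundError(f"postprocess has not run: {national} is missing")
    out_dir = run_dir / "delivery"
    out_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for adm in ADM_LEVELS:
        out_path = out_dir / delivery_filename(experiment_key, adm, generated)
        with out_path.open("w", newline="") as fh:
            writer = csv.writer(fh)
            writer.writerow(["geo_id", "year", "sim_yield_kg_ha"])  # hypothetical columns
            for geo_id, year, value in rows_by_level[adm]:
                writer.writerow([geo_id, year, f"{value:.3f}"])  # three decimals
        written.append(out_path)
    return written
```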

Forecast delivery is NOT handled here — run_forecast._deliver_forecast writes per-(season_year, init_date) CSVs under run_dir/forecast/{season_year}/{init_date}/delivery/.

run_forecast.py

Purpose: Forecast orchestrator — single end-to-end entrypoint composing feature build → predict → postprocess → deliver for a (season_year, init_date) slice against an already-fitted run_dir.

Public surface (run_forecast.py:11–14):
- run(run_dir, *, season_year, init_date, force) -> None: full pipeline
- run_features(run_dir, *, season_year, init_date, force) -> None: features only
- run_predict(run_dir, *, season_year, init_date) -> None: predict + postprocess + deliver

run() — full forecast pipeline

def run(
    run_dir: AnyPath | str,
    *,
    season_year: int,
    init_date: date,
    force: bool = False,
) -> None:
    ...

run_forecast.py:143

Inner call sequence:

  1. ExperimentResult.from_run_dir(run_dir) — load experiment handle.
  2. validate_residual_mode(experiment, config.forecast.residual_mode) — pre-flight guard that refuses runs whose run_dir cannot supply the configured calibration residuals (run_forecast.py:91–141).
  3. run_features(run_dir, season_year=season_year, init_date=init_date, force=force).
  4. run_predict(run_dir, season_year=season_year, init_date=init_date).

run_features() — forecast feature build

Inner call sequence (run_forecast.py:167–215):

  1. Skip-if-exists guard (force=False and both zarr + parquet present → no-op).
  2. preflight_paths_for_forecast_features(config) — check required input paths.
  3. ForecastSlice(run_dir, ..., season_year, init_date) — create path handle.
  4. materialise_forecast_indices(config, results) — build daily weather indices zarr at results.indices_zarr.
  5. _build_forecast_features(config, results) — build pred.parquet at results.features_parquet.
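The skip-if-exists guard in step 1 short-circuits only when force is off and both artefacts are already present. A hypothetical sketch with explicit paths (the real code reads them from the ForecastSlice handle):

```python
from pathlib import Path


def should_skip_feature_build(indices_zarr: Path, features_parquet: Path, force: bool) -> bool:
    # A zarr store is a directory and the features file is a parquet file;
    # exists() covers both. Missing either one means the build must run.
    return not force and indices_zarr.exists() and features_parquet.exists()
```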

_build_forecast_features (run_forecast.py:264–312) overrides the weather builder to read results.indices_zarr, narrows the year window to season_year, and calls the long-range stubs (synthesise_long_range_climo_for_unseen_years and synthesise_long_range_stress_for_unseen_years, added by PR #369 for years beyond climo coverage), then calls build_features(forecast_cfg, output_dir=results.features_dir). Finally, _impute_forecast_area fills NaN area_harvested_ha with a trailing 3-year median from the canonical pred.parquet.
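A pandas sketch of the area fallback, under two assumptions: counties are keyed by a hypothetical geo_id column, and "trailing 3-year" is read as each county's three most recent observed years before season_year (which matters when forecasting several seasons ahead). impute_area_trailing_median is a hypothetical name, not the module's function.

```python
import pandas as pd


def impute_area_trailing_median(
    forecast: pd.DataFrame,
    history: pd.DataFrame,
    season_year: int,
    window: int = 3,
) -> pd.DataFrame:
    """Hypothetical sketch: fill NaN area_harvested_ha from a trailing-window median."""
    # Only history strictly before the forecast season is eligible.
    past = history[history["year"] < season_year].sort_values("year")
    # Keep the `window` most recent observed years per county.
    recent = past.groupby("geo_id").tail(window)
    medians = recent.groupby("geo_id")["area_harvested_ha"].median()
    filled = forecast.copy()
    # Existing values are kept; counties without history stay NaN.
    filled["area_harvested_ha"] = filled["area_harvested_ha"].fillna(
        filled["geo_id"].map(medians)
    )
    return filled
```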

run_predict() — forecast predict + postprocess + deliver

Inner call sequence (run_forecast.py:221–258):

  1. preflight_paths_for_forecast_predict(config, ...).
  2. ForecastSlice(...) — path handle.
  3. run_predict_stage(experiment.run_dir, season_year=season_year, init_date=init_date) — alias to stages.run_predict.run_predict; writes walk_forward_preds.parquet under the production fold.
  4. _postprocess_forecast(experiment, results) — single-production-fold postprocess; writes results.postprocessed_national_path.
  5. _deliver_forecast(experiment, results) — writes ADM0/ADM1/ADM2 CSVs at results.delivery_dir/Treefera_{key}_{ADM}_Forecast_{init_date}.csv.

Isolation from canonical hindcast artefacts

DESIGN.md:125 mandates: "The forecast pipeline SHALL NOT write to canonical hindcast artefacts under {features_dir}/{experiment_key}/."

PR #369 restructured forecast paths from forecast/{init_date}/ to forecast/{season_year}/{init_date}/ so that multiple season_year values at the same init_date coexist without collision. The single source of truth is ForecastSlice.root (lib/results/results_slice.py). All forecast outputs land exclusively under run_dir/forecast/{season_year}/{init_date}/.

Multi-season_year support (PR #369)

PR #369 (commit f5399b96) enabled forecasting multiple season_year values from one init_date (e.g. YYYY+1 … YYYY+5). Path restructure forecast/{init_date}/ → forecast/{season_year}/{init_date}/ fixed the collision: previously the skip-if-exists guard reused the first season's artefacts for subsequent seasons, causing _load_prediction_slice to raise "No rows in pred.parquet for season_year=N". Touched: stages/run_forecast.py, stages/run_predict.py, run/preflight.py, lib/results/results_slice.py, lib/results/run_result.py.
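The collision is visible from the path layout alone. A hypothetical sketch of the old and new roots, assuming init_date appears in ISO format; the authoritative layout lives in ForecastSlice.root.

```python
from datetime import date
from pathlib import PurePosixPath


def legacy_forecast_root(run_dir: PurePosixPath, season_year: int, init_date: date) -> PurePosixPath:
    # Pre-PR #369 layout: season_year is ignored, so different seasons share a root.
    return run_dir / "forecast" / init_date.isoformat()


def forecast_root(run_dir: PurePosixPath, season_year: int, init_date: date) -> PurePosixPath:
    # Post-PR #369 layout: one directory per (season_year, init_date).
    return run_dir / "forecast" / str(season_year) / init_date.isoformat()
```

Under the legacy layout, the skip-if-exists guard for a second season_year finds the first season's artefacts at the same root and skips the build, which is the reuse that led to the "No rows in pred.parquet" error.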

run_hindcast.py

Purpose: Hindcast orchestrator — walk-forward CV + production fit + postprocess + evaluate + deliver. Top-level entrypoint for the full hindcast pipeline.

Public surface:
- run(config_path) -> AnyPath: full hindcast
- fit_production(config_path) -> AnyPath: production fit only (fast path for forecast without OOS CIs)

Both run and fit_production are top-level orchestrator entry points exposed via the CLI, and both return the run_root path so the caller (or Makefile) can chain subsequent commands against the same run directory. run is the standard full-pipeline path. fit_production is the fast path for producing a model for cli run forecast when out-of-sample confidence intervals are not needed: it skips the walk-forward fold loop and the postprocess, evaluate, and deliver stages.

run() — full hindcast

def run(config_path: AnyPath | str) -> AnyPath:
    ...

run_hindcast.py:193

Inputs consumed: {features_dir}/{experiment_key}/fit.parquet and pred.parquet (features must be pre-built by cli run features).

Outputs under {run_dir_base}/{YYYYMMDD_HHMMSS}_{experiment_key}/:
- config_resolved.yaml, models/{experiment_key}/{fold_label}/ (detrender + fill values + regressor)
- preds/{experiment_key}/{fold_label}/walk_forward_preds.parquet, preds/{experiment_key}/train_preds.parquet
- conformal/{mode}.parquet, postprocessed/national.parquet
- reports/ (metrics + plots), delivery/Treefera_*.csv

Inner call sequence (run_hindcast.py:200–226):

  1. _prepare_config(config_path): resolve config, set COMMODITY_HINDCAST_CONFIG.
  2. run_preflight(preflight_paths_for_hindcast(config)): validate required input paths.
  3. _create_run_root(config): create timestamped directory, rewrite models_dir/preds_dir.
  4. prepare_hindcast_mlflow(...) / hindcast_mlflow_run(...): MLflow tracking context.
  5. _load_and_preprocess(config): load fit/pred parquets, call preprocess_data, apply select_by_production county cut.
  6. _persist_included(fit_data, run_root): write included county frozenset to disk.
  7. _run_walk_forward_phase(config, fit_data, pred_data, references_by_harvest):
     a. ExpandingFoldGenerator generates train/test splits per experiment_protocol.
     b. run_walk_forward(config, data_fold_generator) iterates folds, calling run_experiment per fold (which calls train → predict_kernel → accumulates preds → write_walk_forward_outputs).
     c. Consolidates per-fold train_preds.parquet into a single preds/{key}/train_preds.parquet.
  8. _run_production_fit_phase(config, fit_data) → run_experiment(fold_label="production", ...) trains on all available data.
  9. postprocess_experiment(run_root, included_geo_identifiers=included): META-MODELS stage.
  10. evaluate_experiment(run_root): DIAGNOSTICS stage.
  11. deliver_experiment(run_root): DELIVER stage.
  12. Returns run_root.

fit_production() — fast path

def fit_production(config_path: AnyPath | str) -> AnyPath:
    ...

run_hindcast.py:229

Runs only _load_and_preprocess(config, load_references=False) + _run_production_fit_phase. No walk-forward CV, no postprocess, no evaluate, no deliver. Produces the production model needed by cli run forecast when out-of-sample CIs are not required. Returns run_root.

Walk-forward CV detail

_run_walk_forward_phase calls run_walk_forward (run/runner.py) with an ExpandingFoldGenerator. Each fold calls run_experiment(fold_label=str(season_year), ...) — NOT routing through run_predict.run_predict — accumulating predictions across all hindcast_init_season_doys in memory before one write_walk_forward_outputs call per fold.
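An expanding-window split can be sketched as follows. It is a generic illustration, not ExpandingFoldGenerator: the real splits are driven by experiment_protocol, and first_test_year is a hypothetical parameter. Each fold trains on all earlier years and is labelled str(season_year), matching the fold_label convention above.

```python
from collections.abc import Iterator


def expanding_folds(
    years: list[int], first_test_year: int
) -> Iterator[tuple[str, list[int], int]]:
    """Hypothetical sketch yielding (fold_label, train_years, test_year)."""
    ordered = sorted(set(years))
    for test_year in ordered:
        if test_year < first_test_year:
            continue
        # The training window grows by one year per fold and never sees the test year.
        train_years = [y for y in ordered if y < test_year]
        yield str(test_year), train_years, test_year
```

The production fold then differs only in training on every available year with no held-out test year.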

run_diagnostics.py

Purpose: Evaluation entry point — compute metrics + optionally generate plots.

Functions:

def evaluate_experiment(
    run_dir: Path | AnyPath | str,
    *,
    skip_plots: bool = False,
) -> list[dict[str, float]]:
    ...

run_diagnostics.py:12

def enrich_fold_metrics_for_reporting(
    metrics_data: list[dict],
    fold_reporting_contexts: list[dict],
) -> None:
    ...

run_diagnostics.py:51

Inputs consumed: Completed FIT + POSTPROCESS run under run_dir.

Outputs written (when skip_plots=False):
- run_dir/reports/: metrics artefacts (write_metrics_artefacts)
- run_dir/reports/hindcast/: per-fold diagnostic PNGs (generate_plots)

Inner call sequence: compute_metrics(run_dir) (always) → if not skip_plots: write_metrics_artefacts(run_dir) + generate_plots(run_dir). DESIGN.md:17 mandates per-fold PNGs under reports/hindcast/ and experiment-level plots under reports/.

enrich_fold_metrics_for_reporting attaches rolling_forecast_data and ADM1/ADM2 tables in place; used by the Streamlit dashboard, not by the hindcast run.

Cross-references

Relationships

| Module | Role | Key functions |
|---|---|---|
| run_features.py | Atomic preprocessing | add_state_name |
| run_fit.py | Atomic FIT stage | train() |
| run_predict.py | Atomic PREDICT stage | predict(), write_walk_forward_outputs() |
| run_meta_models.py | Atomic POSTPROCESS stage | postprocess_experiment() |
| run_deliver.py | Atomic DELIVER stage | deliver_experiment() |
| run_diagnostics.py | Atomic EVALUATE stage | evaluate_experiment() |
| run_hindcast.py | Hindcast orchestrator | All six atomic stages + run_walk_forward |
| run_forecast.py | Forecast orchestrator | run_features, run_predict, _postprocess_forecast, _deliver_forecast |

run_hindcast and run_forecast are the top-level entry points exposed through cli.py. The atomic stage modules can also be invoked directly via CLI sub-commands for incremental re-runs.

Stage-flow Diagram

```mermaid
flowchart TD
    FE["run_features\npreprocess_data()"]
    FIT["run_fit\ntrain()"]
    PRED["run_predict\npredict() +\nwrite_walk_forward_outputs()"]
    META["run_meta_models\npostprocess_experiment()"]
    DIAG["run_diagnostics\nevaluate_experiment()"]
    DEL["run_deliver\ndeliver_experiment()"]

    FE --> FIT
    FIT --> PRED
    PRED --> META
    META --> DIAG
    DIAG --> DEL

    subgraph HINDCAST["run_hindcast.run()  —  hindcast orchestrator"]
        direction TB
        FE
        FIT
        PRED
        META
        DIAG
        DEL
    end

    FE_F["run_forecast.run_features()\nmaterialise_forecast_indices\nbuild_features"]
    PRED_F["run_forecast.run_predict()\n→ run_predict_stage()"]
    META_F["_postprocess_forecast()"]
    DEL_F["_deliver_forecast()"]

    FE_F --> PRED_F
    PRED_F --> META_F
    META_F --> DEL_F

    subgraph FORECAST["run_forecast.run()  —  forecast orchestrator"]
        direction TB
        FE_F
        PRED_F
        META_F
        DEL_F
    end

    HINDCAST -. "run_dir on disk\n(read-only reference)" .-> FORECAST
```

The dashed edge denotes that the forecast orchestrator consumes artefacts from a completed hindcast run_dir (production model, conformal calibration sidecars) but never writes back to canonical hindcast paths.