Pipeline Stages¶
Overview¶
The stages/ package holds eight run_*.py modules that implement the commodity_hindcast pipeline as a linear DAG: six atomic stage modules and two top-level orchestrators (run_hindcast, run_forecast) that compose them. Each module is independently runnable against a run_dir on disk; no in-memory state crosses a stage boundary (DESIGN.md:100).
Artefact contract (DESIGN.md:73): FIT → models/{experiment_key}/{label}/; PREDICT → preds/{experiment_key}/{label}/walk_forward_preds.parquet; POSTPROCESS → postprocessed/national.parquet; DELIVER → delivery/Treefera_{experiment_key}_{ADM}_Hindcast_{YYYYMMDD}.csv. Forecast artefacts are isolated under forecast/{season_year}/{init_date}/ (PR #369 — see §run_forecast.py below).
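As a rough illustration of the contract above, the sketch below maps each stage to its artefact location. The helper name `artefact_paths`, the example experiment key `corn_us`, and the choice to root every path at a single `run_dir` are assumptions made for the example; the project resolves these paths from its own config.

```python
from datetime import date
from pathlib import PurePosixPath


def artefact_paths(
    run_dir: str,
    experiment_key: str,
    label: str,
    adm: str,
    generated: date,
) -> dict[str, PurePosixPath]:
    """Map each stage to the artefact location named in the contract.

    Hypothetical helper: assumes all artefacts are rooted at run_dir.
    """
    root = PurePosixPath(run_dir)
    stamp = generated.strftime("%Y%m%d")  # the YYYYMMDD part of the CSV name
    return {
        "FIT": root / "models" / experiment_key / label,
        "PREDICT": root / "preds" / experiment_key / label / "walk_forward_preds.parquet",
        "POSTPROCESS": root / "postprocessed" / "national.parquet",
        "DELIVER": root / "delivery" / f"Treefera_{experiment_key}_{adm}_Hindcast_{stamp}.csv",
    }


paths = artefact_paths("runs/demo", "corn_us", "2020", "ADM0", date(2024, 5, 1))
for stage, path in paths.items():
    print(f"{stage:<12}{path}")
```

Because each stage reads and writes only these locations, a later stage can be re-run from disk without re-running the stages before it.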
Modules¶
run_features.py¶
Purpose: Data preprocessing before training — column guard + state-name enrichment.
Function:
def preprocess_data(
    fit_data: pd.DataFrame,
    pred_data: pd.DataFrame,
    config: ExperimentConfig,
) -> tuple[pd.DataFrame, pd.DataFrame]:
run_features.py:10
Inputs (consumed): fit_data and pred_data DataFrames loaded by the caller (typically run_hindcast._load_and_preprocess) from:
- {features_dir}/{experiment_key}/fit.parquet
- {features_dir}/{experiment_key}/pred.parquet
Outputs (written): None — pure in-memory transform. Returns the cleaned (fit_data, pred_data) tuple.
Inner call sequence:
1. add_state_name(fit_data) / add_state_name(pred_data): geo enrichment (run_features.py:13–14).
2. fit_data.dropna(axis=1, how="all"): drop all-NaN columns; raises ValueError if any dropped column was a configured feature (run_features.py:20–45).
3. pred_data = pred_data[list(fit_data.columns)]: align pred columns to fit schema (run_features.py:48).
This module does NOT build features (that is cli run features / features/run.py). It is a preprocessing guard called inside run_hindcast._load_and_preprocess.
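The column guard described above can be sketched without pandas. In this minimal version a table is a plain dict of column name to list of values, standing in for a DataFrame; `guard_columns` and `is_missing` are hypothetical names, and the real module works on `pd.DataFrame` objects via `dropna` and column indexing.

```python
import math


def is_missing(value: object) -> bool:
    """Treat None and float NaN as missing."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def guard_columns(fit: dict, pred: dict, features: list) -> tuple:
    """Drop all-missing fit columns, fail loudly if a feature was lost,
    then align pred to the surviving fit schema (same columns, same order)."""
    all_missing = [
        name for name, values in fit.items() if all(is_missing(v) for v in values)
    ]
    lost = sorted(set(all_missing) & set(features))
    if lost:
        raise ValueError(f"configured features are entirely missing: {lost}")
    fit_clean = {name: values for name, values in fit.items() if name not in all_missing}
    # Extra pred columns are dropped; column order follows the fit table.
    pred_aligned = {name: pred[name] for name in fit_clean}
    return fit_clean, pred_aligned


fit = {"yield": [1.0, 2.0], "ndvi": [0.3, float("nan")], "unused": [None, None]}
pred = {"unused": [None], "ndvi": [0.4], "yield": [None], "extra": [1]}
fit_clean, pred_aligned = guard_columns(fit, pred, features=["ndvi"])
print(list(fit_clean), list(pred_aligned))
```

Raising instead of silently dropping a configured feature keeps a data outage from turning into a model that trains on fewer inputs than the config promises.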
run_fit.py¶
Purpose: Fit detrender + feature imputer + regressor for one fold. Persist artefacts. Return a HindcastSlice handle.
Function: train(), defined at run_fit.py:28.
Inputs (consumed): train_data DataFrame (in-memory, passed by caller). Resolved paths from config.
Outputs (written) under {models_dir}/{experiment_key}/{fold_label}/ (run_fit.py:63–65):
- detrender.pkl — fitted detrender
- feature_fill_values.parquet — median imputation fill values (one row per feature)
- regressor file(s) via model.save_model(...) — e.g. model.ridge or model.xgboost
Inner call sequence (run_fit.py:45–149): build_detrender() → detrender.fit_transform(train_data) → detrender.save(...) → MedianImputer().fit(...) → write_dataframe(fill_values_df, ...) → feature_imputer.transform(...) → fit_aggregation_policy.fit_transform(...) → model.fit(X_train, y_train, ...) → model.save_model(...) → return HindcastSlice.from_config(config, fold_label).
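The median-imputation step in the sequence above follows a fit/transform split: fill values are learned once from the training data and then reused unchanged. The toy class below illustrates that split only; `MedianFill` is a hypothetical stand-in for the project's MedianImputer, and rows are plain dicts rather than DataFrames.

```python
from statistics import median


class MedianFill:
    """Toy median imputer: learn one fill value per feature, then apply it."""

    def fit(self, rows: list, features: list) -> "MedianFill":
        # median() raises StatisticsError if a feature has no observed values.
        self.fill_values = {
            name: median(row[name] for row in rows if row[name] is not None)
            for name in features
        }
        return self

    def transform(self, rows: list) -> list:
        # Only fitted features are filled; other keys pass through untouched.
        return [
            {
                key: self.fill_values[key] if value is None and key in self.fill_values else value
                for key, value in row.items()
            }
            for row in rows
        ]


train = [
    {"gdd": 10.0, "precip": None},
    {"gdd": 30.0, "precip": 4.0},
    {"gdd": None, "precip": 6.0},
]
imputer = MedianFill().fit(train, ["gdd", "precip"])
filled = imputer.transform([{"gdd": None, "precip": 1.0}])
print(imputer.fill_values, filled)
```

Persisting the fill values beside the model, as feature_fill_values.parquet does, lets a later stage apply the training-time medians rather than recomputing them from prediction data.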
No prediction parquets are produced — that is the sole responsibility of run_predict.
run_predict.py¶
Purpose: Inference kernel: detrend → score → weather-correct → retrend. Split into pure-compute predict() and pure-persistence write_walk_forward_outputs().
Public surface (run_predict.py:381–386):
- predict(run_dir, *, season_year, init_date) -> tuple[pd.DataFrame, PredictionInputs]
- write_walk_forward_outputs(wide_prediction_frame, *, year_data, fold_preds_dir) -> None
- run_predict(run_dir, *, season_year, init_date) -> None
predict()¶
def predict(
    run_dir: AnyPath | str,
    *,
    season_year: int,
    init_date: date,
) -> tuple[pd.DataFrame, PredictionInputs]:
run_predict.py:282
Pure compute — no persistence. Loads model artefacts and the prediction slice, runs the kernel, returns the canonical walk_forward_preds-shaped frame plus the resolved PredictionInputs.
Four-step inverse pipeline (_predict, run_predict.py:255–279):
- Detrend: inputs.detrender.transform(inputs.pred_slice) removes trend.
- Score: predict_kernel(model, pred_slice_detrended, ...) applies the regressor.
- Weather-correct: the weather-feature coefficients are embedded in the regression score.
- Retrend: predict_kernel calls detrender.inverse_transform(...), yielding sim_yield_kg_ha.
build_wide_prediction_frame(...) assembles the 9-column canonical frame (run_predict.py:313–318).
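The detrend, score, retrend idea can be illustrated with a toy additive linear trend. Everything in this sketch is an assumption made for illustration: `LinearDetrender`, the one-feature `score` function, and its coefficient are hypothetical, and the project's detrender and regressor may work quite differently.

```python
class LinearDetrender:
    """Toy additive linear trend in year; a stand-in, not the project's detrender."""

    def fit(self, years: list, yields: list) -> "LinearDetrender":
        n = len(years)
        mean_year = sum(years) / n
        mean_yield = sum(yields) / n
        sxx = sum((y - mean_year) ** 2 for y in years)
        sxy = sum((y - mean_year) * (v - mean_yield) for y, v in zip(years, yields))
        self.slope = sxy / sxx  # ordinary least-squares slope
        self.intercept = mean_yield - self.slope * mean_year
        return self

    def trend(self, year: int) -> float:
        return self.intercept + self.slope * year

    def transform(self, year: int, value: float) -> float:
        """Detrend: remove the fitted trend, leaving an anomaly."""
        return value - self.trend(year)

    def inverse_transform(self, year: int, anomaly: float) -> float:
        """Retrend: add the trend back to an anomaly."""
        return anomaly + self.trend(year)


def score(stress_index: float, coefficient: float = -5.0) -> float:
    """Hypothetical one-feature linear regressor working in anomaly space."""
    return coefficient * stress_index


detrender = LinearDetrender().fit([2018, 2019, 2020, 2021], [100.0, 110.0, 120.0, 130.0])
train_anomaly = detrender.transform(2021, 130.0)       # what the regressor would train on
predicted_anomaly = score(stress_index=2.0)            # weather effect enters via the score
sim_yield = detrender.inverse_transform(2022, predicted_anomaly)
print(train_anomaly, predicted_anomaly, sim_yield)
```

Here the regressor only ever sees anomalies, so the weather signal is learned separately from the long-run trend, and the final retrend step puts the prediction back in yield units.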
Fold resolution (run_predict.py:86–98): tries models/{experiment_key}/production/ first, then models/{experiment_key}/{season_year}/; raises FileNotFoundError if neither exists.
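A minimal sketch of this lookup order, assuming plain directories on local disk; `resolve_fold_dir` is a hypothetical name, and the real code may use a different path type and error message.

```python
import tempfile
from pathlib import Path


def resolve_fold_dir(models_root: Path, experiment_key: str, season_year: int) -> Path:
    """Prefer the production fold, fall back to the per-year fold."""
    candidates = [
        models_root / experiment_key / "production",
        models_root / experiment_key / str(season_year),
    ]
    for candidate in candidates:
        if candidate.is_dir():
            return candidate
    tried = ", ".join(str(c) for c in candidates)
    raise FileNotFoundError(f"no fitted fold found; tried: {tried}")


with tempfile.TemporaryDirectory() as tmp:
    models_root = Path(tmp) / "models"
    (models_root / "corn_us" / "2021").mkdir(parents=True)

    # Only the per-year fold exists, so the fallback is used.
    fallback_name = resolve_fold_dir(models_root, "corn_us", 2021).name

    # Once a production fold exists it wins.
    (models_root / "corn_us" / "production").mkdir()
    preferred_name = resolve_fold_dir(models_root, "corn_us", 2021).name

    try:
        resolve_fold_dir(models_root, "corn_us", 1999)
        missing_raises = False
    except FileNotFoundError:
        missing_raises = True
    # Note: with production present, any season_year resolves to production.

print(fallback_name, preferred_name, missing_raises)
```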
Forecast vs canonical pred.parquet routing (run_predict.py:197–233):
Forecast features land at run_dir/forecast/{season_year}/{init_date}/features/pred.parquet (path restructured by PR #369). Canonical features are at {features_dir}/{experiment_key}/pred.parquet. The routing check at run_predict.py:213 prefers the forecast parquet when it exists, otherwise falls back to canonical with grid-snap.
write_walk_forward_outputs()¶
def write_walk_forward_outputs(
    wide_prediction_frame: pd.DataFrame,
    *,
    year_data: pd.DataFrame,
    fold_preds_dir: Path,
) -> None:
run_predict.py:322
Blind-overwrite semantics (run_predict.py:334): each call replaces any pre-existing file. Multi-init callers (walk-forward CV) MUST accumulate across init_dates in memory and call once per fold to avoid destroying K-1 init_dates on disk.
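The consequence of blind overwrite can be shown with a small stand-in writer. This sketch uses JSON instead of parquet and a hypothetical `write_outputs` function; only the overwrite behaviour is meant to match the description above.

```python
import json
import tempfile
from pathlib import Path


def write_outputs(rows: list, out_path: Path) -> None:
    """Blind overwrite: whatever was at out_path is replaced."""
    out_path.write_text(json.dumps(rows))


init_dates = ["2024-05-01", "2024-06-01", "2024-07-01"]

with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "walk_forward_preds.json"

    # Anti-pattern: one write per init_date. Each call destroys the previous one.
    for init_date in init_dates:
        write_outputs([{"init_date": init_date}], out)
    surviving_after_loop = json.loads(out.read_text())

    # Intended pattern: accumulate in memory, then write once per fold.
    accumulated = []
    for init_date in init_dates:
        accumulated.append({"init_date": init_date})
    write_outputs(accumulated, out)
    surviving_after_single_write = json.loads(out.read_text())

print(len(surviving_after_loop), len(surviving_after_single_write))
```

With K init_dates, the per-date loop leaves only the last one on disk, which is the K-1 loss the warning above refers to.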
Outputs written under fold_preds_dir/:
- walk_forward_preds.parquet — canonical 9-column county × init_dates frame
- year_data.parquet — raw prediction slice for the init_date
run_predict() — single-init wrapper (run_predict.py:346)¶
Composes predict() + write_walk_forward_outputs(); used by CLI predict and run_forecast. NOT used by walk-forward CV — run.runner.run_walk_forward accumulates K init_dates in memory and calls write_walk_forward_outputs once per fold (run_predict.py:18–25). Writes under {preds_dir}/{experiment_key}/{fold_label}/.
run_meta_models.py¶
Purpose: Postprocess orchestrator — aggregate rolling preds to national, fit bias corrector, attach conformal CI columns.
Function: postprocess_experiment(), defined at run_meta_models.py:138.
Inputs (consumed):
- Per-fold walk_forward_preds.parquet under run_dir/preds/{experiment_key}/{fold_label}/
- NASS county panel (loaded internally when bias_corrector.kind != "none")
- included_geo_identifiers — county set for bias corrector
Outputs (written):
- run_dir/conformal/{mode}.parquet — one calibration sidecar per configured mode (run_meta_models.py:67)
- run_dir/postprocessed/national.parquet — one row per (year, init_date) with sim/obs/bias/CI columns (run_meta_models.py:193–196)
- Per-fold bias_corrector.pkl beside each fold (via bias_corrector.save(period.bias_corrector_path))
Inner call sequence (run_meta_models.py:158–196):
1. fit_and_save_all_configured(result, ci_levels): fit and save every mode in config.postprocess.conformalise.
2. primary_calibration(result, ci_levels): load the primary calibration (driven by forecast.residual_mode).
3. For each fold (CV then production if present, _all_folds, run_meta_models.py:199–205):
   a. period.load_walk_forward_preds(): load the fold's predictions.
   b. aggregate_weighted_frame(...): area-weighted ADM0 aggregation.
   c. build_bias_corrector(config).fit(...).save(...): fit and persist the bias corrector.
   d. build_rows(national_by_init, bias_corrector, calibration, commodity): assemble national rows.
4. write_dataframe(pd.DataFrame(national_rows), str(national_path)).
Calibration modes: config.postprocess.conformalise controls which sidecars are persisted; config.forecast.residual_mode selects which mode is applied at forecast time (run_meta_models.py:119–135).
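The area-weighted ADM0 aggregation step in the sequence above amounts to a weighted mean. The sketch below assumes the weight is harvested area and the value is simulated yield, using the column names sim_yield_kg_ha and area_harvested_ha that appear elsewhere in this page; whether aggregate_weighted_frame uses exactly these columns is an assumption, and `area_weighted_mean` is a hypothetical name.

```python
def area_weighted_mean(rows: list) -> float:
    """National yield as the harvested-area-weighted mean of county yields."""
    total_area = sum(row["area_harvested_ha"] for row in rows)
    if total_area <= 0:
        raise ValueError("total harvested area must be positive")
    weighted_sum = sum(row["sim_yield_kg_ha"] * row["area_harvested_ha"] for row in rows)
    return weighted_sum / total_area


counties = [
    {"geo": "A", "sim_yield_kg_ha": 8000.0, "area_harvested_ha": 100.0},
    {"geo": "B", "sim_yield_kg_ha": 10000.0, "area_harvested_ha": 300.0},
]
national_yield = area_weighted_mean(counties)
print(national_yield)
```

County B has three times the area of county A, so the national figure sits closer to B's yield than a simple average would.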
run_deliver.py¶
Purpose: Generate hindcast delivery CSVs at ADM0/ADM1/ADM2 from CV-fold rolling predictions.
Function: deliver_experiment(), defined at run_deliver.py:40.
Inputs (consumed):
- run_dir/postprocessed/national.parquet (via result.has_postprocessed check)
- Per-fold walk_forward_preds.parquet (via result.has_walk_forward_preds check)
Outputs (written) under run_dir/delivery/ (run_deliver.py:67):
- Treefera_{experiment_key}_{ADM}_Hindcast_{YYYYMMDD}.csv, where ADM ∈ {ADM0, ADM1, ADM2}.
Inner call sequence: guard checks → for each ADM level: walk_forward_preds_to_delivery_rows(result, level, mode="hindcast") → HindcastDelivery(rows, generated_date) → delivery_to_dataframe(...).pipe(apply_delivery_post_transforms, ...) → level_df.write_csv(out_path, float_precision=3) (run_deliver.py:47–92).
Forecast delivery is NOT handled here — run_forecast._deliver_forecast writes per-(season_year, init_date) CSVs under run_dir/forecast/{season_year}/{init_date}/delivery/.
run_forecast.py¶
Purpose: Forecast orchestrator — single end-to-end entrypoint composing feature build → predict → postprocess → deliver for a (season_year, init_date) slice against an already-fitted run_dir.
Public surface (run_forecast.py:11–14):
- run(run_dir, *, season_year, init_date, force) -> None — full pipeline
- run_features(run_dir, *, season_year, init_date, force) -> None — features only
- run_predict(run_dir, *, season_year, init_date) -> None — predict + postprocess + deliver
run() — full forecast pipeline¶
def run(
    run_dir: AnyPath | str,
    *,
    season_year: int,
    init_date: date,
    force: bool = False,
) -> None:
run_forecast.py:143
Inner call sequence:
1. ExperimentResult.from_run_dir(run_dir): load experiment handle.
2. validate_residual_mode(experiment, config.forecast.residual_mode): pre-flight guard that refuses runs whose run_dir cannot supply the configured calibration residuals (run_forecast.py:91–141).
3. run_features(run_dir, season_year=season_year, init_date=init_date, force=force).
4. run_predict(run_dir, season_year=season_year, init_date=init_date).
run_features() — forecast feature build¶
Inner call sequence (run_forecast.py:167–215):
1. Skip-if-exists guard (force=False and both zarr + parquet present → no-op).
2. preflight_paths_for_forecast_features(config): check required input paths.
3. ForecastSlice(run_dir, ..., season_year, init_date): create path handle.
4. materialise_forecast_indices(config, results): build daily weather indices zarr at results.indices_zarr.
5. _build_forecast_features(config, results): build pred.parquet at results.features_parquet.
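The skip-if-exists guard reduces to a three-input predicate. The function name `should_skip` is hypothetical; the condition itself is the one stated in the sequence above.

```python
from itertools import product


def should_skip(force: bool, zarr_exists: bool, parquet_exists: bool) -> bool:
    """Skip the feature build only when not forced and both artefacts exist."""
    return (not force) and zarr_exists and parquet_exists


# Enumerate every combination to make the rule explicit.
skip_cases = [
    (force, zarr, parquet)
    for force, zarr, parquet in product([False, True], repeat=3)
    if should_skip(force, zarr, parquet)
]
print(skip_cases)
```

Only one of the eight combinations is a no-op; a half-written slice (zarr without parquet, or the reverse) is always rebuilt.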
_build_forecast_features (run_forecast.py:264–312) overrides the weather builder to read results.indices_zarr, narrows year window to season_year, calls the long-range stubs (synthesise_long_range_climo_for_unseen_years, synthesise_long_range_stress_for_unseen_years — added by PR #369 for years beyond climo coverage), then build_features(forecast_cfg, output_dir=results.features_dir). Finally _impute_forecast_area fills NaN area_harvested_ha via trailing 3-year median from the canonical pred.parquet.
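One plausible reading of the trailing 3-year median fill is sketched below. It is an assumption that the window means the three most recent non-missing years strictly before season_year; the real _impute_forecast_area may define the window by calendar years or handle gaps differently. `impute_area` is a hypothetical name.

```python
from statistics import median
from typing import Dict, Optional


def impute_area(history: Dict[int, Optional[float]], season_year: int, window: int = 3) -> float:
    """Median of the most recent `window` observed areas before season_year."""
    observed = [
        history[year]
        for year in sorted(history)
        if year < season_year and history[year] is not None
    ]
    recent = observed[-window:]
    if not recent:
        raise ValueError(f"no observed area before {season_year}")
    return median(recent)


# One county's area_harvested_ha by year, as it might appear in the canonical pred.parquet.
county_history = {2019: 100.0, 2020: 120.0, 2021: None, 2022: 90.0, 2023: 110.0}
imputed_area = impute_area(county_history, season_year=2024)
print(imputed_area)
```

A median is less sensitive than a mean to a single unusual planting year, which matters when the filled area later serves as an aggregation weight.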
run_predict() — forecast predict + postprocess + deliver¶
Inner call sequence (run_forecast.py:221–258):
1. preflight_paths_for_forecast_predict(config, ...).
2. ForecastSlice(...): path handle.
3. run_predict_stage(experiment.run_dir, season_year=season_year, init_date=init_date): alias to stages.run_predict.run_predict; writes walk_forward_preds.parquet under the production fold.
4. _postprocess_forecast(experiment, results): single-production-fold postprocess; writes results.postprocessed_national_path.
5. _deliver_forecast(experiment, results): writes ADM0/ADM1/ADM2 CSVs at results.delivery_dir/Treefera_{key}_{ADM}_Forecast_{init_date}.csv.
Isolation from canonical hindcast artefacts¶
DESIGN.md:125 mandates: "The forecast pipeline SHALL NOT write to canonical hindcast artefacts under {features_dir}/{experiment_key}/."
PR #369 restructured forecast paths from forecast/{init_date}/ to forecast/{season_year}/{init_date}/ so that multiple season_year values at the same init_date coexist without collision. The single source of truth is ForecastSlice.root (lib/results/results_slice.py). All forecast outputs land exclusively under run_dir/forecast/{season_year}/{init_date}/.
Multi-season_year support (PR #369)¶
PR #369 (commit f5399b96) enabled forecasting multiple season_year values from one init_date (e.g. YYYY+1 … YYYY+5). Path restructure forecast/{init_date}/ → forecast/{season_year}/{init_date}/ fixed the collision: previously the skip-if-exists guard reused the first season's artefacts for subsequent seasons, causing _load_prediction_slice to raise "No rows in pred.parquet for season_year=N". Touched: stages/run_forecast.py, stages/run_predict.py, run/preflight.py, lib/results/results_slice.py, lib/results/run_result.py.
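The collision and its fix come down to whether season_year is part of the path key. The sketch below compares the two layouts; `old_root` and `new_root` are hypothetical helpers, and rendering init_date with isoformat() is an assumption about how the real ForecastSlice.root formats it.

```python
from datetime import date
from pathlib import PurePosixPath


def old_root(run_dir: str, init_date: date) -> PurePosixPath:
    """Pre-restructure layout: keyed by init_date only."""
    return PurePosixPath(run_dir) / "forecast" / init_date.isoformat()


def new_root(run_dir: str, season_year: int, init_date: date) -> PurePosixPath:
    """Restructured layout: keyed by (season_year, init_date)."""
    return PurePosixPath(run_dir) / "forecast" / str(season_year) / init_date.isoformat()


init = date(2025, 6, 1)
seasons = [2026, 2027, 2028]

old_roots = {old_root("run", init) for _ in seasons}
new_roots = {new_root("run", season, init) for season in seasons}
print(len(old_roots), len(new_roots))
```

Under the old layout all three seasons share one directory, so a skip-if-exists guard would see the first season's artefacts and treat the other two as already built.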
run_hindcast.py¶
Purpose: Hindcast orchestrator — walk-forward CV + production fit + postprocess + evaluate + deliver. Top-level entrypoint for the full hindcast pipeline.
Public surface:
- run(config_path) -> AnyPath — full hindcast
- fit_production(config_path) -> AnyPath — production fit only (fast path for forecast without OOS CIs)
Both run and fit_production are top-level orchestrator entry points exposed via the CLI. run is the standard full-pipeline path (walk-forward CV + production fit + postprocess + evaluate + deliver). fit_production is the fast path used when you want a production model quickly for cli run forecast without needing out-of-sample confidence intervals — it skips the walk-forward fold loop, postprocess, evaluate, and deliver stages. Both return the run_root path so the caller (or Makefile) can chain subsequent commands against the same run directory.
run() — full hindcast¶
run_hindcast.py:193
Inputs consumed: {features_dir}/{experiment_key}/fit.parquet and pred.parquet (features must be pre-built by cli run features).
Outputs under {run_dir_base}/{YYYYMMDD_HHMMSS}_{experiment_key}/:
- config_resolved.yaml, models/{experiment_key}/{fold_label}/ (detrender + fill values + regressor)
- preds/{experiment_key}/{fold_label}/walk_forward_preds.parquet, preds/{experiment_key}/train_preds.parquet
- conformal/{mode}.parquet, postprocessed/national.parquet
- reports/ (metrics + plots), delivery/Treefera_*.csv
Inner call sequence (run_hindcast.py:200–226):
1. _prepare_config(config_path): resolve config, set COMMODITY_HINDCAST_CONFIG.
2. run_preflight(preflight_paths_for_hindcast(config)): validate required input paths.
3. _create_run_root(config): create timestamped directory, rewrite models_dir/preds_dir.
4. prepare_hindcast_mlflow(...) / hindcast_mlflow_run(...): MLflow tracking context.
5. _load_and_preprocess(config): load fit/pred parquets, call preprocess_data, apply select_by_production county cut.
6. _persist_included(fit_data, run_root): write included county frozenset to disk.
7. _run_walk_forward_phase(config, fit_data, pred_data, references_by_harvest):
   - ExpandingFoldGenerator generates train/test splits per experiment_protocol.
   - run_walk_forward(config, data_fold_generator) iterates folds, calling run_experiment per fold (which calls train → predict_kernel → accumulates preds → write_walk_forward_outputs).
   - Consolidates per-fold train_preds.parquet into a single preds/{key}/train_preds.parquet.
8. _run_production_fit_phase(config, fit_data): run_experiment(fold_label="production", ...) trains on all available data.
9. postprocess_experiment(run_root, included_geo_identifiers=included): META-MODELS stage.
10. evaluate_experiment(run_root): DIAGNOSTICS stage.
11. deliver_experiment(run_root): DELIVER stage.
12. Returns run_root.
fit_production() — fast path¶
run_hindcast.py:229
Runs only _load_and_preprocess(config, load_references=False) + _run_production_fit_phase. No walk-forward CV, no postprocess, no evaluate, no deliver. Produces the production model needed by cli run forecast when out-of-sample CIs are not required. Returns run_root.
Walk-forward CV detail¶
_run_walk_forward_phase calls run_walk_forward (run/runner.py) with an ExpandingFoldGenerator. Each fold calls run_experiment(fold_label=str(season_year), ...) — NOT routing through run_predict.run_predict — accumulating predictions across all hindcast_init_season_doys in memory before one write_walk_forward_outputs call per fold.
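An expanding-window split can be sketched as a small generator: each fold trains on all years before the test year, and the fold label is the test year as a string. `expanding_folds` and its `min_train_years` parameter are hypothetical; the real ExpandingFoldGenerator is configured through the experiment config and may apply additional rules.

```python
from typing import Iterator, List, Tuple


def expanding_folds(years: List[int], min_train_years: int) -> Iterator[Tuple[List[int], int]]:
    """Yield (train_years, test_year) pairs with a growing training window."""
    ordered = sorted(set(years))
    for index in range(min_train_years, len(ordered)):
        yield ordered[:index], ordered[index]


folds = list(expanding_folds([2018, 2019, 2020, 2021, 2022], min_train_years=3))
fold_labels = [str(test_year) for _, test_year in folds]

for train_years, test_year in folds:
    print(f"fold {test_year}: train on {train_years[0]}..{train_years[-1]}")
```

Because the test year is never in its own training window, every fold's predictions are out of sample.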
run_diagnostics.py¶
Purpose: Evaluation entry point — compute metrics + optionally generate plots.
Functions:
def evaluate_experiment(
    run_dir: Path | AnyPath | str,
    *,
    skip_plots: bool = False,
) -> list[dict[str, float]]:
run_diagnostics.py:12
def enrich_fold_metrics_for_reporting(
    metrics_data: list[dict],
    fold_reporting_contexts: list[dict],
) -> None:
run_diagnostics.py:51
Inputs consumed: Completed FIT + POSTPROCESS run under run_dir.
Outputs written (when skip_plots=False):
- run_dir/reports/ — metrics artefacts (write_metrics_artefacts)
- run_dir/reports/hindcast/ — per-fold diagnostic PNGs (generate_plots)
Inner call sequence: compute_metrics(run_dir) (always) → if not skip_plots: write_metrics_artefacts(run_dir) + generate_plots(run_dir). DESIGN.md:17 mandates per-fold PNGs under reports/hindcast/, experiment-level plots at reports/.
enrich_fold_metrics_for_reporting attaches rolling_forecast_data and ADM1/ADM2 tables in place; used by the Streamlit dashboard, not by the hindcast run.
Cross-references¶
- run/runner.py: run_walk_forward, ExpandingFoldGenerator
- run/experiment_protocol.py: run_experiment, fold logic
- lib/results/run_result.py: ExperimentResult.from_run_dir
- lib/results/results_slice.py: HindcastSlice, ForecastSlice
- models/meta_models/conformalise.py: CalibrationResult, ResidualMode
- delivery/: walk_forward_preds_to_delivery_rows, HindcastDelivery
- DESIGN.md: artefact contract (line 73), forecast isolation (line 125), four-stage decomposition (line 102)
Relationships¶
| Module | Role | Calls |
|---|---|---|
| run_features.py | Atomic preprocessing | add_state_name |
| run_fit.py | Atomic FIT stage | train() |
| run_predict.py | Atomic PREDICT stage | predict(), write_walk_forward_outputs() |
| run_meta_models.py | Atomic POSTPROCESS stage | postprocess_experiment() |
| run_deliver.py | Atomic DELIVER stage | deliver_experiment() |
| run_diagnostics.py | Atomic EVALUATE stage | evaluate_experiment() |
| run_hindcast.py | Hindcast orchestrator | All six atomic stages + run_walk_forward |
| run_forecast.py | Forecast orchestrator | run_features, run_predict, _postprocess_forecast, _deliver_forecast |
run_hindcast and run_forecast are the top-level orchestrator entry points exposed through cli.py. The atomic stage modules can also be invoked directly via CLI sub-commands for incremental re-runs.
Stage-flow Diagram¶
```mermaid
flowchart TD
    FE["run_features\npreprocess_data()"]
    FIT["run_fit\ntrain()"]
    PRED["run_predict\npredict() +\nwrite_walk_forward_outputs()"]
    META["run_meta_models\npostprocess_experiment()"]
    DIAG["run_diagnostics\nevaluate_experiment()"]
    DEL["run_deliver\ndeliver_experiment()"]
    FE --> FIT
    FIT --> PRED
    PRED --> META
    META --> DIAG
    DIAG --> DEL
    subgraph HINDCAST["run_hindcast.run() — hindcast orchestrator"]
        direction TB
        FE
        FIT
        PRED
        META
        DIAG
        DEL
    end
    FE_F["run_forecast.run_features()\nmaterialise_forecast_indices\nbuild_features"]
    PRED_F["run_forecast.run_predict()\n→ run_predict_stage()"]
    META_F["_postprocess_forecast()"]
    DEL_F["_deliver_forecast()"]
    FE_F --> PRED_F
    PRED_F --> META_F
    META_F --> DEL_F
    subgraph FORECAST["run_forecast.run() — forecast orchestrator"]
        direction TB
        FE_F
        PRED_F
        META_F
        DEL_F
    end
    HINDCAST -. "run_dir on disk\n(read-only reference)" .-> FORECAST
```
The dashed edge denotes that the forecast orchestrator consumes artefacts from a completed hindcast run_dir (production model, conformal calibration sidecars) but never writes back to canonical hindcast paths.