Source: DESIGN.md — Pipeline Contract

What it is

DESIGN.md is the canonical design authority for the commodity_hindcast pipeline (confirmed by CLAUDE.md: "Ensure compliance with DESIGN.md"). Every bullet is a single EARS-syntax requirement: WHEN … THE SYSTEM SHALL … or THE SYSTEM SHALL …. It records both the requirement and the rationale so future developers understand why a decision was made. There are 54 clause-bullets grouped into thematic sections. Note: the wiki's own EARS clause index table below lists 35 numbered clauses for the most structurally significant ones; the remaining 19 bullets cover coding style, internal data contracts, and area-imputation detail and are summarised in prose sections rather than numbered separately.

Section-by-section summary

Running the pipeline and config (clauses 1–14)

Clause 1 — Config loading and pydantic-settings:

"WHEN a script is invoked, the system SHALL load configuration via pydantic-settings (CLI > env > YAML > defaults) instead of Hydra/OmegaConf to avoid dependency on stagnant libraries." Output config paths (features_dir, models_dir, preds_dir, reports_dir) shall be directories only — scripts decide their own filenames. Input data paths shall also be directories, with filename resolution handled by the pipeline layer. Projects shall subclass ExperimentConfig to add domain-specific fields. Sensible groups of nested config objects shall be contained within nested classes.

Clause 2 — Config as pure data:

"WHEN defining configuration schemas, the system SHALL treat config as pure data: values, nested config dataclasses, validators only. Config classes SHALL NOT expose build_* factories, data-processing helpers, or I/O methods." Object construction lives in a dedicated build.py beside the built type. This keeps config YAML-round-trippable and prevents hidden compute/I/O in config.py.

Clause 3 — Cross-stage wiring via --run_dir:

"WHEN a script reads or writes data, it SHALL do so exclusively via config paths. Cross-stage wiring SHALL happen via the --run_dir flag." No filesystem globs are used; the Makefile captures RUN_DIR from stdout and passes it to subsequent stages.

Clause 4 — Plot output layout: Per-fold PNGs land under runs/{rundir}/reports/hindcast/ (prefixed with fold label, e.g. 2020_detrend_grid.png); cross-fold experiment plots stay at runs/{rundir}/reports/. MLflow artefact layout mirrors disk via mlflow.log_artifacts(run_dir / "reports", artifact_path="reports").

Clause 5 — Run directory creation:

"WHEN RunRunner.run() is called, the system SHALL resolve a run directory: if --run_dir is set, the existing directory is reused (resume mode); otherwise a new timestamped $INPUT_DATA_DIR/runs/YYYYMMDD_HHMMSS/ directory is created." The runner rewrites output paths into the run directory and creates all rewritten output directories. Config and metadata are written as stage-specific files (config_train.yaml, metadata_train.yaml) to prevent overwrite on resume.

Clause 6 — INPUT_DATA_DIR as sole data-root resolver (CRITICAL):

"WHEN resolving the experiment data root, the system SHALL require INPUT_DATA_DIR to be set in the environment as the single source of truth — user-facing YAML configs SHALL NOT carry a data_root field, and the system SHALL NOT fall back to a cwd-relative default." config.py require_input_data_dir() is the sole resolver, raising RuntimeError with an actionable message at config-load time when the env var is missing or empty. Every entry point (CLI, dashboard, eval shim) shall use this helper. The validation_alias = AliasChoices("INPUT_DATA_DIR", "data_root") is retained only so config_resolved.yaml round-trips on reload — it is run state, not a user-facing config source.

Clause 7 — MLflow tracking:

"WHEN an experiment runs, the system SHALL track it with MLflow (mlflow>=3, hard dependency). In create mode, a new MLflow run is started; in resume mode, the existing mlflow_run_id from metadata_<stage>.yaml is used to resume the same MLflow run." MLflow params are prefixed by stage name (train/random_seed) to avoid write-once collisions on resume. The template SHALL NOT hardcode a model flavour — users call mlflow.<flavour>.log_model() directly. Training scripts SHOULD call mlflow.autolog(log_models=False).

Clause 8 — Stage/RunRunner selection: Pure transforms (stages/run_features.py) or visualisation stages use Config directly without RunRunner. Stages involving training or inference use RunRunner for run isolation, MLflow tracking, and preflight checks.

Clause 9 — Linear DAG via Makefile (no orchestrators):

"WHEN running the pipeline, the system SHALL execute stages as a linear DAG via Makefile targets. Projects SHALL NOT introduce workflow orchestrators (Airflow, Prefect, etc.) inside the template — those are deployment concerns layered on top."

Clause 10 — Config discovery by walking up to pyproject.toml: _find_project_root() walks up to pyproject.toml so ExperimentConfig() resolves correctly from any subdirectory — scripts, notebooks, tests — without os.chdir hacks.

Clause 11 — Dataset lineage via SHA-256: runner.log_data_hash(path) records a SHA-256 hash of input data as an MLflow param — dataset versioning without DVC.
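
The hash itself is plain streamed SHA-256; a sketch of what log_data_hash might compute before recording the MLflow param (helper name and chunking are assumptions):

```python
import hashlib
from pathlib import Path

def sha256_of_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Streamed SHA-256, as a runner.log_data_hash() might compute it (sketch)."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        # stream in chunks so multi-GB parquet files don't load into memory
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    # the runner would then record it, e.g. as an MLflow param "data_sha256"
    return digest.hexdigest()
```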

Clause 12 — Dual persistence (MLflow + local YAML):

"WHEN persisting experiment metadata, the system SHALL write both to MLflow … and to local YAML files … inside the run directory. The local files survive independently of MLflow — if the tracking server is migrated, corrupted, or unavailable, each run directory remains a self-describing, reproducible record."

Clause 13 — uv for dependency management:

"WHEN managing dependencies, the project SHALL use uv exclusively (uv sync, uv add, uv run) and ruff for linting/formatting."

Clause 14 — HPO with nested MLflow runs: HPO uses RunRunner with Optuna; each trial creates a nested child run. Writes best_params.yaml into run dir. HPO SHALL NOT be wired into make pipeline — it is a standalone exploratory step.

Code and style (clauses 15–33)

Clause 15 — DAG as df.pipe().pipe() chain:

"WHEN writing data science code the pipeline should be thought of as a DAG that can be chained together with df.pipe().pipe() … such that it can be read right->left."

Clause 16 — Marimo notebooks (not Jupyter):

"WHEN exploring data or results interactively, the system SHALL use marimo notebooks (notebooks/*.py) instead of Jupyter — marimo's reactive DAG execution eliminates hidden state, stores notebooks as pure .py files that git-diff cleanly."

Clause 17 — Calendar and datetime invariants: Three sub-invariants: (a) temporal position expressed as season_doy (integer, CAN exceed 366 for cross-year crops) — no crosses_year boolean; (b) all geographic identifiers normalised to lowercase ADM path ADM0:usa/ADM1:{state}/ADM2:{county} — no FIPS codes, no mixed case; (c) accumulation features reset at season start use prefix gstd (growing season to date), not ytd.
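
One plausible convention under which season_doy exceeds 366 on cross-year crops is calendar day-of-year continued past 31 December; this is an illustrative assumption, not necessarily DESIGN.md's exact definition:

```python
from datetime import date

def season_doy(obs: date, season_start_year: int) -> int:
    """Illustrative season_doy: calendar day-of-year, continued past 31 Dec
    for cross-year crops (so values can exceed 366). Assumed convention."""
    doy = obs.timetuple().tm_yday
    if obs.year == season_start_year:
        return doy
    # observation fell in the following calendar year: keep counting upward
    days_in_start_year = date(season_start_year, 12, 31).timetuple().tm_yday
    return days_in_start_year + doy
```

A single monotonically increasing integer avoids the crosses_year boolean and its branchy downstream handling.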

Clause 18 — Module interface contracts:

"THE SYSTEM SHALL define internal modules with a shared and consistent interface. Internal stages shall be defined as modules." Internal objects need clear interfaces in module_name/interface.py (e.g. features/builders/interface.py) and a registry in module_name/registry.py.

Clause 19 — Single-direction import DAG: Imports flow: config.py (+ pure utilities) → cross-cutting helpers (lib/) → aggregate-root + slices (lib/results/) → domain services (features/, models/, delivery/, diagnostics/) → execution-frame (run/) → stage orchestrators (stages/run_*.py) → entry points (cli.py, app/). A module shall only import from layers closer to the root, with one explicit exception: stages/ modules MAY compose sibling stages/ modules. Runtime cycles are forbidden.

Clause 20 — Registry-based dispatch:

"WHEN adding a new builder, detrender, regressor, or plot, the system SHALL register it in the module's registry.py rather than branching on a string." if kind == "ridge" / elif kind == "xgb" chains outside a registry are a code smell and SHALL be refactored into registry lookups.

Clause 21 — Single Responsibility Principle: when abstractions are similar they should be unified — one location per concept. Conversely, separation of concerns stays atomic: if the underlying object or process is unique, it gets its own implementation.

Clause 22 — Single canonical implementation per concept:

"WHEN a concept has a single canonical implementation, that implementation SHALL live in exactly one file: unit conversion → lib/unit_utils.py; area-weighted aggregation → lib/geo/aggregation.py; geo identifier normalisation → lib/geo/identifiers.py; missing-value policy → lib/edit_and_imputation/imputation.py. Duplicating any of these helpers inline at a call site is forbidden — if the existing helper doesn't fit, change the helper, don't fork it."

Clause 23 — No business logic in __init__.py.

Clause 24 — Imports at the top of files, unless needed to prevent circular imports.

Clause 25 — Precise naming in loops (for builder in builders:, not for X in builders:).

Clause 26 — Explicit xarray/pandas/polars getters:

"WHEN working with xarray / pandas / polars THE SYSTEM SHALL always use explicit getters for dimension names. ds["time"] instead of ds.time." Ensures syntax highlighting differentiates columns from methods.

Clause 27 — S3-safe paths via cloudpathlib.AnyPath:

"WHEN handling file or directory paths that may originate from S3, the system SHALL represent them via cloudpathlib.AnyPath." Functions that accept such paths shall type-hint as Path | AnyPath. The system SHALL NOT wrap an S3Path in pathlib.Path(...) — this invokes os.fspath() which collapses the URI to a local-cache path and silently drops S3 semantics. This was the root cause of the QA failure in PR #345 (tl/fix-path-issues).

Clause 28 — AnyPathParam Click parameter type:

"WHEN exposing a path argument via Click, the system SHALL use the AnyPathParam ParamType defined at the top of cli.py rather than click.Path(...). Click's built-in Path validator uses os.path.exists, which only understands the local filesystem and rejects s3:// URIs." This enables cli predict s3://.../<run_dir> to short-circuit a ~50-minute ECS round-trip.

Clause 29 — str(path) for polars/pandas readers:

"WHEN passing a path to polars.scan_parquet, polars.read_parquet, or pandas.read_parquet, the system SHALL convert via str(path)." Polars/pandas reject CloudPath objects with a TypeError. The canonical pattern is pl.scan_parquet(str(path)).

Clause 30 — Index-as-explicit-column on persist:

"WHEN persisting a pandas object whose index carries meaning … the system SHALL materialise that label as an explicit named column before writing and restore it with set_index(...) on read." Relying on the writer to "preserve the index" is forbidden. Root cause of a silent regression where feature_fill_values.parquet round-tripped with a RangeIndex, making fv.reindex(feature_cols) return all-NaN and turning MedianImputer.transform into a silent no-op. The canonical write pattern is series.to_frame(value_col).rename_axis(label_col).reset_index().

Clause 31 — Preflight derives check set from _iter_resolvable_fields:

"WHEN a config field is typed ResolvablePath, the system SHALL assert the resolved target exists via check_path_exists in a per-stage preflight_paths_for_<stage> invoked before the consuming stage executes." The check set must be derived from _iter_resolvable_fields(config), not hand-maintained, "so that adding a new ResolvablePath field extends preflight coverage by construction."

Clause 32 — f-strings everywhere: f"{object}" — no positional {} args, no .format().

Clause 33 — No magic numbers: all constants in a clear, central location with descriptive names.

Internal data contracts (clauses 34–35 and sub-clauses)

Clause 34 — Stage artefact contract:

"WHEN creating new predictions THE SYSTEM SHALL adhere to the following contract: FIT → models/{experiment_key}/{label}/ + train_preds.parquet. PREDICT → preds/{experiment_key}/{label}/walk_forward_preds.parquet. POSTPROCESS reads every fold → postprocessed/{experiment_key}_national.parquet. DELIVER reads walk_forward_preds + postprocessed → single CSV per ADM level."

Features section

Clause — CommodityConfig as single source of truth for commodity constants:

"WHEN building features for a commodity, the system SHALL follow one code path parameterised by a CommodityConfig — the single source of truth for all commodity-specific constants." Builders SHALL NOT hardcode domain constants; commodity differences expressed as different CommodityConfig instances in a registry dict, not as if commodity == "corn": branches.

Clause — fit.parquet column layout: INDEX_COLS as leading columns, then target column, then feature columns. Index columns SHALL NEVER be separated into a distinct file. Downstream consumers SHALL use metadata.json to slice columns.

Clause — Feature-cols decoupling: regression_params.feature_cols (per-regressor subset) is decoupled from commodity.feature_cols (feature-matrix universe). The feature factory produces the full matrix; the experiment protocol selects a subset for a particular experiment.

Predict section

Clause — Atomic predict operation (production forecast):

"WHEN running the forecast stage (cli run forecast) for a production prediction, THE SYSTEM SHALL invoke the atomic predict operation exactly once for the CLI-provided (--season-year, --init-date) pair and produce exactly one row per geography per aggregation level." The forecast stage SHALL NOT iterate across cfg.commodity.hindcast_init_season_doys; the feature build is the single point of truth for which init_dates a forecast run covers.

Clause — Walk-forward hindcast predict (per season_year): The model is fit once per season_year on harvest-time data then reused at every init_date in hindcast_init_season_doys with varying feature values. The model SHALL NOT be refit at each init_date — only the design matrix changes.
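
The loop shape this implies, sketched with hypothetical callables (the real stage orchestrator is more involved):

```python
def walk_forward(season_years, init_doys, fit_fn, build_matrix, predict_fn):
    """One fit per season_year, reused across init_dates; only the design
    matrix changes per init. Callable names are illustrative."""
    preds = []
    for season_year in season_years:
        model = fit_fn(season_year)  # fit ONCE, on harvest-time data
        for init_doy in init_doys:
            X = build_matrix(season_year, init_doy)  # features vary per init
            preds.append((season_year, init_doy, predict_fn(model, X)))
    return preds
```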

Clause — Confidence intervals from past errors: forecast.residual_mode controls which past errors to use; the validator checks existence before compute.

Experiment phase separation

Clause — run_dir as sole cross-stage contract:

"WHEN handing off state between experiment pipeline stages, the system SHALL use the run directory on disk as the sole contract — no in-memory objects, dicts, or callbacks SHALL cross a phase boundary." Each stage reads its inputs via ExperimentResult.from_run_dir(run_dir), a frozen dataclass that lazily discovers fold artefacts on disk.

Clause — Four discrete stages:

"WHEN executing the experiment pipeline, the system SHALL decompose it into four discrete, independently runnable stages — FIT (detrend + fit + predict + save artefacts to disk, zero plots, zero metrics), POSTPROCESS (aggregation + selection bias correction + conformal intervals, optional), EVALUATE (compute metrics then optionally generate plots), and DELIVER (client-facing CSVs from postprocessed results)." Each stage writes outputs atomically (temp + rename).

Plotting API

Clause — Two-layer plot separation: (a) pure plot functions with signature plot_fn(df: pd.DataFrame, *, explicit_kwargs) -> Figure | list[Figure] — they SHALL NOT access disk, config, or any I/O; (b) a PlotRunner owns all side effects. Plot functions grouped into PlotGroup instances sharing a prepare_data callable (data loaded once per group, not per plot).

Clause — READ-then-PLOT: Plot and reporting functions shall never call predict() except where perturbed inputs are genuinely needed (PDP). These are consumers, not producers.

Internal data contracts — unit system

Clause — kg/ha canonical unit system:

"WHEN handling yield, area, or production values internally, the system SHALL use kg/ha (yield), hectares (area), and kg (production) … throughout all pipeline stages." Unit conversions occur only at two system boundaries: (a) input boundary (NASS bu/ac → kg/ha by preprocessor), (b) output boundary (final metrics/plots converted from kg/ha to reporting unit). Intermediate DataFrames and parquet artefacts SHALL use explicit unit suffixes on column names (yield_kg_ha, area_harvested_ha, production_kg).

Clause — Column name stability:

"WHEN a column represents the same quantity across stages, the system SHALL use the same name at every stage. sim_yield_kg_ha in walk_forward_preds.parquet is sim_yield_kg_ha in {experiment_key}_national.parquet is sim_yield_kg_ha on the gen_metrics input. Renames at stage boundaries are forbidden."

Clause — Area-weighted aggregation: unweighted mean of yields is forbidden.
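
Area weighting is total production over total area; a sketch that also fails loud on residual NaN area, per the three-layer area-imputation defence:

```python
import pandas as pd

def area_weighted_yield(df: pd.DataFrame) -> float:
    """Area-weighted mean (sketch): total production / total area, never a
    plain mean of county yields. Column names follow the unit-suffix rule."""
    if df["area_harvested_ha"].isna().any():
        raise ValueError("NaN area reached aggregation; imputation layers failed")
    production_kg = df["yield_kg_ha"] * df["area_harvested_ha"]
    return production_kg.sum() / df["area_harvested_ha"].sum()
```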

Forecast pipeline

Clause — Forecast pipeline isolation:

"WHEN the forecast pipeline runs, the system SHALL treat canonical hindcast artefacts as read-only reference data." External sources (raw_obs_filepath, materialised_climo_filepath) are consumed exclusively in feature-creation stages, baking outputs into run_dir/forecast/{init_date}/. All subsequent stages read only from that subdirectory. The forecast pipeline SHALL NOT write to canonical hindcast artefacts.

Area imputation

Clause — Three-layer defence for missing area: Three layers in order: (1) production_cumulative_threshold filters county universe; (2) declarative panel_null_impute edit rule fills remaining NaN area; (3) area_weighted_mean fails loud if any residual NaN slips through.

Clause — Declarative panel_null_impute (not bespoke matrix plumbing):

"WHEN filling missing area values in the NASS panel, the system SHALL do so declaratively through the panel_null_impute edit rule declared in the commodity YAML, NOT through bespoke matrix plumbing in the feature builder."

Clause — Trailing-median fill bounds:

"WHEN impute_missing_area returns, the system SHALL assert that every imputed value is bounded above by the geo's observed historical maximum. … a runtime violation is always a bug — the check raises ValueError naming the violating rows."

Clause — strictly_causal: true default on panel edit rules.

Clause — Year-1 lag via post-pivot shift:

"WHEN computing the year-1 lag of area_harvested_ha for the fit matrix, the system SHALL use a post-pivot groupby("geo_identifier")["area_harvested_ha"].shift(1).fillna(current) in yields.py."

Notable claims (the load-bearing ones)

  • INPUT_DATA_DIR env var is the sole data-root resolver — no YAML data_root field, no cwd-relative fallback, no silent default. Missing env var → RuntimeError at config-load time (Clause 6).
  • run_dir is the sole cross-stage hand-off contract — no in-memory objects cross phase boundaries (Clause 3; restated as the experiment-phase-separation clause).
  • Stage artefact paths are canonically specified: FIT writes models/{key}/{label}/, PREDICT writes preds/{key}/{label}/walk_forward_preds.parquet, POSTPROCESS writes postprocessed/{key}_national.parquet (Clause 34).
  • S3Path must never be wrapped in pathlib.Path() — this was the root cause of PR #345 QA failure (Clause 27).
  • _iter_resolvable_fields must be the source for preflight check sets — hand-maintained lists are forbidden (Clause 31).
  • feature_fill_values.parquet must write index as explicit column — a silent no-op regression already occurred from violating this (Clause 30).
  • Four stages are discrete and atomic — FIT has zero plots/metrics; EVALUATE is a read-only consumer (combined clause).
  • Unweighted mean of yields is forbidden — must weight by area.
  • Column names are stable across all stages: sim_yield_kg_ha end-to-end.
  • gstd prefix (not ytd) for growing-season accumulation features.
  • Registry-based dispatch — no if kind == "ridge" chains outside a registry.
  • AnyPathParam for Click path arguments — not click.Path().
  • str(path) for polars/pandas reads: CloudPath objects are rejected with TypeError.
  • Marimo, not Jupyter for interactive exploration.

EARS clause index

For rapid lookup, all 35 clauses are catalogued below with their topic group and a one-line summary. The clause numbering follows the order of appearance in DESIGN.md.

# | Group | One-line summary
1 | Config loading | pydantic-settings (CLI > env > YAML); output paths are directories only; subclass ExperimentConfig
2 | Config as pure data | no build_*, no I/O on config; factories in build.py beside their types
3 | Cross-stage wiring via --run_dir | config paths only; Makefile captures RUN_DIR from stdout
4 | Plot output layout | per-fold PNGs under reports/hindcast/; experiment plots at reports/; MLflow mirrors disk
5 | Run directory creation | $INPUT_DATA_DIR/runs/YYYYMMDD_HHMMSS/ or reuse existing; runner rewrites output paths
6 | INPUT_DATA_DIR sole resolver | require env var; require_input_data_dir() is the ONLY resolver; RuntimeError if missing
7 | MLflow tracking | mlflow>=3; resume via mlflow_run_id; stage-prefixed params; autolog(log_models=False)
8 | Stage vs RunRunner selection | pure transforms use Config directly; training/inference stages use RunRunner
9 | Linear DAG via Makefile | no Airflow/Prefect inside the template
10 | Config discovery via pyproject.toml | _find_project_root() walk; no os.chdir hacks
11 | Dataset lineage via SHA-256 | runner.log_data_hash(path) records hash as MLflow param
12 | Dual persistence (MLflow + local YAML) | run dirs survive MLflow migration or corruption
13 | uv for dependency management | uv sync, uv add, uv run; ruff for lint/format
14 | HPO with nested MLflow runs | Optuna + start_run(nested=True); best_params.yaml; HPO not in make pipeline
15 | Pipeline as df.pipe().pipe() DAG | composable transforms, read right-to-left
16 | Marimo not Jupyter | .py notebooks, reactive DAG, git-diffable
17 | Calendar/datetime invariants | season_doy (not calendar); ADM geo IDs; gstd prefix not ytd
18 | Module interface contracts | interface.py per module; registry.py per type
19 | Single-direction import DAG | 8 layers; stages may compose sibling stages; no runtime cycles
20 | Registry-based dispatch | no if kind == "ridge" chains; all dispatch through registry
21 | Single Responsibility Principle | unify similar abstractions; keep atomic separation for unique concerns
22 | Single canonical implementation per concept | unit_utils, aggregation, geo identifiers, imputation — one file each
23 | No business logic in __init__.py | for exports or empty only
24 | Imports at top of files | exception only for circular-import prevention
25 | Precise naming in loops | for builder in builders: not for X in builders:
26 | Explicit xarray/pandas/polars getters | ds["time"] not ds.time; column vs method disambiguation
27 | S3-safe paths via AnyPath | no Path(S3Path); was root cause of PR #345 QA failure
28 | AnyPathParam Click type | not click.Path(); allows cli predict s3://.../<run_dir>
29 | str(path) for polars/pandas reads | pl.scan_parquet(str(path)); CloudPath objects rejected
30 | Index-as-explicit-column on persist | series.to_frame().rename_axis().reset_index(); silent no-op regression already occurred
31 | Preflight from _iter_resolvable_fields | adding a ResolvablePath field auto-extends preflight
32 | f-strings everywhere | no positional {} args, no .format()
33 | No magic numbers | all constants in config or a constants module with descriptive names
34 | Stage artefact contract | FIT → models/; PREDICT → preds/walk_forward_preds.parquet; POSTPROCESS → postprocessed/; DELIVER → CSV
35 | included_geo_identifiers | frozenset[str]; required kwarg at every level; never optional, never falls back to test-fold geo

What this document is NOT

DESIGN.md does not describe the domain entity model (that is in DOMAIN_MODEL.md), the backlog (that is TODO.md), or builder internals. It does not specify column schemas for feature parquets — only that INDEX_COLS must be the leading columns.

Cross-references

  • README.md — pipeline operator guide; references DESIGN.md as canonical
  • CLAUDE.md — single-line enforcement: "Ensure compliance with DESIGN.md"
  • in_package_DOMAIN_MODEL.md — entity model, slice abstraction, invariants
  • TODO.md — open items that are cross-pipeline violations of DESIGN.md clauses
  • features_README.md — assemble protocol (implements the fit.parquet column-layout clause)