# Pipeline: Feature Build

## Purpose
The feature build pipeline transforms raw input files (NASS yield parquets, daily weather zarrs, climatology zarrs, NDVI CSVs, stress parquets) into two canonical feature matrices per experiment: `fit.parquet` and `pred.parquet`. These are the sole inputs consumed by every downstream training stage. The pipeline is invoked by `cli run features` and delegates to `features/run.build_features`, which iterates builders in declaration order, then calls `features/assemble.assemble` to merge, split, and validate the final matrices. An in-module `preprocess_data` function (stages/run_features.py:10) acts as a secondary guard called after feature build by `run_hindcast._load_and_preprocess` — it is not part of the build itself.
## Inputs

| Input | Path | Format | Producer |
|---|---|---|---|
| NASS yield parquet | `{data_root}/{commodity.builders.yields.filepath}` | Parquet | External — NASS download |
| Weather indices zarr | `{data_root}/{commodity.builders.weather.filepath}` | Zarr | External — weather pipeline |
| Climatology zarr | `{data_root}/{commodity.builders.climo.filepath}` | Zarr | External — climo pipeline |
| NDVI county CSV | `{data_root}/{commodity.builders.ndvi.filepath}` | CSV | External — NDVI source |
| Stress parquet (or zarr) | `{data_root}/{commodity.builders.stress.filepath}` or assembled from zarr | Parquet | External or assembled in-stage |
| ExperimentConfig | In-memory | Pydantic model | `_prepare_config()` in cli.py:112 |
## Outputs

| Output | Path | Format | Consumer |
|---|---|---|---|
| Per-builder intermediate parquets | `{features_dir}/{experiment_key}/builders/{name}.parquet` | Parquet | `assemble` |
| `fit.parquet` | `{features_dir}/{experiment_key}/fit.parquet` | Parquet | `run_fit.train`, `run_hindcast._load_and_preprocess` |
| `pred.parquet` | `{features_dir}/{experiment_key}/pred.parquet` | Parquet | `run_predict._load_prediction_inputs`, `run_forecast._impute_forecast_area` |
| `metadata.json` | `{features_dir}/{experiment_key}/metadata.json` | JSON | Dashboard, diagnostics |
## Step-by-step flow
1. **Preflight** — `preflight_paths_for_features(config)` checks that all `ResolvablePath` inputs exist; it skips the stress `filepath` when `assemble_stress_from_indices` is set (preflight.py:110). `run_preflight(checks)` raises `SystemExit` on the first critical failure (preflight.py:42).
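The fail-fast behaviour can be sketched in a few lines. This is a minimal illustration, not the real preflight.py: the actual checks are derived from the config's `ResolvablePath` fields, and the names `checks` and `ok` here are illustrative.

```python
from pathlib import Path

def run_preflight(checks: dict[str, Path]) -> None:
    """Fail fast: abort on the first missing input instead of failing mid-build."""
    for name, path in checks.items():
        if not path.exists():
            # Mirrors the documented behaviour: SystemExit on first critical failure.
            raise SystemExit(f"preflight: missing input for '{name}': {path}")

# A path that does not exist triggers the abort.
checks = {"yields": Path("no_such_dir/yields.parquet")}
try:
    run_preflight(checks)
    ok = True
except SystemExit:
    ok = False
```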
2. **Resolve output directory** — `build_features` resolves `output_dir` from `cfg.features_dir / experiment_key` when not supplied (features/run.py:56–59). Skip-if-exists: if both `fit.parquet` and `pred.parquet` already exist and `force=False`, the entire pipeline is a no-op (run.py:65–71).
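The skip-if-exists guard amounts to a single predicate over the two canonical outputs. A minimal sketch, assuming a helper named `should_skip` (the real check is inlined in `build_features` at run.py:65–71):

```python
import tempfile
from pathlib import Path

def should_skip(output_dir: Path, force: bool) -> bool:
    """No-op guard: the whole build is skipped when both canonical
    outputs already exist and force is False."""
    return (not force
            and (output_dir / "fit.parquet").exists()
            and (output_dir / "pred.parquet").exists())

out = Path(tempfile.mkdtemp())
assert not should_skip(out, force=False)   # nothing written yet -> build runs
(out / "fit.parquet").touch()
(out / "pred.parquet").touch()
assert should_skip(out, force=False)       # both outputs present -> no-op
assert not should_skip(out, force=True)    # force always rebuilds
```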
3. **Builder loop** — iterate `cfg.commodity.builders.keys()` in declaration order (run.py:81). For each builder name:
    a. Check whether `builders/{name}.parquet` already exists; skip if so and `force=False` (run.py:83–88).
    b. Call `run_builder(name, cfg, years)` from builders/registry.py:39 — it looks up the function, resolves the filepath, calls the builder, validates output via `validate_builder_output`, and returns a `pd.DataFrame`.
    c. Write to `builders/{name}.parquet` (run.py:89–91).
    d. `del` the DataFrame immediately to free memory (run.py:92).
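The loop's shape can be sketched as follows. The builder function and registry here are hypothetical stand-ins (real builders read zarr/CSV/parquet inputs and are resolved through builders/registry.py), and CSV is used only to keep the sketch dependency-free — the real loop writes parquet.

```python
import tempfile
from pathlib import Path

import pandas as pd

# Hypothetical stand-in builder; the real registry maps names to functions
# that read raw inputs and return validated DataFrames.
def build_yields(years):
    return pd.DataFrame({"year": years, "yield_kg_ha": [5000.0] * len(years)})

REGISTRY = {"yields": build_yields}

def run_builder_loop(names, years, out_dir: Path, force: bool = False):
    """One intermediate file per builder; skip existing unless forced, and
    free each DataFrame right after writing."""
    (out_dir / "builders").mkdir(parents=True, exist_ok=True)
    paths = []
    for name in names:                      # declaration order is preserved
        path = out_dir / "builders" / f"{name}.csv"
        if path.exists() and not force:     # per-builder skip-if-exists guard
            paths.append(path)
            continue
        df = REGISTRY[name](years)
        df.to_csv(path, index=False)
        del df                              # free memory before the next builder
        paths.append(path)
    return paths

paths = run_builder_loop(["yields"], [2022, 2023], Path(tempfile.mkdtemp()))
```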
4. **Assemble** — `assemble(builder_paths, cfg, output_dir)` (features/assemble.py:183):
    a. Split `builder_paths` into `required_paths` (inner-join; define the universe) and `other_paths` (left-join) based on `cfg.commodity.builders[name].required_for_pred_parquet`.
    b. `_merge_builders` — inner-join the required builders sequentially, then left-join the others. Raises on column namespace collisions and on duplicate `(year, geo_identifier, init_date)` keys. Paths are sorted alphabetically by stem for reproducibility (assemble.py:85–86).
    c. `_split_fit` — the fit/pred split is based on a harvest-date NaN filter. Rows pass the fit filter when `init_date == harvest_date_by_year[year]` and `target_col` is non-NaN (assemble.py:154–168). The `.notna()` clause is load-bearing: forecast-year rows whose init date happens to match the harvest date are excluded from fit because the target is NaN for years not yet harvested.
    d. `_nan_diagnostic` — per-builder post-left-join diagnostic that counts all-NaN rows and affected counties; surfaces coverage gaps (assemble.py:19–47).
    e. `validate_merged_columns` — confirms every declared `feature_col` and `auxiliary_col` is present (interface.py:68–108).
    f. Write `fit.parquet`, `pred.parquet`, `metadata.json` (assemble.py:215–235).
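The load-bearing `.notna()` clause in the fit/pred split can be illustrated with toy data. This is a sketch of the filter's logic only, not the real `_split_fit` (column names and the `harvest_date_by_year` mapping are illustrative):

```python
import pandas as pd

def split_fit(merged: pd.DataFrame, harvest_date_by_year: dict, target_col: str):
    """A row enters fit only when its init_date is that year's harvest date
    AND the target is non-NaN; pred keeps every row."""
    at_harvest = merged.apply(
        lambda r: r["init_date"] == harvest_date_by_year[r["year"]], axis=1
    )
    fit_mask = at_harvest & merged[target_col].notna()
    return merged[fit_mask], merged

merged = pd.DataFrame({
    "year":        [2022, 2022, 2024],
    "init_date":   ["10-01", "06-01", "10-01"],
    "yield_kg_ha": [5100.0, 5100.0, float("nan")],  # 2024 not yet harvested
})
fit, pred = split_fit(merged, {2022: "10-01", 2024: "10-01"}, "yield_kg_ha")
# The 2024 row matches the harvest init date but is excluded from fit,
# because its target is NaN -- exactly the load-bearing .notna() behaviour.
```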
5. **`preprocess_data` (post-build guard, called later)** — stages/run_features.py:10 is NOT called during feature build; it is called by `run_hindcast._load_and_preprocess` after loading the parquets. It adds `state_name` via `add_state_name`, drops all-NaN columns from `fit_data`, and aligns `pred_data` columns to match (run_features.py:13–48). Any dropped column that was a configured feature raises `ValueError` with an actionable message.
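The drop-and-align behaviour of the guard can be sketched as follows (a minimal sketch mirroring the documented behaviour; the real function also adds `state_name`, and the exact error message and column names here are illustrative):

```python
import pandas as pd

def preprocess_data(fit_data, pred_data, feature_cols):
    """Drop all-NaN columns from fit, align pred to the surviving columns,
    and fail loudly if a configured feature column was dropped."""
    all_nan = [c for c in fit_data.columns if fit_data[c].isna().all()]
    dropped_features = sorted(set(all_nan) & set(feature_cols))
    if dropped_features:
        raise ValueError(
            f"preprocess_data dropped {len(dropped_features)} configured "
            f"feature column(s): {dropped_features}. Fix the builder or "
            f"remove them from commodity.feature_cols."
        )
    fit_data = fit_data.drop(columns=all_nan)
    pred_data = pred_data[fit_data.columns]   # align pred columns to fit
    return fit_data, pred_data

fit = pd.DataFrame({"year": [2022], "ndvi_peak": [0.71], "junk": [float("nan")]})
pred = pd.DataFrame({"year": [2022, 2024], "ndvi_peak": [0.71, 0.65],
                     "junk": [float("nan")] * 2})
# "junk" is all-NaN but not a configured feature, so it is dropped silently.
fit2, pred2 = preprocess_data(fit, pred, feature_cols=["ndvi_peak"])
```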
## Mermaid flow diagram

```mermaid
flowchart LR
    NASS["yields builder\nbuilders/yields.py"]
    WX["weather builder\nbuilders/weather.py"]
    CLM["climo builder\nbuilders/climo.py"]
    NDVI["ndvi builder\nbuilders/ndvi.py"]
    STR["stress builder\nbuilders/stress.py"]
    subgraph BUILDERS["Builder loop — features/run.py:81"]
        NASS
        WX
        CLM
        NDVI
        STR
    end
    BPQS["builders/*.parquet\n(intermediate)"]
    BUILDERS --> BPQS
    subgraph ASSEMBLE["assemble — features/assemble.py:183"]
        MERGE["_merge_builders\n(inner + left join)"]
        SPLIT["_split_fit\n(harvest-date NaN filter)"]
        VALIDATE["validate_merged_columns"]
        WRITE["write fit.parquet\npred.parquet\nmetadata.json"]
        MERGE --> SPLIT --> VALIDATE --> WRITE
    end
    BPQS --> ASSEMBLE
    FIT["fit.parquet\n{features_dir}/{key}/"]
    PRED["pred.parquet\n{features_dir}/{key}/"]
    META["metadata.json"]
    WRITE --> FIT
    WRITE --> PRED
    WRITE --> META
```
## Invariants and contracts

DESIGN.md on `fit.parquet` column layout (verbatim):

> "`fit.parquet` column layout: INDEX_COLS as leading columns, then target column, then feature columns. Index columns SHALL NEVER be separated into a distinct file. Downstream consumers SHALL use `metadata.json` to slice columns."

The `_split_fit` NaN filter is load-bearing (from the features/assemble.py:154–168 comment):

> "The `.notna()` clause is load-bearing: forecast-year rows at the harvest init date must be excluded from fit even when their init date matches."

DESIGN.md Clause 34 (artefact contract): FIT reads `features_dir/{key}/fit.parquet` — these parquets are the declared precondition for the hindcast stage.

Column name stability (DESIGN.md verbatim): "`sim_yield_kg_ha` in `walk_forward_preds.parquet` is `sim_yield_kg_ha` in `{experiment_key}_national.parquet` … Renames at stage boundaries are forbidden."
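The "consumers SHALL use `metadata.json` to slice columns" contract can be illustrated with a toy metadata payload. The key names and values below are hypothetical — the real schema of `metadata.json` may differ — but the point stands: downstream code derives the column layout from the file rather than hard-coding positions.

```python
import json

# Hypothetical metadata.json contents; written by assemble alongside the parquets.
text = json.dumps({
    "index_cols": ["year", "geo_identifier", "init_date"],
    "target_col": "yield_kg_ha",
    "feature_cols": ["ndvi_peak", "gdd_sum"],
})

meta = json.loads(text)
# Column layout per DESIGN.md: index cols lead, then target, then features.
ordered = meta["index_cols"] + [meta["target_col"]] + meta["feature_cols"]
```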
## Failure modes and recovery

| Symptom | Likely cause | Recovery |
|---|---|---|
| `SystemExit` at preflight | Missing raw input file | Check `data_root` config and verify raw data is downloaded |
| `ValueError: Namespace collision` in `_merge_builders` | Two builders emit columns with the same name | Rename columns in the offending builder config `rename_map` |
| `ValueError: preprocess_data dropped N configured feature column(s)` | A builder emits all-NaN for a feature at the fit `init_date` | Fix the builder or remove the column from `commodity.feature_cols` in YAML |
| `ValueError: No rows in pred.parquet for season_year=N` (at predict time) | Year not covered by `feature_end_year` | Raise `feature_end_year` in config and re-run features |
| Stale builder parquet reused after source data update | `force=False` skip-if-exists fired | Re-run with `make features-force` or `cli run features --force` |
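The namespace-collision failure mode can be reproduced with a minimal merge guard. This is a sketch of the check's intent only — the real `_merge_builders` operates over builder parquets and its error message may differ:

```python
import pandas as pd

KEYS = ["year", "geo_identifier", "init_date"]

def merge_guarded(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    """Raise on column namespace collisions instead of letting pandas
    silently suffix duplicate names with _x/_y; also reject duplicate keys."""
    collisions = (set(left.columns) & set(right.columns)) - set(KEYS)
    if collisions:
        raise ValueError(f"Namespace collision in _merge_builders: {sorted(collisions)}")
    merged = left.merge(right, on=KEYS, how="inner")
    if merged.duplicated(subset=KEYS).any():
        raise ValueError("Duplicate (year, geo_identifier, init_date) keys after merge")
    return merged

a = pd.DataFrame({"year": [2022], "geo_identifier": ["19001"],
                  "init_date": ["06-01"], "gdd_sum": [900.0]})
b = pd.DataFrame({"year": [2022], "geo_identifier": ["19001"],
                  "init_date": ["06-01"], "gdd_sum": [901.0]})  # same column name
try:
    merge_guarded(a, b)
    collided = False
except ValueError:
    collided = True  # two builders emitted "gdd_sum" -> collision raised
```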
## Cross-references

- ExperimentConfig — `commodity.builders`, `features_dir`, `feature_cols`
- SeasonWindow — window definitions consumed by `build_windowed_features`
- Concept: walk-forward CV — `fit.parquet`/`pred.parquet` are the inputs to every fold
- Pipeline: fit — downstream consumer of `fit.parquet`
- Pipeline: predict — downstream consumer of `pred.parquet`
- Source: features — full builder subsystem summary
- Source: stages — `run_features.py` role within the stage DAG
- Source: DESIGN.md — Clauses 31, 34
## PRs that materially changed this stage

- PR #369 (`f5399b96`) — added `synthesise_long_range_climo_for_unseen_years` and `synthesise_long_range_stress_for_unseen_years` stubs for multi-year forecast support; these run before `build_features` in the forecast path only, not in the hindcast feature build.