
Pipeline: Feature Build

Purpose

The feature build pipeline transforms raw input files (NASS yield parquets, daily weather zarrs, climatology zarrs, NDVI CSVs, stress parquets) into two canonical feature matrices per experiment: fit.parquet and pred.parquet. These are the sole inputs consumed by every downstream training stage. The pipeline is invoked by cli run features and delegates to features/run.build_features, which iterates builders in declaration order, then calls features/assemble.assemble to merge, split, and validate the final matrices. An in-module preprocess_data function (stages/run_features.py:10) acts as a secondary guard called after feature build by run_hindcast._load_and_preprocess — it is not part of the build itself.

Inputs

| Input | Path | Format | Producer |
| --- | --- | --- | --- |
| NASS yield parquet | {data_root}/{commodity.builders.yields.filepath} | Parquet | External — NASS download |
| Weather indices zarr | {data_root}/{commodity.builders.weather.filepath} | Zarr | External — weather pipeline |
| Climatology zarr | {data_root}/{commodity.builders.climo.filepath} | Zarr | External — climo pipeline |
| NDVI county CSV | {data_root}/{commodity.builders.ndvi.filepath} | CSV | External — NDVI source |
| Stress parquet (or zarr) | {data_root}/{commodity.builders.stress.filepath} or assembled from zarr | Parquet | External or assembled in-stage |
| ExperimentConfig | In-memory | Pydantic model | _prepare_config() in cli.py:112 |

Outputs

| Output | Path | Format | Consumer |
| --- | --- | --- | --- |
| Per-builder intermediate parquets | {features_dir}/{experiment_key}/builders/{name}.parquet | Parquet | assemble |
| fit.parquet | {features_dir}/{experiment_key}/fit.parquet | Parquet | run_fit.train, run_hindcast._load_and_preprocess |
| pred.parquet | {features_dir}/{experiment_key}/pred.parquet | Parquet | run_predict._load_prediction_inputs, run_forecast._impute_forecast_area |
| metadata.json | {features_dir}/{experiment_key}/metadata.json | JSON | Dashboard, diagnostics |

Step-by-step flow

  1. Preflight — preflight_paths_for_features(config) checks that all ResolvablePath inputs exist; it skips the stress filepath when assemble_stress_from_indices is set (preflight.py:110). run_preflight(checks) raises SystemExit on the first critical failure (preflight.py:42).
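A minimal sketch of that pattern, with a hypothetical helper standing in for preflight_paths_for_features and run_preflight (names and signature are illustrative, not the real API):

```python
from pathlib import Path

def preflight_paths(paths, skip=()):
    """Check that every required input path exists; fail fast otherwise.

    `paths` maps a logical name to a filesystem path; names in `skip`
    are ignored, mirroring how the stress filepath is skipped when the
    stress indices are assembled in-stage.
    """
    missing = [name for name, p in paths.items()
               if name not in skip and not Path(p).exists()]
    if missing:
        # run_preflight raises SystemExit on the first critical failure
        raise SystemExit(f"preflight failed, missing inputs: {missing}")
    return True
```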

  2. Resolve output directory — build_features resolves output_dir from cfg.features_dir / experiment_key when not supplied (features/run.py:56–59). Skip-if-exists: if both fit.parquet and pred.parquet already exist and force=False, the entire pipeline is a no-op (run.py:65–71).
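The skip-if-exists guard reduces to a single predicate; a sketch (function name hypothetical):

```python
from pathlib import Path

def should_skip_build(output_dir: Path, force: bool) -> bool:
    """The whole feature build is a no-op only when BOTH canonical
    outputs already exist and force is False."""
    fit = Path(output_dir) / "fit.parquet"
    pred = Path(output_dir) / "pred.parquet"
    return not force and fit.exists() and pred.exists()
```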

  3. Builder loop — iterate cfg.commodity.builders.keys() in declaration order (run.py:81). For each builder name:
     a. Check whether builders/{name}.parquet already exists; skip the builder when it does and force=False (run.py:83–88).
     b. Call run_builder(name, cfg, years) from builders/registry.py:39 — it looks up the function, resolves the filepath, calls the builder, validates the output via validate_builder_output, and returns a pd.DataFrame.
     c. Write the result to builders/{name}.parquet (run.py:89–91).
     d. del the DataFrame immediately to free memory (run.py:92).
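The loop can be sketched as follows (names simplified; the real run_builder also resolves filepaths and validates output, which is elided here):

```python
from pathlib import Path

def run_builder_loop(builders, output_dir, force=False):
    """Iterate builders in declaration order, caching each result as
    builders/{name}.parquet and dropping the frame as soon as written.

    `builders` maps name -> zero-arg callable returning a (validated)
    DataFrame-like object with a .to_parquet(path) method.
    """
    builder_dir = Path(output_dir) / "builders"
    builder_dir.mkdir(parents=True, exist_ok=True)
    paths = {}
    for name, build in builders.items():   # dict preserves declaration order
        target = builder_dir / f"{name}.parquet"
        if target.exists() and not force:  # reuse the cached intermediate
            paths[name] = target
            continue
        df = build()
        df.to_parquet(target)
        paths[name] = target
        del df                             # free memory before the next builder
    return paths
```

The per-builder parquet cache is what makes the stale-parquet failure mode in the table below possible: a cached intermediate survives a source-data update unless force is set.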

  4. Assemble — assemble(builder_paths, cfg, output_dir) (features/assemble.py:183):
     a. Split builder_paths into required_paths (inner-joined; they define the row universe) and other_paths (left-joined), based on cfg.commodity.builders[name].required_for_pred_parquet.
     b. _merge_builders — inner-join the required builders sequentially, then left-join the others. Raises on column-namespace collisions and on duplicate (year, geo_identifier, init_date) keys. Paths are sorted alphabetically by stem for reproducibility (assemble.py:85–86).
     c. _split_fit — the fit/pred split applies a harvest-date NaN filter. Rows pass the fit filter when init_date == harvest_date_by_year[year] and target_col is non-NaN (assemble.py:154–168). The .notna() clause is load-bearing: forecast-year rows whose init date happens to match the harvest date are excluded from fit because the target is NaN for years not yet harvested.
     d. _nan_diagnostic — a per-builder, post-left-join diagnostic that counts all-NaN rows and affected counties, surfacing coverage gaps (assemble.py:19–47).
     e. validate_merged_columns — confirms every declared feature_col and auxiliary_col is present (interface.py:68–108).
     f. Write fit.parquet, pred.parquet, and metadata.json (assemble.py:215–235).
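A condensed pandas sketch of the merge in sub-step b, assuming the documented join keys; the collision and duplicate-key guards mirror the described ValueErrors (function body simplified, not the real implementation):

```python
import pandas as pd

def merge_builders(required, others,
                   keys=("year", "geo_identifier", "init_date")):
    """Inner-join the required builders (they define the row universe),
    then left-join the rest, guarding namespaces and key uniqueness."""
    keys = list(keys)
    merged = None
    plan = [(df, "inner") for df in required] + [(df, "left") for df in others]
    for df, how in plan:
        if merged is None:
            merged = df
            continue
        clash = (set(merged.columns) & set(df.columns)) - set(keys)
        if clash:
            raise ValueError(f"Namespace collision: {sorted(clash)}")
        merged = merged.merge(df, on=keys, how=how)
    if merged.duplicated(subset=keys).any():
        raise ValueError("duplicate (year, geo_identifier, init_date) keys")
    return merged
```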

  5. preprocess_data (post-build guard, called later) — stages/run_features.py:10 is NOT called during feature build; it is called by run_hindcast._load_and_preprocess after loading the parquets. It adds state_name via add_state_name, drops all-NaN columns from fit_data, and aligns pred_data columns to match (run_features.py:13–48). Any dropped column that was a configured feature raises ValueError with an actionable message.
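A sketch of the guard's drop-and-align behavior (simplified: the real function also adds state_name, which is omitted here):

```python
import pandas as pd

def preprocess(fit: pd.DataFrame, pred: pd.DataFrame, feature_cols):
    """Drop columns that are entirely NaN in fit, mirror the drop in
    pred, and fail loudly if a configured feature would be lost."""
    all_nan = [c for c in fit.columns if fit[c].isna().all()]
    lost = sorted(set(all_nan) & set(feature_cols))
    if lost:
        raise ValueError(
            f"preprocess_data dropped {len(lost)} configured feature "
            f"column(s): {lost}; fix the builder or remove them from "
            "the configured feature columns"
        )
    fit = fit.drop(columns=all_nan)
    pred = pred[fit.columns]  # align pred columns to surviving fit columns
    return fit, pred
```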

Mermaid flow diagram

```mermaid
flowchart LR
    NASS["yields builder\nbuilders/yields.py"]
    WX["weather builder\nbuilders/weather.py"]
    CLM["climo builder\nbuilders/climo.py"]
    NDVI["ndvi builder\nbuilders/ndvi.py"]
    STR["stress builder\nbuilders/stress.py"]

    subgraph BUILDERS["Builder loop — features/run.py:81"]
        NASS
        WX
        CLM
        NDVI
        STR
    end

    BPQS["builders/*.parquet\n(intermediate)"]
    BUILDERS --> BPQS

    subgraph ASSEMBLE["assemble — features/assemble.py:183"]
        MERGE["_merge_builders\n(inner + left join)"]
        SPLIT["_split_fit\n(harvest-date NaN filter)"]
        VALIDATE["validate_merged_columns"]
        WRITE["write fit.parquet\npred.parquet\nmetadata.json"]
        MERGE --> SPLIT --> VALIDATE --> WRITE
    end

    BPQS --> ASSEMBLE

    FIT["fit.parquet\n{features_dir}/{key}/"]
    PRED["pred.parquet\n{features_dir}/{key}/"]
    META["metadata.json"]

    WRITE --> FIT
    WRITE --> PRED
    WRITE --> META
```

Invariants and contracts

DESIGN.md on fit.parquet column layout (verbatim):

"fit.parquet column layout: INDEX_COLS as leading columns, then target column, then feature columns. Index columns SHALL NEVER be separated into a distinct file. Downstream consumers SHALL use metadata.json to slice columns."
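For illustration, a downstream consumer honoring that contract might slice via metadata.json like this. The key names index_cols, target_col, and feature_cols are assumptions about the metadata schema, not confirmed from the source:

```python
import json
from pathlib import Path
import pandas as pd

def slice_feature_matrix(fit: pd.DataFrame, metadata_path):
    """Slice the matrix by the column lists recorded in metadata.json
    instead of hard-coding positions (key names are illustrative)."""
    meta = json.loads(Path(metadata_path).read_text())
    index = fit[meta["index_cols"]]
    target = fit[meta["target_col"]]
    features = fit[meta["feature_cols"]]
    return index, target, features
```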

The _split_fit NaN filter is load-bearing (from features/assemble.py:154–168 comment):

"The .notna() clause is load-bearing: forecast-year rows at the harvest init date must be excluded from fit even when their init date matches."
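A minimal pandas sketch of that filter, with column and dict names taken from the description (a sketch of the documented predicate, not the real _split_fit body):

```python
import pandas as pd

def split_fit(merged: pd.DataFrame, harvest_date_by_year: dict,
              target_col: str) -> pd.DataFrame:
    """Rows enter fit only when init_date is the harvest init date for
    that year AND the target is observed. The .notna() clause excludes
    forecast-year rows whose init date matches the harvest date but
    whose yield is not yet known."""
    at_harvest = merged["init_date"] == merged["year"].map(harvest_date_by_year)
    observed = merged[target_col].notna()
    return merged[at_harvest & observed]
```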

DESIGN.md Clause 34 (artefact contract): FIT reads features_dir/{key}/fit.parquet — these parquets are the declared precondition for the hindcast stage.

Column name stability (DESIGN.md verbatim): "sim_yield_kg_ha in walk_forward_preds.parquet is sim_yield_kg_ha in {experiment_key}_national.parquet … Renames at stage boundaries are forbidden."

Failure modes and recovery

| Symptom | Likely cause | Recovery |
| --- | --- | --- |
| SystemExit at preflight | Missing raw input file | Check data_root config and verify raw data is downloaded |
| ValueError: Namespace collision in _merge_builders | Two builders emit columns with the same name | Rename columns via the offending builder's rename_map config |
| ValueError: preprocess_data dropped N configured feature column(s) | A builder emits all-NaN for a feature at the fit init_date | Fix the builder or remove the column from commodity.feature_cols in YAML |
| ValueError: No rows in pred.parquet for season_year=N (at predict time) | Year not covered by feature_end_year | Raise feature_end_year in config and re-run features |
| Stale builder parquet reused after source data update | force=False skip-if-exists fired | Re-run with make features-force or cli run features --force |

Cross-references

PRs that materially changed this stage

  • PR #369 (f5399b96) — added synthesise_long_range_climo_for_unseen_years and synthesise_long_range_stress_for_unseen_years stubs for multi-year forecast support; these run before build_features in the forecast path only, not in the hindcast feature build.