
Feature Engineering Subsystem

Overview

The feature engineering subsystem transforms raw inputs (NASS yield parquets, daily weather zarrs, climatology zarrs, NDVI CSVs, stress parquets) into two canonical feature matrices per experiment:

  • fit.parquet — one row per (geo_identifier, year) at the end-of-harvest init_date, with a non-NaN target label. Used for model training.
  • pred.parquet — all (geo_identifier, year, init_date) combinations across every configured init date. Used for in-season walk-forward prediction.

Both files share the column order [INDEX_COLS, target_col, feature_cols, auxiliary_cols].

The execution model is:

  1. build_features (orchestrator) iterates over configured builders, calls each via run_builder from the registry, writes intermediate builders/<name>.parquet, then calls assemble.
  2. assemble merges intermediate parquets, applies the fit split, validates column presence, and writes fit.parquet, pred.parquet, and metadata.json.

Skip-if-exists semantics apply at both levels: existing builder parquets and assembled parquets are reused unless force=True is passed.
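
The two-level skip-if-exists flow can be sketched as follows. This is a simplified illustration, not the real orchestrator: the builder callables and the text-file writes stand in for `run_builder` and parquet I/O.

```python
from pathlib import Path

def build_features_sketch(builders, output_dir: Path, *, force: bool = False):
    """Run each builder unless its intermediate file already exists.

    `builders` maps a builder name to a zero-arg callable returning the
    content to persist (a stand-in for the real DataFrame + to_parquet).
    """
    builder_dir = output_dir / "builders"
    builder_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for name, fn in builders.items():
        out = builder_dir / f"{name}.parquet"
        if out.exists() and not force:
            written.append(out)   # reuse the cached intermediate
            continue
        out.write_text(fn())      # stand-in for df.to_parquet(out)
        written.append(out)
    return written
```

The same pattern applies one level up: `assemble` is skipped when both `fit.parquet` and `pred.parquet` already exist and `force` is false.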

Modules

features/run.py

Purpose: Top-level orchestrator. Wires the builder loop with assemble.

Public surface:

def build_features(
    cfg: ExperimentConfig,
    output_dir: AnyPath | None = None,
    *,
    force: bool = False,
) -> None

Key functions:

  • build_features — resolves output_dir from cfg.features_dir / experiment_key when not supplied (run.py:56–59). Iterates cfg.commodity.builders.keys() in declaration order, calling run_builder for each and writing builders/<name>.parquet (run.py:81–91). Calls assemble(builder_paths, cfg, output_dir) at the end (run.py:96). Intermediate DataFrames are explicitly del-ed to free memory (run.py:92).
  • _log_output_tree — optional tree shell command for a directory summary; degrades gracefully if tree is absent (run.py:17–30).

Notes:

  • The output_dir escape hatch lets forecast mode direct a partial rebuild to a scratch location without clobbering canonical hindcast features (run.py:37–43).
  • Skip-if-exists at the assemble level checks for both fit.parquet and pred.parquet (run.py:65–71). Per-builder skip checks builders/<name>.parquet (run.py:83–88).

features/assemble.py

Purpose: Reads builder parquets, merges them onto a common (year, geo_identifier, init_date) universe, splits into fit/pred, writes final output and a JSON metadata file.

Public surface:

def assemble(
    builder_paths: list[Path],
    cfg: ExperimentConfig,
    output_dir: Path,
) -> None

Key functions:

  • assemble — splits builder_paths into required_paths (inner-joined, define universe) vs other_paths (left-joined) based on cfg.commodity.builders[name].required_for_pred_parquet (assemble.py:183–194). Writes fit.parquet, pred.parquet, and metadata.json (assemble.py:215–235).
  • _merge_builders — inner-joins required builders sequentially (anchor on first), then left-joins others. Raises on column namespace collisions and on duplicate index keys (assemble.py:69–151). Paths sorted alphabetically by stem for cross-run reproducibility (assemble.py:85–86).
  • _split_fit — keeps rows where init_date == harvest_date_by_year[year] AND target_col is non-NaN. The .notna() clause is load-bearing: forecast-year rows at the harvest init date must be excluded from fit even when their init date matches (assemble.py:154–168).
  • _nan_diagnostic — per-builder post-left-join diagnostic that counts all-NaN rows and affected counties; surfaces county-level coverage gaps inside years that are nominally covered (assemble.py:19–47).

Notes:

  • Column namespace uniqueness is enforced across builders: any overlap (excluding INDEX_COLS) raises ValueError (assemble.py:100–106, assemble.py:124–130).
  • Metadata written to metadata.json includes feature_cols, index_cols, target_col, row counts, county count, init-date count, and year range (assemble.py:221–234).
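
The fit-split rule described above can be sketched in a few lines. Column names follow the `INDEX_COLS` contract; `harvest_date_by_year` is assumed to be a `{year: init_date}` mapping, and this is an illustration of the rule rather than the real `_split_fit` body.

```python
import pandas as pd

def split_fit(df: pd.DataFrame, harvest_date_by_year: dict, target_col: str) -> pd.DataFrame:
    """Keep rows at the harvest init date for their year that have a label."""
    at_harvest = df["init_date"] == df["year"].map(harvest_date_by_year)
    has_label = df[target_col].notna()  # drops forecast-year rows even at the harvest init date
    return df[at_harvest & has_label]
```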

features/forecast_weather.py

Purpose: Materialises a forecast-time daily weather indices zarr for one (season_year, init_date) pair by splicing observed data up to init_date with climatology from init_date+1 to harvest_date.

Public surface:

def materialise_forecast_indices(
    config: ExperimentConfig,
    results: ForecastSlice,
) -> None

Key functions:

  • materialise_forecast_indices — opens the raw obs zarr, slices to [season_start, init_date], calls materialise_for_forecast for the full-season climo, then calls extend_with_climatology to stitch them together (forecast_weather.py:43–105). Handles the future forecast case (zero obs days) by using pure climo mean (forecast_weather.py:79–85).
  • _pack_indices_dataset — assembles xr.Dataset with root attributes matching conus_adm2_corn.zarr schema so downstream weather/stress builders consume it unchanged (forecast_weather.py:108–128).
  • _write_zarr — strips all variable encodings before writing to prevent Zarr v2 write errors, chunks to {dim: -1} (forecast_weather.py:131–137).

Notes:

  • Hard-coded corn daily index spec: (daily_gdd, gdd, thresh=10), (daily_edd, edd, thresh=29), (daily_precip, precip, {}) (forecast_weather.py:29–33). Replicating to other commodities requires extending _CORN_DAILY_INDICES.
  • Output zarr path is results.indices_zarr; schema is identical to the hindcast zarr so builder code runs unchanged.
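
The obs+climo splice can be illustrated with plain arrays: observations cover `[season_start, init_date]` and climatology fills the remainder out to harvest. Array shapes and the function name are illustrative; the real work is done by `extend_with_climatology` on xarray objects.

```python
import numpy as np

def splice_obs_with_climo(obs: np.ndarray, climo: np.ndarray, n_obs_days: int) -> np.ndarray:
    """obs: (n_geo, >=n_obs_days); climo: (n_geo, season_length)."""
    if n_obs_days == 0:
        return climo.copy()            # future forecast: pure climatology mean
    spliced = climo.copy()
    spliced[:, :n_obs_days] = obs[:, :n_obs_days]  # observed prefix wins
    return spliced
```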

features/forecast_long_range_stub.py

Purpose: STUB — pre-materialises builders/climo.parquet and builders/stress.parquet for season_year values not yet covered by the upstream zarr/parquet sources. Enables multi-year forecasting (PR #369) without requiring upstream data extensions.

Public surface:

def synthesise_long_range_climo_for_unseen_years(
    cfg: ExperimentConfig,
    results: ForecastSlice,
    *,
    method: PanelImputeMethod = "trailing_median",
) -> None

def synthesise_long_range_stress_for_unseen_years(
    cfg: ExperimentConfig,
    results: ForecastSlice,
    *,
    method: PanelImputeMethod = "trailing_median",
) -> None

Key functions:

  • synthesise_long_range_climo_for_unseen_years — no-op when the climo zarr already covers both calendar years of the season (forecast_long_range_stub.py:105). Otherwise builds a NaN-filled frame with one row per geo_identifier, then imputes via impute_missing_panel_columns. Trailing methods read the canonical hindcast pred.parquet; "zero" skips that read (forecast_long_range_stub.py:67–214).
  • synthesise_long_range_stress_for_unseen_years — no-op when source_max_year + lag_years >= season_year (forecast_long_range_stub.py:254). Historical reference for trailing methods is the stress source parquet itself (after year+lag shift), not pred.parquet, because pred.parquet drops stress columns at assemble time (forecast_long_range_stub.py:236–239).

Notes:

  • method options: "trailing_median" (default, per-county 3-year trailing median), "trailing_mean", "zero" (forecast_long_range_stub.py:72–89).
  • Both functions are idempotent: if the target parquet already exists the function logs a warning and returns; delete the file to force regeneration (forecast_long_range_stub.py:108–114, forecast_long_range_stub.py:257–261).
  • Call site is stages/run_forecast.py::_build_forecast_features, invoked before build_features so skip-if-exists semantics preserve the synthetic parquets (forecast_long_range_stub.py:31–34).
  • Removal criteria are documented in the module docstring: delete when upstream sources cover the forecast horizon AND no caller imports the module (forecast_long_range_stub.py:22–29).
  • Backwards-compatible alias: synthesise_zero_anomaly_climo_for_unseen_years points at the climo function (forecast_long_range_stub.py:319).
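
The "trailing_median" method can be sketched as a per-county reduction over the historical panel. This is a hypothetical helper illustrating the semantics, not the real `impute_missing_panel_columns` signature; the frame layout (long panel with `geo_identifier` and `year` columns) is assumed.

```python
import pandas as pd

def trailing_median_fill(hist: pd.DataFrame, col: str, k: int = 3) -> pd.Series:
    """Per-county median of the last k observed years of `col`."""
    return (
        hist.sort_values("year")
        .groupby("geo_identifier")[col]
        .apply(lambda s: s.tail(k).median())
    )
```

Swapping `.median()` for `.mean()` gives "trailing_mean"; "zero" skips the historical read entirely and fills with 0.0.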

builders/interface.py

Purpose: Defines the builder contract, shared constants, and validation helpers. Leaf module — no imports from concrete builders.

Public surface:

INDEX_COLS: Final[tuple[str, ...]] = ("year", "geo_identifier", "init_date")

class BuilderFn(Protocol):
    def __call__(self, path: Path | CloudPath, cfg: ExperimentConfig, years: range, /) -> pd.DataFrame: ...

def validate_builder_output(df: pd.DataFrame, name: str) -> pd.DataFrame
def validate_merged_columns(merged: pd.DataFrame, *, expected_feature_cols, expected_auxiliary_cols) -> None

Key functions:

  • validate_builder_output — checks INDEX_COLS present, no duplicate column names, valid geo_identifier format, no duplicate (year, geo_identifier, init_date) keys (interface.py:34–65).
  • validate_merged_columns — checks every declared feature_col and auxiliary_col is present in the merged frame; hints at _x/_y suffix collisions when auxiliaries are missing (interface.py:68–108).

Notes:

  • BuilderFn is @runtime_checkable so isinstance checks work at dispatch time (interface.py:24).
  • INDEX_COLS is the single shared merge contract; any builder violating it fails loudly in validate_builder_output before reaching assemble.
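
The fail-loudly contract can be sketched as below. Error messages are illustrative; the real `validate_builder_output` additionally checks geo_identifier format and duplicate column names.

```python
import pandas as pd

INDEX_COLS = ("year", "geo_identifier", "init_date")

def validate_builder_output_sketch(df: pd.DataFrame, name: str) -> pd.DataFrame:
    missing = [c for c in INDEX_COLS if c not in df.columns]
    if missing:
        raise ValueError(f"builder {name!r} output missing index cols: {missing}")
    if df.duplicated(subset=list(INDEX_COLS)).any():
        raise ValueError(f"builder {name!r} emitted duplicate index keys")
    return df
```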

builders/core.py

Purpose: The shared vectorised windowed-feature engine. Used by weather, climo, and ndvi builders — the only differences between builders are aggregation type (sum/mean/max) and column-naming convention.

Public surface:

def build_windowed_features(
    season_arrays: dict[str, np.ndarray],
    cfg: ExperimentConfig,
    yr: int,
    geoids: np.ndarray,
    windows: tuple[SeasonWindow, ...],
    lag_days: int,
    agg: Literal["sum", "mean", "max"],
    col_name: Callable[[str, str], str],
) -> pd.DataFrame

def expand_init_dates(df: pd.DataFrame, cfg: ExperimentConfig, years: range) -> pd.DataFrame
def pre_window_fill(agg: Literal["sum", "mean", "max"]) -> float

Key functions:

  • build_windowed_features — builds prefix arrays (cumsum, cumcount, cummax) once per variable per year, then computes all (init_date × window × variable) combinations with NumPy fancy indexing — no Python loop over init dates (core.py:77–161). Output shape is (n_geo × n_init, n_features + 3).
  • _build_prefix_arrays — returns (cumsum_val, cumsum_cnt, cummax_val) for O(1) window queries (core.py:54–72).
  • pre_window_fill — sum windows return 0.0 before the window opens to prevent downstream MedianImputer filling with training median and producing a spurious jump at window-open; mean/max stay NaN (core.py:22–34).
  • expand_init_dates — cross-joins year-level builder output (one row per (geo, year)) against all init_dates for that year; used by yields and stress builders (core.py:37–51).

Notes:

  • freeze_cap_sdoy is applied inside build_windowed_features to clamp the progressive window once init passes the freeze date (core.py:109–124); this is a commodity-level config value.
  • The lag_days parameter shifts init_sdoy left before computing effective window ends, representing the data-availability lag (core.py:107).
  • A TODO comment at core.py:75 flags that this function's logic needs a manual line-by-line sense-check.
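
The prefix-array trick at the heart of the engine: a cumulative sum with a leading zero column turns any half-open `[start, end)` window sum into one subtraction, so all (init_date × window) combinations are answered by fancy indexing with no loop over days. A minimal sketch (NaNs count as zero here; the real engine tracks valid-day counts with a cumcount array):

```python
import numpy as np

def window_sums(daily: np.ndarray, starts: np.ndarray, ends: np.ndarray) -> np.ndarray:
    """daily: (n_geo, n_days); starts/ends: (n_windows,) half-open day indices."""
    prefix = np.concatenate(
        [np.zeros((daily.shape[0], 1)), np.nancumsum(daily, axis=1)], axis=1
    )
    # One subtraction per (geo, window) pair -> (n_geo, n_windows)
    return prefix[:, ends] - prefix[:, starts]
```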

builders/yields.py

Purpose: Builds NASS yield features — lagged yield signals and target column — from a NASS parquet.

Public surface:

def build_yields(nass_path: Path | CloudPath, cfg: ExperimentConfig, years: range) -> pd.DataFrame

Key functions:

  • build_yields — loads NASS via _load_nass, loops over years to compute yield features and area lookups, assembles a per-year DataFrame, then calls expand_init_dates (yields.py:174–247).
  • _load_nass — reads NASS parquet, optionally filters by crop_type, applies declarative Fellegi-Holt edits (apply_edits), pivots into (n_geo, n_year) matrices for yield, area (current + lagged), and optional production (yields.py:63–162). dropna=False preserves counties with NaN yield but valid production so select_by_production sees the full 2,815-county universe (yields.py:111–120).
  • _compute_yield_features — returns yield_last (rightmost non-NaN), yield_avg_3, yield_avg_5 (trailing-k mean skipping gap years) (yields.py:165–171).
  • _last_valid_per_row, _nanmean_last_k — vectorised helpers operating on the (n_geo, n_prior_years) pivot matrix (yields.py:27–60).

Notes:

  • area_col carries the year-1 lagged harvested area; the lag is baked into area_lagged_mat via .shift(1, axis=1) post-pivot (yields.py:137). CURRENT_YEAR_area_harvested_ha is also written but flagged as MUST NOT be used as a model feature (leaks harvest outcome) (yields.py:207–211).
  • A TODO at yields.py:219 notes that area forecasts are not yet incorporated; the builder currently relies on Y-1 estimates throughout.
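
The trailing-k mean that skips gap years can be sketched as below. A plain loop is used here for clarity; the real `_nanmean_last_k` is vectorised over the (n_geo, n_prior_years) pivot matrix.

```python
import numpy as np

def nanmean_last_k(mat: np.ndarray, k: int) -> np.ndarray:
    """Mean of each row's last k non-NaN values (gap years are skipped)."""
    out = np.full(mat.shape[0], np.nan)
    for i, row in enumerate(mat):
        valid = row[~np.isnan(row)]
        if valid.size:
            out[i] = valid[-k:].mean()  # most recent k observed years
    return out
```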

builders/weather.py

Purpose: Builds windowed weather index features (cumulative GDD, EDD, precipitation) from a daily weather zarr.

Public surface:

def build_weather(
    weather_indices_path: Path | CloudPath, cfg: ExperimentConfig, years: range
) -> pd.DataFrame

Key functions:

  • build_weather — reads WeatherBuilder config, opens zarr, loops over years, slices the season window via time-mask (handles cross-year seasons), and calls build_windowed_features with agg="sum" (weather.py:83–141). Column naming: {var.replace('daily_', '')}_{window.name}.
  • _build_season_array_from_daily_zarr — extracts the (n_geo, season_length) array for one variable/year, delegating padding to pad_partial_season (weather.py:55–80).
  • pad_partial_season — pads a truncated season array to expected_length with NaN, logging which sdoy ranges are missing (weather.py:27–53).

Notes:

  • Heartbeat logs every 5 years and on the final year to show a long-running build is alive (weather.py:112–113).
  • If any variable returns None (year absent from zarr), the entire year is skipped rather than emitting a partial row (weather.py:119–124).
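
The padding behaviour can be sketched as below: when the zarr slice for a year is shorter than the configured season length (e.g. a mid-season build), the tail is filled with NaN so the windowed engine always sees a fixed-width array. The logging in the real `pad_partial_season` is omitted here.

```python
import numpy as np

def pad_partial_season(arr: np.ndarray, expected_length: int) -> np.ndarray:
    """Pad (n_geo, n_days) to (n_geo, expected_length) with trailing NaN."""
    n_geo, n_days = arr.shape
    if n_days >= expected_length:
        return arr[:, :expected_length]
    padded = np.full((n_geo, expected_length), np.nan)
    padded[:, :n_days] = arr
    return padded
```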

builders/climo.py

Purpose: Builds windowed climatology z-score features from a per-year z-score zarr.

Public surface:

def build_climo(
    climo_indices_path: Path | CloudPath, cfg: ExperimentConfig, years: range
) -> pd.DataFrame

Key functions:

  • build_climo — reads ClimoBuilder config, applies optional geo lookup table, loops over years, stitches cross-year season arrays via _build_season_array, calls build_windowed_features with agg="mean" (climo.py:60–131). Column naming: {var}_zscore_{window.name}.
  • _build_season_array — handles single-year (corn, soy) and cross-year (wheat) commodities by slicing one or two calendar-year arrays from the zarr and concatenating (climo.py:34–57). Caps the autumn slice at the real last DOY of the calendar year (365 or 366) via _last_doy_of_year (climo.py:53–54).

Notes:

  • Years missing from the zarr are silently skipped (both calendar years of the season must be present) (climo.py:105–107).
  • The geo lookup (bcfg.geo_lookup_path) maps zarr geoids to canonical geo_identifier format; resolved at config-load time (climo.py:75–83).

builders/ndvi.py

Purpose: Builds peak and cumulative NDVI features from a monthly county-level CSV by constructing a pseudo-daily season array.

Public surface:

def build_ndvi(
    ndvi_county_path: Path | CloudPath, cfg: ExperimentConfig, years: range
) -> pd.DataFrame

Key functions:

  • build_ndvi — reads NDVI CSV, maps county + STATEFP to geo_identifier via ndvi_geo_identifier, loops over years, builds mean and p75 season arrays, calls build_windowed_features twice (once with agg="max" for peak, once with agg="mean" for cumulative), merges results (ndvi.py:147–209).
  • _build_season_array — repeats each month's NDVI value for every day in that month within the season window, producing a (n_geo, season_length_days) array (ndvi.py:97–144).
  • ndvi_geo_identifier — maps NDVI CSV row to canonical geo_identifier; returns None for unknown STATEFP values (ndvi.py:89–94).

Notes:

  • STATEFP_TO_STATE lookup dict is acknowledged as a workaround for dirty NDVI data that formerly used state-level values; the comment flags it for removal (ndvi.py:32–33).
  • Window selection reuses the gstd window from climo_windows (ndvi.py:183).
  • Both peak and cumulative features are produced with lag_days=0 — NDVI has no data-availability lag (ndvi.py:185–203).
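
The pseudo-daily construction reduces to repeating each month's NDVI value for every day of that month, producing a daily-resolution array the shared windowed engine can consume unchanged. A minimal sketch with illustrative month lengths:

```python
import numpy as np

def monthly_to_pseudo_daily(monthly: np.ndarray, month_lengths: np.ndarray) -> np.ndarray:
    """monthly: (n_geo, n_months) -> (n_geo, sum(month_lengths)) daily array."""
    return np.repeat(monthly, month_lengths, axis=1)
```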

builders/stress.py

Purpose: Applies lag-1 shift to a pre-computed stress parquet to produce stress features for model training. Optionally assembles the stress parquet from a daily indices zarr first.

Public surface:

def build_stress(
    stress_path: Path | CloudPath, cfg: ExperimentConfig, years: range
) -> pd.DataFrame

Key functions:

  • build_stress — optionally calls _maybe_assemble_stress_parquet_from_indices, reads stress parquet, renames columns per bcfg.rename_map, shifts year_col + lag_years to produce lag-1 stress features, filters to requested years, then calls expand_init_dates (stress.py:87–112).
  • _maybe_assemble_stress_parquet_from_indices — assembles stress parquet from daily indices zarr when bcfg.assemble_stress_from_indices is configured and the parquet is missing, overwrite is set, or COMMODITY_HINDCAST_FORCE_STRESS_ASSEMBLY env var is truthy (stress.py:42–84).
  • _should_assemble_stress_parquet — three-way guard: env var override, config overwrite flag, or file-not-exists (stress.py:27–39).

Notes:

  • The stress_compute import is deferred inside _maybe_assemble_stress_parquet_from_indices to avoid circular import risk (stress.py:71–75).
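
The three-way guard can be sketched as below. The env var name matches the doc; interpreting "truthy" as the strings `1`/`true`/`yes` is an assumption of this sketch, not a confirmed detail of `_should_assemble_stress_parquet`.

```python
import os
from pathlib import Path

def should_assemble_stress_parquet(parquet_path: Path, *, overwrite: bool = False) -> bool:
    """Env var override, config overwrite flag, or missing file each trigger assembly."""
    env = os.environ.get("COMMODITY_HINDCAST_FORCE_STRESS_ASSEMBLY", "")
    if env.lower() in ("1", "true", "yes"):
        return True
    if overwrite:
        return True
    return not parquet_path.exists()
```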

builders/stress_compute.py

Purpose: Computes composite weather stress scores (QUBE-sprint parity) from daily indices zarr. Ported from QUBE-sprint/src/features/builders/stress_score.py.

Public surface:

def compute_commodity_stress(
    commodity: str,
    indices_zarr: Path | CloudPath,
    gs_start_doy: int,
    gs_end_doy: int,
    baseline_start: int = 1980,
    baseline_end: int = 2010,
) -> xr.Dataset  # stress_score, z_GS_EDD, z_GS_GDD, z_GS_Ptotal

def write_stress_dataset_to_parquet(ds: xr.Dataset, parquet_path: Path | CloudPath) -> None

Key functions:

  • compute_commodity_stress — looks up COMMODITY_STRESS_VARIABLES for the commodity, builds a seasonal dataset from daily zarr via _build_seasonal_from_daily, computes z-scores and weighted average via calculate_weather_stress_score, returns an xr.Dataset (stress_compute.py:221–253).
  • calculate_weather_stress_score — z-scores each variable against the baseline period, clips at z_clip=3.0, weights and averages, scales to [0, 1] via scale_to_unit(factor=1.5) (stress_compute.py:114–150).
  • _build_seasonal_from_daily — aggregates daily zarr to seasonal totals (sum over DOY window) per season_year; handles cross-year growing seasons (stress_compute.py:160–218).
  • write_stress_dataset_to_parquet — converts xr.Dataset to DataFrame, renames geoid → geo_id and season_year → year, writes via shared write_dataframe (stress_compute.py:256–263).

Notes:

  • COMMODITY_STRESS_VARIABLES maps commodity name to [StressVariable] list. All three commodities (corn, wheat, soybeans) use the same three variables: GS_Ptotal (inverted), GS_GDD, GS_EDD (stress_compute.py:46–62).
  • StressConfig is a frozen dataclass with min_std=1.0, z_clip=3.0, scaling_factor=1.5 (stress_compute.py:26–32).
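
The scoring maths can be sketched as below using the constants from the doc (min_std=1.0, z_clip=3.0, scaling_factor=1.5). The final linear mapping to [0, 1] is an assumption about what `scale_to_unit(factor=1.5)` does, not the confirmed upstream formula.

```python
import numpy as np

def stress_score(values: np.ndarray, baseline_mean: np.ndarray,
                 baseline_std: np.ndarray, weights: np.ndarray,
                 *, min_std: float = 1.0, z_clip: float = 3.0,
                 factor: float = 1.5) -> np.ndarray:
    """Z-score per variable, clip, weighted-average, squash to [0, 1]."""
    z = (values - baseline_mean) / np.maximum(baseline_std, min_std)
    z = np.clip(z, -z_clip, z_clip)
    weighted = (z * weights).sum(axis=-1) / weights.sum()
    # Assumed linear unit scaling: weighted in [-factor, factor] -> [0, 1]
    return np.clip((weighted / factor + 1.0) / 2.0, 0.0, 1.0)
```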

builders/registry.py

Purpose: Single point of builder discovery and dispatch. The only file that needs updating when a builder is added or removed.

Public surface:

BUILDER_REGISTRY: dict[str, BuilderFn] = {
    "yields": build_yields,
    "stress": build_stress,
    "weather": build_weather,
    "climo": build_climo,
    "ndvi": build_ndvi,
}

def run_builder(name: str, cfg: ExperimentConfig, years: range) -> pd.DataFrame

Key functions:

  • run_builder — looks up builder by string key, resolves cfg.commodity.builders[name].filepath (already normalised at config-load time), calls the builder function, validates output via validate_builder_output, returns DataFrame (registry.py:39–47).

Notes:

  • Registry keys are plain strings matching the YAML builder block keys in cfg.commodity.builders. No discriminated-union typing at runtime — the type safety comes from the BuilderFn protocol and config-layer BuilderType literals.
  • Adding a builder requires: (1) implementing the BuilderFn protocol, (2) adding an entry to BUILDER_REGISTRY, (3) declaring the builder under commodity.builders in the relevant config YAML.
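
The dispatch pattern is simple enough to sketch end to end. The stub builder and error message are illustrative; the real `run_builder` also resolves the configured filepath and runs `validate_builder_output`.

```python
import pandas as pd

def build_dummy(path, cfg, years) -> pd.DataFrame:
    """Illustrative builder satisfying the positional BuilderFn shape."""
    return pd.DataFrame(
        {"year": list(years), "geo_identifier": "01001", "init_date": "0501"}
    )

BUILDER_REGISTRY = {"dummy": build_dummy}

def run_builder_sketch(name: str, path, cfg, years) -> pd.DataFrame:
    try:
        fn = BUILDER_REGISTRY[name]
    except KeyError:
        raise KeyError(f"unknown builder {name!r}; known: {sorted(BUILDER_REGISTRY)}")
    return fn(path, cfg, years)
```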

Cross-references

  • ExperimentConfig — config object threaded through every builder and the assembler.
  • SeasonWindow — window definition used by build_windowed_features.
  • ForecastSlice — carrier for (season_year, init_date, features_dir, indices_zarr) passed to forecast splicing and long-range stub.
  • source_config — YAML builder declarations, required_for_pred_parquet, window definitions.
  • pipeline_features — stage-by-stage walkthrough of a feature build run (page not yet written).

Relationships to other subsystems

  • Configuration (config.py) — ExperimentConfig and typed builder configs (YieldsBuilder, WeatherBuilder, ClimoBuilder, NDVIBuilder, StressBuilder) are the only coupling point between the feature subsystem and the rest of the pipeline.
  • Stages (stages/run_features.py) — calls build_features directly; the stage is a thin wrapper that resolves config and output directories.
  • Forecast stage (stages/run_forecast.py) — calls synthesise_long_range_climo_for_unseen_years and synthesise_long_range_stress_for_unseen_years from forecast_long_range_stub before calling build_features.
  • Models (regression/, meta_models/) — consume fit.parquet and pred.parquet produced by assemble; they are unaware of how the feature matrix was built.
  • Shared utilities (treefera_market_insights.shared.utils.dataframes) — read_dataframe / write_dataframe used throughout for parquet I/O; AnyPath / CloudPath patterns ensure S3 and local paths work identically.
  • tf_data_ml_utils — materialise_for_forecast, extend_with_climatology, and compute index functions are called from forecast_weather.py for obs+climo splicing.