Feature Engineering Subsystem¶
Overview¶
The feature engineering subsystem transforms raw inputs (NASS yield parquets, daily weather zarrs, climatology zarrs, NDVI CSVs, stress parquets) into two canonical feature matrices per experiment:
- `fit.parquet` — one row per `(geo_identifier, year)` at the end-of-harvest `init_date`, with a non-NaN target label. Used for model training.
- `pred.parquet` — all `(geo_identifier, year, init_date)` combinations across every configured init date. Used for in-season walk-forward prediction.

Both files share the column order `[INDEX_COLS, target_col, feature_cols, auxiliary_cols]`.
The execution model is:
- `build_features` (the orchestrator) iterates over configured builders, calls each via `run_builder` from the registry, writes intermediate `builders/<name>.parquet`, then calls `assemble`.
- `assemble` merges the intermediate parquets, applies the fit split, validates column presence, and writes `fit.parquet`, `pred.parquet`, and `metadata.json`.
Skip-if-exists semantics apply at both levels: existing builder parquets and assembled parquets are reused unless `force=True` is passed.
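The two-level skip-if-exists flow can be sketched as follows; `build_features_sketch`, the callable map, and the pickle-based I/O are illustrative stand-ins for the real orchestrator, `run_builder`, and parquet writes:

```python
from pathlib import Path

import pandas as pd


def build_features_sketch(builders, output_dir: Path, *, force: bool = False) -> list[Path]:
    """Two-level skip-if-exists: reuse intermediates unless force=True.

    `builders` maps a builder name to a zero-argument callable returning a
    DataFrame; pickle stands in for parquet I/O to keep the sketch
    dependency-free.
    """
    builder_dir = output_dir / "builders"
    builder_dir.mkdir(parents=True, exist_ok=True)
    paths: list[Path] = []
    for name, fn in builders.items():  # dict preserves declaration order
        path = builder_dir / f"{name}.parquet"
        if path.exists() and not force:
            paths.append(path)  # skip: intermediate already materialised
            continue
        df = fn()
        df.to_pickle(path)
        del df  # free memory before the next builder runs
        paths.append(path)
    return paths
```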
Modules¶
features/run.py¶
Purpose: Top-level orchestrator. Wires the builder loop with assemble.
Public surface:
def build_features(
cfg: ExperimentConfig,
output_dir: AnyPath | None = None,
*,
force: bool = False,
) -> None
Key functions:
- `build_features` — resolves `output_dir` from `cfg.features_dir / experiment_key` when not supplied (run.py:56–59). Iterates `cfg.commodity.builders.keys()` in declaration order, calling `run_builder` for each and writing `builders/<name>.parquet` (run.py:81–91). Calls `assemble(builder_paths, cfg, output_dir)` at the end (run.py:96). Intermediate DataFrames are explicitly `del`-ed to free memory (run.py:92).
- `_log_output_tree` — optional `tree` shell command for a directory summary; degrades gracefully if `tree` is absent (run.py:17–30).
Notes:
- The `output_dir` escape hatch lets forecast mode direct a partial rebuild to a scratch location without clobbering canonical hindcast features (run.py:37–43).
- Skip-if-exists at the assemble level checks for both `fit.parquet` and `pred.parquet` (run.py:65–71). Per-builder skip checks `builders/<name>.parquet` (run.py:83–88).
features/assemble.py¶
Purpose: Reads builder parquets, merges them onto a common `(year, geo_identifier, init_date)` universe, splits into fit/pred, and writes the final outputs plus a JSON metadata file.
Public surface:
Key functions:
- `assemble` — splits `builder_paths` into `required_paths` (inner-joined, define the universe) vs `other_paths` (left-joined) based on `cfg.commodity.builders[name].required_for_pred_parquet` (assemble.py:183–194). Writes `fit.parquet`, `pred.parquet`, and `metadata.json` (assemble.py:215–235).
- `_merge_builders` — inner-joins required builders sequentially (anchored on the first), then left-joins the others. Raises on column namespace collisions and on duplicate index keys (assemble.py:69–151). Paths are sorted alphabetically by stem for cross-run reproducibility (assemble.py:85–86).
- `_split_fit` — keeps rows where `init_date == harvest_date_by_year[year]` AND `target_col` is non-NaN. The `.notna()` clause is load-bearing: forecast-year rows at the harvest init date must be excluded from fit even when their init date matches (assemble.py:154–168).
- `_nan_diagnostic` — per-builder post-left-join diagnostic that counts all-NaN rows and affected counties; surfaces county-level coverage gaps inside years that are nominally covered (assemble.py:19–47).
Notes:
- Column namespace uniqueness is enforced across builders: any overlap (excluding `INDEX_COLS`) raises `ValueError` (assemble.py:100–106, assemble.py:124–130).
- Metadata written to `metadata.json` includes `feature_cols`, `index_cols`, `target_col`, row counts, county count, init-date count, and year range (assemble.py:221–234).
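The fit-split rule described above can be sketched as a boolean mask; `split_fit` and its column names are a hypothetical reduction of `_split_fit`:

```python
import pandas as pd


def split_fit(panel: pd.DataFrame, harvest_date_by_year: dict[int, str],
              target_col: str = "yield") -> pd.DataFrame:
    """Keep rows at the harvest init_date whose target label is present.

    The .notna() clause excludes forecast-year rows even when their
    init_date matches the harvest date.
    """
    at_harvest = panel["init_date"] == panel["year"].map(harvest_date_by_year)
    labelled = panel[target_col].notna()
    return panel[at_harvest & labelled]
```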
features/forecast_weather.py¶
Purpose: Materialises a forecast-time daily weather indices zarr for one `(season_year, init_date)` pair by splicing observed data up to `init_date` with climatology from `init_date + 1` to `harvest_date`.
Public surface:
Key functions:
- `materialise_forecast_indices` — opens the raw obs zarr, slices to `[season_start, init_date]`, calls `materialise_for_forecast` for the full-season climo, then calls `extend_with_climatology` to stitch them together (forecast_weather.py:43–105). Handles the fully-future forecast case (zero obs days) by using the pure climo mean (forecast_weather.py:79–85).
- `_pack_indices_dataset` — assembles an `xr.Dataset` with root attributes matching the `conus_adm2_corn.zarr` schema so downstream weather/stress builders consume it unchanged (forecast_weather.py:108–128).
- `_write_zarr` — strips all variable encodings before writing to prevent Zarr v2 write errors; chunks to `{dim: -1}` (forecast_weather.py:131–137).
Notes:
- Hard-coded corn daily index spec: `(daily_gdd, gdd, thresh=10)`, `(daily_edd, edd, thresh=29)`, `(daily_precip, precip, {})` (forecast_weather.py:29–33). Replicating to other commodities requires extending `_CORN_DAILY_INDICES`.
- Output zarr path is `results.indices_zarr`; the schema is identical to the hindcast zarr so builder code runs unchanged.
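A minimal sketch of the splice, assuming a dense `(n_geo, season_length)` array layout per variable; `splice_obs_with_climo` is illustrative, and the real code goes through `materialise_for_forecast` and `extend_with_climatology`:

```python
import numpy as np


def splice_obs_with_climo(obs: np.ndarray, climo: np.ndarray, n_obs_days: int) -> np.ndarray:
    """Observed values on days up to init_date, climatology afterwards.

    `n_obs_days` is the number of season days on or before init_date;
    0 corresponds to the fully-future forecast case (pure climatology).
    """
    assert obs.shape == climo.shape
    out = climo.copy()
    out[:, :n_obs_days] = obs[:, :n_obs_days]  # overwrite the observed prefix
    return out
```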
features/forecast_long_range_stub.py¶
Purpose: STUB — pre-materialises `builders/climo.parquet` and `builders/stress.parquet` for `season_year` values not yet covered by the upstream zarr/parquet sources. Enables multi-year forecasting (PR #369) without requiring upstream data extensions.
Public surface:
def synthesise_long_range_climo_for_unseen_years(
cfg: ExperimentConfig,
results: ForecastSlice,
*,
method: PanelImputeMethod = "trailing_median",
) -> None
def synthesise_long_range_stress_for_unseen_years(
cfg: ExperimentConfig,
results: ForecastSlice,
*,
method: PanelImputeMethod = "trailing_median",
) -> None
Key functions:
- `synthesise_long_range_climo_for_unseen_years` — no-op when the climo zarr already covers both calendar years of the season (forecast_long_range_stub.py:105). Otherwise builds a NaN-filled frame with one row per `geo_identifier`, then imputes via `impute_missing_panel_columns`. Trailing methods read the canonical hindcast `pred.parquet`; `"zero"` skips that read (forecast_long_range_stub.py:67–214).
- `synthesise_long_range_stress_for_unseen_years` — no-op when `source_max_year + lag_years >= season_year` (forecast_long_range_stub.py:254). The historical reference for trailing methods is the stress source parquet itself (after the year+lag shift), not `pred.parquet`, because `pred.parquet` drops stress columns at assemble time (forecast_long_range_stub.py:236–239).
Notes:
- `method` options: `"trailing_median"` (default, per-county 3-year trailing median), `"trailing_mean"`, `"zero"` (forecast_long_range_stub.py:72–89).
- Both functions are idempotent: if the target parquet already exists, the function logs a warning and returns; delete the file to force regeneration (forecast_long_range_stub.py:108–114, forecast_long_range_stub.py:257–261).
- The call site is `stages/run_forecast.py::_build_forecast_features`, invoked before `build_features` so skip-if-exists semantics preserve the synthetic parquets (forecast_long_range_stub.py:31–34).
- Removal criteria are documented in the module docstring: delete when upstream sources cover the forecast horizon AND no caller imports the module (forecast_long_range_stub.py:22–29).
- Backwards-compatible alias: `synthesise_zero_anomaly_climo_for_unseen_years` points at the climo function (forecast_long_range_stub.py:319).
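The `"trailing_median"` method can be sketched as a per-county tail-median over prior years; `trailing_median_impute` and its column names are a hypothetical reduction of `impute_missing_panel_columns`:

```python
import pandas as pd


def trailing_median_impute(history: pd.DataFrame, target_year: int,
                           value_col: str, k: int = 3) -> pd.Series:
    """Per-county median of the last `k` available years before target_year.

    `history` has columns geo_identifier, year, and `value_col`; counties
    with fewer than `k` prior years use whatever is available.
    """
    prior = history[history["year"] < target_year]
    return (prior.sort_values("year")
                 .groupby("geo_identifier")[value_col]
                 .apply(lambda s: s.tail(k).median()))
```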
builders/interface.py¶
Purpose: Defines the builder contract, shared constants, and validation helpers. Leaf module — no imports from concrete builders.
Public surface:
INDEX_COLS: Final[tuple[str, ...]] = ("year", "geo_identifier", "init_date")
class BuilderFn(Protocol):
def __call__(self, path: Path | CloudPath, cfg: ExperimentConfig, years: range, /) -> pd.DataFrame: ...
def validate_builder_output(df: pd.DataFrame, name: str) -> pd.DataFrame
def validate_merged_columns(merged: pd.DataFrame, *, expected_feature_cols, expected_auxiliary_cols) -> None
Key functions:
- `validate_builder_output` — checks `INDEX_COLS` present, no duplicate column names, valid `geo_identifier` format, no duplicate `(year, geo_identifier, init_date)` keys (interface.py:34–65).
- `validate_merged_columns` — checks every declared `feature_col` and `auxiliary_col` is present in the merged frame; hints at `_x`/`_y` suffix collisions when auxiliaries are missing (interface.py:68–108).
Notes:
- `BuilderFn` is `@runtime_checkable` so `isinstance` checks work at dispatch time (interface.py:24).
- `INDEX_COLS` is the single shared merge contract; any builder violating it fails loudly in `validate_builder_output` before reaching `assemble`.
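The validation contract can be sketched as follows; `check_builder_output` mirrors the checks listed above, with illustrative error messages rather than the library's exact wording (the `geo_identifier` format check is omitted):

```python
import pandas as pd

INDEX_COLS = ("year", "geo_identifier", "init_date")


def check_builder_output(df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Fail loudly before assemble: index columns present, no duplicate
    column names, no duplicate index keys."""
    missing = [c for c in INDEX_COLS if c not in df.columns]
    if missing:
        raise ValueError(f"builder {name!r}: missing index columns {missing}")
    if df.columns.duplicated().any():
        raise ValueError(f"builder {name!r}: duplicate column names")
    if df.duplicated(subset=list(INDEX_COLS)).any():
        raise ValueError(f"builder {name!r}: duplicate index keys")
    return df
```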
builders/core.py¶
Purpose: The shared vectorised windowed-feature engine. Used by weather, climo, and ndvi builders — the only differences between builders are aggregation type (sum/mean/max) and column-naming convention.
Public surface:
def build_windowed_features(
season_arrays: dict[str, np.ndarray],
cfg: ExperimentConfig,
yr: int,
geoids: np.ndarray,
windows: tuple[SeasonWindow, ...],
lag_days: int,
agg: Literal["sum", "mean", "max"],
col_name: Callable[[str, str], str],
) -> pd.DataFrame
def expand_init_dates(df: pd.DataFrame, cfg: ExperimentConfig, years: range) -> pd.DataFrame
def pre_window_fill(agg: Literal["sum", "mean", "max"]) -> float
Key functions:
- `build_windowed_features` — builds prefix arrays (cumsum, cumcount, cummax) once per variable per year, then computes all `(init_date × window × variable)` combinations with NumPy fancy indexing — no Python loop over init dates (core.py:77–161). Output shape is `(n_geo × n_init, n_features + 3)`.
- `_build_prefix_arrays` — returns `(cumsum_val, cumsum_cnt, cummax_val)` for O(1) window queries (core.py:54–72).
- `pre_window_fill` — `sum` windows return `0.0` before the window opens to prevent the downstream `MedianImputer` filling with the training median and producing a spurious jump at window-open; `mean`/`max` stay NaN (core.py:22–34).
- `expand_init_dates` — cross-joins year-level builder output (one row per `(geo, year)`) against all `init_dates` for that year; used by the `yields` and `stress` builders (core.py:37–51).
Notes:
- `freeze_cap_sdoy` is applied inside `build_windowed_features` to clamp the progressive window once init passes the freeze date (core.py:109–124); this is a commodity-level config value.
- The `lag_days` parameter shifts `init_sdoy` left before computing effective window ends, representing the data-availability lag (core.py:107).
- A `TODO` comment at core.py:75 flags that this function's logic needs a manual line-by-line sense-check.
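The prefix-array trick behind `_build_prefix_arrays` can be sketched for the `sum` aggregation; `window_sums` is an illustrative reduction:

```python
import numpy as np


def window_sums(daily: np.ndarray, starts: np.ndarray, ends: np.ndarray) -> np.ndarray:
    """One cumsum per (geo, day) matrix, then any [start, end) window sum
    is an O(1) difference, vectorised over windows with fancy indexing."""
    # prefix[:, j] = sum of daily[:, :j]; the leading zero column makes start=0 valid
    prefix = np.concatenate(
        [np.zeros((daily.shape[0], 1)), np.cumsum(daily, axis=1)], axis=1
    )
    # shape (n_geo, n_windows): one column per (start, end) pair
    return prefix[:, ends] - prefix[:, starts]
```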
builders/yields.py¶
Purpose: Builds NASS yield features — lagged yield signals and target column — from a NASS parquet.
Public surface:
Key functions:
- `build_yields` — loads NASS via `_load_nass`, loops over years to compute yield features and area lookups, assembles a per-year DataFrame, then calls `expand_init_dates` (yields.py:174–247).
- `_load_nass` — reads the NASS parquet, optionally filters by `crop_type`, applies declarative Fellegi-Holt edits (`apply_edits`), pivots into `(n_geo, n_year)` matrices for yield, area (current + lagged), and optional production (yields.py:63–162). `dropna=False` preserves counties with NaN yield but valid production so `select_by_production` sees the full 2,815-county universe (yields.py:111–120).
- `_compute_yield_features` — returns `yield_last` (rightmost non-NaN), `yield_avg_3`, `yield_avg_5` (trailing-k mean skipping gap years) (yields.py:165–171).
- `_last_valid_per_row`, `_nanmean_last_k` — vectorised helpers operating on the `(n_geo, n_prior_years)` pivot matrix (yields.py:27–60).
Notes:
- `area_col` carries the year-1 lagged harvested area; the lag is baked into `area_lagged_mat` via `.shift(1, axis=1)` post-pivot (yields.py:137).
- `CURRENT_YEAR_area_harvested_ha` is also written but flagged as MUST NOT be used as a model feature (it leaks the harvest outcome) (yields.py:207–211).
- A `TODO` at yields.py:219 notes that area forecasts are not yet incorporated; the builder currently relies on Y-1 estimates throughout.
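The gap-skipping trailing mean can be sketched as follows; the real `_nanmean_last_k` is vectorised over the pivot matrix, while this loop version just shows the semantics:

```python
import numpy as np


def nanmean_last_k(pivot: np.ndarray, k: int) -> np.ndarray:
    """For each county row of a (n_geo, n_prior_years) pivot, average the
    last `k` non-NaN values, skipping gap years entirely."""
    out = np.full(pivot.shape[0], np.nan)
    for i, row in enumerate(pivot):
        vals = row[~np.isnan(row)]  # drop gap years before taking the tail
        if vals.size:
            out[i] = vals[-k:].mean()
    return out
```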
builders/weather.py¶
Purpose: Builds windowed weather index features (cumulative GDD, EDD, precipitation) from a daily weather zarr.
Public surface:
def build_weather(
weather_indices_path: Path | CloudPath, cfg: ExperimentConfig, years: range
) -> pd.DataFrame
Key functions:
- `build_weather` — reads the `WeatherBuilder` config, opens the zarr, loops over years, slices the season window via a time-mask (handles cross-year seasons), and calls `build_windowed_features` with `agg="sum"` (weather.py:83–141). Column naming: `{var.replace('daily_', '')}_{window.name}`.
- `_build_season_array_from_daily_zarr` — extracts the `(n_geo, season_length)` array for one variable/year, delegating padding to `pad_partial_season` (weather.py:55–80).
- `pad_partial_season` — pads a truncated season array to `expected_length` with NaN, logging which sdoy ranges are missing (weather.py:27–53).
Notes:
- Heartbeat logs every 5 years and on the final year show that a long-running build is alive (weather.py:112–113).
- If any variable returns `None` (year absent from the zarr), the entire year is skipped rather than emitting a partial row (weather.py:119–124).
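The padding behaviour can be sketched as follows; `pad_partial_season_sketch` omits the logging of missing sdoy ranges:

```python
import numpy as np


def pad_partial_season_sketch(arr: np.ndarray, expected_length: int) -> np.ndarray:
    """Right-pad a truncated (n_geo, n_days) season array with NaN so
    every year has the same width; over-long arrays are truncated."""
    n_geo, n_days = arr.shape
    if n_days >= expected_length:
        return arr[:, :expected_length]
    pad = np.full((n_geo, expected_length - n_days), np.nan)
    return np.concatenate([arr, pad], axis=1)
```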
builders/climo.py¶
Purpose: Builds windowed climatology z-score features from a per-year z-score zarr.
Public surface:
def build_climo(
climo_indices_path: Path | CloudPath, cfg: ExperimentConfig, years: range
) -> pd.DataFrame
Key functions:
- `build_climo` — reads the `ClimoBuilder` config, applies an optional geo lookup table, loops over years, stitches cross-year season arrays via `_build_season_array`, and calls `build_windowed_features` with `agg="mean"` (climo.py:60–131). Column naming: `{var}_zscore_{window.name}`.
- `_build_season_array` — handles single-year (corn, soy) and cross-year (wheat) commodities by slicing one or two calendar-year arrays from the zarr and concatenating (climo.py:34–57). Caps the autumn slice at the real last DOY of the calendar year (365 or 366) via `_last_doy_of_year` (climo.py:53–54).
Notes:
- Years missing from the zarr are silently skipped (both calendar years of the season must be present) (climo.py:105–107).
- The geo lookup (`bcfg.geo_lookup_path`) maps zarr geoids to the canonical `geo_identifier` format; resolved at config-load time (climo.py:75–83).
builders/ndvi.py¶
Purpose: Builds peak and cumulative NDVI features from a monthly county-level CSV by constructing a pseudo-daily season array.
Public surface:
def build_ndvi(
ndvi_county_path: Path | CloudPath, cfg: ExperimentConfig, years: range
) -> pd.DataFrame
Key functions:
- `build_ndvi` — reads the NDVI CSV, maps county + STATEFP to `geo_identifier` via `ndvi_geo_identifier`, loops over years, builds mean and p75 season arrays, calls `build_windowed_features` twice (once with `agg="max"` for peak, once with `agg="mean"` for cumulative), and merges the results (ndvi.py:147–209).
- `_build_season_array` — repeats each month's NDVI value for every day in that month within the season window, producing a `(n_geo, season_length_days)` array (ndvi.py:97–144).
- `ndvi_geo_identifier` — maps an NDVI CSV row to the canonical `geo_identifier`; returns `None` for unknown STATEFP values (ndvi.py:89–94).
Notes:
- The `STATEFP_TO_STATE` lookup dict is acknowledged as a workaround for dirty NDVI data that formerly used state-level values; the comment flags it for removal (ndvi.py:32–33).
- Window selection reuses the `gstd` window from `climo_windows` (ndvi.py:183).
- Both peak and cumulative features are produced with `lag_days=0` — NDVI has no data-availability lag (ndvi.py:185–203).
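The pseudo-daily construction can be sketched with `np.repeat`; `monthly_to_pseudo_daily` and its inputs are illustrative:

```python
import numpy as np


def monthly_to_pseudo_daily(monthly: np.ndarray, days_per_month: list[int]) -> np.ndarray:
    """Repeat each month's NDVI value for every day of that month inside
    the season window, giving a (n_geo, season_length_days) array that the
    windowed engine can consume like any daily variable."""
    return np.repeat(monthly, days_per_month, axis=1)
```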
builders/stress.py¶
Purpose: Applies lag-1 shift to a pre-computed stress parquet to produce stress features for model training. Optionally assembles the stress parquet from a daily indices zarr first.
Public surface:
def build_stress(
stress_path: Path | CloudPath, cfg: ExperimentConfig, years: range
) -> pd.DataFrame
Key functions:
- `build_stress` — optionally calls `_maybe_assemble_stress_parquet_from_indices`, reads the stress parquet, renames columns per `bcfg.rename_map`, shifts `year_col + lag_years` to produce lag-1 stress features, filters to the requested years, then calls `expand_init_dates` (stress.py:87–112).
- `_maybe_assemble_stress_parquet_from_indices` — assembles the stress parquet from the daily indices zarr when `bcfg.assemble_stress_from_indices` is configured and the parquet is missing, overwrite is set, or the `COMMODITY_HINDCAST_FORCE_STRESS_ASSEMBLY` env var is truthy (stress.py:42–84).
- `_should_assemble_stress_parquet` — three-way guard: env var override, config `overwrite` flag, or file-not-exists (stress.py:27–39).
Notes:
- The `stress_compute` import is deferred inside `_maybe_assemble_stress_parquet_from_indices` to avoid circular-import risk (stress.py:71–75).
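The lag shift can be sketched as a plain year offset plus rename; `lag_stress` and the `stress_score` column name are illustrative:

```python
import pandas as pd


def lag_stress(stress: pd.DataFrame, lag_years: int = 1) -> pd.DataFrame:
    """A county's year-Y stress score becomes a feature for year
    Y + lag_years, so training never sees same-season stress."""
    shifted = stress.copy()
    shifted["year"] = shifted["year"] + lag_years
    return shifted.rename(columns={"stress_score": f"stress_score_lag{lag_years}"})
```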
builders/stress_compute.py¶
Purpose: Computes composite weather stress scores (QUBE-sprint parity) from a daily indices zarr. Ported from `QUBE-sprint/src/features/builders/stress_score.py`.
Public surface:
def compute_commodity_stress(
commodity: str,
indices_zarr: Path | CloudPath,
gs_start_doy: int,
gs_end_doy: int,
baseline_start: int = 1980,
baseline_end: int = 2010,
) -> xr.Dataset # stress_score, z_GS_EDD, z_GS_GDD, z_GS_Ptotal
def write_stress_dataset_to_parquet(ds: xr.Dataset, parquet_path: Path | CloudPath) -> None
Key functions:
- `compute_commodity_stress` — looks up `COMMODITY_STRESS_VARIABLES` for the commodity, builds a seasonal dataset from the daily zarr via `_build_seasonal_from_daily`, computes z-scores and the weighted average via `calculate_weather_stress_score`, and returns an `xr.Dataset` (stress_compute.py:221–253).
- `calculate_weather_stress_score` — z-scores each variable against the baseline period, clips at `z_clip=3.0`, weights and averages, scales to `[0, 1]` via `scale_to_unit` (factor=1.5) (stress_compute.py:114–150).
- `_build_seasonal_from_daily` — aggregates the daily zarr to seasonal totals (`sum` over the DOY window) per `season_year`; handles cross-year growing seasons (stress_compute.py:160–218).
- `write_stress_dataset_to_parquet` — converts the `xr.Dataset` to a DataFrame, renames `geoid` → `geo_id` and `season_year` → `year`, and writes via the shared `write_dataframe` (stress_compute.py:256–263).
Notes:
- `COMMODITY_STRESS_VARIABLES` maps a commodity name to a `[StressVariable]` list. All three commodities (corn, wheat, soybeans) use the same three variables: `GS_Ptotal` (inverted), `GS_GDD`, `GS_EDD` (stress_compute.py:46–62).
- `StressConfig` is a frozen dataclass with `min_std=1.0`, `z_clip=3.0`, `scaling_factor=1.5` (stress_compute.py:26–32).
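One component of the score can be sketched as follows. The z-score floor and clip follow the `StressConfig` values above; the exact `scale_to_unit` mapping is not documented here, so the linear form below is an assumption:

```python
import numpy as np


def component_z(seasonal: np.ndarray, baseline: np.ndarray,
                *, min_std: float = 1.0, z_clip: float = 3.0) -> np.ndarray:
    """Z-score one seasonal variable against the baseline period, with a
    std floor (min_std) and a clip at ±z_clip."""
    mu = float(baseline.mean())
    sigma = max(float(baseline.std()), min_std)  # floor guards near-constant baselines
    return np.clip((seasonal - mu) / sigma, -z_clip, z_clip)


def scale_to_unit_sketch(z: np.ndarray, scaling_factor: float = 1.5) -> np.ndarray:
    # ASSUMED linear mapping: z=0 -> 0.5, saturating at [0, 1];
    # the real scale_to_unit may differ.
    return np.clip(0.5 + z / (2.0 * scaling_factor), 0.0, 1.0)
```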
builders/registry.py¶
Purpose: Single point of builder discovery and dispatch. The only file that needs updating when a builder is added or removed.
Public surface:
BUILDER_REGISTRY: dict[str, BuilderFn] = {
"yields": build_yields,
"stress": build_stress,
"weather": build_weather,
"climo": build_climo,
"ndvi": build_ndvi,
}
def run_builder(name: str, cfg: ExperimentConfig, years: range) -> pd.DataFrame
Key functions:
- `run_builder` — looks up the builder by string key, resolves `cfg.commodity.builders[name].filepath` (already normalised at config-load time), calls the builder function, validates output via `validate_builder_output`, and returns the DataFrame (registry.py:39–47).
Notes:
- Registry keys are plain strings matching the YAML builder block keys in `cfg.commodity.builders`. There is no discriminated-union typing at runtime — type safety comes from the `BuilderFn` protocol and the config-layer `BuilderType` literals.
- Adding a builder requires: (1) implementing the `BuilderFn` protocol, (2) adding an entry to `BUILDER_REGISTRY`, (3) declaring the builder under `commodity.builders` in the relevant config YAML.
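String-key dispatch can be sketched as follows; `_toy_builder` and `run_builder_sketch` are hypothetical stand-ins, and the real `run_builder` also resolves the builder's filepath from config and validates the output frame:

```python
from typing import Callable

import pandas as pd


def _toy_builder(path: str, years: range) -> pd.DataFrame:
    # Minimal builder obeying the index contract: one row per year here.
    return pd.DataFrame({"year": list(years), "geo_identifier": "01001",
                         "init_date": "2020-05-01", "x": 1.0})


BUILDER_REGISTRY: dict[str, Callable[..., pd.DataFrame]] = {"toy": _toy_builder}


def run_builder_sketch(name: str, path: str, years: range) -> pd.DataFrame:
    """Unknown names fail loudly before any I/O happens."""
    try:
        fn = BUILDER_REGISTRY[name]
    except KeyError:
        raise KeyError(f"unknown builder {name!r}; registered: {sorted(BUILDER_REGISTRY)}")
    return fn(path, years)
```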
Cross-references¶
- `ExperimentConfig` — config object threaded through every builder and the assembler.
- `SeasonWindow` — window definition used by `build_windowed_features`.
- `ForecastSlice` — carrier for `(season_year, init_date, features_dir, indices_zarr)` passed to forecast splicing and the long-range stub.
- source_config — YAML builder declarations, `required_for_pred_parquet`, window definitions.
- pipeline_features — stage-by-stage walkthrough of a feature build run (page not yet written).
Relationships to other subsystems¶
- Configuration (`config.py`) — `ExperimentConfig` and the typed builder configs (`YieldsBuilder`, `WeatherBuilder`, `ClimoBuilder`, `NDVIBuilder`, `StressBuilder`) are the only coupling point between the feature subsystem and the rest of the pipeline.
- Stages (`stages/run_features.py`) — calls `build_features` directly; the stage is a thin wrapper that resolves config and output directories.
- Forecast stage (`stages/run_forecast.py`) — calls `synthesise_long_range_climo_for_unseen_years` and `synthesise_long_range_stress_for_unseen_years` from `forecast_long_range_stub` before calling `build_features`.
- Models (`regression/`, `meta_models/`) — consume `fit.parquet` and `pred.parquet` produced by `assemble`; they are unaware of how the feature matrix was built.
- Shared utilities (`treefera_market_insights.shared.utils.dataframes`) — `read_dataframe`/`write_dataframe` are used throughout for parquet I/O; `AnyPath`/`CloudPath` patterns ensure S3 and local paths work identically.
- `tf_data_ml_utils` — `materialise_for_forecast`, `extend_with_climatology`, and the `compute` index functions are called from `forecast_weather.py` for obs+climo splicing.