Pipeline: Fit
Purpose
The fit stage trains the full modelling stack for one walk-forward CV fold (or the production fold). It is the only place in the pipeline that fits model artefacts. The entry point is `train(train_data, fold_label, config)` in `stages/run_fit.py:28`. The function deliberately hands no fitted objects back to callers: every artefact is persisted to disk under `{models_dir}/{experiment_key}/{fold_label}/`, and downstream stages re-load it through `HindcastSlice` (DESIGN.md: "run_dir is the sole cross-stage hand-off contract"). The only return value is a `HindcastSlice` handle pointing at those artefacts.
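As a rough illustration of the on-disk layout described above, the sketch below composes the per-fold artefact directory from its three parts. The helper name `fold_artifact_dir` and the example values are hypothetical and are not taken from `run_fit.py`; only the `{models_dir}/{experiment_key}/{fold_label}/` pattern and the artefact file names come from this page.

```python
from pathlib import Path


def fold_artifact_dir(models_dir: str, experiment_key: str, fold_label: str) -> Path:
    """Return {models_dir}/{experiment_key}/{fold_label} as a Path (hypothetical helper)."""
    return Path(models_dir) / experiment_key / fold_label


# Hypothetical values, chosen only for illustration.
fold_dir = fold_artifact_dir("models", "corn_ridge_v1", "2019")

# Artefact names as listed in the Outputs table of this page.
detrender_path = fold_dir / "detrender.pkl"
fill_values_path = fold_dir / "feature_fill_values.parquet"

print(detrender_path.as_posix())
print(fill_values_path.as_posix())
```

Because every artefact lives under one directory keyed by experiment and fold, a downstream stage only needs the same three values to find everything the fit stage wrote.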
Inputs
| Input | Path | Format | Producer |
|---|---|---|---|
| `train_data` DataFrame | In-memory, passed by `run_experiment` or `_run_production_fit_phase` | pandas DataFrame | `run_hindcast._load_and_preprocess` + fold slice from `ExpandingFoldGenerator` |
| `fold_label` | In-memory string (`"production"` or `str(season_year)`) | `str` | `run_hindcast._run_walk_forward_phase` / `_run_production_fit_phase` |
| `ExperimentConfig` | In-memory (resolved) | Pydantic model | `_prepare_config()` in `cli.py:112` |
Outputs
| Output | Path | Format | Consumer |
|---|---|---|---|
| `detrender.pkl` | `{models_dir}/{experiment_key}/{fold_label}/detrender.pkl` | Pickle (joblib) | `fold.load_detrender(config)` in `run_predict._load_prediction_inputs` |
| `feature_fill_values.parquet` | `{models_dir}/{experiment_key}/{fold_label}/feature_fill_values.parquet` | Parquet (index as explicit column) | `fold.load_feature_fill_values()` in `run_predict._load_prediction_inputs` |
| Regressor file(s) | `{models_dir}/{experiment_key}/{fold_label}/`: `ridge_model.pkl`, `pca_ridge_model.pkl`, or `model.json` + `metadata.pkl` | joblib / XGBoost JSON | `fold.load_model()` in `run_predict._load_prediction_inputs` |
| `HindcastSlice` handle | In-memory return value | Python object | `run_experiment` / `_run_production_fit_phase` for immediate downstream chaining |
Step-by-step flow
- Initialise artefacts: `config.build_detrender()` (factory in `models/detrend/build.py:54`) instantiates the configured detrender class (`linear_state`, `gaussian_state`, or `partial_pooling`). `config.build_fit_aggregation_policy()` and `config.regression_feature_columns()` are resolved once (`run_fit.py:45–47`).
- Detrend (`fit_transform`): `detrender.fit_transform(train_data)` fits the trend model on the training set and returns a copy with `target_detrended_col` added (`run_fit.py:55`). This is ALWAYS called before the regressor, so the regressor sees only detrended yield values. `fit_transform` must precede `transform` / `inverse_transform`; calling them before `fit_transform` raises `RuntimeError` (guarded by `AbstractDetrend.is_fitted`).
- Save detrender: serialise to `{models_dir}/{experiment_key}/{fold_label}/detrender.pkl` via `AbstractDetrend.save(path)` (joblib + `vfs.local_context` for S3 safety) (`run_fit.py:62–65`).
- Fit `MedianImputer`: `MedianImputer().fit(train_data_detrended_full, regression_feature_columns)` computes per-feature training medians (`run_fit.py:70–71`).
- Persist fill values: serialise `fill_values` as `feature_fill_values.parquet` with the index materialised as an explicit `"feature"` column (`run_fit.py:74–86`; DESIGN.md Clause 30): `series.to_frame("fill_value").rename_axis("feature").reset_index()`. This is the canonical write pattern required by Clause 30 after a silent regression was caused by a `RangeIndex` round-trip.
- Transform training data: `feature_imputer.transform(train_data_detrended_full, regression_feature_columns)` fills NaN features with training medians (`run_fit.py:87–90`).
- Drop NaN rows: `train_data_detrended.dropna(subset=fit_drop_cols)` removes rows whose mandatory columns are still NaN after imputation (`run_fit.py:92–96`).
- Aggregation policy: `fit_aggregation_policy.fit_transform(train_data_detrended, ...)` applies the configured aggregation (e.g. county → state) to the training data, returning `train_fit_data` (`run_fit.py:103–108`).
- Fit regressor: `config.build_regressor()` instantiates `RidgeRegressor`, `PcaRidgeRegressor`, or `XGBRegressor`; `model.fit(X_train_for_fit, y_train, sample_weight=sample_weights)` trains the estimator (`run_fit.py:139–141`). For XGBoost, `val_split_column` (default `"year"`) is appended to the feature frame to enable last-year validation split (`run_fit.py:119–128`).
- Save regressor: `model.save_model({models_dir}/{experiment_key}/{fold_label})` writes the regressor-specific artefact(s) (`run_fit.py:145`). Ridge: `ridge_model.pkl`; PCA-Ridge: `pca_ridge_model.pkl`; XGBoost: `model.json` + `metadata.pkl`.
- Return `HindcastSlice`: `HindcastSlice.from_config(config, fold_label)` builds a frozen path-handle pointing at the just-written artefacts (`run_fit.py:149`). This is the load-side pair used by `fold.load_detrender(config)` at predict time.
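The imputation and row-dropping steps in the flow above can be sketched as follows. This is a minimal illustration under stated assumptions, not the project's code: `SketchMedianImputer` is a hypothetical stand-in for `MedianImputer`, and the column names and data are invented. It shows medians being learned on training data only, applied to fill NaN features, and rows with a still-missing mandatory column being dropped.

```python
from __future__ import annotations

import numpy as np
import pandas as pd


class SketchMedianImputer:
    """Hypothetical stand-in for MedianImputer: learn medians on fit, fill NaN on transform."""

    def fit(self, frame: pd.DataFrame, feature_cols: list[str]) -> "SketchMedianImputer":
        # Series indexed by feature name; median() skips NaN values by default.
        self.fill_values = frame[feature_cols].median()
        return self

    def transform(self, frame: pd.DataFrame, feature_cols: list[str]) -> pd.DataFrame:
        out = frame.copy()
        # fillna with a Series fills each column using the value under its name.
        out[feature_cols] = out[feature_cols].fillna(self.fill_values)
        return out


# Invented training frame with gaps in both features and in the target.
train = pd.DataFrame(
    {
        "rain": [10.0, np.nan, 30.0, 20.0],
        "temp": [np.nan, 21.0, 23.0, 25.0],
        "yield_detrended": [0.5, -0.2, np.nan, 0.1],
    }
)
feature_cols = ["rain", "temp"]

imputer = SketchMedianImputer().fit(train, feature_cols)
imputed = imputer.transform(train, feature_cols)

# Rows whose mandatory columns are still NaN after imputation are dropped;
# here that is the row with a missing target.
fit_drop_cols = feature_cols + ["yield_detrended"]
fit_ready = imputed.dropna(subset=fit_drop_cols)
```

Keeping `fill_values` as a Series indexed by feature name is what makes the later persistence step sensitive to how the index is written.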
Mermaid flow diagram
flowchart LR
TD["train_data\n(in-memory DataFrame)"]
BD["build_detrender(config)\nmodels/detrend/build.py:54"]
FT["detrender.fit_transform(train_data)\nrun_fit.py:55"]
SD["detrender.save(detrender.pkl)\nrun_fit.py:65"]
MI["MedianImputer.fit(...)\nrun_fit.py:70"]
WF["write feature_fill_values.parquet\n(index as explicit column)\nrun_fit.py:78"]
TR["feature_imputer.transform(...)\nrun_fit.py:87"]
AP["fit_aggregation_policy.fit_transform(...)\nrun_fit.py:103"]
BR["build_regressor(config)\nmodels/regression/__init__.py:58"]
FIT["model.fit(X_train, y_train)\nrun_fit.py:141"]
SM["model.save_model(...)\nrun_fit.py:145"]
HS["HindcastSlice.from_config(...)\nrun_fit.py:149"]
TD --> BD --> FT --> SD
FT --> MI --> WF
MI --> TR --> AP --> BR --> FIT --> SM --> HS
Invariants and contracts
DESIGN.md Clause 34 (artefact contract, verbatim):
"FIT → `models/{experiment_key}/{label}/` + `train_preds.parquet`."
Note: train_preds.parquet is written by run_experiment in run/experiment_protocol.py:22, not by train() itself. train() writes only the three model artefacts listed above.
DESIGN.md Clause 30 (index persistence, verbatim):
"WHEN persisting a pandas object whose index carries meaning … the system SHALL materialise that label as an explicit named column before writing and restore it with `set_index(...)` on read. Relying on the writer to 'preserve the index' is forbidden."
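The write and read halves of Clause 30 can be sketched as a round trip. The write expression is the one quoted on this page; the fill values are invented, and CSV in an in-memory buffer stands in for Parquet purely so the example runs without a Parquet engine installed (an assumption of this sketch, not how the fit stage persists the file).

```python
import io

import pandas as pd

# Invented fill values, keyed by feature name.
fill_values = pd.Series({"rain": 20.0, "temp": 23.0})

# Write side: materialise the meaningful index as an explicit "feature" column.
to_write = fill_values.to_frame("fill_value").rename_axis("feature").reset_index()

# Round trip through a writer that discards the index entirely.
buffer = io.StringIO()
to_write.to_csv(buffer, index=False)
buffer.seek(0)
loaded = pd.read_csv(buffer)

# Read side: restore the index explicitly instead of trusting the writer.
restored = loaded.set_index("feature")["fill_value"]

# Lookups by feature name work regardless of row order in the file.
lookup = restored.reindex(["temp", "rain"])
```

Since the feature names travel as ordinary data, the result does not depend on whether a given writer or reader preserves index metadata.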
Detrend pipeline contract (from sources/code/detrend.md):
"The regression pipeline calls `fit_transform` on training data, serialises the fitted detrender to `detrender.pkl`, then calls `transform` on any held-out fold and `inverse_transform` inside `models/regression/runtime.py:176` to retrend model predictions."
fit_transform is called BEFORE build_regressor — the regressor receives only trend-adjusted yield values and never sees raw trend-contaminated yields.
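To make the ordering contract concrete, here is a toy detrender with a fitted-state guard. It is a sketch only: `ToyLinearDetrender`, its column names, and the data are hypothetical, and it fits a single least-squares line rather than any of the project's detrender classes. It shows `fit_transform` on training data, `inverse_transform` to retrend a prediction, and the `RuntimeError` raised when `transform` is called before fitting.

```python
import numpy as np
import pandas as pd


class ToyLinearDetrender:
    """Hypothetical single-line detrender; not the project's AbstractDetrend."""

    def __init__(self, year_col="year", target_col="yield", detrended_col="yield_detrended"):
        self.year_col = year_col
        self.target_col = target_col
        self.detrended_col = detrended_col
        self.is_fitted = False

    def _trend(self, frame):
        # Guard: any use of the trend before fitting is an error.
        if not self.is_fitted:
            raise RuntimeError("detrender not fitted")
        return self.slope_ * frame[self.year_col] + self.intercept_

    def fit_transform(self, frame):
        # Least-squares line through (year, yield); polyfit returns [slope, intercept].
        self.slope_, self.intercept_ = np.polyfit(
            frame[self.year_col], frame[self.target_col], deg=1
        )
        self.is_fitted = True
        return self.transform(frame)

    def transform(self, frame):
        out = frame.copy()
        out[self.detrended_col] = out[self.target_col] - self._trend(out)
        return out

    def inverse_transform(self, frame, detrended_predictions):
        # Add the fitted trend back onto detrended predictions.
        return detrended_predictions + self._trend(frame)


train = pd.DataFrame(
    {"year": [2015, 2016, 2017, 2018, 2019], "yield": [100.0, 103.0, 103.0, 107.0, 108.0]}
)

detrender = ToyLinearDetrender()
train_detrended = detrender.fit_transform(train)

# Retrending the detrended training target recovers the raw yields.
retrended = detrender.inverse_transform(train_detrended, train_detrended["yield_detrended"])

# Calling transform on an unfitted detrender trips the guard.
try:
    ToyLinearDetrender().transform(train)
    guard_raised = False
except RuntimeError:
    guard_raised = True
```

A regressor trained on `yield_detrended` never sees the raw trend, which is the property the contract above protects.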
Failure modes and recovery
| Symptom | Likely cause | Recovery |
|---|---|---|
| `RuntimeError: detrender not fitted` at predict time | `detrender.pkl` written by an older code version, before the `is_fitted` guard was added | Delete `{fold}/detrender.pkl` and re-run `cli run hindcast` for the fold |
| `KeyError` or all-NaN in `MedianImputer.transform` at predict time | `feature_fill_values.parquet` written with a `RangeIndex` (pre-Clause-30 bug) | Delete `feature_fill_values.parquet` and re-run the fit stage |
| `KeyError: Missing sample-weight column` | `use_sample_weights=True` but the aggregation policy doesn't emit `weight_column` | Set `use_sample_weights=False` in config or fix the aggregation policy |
| `ValueError: fit data has NaN features after imputation` (from `assert_no_nan_features`) | An all-NaN feature in the `fit_drop_cols` subset survived imputation | Inspect `feature_fill_values.parquet` for the feature whose training column was all-NaN |
| `OperationalError` from MLflow | Two folds fitting the same commodity in parallel | Run same-commodity folds sequentially (known MLflow DB locking issue) |
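The sample-weight failure in the table above comes down to a configuration flag asking for a column that the fitted frame does not carry. The sketch below shows one way such a guard could look; `resolve_sample_weights` and the column names are hypothetical and are not taken from `run_fit.py`, and only the error message prefix mirrors the symptom listed in the table.

```python
import pandas as pd


def resolve_sample_weights(frame, use_sample_weights, weight_column):
    """Hypothetical guard: return per-row weights, or None when weighting is off."""
    if not use_sample_weights:
        return None
    if weight_column not in frame.columns:
        raise KeyError(f"Missing sample-weight column: {weight_column!r}")
    return frame[weight_column].to_numpy()


# Invented aggregated frame that does carry a weight column.
fit_frame = pd.DataFrame({"rain": [10.0, 20.0], "area_weight": [0.25, 0.75]})

weights = resolve_sample_weights(fit_frame, True, "area_weight")
no_weights = resolve_sample_weights(fit_frame, False, "area_weight")

# Asking for a column the aggregation step did not emit fails loudly.
try:
    resolve_sample_weights(fit_frame.drop(columns=["area_weight"]), True, "area_weight")
    missing_raised = False
except KeyError:
    missing_raised = True
```

Failing at this point, before the estimator is trained, keeps the error close to the configuration mismatch that caused it.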
Cross-references
- ExperimentConfig (`model.detrend`, `model.regression`, `models_dir`, `regression_feature_columns()`)
- HindcastSlice (return value; `load_detrender`, `load_model`, `load_feature_fill_values` are the load-side pairs)
- Concept: walk-forward CV (`run_experiment` calls `train()` per fold)
- Pipeline: predict (downstream consumer of all three artefacts)
- Source: detrend (`AbstractDetrend` contract, three concrete classes)
- Source: regression (`AbstractRegressionImpl` contract, persistence formats)
- Source: stages (`run_fit.py` role in the stage DAG)
- Source: DESIGN.md (Clauses 30, 34)
PRs that materially changed this stage
- Clause 30 regression fix: `feature_fill_values.parquet` now writes the feature-name index as an explicit `"feature"` column (`run_fit.py:74–86`) after a silent no-op regression where `MedianImputer.transform` returned all-NaN because `fv.reindex(feature_cols)` found a `RangeIndex`.
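The failure mechanism behind that fix is easy to reproduce in isolation. The sketch below uses invented feature names and values; `fv_broken` imitates fill values whose feature-name index came back as a `RangeIndex`, and `fv_fixed` imitates the same values with the explicit `"feature"` column restored as the index.

```python
import pandas as pd

feature_cols = ["rain", "temp"]

# Fill values whose labels were lost on the way to disk: the numbers survive,
# but the index is now 0, 1 instead of feature names.
fv_broken = pd.Series([20.0, 23.0], name="fill_value")
lookup_broken = fv_broken.reindex(feature_cols)  # no label matches, so every entry is NaN

# The same values with the explicit "feature" column restored as the index.
fv_fixed = (
    pd.DataFrame({"feature": feature_cols, "fill_value": [20.0, 23.0]})
    .set_index("feature")["fill_value"]
)
lookup_fixed = fv_fixed.reindex(feature_cols)

# Invented frame with one gap per feature.
nan = float("nan")
frame = pd.DataFrame({"rain": [nan, 5.0], "temp": [18.0, nan]})

# With the restored index, each gap is filled from the matching feature's value.
filled = frame.fillna(lookup_fixed)
```

Nothing raises in the broken case: `reindex` simply returns NaN for every feature, which is why the regression went unnoticed until predictions were inspected.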