
Pipeline: Fit

Purpose

The fit stage trains the full modelling stack for one walk-forward CV fold (or the production fold). It is the sole place in the pipeline that fits model artefacts. The entry point is train(train_data, fold_label, config) in stages/run_fit.py:28. The function deliberately produces no in-memory state for callers — every artefact is persisted to disk under {models_dir}/{experiment_key}/{fold_label}/ so that downstream stages re-load from disk via HindcastSlice (DESIGN.md: "run_dir is the sole cross-stage hand-off contract"). It returns only a HindcastSlice handle pointing at those artefacts.
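The directory contract above can be sketched in a few lines. `FoldPaths` and `fold_paths` below are hypothetical names for illustration only, not the real `HindcastSlice` API:

```python
from dataclasses import dataclass
from pathlib import Path

# Illustrative sketch of the {models_dir}/{experiment_key}/{fold_label}/
# layout. FoldPaths is a hypothetical stand-in for the HindcastSlice handle.
@dataclass(frozen=True)
class FoldPaths:
    root: Path

    @property
    def detrender(self) -> Path:
        return self.root / "detrender.pkl"

    @property
    def fill_values(self) -> Path:
        return self.root / "feature_fill_values.parquet"

def fold_paths(models_dir: str, experiment_key: str, fold_label: str) -> FoldPaths:
    # All fit artefacts for one fold live under this single directory.
    return FoldPaths(Path(models_dir) / experiment_key / fold_label)
```

Downstream stages never receive fitted objects in memory; they rebuild the same paths from config and fold label and load from disk.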

Inputs

| Input | Path | Format | Producer |
| --- | --- | --- | --- |
| train_data | In-memory — passed by run_experiment or _run_production_fit_phase | pandas DataFrame | run_hindcast._load_and_preprocess + fold slice from ExpandingFoldGenerator |
| fold_label | In-memory string ("production" or str(season_year)) | str | run_hindcast._run_walk_forward_phase / _run_production_fit_phase |
| ExperimentConfig | In-memory (resolved) | Pydantic model | _prepare_config() in cli.py:112 |

Outputs

| Output | Path | Format | Consumer |
| --- | --- | --- | --- |
| detrender.pkl | {models_dir}/{experiment_key}/{fold_label}/detrender.pkl | Pickle (joblib) | fold.load_detrender(config) in run_predict._load_prediction_inputs |
| feature_fill_values.parquet | {models_dir}/{experiment_key}/{fold_label}/feature_fill_values.parquet | Parquet (index as explicit column) | fold.load_feature_fill_values() in run_predict._load_prediction_inputs |
| Regressor file(s) | {models_dir}/{experiment_key}/{fold_label}/ridge_model.pkl, pca_ridge_model.pkl, or model.json + metadata.pkl | joblib / XGBoost JSON | fold.load_model() in run_predict._load_prediction_inputs |
| HindcastSlice handle | In-memory return value | Python object | run_experiment / _run_production_fit_phase for immediate downstream chaining |

Step-by-step flow

  1. Initialise artefacts — config.build_detrender() (factory in models/detrend/build.py:54) instantiates the configured detrender class (linear_state, gaussian_state, or partial_pooling). config.build_fit_aggregation_policy() and config.regression_feature_columns() are resolved once (run_fit.py:45–47).

  2. Detrend — detrender.fit_transform(train_data) fits the trend model on the training set and returns a copy with target_detrended_col added (run_fit.py:55). This is ALWAYS called before the regressor — the regressor sees only detrended yield values. fit_transform must precede transform/inverse_transform; calling them before fit_transform raises RuntimeError (guarded by AbstractDetrend.is_fitted).

  3. Save detrender — serialise to {models_dir}/{experiment_key}/{fold_label}/detrender.pkl via AbstractDetrend.save(path) (joblib + vfs.local_context for S3 safety) (run_fit.py:62–65).

  4. Fit MedianImputer — MedianImputer().fit(train_data_detrended_full, regression_feature_columns) computes per-feature training medians (run_fit.py:70–71).

  5. Persist fill values — serialise fill_values as feature_fill_values.parquet with the index materialised as an explicit "feature" column (run_fit.py:74–86; DESIGN.md Clause 30): series.to_frame("fill_value").rename_axis("feature").reset_index(). This is the canonical write pattern required by Clause 30 after a silent regression was caused by a RangeIndex round-trip.

  6. Transform training data — feature_imputer.transform(train_data_detrended_full, regression_feature_columns) fills NaN features with training medians (run_fit.py:87–90).

  7. Drop NaN rows — train_data_detrended.dropna(subset=fit_drop_cols) removes rows whose mandatory columns are still NaN after imputation (run_fit.py:92–96).

  8. Aggregation policy — fit_aggregation_policy.fit_transform(train_data_detrended, ...) applies the configured aggregation (e.g. county → state) to the training data, returning train_fit_data (run_fit.py:103–108).

  9. Fit regressor — config.build_regressor() instantiates RidgeRegressor, PcaRidgeRegressor, or XGBRegressor; model.fit(X_train_for_fit, y_train, sample_weight=sample_weights) trains the estimator (run_fit.py:139–141). For XGBoost, val_split_column (default "year") is appended to the feature frame to enable a last-year validation split (run_fit.py:119–128).

  10. Save regressor — model.save_model({models_dir}/{experiment_key}/{fold_label}) writes the regressor-specific artefact(s) (run_fit.py:145). Ridge: ridge_model.pkl; PCA-Ridge: pca_ridge_model.pkl; XGBoost: model.json + metadata.pkl.

  11. Return HindcastSlice — HindcastSlice.from_config(config, fold_label) builds a frozen path-handle pointing at the just-written artefacts (run_fit.py:149). This is the load-side pair used by fold.load_detrender(config) at predict time.
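Steps 4–7 can be approximated with a minimal stand-in for MedianImputer. The class below is an illustrative sketch of fit-time median computation and NaN filling, not the project's actual implementation:

```python
import pandas as pd

class MedianImputerSketch:
    """Illustrative stand-in for the pipeline's MedianImputer (not the real API)."""

    def fit(self, df: pd.DataFrame, feature_cols: list[str]) -> "MedianImputerSketch":
        # Step 4: compute per-feature training medians (NaNs are ignored).
        self.fill_values = df[feature_cols].median()
        return self

    def transform(self, df: pd.DataFrame, feature_cols: list[str]) -> pd.DataFrame:
        # Step 6: fill NaN features with the training medians.
        out = df.copy()
        out[feature_cols] = out[feature_cols].fillna(self.fill_values)
        return out

# Toy training frame with gaps in both features and the target.
train = pd.DataFrame({
    "ndvi": [0.2, None, 0.4],
    "precip": [10.0, 30.0, None],
    "yield": [1.0, 2.0, None],
})
imputer = MedianImputerSketch().fit(train, ["ndvi", "precip"])
filled = imputer.transform(train, ["ndvi", "precip"])
# Step 7: drop rows whose mandatory columns are still NaN after imputation.
fit_ready = filled.dropna(subset=["yield"])
```

Features are filled from training medians, but rows with a missing target survive imputation and are removed only by the explicit dropna over fit_drop_cols.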

Mermaid flow diagram

```mermaid
flowchart LR
    TD["train_data\n(in-memory DataFrame)"]
    BD["build_detrender(config)\nmodels/detrend/build.py:54"]
    FT["detrender.fit_transform(train_data)\nrun_fit.py:55"]
    SD["detrender.save(detrender.pkl)\nrun_fit.py:65"]
    MI["MedianImputer.fit(...)\nrun_fit.py:70"]
    WF["write feature_fill_values.parquet\n(index as explicit column)\nrun_fit.py:78"]
    TR["feature_imputer.transform(...)\nrun_fit.py:87"]
    AP["fit_aggregation_policy.fit_transform(...)\nrun_fit.py:103"]
    BR["build_regressor(config)\nmodels/regression/__init__.py:58"]
    FIT["model.fit(X_train, y_train)\nrun_fit.py:141"]
    SM["model.save_model(...)\nrun_fit.py:145"]
    HS["HindcastSlice.from_config(...)\nrun_fit.py:149"]

    TD --> BD --> FT --> SD
    FT --> MI --> WF
    MI --> TR --> AP --> BR --> FIT --> SM --> HS
```

Invariants and contracts

DESIGN.md Clause 34 (artefact contract, verbatim):

"FIT → models/{experiment_key}/{label}/ + train_preds.parquet."

Note: train_preds.parquet is written by run_experiment in run/experiment_protocol.py:22, not by train() itself. train() writes only the three model artefacts listed above.

DESIGN.md Clause 30 (index persistence, verbatim):

"WHEN persisting a pandas object whose index carries meaning … the system SHALL materialise that label as an explicit named column before writing and restore it with set_index(...) on read. Relying on the writer to 'preserve the index' is forbidden."
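A minimal sketch of the Clause 30 pattern, using the write expression quoted from run_fit.py. The parquet round-trip itself is elided here (the frame would go through to_parquet/read_parquet); the sketch shows only the index materialisation and restoration:

```python
import pandas as pd

# Training medians keyed by feature name: the index carries meaning.
fill_values = pd.Series({"ndvi": 0.3, "precip": 20.0})

# Write side (Clause 30): materialise the index as an explicit "feature"
# column before writing, rather than relying on the writer to preserve it.
frame = fill_values.to_frame("fill_value").rename_axis("feature").reset_index()
# frame.to_parquet(path) would persist exactly these two columns.

# Read side: restore the meaningful index explicitly with set_index(...).
restored = frame.set_index("feature")["fill_value"]
```

After the round-trip, lookups by feature name behave exactly as on the original series, which is what MedianImputer.transform depends on at predict time.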

Detrend pipeline contract (from sources/code/detrend.md):

"The regression pipeline calls fit_transform on training data, serialises the fitted detrender to detrender.pkl, then calls transform on any held-out fold and inverse_transform inside models/regression/runtime.py:176 to retrend model predictions."

fit_transform is called BEFORE build_regressor — the regressor receives only trend-adjusted yield values and never sees raw trend-contaminated yields.
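The ordering contract can be sketched with a toy detrender. The class below is illustrative only: a plain mean-removal stand-in, not the project's AbstractDetrend (the real detrenders fit linear or Gaussian state models):

```python
import pandas as pd

class MeanDetrendSketch:
    """Toy detrender that removes the training mean. Illustrative only."""

    def __init__(self) -> None:
        self.is_fitted = False

    def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        self._mean = df["yield"].mean()
        self.is_fitted = True
        out = df.copy()
        out["yield_detrended"] = out["yield"] - self._mean
        return out

    def _check_fitted(self) -> None:
        # Mirrors the is_fitted guard: transform / inverse_transform
        # before fit_transform must raise RuntimeError.
        if not self.is_fitted:
            raise RuntimeError("detrender not fitted")

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        self._check_fitted()
        out = df.copy()
        out["yield_detrended"] = out["yield"] - self._mean
        return out

    def inverse_transform(self, preds: pd.Series) -> pd.Series:
        self._check_fitted()
        return preds + self._mean

train = pd.DataFrame({"yield": [1.0, 3.0]})
detrender = MeanDetrendSketch()
detrended = detrender.fit_transform(train)           # fit side
retrended = detrender.inverse_transform(detrended["yield_detrended"])  # predict side
```

At predict time the fitted object is reloaded from detrender.pkl, transform is applied to the held-out fold, and inverse_transform retrends the regressor's predictions back onto the original yield scale.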

Failure modes and recovery

| Symptom | Likely cause | Recovery |
| --- | --- | --- |
| RuntimeError: detrender not fitted at predict time | detrender.pkl written by an older code version before the is_fitted guard was added | Delete {fold}/detrender.pkl and re-run cli run hindcast for the fold |
| KeyError or all-NaN in MedianImputer.transform at predict time | feature_fill_values.parquet written with a RangeIndex (pre-Clause-30 bug) | Delete feature_fill_values.parquet and re-run the fit stage |
| KeyError: Missing sample-weight column | use_sample_weights=True but the aggregation policy doesn't emit weight_column | Set use_sample_weights=False in config or fix the aggregation policy |
| ValueError: fit data has NaN features after imputation (from assert_no_nan_features) | An all-NaN feature in the fit_drop_cols subset survived imputation | Trace back to feature_fill_values.parquet for an all-NaN training column |
| OperationalError from MLflow | Two folds fitting the same commodity in parallel | Run same-commodity folds sequentially (known MLflow DB locking issue) |

Cross-references

PRs that materially changed this stage

  • Clause 30 regression fix — feature_fill_values.parquet now writes the feature-name index as an explicit "feature" column (run_fit.py:74–86) after a silent no-op regression in which MedianImputer.transform returned all-NaN because fv.reindex(feature_cols) found a RangeIndex.
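That failure mode reproduces in a few lines of plain pandas. This is a hedged sketch of the reindex no-op, relying only on pandas semantics rather than the project's actual read path:

```python
import pandas as pd

feature_cols = ["ndvi", "precip"]
medians = pd.Series([0.3, 20.0], index=feature_cols)

# Pre-Clause-30 bug: a round-trip that drops the index leaves a RangeIndex,
# so reindexing on feature names matches no labels and yields all NaN.
buggy = medians.reset_index(drop=True)           # index becomes 0, 1
all_nan = buggy.reindex(feature_cols).isna().all()  # silently True

# Clause 30 fix: carry the names as an explicit column, restore on read.
frame = medians.to_frame("fill_value").rename_axis("feature").reset_index()
fixed = frame.set_index("feature")["fill_value"]
```

Because reindex aligns on labels and fills misses with NaN rather than raising, the bug produced no error, only silently empty fill values, hence the "explicit column" write pattern.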