lib/ — Library Utilities

market_insights_models/src/commodity_hindcast/lib/ is a collection of shared, domain-agnostic building blocks used by every pipeline stage. It has no dependency on the stage modules; stages depend on lib, not the reverse.

Overview

The package is split into five sub-packages plus four top-level modules:

| Sub-package | Concern |
| --- | --- |
| (top-level) | Paths, calendar, unit conversion, DataFrame transforms |
| geo/ | Geo-identifier normalisation, area-weighted aggregation, production-based filtering |
| reference_data/ | NASS/WASDE/CONAB benchmark loaders, discriminated-union spec dispatch |
| results/ | Lazy artefact handles for hindcast and forecast slices |
| edit_and_imputation/ | Fellegi-Holt edit rules, feature imputation |
| tracking/ | MLflow lifecycle helpers and decorators |

Top-level modules

path_utils.py

Purpose: Single source of truth for path anchoring across all config models.

resolve_data_path

path_utils.py:27 — anchors a relative path at data_root; passes s3:// URIs and absolute paths through unchanged. The rule is simple: if "://" appears in the string, return it as-is; if the path is absolute, return it; otherwise prepend data_root.
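The three-branch rule can be sketched as follows (a minimal stand-in, not the actual implementation — the real function also handles CloudPath objects; only the names resolve_data_path and data_root come from the section above):

```python
from pathlib import PurePosixPath


def resolve_data_path(path: str, data_root: str) -> str:
    """Sketch of the anchoring rule described above.

    - URIs (anything containing "://", e.g. s3://...) pass through unchanged.
    - Absolute paths pass through unchanged.
    - Relative paths are prepended with data_root.
    """
    if "://" in path:       # s3://, gs://, http:// ... leave untouched
        return path
    p = PurePosixPath(path)
    if p.is_absolute():     # already anchored
        return path
    return str(PurePosixPath(data_root) / p)
```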

ResolvablePath

path_utils.py:60 — Annotated[AnyPath, _ResolveAgainstDataRoot()]. Fields typed as ResolvablePath are discovered at config-validation time by _iter_resolvable_fields and resolved via resolve_data_path. This is the central path-anchoring mechanism; the recurring class of S3-path-anchoring bugs (where .as_posix() is called on a CloudPath, or a relative path is resolved against cwd instead of data_root) all trace back to bypassing this annotation.

_iter_resolvable_fields

path_utils.py:63 — walks a Pydantic model tree (nested models, dicts, lists) yielding every (owner, field_name) pair whose type annotation carries _ResolveAgainstDataRoot. Consumed by ExperimentConfig._resolve_data_paths and the preflight checkers, eliminating any hand-maintained registry.

calendar.py

Purpose: Year-independent calendar primitives.

MonthDay

calendar.py:16 — frozen dataclass holding month and day. Validates against a non-leap reference year at construction, so Feb 29 is rejected: an annual cutoff that does not exist in most years would otherwise silently misbehave. Used by _ReferenceYieldSpecBase.cutoff_month_day and ReferenceYieldLoader.yield_final.
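The validate-at-construction trick is small enough to sketch (assumed shape, not the real class; the in_year helper is illustrative):

```python
import datetime
from dataclasses import dataclass

_REFERENCE_YEAR = 1999  # any non-leap year works


@dataclass(frozen=True)
class MonthDay:
    """Year-independent (month, day) pair, validated against a non-leap year
    so Feb 29 is rejected at construction rather than failing mid-season."""
    month: int
    day: int

    def __post_init__(self) -> None:
        # raises ValueError for (2, 29), (13, 1), (4, 31), ...
        datetime.date(_REFERENCE_YEAR, self.month, self.day)

    def in_year(self, year: int) -> datetime.date:
        """Illustrative helper: pin the month-day to a concrete year."""
        return datetime.date(year, self.month, self.day)
```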

unit_utils.py

Purpose: Canonical bu/acre ↔ kg/ha conversions for every call-site shape (scalar, numpy array, pandas Series, polars DataFrame).

Key symbols

  • kg_ha_to_bu_acre / bu_acre_to_kg_ha — scalar, unit_utils.py:33-40
  • kg_ha_to_bu_acre_array — numpy, unit_utils.py:65
  • kg_ha_to_bu_acre_series — pandas, unit_utils.py:78
  • clip_yield_to_delivery_range — polars delivery-boundary clip with warn-on-clip logging, unit_utils.py:93

Constants HA_PER_ACRE = 0.404686 and KG_PER_LB = 0.453592 (unit_utils.py:24-25) are 6 d.p. approximations of the exact legal definitions; the module docstring notes a ~0.00017 % systematic bias relative to the exact values used in QUBE.
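The scalar conversions follow directly from the two constants. A sketch, assuming a bushel weight of 56 lb (the US corn bushel; the section does not state which commodity weight the real functions use):

```python
HA_PER_ACRE = 0.404686   # constants quoted above, 6 d.p. approximations
KG_PER_LB = 0.453592
LB_PER_BU_CORN = 56.0    # assumption for illustration: US corn bushel weight


def kg_ha_to_bu_acre(kg_ha: float, lb_per_bu: float = LB_PER_BU_CORN) -> float:
    # kg/ha -> kg/acre -> lb/acre -> bu/acre
    return kg_ha * HA_PER_ACRE / KG_PER_LB / lb_per_bu


def bu_acre_to_kg_ha(bu_acre: float, lb_per_bu: float = LB_PER_BU_CORN) -> float:
    # inverse of the above
    return bu_acre * lb_per_bu * KG_PER_LB / HA_PER_ACRE
```

The array/Series/DataFrame variants listed above apply the same factor elementwise.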

transform_utils.py

Purpose: Composable, pure-shape polars df.pipe() transforms with no domain-specific column names.

Key symbols

  • enforce_ci_narrowing — within each year, sorted by init_date, clamps CI bands so they never widen as the season progresses; transform_utils.py:17
  • drop_frozen_tail — drops trailing init_date rows where mean is frozen (unchanged), transform_utils.py:87
  • apply_delivery_post_transforms — canonical entry point for both hindcast and forecast delivery; orders narrowing before frozen-tail, transform_utils.py:68
  • build_wide_prediction_frame — projects a per-county prediction frame onto the fixed 9-column rolling-wide schema, transform_utils.py:126
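The clamping rule behind enforce_ci_narrowing can be sketched in pure Python (the real transform is a polars df.pipe; the row shape and column names "lo"/"hi" here are illustrative):

```python
def enforce_ci_narrowing(rows):
    """Within one year, pre-sorted by init_date, clamp CI bands so they
    never widen as the season progresses (pure-Python sketch)."""
    out = []
    lo, hi = float("-inf"), float("inf")
    for r in rows:
        lo = max(lo, r["lo"])   # lower bound may only rise
        hi = min(hi, r["hi"])   # upper bound may only fall
        out.append({**r, "lo": lo, "hi": hi})
    return out
```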

lib/geo/

identifiers.py

Purpose: Canonical ADM geo-identifier format — normalisation, validation, construction.

GeoIdentifier

identifiers.py:111 — NewType("GeoIdentifier", str). The canonical string type; returned by verify_geo_identifier after pattern validation.

GEO_ID_PATTERN

identifiers.py:107 — regex enforcing ADM0:XXX[/ADM1:xxx[/ADM2:xxx]] with uppercase country codes and lowercase name segments.

normalise_geo_identifier / normalise_geo_identifiers

identifiers.py:75 — ASCII-folds accented and special characters (e.g. doña ana → dona ana), lowercases name segments, uppercases the ADM0 country code. Special-character map at identifiers.py:29.
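The folding and casing rules can be sketched as below. The real code uses an explicit special-character map (identifiers.py:29); NFKD decomposition stands in for it here:

```python
import unicodedata


def normalise_geo_identifier(geo_id: str) -> str:
    """Sketch: ASCII-fold name segments, lowercase names, uppercase ADM0."""
    segments = []
    for i, seg in enumerate(geo_id.split("/")):
        level, _, name = seg.partition(":")
        folded = (
            unicodedata.normalize("NFKD", name)  # split accents off base chars
            .encode("ascii", "ignore")           # drop the combining marks
            .decode("ascii")
        )
        # ADM0 carries an uppercase country code; lower levels are lowercase names
        folded = folded.upper() if i == 0 else folded.lower()
        segments.append(f"{level.upper()}:{folded}")
    return "/".join(segments)
```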

make_geo_identifier

identifiers.py:207 — constructs a validated GeoIdentifier from county + state + ISO-3 country code. Fails fast if country_code is not exactly three uppercase letters.

apply_geo_lookup

identifiers.py:161 — vectorised hash→ADM remapping via pandas .map; detects the edge case where the zarr already stores canonical ADM identifiers and skips remapping.

aggregation.py

Purpose: Area-weighted aggregation over ADM levels.

area_weighted_mean

aggregation.py:22 — NaN-safe grouped area-weighted mean; raises ValueError when a group has non-NaN yield but all-NaN area (rather than silently falling back to an unweighted mean, which would produce wrong results for counties with abandonment/missing area).
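The fail-loud behaviour is the important part. A single-group sketch of the assumed semantics (the real function operates on a grouped frame):

```python
import math


def area_weighted_mean(yields, areas):
    """NaN-safe weighted mean for one group (pure-Python sketch)."""
    observed = [(y, a) for y, a in zip(yields, areas) if not math.isnan(y)]
    if not observed:
        return math.nan                 # no yield observations: propagate NaN
    weighted = [(y, a) for y, a in observed if not math.isnan(a)]
    if not weighted:
        # non-NaN yield but all-NaN area: refuse the unweighted fallback
        raise ValueError("group has yield observations but all-NaN area")
    total_area = sum(a for _, a in weighted)
    return sum(y * a for y, a in weighted) / total_area
```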

aggregate_weighted_frame

aggregation.py:96 — aggregates multiple value columns over an optional geo level plus explicit group columns. Used by FitAggregationPolicy.

FitAggregationPolicy

aggregation.py:147 — frozen dataclass encapsulating the aggregation level and weight column for fit-time weather-correction aggregation. fit_transform applies the aggregation or no-ops when weather_correction_fit_level is None.

AggregationLevel

aggregation.py:10 — Literal["ADM0", "ADM1", "ADM2"]. Used throughout delivery and diagnostics.

selection.py

Purpose: Production-based county filtering for the experiment's county universe.

select_by_production

selection.py:72 — returns the geo identifiers that together cover the configured threshold fraction of cumulative production over the last recent_years years. Anchors the ranking window at max_year (typically max(test_years) from config) so the window is deterministic and not shifted by unharvested prediction years. Raises on duplicate (geo_identifier, year) rows.
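The ranking rule can be sketched as below (assumed semantics; the real function takes a DataFrame and performs the duplicate check described above):

```python
def select_by_production(panel, threshold, recent_years, max_year):
    """Smallest set of top producers whose cumulative production share over
    the window (max_year - recent_years, max_year] reaches threshold.

    panel: iterable of (geo_identifier, year, production_kg) rows.
    """
    window = range(max_year - recent_years + 1, max_year + 1)
    totals = {}
    for geo, year, prod in panel:
        if year in window:               # anchor at max_year: deterministic window
            totals[geo] = totals.get(geo, 0.0) + prod
    grand_total = sum(totals.values())
    selected, cum = [], 0.0
    for geo, prod in sorted(totals.items(), key=lambda kv: -kv[1]):
        selected.append(geo)
        cum += prod
        if cum / grand_total >= threshold:
            break
    return selected
```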

union_fit_pred_for_production_ranking

selection.py:10 — merges fit and pred frames to a deduplicated (geo_identifier, year, production_kg) panel for ranking, since pred has multiple init rows per county-year.

lib/reference_data/

base_reference_yield_loader.py

Purpose: ABC and spec base for all reference yield loaders. The ABC contract is kg/ha always; concrete loaders convert from their source unit on read.

_ReferenceYieldSpecBase

base_reference_yield_loader.py:36 — frozen Pydantic base with fields name, filepath (a ResolvablePath), commodity, geography, cutoff_month_day, and unit. Mirrors _EditRuleBase.

ReferenceYieldLoader

base_reference_yield_loader.py:77 — ABC with a class-level _registry (spec type → loader type). Subclasses implement load(); the base provides yield_asof, yield_final, yield_asof_array, yield_final_series as concrete shared implementations so the algorithm lives once. from_spec dispatches to the registered concrete class.
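The registry-plus-from_spec dispatch can be sketched with __init_subclass__ registration (assumed mechanics — the section only says the registry maps spec type to loader type; the stand-in spec/loader bodies below are illustrative and the shared yield_asof / yield_final machinery is omitted):

```python
from abc import ABC, abstractmethod


class ReferenceYieldLoader(ABC):
    """Sketch: subclasses self-register under their spec type; from_spec
    dispatches a spec instance to the registered concrete loader."""

    _registry: dict[type, type["ReferenceYieldLoader"]] = {}

    def __init_subclass__(cls, *, spec_type: type, **kwargs):
        super().__init_subclass__(**kwargs)
        ReferenceYieldLoader._registry[spec_type] = cls

    def __init__(self, spec):
        self.spec = spec

    @classmethod
    def from_spec(cls, spec) -> "ReferenceYieldLoader":
        return cls._registry[type(spec)](spec)

    @abstractmethod
    def load(self): ...


class WasdeRefSpec:          # stand-in for the real pydantic spec
    pass


class WasdeLoader(ReferenceYieldLoader, spec_type=WasdeRefSpec):
    def load(self):
        return "wasde rows"  # illustrative payload
```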

nass.py

Purpose: NASS raw observation loaders.

load_nass_obs

nass.py:31 — reads the yields builder source parquet (the full county universe, not the production-subsampled feature merge), applies crop_type filter, constructs geo_identifier via make_geo_identifier, drops duplicate county-years.

load_nass_county_panel_yield_area

nass.py:78 — thin wrapper that renames target_col → yield_kg_ha and lowercases geo_identifier.

wasde.py

Purpose: USDA WASDE CSV loader.

WasdeRefSpec

wasde.py:25 — spec with kind="wasde" and unit="bu_acre" (the WASDE source default). The geography filter that was previously hardcoded to "united_states" is now spec-driven via geography.

WasdeLoader

wasde.py:32 — reads the WASDE CSV, filters by geography/commodity/variable, parses marketing_year from "YYYY/YY" strings, converts bu/ac → kg/ha on read via bu_acre_to_kg_ha.

conab.py

Purpose: CONAB Serie Historica (one final per safra) and monthly Levantamento bulletins. Handles latin-1 encoding, semicolon delimiter, fixed-width string padding, and the ' 099' LEVANT pseudo-row.

ConabFinalRefSpec / ConabFinalLoader

conab.py:25 / conab.py:94 — loads the Serie Historica; aggregates UF rows to national by summing production and area then recomputing yield (the produtividade_mil_ha_mil_t column is explicitly ignored). Overrides yield_final because release_date is pinned equal to the cutoff, so the base < cutoff filter would exclude the only row.

ConabLevantamentoRefSpec / ConabLevantamentoLoader

conab.py:37 / conab.py:197 — loads the 12 monthly bulletins per safra. Per-LEV release dates are derived from _LEV_RELEASE_CALENDAR (conab.py:167) — a module-level dict mapping LEV ordinal (1–12) to (calendar_month, year_offset) anchored against three published bulletins — because the source file carries no publication date column.

nass_benchmarks.py

Purpose: National NASS benchmark series from the per-county panel.

Key symbols

  • nass_national_prod_div_area_kg_ha — production/area, area-weighted per year, nass_benchmarks.py:28
  • nass_national_survey_yield_area_weighted_kg_ha — survey yield, area-weighted per year, nass_benchmarks.py:38

loader.py

Purpose: Public API: discriminated-union spec, factory, and orchestration helpers. Composes the three concrete loader/spec pairs and exposes the YAML entry point.

ReferenceYieldSpec

loader.py:59 — Annotated[WasdeRefSpec | ConabFinalRefSpec | ConabLevantamentoRefSpec, Field(discriminator="kind")]. The Pydantic discriminated union that drives YAML parsing for ExperimentConfig.reference_data. Mirrors EditRuleConfig in edit.py.
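What Field(discriminator="kind") gives for free in pydantic can be sketched by hand as a kind-keyed dispatch (illustrative stand-in; the "conab_final" kind string and the two-field spec shapes are assumptions):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WasdeRefSpec:
    kind: str
    name: str


@dataclass(frozen=True)
class ConabFinalRefSpec:
    kind: str
    name: str


# the discriminator: each YAML mapping's "kind" key selects its spec class
_SPEC_BY_KIND = {"wasde": WasdeRefSpec, "conab_final": ConabFinalRefSpec}


def parse_reference_yield_spec(raw: dict):
    """Dispatch a parsed-YAML mapping to the concrete spec class."""
    cls = _SPEC_BY_KIND[raw["kind"]]
    return cls(**raw)
```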

build_loaders

loader.py:68 — constructs one ReferenceYieldLoader per spec on cfg.reference_data. Single centralised dispatch so delivery, diagnostics, and the fold generator iterate the same loader list.

build_references_by_harvest_year

loader.py:78 — builds a spec.name → harvest_year → release DataFrame dispatch table used by ExpandingFoldGenerator to hand each fold only the references it needs, without re-loading source files per fold.

lib/results/

results_slice.py

Purpose: Lazy artefact handles for hindcast and forecast slices. Both classes live here to break the historical cyclic import between forecast/results.py and steps/experiment_result.py.

AbstractSlice

results_slice.py:72 — runtime_checkable Protocol exposing the symmetric surface: run_dir, cutoff, walk-forward preds/year-data loaders, feature matrix paths, trained artefact loaders (load_model, load_detrender, load_feature_fill_values), and bias corrector path/presence.

HindcastSlice

results_slice.py:112 — frozen dataclass holding all on-disk paths for one CV fold. Paths are explicit fields rather than derived; from_config is the factory for stage code. load_model probes the three registered regression classes in order (PcaRidge, Ridge, XGB) to handle heterogeneous run directories. load_detrender dispatches on config.model.detrend key.

ForecastSlice

results_slice.py:301 — frozen dataclass keyed on (run_dir, experiment_key, season_year, init_date). Its root property anchors everything under run_dir/forecast/{season_year}/{init_date}/, so multiple (season_year, init_date) pairs coexist without collision. Trained artefacts are delegated to self.training — the production HindcastSlice — via production_hindcast_slice.

production_hindcast_slice

results_slice.py:271 — factory helper returning the production HindcastSlice if detrender.pkl exists, or None. Shared by ExperimentResult.production and ForecastSlice.training without re-introducing the forecast/steps import cycle.

run_result.py

Purpose: Fold-artefact registry over a run directory.

ExperimentResult

run_result.py:31 — frozen dataclass wrapping config, hindcast_slices, forecast_slices, and run_dir. from_run_dir discovers all numeric fold dirs and forecast (season_year, init_date) dirs; it is intentionally lazy (ruling 7) — an empty preds/ yields an empty tuple so the object is constructible on a fresh run_dir holding only config_resolved.yaml. Exposes save_included_geo_identifiers / load_included_geo_identifiers for the county-set handover between FIT and PREDICT phases.

lib/edit_and_imputation/

edit.py

Purpose: Fellegi-Holt edit-and-imputation for NASS-style survey data. Operates at row level on raw survey inputs, sitting between preflight.py (dataset-level run-abort gates) and imputation.py (feature-level NaN fills).

EditOperation (discriminated union)

edit.py:242 — Annotated[DeductiveImpute | Clip | Flag | Drop | Fail | PanelTrailingMedian, Field(discriminator="operation")]. The six operations:

| Class | operation | Effect |
| --- | --- | --- |
| DeductiveImpute | "deductive_impute" | Replace target via df.eval(source) on firing rows |
| Clip | "clip" | Winsorise to [min, max] on firing rows only |
| Flag | "flag" | Record the fire; leave the value unchanged |
| Drop | "drop" | Remove firing rows |
| Fail | "fail" | Raise ValueError if any row fires |
| PanelTrailingMedian | "panel_trailing_median" | Impute via impute_missing_area (delegates to imputation.py) |

EditRuleConfig (discriminated union)

edit.py:361 — Annotated[RatioEditRule | RangeEditRule | NullImputeRule | PanelNullImputeRule, Field(discriminator="kind")]. Detection rule types:

| Class | kind | Fires when |
| --- | --- | --- |
| RatioEditRule | "ratio_edit" | target / eval(derive) falls outside [1/tolerance, tolerance] |
| RangeEditRule | "range_edit" | target falls outside [min, max] |
| NullImputeRule | "null_impute" | target is null |
| PanelNullImputeRule | "panel_null_impute" | target is null; the corrective operation may reference other rows in the group |

apply_edits

edit.py:383 — applies rules sequentially; each rule sees the output of prior rules. Returns (edited_df, EditReport). The EditReport.flags DataFrame is always indexed to the input df (dropped rows appear as False in later rules' columns).
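The sequential pass can be sketched as follows. This is a simplified stand-in: rows are plain dicts, a rule is a (name, fires, apply) triple, and the flags come back as a plain dict rather than the EditReport.flags DataFrame:

```python
def apply_edits(rows, rules):
    """Apply edit rules in order; each rule sees the output of prior rules.

    fires(row) -> bool decides detection; apply(row) -> row-or-None is the
    corrective operation, with None meaning "drop this row".
    """
    flags = {}
    for name, fires, apply_fn in rules:
        fired, next_rows = [], []
        for row in rows:
            if fires(row):
                fired.append(row["id"])
                row = apply_fn(row)
            if row is not None:
                next_rows.append(row)
        flags[name] = fired
        rows = next_rows          # sequential: later rules see edited rows
    return rows, flags
```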

imputation.py

Purpose: Feature-level NaN fills for detrenders and regressors.

MedianImputer

imputation.py:73 — sklearn-style fit/transform; computes nanmedian per feature column at fit, applies stored values at transform. QUBE-faithful default (all-NaN columns receive 0.0). from_fill_values reconstitutes from a persisted Series.
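The fit/transform contract, including the all-NaN-column default, can be sketched on plain lists (assumed shapes; the real class works on feature matrices and also provides from_fill_values):

```python
import math
from statistics import median


class MedianImputer:
    """Sketch: store per-column nanmedian at fit, fill NaNs at transform."""

    def fit(self, columns):
        self.fill_values_ = {}
        for name, values in columns.items():
            finite = [v for v in values if not math.isnan(v)]
            # QUBE-faithful default: an all-NaN column is filled with 0.0
            self.fill_values_[name] = median(finite) if finite else 0.0
        return self

    def transform(self, columns):
        return {
            name: [self.fill_values_[name] if math.isnan(v) else v for v in values]
            for name, values in columns.items()
        }
```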

impute_missing_panel_columns

imputation.py:206 — fills NaN cells per county using trailing median, trailing mean, or zero. Trailing methods bound imputed values by each geo's historical [min, max] and raise if this bound is violated (a violation indicates a bug in the upstream imputation, not a data anomaly to absorb silently).

TrendImputer / NationalFallbackTrendImputer

imputation.py:42 / imputation.py:63 — strategy for unseen-geo trend fallback in detrenders; the national fallback fills NaN trend values with the pooled national trend.

partition_groups_by_valid_obs

imputation.py:146 — splits groups by count of rows with all required columns finite. Returns a GroupPartition naming groups that pass/fail the min_obs threshold; used by detrenders to decide whether a county is fitted individually or falls back to the national slope.

lib/tracking/

decorators.py

Purpose: MLflow run-management lifecycle for hindcast runs.

tracking_uri_anchored

decorators.py:43 — anchors a relative sqlite:/// URI at data_root so mlruns.db does not scatter across whichever directory the CLI was invoked from. HTTP(S) URIs pass through. When data_root is a CloudPath, the URI is left cwd-relative and a warning is logged (SQLite cannot live on object storage).
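The anchoring rule can be sketched as below (assumed semantics, using SQLAlchemy-style sqlite:/// URIs; the CloudPath warn-and-skip branch described above is omitted):

```python
from pathlib import PurePosixPath

_SQLITE_PREFIX = "sqlite:///"


def tracking_uri_anchored(uri: str, data_root: str) -> str:
    """Re-anchor a relative sqlite URI under data_root; pass everything
    else (http(s), absolute sqlite paths) through unchanged."""
    if not uri.startswith(_SQLITE_PREFIX):
        return uri                              # http(s) etc. pass through
    db_path = PurePosixPath(uri[len(_SQLITE_PREFIX):])
    if db_path.is_absolute():
        return uri                              # already anchored
    # relative: pin mlruns.db under data_root instead of the CLI's cwd
    return _SQLITE_PREFIX + str(PurePosixPath(data_root) / db_path)
```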

hindcast_mlflow_run

decorators.py:129 — context manager wrapping mlflow.start_run; sets tags from git/env metadata, logs bounded_hindcast_params, and uploads config_resolved.yaml and metadata.yaml as artefacts. Used pervasively by the hindcast stage runner.

prepare_hindcast_mlflow

decorators.py:102 — seeds RNG, writes config_resolved.yaml and metadata.yaml under run_root, calls configure_tracking. Returns (git_meta, env_meta) for tagging.

log.py

Purpose: MLflow artefact logging helpers, dataset hashing, and metadata capture.

log_artifact / log_artifacts

log.py:48 / log.py:75 — stage s3:// paths to a temp dir before calling mlflow.log_artifact*, because MLflow only accepts local paths.

bounded_hindcast_params

log.py:112 — produces the small per-run param dict (seed, data_root, year range, model keys) logged alongside the full YAML config artefact.

log_hindcast_dataset_artifacts

log.py:129 — logs fit/pred feature parquets plus each reference_data source file under datasets/<spec.name>/ so multiple references (e.g. CONAB-final + CONAB-LEV) do not collide on artefact path.

data_file_sha256_prefix

log.py:96 — streaming SHA-256 prefix for both local and S3 files; used for dataset hashing.
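Streaming hashing keeps memory flat regardless of file size. A sketch over an open binary stream (assumed signature; the real helper opens local and S3 paths itself):

```python
import hashlib


def data_file_sha256_prefix(fileobj, prefix_len: int = 12,
                            chunk_size: int = 1 << 20) -> str:
    """Hash a binary stream in chunks; return the first prefix_len hex chars."""
    digest = hashlib.sha256()
    while chunk := fileobj.read(chunk_size):   # read until EOF
        digest.update(chunk)
    return digest.hexdigest()[:prefix_len]
```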

capture_git / capture_environment / seed_everything

log.py:151 / log.py:175 / log.py:204 — lifted from the former tfds/runner.py (Phase 6.4); record the git/Python environment that pinned a run.

queries.py

Purpose: Thin wrapper over mlflow.active_run so callers do not import the top-level mlflow package just to check run state.

has_active_run

queries.py:15 — returns True when an MLflow run is currently active.

Cross-references

  • Orchestration — stages call hindcast_mlflow_run from lib/tracking/decorators.py and construct ExperimentResult / HindcastSlice from lib/results/.
  • HindcastSlice.load_model probes the regression class registry.
  • HindcastSlice.load_detrender dispatches on config.model.detrend.

Relationships

  • path_utils.ResolvablePath is consumed by _ReferenceYieldSpecBase.filepath (base_reference_yield_loader.py:48) and by every config model field that must be preflight-checked.
  • calendar.MonthDay is consumed by _ReferenceYieldSpecBase.cutoff_month_day and ReferenceYieldLoader.yield_final.
  • unit_utils is consumed by WasdeLoader (bu/ac → kg/ha on read) and delivery boundary clipping.
  • geo/identifiers.make_geo_identifier is consumed by nass.load_nass_obs to build canonical geo identifiers from NASS county/state columns.
  • geo/aggregation.area_weighted_mean is consumed by nass_benchmarks.py and by FitAggregationPolicy.fit_transform.
  • edit_and_imputation/edit.py:EditRuleConfig and reference_data/loader.py:ReferenceYieldSpec are both discriminated unions declared as Annotated[... , Field(discriminator="...")] and parsed from YAML by ExperimentConfig.
  • tracking/log.bounded_hindcast_params is re-exported from tracking/decorators.py for backwards-compatible callers.