diff --git a/src/microplex_us/pipelines/pe_us_data_rebuild.py b/src/microplex_us/pipelines/pe_us_data_rebuild.py index d09e95d..7f9b036 100644 --- a/src/microplex_us/pipelines/pe_us_data_rebuild.py +++ b/src/microplex_us/pipelines/pe_us_data_rebuild.py @@ -7,6 +7,8 @@ from pathlib import Path from typing import TYPE_CHECKING, Any +from microplex_us.vintages import MP_2024, DatasetProfile + if TYPE_CHECKING: from microplex.core import SourceProvider @@ -95,10 +97,11 @@ def default_policyengine_us_data_rebuild_config( def default_policyengine_us_data_rebuild_source_providers( *, - cps_source_year: int = 2023, + profile: DatasetProfile = MP_2024, + cps_source_year: int | None = None, cps_cache_dir: str | Path | None = None, cps_download: bool = True, - puf_target_year: int = 2024, + puf_target_year: int | None = None, puf_cps_reference_year: int | None = None, puf_cache_dir: str | Path | None = None, puf_path: str | Path | None = None, @@ -107,14 +110,31 @@ def default_policyengine_us_data_rebuild_source_providers( include_donor_surveys: bool = True, include_sipp: bool | None = None, include_scf: bool | None = None, - acs_year: int = 2024, - sipp_year: int = 2023, - scf_year: int = 2022, + acs_year: int | None = None, + sipp_year: int | None = None, + scf_year: int | None = None, donor_cache_dir: str | Path | None = None, policyengine_us_data_repo: str | Path | None = None, policyengine_us_data_python: str | Path | None = None, ) -> tuple[SourceProvider, ...]: - """Return the canonical CPS+PUF provider bundle for the rebuild track.""" + """Return the canonical CPS+PUF provider bundle for one dataset ``profile``. + + Source years derive from ``profile`` -- a single ``(dataset, model_year)`` + key. The per-source ``*_year`` arguments remain only as explicit overrides + (``None`` means "take it from the profile"), so there are no stale literal + defaults to drift from the profile. + """ + + _years = profile.source_years() + cps_source_year = ( + _years["cps_source_year"] if cps_source_year is None else cps_source_year + ) + puf_target_year = ( + _years["puf_target_year"] if puf_target_year is None else puf_target_year + ) + acs_year = _years["acs_year"] if acs_year is None else acs_year + sipp_year = _years["sipp_year"] if sipp_year is None else sipp_year + scf_year = _years["scf_year"] if scf_year is None else scf_year from microplex_us.data_sources.cps import CPSASECSourceProvider from microplex_us.data_sources.donor_surveys import ( diff --git a/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py b/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py index a37f993..b910a7c 100644 --- a/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py +++ b/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py @@ -64,6 +64,7 @@ write_us_stage_run_manifests_from_artifact_manifest, ) from microplex_us.variables import prune_redundant_variables +from microplex_us.vintages import MP_2024, DatasetProfile, get_profile if TYPE_CHECKING: from microplex.core import SourceProvider @@ -1969,7 +1970,8 @@ def run_policyengine_us_data_rebuild_checkpoint( config_overrides: dict[str, Any] | None = None, providers: tuple[SourceProvider, ...] | list[SourceProvider] | None = None, queries: dict[str, SourceQuery] | None = None, - cps_source_year: int = 2023, + profile: DatasetProfile = MP_2024, + cps_source_year: int | None = None, cps_cache_dir: str | Path | None = None, cps_download: bool = True, puf_target_year: int | None = None, @@ -1981,9 +1983,9 @@ def run_policyengine_us_data_rebuild_checkpoint( include_donor_surveys: bool = True, include_sipp: bool | None = None, include_scf: bool | None = None, - acs_year: int = 2024, - sipp_year: int = 2023, - scf_year: int = 2022, + acs_year: int | None = None, + sipp_year: int | None = None, + scf_year: int | None = None, donor_cache_dir: str | Path | None = None, policyengine_us_data_repo: str | Path | None = None, policyengine_us_data_python: str | Path | None = None, @@ -2051,6 +2053,7 @@ def run_policyengine_us_data_rebuild_checkpoint( if providers is None: resolved_providers = tuple( default_policyengine_us_data_rebuild_source_providers( + profile=profile, cps_source_year=cps_source_year, cps_cache_dir=cps_cache_dir, cps_download=cps_download, @@ -2261,12 +2264,15 @@ def main(argv: list[str] | None = None) -> None: "variables. See docs/next-run-plan.md." ), ) - parser.add_argument("--cps-source-year", type=int, default=2023) - parser.add_argument("--puf-target-year", type=int) - parser.add_argument("--puf-cps-reference-year", type=int) - parser.add_argument("--acs-year", type=int, default=2024) - parser.add_argument("--sipp-year", type=int, default=2023) - parser.add_argument("--scf-year", type=int, default=2022) + parser.add_argument( + "--profile", + default=MP_2024.name, + help=( + "Dataset vintage profile name (e.g. mp_ecps_2024). Source release " + "years come from this single (dataset, year) key; the per-source " + "--*-year flags were removed in favor of it." + ), + ) parser.add_argument("--cps-cache-dir") parser.add_argument("--puf-cache-dir") parser.add_argument("--donor-cache-dir") @@ -2467,11 +2473,9 @@ def main(argv: list[str] | None = None) -> None: calibration_target_domains=tuple(args.calibration_target_domain), calibration_target_geo_levels=tuple(args.calibration_target_geo_level), config_overrides=config_overrides, - cps_source_year=args.cps_source_year, + profile=get_profile(args.profile), cps_cache_dir=args.cps_cache_dir, cps_download=not args.no_cps_download, - puf_target_year=args.puf_target_year, - puf_cps_reference_year=args.puf_cps_reference_year, puf_cache_dir=args.puf_cache_dir, puf_path=args.puf_path, puf_demographics_path=args.puf_demographics_path, @@ -2479,9 +2483,6 @@ def main(argv: list[str] | None = None) -> None: include_donor_surveys=args.include_donor_surveys, include_sipp=args.include_sipp, include_scf=args.include_scf, - acs_year=args.acs_year, - sipp_year=args.sipp_year, - scf_year=args.scf_year, donor_cache_dir=args.donor_cache_dir, policyengine_us_data_repo=args.policyengine_us_data_repo, policyengine_us_data_python=args.policyengine_us_data_python, diff --git a/src/microplex_us/vintages.py b/src/microplex_us/vintages.py new file mode 100644 index 0000000..6690ac8 --- /dev/null +++ b/src/microplex_us/vintages.py @@ -0,0 +1,215 @@ +"""Single source of truth for the source vintages a built dataset uses. + +A :class:`DatasetProfile` declares, in ONE place, the model year a dataset +represents and the exact source *release* feeding each input, plus how that +release's dollars reach the model year (used natively, or aged with a +component-specific factor family). + +Build code reads vintages from a profile instead of per-call literal defaults, +so a stale year cannot hide in a function signature, a CLI default, or a +forgotten shell flag: the value is defined once and the safe path is the only +path. (The motivating bug: ``cps_source_year`` defaulted to 2023 -- income year +2022 -- while every production build overrode it to 2025; the stale literal sat +in three signatures for who knows how long because nothing failed.) + +The coherence checks here are the spec the build must satisfy: every source must +reach ``model_year`` -- either it is native to that year (``income_year == +model_year``) or it declares an ``age_to == model_year`` aging step. A source +that does not yet reach the model year must declare a ``gap_reason`` so the gap +is explicit rather than silent. A future build-time gate verifies a produced +artifact against the active profile; this module guarantees the *profile itself* +is internally consistent. +""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class Release: + """One source release and how its dollars reach a model year. + + Attributes: + release: the survey/file release actually loaded (e.g. CPS ASEC 2025). + income_year: the calendar/income year that release represents. CPS ASEC + survey year ``Y`` covers income year ``Y - 1`` (ASEC 2025 -> 2024); + most other sources have ``release == income_year``. + age_to: when set, dollar variables are aged from ``income_year`` to this + year using ``factors``; when ``None`` the release is used on its + native basis. + factors: label of the component-specific growth-factor family used when + aging (e.g. ``"soi"``). Required iff ``age_to`` is set; the build + binds the label to the actual factor implementation. + gap_reason: explicit, temporary acknowledgement that this source does not + yet reach the model year (e.g. aging not wired). Lets a profile stay + honest about a known gap without silently passing coherence. + """ + + release: int + income_year: int + age_to: int | None = None + factors: str | None = None + gap_reason: str | None = None + + def __post_init__(self) -> None: + if self.age_to is not None and self.factors is None: + raise ValueError( + f"Release(release={self.release}) sets age_to={self.age_to} but " + "no `factors` family to age with." + ) + if self.age_to is None and self.factors is not None: + raise ValueError( + f"Release(release={self.release}) sets factors={self.factors!r} " + "but no `age_to` year to age toward." + ) + if self.age_to is not None and self.age_to < self.income_year: + raise ValueError( + f"Release(release={self.release}) ages backward: age_to=" + f"{self.age_to} < income_year={self.income_year}." + ) + if self.gap_reason is not None and not self.gap_reason.strip(): + raise ValueError( + f"Release(release={self.release}) has an empty gap_reason; use None " + "for no declared gap or give a real explanation." + ) + + @property + def effective_year(self) -> int: + """The model year this release's dollars land on after any aging.""" + return self.age_to if self.age_to is not None else self.income_year + + +@dataclass(frozen=True) +class DatasetProfile: + """The complete vintage definition for one built dataset. + + ``model_year`` is the year the dataset represents; every source must reach it + (or declare a ``gap_reason``). + """ + + dataset: str + model_year: int + cps_asec: Release + puf: Release + acs: Release + sipp: Release + scf: Release + + @property + def name(self) -> str: + """Stable profile name ``{dataset}_{model_year}`` (e.g. ``mp_ecps_2024``).""" + return f"{self.dataset}_{self.model_year}" + + @property + def key(self) -> tuple[str, int]: + """The ``(dataset, model_year)`` identity a build is addressed by.""" + return (self.dataset, self.model_year) + + def sources(self) -> dict[str, Release]: + return { + "cps_asec": self.cps_asec, + "puf": self.puf, + "acs": self.acs, + "sipp": self.sipp, + "scf": self.scf, + } + + def source_years(self) -> dict[str, int]: + """The release/target years the source providers consume, derived once from + this profile so callers pass the profile rather than loose per-source years. + Keyed by the existing provider/checkpoint parameter names.""" + return { + "cps_source_year": self.cps_asec.release, + "puf_target_year": self.model_year, + "acs_year": self.acs.release, + "sipp_year": self.sipp.release, + "scf_year": self.scf.release, + } + + def version_id(self, *, variant: str, commit: str, build_date: str) -> str: + """Derive the canonical build/version id from the profile so the ASEC and + calendar years in the name cannot disagree with the data. Example: + ``mp-ecps-shaped-asec2025-calendar2024---``.""" + prefix = self.dataset.replace("_", "-") + return ( + f"{prefix}-shaped-asec{self.cps_asec.release}-" + f"calendar{self.model_year}-{variant}-{commit}-{build_date}" + ) + + def incoherent_sources(self) -> dict[str, str]: + """Map each source that fails to reach ``model_year`` (and has not + declared a ``gap_reason``) to a human-readable explanation.""" + problems: dict[str, str] = {} + for name, release in self.sources().items(): + if release.gap_reason is not None: + continue + if release.effective_year != self.model_year: + problems[name] = ( + f"reaches {release.effective_year} (release {release.release}, " + f"income {release.income_year}, age_to {release.age_to}); " + f"model_year is {self.model_year}" + ) + return problems + + def declared_gaps(self) -> dict[str, str]: + """Map each source with a declared (acknowledged) basis gap to its reason.""" + return { + name: release.gap_reason + for name, release in self.sources().items() + if release.gap_reason is not None + } + + def __post_init__(self) -> None: + problems = self.incoherent_sources() + if problems: + detail = "; ".join(f"{name}: {why}" for name, why in problems.items()) + raise ValueError( + f"DatasetProfile {self.name!r} is incoherent: every source must " + f"reach model_year {self.model_year} or declare a gap_reason. {detail}" + ) + + +# The current Microplex eCPS-replacement target: a 2024 base dataset that +# replaces ``enhanced_cps_2024``. Source releases match what the production build +# loads today; the aging declarations are the spec the build satisfies (PUF ages +# via SOI factors; SIPP/SCF aging to 2024 landed in #185; ACS donor is now the +# native-2024 release). +MP_2024 = DatasetProfile( + dataset="mp_ecps", + model_year=2024, + # CPS ASEC survey year 2025 == income/calendar year 2024: native 2024 spine. + cps_asec=Release(release=2025, income_year=2024), + # Public-use PUF base is 2015 (latest released); aged to 2024 via SOI factors. + puf=Release(release=2015, income_year=2015, age_to=2024, factors="soi"), + # ACS donor uses the native 2024 release (income year 2024 == model year, + # no aging). The PE-US-data Python module still only exposes ACS_2022, so MP + # loads a local acs_2024.h5 via the donor-provider H5 fallback added in #184; + # the provider default is 2024. The manifest block's default_year=2022 is the + # module baseline, not MP's intended vintage, so ACS is excluded from the + # manifest-tie check below. + acs=Release(release=2024, income_year=2024), + # SIPP/SCF donors aged from their latest releases to 2024. + sipp=Release(release=2023, income_year=2023, age_to=2024, factors="pe_growfactors"), + scf=Release(release=2022, income_year=2022, age_to=2024, factors="pe_growfactors"), +) + + +PROFILES: dict[str, DatasetProfile] = {MP_2024.name: MP_2024} + + +def get_profile(name: str) -> DatasetProfile: + """Return the dataset profile by name (``{dataset}_{model_year}``).""" + try: + return PROFILES[name] + except KeyError: + known = ", ".join(sorted(PROFILES)) + raise KeyError( + f"Unknown dataset profile {name!r}; known profiles: {known}" + ) from None + + +def resolve_profile(dataset: str, model_year: int) -> DatasetProfile: + """Resolve a build's profile from its ``(dataset, model_year)`` key -- the + single identifier a build is keyed on.""" + return get_profile(f"{dataset}_{model_year}") diff --git a/tests/pipelines/test_pe_us_data_rebuild.py b/tests/pipelines/test_pe_us_data_rebuild.py index aacfe69..71e8940 100644 --- a/tests/pipelines/test_pe_us_data_rebuild.py +++ b/tests/pipelines/test_pe_us_data_rebuild.py @@ -138,6 +138,8 @@ def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bund SOCIAL_SECURITY_SPLIT_STRATEGY_PE_QRF ) assert isinstance(providers[2], ACSSourceProvider) + # ACS uses the native 2024 release (MP_2024.acs.release); MP loads acs_2024.h5 + # via the #184 donor H5 fallback. assert providers[2].year == 2024 assert isinstance(providers[3], SIPPSourceProvider) assert providers[3].block == "tips" @@ -203,3 +205,39 @@ def test_build_policyengine_us_data_rebuild_pipeline_returns_configured_pipeline assert pipeline.config.calibration_max_iter == 77 assert pipeline.config.synthesis_backend == "seed" assert pipeline.config.calibration_backend == "entropy" + + +def test_source_provider_years_resolve_from_profile_not_literals() -> None: + # The year params no longer carry literal defaults: they default to None and + # resolve from the dataset profile, so a stale literal cannot silently return. + # Both the provider and the checkpoint take a `profile` and None-default years. + import inspect + + from microplex_us.pipelines.pe_us_data_rebuild_checkpoint import ( + run_policyengine_us_data_rebuild_checkpoint, + ) + from microplex_us.vintages import MP_2024 + + provider_params = inspect.signature( + default_policyengine_us_data_rebuild_source_providers + ).parameters + checkpoint_params = inspect.signature( + run_policyengine_us_data_rebuild_checkpoint + ).parameters + for params in (provider_params, checkpoint_params): + assert "profile" in params + for year in ( + "cps_source_year", + "puf_target_year", + "acs_year", + "sipp_year", + "scf_year", + ): + assert params[year].default is None, year + + # With the default profile, the resolved providers carry MP_2024's years. + providers = default_policyengine_us_data_rebuild_source_providers(cps_download=False) + years = MP_2024.source_years() + assert providers[0].year == years["cps_source_year"] == 2025 + assert providers[1].target_year == years["puf_target_year"] == 2024 + assert providers[2].year == years["acs_year"] == 2024 diff --git a/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py b/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py index a78cee0..2f17cda 100644 --- a/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py +++ b/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py @@ -706,6 +706,8 @@ def fake_run_policyengine_us_data_rebuild_checkpoint(**kwargs): "/tmp/policy_data.db", "--version-id", "run-1", + "--profile", + "mp_ecps_2024", "--donor-imputer-condition-selection", "pe_plus_puf_native_challenger", "--defer-native-audit", @@ -720,11 +722,37 @@ def fake_run_policyengine_us_data_rebuild_checkpoint(**kwargs): assert captured["config_overrides"]["random_seed"] == 42 assert captured["defer_native_audit"] is True assert captured["defer_imputation_ablation"] is True + # The CLI resolves --profile through the vintage registry and threads the + # resolved profile to the checkpoint (the per-year flags no longer exist). + assert captured["profile"].name == "mp_ecps_2024" stdout = capsys.readouterr().out assert "/tmp/artifacts/run-1" in stdout assert "hasRealPolicyEngineComparison" in stdout +def test_main_rejects_unknown_profile() -> None: + # --profile is resolved through the vintage registry; an unknown name fails + # loudly rather than silently building the wrong dataset. get_profile raises + # before the checkpoint runs, so no checkpoint mock is needed. + try: + checkpoint_module.main( + [ + "--output-root", + "/tmp/artifacts", + "--baseline-dataset", + "/tmp/enhanced_cps_2024.h5", + "--targets-db", + "/tmp/policy_data.db", + "--profile", + "mp_ecps_1999", + ] + ) + except KeyError as exc: + assert "Unknown dataset profile" in str(exc) + else: + raise AssertionError("Expected unknown --profile to fail closed") + + def test_run_policyengine_us_data_rebuild_checkpoint_rejects_empty_provider_sequence( tmp_path, ) -> None: diff --git a/tests/test_vintages.py b/tests/test_vintages.py new file mode 100644 index 0000000..7a67d90 --- /dev/null +++ b/tests/test_vintages.py @@ -0,0 +1,151 @@ +"""Tests for the single-source-of-truth dataset vintage profiles.""" + +import pytest + +from microplex_us.vintages import ( + MP_2024, + DatasetProfile, + Release, + get_profile, + resolve_profile, +) + + +def _release(**overrides) -> Release: + base = dict(release=2024, income_year=2024) + base.update(overrides) + return Release(**base) + + +def _profile(**overrides) -> DatasetProfile: + base = dict( + dataset="test", + model_year=2024, + cps_asec=Release(release=2025, income_year=2024), + puf=Release(release=2015, income_year=2015, age_to=2024, factors="soi"), + acs=Release(release=2024, income_year=2024), + sipp=Release(release=2023, income_year=2023, age_to=2024, factors="g"), + scf=Release(release=2022, income_year=2022, age_to=2024, factors="g"), + ) + base.update(overrides) + return DatasetProfile(**base) + + +# --- Release --------------------------------------------------------------- + + +def test_release_effective_year_native_vs_aged(): + assert Release(release=2025, income_year=2024).effective_year == 2024 + assert ( + Release(release=2015, income_year=2015, age_to=2024, factors="soi").effective_year + == 2024 + ) + + +def test_release_age_to_requires_factors(): + with pytest.raises(ValueError, match="factors"): + Release(release=2015, income_year=2015, age_to=2024) + + +def test_release_factors_requires_age_to(): + with pytest.raises(ValueError, match="age_to"): + Release(release=2015, income_year=2015, factors="soi") + + +def test_release_cannot_age_backward(): + with pytest.raises(ValueError, match="ages backward"): + Release(release=2015, income_year=2024, age_to=2022, factors="soi") + + +# --- DatasetProfile coherence --------------------------------------------- + + +def test_mp_2024_is_coherent_and_has_no_gaps(): + assert MP_2024.model_year == 2024 + assert MP_2024.incoherent_sources() == {} + assert MP_2024.declared_gaps() == {} + # Every source's dollars land on the model year. + for name, release in MP_2024.sources().items(): + assert release.effective_year == 2024, name + + +def test_release_rejects_empty_gap_reason(): + with pytest.raises(ValueError, match="empty gap_reason"): + Release(release=2022, income_year=2022, gap_reason=" ") + + +def test_mp_2024_donor_releases_match_source_manifest(): + # SIPP/SCF donor release years must equal what the build's donor loaders use + # (the pe_source_impute manifest ``default_year``), or the profile asserts a + # vintage the build does not load. ACS is intentionally excluded: MP loads a + # newer local acs_2024.h5 via the #184 donor H5 fallback, beyond the module's + # ACS_2022 baseline default_year, so the manifest is not authoritative for ACS. + from microplex_us.pe_source_impute_specs import get_pe_source_impute_block_spec + + for source, block in [("sipp", "sipp_tips"), ("scf", "scf")]: + spec = get_pe_source_impute_block_spec(block) + assert MP_2024.sources()[source].release == spec.default_year, source + + +def test_mp_2024_cps_spine_is_native_2024_income(): + # ASEC survey year 2025 -> income year 2024; the spine is native, not aged. + assert MP_2024.cps_asec.release == 2025 + assert MP_2024.cps_asec.income_year == 2024 + assert MP_2024.cps_asec.age_to is None + + +def test_incoherent_profile_raises(): + # A donor stuck at 2022 with no aging and no acknowledged gap is incoherent. + with pytest.raises(ValueError, match="incoherent"): + _profile(scf=Release(release=2022, income_year=2022)) + + +def test_declared_gap_passes_coherence_but_is_surfaced(): + profile = _profile( + scf=Release(release=2022, income_year=2022, gap_reason="aging not wired yet") + ) + assert profile.incoherent_sources() == {} + assert profile.declared_gaps() == {"scf": "aging not wired yet"} + + +def test_aged_source_reaching_wrong_year_is_incoherent(): + with pytest.raises(ValueError, match="incoherent"): + _profile(sipp=Release(release=2023, income_year=2023, age_to=2023, factors="g")) + + +# --- registry -------------------------------------------------------------- + + +def test_get_profile_returns_known_and_raises_unknown(): + assert get_profile("mp_ecps_2024") is MP_2024 + with pytest.raises(KeyError, match="Unknown dataset profile"): + get_profile("mp_ecps_1999") + + +def test_profile_is_keyed_on_dataset_and_year(): + assert MP_2024.dataset == "mp_ecps" + assert MP_2024.model_year == 2024 + assert MP_2024.name == "mp_ecps_2024" + assert MP_2024.key == ("mp_ecps", 2024) + assert resolve_profile("mp_ecps", 2024) is MP_2024 + + +def test_source_years_derive_from_the_profile(): + # The years a build threads through its providers come from one place. + assert MP_2024.source_years() == { + "cps_source_year": 2025, + "puf_target_year": 2024, + "acs_year": 2024, + "sipp_year": 2023, + "scf_year": 2022, + } + + +def test_version_id_is_derived_so_name_years_cannot_drift(): + vid = MP_2024.version_id( + variant="puf-support-clone", commit="abc1234", build_date="20260602" + ) + # ASEC + calendar years are pulled from the profile, not typed into the name. + assert vid == ( + "mp-ecps-shaped-asec2025-calendar2024-puf-support-clone-abc1234-20260602" + )