Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions src/microplex_us/pipelines/pe_us_data_rebuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any

from microplex_us.vintages import MP_2024, DatasetProfile

if TYPE_CHECKING:
from microplex.core import SourceProvider

Expand Down Expand Up @@ -95,10 +97,11 @@ def default_policyengine_us_data_rebuild_config(

def default_policyengine_us_data_rebuild_source_providers(
*,
cps_source_year: int = 2023,
profile: DatasetProfile = MP_2024,
cps_source_year: int | None = None,
cps_cache_dir: str | Path | None = None,
cps_download: bool = True,
puf_target_year: int = 2024,
puf_target_year: int | None = None,
puf_cps_reference_year: int | None = None,
puf_cache_dir: str | Path | None = None,
puf_path: str | Path | None = None,
Expand All @@ -107,14 +110,31 @@ def default_policyengine_us_data_rebuild_source_providers(
include_donor_surveys: bool = True,
include_sipp: bool | None = None,
include_scf: bool | None = None,
acs_year: int = 2024,
sipp_year: int = 2023,
scf_year: int = 2022,
acs_year: int | None = None,
sipp_year: int | None = None,
scf_year: int | None = None,
donor_cache_dir: str | Path | None = None,
policyengine_us_data_repo: str | Path | None = None,
policyengine_us_data_python: str | Path | None = None,
) -> tuple[SourceProvider, ...]:
"""Return the canonical CPS+PUF provider bundle for the rebuild track."""
"""Return the canonical CPS+PUF provider bundle for one dataset ``profile``.

Source years derive from ``profile`` -- a single ``(dataset, model_year)``
key. The per-source ``*_year`` arguments remain only as explicit overrides
(``None`` means "take it from the profile"), so there are no stale literal
defaults to drift from the profile.
"""

_years = profile.source_years()
cps_source_year = (
_years["cps_source_year"] if cps_source_year is None else cps_source_year
)
puf_target_year = (
_years["puf_target_year"] if puf_target_year is None else puf_target_year
)
acs_year = _years["acs_year"] if acs_year is None else acs_year
sipp_year = _years["sipp_year"] if sipp_year is None else sipp_year
scf_year = _years["scf_year"] if scf_year is None else scf_year

from microplex_us.data_sources.cps import CPSASECSourceProvider
from microplex_us.data_sources.donor_surveys import (
Expand Down
33 changes: 17 additions & 16 deletions src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
write_us_stage_run_manifests_from_artifact_manifest,
)
from microplex_us.variables import prune_redundant_variables
from microplex_us.vintages import MP_2024, DatasetProfile, get_profile

if TYPE_CHECKING:
from microplex.core import SourceProvider
Expand Down Expand Up @@ -1969,7 +1970,8 @@ def run_policyengine_us_data_rebuild_checkpoint(
config_overrides: dict[str, Any] | None = None,
providers: tuple[SourceProvider, ...] | list[SourceProvider] | None = None,
queries: dict[str, SourceQuery] | None = None,
cps_source_year: int = 2023,
profile: DatasetProfile = MP_2024,
cps_source_year: int | None = None,
cps_cache_dir: str | Path | None = None,
cps_download: bool = True,
puf_target_year: int | None = None,
Expand All @@ -1981,9 +1983,9 @@ def run_policyengine_us_data_rebuild_checkpoint(
include_donor_surveys: bool = True,
include_sipp: bool | None = None,
include_scf: bool | None = None,
acs_year: int = 2024,
sipp_year: int = 2023,
scf_year: int = 2022,
acs_year: int | None = None,
sipp_year: int | None = None,
scf_year: int | None = None,
donor_cache_dir: str | Path | None = None,
policyengine_us_data_repo: str | Path | None = None,
policyengine_us_data_python: str | Path | None = None,
Expand Down Expand Up @@ -2051,6 +2053,7 @@ def run_policyengine_us_data_rebuild_checkpoint(
if providers is None:
resolved_providers = tuple(
default_policyengine_us_data_rebuild_source_providers(
profile=profile,
cps_source_year=cps_source_year,
cps_cache_dir=cps_cache_dir,
cps_download=cps_download,
Expand Down Expand Up @@ -2261,12 +2264,15 @@ def main(argv: list[str] | None = None) -> None:
"variables. See docs/next-run-plan.md."
),
)
parser.add_argument("--cps-source-year", type=int, default=2023)
parser.add_argument("--puf-target-year", type=int)
parser.add_argument("--puf-cps-reference-year", type=int)
parser.add_argument("--acs-year", type=int, default=2024)
parser.add_argument("--sipp-year", type=int, default=2023)
parser.add_argument("--scf-year", type=int, default=2022)
parser.add_argument(
"--profile",
default=MP_2024.name,
help=(
"Dataset vintage profile name (e.g. mp_ecps_2024). Source release "
"years come from this single (dataset, year) key; the per-source "
"--*-year flags were removed in favor of it."
),
)
parser.add_argument("--cps-cache-dir")
parser.add_argument("--puf-cache-dir")
parser.add_argument("--donor-cache-dir")
Expand Down Expand Up @@ -2467,21 +2473,16 @@ def main(argv: list[str] | None = None) -> None:
calibration_target_domains=tuple(args.calibration_target_domain),
calibration_target_geo_levels=tuple(args.calibration_target_geo_level),
config_overrides=config_overrides,
cps_source_year=args.cps_source_year,
profile=get_profile(args.profile),
cps_cache_dir=args.cps_cache_dir,
cps_download=not args.no_cps_download,
puf_target_year=args.puf_target_year,
puf_cps_reference_year=args.puf_cps_reference_year,
puf_cache_dir=args.puf_cache_dir,
puf_path=args.puf_path,
puf_demographics_path=args.puf_demographics_path,
puf_expand_persons=not args.no_puf_expand_persons,
include_donor_surveys=args.include_donor_surveys,
include_sipp=args.include_sipp,
include_scf=args.include_scf,
acs_year=args.acs_year,
sipp_year=args.sipp_year,
scf_year=args.scf_year,
donor_cache_dir=args.donor_cache_dir,
policyengine_us_data_repo=args.policyengine_us_data_repo,
policyengine_us_data_python=args.policyengine_us_data_python,
Expand Down
215 changes: 215 additions & 0 deletions src/microplex_us/vintages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
"""Single source of truth for the source vintages a built dataset uses.

A :class:`DatasetProfile` declares, in ONE place, the model year a dataset
represents and the exact source *release* feeding each input, plus how that
release's dollars reach the model year (used natively, or aged with a
component-specific factor family).

Build code reads vintages from a profile instead of per-call literal defaults,
so a stale year cannot hide in a function signature, a CLI default, or a
forgotten shell flag: the value is defined once and the safe path is the only
path. (The motivating bug: ``cps_source_year`` defaulted to 2023 -- income year
2022 -- while every production build overrode it to 2025; the stale literal sat
in three signatures for who knows how long because nothing failed.)

The coherence checks here are the spec the build must satisfy: every source must
reach ``model_year`` -- either it is native to that year (``income_year ==
model_year``) or it declares an ``age_to == model_year`` aging step. A source
that does not yet reach the model year must declare a ``gap_reason`` so the gap
is explicit rather than silent. A future build-time gate verifies a produced
artifact against the active profile; this module guarantees the *profile itself*
is internally consistent.
"""

from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Release:
    """One source release and how its dollars reach a model year.

    Attributes:
        release: the survey/file release actually loaded (e.g. CPS ASEC 2025).
        income_year: the calendar/income year that release represents. CPS ASEC
            survey year ``Y`` covers income year ``Y - 1`` (ASEC 2025 -> 2024);
            most other sources have ``release == income_year``.
        age_to: when set, dollar variables are aged from ``income_year`` to this
            year using ``factors``; when ``None`` the release is used on its
            native basis.
        factors: label of the component-specific growth-factor family used when
            aging (e.g. ``"soi"``). Required iff ``age_to`` is set, and must be
            a non-blank label; the build binds the label to the actual factor
            implementation.
        gap_reason: explicit, temporary acknowledgement that this source does not
            yet reach the model year (e.g. aging not wired). Lets a profile stay
            honest about a known gap without silently passing coherence.

    Raises:
        ValueError: if ``age_to`` and ``factors`` are not set together, if
            ``factors`` or ``gap_reason`` is blank, or if ``age_to`` precedes
            ``income_year``.
    """

    release: int
    income_year: int
    age_to: int | None = None
    factors: str | None = None
    gap_reason: str | None = None

    def __post_init__(self) -> None:
        """Validate that the aging declaration is complete and meaningful."""
        if self.age_to is not None and self.factors is None:
            raise ValueError(
                f"Release(release={self.release}) sets age_to={self.age_to} but "
                "no `factors` family to age with."
            )
        if self.age_to is None and self.factors is not None:
            raise ValueError(
                f"Release(release={self.release}) sets factors={self.factors!r} "
                "but no `age_to` year to age toward."
            )
        # A blank label would satisfy the "factors is set" check above while
        # naming no factor family at all; reject it the same way a blank
        # gap_reason is rejected below.
        if self.factors is not None and not self.factors.strip():
            raise ValueError(
                f"Release(release={self.release}) has an empty factors label; "
                "name the growth-factor family used to reach age_to="
                f"{self.age_to}."
            )
        if self.age_to is not None and self.age_to < self.income_year:
            raise ValueError(
                f"Release(release={self.release}) ages backward: age_to="
                f"{self.age_to} < income_year={self.income_year}."
            )
        if self.gap_reason is not None and not self.gap_reason.strip():
            raise ValueError(
                f"Release(release={self.release}) has an empty gap_reason; use None "
                "for no declared gap or give a real explanation."
            )

    @property
    def effective_year(self) -> int:
        """The model year this release's dollars land on after any aging."""
        return self.age_to if self.age_to is not None else self.income_year


@dataclass(frozen=True)
class DatasetProfile:
    """Vintage definition of a single built dataset.

    Carries the ``model_year`` the dataset stands for and one release record per
    source. Construction fails with ``ValueError`` unless every source lands on
    ``model_year`` or carries a ``gap_reason``.
    """

    dataset: str
    model_year: int
    cps_asec: Release
    puf: Release
    acs: Release
    sipp: Release
    scf: Release

    def __post_init__(self) -> None:
        """Reject the profile when any source is incoherent with ``model_year``."""
        failures = self.incoherent_sources()
        if not failures:
            return
        detail = "; ".join(
            f"{source}: {reason}" for source, reason in failures.items()
        )
        raise ValueError(
            f"DatasetProfile {self.name!r} is incoherent: every source must "
            f"reach model_year {self.model_year} or declare a gap_reason. {detail}"
        )

    @property
    def name(self) -> str:
        """Stable profile name ``{dataset}_{model_year}`` (e.g. ``mp_ecps_2024``)."""
        return f"{self.dataset}_{self.model_year}"

    @property
    def key(self) -> tuple[str, int]:
        """The ``(dataset, model_year)`` pair identifying this profile."""
        return (self.dataset, self.model_year)

    def sources(self) -> dict[str, Release]:
        """Return the five source releases keyed by source name, in field order."""
        source_names = ("cps_asec", "puf", "acs", "sipp", "scf")
        return {source: getattr(self, source) for source in source_names}

    def source_years(self) -> dict[str, int]:
        """Return the years the source providers consume, keyed by the existing
        provider/checkpoint parameter names.

        Each donor/spine year is that source's ``release``; the PUF target year
        is the profile's ``model_year``.
        """
        return dict(
            cps_source_year=self.cps_asec.release,
            puf_target_year=self.model_year,
            acs_year=self.acs.release,
            sipp_year=self.sipp.release,
            scf_year=self.scf.release,
        )

    def version_id(self, *, variant: str, commit: str, build_date: str) -> str:
        """Build the canonical build/version id from this profile.

        The ASEC release and calendar (model) year embedded in the id are read
        from the profile itself. Example:
        ``mp-ecps-shaped-asec2025-calendar2024-<variant>-<commit>-<build_date>``.
        """
        prefix = self.dataset.replace("_", "-")
        years = f"asec{self.cps_asec.release}-calendar{self.model_year}"
        return f"{prefix}-shaped-{years}-{variant}-{commit}-{build_date}"

    def incoherent_sources(self) -> dict[str, str]:
        """Explain every source that misses ``model_year`` without a declared
        ``gap_reason``; the result is empty when the profile is coherent."""
        target = self.model_year
        return {
            source: (
                f"reaches {rel.effective_year} (release {rel.release}, "
                f"income {rel.income_year}, age_to {rel.age_to}); "
                f"model_year is {target}"
            )
            for source, rel in self.sources().items()
            if rel.gap_reason is None and rel.effective_year != target
        }

    def declared_gaps(self) -> dict[str, str]:
        """Return the ``gap_reason`` of each source that declares one."""
        gaps: dict[str, str] = {}
        for source, rel in self.sources().items():
            reason = rel.gap_reason
            if reason is not None:
                gaps[source] = reason
        return gaps


# The current Microplex eCPS-replacement target: a 2024 base dataset that
# replaces ``enhanced_cps_2024``. Source releases match what the production build
# loads today; the aging declarations are the spec the build satisfies (PUF ages
# via SOI factors; SIPP/SCF aging to 2024 landed in #185; ACS donor is now the
# native-2024 release).
#
# Constructing the profile runs ``DatasetProfile.__post_init__``, so importing
# this module raises ``ValueError`` if any release below stops reaching
# ``model_year`` without declaring a ``gap_reason``.
MP_2024 = DatasetProfile(
    dataset="mp_ecps",
    model_year=2024,
    # CPS ASEC survey year 2025 == income/calendar year 2024: native 2024 spine.
    cps_asec=Release(release=2025, income_year=2024),
    # Public-use PUF base is 2015 (latest released); aged to 2024 via SOI factors.
    puf=Release(release=2015, income_year=2015, age_to=2024, factors="soi"),
    # ACS donor uses the native 2024 release (income year 2024 == model year,
    # no aging). The PE-US-data Python module still only exposes ACS_2022, so MP
    # loads a local acs_2024.h5 via the donor-provider H5 fallback added in #184;
    # the provider default is 2024. The manifest block's default_year=2022 is the
    # module baseline, not MP's intended vintage, so ACS is meant to be excluded
    # from the manifest-tie check. NOTE(review): no such check is defined in this
    # module -- confirm where it lives and that it really skips ACS.
    acs=Release(release=2024, income_year=2024),
    # SIPP/SCF donors aged from their latest releases to 2024.
    sipp=Release(release=2023, income_year=2023, age_to=2024, factors="pe_growfactors"),
    scf=Release(release=2022, income_year=2022, age_to=2024, factors="pe_growfactors"),
)


# Registry of known profiles keyed by ``DatasetProfile.name``
# (``{dataset}_{model_year}``); ``get_profile`` resolves names against it.
PROFILES: dict[str, DatasetProfile] = {MP_2024.name: MP_2024}


def get_profile(name: str) -> DatasetProfile:
    """Look up a registered dataset profile by its ``{dataset}_{model_year}`` name.

    Raises:
        KeyError: if ``name`` is not a key of ``PROFILES``; the message lists
            the registered profile names in sorted order.
    """
    if name not in PROFILES:
        known = ", ".join(sorted(PROFILES))
        raise KeyError(f"Unknown dataset profile {name!r}; known profiles: {known}")
    return PROFILES[name]


def resolve_profile(dataset: str, model_year: int) -> DatasetProfile:
    """Return the profile registered for a build's ``(dataset, model_year)`` key.

    The key is turned into the ``{dataset}_{model_year}`` profile name and
    looked up with ``get_profile``, so an unknown key raises ``KeyError``.
    """
    profile_name = f"{dataset}_{model_year}"
    return get_profile(profile_name)
Loading
Loading