Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions packages/populace-build/src/populace/build/us/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@
SimpleTaxExpenditureReform,
compile_us_fiscal_target_registry,
)
from populace.build.us.puf_support import (
BASE_ASEC_SUPPORT_CHANNEL,
PUF_TAX_DETAIL_SUPPORT_CHANNEL,
US_PUF_SUPPORT_STAGE_NAME,
clone_us_frame_for_puf_support,
support_channel_column,
support_clone_index_column,
support_source_id_column,
)
from populace.build.us.reform_validation import (
REFORM_VALIDATION_SCHEMA_VERSION,
ReformValidationSpec,
Expand Down Expand Up @@ -95,6 +104,7 @@
__all__ = [
"BuildConfig",
"AsecSource",
"BASE_ASEC_SUPPORT_CHANNEL",
"SimpleTaxExpenditureReform",
"ReformValidationSpec",
"REFORM_VALIDATION_SCHEMA_VERSION",
Expand Down Expand Up @@ -126,10 +136,13 @@
"US_SOURCE_MANIFEST",
"US_SOURCE_STAGE_SPECS",
"US_STAGE_NAMES",
"PUF_TAX_DETAIL_SUPPORT_CHANNEL",
"US_PUF_SUPPORT_STAGE_NAME",
"US_STATE_INCOME_TAX_TARGET_SPECS",
"US_STATE_INCOME_TAX_TARGET_REFERENCES",
"compile_us_fiscal_target_registry",
"build_pooled_asec_unit_frame",
"clone_us_frame_for_puf_support",
"disaggregate_us_puf_aggregate_records_from_manifest",
"hard_target_package_aliases",
"in_sample_reform_specs",
Expand All @@ -145,6 +158,9 @@
"us_source_coverage_diagnostics",
"us_source_coverage_gate",
"write_us_source_coverage_diagnostics",
"support_channel_column",
"support_clone_index_column",
"support_source_id_column",
"validation_only_family_ids",
]

Expand Down Expand Up @@ -278,6 +294,7 @@ def to_manifest(self) -> dict[str, object]:
"asec_load",
"unit_assignment",
"derive_cps_carried",
US_PUF_SUPPORT_STAGE_NAME,
"puf_tax_detail",
"scf_wealth",
"sipp_tips",
Expand Down
293 changes: 293 additions & 0 deletions packages/populace-build/src/populace/build/us/puf_support.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
"""US support expansion for PUF tax-detail imputations.

The PUF tax-detail donor needs a distinct support channel: the ASEC/CPS
records remain as the baseline channel, and a cloned channel receives
PUF-sourced tax detail without overwriting the baseline rows. The expansion is
mass-conserving: with two support channels, each channel receives half of the
incoming weights so the frame's aggregate population does not double.
"""

from __future__ import annotations

from collections.abc import Sequence
from typing import Any

import numpy as np
import pandas as pd

from populace.frame import US_SCHEMA, Frame, Weights
from populace.frame.schema import EntitySchema

__all__ = [
"BASE_ASEC_SUPPORT_CHANNEL",
"PUF_TAX_DETAIL_SUPPORT_CHANNEL",
"US_PUF_SUPPORT_STAGE_NAME",
"clone_us_frame_for_puf_support",
"support_channel_column",
"support_clone_index_column",
"support_source_id_column",
]

BASE_ASEC_SUPPORT_CHANNEL = "asec"
PUF_TAX_DETAIL_SUPPORT_CHANNEL = "puf_tax_detail"
US_PUF_SUPPORT_STAGE_NAME = "puf_support_channel"

_DEFAULT_SUPPORT_CHANNELS = (
BASE_ASEC_SUPPORT_CHANNEL,
PUF_TAX_DETAIL_SUPPORT_CHANNEL,
)


def clone_us_frame_for_puf_support(
frame: Frame,
*,
channels: Sequence[str] = _DEFAULT_SUPPORT_CHANNELS,
) -> Frame:
"""Clone a US frame into support channels for PUF detail imputation.

Args:
frame: A US-schema frame after unit assignment and CPS-carried
derivations.
channels: Ordered support-channel names. The first channel keeps the
original IDs; later channels receive remapped IDs. Weights are
split evenly across channels.

Returns:
A new frame with every entity table cloned once per support channel,
channel/source metadata added with entity-prefixed column names, all
structural IDs remapped consistently, and typed weights mass-conserved.

Raises:
ValueError: If the frame is not US-schema, channel names are invalid,
metadata columns already exist, or an ID remapping would collide.
"""

if frame.schema != US_SCHEMA:
raise ValueError("PUF support expansion currently requires the US schema.")
support_channels = _validate_channels(channels)
_reject_metadata_collisions(frame, support_channels)
id_multiplier = _id_multiplier_for_frame(frame)

tables: dict[str, pd.DataFrame] = {}
for entity in frame.entities:
tables[entity] = _clone_entity_table(
frame.table(entity),
entity=entity,
schema=frame.schema,
channels=support_channels,
id_multiplier=id_multiplier,
)
for link_name in frame.links:
tables[link_name] = _clone_link_table(
frame.link(link_name),
link_name=link_name,
schema=frame.schema,
channels=support_channels,
id_multiplier=id_multiplier,
)

weights = {
entity: Weights(
values=np.tile(
frame.weights_for(entity).values / len(support_channels),
len(support_channels),
),
kind=frame.weights_for(entity).kind,
)
for entity in frame.weighted_entities
}
strata = pd.concat(
[frame.strata.copy() for _channel in support_channels],
ignore_index=True,
)
return Frame(
tables,
frame.schema,
weights,
strata,
mass_log=frame.mass_log,
)


def support_channel_column(entity: str) -> str:
"""Return the entity-prefixed support-channel metadata column."""

_require_entity_name(entity)
return f"{entity}_support_channel"


def support_clone_index_column(entity: str) -> str:
"""Return the entity-prefixed clone-index metadata column."""

_require_entity_name(entity)
return f"{entity}_support_clone_index"


def support_source_id_column(entity: str) -> str:
"""Return the entity-prefixed original-ID provenance column."""

_require_entity_name(entity)
return f"{entity}_source_id"


def _clone_entity_table(
table: pd.DataFrame,
*,
entity: str,
schema: EntitySchema,
channels: tuple[str, ...],
id_multiplier: int,
) -> pd.DataFrame:
id_columns = _entity_id_columns(schema, entity)
source_id = support_source_id_column(entity)
channel_column = support_channel_column(entity)
clone_index_column = support_clone_index_column(entity)
primary_id = schema.entity_id_column(entity)

clones: list[pd.DataFrame] = []
for clone_index, channel in enumerate(channels):
clone = table.copy(deep=True)
clone[source_id] = table[primary_id].to_numpy()
clone[channel_column] = channel
clone[clone_index_column] = clone_index
for column in id_columns:
clone[column] = _remap_ids(
clone[column].to_numpy(),
clone_index=clone_index,
id_multiplier=id_multiplier,
)
clones.append(clone)
result = pd.concat(clones, ignore_index=True)
if result[primary_id].duplicated().any():
duplicates = result.loc[result[primary_id].duplicated(), primary_id].unique()
raise ValueError(
f"remapped {primary_id!r} values are not unique; id multiplier "
"is too small. Duplicate value(s): "
f"{list(map(str, duplicates[:5]))}."
)
return result


def _clone_link_table(
table: pd.DataFrame,
*,
link_name: str,
schema: EntitySchema,
channels: tuple[str, ...],
id_multiplier: int,
) -> pd.DataFrame:
links = {link.name: link for link in schema.links}
link = links[link_name]
id_columns = (
schema.entity_id_column(link.left_entity),
schema.entity_id_column(link.right_entity),
)
missing = [column for column in id_columns if column not in table]
if missing:
raise ValueError(
f"link table {link_name!r} is missing ID column(s): {missing}."
)

clones: list[pd.DataFrame] = []
for clone_index, _channel in enumerate(channels):
clone = table.copy(deep=True)
for column in id_columns:
clone[column] = _remap_ids(
clone[column].to_numpy(),
clone_index=clone_index,
id_multiplier=id_multiplier,
)
clones.append(clone)
return pd.concat(clones, ignore_index=True)


def _entity_id_columns(schema: EntitySchema, entity: str) -> tuple[str, ...]:
if entity == schema.person_entity:
return (
schema.person_id_column,
*(schema.membership_column(group) for group in schema.group_entities),
)
return (schema.entity_id_column(entity),)


def _validate_channels(channels: Sequence[str]) -> tuple[str, ...]:
if isinstance(channels, str):
raise ValueError("support channels must be a sequence of names, not a string.")
values = tuple(channels)
if len(values) < 2:
raise ValueError("PUF support expansion requires at least two channels.")
bad = [value for value in values if not isinstance(value, str) or not value]
if bad:
raise ValueError("support channels must be non-empty strings.")
if len(set(values)) != len(values):
raise ValueError(f"support channels must be unique, got {values!r}.")
return values


def _reject_metadata_collisions(
frame: Frame,
channels: tuple[str, ...],
) -> None:
expected = {
column
for entity in frame.entities
for column in (
support_source_id_column(entity),
support_channel_column(entity),
support_clone_index_column(entity),
)
}
existing = {
column for entity in frame.entities for column in frame.table(entity).columns
}
collisions = sorted(expected & existing)
if collisions:
raise ValueError(
"PUF support expansion metadata column(s) already exist: "
f"{collisions}. The stage should run exactly once."
)
if channels[0] != BASE_ASEC_SUPPORT_CHANNEL:
raise ValueError(
f"support channels must start with {BASE_ASEC_SUPPORT_CHANNEL!r} "
"so the baseline ASEC channel keeps the original IDs."
)
if PUF_TAX_DETAIL_SUPPORT_CHANNEL not in channels:
raise ValueError(
f"support channels must include {PUF_TAX_DETAIL_SUPPORT_CHANNEL!r}."
)


def _id_multiplier_for_frame(frame: Frame) -> int:
values = [
frame.table(entity)[frame.schema.entity_id_column(entity)]
for entity in frame.entities
]
return _id_multiplier_for_values(*values)


def _id_multiplier_for_values(*values: Sequence[Any]) -> int:
if not values:
raise ValueError("at least one ID value sequence is required.")
max_id = 0
for sequence in values:
numeric = pd.to_numeric(pd.Series(sequence), errors="raise").astype("int64")
if len(numeric):
max_id = max(max_id, int(numeric.abs().max()))
return 10 ** max(1, len(str(max_id)))


def _remap_ids(
ids: Sequence[Any],
*,
clone_index: int,
id_multiplier: int,
) -> np.ndarray:
values = pd.to_numeric(pd.Series(ids), errors="raise").astype("int64").to_numpy()
if clone_index == 0:
return values.copy()
return values + clone_index * id_multiplier


def _require_entity_name(entity: str) -> None:
if not isinstance(entity, str) or not entity:
raise ValueError("entity must be a non-empty string.")
8 changes: 8 additions & 0 deletions packages/populace-build/tests/test_us_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from populace.build.us import (
US_DONORS,
US_NONNEGATIVE_SOURCE_OUTPUTS,
US_PUF_SUPPORT_STAGE_NAME,
US_SOURCE_MANIFEST,
US_SOURCE_STAGE_SPECS,
US_STAGE_NAMES,
Expand Down Expand Up @@ -92,11 +93,18 @@ def test_every_donor_stage_has_matching_source_spec(self) -> None:

def test_source_specs_align_with_declared_plan(self) -> None:
derived_source_specs = {"mortgage_conversion"}
frame_structural_stages = {US_PUF_SUPPORT_STAGE_NAME}
assert {spec.stage for spec in US_SOURCE_STAGE_SPECS} == set(
US_DONORS
) | derived_source_specs
assert set(US_DONORS).issubset(US_STAGE_NAMES)
assert derived_source_specs.issubset(US_STAGE_NAMES)
assert frame_structural_stages.issubset(US_STAGE_NAMES)

def test_puf_support_channel_precedes_puf_detail_donor_stage(self) -> None:
assert US_STAGE_NAMES.index(US_PUF_SUPPORT_STAGE_NAME) < US_STAGE_NAMES.index(
"puf_tax_detail"
)

def test_source_specs_are_manifest_only_not_python_loaders(self) -> None:
for spec in US_SOURCE_STAGE_SPECS:
Expand Down
Loading
Loading