diff --git a/.gitignore b/.gitignore index 9e4726d..b564c59 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,8 @@ SWEET_python.egg-info/ build/ SWEET_python/unused SWEET_python.egg-info +# Local PR-body drafts (edit in VS Code, sync to GitHub via `gh pr edit`) +/pr-drafts/ # Local Claude Code project instructions (not shared) -/CLAUDE.md \ No newline at end of file +/CLAUDE.md diff --git a/SWEET_python/city_params.py b/SWEET_python/city_params.py index 1706fa0..d4e2ddf 100644 --- a/SWEET_python/city_params.py +++ b/SWEET_python/city_params.py @@ -5729,7 +5729,20 @@ def mass_checker_math( if net[waste] < -1e-5: negative_catcher = True - if not negative_catcher: + # Under-delivery: a requested treatment whose proportional component + # fractions don't actually sum to its target (e.g. an empty/insufficient + # eligible pool -> 0-ton "silent accept"). Such cases must NOT take the + # happy path; route them through the solver, which reallocates or raises. + under_delivery = any( + np.abs( + getattr(div_fractions, div) + - sum(components_multiplied_through[div].values()) + ) + > 1e-5 + for div in div_fractions.model_dump().keys() + ) + + if (not negative_catcher) and (not under_delivery): # divs = self._divs_from_component_fractions(div_fractions, div_component_fractions, scenario=scenario) # parameters.divs = divs adjusted_diversion_constituents = False @@ -5748,400 +5761,135 @@ def mass_checker_math( div_component_fractions, ) - if ( - sum( - getattr(div_fractions, div) for div in div_fractions.model_dump().keys() - ) - > 1 - ): - raise CustomError( - "INVALID_PARAMETERS", - f"Diversions sum to {sum(getattr(div_fractions, div) for div in div_fractions.model_dump().keys())}, but they must sum to 1 or less.", - ) - - compostables = sum( - getattr(waste_fractions, waste) - for waste in ["food", "green", "wood", "paper_cardboard"] - ) - if div_fractions.compost + div_fractions.anaerobic > compostables: - raise CustomError( - "INVALID_PARAMETERS", - f"Only food, green, wood, and paper/cardboard can be composted or anaerobically digested. Those waste types sum to {compostables}, but input values of compost and anaerobic digestion sum to {div_fractions.compost + div_fractions.anaerobic}.", - ) - - for div in div_fractions.model_dump().keys(): - fraction = getattr(div_fractions, div) - s = sum( - getattr(waste_fractions, waste) for waste in self.div_components[div] - ) - if s < fraction: - components = self.div_components[div] - values = [getattr(waste_fractions, x) for x in components] - raise CustomError( - "INVALID_PARAMETERS", - f"{div} too high. {div} applies to {components}, which are {values} of total waste--the sum of these is {sum(values)}, so only that much waste can be {div}, but input value was {fraction}.", + # --- Robust diversion allocation (replaces the legacy redistribution) --- + # The old guard-ladder + redistribution loop that lived here falsely + # rejected many *feasible* slider combinations, silently accepted some + # impossible ones (diverting 0 tons), and could crash on a bare assert. + # It is replaced by an exact min-cost max-flow allocation of the three + # contended treatments (compost/anaerobic/recycling). Combustion stays a + # uniform fraction of the leftover combustible mass (handled just below). + # See SWEET_python/dst_allocation.py (+ dst_allocation_prototype.py). + from SWEET_python import dst_allocation + + waste_dict = waste_fractions.model_dump() + three_targets = { + "compost": div_fractions.compost, + "anaerobic": div_fractions.anaerobic, + "recycling": div_fractions.recycling, + } + alloc_result = dst_allocation.solve_allocation( + waste_dict, + three_targets, + eligibility=self.div_components, + spare_combustibles=(div_fractions.combustion > 0), + ) + + if not alloc_result["feasible"]: + # Build an actionable message naming a specific slider + its cap. + def _cap(t, others): + return dst_allocation.max_feasible_target( + t, waste_dict, others, eligibility=self.div_components ) - non_combustables = sum( - getattr(waste_fractions, waste) for waste in ["glass", "metal", "other"] - ) - if ( - div_fractions.compost + div_fractions.anaerobic + div_fractions.combustion - > (1 - non_combustables) - ): - s = ( - div_fractions.compost - + div_fractions.anaerobic - + div_fractions.combustion - ) + # Stage 1: a slider that on its own exceeds its eligible-waste pool + # (clearest message -- a fixed, city-specific ceiling). + for t in ("compost", "anaerobic", "recycling"): + if three_targets[t] <= 0: + continue + standalone = _cap(t, {k: 0.0 for k in three_targets if k != t}) + if three_targets[t] > standalone + 1e-6: + raise CustomError( + "INVALID_PARAMETERS", + f"{t.capitalize()} can be at most {standalone * 100:.1f}% " + f"of this city's waste (only that much is eligible for " + f"{t}), but {three_targets[t] * 100:.1f}% was requested.", + ) + # Stage 2: each fits alone but the combination over-draws a shared + # waste type -- name a slider whose reduction restores feasibility. + for t in ("compost", "anaerobic", "recycling"): + if three_targets[t] <= 0: + continue + cond = _cap(t, {k: v for k, v in three_targets.items() if k != t}) + if three_targets[t] > cond + 1e-6: + raise CustomError( + "INVALID_PARAMETERS", + f"{t.capitalize()} can be at most {cond * 100:.1f}% given " + f"the other diversion selections for this city, but " + f"{three_targets[t] * 100:.1f}% was requested. Reduce " + f"{t} or the other diversion sliders.", + ) raise CustomError( "INVALID_PARAMETERS", - f"Glass, metal, and other account for {non_combustables:.3f} of waste, and they can only be recycled. {div_fractions.compost} compost, {div_fractions.anaerobic} anaerobic, and {div_fractions.combustion} incineration were specified, summing to {s}, but only {1 - non_combustables} of waste can be diverted to these diversion types.", + "The requested diversion combination cannot be met by this " + "city's waste composition. Reduce one or more diversion sliders.", ) - non_combustion = {} - combustion_all = {} - keys_of_interest = ["compost", "anaerobic", "recycling"] - for waste in waste_fractions.model_dump().keys(): - s = sum( - components_multiplied_through[div].get(waste, 0) - for div in keys_of_interest - ) - non_combustion[waste] = s - combustion_all[waste] = getattr(waste_fractions, waste) - s - - adjust_non_combustion = False - for waste, frac in non_combustion.items(): - if frac > getattr(waste_fractions, waste): - adjust_non_combustion = True - - if adjust_non_combustion: - div_component_fractions_adjusted = DivComponentFractions( - **div_component_fractions.model_dump() - ) + # Overwrite the three contended treatments with the flow allocation + # (fraction-of-total-waste units, exactly what + # components_multiplied_through holds). + allocation = alloc_result["allocation"] + for div in ("compost", "anaerobic", "recycling"): + components_multiplied_through[div] = {w: 0.0 for w in self.waste_types} + for w, val in allocation.get(div, {}).items(): + components_multiplied_through[div][w] = val - dont_add_to = { - waste - for waste, frac in waste_fractions.model_dump().items() - if frac == 0 - } - problems = [ - set( - waste - for waste, frac in non_combustion.items() - if frac > getattr(waste_fractions, waste) - ) - ] - dont_add_to.update(problems[0]) - - while problems: - probs = problems.pop(0) - for waste in probs: - remove = {} - distribute = {} - overflow = {} - can_be_adjusted = [] - div_total = sum( - getattr(div_fractions, div) - * getattr(getattr(div_component_fractions_adjusted, div), waste) - for div in keys_of_interest - if waste - in getattr(div_component_fractions_adjusted, div) - .model_dump() - .keys() - ) - div_target = getattr(waste_fractions, waste) - diff = (div_total - div_target) / div_total - - for div in keys_of_interest: - if getattr(div_fractions, div) == 0: - continue - distribute[div] = {} - component = getattr( - getattr(div_component_fractions_adjusted, div), waste, 0 - ) - to_be_removed = diff * component - - to_distribute_to = [ - x for x in self.div_components[div] if x not in dont_add_to - ] - to_distribute_to_sum = sum( - getattr( - getattr(div_component_fractions_adjusted, div), x, 0 - ) - for x in to_distribute_to - ) - if to_distribute_to_sum == 0: - overflow[div] = 1 - continue - - for w in to_distribute_to: - add_amount = to_be_removed * ( - getattr( - getattr(div_component_fractions_adjusted, div), w, 0 - ) - / to_distribute_to_sum - ) - if w not in distribute[div]: - distribute[div][w] = [add_amount] - else: - distribute[div][w].append(add_amount) - - remove[div] = to_be_removed - can_be_adjusted.append(div) - - for div in overflow: - component = getattr( - getattr(div_component_fractions_adjusted, div), waste, 0 - ) - to_be_removed = diff * component - to_distribute_to = [ - x - for x in distribute.keys() - if waste in self.div_components[x] and x not in overflow - ] - to_distribute_to_sum = sum( - getattr(div_fractions, x) for x in to_distribute_to - ) - if to_distribute_to_sum == 0: - raise CustomError( - "INVALID_PARAMETERS", - f"Combination of compost, anaerobic digestion, and recycling is too high", - ) - - for d in to_distribute_to: - to_be_removed_component = ( - to_be_removed - * (getattr(div_fractions, d) / to_distribute_to_sum) - / getattr(div_fractions, d) - ) - to_distribute_to_component = [ - x - for x in getattr(div_component_fractions_adjusted, d) - .model_dump() - .keys() - if x not in dont_add_to - ] - to_distribute_to_sum_component = sum( - getattr( - getattr(div_component_fractions_adjusted, d), x, 0 - ) - for x in to_distribute_to_component - ) - if to_distribute_to_sum_component == 0: - raise CustomError( - "INVALID_PARAMETERS", - f"Combination of compost, anaerobic digestion, and recycling is too high", - ) - - for w in to_distribute_to_component: - add_amount = ( - to_be_removed_component - * getattr( - getattr(div_component_fractions_adjusted, d), - w, - 0, - ) - / to_distribute_to_sum_component - ) - if w in distribute[d]: - distribute[d][w].append(add_amount) - - remove[d] += to_be_removed_component - - for div in distribute: - for w in distribute[div]: - setattr( - getattr(div_component_fractions_adjusted, div), - w, - getattr( - getattr(div_component_fractions_adjusted, div), w - ) - + sum(distribute[div][w]), - ) - - for div in remove: - setattr( - getattr(div_component_fractions_adjusted, div), - waste, - getattr( - getattr(div_component_fractions_adjusted, div), waste - ) - - remove[div], - ) - - new_probs = { - waste - for waste in waste_fractions.model_dump().keys() - if sum( - getattr(div_fractions, div) - * getattr( - getattr(div_component_fractions_adjusted, div), waste, 0 - ) - for div in keys_of_interest - ) - > getattr(waste_fractions, waste) + 0.001 - } - if new_probs: - problems.append(new_probs) - dont_add_to.update(new_probs) - - components_multiplied_through = { - div: { - waste: getattr(div_fractions, div) - * getattr(getattr(div_component_fractions_adjusted, div), waste) - for waste in getattr(div_component_fractions_adjusted, div) - .model_dump() - .keys() - } - for div in div_component_fractions_adjusted.model_dump().keys() - } - - non_combustion = {} + # Combustion takes a uniform fraction of the leftover combustible mass. combustion_all = {} - for waste in waste_fractions.model_dump().keys(): - s = sum( - components_multiplied_through[div].get(waste, 0) - for div in keys_of_interest - ) - non_combustion[waste] = s - combustion_all[waste] = getattr(waste_fractions, waste) - s - - adjust_non_combustion = False - for waste, frac in non_combustion.items(): - if frac > (getattr(waste_fractions, waste) + 1e-5): - adjust_non_combustion = True - raise CustomError( - "INVALID_PARAMETERS", - f"Combination of compost, anaerobic digestion, and recycling is too high", - ) - - all_divs = sum( - getattr(div_fractions, div) for div in div_fractions.model_dump().keys() - ) - - assert ( - np.abs( - div_fractions.recycling - - sum(components_multiplied_through["recycling"].values()) + for waste in self.waste_types: + consumed = sum( + components_multiplied_through[div].get(waste, 0.0) + for div in ("compost", "anaerobic", "recycling") ) - < 1e-3 - ) + combustion_all[waste] = getattr(waste_fractions, waste) - consumed remainder = sum( - fraction - for waste_type, fraction in combustion_all.items() - if waste_type in self.div_components["combustion"] + combustion_all[w] for w in self.div_components["combustion"] ) - combustion_fraction_of_remainder = div_fractions.combustion / remainder - if combustion_fraction_of_remainder > (1 + 1e-5): - non_combustables = [ - x - for x in waste_fractions.model_dump().keys() - if x not in self.div_components["combustion"] - ] - for waste in non_combustables: - if getattr(waste_fractions, waste) == 0: - continue - new_val = getattr(waste_fractions, waste) * all_divs - components_multiplied_through["recycling"][waste] = new_val - - available_div = sum( - v - for k, v in components_multiplied_through["recycling"].items() - if k not in non_combustables - ) - available_div_target = div_fractions.recycling - sum( - v - for k, v in components_multiplied_through["recycling"].items() - if k in non_combustables - ) - if available_div_target < 0: - too_much_frac = ( - sum( - v - for k, v in components_multiplied_through["recycling"].items() - if k in non_combustables - ) - - div_fractions.recycling - ) / sum( - v - for k, v in components_multiplied_through["recycling"].items() - if k in non_combustables - ) - for key, value in components_multiplied_through["recycling"].items(): - if key in non_combustables: - components_multiplied_through["recycling"][key] = value * ( - 1 - too_much_frac - ) - else: - components_multiplied_through["recycling"][key] = 0 - assert ( - np.abs( - div_fractions.recycling - - sum( - v - for v in components_multiplied_through["recycling"].values() - ) - ) - < 1e-5 - ) - - else: - reduce_frac = (available_div - available_div_target) / available_div - for key, value in components_multiplied_through["recycling"].items(): - if key not in non_combustables: - components_multiplied_through["recycling"][key] = value * ( - 1 - reduce_frac - ) - assert ( - np.abs( - div_fractions.recycling - - sum( - v - for v in components_multiplied_through["recycling"].values() - ) - ) - < 1e-5 - ) - - non_combustion = {} - combustion_all = {} - for waste in waste_fractions.model_dump().keys(): - s = sum( - components_multiplied_through[div].get(waste, 0) - for div in keys_of_interest - ) - non_combustion[waste] = s - combustion_all[waste] = getattr(waste_fractions, waste) - s - - remainder = sum( - fraction - for waste_type, fraction in combustion_all.items() - if waste_type in self.div_components["combustion"] + if div_fractions.combustion > remainder + 1e-9: + raise CustomError( + "INVALID_PARAMETERS", + f"Incineration can be at most {remainder * 100:.1f}% of waste " + f"after the requested compost, anaerobic digestion, and " + f"recycling, but {div_fractions.combustion * 100:.1f}% was " + f"requested. Reduce incineration or the other diversion sliders.", ) - combustion_fraction_of_remainder = div_fractions.combustion / remainder - assert combustion_fraction_of_remainder < (1 + 1e-5) - if combustion_fraction_of_remainder > 1: - combustion_fraction_of_remainder = 1 + combustion_fraction_of_remainder = ( + div_fractions.combustion / remainder if remainder > 1e-12 else 0.0 + ) + components_multiplied_through["combustion"] = { + w: 0.0 for w in self.waste_types + } for waste in self.div_components["combustion"]: components_multiplied_through["combustion"][waste] = ( combustion_fraction_of_remainder * combustion_all[waste] ) + # Defensive invariants -> actionable CustomError instead of bare assert + # (asserts are stripped under `python -O` and would surface as 500s). for d in div_fractions.model_dump().keys(): - assert ( + if ( np.abs( getattr(div_fractions, d) - sum(components_multiplied_through[d].values()) ) - < 1e-3 - ) + > 1e-3 + ): + raise CustomError( + "INVALID_PARAMETERS", + f"Could not satisfy the requested {d} fraction with this " + f"city's waste composition. Adjust the diversion sliders.", + ) for w in components_multiplied_through[d]: - if abs(components_multiplied_through[d][w]) < 1e-5: + if abs(components_multiplied_through[d][w]) < 1e-9: components_multiplied_through[d][w] = 0 - assert components_multiplied_through[d][w] >= 0 + if components_multiplied_through[d][w] < 0: + raise CustomError( + "INVALID_PARAMETERS", + f"Could not satisfy the requested diversion mix for " + f"'{w}'. Reduce composting, recycling, or anaerobic " + f"digestion.", + ) adjusted_div_component_fractions = { div: { @@ -7058,21 +6806,24 @@ def implement_dst_changes_simple_v1_5( ).iat[0] scenario_parameters.waste_mass -= food_waste_prevented scenario_parameters.waste_masses.food -= food_waste_prevented - old_nonfood_total = waste_fractions_sum - food_fraction - new_nonfood_total = waste_fractions_sum - food_fraction * ( - 1 - food_waste_prevention - ) - normalization_factor = old_nonfood_total / new_nonfood_total + # Food prevention removes food mass and shrinks the total; every other + # type's mass is unchanged. Rescale ALL fractions by the same + # total-reduction factor (the factor waste_mass was just reduced by, + # above) so the invariant waste_fractions[w] * waste_mass == waste_masses[w] + # stays true. The previous non-food-only rescale (old_nonfood/new_nonfood) + # over-inflated non-food shares, so the allocator believed more + # metal/glass/other existed than the unchanged masses actually hold -> + # spurious "Negative mass for ". + total_scale = 1 - food_waste_prevention * food_fraction # reduced_total / original_total if food_waste_prevention > 0: for frac in scenario_parameters.waste_fractions.columns: if frac == "food": scenario_parameters.waste_fractions.loc[:, "food"] = ( - food_fraction * (1 - food_waste_prevention) + food_fraction * (1 - food_waste_prevention) / total_scale ) continue old_val = scenario_parameters.waste_fractions[frac].iat[0] - new_val = old_val / normalization_factor - scenario_parameters.waste_fractions.loc[:, frac] = new_val + scenario_parameters.waste_fractions.loc[:, frac] = old_val / total_scale if np.abs(scenario_parameters.waste_fractions.sum(axis=1).iat[0] - 1) > 1e-2: raise CustomError( diff --git a/SWEET_python/dst_allocation.py b/SWEET_python/dst_allocation.py new file mode 100644 index 0000000..8753572 --- /dev/null +++ b/SWEET_python/dst_allocation.py @@ -0,0 +1,187 @@ +""" +Exact diversion allocation for the city DST, via min-cost max-flow. + +Splitting user diversion sliders (compost / anaerobic / recycling, each a +fraction of total waste) across waste types is a transportation/assignment +problem: each treatment accepts only some waste types, and no type can be +diverted more than it exists. The legacy hand-rolled redistribution in +`City.mass_checker_math` solved this with local moves and FALSELY REJECTED many +feasible slider combinations (and silently mis-handled / crashed on others). + +This module solves it exactly with a small pure-Python min-cost max-flow +(no scipy / networkx). Feasibility is decided correctly for the entire true +feasible region; among feasible allocations a contention-aware cost keeps the +split sensible (and, when combustion is in play, spares combustible mass so the +leftover-remainder available to combustion is maximized). + +Combustion itself is NOT allocated here: the DST model burns a uniform fraction +of whatever combustible mass is left after the other three treatments, so the +caller handles it as a remainder step (see City.mass_checker_math). + +Validation harness (independent exact-rational oracle + ~400k-case fuzz) lives +at SWEET_python/dst_allocation_prototype.py. +""" + +from __future__ import annotations + +from collections import deque +from fractions import Fraction + +# Mirrors City.div_components (SWEET_python/city_params.py:195). Callers should +# pass their own `eligibility` (e.g. self.div_components); this is the default / +# drift reference for tests. +DEFAULT_ELIGIBILITY = { + "compost": {"food", "green", "wood", "paper_cardboard"}, + "anaerobic": {"food", "green", "wood", "paper_cardboard"}, + "combustion": {"food", "green", "wood", "paper_cardboard", + "textiles", "plastic", "rubber"}, + "recycling": {"wood", "paper_cardboard", "textiles", "plastic", + "rubber", "metal", "glass", "other"}, +} +NON_COMBUSTION = ("compost", "anaerobic", "recycling") + +# 1e9 integer grid: feasibility exact to ~1e-9 (far below slider/display +# precision) while exactly-fitting boundary cases (e.g. divert 100% of a fully +# divertible city) stay feasible. Verified against an exact rational oracle. +SCALE = 10 ** 9 +# Two-tier cost: any combustible edge costs more than any non-combustible edge, +# so when combustion > 0 the three treatments spare combustibles for it. The +# per-type contention degree breaks ties within a tier (protects shared +# wood/paper). Max degree (3) << penalty, so the tiers never cross. +_COMBUSTIBLE_PENALTY = 1000 + + +class _MinCostMaxFlow: + def __init__(self, n: int): + self.n = n + self.g: list[list[list]] = [[] for _ in range(n)] # [to, cap, cost, rev] + + def add_edge(self, u: int, v: int, cap: int, cost: int) -> None: + self.g[u].append([v, cap, cost, len(self.g[v])]) + self.g[v].append([u, 0, -cost, len(self.g[u]) - 1]) + + def run(self, s: int, t: int) -> int: + INF = float("inf") + total_flow = 0 + while True: + dist = [INF] * self.n + in_q = [False] * self.n + pv = [-1] * self.n + pe = [-1] * self.n + dist[s] = 0 + dq = deque([s]) + in_q[s] = True + while dq: + u = dq.popleft() + in_q[u] = False + du = dist[u] + for i, e in enumerate(self.g[u]): + v, cap, cost, _ = e + if cap > 0 and du + cost < dist[v]: + dist[v] = du + cost + pv[v], pe[v] = u, i + if not in_q[v]: + dq.append(v) + in_q[v] = True + if dist[t] == INF: + break + push = INF + v = t + while v != s: + push = min(push, self.g[pv[v]][pe[v]][1]) + v = pv[v] + v = t + while v != s: + e = self.g[pv[v]][pe[v]] + e[1] -= push + self.g[e[0]][e[3]][1] += push + v = pv[v] + total_flow += push + return total_flow + + +def solve_allocation(waste: dict, targets: dict, eligibility: dict = None, + treatments: tuple = NON_COMBUSTION, + spare_combustibles: bool = False, + scale: int = SCALE) -> dict: + """ + Allocate each treatment's target across eligible waste types. + + Args: + waste: {waste_type: fraction_of_total} availability. + targets: {treatment: fraction_of_total} desired per treatment. + eligibility: {treatment: set(waste_types)} (defaults to DEFAULT_ELIGIBILITY). + treatments: which treatments to allocate (default the 3 non-combustion). + spare_combustibles: raise the cost of combustible edges so the leftover + combustible remainder is maximized (use when a combustion slider > 0). + + Returns dict: {feasible, allocation {t: {w: fraction}}, flow, need, shortfall}. + Guarantees, when feasible: sum_w alloc[t][w] == targets[t] (to ~1/scale) and + sum_t alloc[t][w] <= waste[w]. + """ + if eligibility is None: + eligibility = DEFAULT_ELIGIBILITY + tlist = [t for t in treatments] + types = list(waste.keys()) + combustible = eligibility.get("combustion", set()) + + T, W = len(tlist), len(types) + S, SINK = 0, 1 + T + W + mc = _MinCostMaxFlow(SINK + 1) + + # Symmetric nearest rounding onto the integer grid (keeps exact-fit feasible). + avail_i = {w: int(round(Fraction(waste.get(w, 0.0)) * scale)) for w in types} + tgt_i = {t: int(round(Fraction(targets.get(t, 0.0)) * scale)) for t in tlist} + degree = {w: sum(1 for t in tlist if w in eligibility[t]) for w in types} + + def edge_cost(w): + c = degree[w] + if spare_combustibles and w in combustible: + c += _COMBUSTIBLE_PENALTY + return c + + for i, t in enumerate(tlist): + mc.add_edge(S, 1 + i, tgt_i[t], 0) + for i, t in enumerate(tlist): + for j, w in enumerate(types): + if w in eligibility[t] and avail_i[w] > 0: + mc.add_edge(1 + i, 1 + T + j, avail_i[w], edge_cost(w)) + for j, w in enumerate(types): + mc.add_edge(1 + T + j, SINK, avail_i[w], 0) + + need = sum(tgt_i.values()) + flow = mc.run(S, SINK) + feasible = (flow == need) + + allocation = {t: {} for t in tlist} + for i, t in enumerate(tlist): + for e in mc.g[1 + i]: + v, cap, _c, _r = e + if 1 + T <= v < 1 + T + W: + w = types[v - (1 + T)] + used = avail_i[w] - cap + if used > 0: + allocation[t][w] = used / scale + + return {"feasible": feasible, "allocation": allocation, + "flow": flow / scale, "need": need / scale, + "shortfall": (need - flow) / scale} + + +def max_feasible_target(treatment: str, waste: dict, other_targets: dict, + eligibility: dict = None, + treatments: tuple = NON_COMBUSTION, + scale: int = SCALE) -> float: + """Largest value `treatment`'s slider can reach with the others fixed + (binary search on the grid using solve_allocation as the feasibility test). + For actionable error messages / the future clamp-to-max UX.""" + lo, hi = 0, scale + while lo < hi: + mid = (lo + hi + 1) // 2 + tg = dict(other_targets) + tg[treatment] = mid / scale + if solve_allocation(waste, tg, eligibility, treatments, scale=scale)["feasible"]: + lo = mid + else: + hi = mid - 1 + return lo / scale diff --git a/dst_allocation_prototype.py b/dst_allocation_prototype.py new file mode 100644 index 0000000..e8a58ed --- /dev/null +++ b/dst_allocation_prototype.py @@ -0,0 +1,582 @@ +""" +Prototype: robust DST diversion allocation via min-cost max-flow. + +WHY THIS EXISTS +--------------- +`City.mass_checker_math` (SWEET_python/city_params.py) tries to split user- +requested diversion fractions across waste types using a hand-rolled iterative +redistribution loop. That loop is a local heuristic on what is really a +*transportation / assignment problem*, and it: + - FALSELY REJECTS feasible combinations (the documented bug); + - SILENTLY ACCEPTS some impossible ones, diverting 0 tons (dead guard at + city_params.py:5772-5783, after the early return at 5732); + - CRASHES with a bare AssertionError at city_params.py:6144. + +This file prototypes the correct approach as a standalone, dependency-free +module (plain `python3`, no scipy/networkx/SWEET imports), so we can prove +correctness in isolation before touching city_params.py. + +DESIGN +------ +The 3 "non-combustion" treatments (compost, anaerobic, recycling) contend over +wood + paper_cardboard. We allocate them with min-cost max-flow: + sum_w x[t][w] == target[t] (hit every slider) + sum_t x[t][w] <= waste[w] (don't over-draw any type) + x[t][w] == 0 unless w eligible for t +Combustion is NOT a free consumer: the real model burns a *uniform* fraction of +whatever combustible mass is left (city_params.py:6128-6131). So it stays a +separate post-step: feasible iff target_combustion <= achievable remainder. +To maximize that remainder when combustion > 0, the flow uses a two-tier cost +that steers recycling onto NON-combustible types (metal/glass/other) first. + +VERIFICATION (run: python3 dst_allocation_prototype.py) + - solver vs an INDEPENDENT exact (rational) Hall/Gale oracle: 0 disagreements + - per-feasible-case allocation validity incl. aggregate-mass check + - combustion remainder vs an independent closed-form max-remainder oracle + +Eligibility / waste types mirror SWEET_python/city_params.py:195 (source of +truth). Keep them in sync if that ever changes. +""" + +from __future__ import annotations + +import itertools +import random +from collections import deque +from fractions import Fraction + +# --- mirror of city_params.py:195 (div_components) + waste_types ------------- +WASTE_TYPES = [ + "food", "green", "wood", "paper_cardboard", "textiles", + "plastic", "metal", "glass", "rubber", "other", +] +ELIGIBILITY = { + "compost": {"food", "green", "wood", "paper_cardboard"}, + "anaerobic": {"food", "green", "wood", "paper_cardboard"}, + "combustion": {"food", "green", "wood", "paper_cardboard", + "textiles", "plastic", "rubber"}, + "recycling": {"wood", "paper_cardboard", "textiles", "plastic", + "rubber", "metal", "glass", "other"}, +} +NON_COMBUSTION = ("compost", "anaerobic", "recycling") +COMBUSTIBLE = ELIGIBILITY["combustion"] # types combustion can take +NON_COMBUSTIBLE = set(WASTE_TYPES) - COMBUSTIBLE # metal, glass, other + +# 1e9 grid: feasibility decided to ~1e-9, far below slider/display precision. +SCALE = 10 ** 9 +# Two-tier cost: any combustible edge costs more than any non-combustible edge, +# so when combustion > 0 the 3 treatments spare combustibles. Degree breaks ties +# within each tier (protects contended wood/paper). Max degree is 3 << penalty. +COMBUSTIBLE_PENALTY = 1000 +TOL = 1e-7 + + +# --------------------------------------------------------------------------- +# 1. Generic integer min-cost max-flow (successive shortest paths / SPFA) +# --------------------------------------------------------------------------- +class MinCostMaxFlow: + def __init__(self, n: int): + self.n = n + self.g: list[list[list]] = [[] for _ in range(n)] # [to,cap,cost,rev] + + def add_edge(self, u: int, v: int, cap: int, cost: int) -> None: + self.g[u].append([v, cap, cost, len(self.g[v])]) + self.g[v].append([u, 0, -cost, len(self.g[u]) - 1]) + + def run(self, s: int, t: int) -> tuple[int, int]: + INF = float("inf") + total_flow = total_cost = 0 + while True: + dist = [INF] * self.n + in_q = [False] * self.n + pv = [-1] * self.n + pe = [-1] * self.n + dist[s] = 0 + dq = deque([s]) + in_q[s] = True + while dq: + u = dq.popleft() + in_q[u] = False + du = dist[u] + for i, e in enumerate(self.g[u]): + v, cap, cost, _ = e + if cap > 0 and du + cost < dist[v]: + dist[v] = du + cost + pv[v], pe[v] = u, i + if not in_q[v]: + dq.append(v) + in_q[v] = True + if dist[t] == INF: + break + d = INF + v = t + while v != s: + d = min(d, self.g[pv[v]][pe[v]][1]) + v = pv[v] + v = t + while v != s: + e = self.g[pv[v]][pe[v]] + e[1] -= d + self.g[e[0]][e[3]][1] += d + v = pv[v] + total_flow += d + total_cost += d * dist[t] + return total_flow, total_cost + + +# --------------------------------------------------------------------------- +# 2. Non-combustion allocation solver (compost / anaerobic / recycling) +# --------------------------------------------------------------------------- +def solve_allocation(waste: dict, targets: dict, + treatments: tuple = NON_COMBUSTION, + spare_combustibles: bool = False, + scale: int = SCALE) -> dict: + """Min-cost max-flow allocation. `spare_combustibles` raises the cost of + combustible edges so the leftover combustible remainder is maximized (only + matters when a combustion slider is in play).""" + tlist = list(treatments) + T, W = len(tlist), len(WASTE_TYPES) + S, SINK = 0, 1 + T + W + mc = MinCostMaxFlow(SINK + 1) + + # Symmetric nearest rounding onto the SCALE grid. At SCALE=1e9 the decision + # is exact to ~1e-9 (verified against the exact rational oracle), and crucially + # exactly-fitting boundary cases (demand == capacity, e.g. divert 100% of a + # fully-divertible city) round identically and stay FEASIBLE -- which is the + # whole point. The tiny possible over-allocation (<=0.5/SCALE/type) is far + # below float noise and is caught in aggregate by allocation_is_valid. + avail_i = {w: int(round(Fraction(waste.get(w, 0.0)) * scale)) for w in WASTE_TYPES} + tgt_i = {t: int(round(Fraction(targets.get(t, 0.0)) * scale)) for t in tlist} + + degree = {w: sum(1 for t in tlist if w in ELIGIBILITY[t]) for w in WASTE_TYPES} + + def edge_cost(w): + c = degree[w] + if spare_combustibles and w in COMBUSTIBLE: + c += COMBUSTIBLE_PENALTY + return c + + for i, t in enumerate(tlist): + mc.add_edge(S, 1 + i, tgt_i[t], 0) + for i, t in enumerate(tlist): + for j, w in enumerate(WASTE_TYPES): + if w in ELIGIBILITY[t] and avail_i[w] > 0: + mc.add_edge(1 + i, 1 + T + j, avail_i[w], edge_cost(w)) + for j, w in enumerate(WASTE_TYPES): + mc.add_edge(1 + T + j, SINK, avail_i[w], 0) + + need = sum(tgt_i.values()) + flow, _ = mc.run(S, SINK) + feasible = (flow == need) + + allocation = {t: {} for t in tlist} + for i, t in enumerate(tlist): + for e in mc.g[1 + i]: + v, cap, _c, _r = e + if 1 + T <= v < 1 + T + W: + w = WASTE_TYPES[v - (1 + T)] + used = avail_i[w] - cap + if used > 0: + allocation[t][w] = used / scale + return {"feasible": feasible, "allocation": allocation, + "flow": flow / scale, "need": need / scale, + "shortfall": (need - flow) / scale} + + +# --------------------------------------------------------------------------- +# 3. Full 4-slider solve: 3 via flow, combustion via leftover remainder +# --------------------------------------------------------------------------- +def solve_with_combustion(waste: dict, targets: dict) -> dict: + """Feasibility + allocation for all four sliders. + Returns {feasible, stage, allocation, remainder, max_remainder, reason}.""" + three = {t: targets.get(t, 0.0) for t in NON_COMBUSTION} + tc = targets.get("combustion", 0.0) + res3 = solve_allocation(waste, three, spare_combustibles=(tc > 0)) + if not res3["feasible"]: + return {"feasible": False, "stage": "non_combustion", + "allocation": res3["allocation"], "remainder": None, + "reason": "compost/anaerobic/recycling targets exceed available " + "waste of their eligible types"} + + alloc = {t: dict(res3["allocation"][t]) for t in NON_COMBUSTION} + leftover = {w: waste.get(w, 0.0) - sum(alloc[t].get(w, 0.0) + for t in NON_COMBUSTION) + for w in WASTE_TYPES} + remainder = sum(leftover[w] for w in COMBUSTIBLE) + + if tc > remainder + TOL: + return {"feasible": False, "stage": "combustion", + "allocation": alloc, "remainder": remainder, + "reason": f"combustion {tc:.4f} exceeds leftover combustible " + f"mass {remainder:.4f}"} + + frac = (tc / remainder) if remainder > TOL else 0.0 + alloc["combustion"] = {w: frac * leftover[w] for w in COMBUSTIBLE + if leftover[w] > 0} + return {"feasible": True, "stage": "ok", "allocation": alloc, + "remainder": remainder, "reason": "ok"} + + +def max_feasible_target(treatment: str, waste: dict, other_targets: dict, + treatments: tuple = NON_COMBUSTION) -> float: + """Largest value `treatment`'s slider can reach with others fixed.""" + lo, hi = 0, SCALE + while lo < hi: + mid = (lo + hi + 1) // 2 + tg = dict(other_targets) + tg[treatment] = mid / SCALE + if solve_allocation(waste, tg, treatments)["feasible"]: + lo = mid + else: + hi = mid - 1 + return lo / SCALE + + +# --------------------------------------------------------------------------- +# 4. INDEPENDENT oracles (no shared code / no quantization with the solver) +# --------------------------------------------------------------------------- +def hall_feasible_exact(waste: dict, targets: dict, + treatments: tuple = NON_COMBUSTION) -> bool: + """Exact rational Gale/Hall subset condition. Different math from the flow + solver AND no SCALE rounding, so it can observe the quantization band.""" + tlist = list(treatments) + tgt = {t: Fraction(targets.get(t, 0.0)) for t in tlist} + av = {w: Fraction(waste.get(w, 0.0)) for w in WASTE_TYPES} + for r in range(1, len(tlist) + 1): + for sub in itertools.combinations(tlist, r): + demand = sum(tgt[t] for t in sub) + reach = set().union(*(ELIGIBILITY[t] for t in sub)) + if demand > sum(av[w] for w in reach): + return False + return True + + +def max_remainder_exact(waste: dict, targets: dict) -> Fraction: + """Closed-form max combustible remainder after the 3 (exact). The only + non-combustible mass the 3 can absorb is recycling drawing metal/glass/other, + up to min(t_recycling, metal+glass+other).""" + av = {w: Fraction(waste.get(w, 0.0)) for w in WASTE_TYPES} + total_comb = sum(av[w] for w in COMBUSTIBLE) + sum3 = sum(Fraction(targets.get(t, 0.0)) for t in NON_COMBUSTION) + mgo = sum(av[w] for w in NON_COMBUSTIBLE) + absorbed_noncomb = min(Fraction(targets.get("recycling", 0.0)), mgo) + min_comb_consumed = max(Fraction(0), sum3 - absorbed_noncomb) + return total_comb - min_comb_consumed + + +def combustion_feasible_oracle(waste: dict, targets: dict) -> bool: + if not hall_feasible_exact(waste, targets, NON_COMBUSTION): + return False + return Fraction(targets.get("combustion", 0.0)) <= max_remainder_exact(waste, targets) + + +# --------------------------------------------------------------------------- +# 5. Allocation validity (incl. the aggregate-mass check the fuzz needs) +# --------------------------------------------------------------------------- +def allocation_is_valid(waste: dict, targets: dict, allocation: dict, + treatments: tuple = NON_COMBUSTION, + tol: float = 1e-6) -> tuple[bool, str]: + for t in treatments: + got = sum(allocation.get(t, {}).values()) + if abs(got - targets.get(t, 0.0)) > tol: + return False, f"target {t}: want {targets.get(t,0):.7f} got {got:.7f}" + for t in treatments: + for w, v in allocation.get(t, {}).items(): + if w not in ELIGIBILITY[t]: + return False, f"{t} used ineligible {w}" + if v < -tol: + return False, f"{t}/{w} negative {v}" + for w in WASTE_TYPES: + used = sum(allocation.get(t, {}).get(w, 0.0) for t in treatments) + if used > waste.get(w, 0.0) + tol: + return False, f"over-drew {w}: {used:.7f} > {waste.get(w,0):.7f}" + # aggregate: total drawn cannot exceed total waste (catches stacked rounding) + total_used = sum(sum(allocation.get(t, {}).values()) for t in treatments) + if total_used > sum(waste.values()) + tol: + return False, f"aggregate over-draw {total_used:.7f} > {sum(waste.values()):.7f}" + return True, "ok" + + +# --------------------------------------------------------------------------- +# 6. Curated scenarios + audit regression suite +# --------------------------------------------------------------------------- +def wf(**kw): + return {w: kw.get(w, 0.0) for w in WASTE_TYPES} + + +# (name, waste, targets[no combustion], expected_feasible, note) +SCENARIOS = [ + ("REPRO compost40+recycling50 (100% divertible)", + wf(food=0.10, paper_cardboard=0.45, plastic=0.45), + {"compost": 0.40, "recycling": 0.50}, True, "old code raised"), + ("REPRO at ceiling compost40+recycling60", + wf(food=0.10, paper_cardboard=0.45, plastic=0.45), + {"compost": 0.40, "recycling": 0.60}, True, "exactly fits"), + ("REPRO over ceiling compost40+recycling62", + wf(food=0.10, paper_cardboard=0.45, plastic=0.45), + {"compost": 0.40, "recycling": 0.62}, False, "only paper.15 free"), + ("all food compost100", wf(food=1.0), {"compost": 1.0}, True, ""), + ("all food recycling1 (impossible)", wf(food=1.0), {"recycling": 0.01}, + False, "food not recyclable"), + ("all plastic recycling100", wf(plastic=1.0), {"recycling": 1.0}, True, ""), + ("all plastic compost1 (impossible)", wf(plastic=1.0), {"compost": 0.01}, + False, "plastic not compostable"), + ("only paper compost60+recycling40", wf(paper_cardboard=1.0), + {"compost": 0.60, "recycling": 0.40}, True, "shared, sums to 1"), + ("only paper compost60+recycling50", wf(paper_cardboard=1.0), + {"compost": 0.60, "recycling": 0.50}, False, "1.1>1"), + ("food-heavy compost50 recycling45", + wf(food=0.55, paper_cardboard=0.20, plastic=0.15, metal=0.10), + {"compost": 0.50, "recycling": 0.45}, True, ""), + ("food-heavy infeasible compost60 recycling45", + wf(food=0.55, paper_cardboard=0.20, plastic=0.15, metal=0.10), + {"compost": 0.60, "recycling": 0.45}, False, "1.05>1"), + ("compost+anaerobic over organics each35", + wf(food=0.50, paper_cardboard=0.10, plastic=0.40), + {"compost": 0.35, "anaerobic": 0.35}, False, "0.70>organics 0.60"), + ("all three on mixed", wf(food=0.30, green=0.10, wood=0.10, + paper_cardboard=0.10, plastic=0.20, metal=0.10, glass=0.10), + {"compost": 0.30, "anaerobic": 0.20, "recycling": 0.40}, True, ""), + # --- audit regression: empty-pool / silent-drop (legacy wrongly accepted) - + ("AUDIT empty-pool recycling0.01 on food", + wf(food=1.0), {"recycling": 0.01}, False, "silent-drop guard"), + ("AUDIT sub-tolerance compost0.0003 on textiles", + wf(textiles=1.0), {"compost": 0.0003}, False, "sub-tol must reject"), + ("AUDIT metal compost0.01 (ineligible)", wf(metal=1.0), + {"compost": 0.01}, False, ""), + # --- audit regression: legacy AssertionError(6144) inputs, must solve ---- + ("AUDIT legacy-crash A", + wf(wood=0.18, paper_cardboard=0.064, textiles=0.284, glass=0.369, rubber=0.104), + {"anaerobic": 0.1979, "recycling": 0.4188}, True, "legacy asserted"), + ("AUDIT legacy-crash B", + wf(paper_cardboard=0.433, plastic=0.385, glass=0.182), + {"compost": 0.3585, "recycling": 0.3527}, True, "legacy asserted"), + # --- shared-pool one-grid-step boundary --------------------------------- + ("AUDIT one-step-over shared pool", wf(wood=1.0), + {"compost": 0.5, "recycling": 0.500001}, False, "1 grid step over"), + ("AUDIT one-step-under shared pool", wf(wood=1.0), + {"compost": 0.5, "recycling": 0.499999}, True, ""), + # --- numeric / float dust ----------------------------------------------- + ("AUDIT float dust 0.1+0.2", wf(paper_cardboard=1.0), + {"compost": 0.1 + 0.2}, True, "0.30000000000000004"), + ("AUDIT does-not-sum-to-one 0.97", wf(food=0.55, paper_cardboard=0.20, + plastic=0.22), {"recycling": 0.42}, True, "recyclables .42"), + ("AUDIT does-not-sum-to-one 0.97 over", wf(food=0.55, paper_cardboard=0.20, + plastic=0.22), {"recycling": 0.43}, False, "recyclables only .42"), +] + +# (name, waste, targets WITH combustion, expected_feasible, note) +COMBUSTION_SCENARIOS = [ + ("C4 combustible-sparing (THE coupling case)", + wf(food=0.30, paper_cardboard=0.30, metal=0.30, plastic=0.10), + {"recycling": 0.30, "combustion": 0.55}, True, + "feasible only if recycling takes metal -> remainder 0.70"), + ("C5 combustion just over", + wf(food=0.30, paper_cardboard=0.30, metal=0.30, plastic=0.10), + {"recycling": 0.30, "combustion": 0.71}, False, "0.71>max_remainder 0.70"), + ("C6 organics-eaten ok", + wf(food=0.50, paper_cardboard=0.20, plastic=0.30), + {"compost": 0.40, "combustion": 0.55}, True, ""), + ("C7 organics-eaten over", + wf(food=0.50, paper_cardboard=0.20, plastic=0.30), + {"compost": 0.40, "combustion": 0.65}, False, ""), + ("C9 max-remainder (spare paper via metal)", + wf(paper_cardboard=0.60, metal=0.40), + {"compost": 0.30, "recycling": 0.30, "combustion": 0.01}, True, ""), + ("C10 recycling starves combustion (no metal/glass/other)", + wf(food=0.20, paper_cardboard=0.40, plastic=0.40), + {"compost": 0.20, "recycling": 0.40, "combustion": 0.45}, False, + "max_remainder 0.40 < 0.45"), + ("C-divzero guard (no combustible left)", + wf(paper_cardboard=1.0), + {"compost": 0.60, "recycling": 0.40, "combustion": 0.05}, False, + "remainder 0 -> infeasible, must not ZeroDivisionError"), + ("C-divzero combustion0 ok", wf(paper_cardboard=1.0), + {"compost": 0.60, "recycling": 0.40, "combustion": 0.0}, True, ""), + ("C combustion on non-combustible only", wf(metal=1.0), + {"combustion": 0.10}, False, "remainder 0"), + ("C full 4-slider tight boundary", + wf(food=0.25, green=0.10, wood=0.05, paper_cardboard=0.15, plastic=0.15, + metal=0.10, glass=0.10, textiles=0.05), + {"compost": 0.30, "recycling": 0.25, "combustion": 0.40}, True, + "t_comb == max_remainder"), +] + + +# --------------------------------------------------------------------------- +# 7. Runners + fuzz +# --------------------------------------------------------------------------- +def run_curated() -> list: + print("=" * 78) + print("NON-COMBUSTION SCENARIOS (solver vs exact oracle vs expectation)") + print("=" * 78) + tricky = [] + for name, waste, targets, expected, note in SCENARIOS: + res = solve_allocation(waste, targets) + orc = hall_feasible_exact(waste, targets) + valid, why = (allocation_is_valid(waste, targets, res["allocation"]) + if res["feasible"] else (True, "n/a")) + ok = (res["feasible"] == orc == expected) and valid + print(f"[{'OK ' if ok else '!!!'}] {name}") + if not ok: + print(f" solver={res['feasible']} oracle={orc} " + f"expected={expected} valid={valid} ({why}) :: {note}") + tricky.append((name, waste, targets, expected, res, orc)) + return tricky + + +def run_combustion() -> list: + print() + print("=" * 78) + print("COMBUSTION SCENARIOS (solve_with_combustion vs closed-form oracle)") + print("=" * 78) + tricky = [] + for name, waste, targets, expected, note in COMBUSTION_SCENARIOS: + try: + res = solve_with_combustion(waste, targets) + feas = res["feasible"] + crashed = None + except Exception as e: + feas, crashed = None, f"{type(e).__name__}: {e}" + orc = combustion_feasible_oracle(waste, targets) + valid, why = (allocation_is_valid(waste, targets, res["allocation"], NON_COMBUSTION) + if (crashed is None and feas) else (True, "n/a")) + ok = (crashed is None) and (feas == orc == expected) and valid + print(f"[{'OK ' if ok else '!!!'}] {name}") + if not ok: + print(f" solver={feas} oracle={orc} expected={expected} " + f"valid={valid} crash={crashed} ({why}) :: {note}") + tricky.append((name, waste, targets, expected)) + return tricky + + +def run_repro_sweep() -> None: + print() + print("=" * 78) + print("REPRO SWEEP") + print("=" * 78) + waste = wf(food=0.10, paper_cardboard=0.45, plastic=0.45) + accepted = [r for r in range(0, 64, 2) + if solve_allocation(waste, {"compost": 0.40, "recycling": r / 100})["feasible"]] + print(f"waste food.10/paper.45/plastic.45, compost40%: new solver accepts " + f"recycling {accepted[0]}%..{accepted[-1]}% (old code: 0..32%)") + print(f"max_feasible_target(recycling|compost40) = " + f"{max_feasible_target('recycling', waste, {'compost': 0.40})*100:.4f}%") + + +def random_waste(rng): + k = rng.choice([1, 2, 3, 3, 4, 5, 7, 10]) + ts = rng.sample(WASTE_TYPES, k) + raw = [rng.random() for _ in ts] + s = sum(raw) or 1.0 + return wf(**{t: r / s for t, r in zip(ts, raw)}) + + +def fuzz(n=200_000, seed=12345): + print() + print("=" * 78) + print(f"FUZZ {n:,} cases (solver vs EXACT rational oracle; band-aware)") + print("=" * 78) + rng = random.Random(seed) + mis, invalid, band = [], [], 0 + for _ in range(n): + waste = random_waste(rng) + targets = {t: round(rng.random() * rng.choice([0.3, 0.6, 1.0]), 4) + for t in NON_COMBUSTION if rng.random() < 0.75} + res = solve_allocation(waste, targets) + orc = hall_feasible_exact(waste, targets) + if res["feasible"] != orc: + # disagreement only acceptable within one grid step of a boundary + slack = _boundary_slack(waste, targets) + if slack <= Fraction(1, SCALE): + band += 1 + else: + mis.append((waste, targets, res["feasible"], orc, float(slack))) + elif res["feasible"]: + okv, why = allocation_is_valid(waste, targets, res["allocation"]) + if not okv: + invalid.append((waste, targets, why)) + print(f"genuine feasibility mismatches : {len(mis)}") + print(f"within-grid-band disagreements : {band} (expected, |slack|<=1/SCALE)") + print(f"invalid allocations : {len(invalid)}") + for waste, targets, sv, ov, sl in mis[:3]: + nz = {k: round(v, 5) for k, v in waste.items() if v} + print(f" MISMATCH waste={nz} t={targets} solver={sv} oracle={ov} slack={sl:.2e}") + for waste, targets, why in invalid[:3]: + print(f" INVALID {why}") + return mis + invalid + + +def fuzz_combustion(n=200_000, seed=999): + print() + print("=" * 78) + print(f"FUZZ COMBUSTION {n:,} cases (solve_with_combustion vs oracle)") + print("=" * 78) + rng = random.Random(seed) + mis, invalid, crashes, band = [], [], [], 0 + for _ in range(n): + waste = random_waste(rng) + targets = {t: round(rng.random() * rng.choice([0.3, 0.6, 1.0]), 4) + for t in ("compost", "anaerobic", "recycling", "combustion") + if rng.random() < 0.7} + try: + res = solve_with_combustion(waste, targets) + except Exception as e: + crashes.append((waste, targets, f"{type(e).__name__}: {e}")) + continue + orc = combustion_feasible_oracle(waste, targets) + if res["feasible"] != orc: + slack = _boundary_slack_comb(waste, targets) + if slack <= Fraction(2, SCALE): + band += 1 + else: + mis.append((waste, targets, res["feasible"], orc, float(slack))) + elif res["feasible"]: + okv, why = allocation_is_valid(waste, targets, res["allocation"], NON_COMBUSTION) + if not okv: + invalid.append((waste, targets, why)) + print(f"genuine feasibility mismatches : {len(mis)}") + print(f"within-grid-band disagreements : {band}") + print(f"invalid allocations : {len(invalid)}") + print(f"crashes : {len(crashes)}") + for waste, targets, sv, ov, sl in mis[:3]: + nz = {k: round(v, 5) for k, v in waste.items() if v} + print(f" MISMATCH waste={nz} t={targets} solver={sv} oracle={ov} slack={sl:.2e}") + for waste, targets, msg in crashes[:3]: + print(f" CRASH {msg}") + return mis + invalid + crashes + + +def _boundary_slack(waste, targets, treatments=NON_COMBUSTION): + """Smallest absolute Hall-constraint margin (how close to a boundary).""" + tgt = {t: Fraction(targets.get(t, 0.0)) for t in treatments} + av = {w: Fraction(waste.get(w, 0.0)) for w in WASTE_TYPES} + best = None + for r in range(1, len(treatments) + 1): + for sub in itertools.combinations(treatments, r): + demand = sum(tgt[t] for t in sub) + cap = sum(av[w] for w in set().union(*(ELIGIBILITY[t] for t in sub))) + m = abs(demand - cap) + best = m if best is None else min(best, m) + return best if best is not None else Fraction(1) + + +def _boundary_slack_comb(waste, targets): + s1 = _boundary_slack(waste, targets, NON_COMBUSTION) + s2 = abs(Fraction(targets.get("combustion", 0.0)) - max_remainder_exact(waste, targets)) + return min(s1, s2) + + +if __name__ == "__main__": + assert {k: set(v) for k, v in ELIGIBILITY.items()}["compost"] == \ + {"food", "green", "wood", "paper_cardboard"}, "eligibility drift!" + t1 = run_curated() + t2 = run_combustion() + run_repro_sweep() + p1 = fuzz() + p2 = fuzz_combustion() + print() + print("=" * 78) + print(f"SUMMARY noncomb_tricky={len(t1)} comb_tricky={len(t2)} " + f"fuzz_problems={len(p1)} comb_fuzz_problems={len(p2)}") + print("=" * 78) diff --git a/tests/test_dst_allocation.py b/tests/test_dst_allocation.py new file mode 100644 index 0000000..0951003 --- /dev/null +++ b/tests/test_dst_allocation.py @@ -0,0 +1,257 @@ +""" +Unit tests for the DST min-cost-max-flow allocator (SWEET_python.dst_allocation). + +These are pure (no City / DB / geopy needed). They: + - lock in the documented bug fix and the audit regression suite; + - cross-check the solver against an INDEPENDENT exact-rational Hall/Gale + feasibility oracle (different math, no shared quantization) on a fuzz sweep; + - verify combustion's leftover-remainder feasibility + the divide-by-zero guard. + +The full standalone harness (~400k-case fuzz) is SWEET_python/dst_allocation_prototype.py. +""" + +import itertools +import random +from fractions import Fraction + +import pytest + +from SWEET_python.dst_allocation import ( + DEFAULT_ELIGIBILITY, + NON_COMBUSTION, + SCALE, + solve_allocation, + max_feasible_target, +) + +ELIG = DEFAULT_ELIGIBILITY +WASTE_TYPES = ["food", "green", "wood", "paper_cardboard", "textiles", + "plastic", "metal", "glass", "rubber", "other"] +COMBUSTIBLE = ELIG["combustion"] +NON_COMBUSTIBLE = set(WASTE_TYPES) - COMBUSTIBLE + + +def wf(**kw): + return {w: kw.get(w, 0.0) for w in WASTE_TYPES} + + +# --------------------------------------------------------------------------- # +# Independent oracles (exact rational; share no code with the flow solver) +# --------------------------------------------------------------------------- # +def hall_feasible_exact(waste, targets, treatments=NON_COMBUSTION): + tgt = {t: Fraction(targets.get(t, 0.0)) for t in treatments} + av = {w: Fraction(waste.get(w, 0.0)) for w in WASTE_TYPES} + for r in range(1, len(treatments) + 1): + for sub in itertools.combinations(treatments, r): + demand = sum(tgt[t] for t in sub) + reach = set().union(*(ELIG[t] for t in sub)) + if demand > sum(av[w] for w in reach): + return False + return True + + +def max_remainder_exact(waste, targets): + av = {w: Fraction(waste.get(w, 0.0)) for w in WASTE_TYPES} + total_comb = sum(av[w] for w in COMBUSTIBLE) + sum3 = sum(Fraction(targets.get(t, 0.0)) for t in NON_COMBUSTION) + mgo = sum(av[w] for w in NON_COMBUSTIBLE) + absorbed = min(Fraction(targets.get("recycling", 0.0)), mgo) + return total_comb - max(Fraction(0), sum3 - absorbed) + + +def combustion_feasible(waste, targets): + return (hall_feasible_exact(waste, targets) + and Fraction(targets.get("combustion", 0.0)) <= max_remainder_exact(waste, targets)) + + +def allocation_is_valid(waste, targets, allocation, treatments=NON_COMBUSTION, tol=1e-6): + for t in treatments: + if abs(sum(allocation.get(t, {}).values()) - targets.get(t, 0.0)) > tol: + return False + for t in treatments: + for w, v in allocation.get(t, {}).items(): + if w not in ELIG[t] or v < -tol: + return False + for w in WASTE_TYPES: + if sum(allocation.get(t, {}).get(w, 0.0) for t in treatments) > waste.get(w, 0.0) + tol: + return False + if sum(sum(allocation.get(t, {}).values()) for t in treatments) > sum(waste.values()) + tol: + return False + return True + + +# --------------------------------------------------------------------------- # +# Curated + audit regression scenarios (no combustion) +# (name, waste, targets, expected_feasible) +# --------------------------------------------------------------------------- # +NONCOMBUSTION_CASES = [ + ("repro_compost40_recycling50", + wf(food=0.10, paper_cardboard=0.45, plastic=0.45), + {"compost": 0.40, "recycling": 0.50}, True), + ("repro_at_ceiling_recycling60", + wf(food=0.10, paper_cardboard=0.45, plastic=0.45), + {"compost": 0.40, "recycling": 0.60}, True), + ("repro_over_ceiling_recycling62", + wf(food=0.10, paper_cardboard=0.45, plastic=0.45), + {"compost": 0.40, "recycling": 0.62}, False), + ("all_food_compost100", wf(food=1.0), {"compost": 1.0}, True), + ("all_food_recycling1_impossible", wf(food=1.0), {"recycling": 0.01}, False), + ("all_plastic_recycling100", wf(plastic=1.0), {"recycling": 1.0}, True), + ("all_plastic_compost1_impossible", wf(plastic=1.0), {"compost": 0.01}, False), + ("only_paper_60_40_fits", wf(paper_cardboard=1.0), + {"compost": 0.60, "recycling": 0.40}, True), + ("only_paper_60_50_over", wf(paper_cardboard=1.0), + {"compost": 0.60, "recycling": 0.50}, False), + ("food_heavy_50_45", wf(food=0.55, paper_cardboard=0.20, plastic=0.15, metal=0.10), + {"compost": 0.50, "recycling": 0.45}, True), + ("food_heavy_60_45_over", wf(food=0.55, paper_cardboard=0.20, plastic=0.15, metal=0.10), + {"compost": 0.60, "recycling": 0.45}, False), + ("compost_anaerobic_over_organics", wf(food=0.50, paper_cardboard=0.10, plastic=0.40), + {"compost": 0.35, "anaerobic": 0.35}, False), + ("all_three_mixed", wf(food=0.30, green=0.10, wood=0.10, paper_cardboard=0.10, + plastic=0.20, metal=0.10, glass=0.10), + {"compost": 0.30, "anaerobic": 0.20, "recycling": 0.40}, True), + # audit: empty-pool / silent-drop (legacy wrongly "accepted" with 0 tons) + ("audit_empty_pool_recycling_on_food", wf(food=1.0), {"recycling": 0.01}, False), + ("audit_subtolerance_compost_on_textiles", wf(textiles=1.0), {"compost": 0.0003}, False), + ("audit_metal_compost_ineligible", wf(metal=1.0), {"compost": 0.01}, False), + # audit: inputs that crashed legacy with AssertionError(city_params.py:6144) + ("audit_legacy_crash_A", + wf(wood=0.18, paper_cardboard=0.064, textiles=0.284, glass=0.369, rubber=0.104), + {"anaerobic": 0.1979, "recycling": 0.4188}, True), + ("audit_legacy_crash_B", + wf(paper_cardboard=0.433, plastic=0.385, glass=0.182), + {"compost": 0.3585, "recycling": 0.3527}, True), + # boundary / numeric + ("one_step_over_shared_pool", wf(wood=1.0), + {"compost": 0.5, "recycling": 0.500001}, False), + ("one_step_under_shared_pool", wf(wood=1.0), + {"compost": 0.5, "recycling": 0.499999}, True), + ("float_dust_0p1_plus_0p2", wf(paper_cardboard=1.0), {"compost": 0.1 + 0.2}, True), + ("not_sum_to_one_097", wf(food=0.55, paper_cardboard=0.20, plastic=0.22), + {"recycling": 0.42}, True), + ("not_sum_to_one_097_over", wf(food=0.55, paper_cardboard=0.20, plastic=0.22), + {"recycling": 0.43}, False), +] + + +@pytest.mark.parametrize("name,waste,targets,expected", + NONCOMBUSTION_CASES, ids=[c[0] for c in NONCOMBUSTION_CASES]) +def test_noncombustion_feasibility(name, waste, targets, expected): + res = solve_allocation(waste, targets, ELIG) + assert res["feasible"] is expected + if expected: + assert allocation_is_valid(waste, targets, res["allocation"]) + + +@pytest.mark.parametrize("name,waste,targets,expected", + NONCOMBUSTION_CASES, ids=[c[0] for c in NONCOMBUSTION_CASES]) +def test_noncombustion_matches_exact_oracle(name, waste, targets, expected): + assert solve_allocation(waste, targets, ELIG)["feasible"] is hall_feasible_exact(waste, targets) + + +def test_repro_full_band_reachable(): + """The documented bug: old code accepted only recycling 0..32%; the true + feasible ceiling with compost=40% is 60%.""" + waste = wf(food=0.10, paper_cardboard=0.45, plastic=0.45) + accepted = [r for r in range(0, 101) + if solve_allocation(waste, {"compost": 0.40, "recycling": r / 100}, ELIG)["feasible"]] + assert accepted == list(range(0, 61)) # 0..60 inclusive + assert max_feasible_target("recycling", waste, {"compost": 0.40}, ELIG) == pytest.approx(0.60, abs=1e-6) + + +# --------------------------------------------------------------------------- # +# Combustion (caller-side remainder step) — validated via the prototype's logic +# (name, waste, targets WITH combustion, expected_feasible) +# --------------------------------------------------------------------------- # +def solve_with_combustion(waste, targets): + """Mirror of City.mass_checker_math's intended combustion handling, for + test purposes: solve the 3 with combustible-sparing, then check the + leftover-remainder ceiling.""" + three = {t: targets.get(t, 0.0) for t in NON_COMBUSTION} + tc = targets.get("combustion", 0.0) + res = solve_allocation(waste, three, ELIG, spare_combustibles=(tc > 0)) + if not res["feasible"]: + return False + alloc = res["allocation"] + leftover_comb = sum( + waste.get(w, 0.0) - sum(alloc[t].get(w, 0.0) for t in NON_COMBUSTION) + for w in COMBUSTIBLE + ) + return tc <= leftover_comb + 1e-9 + + +COMBUSTION_CASES = [ + ("C4_combustible_sparing_coupling", + wf(food=0.30, paper_cardboard=0.30, metal=0.30, plastic=0.10), + {"recycling": 0.30, "combustion": 0.55}, True), + ("C5_combustion_just_over", + wf(food=0.30, paper_cardboard=0.30, metal=0.30, plastic=0.10), + {"recycling": 0.30, "combustion": 0.71}, False), + ("C6_organics_eaten_ok", wf(food=0.50, paper_cardboard=0.20, plastic=0.30), + {"compost": 0.40, "combustion": 0.55}, True), + ("C7_organics_eaten_over", wf(food=0.50, paper_cardboard=0.20, plastic=0.30), + {"compost": 0.40, "combustion": 0.65}, False), + ("C9_max_remainder_spare_paper_via_metal", wf(paper_cardboard=0.60, metal=0.40), + {"compost": 0.30, "recycling": 0.30, "combustion": 0.01}, True), + ("C10_recycling_starves_combustion", + wf(food=0.20, paper_cardboard=0.40, plastic=0.40), + {"compost": 0.20, "recycling": 0.40, "combustion": 0.45}, False), + ("C_divzero_no_combustible_left", wf(paper_cardboard=1.0), + {"compost": 0.60, "recycling": 0.40, "combustion": 0.05}, False), + ("C_divzero_combustion0_ok", wf(paper_cardboard=1.0), + {"compost": 0.60, "recycling": 0.40, "combustion": 0.0}, True), + ("C_combustion_noncombustible_only", wf(metal=1.0), {"combustion": 0.10}, False), + ("C_full_4slider_tight_boundary", + wf(food=0.25, green=0.10, wood=0.05, paper_cardboard=0.15, plastic=0.15, + metal=0.10, glass=0.10, textiles=0.05), + {"compost": 0.30, "recycling": 0.25, "combustion": 0.40}, True), +] + + +@pytest.mark.parametrize("name,waste,targets,expected", + COMBUSTION_CASES, ids=[c[0] for c in COMBUSTION_CASES]) +def test_combustion_feasibility(name, waste, targets, expected): + assert solve_with_combustion(waste, targets) is expected + assert combustion_feasible(waste, targets) is expected # vs closed-form oracle + + +# --------------------------------------------------------------------------- # +# Fuzz: solver vs independent exact oracle (band-aware) +# --------------------------------------------------------------------------- # +def _random_waste(rng): + k = rng.choice([1, 2, 3, 3, 4, 5, 7, 10]) + ts = rng.sample(WASTE_TYPES, k) + raw = [rng.random() for _ in ts] + s = sum(raw) or 1.0 + return wf(**{t: r / s for t, r in zip(ts, raw)}) + + +def _boundary_slack(waste, targets, treatments=NON_COMBUSTION): + tgt = {t: Fraction(targets.get(t, 0.0)) for t in treatments} + av = {w: Fraction(waste.get(w, 0.0)) for w in WASTE_TYPES} + best = Fraction(1) + for r in range(1, len(treatments) + 1): + for sub in itertools.combinations(treatments, r): + demand = sum(tgt[t] for t in sub) + cap = sum(av[w] for w in set().union(*(ELIG[t] for t in sub))) + best = min(best, abs(demand - cap)) + return best + + +def test_fuzz_solver_matches_exact_oracle(): + rng = random.Random(20260622) + genuine = 0 + invalid = 0 + for _ in range(20000): + waste = _random_waste(rng) + targets = {t: round(rng.random() * rng.choice([0.3, 0.6, 1.0]), 4) + for t in NON_COMBUSTION if rng.random() < 0.75} + res = solve_allocation(waste, targets, ELIG) + if res["feasible"] != hall_feasible_exact(waste, targets): + if _boundary_slack(waste, targets) > Fraction(1, SCALE): + genuine += 1 # only count disagreements outside the grid band + elif res["feasible"] and not allocation_is_valid(waste, targets, res["allocation"]): + invalid += 1 + assert genuine == 0 + assert invalid == 0 diff --git a/tests/test_food_waste_prevention.py b/tests/test_food_waste_prevention.py new file mode 100644 index 0000000..6a39744 --- /dev/null +++ b/tests/test_food_waste_prevention.py @@ -0,0 +1,112 @@ +"""Regression tests for food-waste-prevention fraction renormalization in +`City.implement_dst_changes_simple_v1_5`. + +Bug (fixed): food prevention removes food mass and shrinks the total, but the +non-food waste fractions were rescaled by ``old_nonfood_total / new_nonfood_total`` +instead of by the actual total-reduction factor. That over-inflated every +non-food share, so the diversion allocator believed more metal / glass / other / +textiles existed than the (unchanged) masses actually hold. The downstream +mass check then reported a spurious ``Negative mass for `` and the city +DST chart errored — e.g. Algiers with composting + recycling + food prevention. + +The fix rescales *every* fraction by ``1 - food_waste_prevention * food_fraction`` +(the same factor ``waste_mass`` is reduced by), preserving the invariant + + waste_fractions[w] * waste_mass == waste_masses[w] for every waste type w. + +These tests build a synthetic, food-heavy city from country defaults +(``dst_baseline_blank`` — no database required) and exercise the real +``implement_dst_changes_simple_v1_5`` code path. On the pre-fix code the +food-prevention tests fail (negative ``textiles`` for this city at fwp >= 0.25, +and the renormalization formula is wrong); they pass on the fixed code. +""" + +import pandas as pd +import pytest + +from SWEET_python.city_params import City, CustomError, DiversionFractions + +# Food-heavy synthetic city (food ~50%) built purely from country defaults. +_COUNTRY, _POP, _PRECIP, _TEMP = "Algeria", 2_594_000, 716.81, 18.38 +# A diversion feasible with no food prevention that triggered the bug once food +# prevention was applied (composting + recycling contend, pushing recycling onto +# the over-inflated non-food pools). Kept within the food-prevention range where +# it stays genuinely feasible (at very high prevention the compostable pool +# shrinks below the compost target, which is a *correct* rejection, not the bug). +_COMPOST, _RECYCLING = 0.40, 0.40 +_IMPLEMENT_YEAR, _SCENARIO = 2026, 1 + + +def _fresh_city() -> City: + city = City("fwp_regression") + city.dst_baseline_blank(_COUNTRY, _POP, _PRECIP, _TEMP) + return city + + +def _implement(city: City, food_waste_prevention: float, compost: float, recycling: float): + div_fractions = DiversionFractions( + compost=compost, anaerobic=0.0, combustion=0.0, recycling=recycling + ) + city.implement_dst_changes_simple_v1_5( + div_fractions, 0, 0, 0.0, 0.0, _IMPLEMENT_YEAR, _SCENARIO, food_waste_prevention + ) + + +def _net_masses(scenario_parameters) -> dict: + nm = scenario_parameters.net_masses + if isinstance(nm, pd.DataFrame): + return {col: float(nm[col].min()) for col in nm.columns} + return {key: float(value) for key, value in nm.items()} + + +def test_diversion_is_feasible_without_food_prevention(): + """Sanity: the chosen diversion is feasible at fwp=0, so the food-prevention + assertions below are not vacuously true.""" + city = _fresh_city() + _implement(city, 0.0, _COMPOST, _RECYCLING) # must not raise + negatives = {w: v for w, v in _net_masses(city.scenario_parameters[0]).items() if v < -1e-6} + assert not negatives, f"baseline diversion already infeasible: {negatives}" + + +@pytest.mark.parametrize("fwp", [0.0, 0.25, 0.5]) +def test_food_prevention_does_not_create_negative_mass(fwp): + """A diversion feasible at fwp=0 stays feasible as food prevention rises + (prevention removes waste, so it can only make diversion easier). Pre-fix + this raised 'Negative mass for textiles' for this city at fwp >= 0.25.""" + city = _fresh_city() + try: + _implement(city, fwp, _COMPOST, _RECYCLING) + except CustomError as exc: + pytest.fail( + f"food prevention {fwp:.0%} wrongly rejected a feasible diversion: {exc.message}" + ) + negatives = {w: v for w, v in _net_masses(city.scenario_parameters[0]).items() if v < -1e-6} + assert not negatives, f"negative net masses at fwp={fwp:.0%}: {negatives}" + + +@pytest.mark.parametrize("fwp", [0.25, 0.5, 0.75, 0.9]) +def test_food_prevention_preserves_fraction_renormalization(fwp): + """After food prevention every fraction is the original divided by the + total-reduction factor (food additionally scaled by ``1 - fwp``), and the + row still sums to 1. This is the exact invariant the bug broke for non-food + types (it scaled them by old_nonfood/new_nonfood instead). Uses zero + diversion so the renormalization is isolated from any feasibility limit.""" + city = _fresh_city() + base = city.baseline_parameters.waste_fractions.iloc[0, :].copy() + food0 = float(base["food"]) + total_scale = 1 - fwp * food0 # reduced_total / original_total + + _implement(city, fwp, compost=0.0, recycling=0.0) + scenario = city.scenario_parameters[0].waste_fractions.iloc[0, :] + + for waste_type, base_frac in base.items(): + if waste_type == "food": + expected = food0 * (1 - fwp) / total_scale + else: + expected = float(base_frac) / total_scale + assert float(scenario[waste_type]) == pytest.approx(expected, rel=1e-9, abs=1e-12), ( + f"renormalization wrong for {waste_type} at fwp={fwp:.0%}: " + f"got {float(scenario[waste_type])}, expected {expected}" + ) + + assert float(scenario.sum()) == pytest.approx(1.0, abs=1e-9) diff --git a/tests/test_mass_checker_integration.py b/tests/test_mass_checker_integration.py new file mode 100644 index 0000000..9e02336 --- /dev/null +++ b/tests/test_mass_checker_integration.py @@ -0,0 +1,109 @@ +""" +Integration tests for City.mass_checker_math after the min-cost-max-flow swap-in. + +These exercise the REAL method (feasibility / accept-reject + no crashes), +stubbing only `_divs_from_component_fractions` (the DB/mass-dependent output +builder) so no database or city data is needed. They lock in: + - the documented false-rejection fix (full feasible band reachable), + - the under-delivery / empty-pool "silent accept" fix, + - the legacy AssertionError-crash inputs now solving cleanly, + - combustion remainder feasibility + divide-by-zero guard, + - the no-conflict happy path still being taken. + +Requires the SWEET_python runtime deps (pydantic, numpy, geopy, ...). +""" + +import pytest + +from SWEET_python.city_params import ( + City, CustomError, WasteFractions, DiversionFractions, DivComponentFractions, +) + +WT = ["food", "green", "wood", "paper_cardboard", "textiles", + "plastic", "metal", "glass", "rubber", "other"] +ELIG = { + "compost": ["food", "green", "wood", "paper_cardboard"], + "anaerobic": ["food", "green", "wood", "paper_cardboard"], + "combustion": ["food", "green", "wood", "paper_cardboard", "textiles", "plastic", "rubber"], + "recycling": ["wood", "paper_cardboard", "textiles", "plastic", "rubber", "metal", "glass", "other"], +} + + +def _wf(**kw): + b = {w: 0.0 for w in WT} + b.update(kw) + return b + + +def _norm(waste, comps): + s = sum(waste[c] for c in comps) + return _wf(**{c: (waste[c] / s if s > 0 else 0.0) for c in comps}) + + +def _run(waste, targets): + """Return True if mass_checker_math accepts, False if it raises CustomError.""" + city = City("test") + city._divs_from_component_fractions = lambda *a, **k: "OK" # stub DB/mass builder + dcf = DivComponentFractions(**{d: WasteFractions(**_norm(waste, ELIG[d])) for d in ELIG}) + df = DiversionFractions( + compost=targets.get("compost", 0.0), + anaerobic=targets.get("anaerobic", 0.0), + combustion=targets.get("combustion", 0.0), + recycling=targets.get("recycling", 0.0), + ) + try: + city.mass_checker_math(div_fractions=df, div_component_fractions=dcf, + waste_fractions=WasteFractions(**waste), scenario=1) + return True + except CustomError: + return False + + +CASES = [ + ("repro_c40_r50", _wf(food=.10, paper_cardboard=.45, plastic=.45), + {"compost": .40, "recycling": .50}, True), + ("repro_c40_r60_ceiling", _wf(food=.10, paper_cardboard=.45, plastic=.45), + {"compost": .40, "recycling": .60}, True), + ("repro_c40_r62_over", _wf(food=.10, paper_cardboard=.45, plastic=.45), + {"compost": .40, "recycling": .62}, False), + ("all_food_compost100", _wf(food=1.0), {"compost": 1.0}, True), + ("all_food_recycling1_impossible", _wf(food=1.0), {"recycling": .01}, False), + ("subtol_compost_on_textiles", _wf(textiles=1.0), {"compost": .0003}, False), + ("metal_compost_ineligible", _wf(metal=1.0), {"compost": .01}, False), + ("only_paper_60_40", _wf(paper_cardboard=1.0), {"compost": .60, "recycling": .40}, True), + ("only_paper_60_50_over", _wf(paper_cardboard=1.0), {"compost": .60, "recycling": .50}, False), + ("modest_no_conflict_happy_path", + _wf(food=.40, green=.10, wood=.05, paper_cardboard=.15, plastic=.10, + metal=.05, glass=.05, textiles=.05, other=.05), + {"compost": .20, "recycling": .20}, True), + ("legacy_crash_A", + _wf(wood=.18, paper_cardboard=.064, textiles=.284, glass=.369, rubber=.104), + {"anaerobic": .1979, "recycling": .4188}, True), + ("legacy_crash_B", _wf(paper_cardboard=.433, plastic=.385, glass=.182), + {"compost": .3585, "recycling": .3527}, True), + ("C4_combustible_sparing", _wf(food=.30, paper_cardboard=.30, metal=.30, plastic=.10), + {"recycling": .30, "combustion": .55}, True), + ("C5_combustion_over", _wf(food=.30, paper_cardboard=.30, metal=.30, plastic=.10), + {"recycling": .30, "combustion": .71}, False), + ("C_divzero_no_combustible", _wf(paper_cardboard=1.0), + {"compost": .60, "recycling": .40, "combustion": .05}, False), + ("C_divzero_combustion0_ok", _wf(paper_cardboard=1.0), + {"compost": .60, "recycling": .40, "combustion": 0.0}, True), + ("full_4slider_boundary", + _wf(food=.25, green=.10, wood=.05, paper_cardboard=.15, plastic=.15, + metal=.10, glass=.10, textiles=.05), + {"compost": .30, "recycling": .25, "combustion": .40}, True), +] + + +@pytest.mark.parametrize("name,waste,targets,expected", CASES, ids=[c[0] for c in CASES]) +def test_mass_checker_accept_reject(name, waste, targets, expected): + assert _run(waste, targets) is expected + + +def test_repro_full_band_no_false_rejection(): + """compost=40%: every recycling 0..60% must be accepted, 61..100% rejected.""" + waste = _wf(food=.10, paper_cardboard=.45, plastic=.45) + for r in range(0, 101): + accepted = _run(waste, {"compost": .40, "recycling": r / 100}) + assert accepted is (r <= 60), f"recycling {r}% -> {accepted}"