Skip to content

Commit 1e925d5

Browse files
feat: enhance rollup aggregation with dedup logic (#1843)
* feat: enhance tree_tests_rollup aggregation with dedup logic * test: add unit tests for rollup entry correction behavior
1 parent eab552c commit 1e925d5

3 files changed

Lines changed: 340 additions & 4 deletions

File tree

backend/kernelCI_app/management/commands/helpers/process_pending_helpers.py

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,8 @@ def extract_path_group(path: str) -> str:
5353
def accumulate_rollup_entry(
5454
rollup_data: dict[tuple, dict],
5555
entry: RollupEntryData,
56+
*,
57+
is_correction: bool = False,
5658
) -> None:
5759
"""Accumulate a single test entry into rollup_data in-place."""
5860
checkout = entry["checkout"]
@@ -91,21 +93,30 @@ def accumulate_rollup_entry(
9193
)
9294

9395
counter = ROLLUP_STATUS_FIELDS.get(entry["status"], "null_tests")
94-
record[counter] += 1
95-
record["total_tests"] += 1
96+
97+
if is_correction:
98+
record["null_tests"] -= 1
99+
record[counter] += 1
100+
else:
101+
record[counter] += 1
102+
record["total_tests"] += 1
96103

97104

98105
def aggregate_tests_rollup(
99106
ready_tests: Sequence[PendingTest],
100107
test_builds_by_id: dict[str, Builds],
101108
issues_map: dict[str, dict],
109+
reprocess_test_ids: set[str] | None = None,
102110
) -> dict[tuple, dict]:
103111
"""
104112
Build rollup data from pending tests.
105113
Returns rollup data without touching the database.
106114
"""
107115
rollup_data: dict[tuple, dict] = {}
108116

117+
if reprocess_test_ids is None:
118+
reprocess_test_ids = set()
119+
109120
for test in ready_tests:
110121
# shouldn't happen, but being defensive here
111122
try:
@@ -155,6 +166,7 @@ def aggregate_tests_rollup(
155166
"is_boot": test.is_boot,
156167
"status": test.full_status,
157168
},
169+
is_correction=test.test_id in reprocess_test_ids,
158170
)
159171

160172
return rollup_data

backend/kernelCI_app/management/commands/process_pending_aggregations.py

Lines changed: 65 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -85,6 +85,11 @@ def get_tree_listing_key(
8585
).digest()
8686

8787

88+
def get_rollup_key(test_id: str) -> bytes:
    """Build the rollup key for a test.

    The key is the raw SHA-256 digest of the test id prefixed with
    ``rollup|``; the prefix namespaces the hash input.
    """
    namespaced_id = f"rollup|{test_id}"
    hasher = hashlib.sha256()
    hasher.update(namespaced_id.encode("utf-8"))
    return hasher.digest()
91+
92+
8893
SIMPLIFIED_STATUS_TO_COUNT = {
8994
SimplifiedStatusChoices.PASS: (1, 0, 0),
9095
SimplifiedStatusChoices.FAIL: (0, 1, 0),
@@ -983,10 +988,68 @@ def _process_tests_rollup_batch(
983988
if not ready_tests:
984989
return
985990

986-
test_ids = [t.test_id for t in ready_tests]
991+
rollup_keys_by_test_id = {
992+
t.test_id: get_rollup_key(t.test_id) for t in ready_tests
993+
}
994+
995+
existing_processed = _get_existing_processed(
996+
set(rollup_keys_by_test_id.values())
997+
)
998+
existing_by_key = {
999+
(e.listing_item_key, e.checkout_id): e for e in existing_processed
1000+
}
1001+
1002+
tests_to_process: list[PendingTest] = []
1003+
test_ids = []
1004+
reprocess_test_ids: set[str] = set()
1005+
new_processed_entries: set[ProcessedListingItems] = set()
1006+
1007+
for test in ready_tests:
1008+
rollup_key = rollup_keys_by_test_id[test.test_id]
1009+
1010+
try:
1011+
build = test_builds_by_id[test.build_id]
1012+
checkout_id = build.checkout.id
1013+
except KeyError:
1014+
continue
1015+
1016+
found_existing = existing_by_key.get((rollup_key, checkout_id), None)
1017+
1018+
if found_existing:
1019+
stored_status = found_existing.status
1020+
1021+
if stored_status is not None:
1022+
continue
1023+
if test.status is None:
1024+
# Both null - already counted as null
1025+
continue
1026+
1027+
# null -> non-null: This is a correction (reprocess)
1028+
reprocess_test_ids.add(test.test_id)
1029+
1030+
tests_to_process.append(test)
1031+
test_ids.append(test.test_id)
1032+
new_processed_entries.add(
1033+
ProcessedListingItems(
1034+
listing_item_key=rollup_key,
1035+
checkout_id=checkout_id,
1036+
status=test.status,
1037+
)
1038+
)
1039+
1040+
if not tests_to_process:
1041+
return
1042+
9871043
issues_map = _fetch_test_issues(test_ids)
988-
rollup_data = aggregate_tests_rollup(ready_tests, test_builds_by_id, issues_map)
1044+
rollup_data = aggregate_tests_rollup(
1045+
tests_to_process,
1046+
test_builds_by_id,
1047+
issues_map,
1048+
reprocess_test_ids=reprocess_test_ids,
1049+
)
1050+
9891051
self._process_tests_rollup(rollup_data)
1052+
self._process_new_processed_entries(new_processed_entries)
9901053

9911054
def _process_hardware_batch(
9921055
self,

backend/kernelCI_app/tests/unitTests/commands/process_pending_helpers_test.py

Lines changed: 261 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -447,3 +447,264 @@ def test_skips_test_with_missing_build_but_processes_others(self):
447447
self.assertEqual(len(result), 1)
448448
record = next(iter(result.values()))
449449
self.assertEqual(record["total_tests"], 1)
450+
451+
452+
class TestAccumulateRollupEntryCorrection(SimpleTestCase):
    """Test is_correction=True behavior: null -> non-null transitions."""

    def _make_rollup_data_with_null_entry(self, initial_null_count=1):
        """Pre-seed rollup_data with a record having some null_tests.

        Returns a ``(rollup_data, entry, rollup_key)`` tuple where:

        * ``entry`` is a rollup entry with ``status=None`` built on a fresh
          checkout,
        * ``rollup_key`` is the ``RollupKey`` derived from that checkout and
          entry,
        * ``rollup_data`` maps ``rollup_key`` to a record whose ``null_tests``
          and ``total_tests`` both equal ``initial_null_count`` and whose
          other counters are zero.
        """
        checkout = _make_checkout()
        entry = _make_rollup_entry(checkout=checkout, status=None)
        rollup_key = RollupKey(
            origin=checkout.origin,
            tree_name=checkout.tree_name,
            git_repository_branch=checkout.git_repository_branch,
            git_repository_url=checkout.git_repository_url,
            git_commit_hash=checkout.git_commit_hash,
            path_group=entry["path_group"],
            config=entry["config"],
            arch=entry["arch"],
            compiler=entry["compiler"],
            hardware_key=entry["hardware_key"],
            platform=entry["platform"],
            lab=entry["lab"],
            test_origin=entry["origin"],
            issue_id=entry["issue_id"],
            issue_version=entry["issue_version"],
            issue_uncategorized=entry["issue_uncategorized"],
            is_boot=entry["is_boot"],
        )
        rollup_data = {
            rollup_key: {
                "pass_tests": 0,
                "fail_tests": 0,
                "skip_tests": 0,
                "error_tests": 0,
                "miss_tests": 0,
                "done_tests": 0,
                "null_tests": initial_null_count,
                "total_tests": initial_null_count,
            }
        }
        return rollup_data, entry, rollup_key

    def test_correction_decrements_null_tests(self):
        """Correction moves count from null_tests to the new status bucket."""
        rollup_data, entry, rollup_key = self._make_rollup_data_with_null_entry()
        entry["status"] = StatusChoices.PASS

        accumulate_rollup_entry(rollup_data, entry, is_correction=True)

        record = rollup_data[rollup_key]
        self.assertEqual(record["null_tests"], 0)

    def test_correction_increments_new_status_bucket(self):
        """The new status bucket gets the count moved from null_tests."""
        rollup_data, entry, rollup_key = self._make_rollup_data_with_null_entry()
        entry["status"] = StatusChoices.PASS

        accumulate_rollup_entry(rollup_data, entry, is_correction=True)

        record = rollup_data[rollup_key]
        self.assertEqual(record["pass_tests"], 1)

    def test_correction_does_not_change_total_tests(self):
        """Total should remain unchanged - just moving from null to bucket."""
        rollup_data, entry, rollup_key = self._make_rollup_data_with_null_entry()
        entry["status"] = StatusChoices.PASS

        accumulate_rollup_entry(rollup_data, entry, is_correction=True)

        record = rollup_data[rollup_key]
        self.assertEqual(record["total_tests"], 1)

    def test_correction_with_fail_status(self):
        """Correction works with fail status too."""
        rollup_data, entry, rollup_key = self._make_rollup_data_with_null_entry()
        entry["status"] = StatusChoices.FAIL

        accumulate_rollup_entry(rollup_data, entry, is_correction=True)

        record = rollup_data[rollup_key]
        self.assertEqual(record["null_tests"], 0)
        self.assertEqual(record["fail_tests"], 1)
        self.assertEqual(record["total_tests"], 1)

    def test_correction_with_skip_status(self):
        """Correction works with skip status."""
        rollup_data, entry, rollup_key = self._make_rollup_data_with_null_entry()
        entry["status"] = StatusChoices.SKIP

        accumulate_rollup_entry(rollup_data, entry, is_correction=True)

        record = rollup_data[rollup_key]
        self.assertEqual(record["null_tests"], 0)
        self.assertEqual(record["skip_tests"], 1)
        self.assertEqual(record["total_tests"], 1)

    def test_correction_with_null_as_new_status(self):
        """If new status is also null, both operations hit null_tests (net zero)."""
        rollup_data, entry, rollup_key = self._make_rollup_data_with_null_entry()
        entry["status"] = None

        accumulate_rollup_entry(rollup_data, entry, is_correction=True)

        record = rollup_data[rollup_key]
        # Decrement then increment null_tests: net change is 0
        self.assertEqual(record["null_tests"], 1)
        self.assertEqual(record["total_tests"], 1)

    def test_multiple_corrections_on_same_rollup_key(self):
        """Multiple corrections on the same key accumulate correctly."""
        # Reuse the shared seeding helper instead of rebuilding the RollupKey
        # and the counter record by hand; start with three null tests.
        rollup_data, base_entry, rollup_key = (
            self._make_rollup_data_with_null_entry(initial_null_count=3)
        )
        # Build the corrections on the same checkout so they all land on
        # rollup_key.
        checkout = base_entry["checkout"]

        # Three corrections: PASS, FAIL, SKIP
        for status in (StatusChoices.PASS, StatusChoices.FAIL, StatusChoices.SKIP):
            entry = _make_rollup_entry(checkout=checkout, status=status)
            accumulate_rollup_entry(rollup_data, entry, is_correction=True)

        record = rollup_data[rollup_key]
        self.assertEqual(record["null_tests"], 0)
        self.assertEqual(record["pass_tests"], 1)
        self.assertEqual(record["fail_tests"], 1)
        self.assertEqual(record["skip_tests"], 1)
        self.assertEqual(record["total_tests"], 3)
609+
610+
611+
class TestAggregateTestsRollupWithReprocess(SimpleTestCase):
    """Exercise the reprocess_test_ids argument of aggregate_tests_rollup."""

    def setUp(self):
        # Every test aggregates against one build attached to one checkout.
        checkout = _make_checkout()
        self.builds_by_id = {"build-1": _make_build(checkout=checkout)}

    @staticmethod
    def _first_record(rollup_result):
        """Return the first record stored in the aggregated rollup dict."""
        return next(iter(rollup_result.values()))

    def test_reprocess_test_id_applies_correction(self):
        """A test listed in reprocess_test_ids is aggregated as a correction."""
        pending = _make_pending_test(
            test_id="test-1",
            build_id="build-1",
            full_status=StatusChoices.PASS,
        )

        aggregated = aggregate_tests_rollup(
            [pending],
            self.builds_by_id,
            {},
            reprocess_test_ids={"test-1"},
        )

        counters = self._first_record(aggregated)
        # A correction moves one count out of null_tests without touching the
        # total. The record starts from zero here, so null_tests ends up
        # negative and total_tests stays at 0.
        self.assertEqual(counters["pass_tests"], 1)
        self.assertEqual(counters["null_tests"], -1)
        self.assertEqual(counters["total_tests"], 0)

    def test_normal_test_not_in_reprocess(self):
        """A test absent from reprocess_test_ids is counted normally."""
        pending = _make_pending_test(
            test_id="test-1",
            build_id="build-1",
            full_status=StatusChoices.PASS,
        )

        aggregated = aggregate_tests_rollup(
            [pending],
            self.builds_by_id,
            {},
            reprocess_test_ids=set(),
        )

        counters = self._first_record(aggregated)
        self.assertEqual(counters["pass_tests"], 1)
        self.assertEqual(counters["total_tests"], 1)
        self.assertEqual(counters["null_tests"], 0)

    def test_mixed_batch_corrections_and_new(self):
        """One correction plus one new test sharing a single rollup key."""
        # "t1" is passed as a correction; "t2" is a brand-new PASS.
        corrected = _make_pending_test(
            test_id="t1",
            build_id="build-1",
            full_status=StatusChoices.PASS,
        )
        fresh = _make_pending_test(
            test_id="t2",
            build_id="build-1",
            full_status=StatusChoices.PASS,
        )

        aggregated = aggregate_tests_rollup(
            [corrected, fresh],
            self.builds_by_id,
            {},
            reprocess_test_ids={"t1"},
        )

        self.assertEqual(len(aggregated), 1)
        counters = self._first_record(aggregated)
        # correction: null_tests -1, pass_tests +1, total_tests unchanged
        # new test:   pass_tests +1, total_tests +1
        self.assertEqual(counters["null_tests"], -1)
        self.assertEqual(counters["pass_tests"], 2)
        self.assertEqual(counters["total_tests"], 1)

    def test_default_reprocess_test_ids_is_empty(self):
        """Omitting reprocess_test_ids behaves like passing an empty set."""
        pending = _make_pending_test(
            test_id="test-1",
            build_id="build-1",
            full_status=StatusChoices.FAIL,
        )

        # reprocess_test_ids is deliberately not supplied.
        aggregated = aggregate_tests_rollup([pending], self.builds_by_id, {})

        counters = self._first_record(aggregated)
        self.assertEqual(counters["fail_tests"], 1)
        self.assertEqual(counters["total_tests"], 1)
        self.assertEqual(counters["null_tests"], 0)

0 commit comments

Comments
 (0)