Skip to content

Commit 36ddf41

Browse files
feat: extend process_pending to populate TreeTestsRollup table (#1831)
* feat: enhance test aggregation with tree_tests rollup processing * test: add unit tests for process_pending_helpers functions
1 parent 3bcaf9a commit 36ddf41

5 files changed

Lines changed: 737 additions & 3 deletions

File tree

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
ROLLUP_STATUS_FIELDS = {
2+
"PASS": "pass_tests",
3+
"FAIL": "fail_tests",
4+
"SKIP": "skip_tests",
5+
"ERROR": "error_tests",
6+
"MISS": "miss_tests",
7+
"DONE": "done_tests",
8+
"NULL": "null_tests",
9+
}

backend/kernelCI_app/management/commands/helpers/aggregation_helpers.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def convert_build(b: Builds) -> PendingBuilds:
3838

3939

4040
def convert_test(t: Tests) -> PendingTest:
41+
misc = t.misc or {}
4142
return PendingTest(
4243
test_id=t.id,
4344
origin=t.origin,
@@ -46,6 +47,10 @@ def convert_test(t: Tests) -> PendingTest:
4647
build_id=t.build_id,
4748
status=simplify_status(t.status),
4849
is_boot=is_boot(t.path) if t.path else False,
50+
path=t.path,
51+
start_time=t.start_time,
52+
lab=misc.get("runtime"),
53+
full_status=t.status,
4954
)
5055

5156

@@ -159,6 +164,10 @@ def aggregate_tests(
159164
test.build_id,
160165
test.status,
161166
test.is_boot,
167+
test.path,
168+
test.start_time,
169+
test.lab,
170+
test.full_status,
162171
)
163172
for test in pending_tests
164173
]
@@ -167,13 +176,18 @@ def aggregate_tests(
167176
query = """
168177
INSERT INTO pending_test (
169178
test_id, origin, platform, compatible,
170-
build_id, status, is_boot
171-
) VALUES (%s, %s, %s, %s, %s, %s, %s)
179+
build_id, status, is_boot,
180+
path, start_time, lab, full_status
181+
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
172182
ON CONFLICT (test_id)
173183
DO UPDATE SET
174184
platform = COALESCE(pending_test.platform, EXCLUDED.platform),
175185
compatible = COALESCE(pending_test.compatible, EXCLUDED.compatible),
176-
status = COALESCE(pending_test.status, EXCLUDED.status)
186+
status = COALESCE(pending_test.status, EXCLUDED.status),
187+
path = COALESCE(pending_test.path, EXCLUDED.path),
188+
start_time = COALESCE(pending_test.start_time, EXCLUDED.start_time),
189+
lab = COALESCE(pending_test.lab, EXCLUDED.lab),
190+
full_status = COALESCE(pending_test.full_status, EXCLUDED.full_status)
177191
"""
178192

179193
with connections["default"].cursor() as cursor:
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
from typing import NamedTuple, Optional, Sequence, TypedDict
2+
from kernelCI_app.helpers.logger import logger
3+
from kernelCI_app.constants.general import UNKNOWN_STRING
4+
from kernelCI_app.constants.process_pending import ROLLUP_STATUS_FIELDS
5+
from kernelCI_app.models import Builds, Checkouts, PendingTest, StatusChoices
6+
7+
8+
EMPTY_PATH_GROUP = "-"
9+
10+
11+
class RollupKey(NamedTuple):
12+
origin: str
13+
tree_name: str
14+
git_repository_branch: str
15+
git_repository_url: str
16+
git_commit_hash: str
17+
path_group: str
18+
config: str
19+
arch: str
20+
compiler: str
21+
hardware_key: str
22+
platform: Optional[str]
23+
lab: Optional[str]
24+
test_origin: str
25+
issue_id: Optional[str]
26+
issue_version: Optional[int]
27+
issue_uncategorized: bool
28+
is_boot: bool
29+
30+
31+
class RollupEntryData(TypedDict):
32+
checkout: Checkouts
33+
path_group: str
34+
config: str
35+
arch: str
36+
compiler: str
37+
hardware_key: str
38+
platform: Optional[str]
39+
lab: Optional[str]
40+
origin: str
41+
issue_id: Optional[str]
42+
issue_version: Optional[int]
43+
issue_uncategorized: bool
44+
is_boot: bool
45+
status: StatusChoices
46+
47+
48+
def extract_path_group(path: str) -> str:
49+
"""Extract the path group from a path."""
50+
return path.split(".", 1)[0] if path else EMPTY_PATH_GROUP
51+
52+
53+
def accumulate_rollup_entry(
54+
rollup_data: dict[tuple, dict],
55+
entry: RollupEntryData,
56+
) -> None:
57+
"""Accumulate a single test entry into rollup_data in-place."""
58+
checkout = entry["checkout"]
59+
rollup_key = RollupKey(
60+
origin=checkout.origin,
61+
tree_name=checkout.tree_name,
62+
git_repository_branch=checkout.git_repository_branch,
63+
git_repository_url=checkout.git_repository_url,
64+
git_commit_hash=checkout.git_commit_hash,
65+
path_group=entry["path_group"],
66+
config=entry["config"],
67+
arch=entry["arch"],
68+
compiler=entry["compiler"],
69+
hardware_key=entry["hardware_key"],
70+
platform=entry["platform"],
71+
lab=entry["lab"],
72+
test_origin=entry["origin"],
73+
issue_id=entry["issue_id"],
74+
issue_version=entry["issue_version"],
75+
issue_uncategorized=entry["issue_uncategorized"],
76+
is_boot=entry["is_boot"],
77+
)
78+
79+
record = rollup_data.setdefault(
80+
rollup_key,
81+
{
82+
"pass_tests": 0,
83+
"fail_tests": 0,
84+
"skip_tests": 0,
85+
"error_tests": 0,
86+
"miss_tests": 0,
87+
"done_tests": 0,
88+
"null_tests": 0,
89+
"total_tests": 0,
90+
},
91+
)
92+
93+
counter = ROLLUP_STATUS_FIELDS.get(entry["status"], "null_tests")
94+
record[counter] += 1
95+
record["total_tests"] += 1
96+
97+
98+
def aggregate_tests_rollup(
99+
ready_tests: Sequence[PendingTest],
100+
test_builds_by_id: dict[str, Builds],
101+
issues_map: dict[str, dict],
102+
) -> dict[tuple, dict]:
103+
"""
104+
Build rollup data from pending tests.
105+
Returns rollup data without touching the database.
106+
"""
107+
rollup_data: dict[tuple, dict] = {}
108+
109+
for test in ready_tests:
110+
# shouldn't happen, but being defensive here
111+
try:
112+
build = test_builds_by_id[test.build_id]
113+
except KeyError:
114+
logger.warning(
115+
f"Found test {test.test_id} with no build {test.build_id} on aggregate_tests_rollup"
116+
)
117+
continue
118+
119+
checkout = build.checkout
120+
path = test.path or ""
121+
path_group = extract_path_group(path)
122+
123+
hardware_key = UNKNOWN_STRING
124+
if test.compatible:
125+
hardware_key = test.compatible[0]
126+
elif test.platform:
127+
hardware_key = test.platform
128+
129+
issue_info = issues_map.get(test.test_id, {})
130+
issue_id = issue_info.get("issue_id")
131+
issue_version = issue_info.get("issue_version")
132+
issue_uncategorized = (
133+
issue_id is None and test.full_status == StatusChoices.FAIL
134+
)
135+
136+
arch = build.architecture or UNKNOWN_STRING
137+
compiler = build.compiler or UNKNOWN_STRING
138+
config = build.config_name or UNKNOWN_STRING
139+
140+
accumulate_rollup_entry(
141+
rollup_data,
142+
{
143+
"checkout": checkout,
144+
"path_group": path_group,
145+
"config": config,
146+
"arch": arch,
147+
"compiler": compiler,
148+
"hardware_key": hardware_key,
149+
"platform": test.platform,
150+
"lab": test.lab,
151+
"origin": test.origin,
152+
"issue_id": issue_id,
153+
"issue_version": issue_version,
154+
"issue_uncategorized": issue_uncategorized,
155+
"is_boot": test.is_boot,
156+
"status": test.full_status,
157+
},
158+
)
159+
160+
return rollup_data

backend/kernelCI_app/management/commands/process_pending_aggregations.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
import time
66
from datetime import datetime
77
from typing import Literal, Optional, Sequence, TypedDict, Union
8+
from kernelCI_app.management.commands.helpers.process_pending_helpers import (
9+
aggregate_tests_rollup,
10+
)
811
from django.conf import settings
912
from django.core.management.base import BaseCommand
1013
from django.db import connection, transaction
@@ -16,6 +19,7 @@
1619
from kernelCI_app.models import (
1720
Builds,
1821
Checkouts,
22+
Incidents,
1923
PendingTest,
2024
PendingBuilds,
2125
ProcessedListingItems,
@@ -492,6 +496,25 @@ def aggregate_hardware_status(
492496
return hardware_status_data, new_processed_entries
493497

494498

499+
def _fetch_test_issues(test_ids: list[str]) -> dict[str, dict]:
500+
"""Bulk-fetch the first incident per test_id, returning {test_id: {issue_id, issue_version}}."""
501+
issues_map: dict[str, dict] = {}
502+
incidents = Incidents.objects.filter(
503+
test_id__in=test_ids,
504+
).values("test_id", "issue_id", "issue_version")
505+
506+
for inc in incidents:
507+
issues_map.setdefault(
508+
inc["test_id"],
509+
{
510+
"issue_id": inc["issue_id"],
511+
"issue_version": inc["issue_version"],
512+
},
513+
)
514+
515+
return issues_map
516+
517+
495518
class Command(BaseCommand):
496519
help = """
497520
Process pending tests for hardware status aggregation,
@@ -855,8 +878,17 @@ def _get_ready_tests(
855878
.only(
856879
"id",
857880
"status",
881+
"architecture",
882+
"compiler",
883+
"config_name",
884+
"misc",
858885
"checkout__id",
859886
"checkout__start_time",
887+
"checkout__origin",
888+
"checkout__tree_name",
889+
"checkout__git_repository_url",
890+
"checkout__git_repository_branch",
891+
"checkout__git_commit_hash",
860892
)
861893
.in_bulk(pending_test_build_ids)
862894
)
@@ -887,6 +919,75 @@ def _get_ready_tests(
887919
pending_test_count,
888920
)
889921

922+
def _process_tests_rollup(self, rollup_data: dict[tuple, dict]) -> None:
923+
if not rollup_data:
924+
return
925+
926+
values = [
927+
(
928+
*key,
929+
data["pass_tests"],
930+
data["fail_tests"],
931+
data["skip_tests"],
932+
data["error_tests"],
933+
data["miss_tests"],
934+
data["done_tests"],
935+
data["null_tests"],
936+
data["total_tests"],
937+
)
938+
for key, data in rollup_data.items()
939+
]
940+
941+
t0 = time.time()
942+
with connection.cursor() as cursor:
943+
cursor.executemany(
944+
"""
945+
INSERT INTO tree_tests_rollup (
946+
origin, tree_name, git_repository_branch, git_repository_url,
947+
git_commit_hash,
948+
path_group, build_config_name, build_architecture, build_compiler,
949+
hardware_key, test_platform, test_lab, test_origin,
950+
issue_id, issue_version, issue_uncategorized, is_boot,
951+
pass_tests, fail_tests, skip_tests,
952+
error_tests, miss_tests, done_tests,
953+
null_tests, total_tests
954+
)
955+
VALUES (
956+
%s, %s, %s, %s, %s, %s, %s, %s, %s,
957+
%s, %s, %s, %s, %s, %s, %s, %s, %s,
958+
%s, %s, %s, %s, %s, %s, %s
959+
)
960+
ON CONFLICT ON CONSTRAINT tree_tests_rollup_unique DO UPDATE SET
961+
pass_tests = tree_tests_rollup.pass_tests + EXCLUDED.pass_tests,
962+
fail_tests = tree_tests_rollup.fail_tests + EXCLUDED.fail_tests,
963+
skip_tests = tree_tests_rollup.skip_tests + EXCLUDED.skip_tests,
964+
error_tests = tree_tests_rollup.error_tests + EXCLUDED.error_tests,
965+
miss_tests = tree_tests_rollup.miss_tests + EXCLUDED.miss_tests,
966+
done_tests = tree_tests_rollup.done_tests + EXCLUDED.done_tests,
967+
null_tests = tree_tests_rollup.null_tests + EXCLUDED.null_tests,
968+
total_tests = tree_tests_rollup.total_tests + EXCLUDED.total_tests
969+
""",
970+
values,
971+
)
972+
out(
973+
f"Upserted {len(values)} tree_tests_rollup records "
974+
f"in {time.time() - t0:.3f}s"
975+
)
976+
AGGREGATION_RECORDS_WRITTEN.labels(table="tree_tests_rollup").inc(len(values))
977+
978+
def _process_tests_rollup_batch(
979+
self,
980+
ready_tests: Sequence[PendingTest],
981+
test_builds_by_id: dict[str, Builds],
982+
) -> None:
983+
if not ready_tests:
984+
return
985+
986+
test_ids = [t.test_id for t in ready_tests]
987+
issues_map = _fetch_test_issues(test_ids)
988+
rollup_data = aggregate_tests_rollup(ready_tests, test_builds_by_id, issues_map)
989+
self._process_tests_rollup(rollup_data)
990+
890991
def _process_hardware_batch(
891992
self,
892993
ready_tests: Sequence[PendingTest],
@@ -943,6 +1044,7 @@ def process_pending_batch(self, batch_size: int) -> int:
9431044

9441045
if ready_tests:
9451046
self._process_hardware_batch(ready_tests, test_builds_by_id)
1047+
self._process_tests_rollup_batch(ready_tests, test_builds_by_id)
9461048

9471049
(
9481050
ready_builds,

0 commit comments

Comments
 (0)