Skip to content

Commit 034a0ec

Browse files
refactor: modify ingester to use processes in a 'fan-out' pattern (#1672)
1 parent cc382c7 commit 034a0ec

5 files changed

Lines changed: 343 additions & 793 deletions

File tree

backend/kernelCI_app/constants/ingester.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,9 @@
3838
See https://prometheus.github.io/client_python/multiprocess/ for more details.
3939
"""
4040

41+
INGEST_FILES_BATCH_SIZE = int(os.environ.get("INGEST_FILES_BATCH_SIZE", 100))
42+
"""Size of the batch of files to be queued. Default: 100"""
43+
4144
try:
4245
INGESTER_METRICS_PORT = int(os.environ.get("INGESTER_METRICS_PORT", 8002))
4346
except (ValueError, TypeError):

backend/kernelCI_app/management/commands/helpers/aggregation_helpers.py

Lines changed: 17 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -55,23 +55,24 @@ def aggregate_checkouts(checkouts_instances: Sequence[Checkouts]) -> None:
5555
for checkout in checkouts_instances
5656
]
5757

58-
with connection.cursor() as cursor:
59-
cursor.executemany(
60-
"""
61-
INSERT INTO latest_checkout (
62-
checkout_id, origin, tree_name,
63-
git_repository_url, git_repository_branch, start_time
58+
if len(values) > 0:
59+
with connection.cursor() as cursor:
60+
cursor.executemany(
61+
"""
62+
INSERT INTO latest_checkout (
63+
checkout_id, origin, tree_name,
64+
git_repository_url, git_repository_branch, start_time
65+
)
66+
VALUES (%s, %s, %s, %s, %s, %s)
67+
ON CONFLICT (origin, tree_name, git_repository_url, git_repository_branch)
68+
DO UPDATE SET
69+
start_time = EXCLUDED.start_time,
70+
checkout_id = EXCLUDED.checkout_id
71+
WHERE latest_checkout.start_time < EXCLUDED.start_time
72+
""",
73+
values,
6474
)
65-
VALUES (%s, %s, %s, %s, %s, %s)
66-
ON CONFLICT (origin, tree_name, git_repository_url, git_repository_branch)
67-
DO UPDATE SET
68-
start_time = EXCLUDED.start_time,
69-
checkout_id = EXCLUDED.checkout_id
70-
WHERE latest_checkout.start_time < EXCLUDED.start_time
71-
""",
72-
values,
73-
)
74-
out(f"inserted {len(checkouts_instances)} checkouts in {time.time() - t0:.3f}s")
75+
out(f"inserted {len(checkouts_instances)} checkouts in {time.time() - t0:.3f}s")
7576

7677

7778
def aggregate_tests(

0 commit comments

Comments
 (0)