Commit 99e57f2

feat(ingester): add pending_retry folder (#1678)
* feat(ingester): add pending_retry folder

  Fixes an issue with the file counter: submission files were moved straight to the archive as soon as their data was put on the queue, i.e. before that data had actually been processed. Part of #1376

* fix: ingester perf tests

  Also adjusts the performance tests docs
1 parent 992981f commit 99e57f2

9 files changed

Lines changed: 203 additions & 134 deletions
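In short: process_file used to archive each submission file immediately after queueing its data, so files were counted and archived even when the later database flush failed. With this commit, flush_buffers owns the move: files are renamed into archive/ only after a successful flush, and into pending_retry/ when the flush raises. A minimal sketch of that pattern, simplified from the flush_buffers diff below (the flush callable is a stand-in for the real bulk-insert step):

import os
from typing import Callable


def flush_and_move(
    flush: Callable[[], None],
    buffer_files: set[tuple[str, str]],  # (filename, filepath) pairs
    archive_dir: str,
    pending_retry_dir: str,
) -> None:
    """Archive files only after their data is flushed; otherwise park them."""
    try:
        flush()  # bulk-insert the buffered objects into the database
        for filename, filepath in buffer_files:
            os.rename(filepath, os.path.join(archive_dir, filename))
    except Exception:
        # Flush failed: park the files so a later pass can retry them.
        for filename, filepath in buffer_files:
            os.rename(filepath, os.path.join(pending_retry_dir, filename))
    finally:
        buffer_files.clear()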


backend/kernelCI_app/management/commands/helpers/file_utils.py

Lines changed: 2 additions & 0 deletions
@@ -45,6 +45,8 @@ def verify_dir(dir: str) -> None:
 def verify_spool_dirs(spool_dir: str) -> None:
     failed_dir = os.path.join(spool_dir, "failed")
     archive_dir = os.path.join(spool_dir, "archive")
+    pending_retry_dir = os.path.join(spool_dir, "pending_retry")
     verify_dir(spool_dir)
     verify_dir(failed_dir)
     verify_dir(archive_dir)
+    verify_dir(pending_retry_dir)
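verify_dir itself is defined just above this hunk and is not changed by the commit. Its body is not shown here, so the following is only a plausible stand-in (an assumption) to illustrate what verifying the new pending_retry subdirectory amounts to:

import os


def verify_dir(dir: str) -> None:
    # Assumed behavior, for illustration only: ensure the directory exists.
    os.makedirs(dir, exist_ok=True)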

backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py

Lines changed: 49 additions & 23 deletions
@@ -6,6 +6,7 @@
 import logging
 import os
 from queue import Queue, Empty
+from typing_extensions import Literal
 from kernelCI_app.constants.ingester import (
     CONVERT_LOG_EXCERPT,
     INGEST_BATCH_SIZE,
@@ -38,13 +39,7 @@
 
 from prometheus_client import Counter
 
-
-class SubmissionMetadata(TypedDict):
-    filename: str
-    full_filename: str
-    fsize: int | None
-    processing_time: float | None
-    error: str | None
+type INGESTER_DIRS = Literal["archive", "failed", "pending_retry"]
 
 
 class SubmissionFileMetadata(TypedDict):
@@ -177,6 +172,9 @@ def flush_buffers(
     builds_buf: list[Builds],
     tests_buf: list[Tests],
     incidents_buf: list[Incidents],
+    buffer_files: set[tuple[str, str]],
+    archive_dir: str,
+    pending_retry_dir: str,
 ) -> None:
     """
     Consumes the list of objects and tries to insert them into the database.
@@ -206,8 +204,17 @@
             checkouts_instances=checkouts_buf,
             tests_instances=tests_buf,
         )
+        for filename, filepath in buffer_files:
+            os.rename(filepath, os.path.join(archive_dir, filename))
     except Exception as e:
         logger.error("Error during buffer flush: %s", e)
+        try:
+            for filename, filepath in buffer_files:
+                os.rename(filepath, os.path.join(pending_retry_dir, filename))
+            out("Moved %d files to pending retry directory" % len(buffer_files))
+        except OSError as oe:
+            logger.error("OS error during buffer file pending retry move: %s", oe)
+            logger.error("Removing files from buffer set, they should be retried")
     finally:
         flush_dur = time.time() - flush_start
         rate = total / flush_dur if flush_dur > 0 else 0.0
@@ -230,10 +237,16 @@ def flush_buffers(
     builds_buf.clear()
     tests_buf.clear()
     incidents_buf.clear()
+    buffer_files.clear()
 
 
 # TODO: lower the complexity of this function
-def db_worker(stop_event: EventClass, db_queue: Queue) -> None:  # noqa: C901
+def db_worker(  # noqa: C901
+    stop_event: EventClass,
+    db_queue: Queue,
+    archive_dir: str,
+    pending_retry_dir: str,
+) -> None:
     """
     Worker process that processes the database queue.
     This is the only process that interacts with the database.
@@ -249,6 +262,7 @@ def db_worker(stop_event: EventClass, db_queue: Queue) -> None:  # noqa: C901
     builds_buf: list[Builds] = []
     tests_buf: list[Tests] = []
     incidents_buf: list[Incidents] = []
+    buffer_files: set[tuple[str, str]] = set()
 
     last_flush_ts = time.time()
 
@@ -268,8 +282,9 @@ def buffered_total() -> int:
                 db_queue.task_done()
                 break
             try:
-                filename, inst = item
+                filename, filepath, inst = item
                 if inst is not None:
+                    buffer_files.add((filename, filepath))
                     issues_buf.extend(inst["issues"])
                     checkouts_buf.extend(inst["checkouts"])
                     builds_buf.extend(inst["builds"])
@@ -283,6 +298,9 @@ def buffered_total() -> int:
                         builds_buf=builds_buf,
                         tests_buf=tests_buf,
                         incidents_buf=incidents_buf,
+                        archive_dir=archive_dir,
+                        pending_retry_dir=pending_retry_dir,
+                        buffer_files=buffer_files,
                     )
                     last_flush_ts = time.time()
 
@@ -322,6 +340,9 @@ def buffered_total() -> int:
                     builds_buf=builds_buf,
                     tests_buf=tests_buf,
                     incidents_buf=incidents_buf,
+                    archive_dir=archive_dir,
+                    pending_retry_dir=pending_retry_dir,
+                    buffer_files=buffer_files,
                )
                 last_flush_ts = time.time()
                 continue
@@ -335,6 +356,9 @@ def buffered_total() -> int:
         builds_buf=builds_buf,
         tests_buf=tests_buf,
         incidents_buf=incidents_buf,
+        archive_dir=archive_dir,
+        pending_retry_dir=pending_retry_dir,
+        buffer_files=buffer_files,
     )
 
 
@@ -351,7 +375,6 @@ def process_file(
     file: SubmissionFileMetadata,
     tree_names: dict[str, str],
     failed_dir: str,
-    archive_dir: str,
     db_queue: Queue,
 ) -> bool:
     """
@@ -374,25 +397,21 @@ def process_file(
         return True
 
     db_queue.put(
-        (file["name"], build_instances_from_submission(data, MAP_TABLENAMES_TO_COUNTER))
+        (
+            file["name"],
+            file["path"],
+            build_instances_from_submission(data, MAP_TABLENAMES_TO_COUNTER),
+        )
     )
     FILES_INGESTER_COUNTER.labels(INGESTER_GRAFANA_LABEL).inc()
 
-    # Archive the file after queuing (we can do this optimistically)
-    try:
-        os.rename(file["path"], os.path.join(archive_dir, file["name"]))
-    except Exception as e:
-        logger.error("Error archiving file %s: %s", file["name"], e)
-        return False
-
     return True
 
 
 def ingest_submissions_parallel(  # noqa: C901 - orchestrator with IO + threading
     json_files: list[DirEntry[str]],
     tree_names: dict[str, str],
-    archive_dir: str,
-    failed_dir: str,
+    dirs: dict[INGESTER_DIRS, str],
     max_workers: int = 5,
 ) -> None:
     """
@@ -423,7 +442,15 @@ def ingest_submissions_parallel(  # noqa: C901 - orchestrator with IO + threadin
     # Start database worker process
     # This process will constantly consume the db_queue and send data to the database
     stop_event = multiprocessing.Event()
-    db_process = multiprocessing.Process(target=db_worker, args=(stop_event, db_queue))
+    db_process = multiprocessing.Process(
+        target=db_worker,
+        args=(
+            stop_event,
+            db_queue,
+            dirs["archive"],
+            dirs["pending_retry"],
+        ),
+    )
     db_process.start()
 
     stat_ok = 0
@@ -443,8 +470,7 @@ def ingest_submissions_parallel(  # noqa: C901 - orchestrator with IO + threadin
                 process_file,
                 {"path": file.path, "name": file.name, "size": file.stat().st_size},
                 tree_names,
-                failed_dir,
-                archive_dir,
+                dirs["failed"],
                 db_queue,
             ): file.name
             for file in json_files
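Worth noting: the new INGESTER_DIRS alias keys the directory mapping with a Literal type, so a type checker rejects misspelled directory names at analysis time. A self-contained example of the pattern (paths illustrative; the type statement needs Python 3.12+):

from typing import Literal

type INGESTER_DIRS = Literal["archive", "failed", "pending_retry"]

dirs: dict[INGESTER_DIRS, str] = {
    "archive": "/var/spool/kcidb/archive",
    "failed": "/var/spool/kcidb/failed",
    "pending_retry": "/var/spool/kcidb/pending_retry",
}

print(dirs["pending_retry"])
# dirs["arhive"] would be flagged by mypy/pyright as an invalid key
# (and would raise KeyError at runtime).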

backend/kernelCI_app/management/commands/monitor_submissions.py

Lines changed: 13 additions & 5 deletions
@@ -5,6 +5,7 @@
 import time
 import os
 from kernelCI_app.management.commands.helpers.kcidbng_ingester import (
+    INGESTER_DIRS,
     ingest_submissions_parallel,
 )
 from kernelCI_app.constants.ingester import (
@@ -91,12 +92,16 @@ def handle(
                 "PROMETHEUS_MULTIPROC_DIR is not set, skipping Prometheus metrics"
             )
 
-        archive_dir = os.path.join(spool_dir, "archive")
-        failed_dir = os.path.join(spool_dir, "failed")
+        dirs: dict[INGESTER_DIRS, str] = {
+            "archive": os.path.join(spool_dir, "archive"),
+            "failed": os.path.join(spool_dir, "failed"),
+            "pending_retry": os.path.join(spool_dir, "pending_retry"),
+        }
 
         self.stdout.write(f"Monitoring folder: {spool_dir}")
-        self.stdout.write(f"Archive directory: {archive_dir}")
-        self.stdout.write(f"Failed directory: {failed_dir}")
+        self.stdout.write(f"Archive directory: {dirs['archive']}")
+        self.stdout.write(f"Failed directory: {dirs['failed']}")
+        self.stdout.write(f"Pending retry directory: {dirs['pending_retry']}")
         self.stdout.write(f"Check interval: {interval} seconds")
         self.stdout.write(f"Using {max_workers} workers")
 
@@ -126,7 +131,10 @@ def handle(
 
                 if len(json_files) > 0:
                     ingest_submissions_parallel(
-                        json_files, tree_names, archive_dir, failed_dir, max_workers
+                        json_files,
+                        tree_names,
+                        dirs,
+                        max_workers,
                     )
                     cache_logs_maintenance()
 
backend/kernelCI_app/tests/performanceTests/test_ingest_perf.py

Lines changed: 20 additions & 6 deletions
@@ -125,8 +125,11 @@ def test_ingest_perf_workers(
         args=(
             files,
             trees_names,
-            os.path.join(SUBMISSIONS_DIR, "archive"),
-            os.path.join(SUBMISSIONS_DIR, "failed"),
+            {
+                "archive": os.path.join(SUBMISSIONS_DIR, "archive"),
+                "failed": os.path.join(SUBMISSIONS_DIR, "failed"),
+                "pending_retry": os.path.join(SUBMISSIONS_DIR, "pending_retry"),
+            },
             max_workers,
         ),
         rounds=5,
@@ -158,8 +161,11 @@ def test_ingest_perf_file_count(
         args=(
             files,
             trees_names,
-            os.path.join(SUBMISSIONS_DIR, "archive"),
-            os.path.join(SUBMISSIONS_DIR, "failed"),
+            {
+                "archive": os.path.join(SUBMISSIONS_DIR, "archive"),
+                "failed": os.path.join(SUBMISSIONS_DIR, "failed"),
+                "pending_retry": os.path.join(SUBMISSIONS_DIR, "pending_retry"),
+            },
             3,
         ),
         rounds=5,
@@ -252,6 +258,7 @@ def _prepare_buffers(
         "tests": [],
         "incidents": [],
     }
+    buffer_files: set[tuple[str, str]] = set()
 
     for file in files:
         file_metadata: SubmissionFileMetadata = {
@@ -272,13 +279,17 @@
         objects_buffers["builds"].extend(instances["builds"])
         objects_buffers["tests"].extend(instances["tests"])
         objects_buffers["incidents"].extend(instances["incidents"])
+        buffer_files.add((file.name, file.path))
 
     return [], {
         "issues_buf": objects_buffers["issues"],
         "checkouts_buf": objects_buffers["checkouts"],
         "builds_buf": objects_buffers["builds"],
         "tests_buf": objects_buffers["tests"],
         "incidents_buf": objects_buffers["incidents"],
+        "buffer_files": buffer_files,
+        "archive_dir": os.path.join(SUBMISSIONS_DIR, "archive"),
+        "pending_retry_dir": os.path.join(SUBMISSIONS_DIR, "pending_retry"),
     }
 
 
@@ -325,8 +336,11 @@ def test_ingest_parallel_batch_size(
         args=(
             files,
             trees_names,
-            os.path.join(SUBMISSIONS_DIR, "archive"),
-            os.path.join(SUBMISSIONS_DIR, "failed"),
+            {
+                "archive": os.path.join(SUBMISSIONS_DIR, "archive"),
+                "failed": os.path.join(SUBMISSIONS_DIR, "failed"),
+                "pending_retry": os.path.join(SUBMISSIONS_DIR, "pending_retry"),
+            },
             1,
         ),
         rounds=5,
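The dict returned as the second element of _prepare_buffers now matches flush_buffers' keyword arguments one for one, so the flush benchmark can presumably unpack it directly. A hypothetical call shape, inferred only from the matching key names (the fixture's full signature is not shown on this page):

_, flush_kwargs = _prepare_buffers(files)  # the first element's role is not shown here
flush_buffers(**flush_kwargs)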

backend/kernelCI_app/tests/unitTests/commands/monitorSubmissions/file_utils_test.py

Lines changed: 6 additions & 1 deletion
@@ -9,6 +9,7 @@
     verify_spool_dirs,
 )
 from kernelCI_app.tests.unitTests.helpers.fixtures.file_utils_data import (
+    PENDING_RETRY_SPOOL_SUBDIR,
     TREES_PATH_TESTING,
     BASE_TREES_FILE,
     EXPECTED_PARSED_TREES_FILE,
@@ -242,13 +243,17 @@ def test_verify_spool_dirs_success(self, mock_verify_dir):
         """Test verify_spool_dirs with successful directory verification."""
         joined_fail_dir = "/".join([SPOOL_DIR_TESTING, FAIL_SPOOL_SUBDIR])
         joined_archive_dir = "/".join([SPOOL_DIR_TESTING, ARCHIVE_SPOOL_SUBDIR])
+        joined_pending_retry_dir = "/".join(
+            [SPOOL_DIR_TESTING, PENDING_RETRY_SPOOL_SUBDIR]
+        )
 
         verify_spool_dirs(SPOOL_DIR_TESTING)
 
-        assert mock_verify_dir.call_count == 3
+        assert mock_verify_dir.call_count == 4
         mock_verify_dir.assert_any_call(SPOOL_DIR_TESTING)
         mock_verify_dir.assert_any_call(joined_fail_dir)
         mock_verify_dir.assert_any_call(joined_archive_dir)
+        mock_verify_dir.assert_any_call(joined_pending_retry_dir)
 
     @patch("kernelCI_app.management.commands.helpers.file_utils.verify_dir")
     def test_verify_spool_dirs_verify_spool_dir_fails(self, mock_verify_dir):
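PENDING_RETRY_SPOOL_SUBDIR comes from the file_utils_data fixtures, one of the changed files not rendered on this page. Given that verify_spool_dirs joins the literal "pending_retry" to the spool path, the new constant is presumably just:

# Presumed content of the new fixture constant (file not shown on this page):
PENDING_RETRY_SPOOL_SUBDIR = "pending_retry"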
