99from django .conf import settings
1010from django .core .management .base import BaseCommand
1111from django .db import connection , transaction
12+ from django .db .utils import OperationalError
1213from prometheus_client import Counter , start_http_server
14+ from psycopg .errors import DeadlockDetected
1315
1416from kernelCI_app .constants .general import MAESTRO_DUMMY_BUILD_PREFIX
1517from kernelCI_app .constants .ingester import PROMETHEUS_MULTIPROC_DIR
3840 ["table" ], # values: "tree_listing", "hardware_status", "processed_items"
3941)
4042
43+ DEADLOCK_RETRIES_TOTAL = Counter ( # incremented once per retry after a detected database deadlock
44+ "aggregation_deadlock_retries_total" ,
45+ "Total number of deadlock retries" ,
46+ ["table" ], # label values: "tree_listing", "hardware_status"
47+ )
48+
4149
4250class ListingItemCount (TypedDict ):
4351 build_pass : int
@@ -525,6 +533,11 @@ def _fetch_test_issues(test_ids: list[str]) -> dict[str, dict]:
525533
526534
527535class Command (BaseCommand ):
536+ # WARNING: This command is designed for single-worker execution only.
537+ # Running multiple concurrent workers is not safe: the row locks taken by select_for_update(skip_locked=True)
538+ # are released when the transaction that selects the pending rows commits, but those rows are not
539+ # deleted until a separate, later transaction (after the aggregation transaction). In that window a
540+ # second worker can claim and process the same rows, causing double-counting in tree_listing aggregations.
528541 help = """
529542 Process pending tests for hardware status aggregation,
530543 checking corresponding builds and checkouts in the database.
@@ -1075,15 +1088,34 @@ def _process_tree_listing_batch(
10751088 test_builds_by_id : dict [str , Builds ],
10761089 ready_builds : Sequence [PendingBuilds ],
10771090 build_checkouts_by_id : dict [str , Checkouts ],
1091+ max_retries : int = int (os .getenv ("PROCESS_PENDING_MAX_RETRIES" , "5" )),
10781092 ) -> None :
10791093 tree_listing_data , new_processed_entries_tree = aggregate_tree_listing (
10801094 ready_tests ,
10811095 test_builds_by_id ,
10821096 ready_builds ,
10831097 build_checkouts_by_id ,
10841098 )
1085- self ._process_tree_listing (tree_listing_data )
1086- self ._process_new_processed_entries (new_processed_entries_tree )
1099+ for attempt in range (max_retries ):
1100+ try :
1101+ with transaction .atomic ():
1102+ self ._process_tree_listing (tree_listing_data )
1103+ self ._process_new_processed_entries (new_processed_entries_tree )
1104+ break
1105+ except OperationalError as e :
1106+ if (
1107+ isinstance (e .__cause__ , DeadlockDetected )
1108+ and attempt < max_retries - 1
1109+ ):
1110+ DEADLOCK_RETRIES_TOTAL .labels (table = "tree_listing" ).inc ()
1111+ wait = min (30 , 2 ** (attempt + 2 ))
1112+ out (
1113+ f"Deadlock on tree_listing (attempt { attempt + 1 } /{ max_retries } ), "
1114+ f"retrying in { wait :.2f} s..."
1115+ )
1116+ time .sleep (wait )
1117+ else :
1118+ raise
10871119
10881120 def process_pending_batch (self , batch_size : int ) -> int :
10891121 last_processed_test_id = None
@@ -1092,15 +1124,15 @@ def process_pending_batch(self, batch_size: int) -> int:
10921124 builds_count = 0
10931125
10941126 while True :
1095- with transaction .atomic ():
1096- out (
1097- f"Starting batch processing "
1098- f"(last_processed_test_id={ str (last_processed_test_id )[:20 ]} , "
1099- f"last_processed_build_id={ str (last_processed_build_id )[:20 ]} , "
1100- f"batch_size={ batch_size } )..."
1101- )
1102- t0 = time .time ()
1127+ out (
1128+ f"Starting batch processing "
1129+ f"(last_processed_test_id={ str (last_processed_test_id )[:20 ]} , "
1130+ f"last_processed_build_id={ str (last_processed_build_id )[:20 ]} , "
1131+ f"batch_size={ batch_size } )..."
1132+ )
1133+ t0 = time .time ()
11031134
1135+ with transaction .atomic ():
11041136 (
11051137 ready_tests ,
11061138 test_builds_by_id ,
@@ -1127,27 +1159,28 @@ def process_pending_batch(self, batch_size: int) -> int:
11271159 batch_size = batch_size ,
11281160 )
11291161
1130- if ready_tests or ready_builds :
1131- self ._process_tree_listing_batch (
1132- ready_tests ,
1133- test_builds_by_id ,
1134- ready_builds ,
1135- build_checkouts_by_id ,
1136- )
1162+ if ready_tests or ready_builds :
1163+ self ._process_tree_listing_batch (
1164+ ready_tests ,
1165+ test_builds_by_id ,
1166+ ready_builds ,
1167+ build_checkouts_by_id ,
1168+ )
11371169
1170+ with transaction .atomic ():
11381171 tests_count += self ._delete_ready_tests (ready_tests = ready_tests )
11391172 builds_count += self ._delete_ready_builds (ready_builds = ready_builds )
11401173
1141- out (
1142- f"Batch processed: { tests_count } tests aggregated, "
1143- f"skipped (no build): { skipped_no_build } ; "
1144- f"{ builds_count } builds aggregated, "
1145- f"skipped (no checkout): { skipped_no_checkout } ; "
1146- f"in { time .time () - t0 :.3f} s"
1147- )
1174+ out (
1175+ f"Batch processed: { tests_count } tests aggregated, "
1176+ f"skipped (no build): { skipped_no_build } ; "
1177+ f"{ builds_count } builds aggregated, "
1178+ f"skipped (no checkout): { skipped_no_checkout } ; "
1179+ f"in { time .time () - t0 :.3f} s"
1180+ )
11481181
1149- if pending_test_count == 0 and pending_build_count == 0 :
1150- out ("No pending items found, exiting batch loop" )
1151- break
1182+ if pending_test_count == 0 and pending_build_count == 0 :
1183+ out ("No pending items found, exiting batch loop" )
1184+ break
11521185
11531186 return tests_count + builds_count
0 commit comments