@@ -261,19 +261,32 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline,
261261 }))
262262 // prepare query to find an appropriate pipeline to execute
263263 pipeline = & models.Pipeline {}
264+ // 1. find out the current highest priority in the queue
265+ top_priority := 0
266+ var top_priorities []int
267+ where_status := dal .Where ("status IN ?" , []string {models .TASK_CREATED , models .TASK_RERUN , models .TASK_RESUME })
268+ err = tx .Pluck ("priority" , & top_priorities , dal .From (pipeline ), where_status , dal .Orderby ("priority DESC" ), dal .Limit (1 ))
269+ if err != nil {
270+ panic (err )
271+ }
272+ if len (top_priorities ) > 0 {
273+ top_priority = top_priorities [0 ]
274+ }
275+ // 2. pick the earlier runnable pipeline with the highest priority
264276 err = tx .First (pipeline ,
265- dal .Where ("status IN ?" , []string {models .TASK_CREATED , models .TASK_RERUN , models .TASK_RESUME }),
277+ where_status ,
278+ dal .Where ("priority = ?" , top_priority ),
266279 dal .Join (
267280 `left join _devlake_pipeline_labels ON
268281 _devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND
269282 _devlake_pipeline_labels.name LIKE 'parallel/%' AND
270283 _devlake_pipeline_labels.name in ?` ,
271284 runningParallelLabels ,
272285 ),
273- dal .Groupby ("priority, id" ),
286+ dal .Groupby ("id" ),
274287 dal .Having ("count(_devlake_pipeline_labels.name)=0" ),
275288 dal .Select ("id" ),
276- dal .Orderby ("priority DESC, id ASC" ),
289+ dal .Orderby ("id ASC" ),
277290 dal .Limit (1 ),
278291 )
279292 if err == nil {
0 commit comments