Skip to content

Commit 6aa6346

Browse files
fix(executor): subflow edge keys mismatch (#4202)
* fix(executor): subflow edge keys mismatch' * improve style
1 parent 1708bbe commit 6aa6346

File tree

7 files changed

+354
-69
lines changed

7 files changed

+354
-69
lines changed

apps/sim/executor/execution/edge-manager.test.ts

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,171 @@ describe('EdgeManager', () => {
600600
})
601601
expect(readyNodes).toContain(function1Id)
602602
})
603+
604+
/**
605+
* Regression for the substring-match bug in clearDeactivatedEdgesForNodes.
606+
*
607+
* Reproduces the real workflow pattern where an empty upstream loop (e.g. KG) cascade
608+
* deactivates its `loop_exit` edge into the next loop's sentinel-start (e.g. SBJ). When
609+
* SBJ iterates and resets its state between iterations, the old buggy `includes(\`-${nodeId}-\`)`
610+
* check matched edge keys where the sentinel was the TARGET (not the source), wrongly
611+
* reactivating that external edge. That made countActiveIncomingEdges see a phantom pending
612+
* upstream and SBJ's sentinel-start stopped being ready, stalling the loop after iteration 1.
613+
*/
614+
it('should not re-activate external cascade-deactivated edges pointing INTO a loop node', () => {
615+
const externalNodeId = 'external-node'
616+
const sbjSentinelStartId = 'loop-sbj-sentinel-start'
617+
const sbjSentinelEndId = 'loop-sbj-sentinel-end'
618+
const bodyNodeId = 'body-node'
619+
620+
const externalNode = createMockNode(externalNodeId, [
621+
{ target: sbjSentinelStartId, sourceHandle: 'condition-if' },
622+
])
623+
const sbjSentinelStartNode = createMockNode(
624+
sbjSentinelStartId,
625+
[{ target: bodyNodeId }],
626+
[externalNodeId]
627+
)
628+
const bodyNode = createMockNode(
629+
bodyNodeId,
630+
[{ target: sbjSentinelEndId }],
631+
[sbjSentinelStartId]
632+
)
633+
const sbjSentinelEndNode = createMockNode(sbjSentinelEndId, [], [bodyNodeId])
634+
635+
const nodes = new Map<string, DAGNode>([
636+
[externalNodeId, externalNode],
637+
[sbjSentinelStartId, sbjSentinelStartNode],
638+
[bodyNodeId, bodyNode],
639+
[sbjSentinelEndId, sbjSentinelEndNode],
640+
])
641+
642+
const dag = createMockDAG(nodes)
643+
const edgeManager = new EdgeManager(dag)
644+
645+
edgeManager.processOutgoingEdges(externalNode, { selectedOption: 'else' })
646+
647+
expect(edgeManager.isNodeReady(sbjSentinelStartNode)).toBe(true)
648+
649+
edgeManager.clearDeactivatedEdgesForNodes(
650+
new Set([sbjSentinelStartId, sbjSentinelEndId, bodyNodeId])
651+
)
652+
653+
expect(edgeManager.isNodeReady(sbjSentinelStartNode)).toBe(true)
654+
})
655+
656+
/**
657+
* End-to-end regression: after a loop reset while an external edge is cascade-deactivated,
658+
* the backwards `loop_continue` edge from sentinel-end must still mark sentinel-start as
659+
* ready. The old code removed the external edge's deactivation entry, leaving a phantom
660+
* active incoming and producing the exact "loop stops after 1 iteration" symptom the user
661+
* hit on the Group A workflow.
662+
*/
663+
it('should leave sbjSentinelStart ready after loop reset when external edge is cascade-deactivated', () => {
664+
const externalNodeId = 'external-node'
665+
const sbjSentinelStartId = 'loop-sbj-sentinel-start'
666+
const sbjSentinelEndId = 'loop-sbj-sentinel-end'
667+
const bodyNodeId = 'body-node'
668+
669+
const externalNode = createMockNode(externalNodeId, [
670+
{ target: sbjSentinelStartId, sourceHandle: 'condition-if' },
671+
])
672+
const sbjSentinelStartNode = createMockNode(
673+
sbjSentinelStartId,
674+
[{ target: bodyNodeId }],
675+
[externalNodeId]
676+
)
677+
const bodyNode = createMockNode(
678+
bodyNodeId,
679+
[{ target: sbjSentinelEndId }],
680+
[sbjSentinelStartId]
681+
)
682+
const sbjSentinelEndNode = createMockNode(
683+
sbjSentinelEndId,
684+
[{ target: sbjSentinelStartId, sourceHandle: 'loop_continue' }],
685+
[bodyNodeId]
686+
)
687+
688+
const nodes = new Map<string, DAGNode>([
689+
[externalNodeId, externalNode],
690+
[sbjSentinelStartId, sbjSentinelStartNode],
691+
[bodyNodeId, bodyNode],
692+
[sbjSentinelEndId, sbjSentinelEndNode],
693+
])
694+
695+
const dag = createMockDAG(nodes)
696+
const edgeManager = new EdgeManager(dag)
697+
698+
edgeManager.processOutgoingEdges(externalNode, { selectedOption: 'else' })
699+
700+
edgeManager.clearDeactivatedEdgesForNodes(
701+
new Set([sbjSentinelStartId, sbjSentinelEndId, bodyNodeId])
702+
)
703+
704+
const readyNodes = edgeManager.processOutgoingEdges(sbjSentinelEndNode, {
705+
selectedRoute: 'loop_continue',
706+
})
707+
708+
expect(readyNodes).toContain(sbjSentinelStartId)
709+
})
710+
711+
/**
712+
* Guard against an overly narrow fix: edges whose SOURCE is inside the loop (e.g. a body
713+
* node that deactivated its outgoing edge during the previous iteration) must still be
714+
* cleared on reset so the next iteration can traverse them.
715+
*/
716+
it('should re-activate internal loop edges (source inside loop) when resetting loop state', () => {
717+
const sbjSentinelStartId = 'loop-sbj-sentinel-start'
718+
const sbjSentinelEndId = 'loop-sbj-sentinel-end'
719+
const conditionInLoopId = 'condition-in-loop'
720+
const thenBranchId = 'then-branch'
721+
722+
const sbjSentinelStartNode = createMockNode(sbjSentinelStartId, [
723+
{ target: conditionInLoopId },
724+
])
725+
const conditionInLoopNode = createMockNode(
726+
conditionInLoopId,
727+
[
728+
{ target: thenBranchId, sourceHandle: 'condition-if' },
729+
{ target: sbjSentinelEndId, sourceHandle: 'condition-else' },
730+
],
731+
[sbjSentinelStartId]
732+
)
733+
const thenBranchNode = createMockNode(
734+
thenBranchId,
735+
[{ target: sbjSentinelEndId }],
736+
[conditionInLoopId]
737+
)
738+
const sbjSentinelEndNode = createMockNode(
739+
sbjSentinelEndId,
740+
[],
741+
[conditionInLoopId, thenBranchId]
742+
)
743+
744+
const nodes = new Map<string, DAGNode>([
745+
[sbjSentinelStartId, sbjSentinelStartNode],
746+
[conditionInLoopId, conditionInLoopNode],
747+
[thenBranchId, thenBranchNode],
748+
[sbjSentinelEndId, sbjSentinelEndNode],
749+
])
750+
751+
const dag = createMockDAG(nodes)
752+
const edgeManager = new EdgeManager(dag)
753+
754+
edgeManager.processOutgoingEdges(conditionInLoopNode, { selectedOption: 'else' })
755+
756+
edgeManager.clearDeactivatedEdgesForNodes(
757+
new Set([sbjSentinelStartId, sbjSentinelEndId, conditionInLoopId, thenBranchId])
758+
)
759+
760+
thenBranchNode.incomingEdges.add(conditionInLoopId)
761+
762+
const readyNodes = edgeManager.processOutgoingEdges(conditionInLoopNode, {
763+
selectedOption: 'if',
764+
})
765+
766+
expect(readyNodes).toContain(thenBranchId)
767+
})
603768
})
604769

605770
describe('restoreIncomingEdge', () => {

apps/sim/executor/execution/edge-manager.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,21 @@ export class EdgeManager {
128128

129129
/**
130130
* Clear deactivated edges for a set of nodes (used when restoring loop state for next iteration).
131+
*
132+
* Only clears edges whose SOURCE is in the provided set. Edges pointing INTO a node in the set
133+
* whose source lives outside (e.g. an external branch whose path was cascade-deactivated) must
134+
* remain deactivated — otherwise `countActiveIncomingEdges` would count a source that will never
135+
* fire again, stalling the loop on its next iteration.
136+
*
137+
* Edge-key format is `${sourceId}-${targetId}-${handle}`, so `startsWith("${nodeId}-")` uniquely
138+
* matches "node is source". An `includes("-${nodeId}-")` check would also match "node is target"
139+
* and is unsafe for the reset semantics.
131140
*/
132141
clearDeactivatedEdgesForNodes(nodeIds: Set<string>): void {
133142
const edgesToRemove: string[] = []
134143
for (const edgeKey of this.deactivatedEdges) {
135144
for (const nodeId of nodeIds) {
136-
if (edgeKey.startsWith(`${nodeId}-`) || edgeKey.includes(`-${nodeId}-`)) {
145+
if (edgeKey.startsWith(`${nodeId}-`)) {
137146
edgesToRemove.push(edgeKey)
138147
break
139148
}
@@ -142,7 +151,6 @@ export class EdgeManager {
142151
for (const edgeKey of edgesToRemove) {
143152
this.deactivatedEdges.delete(edgeKey)
144153
}
145-
// Also clear activated edge tracking for these nodes
146154
for (const nodeId of nodeIds) {
147155
this.nodesWithActivatedEdge.delete(nodeId)
148156
}

apps/sim/executor/orchestrators/loop.ts

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,8 @@ import type { DAG } from '@/executor/dag/builder'
77
import type { EdgeManager } from '@/executor/execution/edge-manager'
88
import type { LoopScope } from '@/executor/execution/state'
99
import type { BlockStateController, ContextExtensions } from '@/executor/execution/types'
10-
import {
11-
type ExecutionContext,
12-
getNextExecutionOrder,
13-
type NormalizedBlockOutput,
14-
} from '@/executor/types'
10+
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
1511
import type { LoopConfigWithNodes } from '@/executor/types/loop'
16-
import { buildContainerIterationContext } from '@/executor/utils/iteration-context'
1712
import { replaceValidReferences } from '@/executor/utils/reference-validation'
1813
import {
1914
addSubflowErrorLog,
@@ -22,6 +17,7 @@ import {
2217
buildSentinelEndId,
2318
buildSentinelStartId,
2419
emitEmptySubflowEvents,
20+
emitSubflowSuccessEvents,
2521
extractBaseBlockId,
2622
resolveArrayInput,
2723
validateMaxCount,
@@ -319,31 +315,7 @@ export class LoopOrchestrator {
319315
const output = { results }
320316
this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)
321317

322-
if (this.contextExtensions?.onBlockComplete) {
323-
const now = new Date().toISOString()
324-
const iterationContext = buildContainerIterationContext(ctx, loopId)
325-
326-
try {
327-
await this.contextExtensions.onBlockComplete(
328-
loopId,
329-
'Loop',
330-
'loop',
331-
{
332-
output,
333-
executionTime: DEFAULTS.EXECUTION_TIME,
334-
startedAt: now,
335-
executionOrder: getNextExecutionOrder(ctx),
336-
endedAt: now,
337-
},
338-
iterationContext
339-
)
340-
} catch (error) {
341-
logger.warn('Loop completion callback failed', {
342-
loopId,
343-
error: error instanceof Error ? error.message : String(error),
344-
})
345-
}
346-
}
318+
await emitSubflowSuccessEvents(ctx, loopId, 'loop', output, this.contextExtensions)
347319

348320
return {
349321
shouldContinue: false,

apps/sim/executor/orchestrators/parallel.ts

Lines changed: 3 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,13 @@ import { DEFAULTS } from '@/executor/constants'
33
import type { DAG } from '@/executor/dag/builder'
44
import type { ParallelScope } from '@/executor/execution/state'
55
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
6-
import {
7-
type ExecutionContext,
8-
getNextExecutionOrder,
9-
type NormalizedBlockOutput,
10-
} from '@/executor/types'
6+
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
117
import type { ParallelConfigWithNodes } from '@/executor/types/parallel'
12-
import { buildContainerIterationContext } from '@/executor/utils/iteration-context'
138
import { ParallelExpander } from '@/executor/utils/parallel-expansion'
149
import {
1510
addSubflowErrorLog,
1611
emitEmptySubflowEvents,
12+
emitSubflowSuccessEvents,
1713
extractBranchIndex,
1814
resolveArrayInput,
1915
validateMaxCount,
@@ -318,34 +314,7 @@ export class ParallelOrchestrator {
318314
const output = { results }
319315
this.state.setBlockOutput(parallelId, output)
320316

321-
// Emit onBlockComplete for the parallel container so the UI can track it.
322-
// When this parallel is nested inside a parent subflow (parallel or loop), emit
323-
// iteration context so the terminal can group this event under the parent container.
324-
if (this.contextExtensions?.onBlockComplete) {
325-
const now = new Date().toISOString()
326-
const iterationContext = buildContainerIterationContext(ctx, parallelId)
327-
328-
try {
329-
await this.contextExtensions.onBlockComplete(
330-
parallelId,
331-
'Parallel',
332-
'parallel',
333-
{
334-
output,
335-
executionTime: 0,
336-
startedAt: now,
337-
executionOrder: getNextExecutionOrder(ctx),
338-
endedAt: now,
339-
},
340-
iterationContext
341-
)
342-
} catch (error) {
343-
logger.warn('Parallel completion callback failed', {
344-
parallelId,
345-
error: error instanceof Error ? error.message : String(error),
346-
})
347-
}
348-
}
317+
await emitSubflowSuccessEvents(ctx, parallelId, 'parallel', output, this.contextExtensions)
349318

350319
return {
351320
allBranchesComplete: true,

apps/sim/executor/utils/subflow-utils.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,3 +396,60 @@ export async function emitEmptySubflowEvents(
396396
}
397397
}
398398
}
399+
400+
/**
401+
* Emits the BlockLog + onBlockComplete callback for a loop/parallel container that
402+
* finished successfully with at least one iteration. Without this, successful container
403+
* runs produce no top-level BlockLog, which forces the trace-span builder to fall back
404+
* to generic counter-based names ("Loop 1", "Parallel 1") instead of the user-configured
405+
* block name.
406+
*/
407+
export async function emitSubflowSuccessEvents(
408+
ctx: ExecutionContext,
409+
blockId: string,
410+
blockType: 'loop' | 'parallel',
411+
output: { results: any[] },
412+
contextExtensions: ContextExtensions | null
413+
): Promise<void> {
414+
const now = new Date().toISOString()
415+
const executionOrder = getNextExecutionOrder(ctx)
416+
const block = ctx.workflow?.blocks.find((b) => b.id === blockId)
417+
const blockName = block?.metadata?.name ?? blockType
418+
const iterationContext = buildContainerIterationContext(ctx, blockId)
419+
420+
ctx.blockLogs.push({
421+
blockId,
422+
blockName,
423+
blockType,
424+
startedAt: now,
425+
endedAt: now,
426+
durationMs: DEFAULTS.EXECUTION_TIME,
427+
success: true,
428+
output,
429+
executionOrder,
430+
})
431+
432+
if (contextExtensions?.onBlockComplete) {
433+
try {
434+
await contextExtensions.onBlockComplete(
435+
blockId,
436+
blockName,
437+
blockType,
438+
{
439+
output,
440+
executionTime: DEFAULTS.EXECUTION_TIME,
441+
startedAt: now,
442+
executionOrder,
443+
endedAt: now,
444+
},
445+
iterationContext
446+
)
447+
} catch (error) {
448+
logger.warn('Subflow success completion callback failed', {
449+
blockId,
450+
blockType,
451+
error: error instanceof Error ? error.message : String(error),
452+
})
453+
}
454+
}
455+
}

0 commit comments

Comments
 (0)