Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { describe, expect, it } from "vitest";
import { failedStepInterventionsMatch } from "./failedStepInterventionMatching";

describe("failedStepInterventionsMatch", () => {
it("matches worker and terminal-sync metadata for the same failed step", () => {
const workerMetadata = {
runId: "run-1",
stepId: "orchestrator-step-1",
stepKey: "implement-api",
reasonCode: "retry_exhausted",
};
expect(
failedStepInterventionsMatch(workerMetadata, {
missionStepId: "mission-step-1",
orchestratorStepId: "orchestrator-step-1",
stepKey: "implement-api",
runId: "run-1",
}),
).toBe(true);
});

it("matches mission-step ids from terminal sync", () => {
expect(
failedStepInterventionsMatch(
{ stepId: "mission-step-1", stepKey: "implement-api", runId: "run-1" },
{ missionStepId: "mission-step-1", orchestratorStepId: "orchestrator-step-1", runId: "run-1" },
),
).toBe(true);
});

it("does not match different failed steps", () => {
expect(
failedStepInterventionsMatch(
{ stepId: "mission-step-1", stepKey: "first-step", runId: "run-1" },
{ missionStepId: "mission-step-2", stepKey: "second-step", runId: "run-1" },
),
).toBe(false);
});

it("does not match the same step key across different runs", () => {
expect(
failedStepInterventionsMatch(
{ stepKey: "implement-api", runId: "run-1" },
{ stepKey: "implement-api", runId: "run-2" },
),
).toBe(false);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
export type FailedStepInterventionTarget = {
missionStepId?: string | null;
orchestratorStepId?: string | null;
stepKey?: string | null;
runId?: string | null;
};

function stableString(value: unknown): string | null {
if (typeof value !== "string") return null;
const trimmed = value.trim();
return trimmed.length > 0 ? trimmed : null;
}

/**
* Match open failed_step interventions across worker, terminal-sync, and manual
* creation paths. `metadata.stepId` is not stable: worker interventions store
* the orchestrator step id there, while terminal sync stores the mission step id.
*/
export function failedStepInterventionsMatch(
existingMetadata: Record<string, unknown> | null | undefined,
target: FailedStepInterventionTarget,
): boolean {
if (!existingMetadata) return false;

const existingStepId = stableString(existingMetadata.stepId);
const existingOrchestratorStepId = stableString(existingMetadata.orchestratorStepId);
const existingStepKey = stableString(existingMetadata.stepKey);
const existingRunId = stableString(existingMetadata.runId);

const missionStepId = stableString(target.missionStepId);
const orchestratorStepId = stableString(target.orchestratorStepId);
const stepKey = stableString(target.stepKey);
const runId = stableString(target.runId);

if (missionStepId && existingStepId === missionStepId) return true;

if (orchestratorStepId) {
if (existingStepId === orchestratorStepId) return true;
if (existingOrchestratorStepId === orchestratorStepId) return true;
}

if (stepKey && existingStepKey === stepKey) {
if (!runId || !existingRunId || runId === existingRunId) return true;
}

return false;
}
74 changes: 52 additions & 22 deletions apps/desktop/src/main/services/missions/missionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
isValidResolutionKind,
TERMINAL_MISSION_STATUSES,
} from "../../../shared/types";
import { failedStepInterventionsMatch } from "./failedStepInterventionMatching";
import type {
AddMissionArtifactArgs,
AddMissionInterventionArgs,
Expand Down Expand Up @@ -4034,17 +4035,43 @@ export function createMissionService({
const stepKey = typeof stepMetadata?.stepKey === "string" && stepMetadata.stepKey.trim()
? stepMetadata.stepKey.trim()
: null;
const intervention = insertIntervention({
missionId,
interventionType: "failed_step",
title: `Step failed: ${step.title}`,
body: note ?? "A mission step was marked as failed and needs attention.",
requestedAction: "Review the failure and decide whether to continue, retry, or cancel.",
metadata: {
stepId,
...(stepKey ? { stepKey } : {}),
},
});
const failedMetadata = {
stepId,
...(stepKey ? { stepKey } : {}),
};
const existingOpenRows = db.all<MissionInterventionRow>(
`select id, mission_id, intervention_type, status, title, body,
requested_action, resolution_kind, resolution_note, lane_id,
created_at, updated_at, resolved_at, metadata_json
from mission_interventions
where mission_id = ? and project_id = ? and status = 'open'
and intervention_type = 'failed_step'`,
[missionId, projectId]
);
const hasMatchingIntervention = existingOpenRows.some((row) =>
failedStepInterventionsMatch(safeParseRecord(row.metadata_json), {
missionStepId: stepId,
stepKey,
})
);
let interventionId =
existingOpenRows.find((row) =>
failedStepInterventionsMatch(safeParseRecord(row.metadata_json), {
missionStepId: stepId,
stepKey,
})
)?.id ?? null;
if (!hasMatchingIntervention) {
const intervention = insertIntervention({
missionId,
interventionType: "failed_step",
title: `Step failed: ${step.title}`,
body: note ?? "A mission step was marked as failed and needs attention.",
requestedAction: "Review the failure and decide whether to continue, retry, or cancel.",
metadata: failedMetadata,
});
interventionId = intervention.id;
}

db.run(
`
Expand All @@ -4063,7 +4090,7 @@ export function createMissionService({
updatedAt,
summary: "Mission paused for intervention after step failure.",
payload: {
interventionId: intervention.id,
...(interventionId ? { interventionId } : {}),
stepId
}
});
Expand Down Expand Up @@ -4146,9 +4173,9 @@ export function createMissionService({
// For failed_step interventions, match by stepId in metadata.
// For budget_limit_reached / provider_unreachable / unrecoverable_error,
// match by interventionType alone (at most one open per type).
const meta = args.metadata ?? null;
const metaStepId = meta && typeof (meta as Record<string, unknown>).stepId === "string"
? ((meta as Record<string, unknown>).stepId as string).trim()
const meta = isRecord(args.metadata) ? args.metadata : null;
const metaStepId = meta && typeof meta.stepId === "string"
? meta.stepId.trim()
: null;

const existingOpenRows = db.all<MissionInterventionRow>(
Expand All @@ -4162,14 +4189,17 @@ export function createMissionService({
);

for (const row of existingOpenRows) {
if (args.interventionType === "failed_step" && metaStepId) {
// Match by stepId in metadata
if (args.interventionType === "failed_step" && meta) {
const rowMeta = safeParseRecord(row.metadata_json);
const rowStepId = rowMeta && typeof rowMeta.stepId === "string"
? rowMeta.stepId.trim()
: null;
if (rowStepId === metaStepId) {
// Return existing intervention — skip duplicate creation
if (
failedStepInterventionsMatch(rowMeta, {
missionStepId: metaStepId,
orchestratorStepId:
typeof meta.orchestratorStepId === "string" ? meta.orchestratorStepId : null,
stepKey: typeof meta.stepKey === "string" ? meta.stepKey : null,
runId: typeof meta.runId === "string" ? meta.runId : null,
})
) {
return toMissionIntervention(row);
}
} else if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7019,6 +7019,85 @@ describe("aiOrchestratorService", () => {
}
});

it("does not duplicate worker failed-step interventions when a run fails", async () => {
const fixture = await createFixture();
try {
const mission = fixture.missionService.create({
prompt: "Avoid duplicate failed-step interventions on terminal sync.",
laneId: fixture.laneId,
plannedSteps: [
{
index: 0,
title: "Implement API",
detail: "Worker failure",
kind: "implementation",
metadata: { stepType: "implementation", stepKey: "implement-api" },
},
],
});
const missionStep = fixture.missionService.get(mission.id)?.steps[0];
if (!missionStep) throw new Error("Expected mission step");
fixture.missionService.update({ missionId: mission.id, status: "in_progress" });

const started = fixture.orchestratorService.startRun({
missionId: mission.id,
steps: [
{
stepKey: "implement-api",
title: missionStep.title,
stepIndex: 0,
dependencyStepKeys: [],
executorKind: "manual",
missionStepId: missionStep.id,
metadata: { stepType: "implementation", stepKey: "implement-api" },
},
],
});
const runId = started.run.id;
const runStep = fixture.orchestratorService.getRunGraph({ runId }).steps[0];
if (!runStep) throw new Error("Expected run step");

fixture.missionService.addIntervention({
missionId: mission.id,
interventionType: "failed_step",
title: `Step failed: ${missionStep.title}`,
body: "Worker retries exhausted.",
requestedAction: "Review the failure.",
metadata: {
runId,
stepId: runStep.id,
stepKey: "implement-api",
reasonCode: "retry_exhausted",
},
});

const failedAt = new Date().toISOString();
fixture.db.run(
`update orchestrator_runs set status = 'failed', updated_at = ? where id = ?`,
[failedAt, runId],
);
fixture.db.run(
`update orchestrator_steps set status = 'failed', completed_at = ?, updated_at = ? where run_id = ?`,
[failedAt, failedAt, runId],
);
fixture.db.run(
`update mission_steps set status = 'failed', completed_at = ?, updated_at = ? where id = ?`,
[failedAt, failedAt, missionStep.id],
);

await fixture.aiOrchestratorService.syncMissionFromRun(runId, "run_failed");

const refreshed = fixture.missionService.get(mission.id);
const failedStepInterventions = refreshed?.interventions.filter((entry) =>
entry.status === "open" && entry.interventionType === "failed_step"
) ?? [];
expect(failedStepInterventions).toHaveLength(1);
expect(failedStepInterventions[0]?.metadata?.stepId).toBe(runStep.id);
} finally {
fixture.dispose();
}
});

it("opens terminal failed-step interventions for each matched failed mission step", async () => {
const fixture = await createFixture();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ import { isWorkerBootstrapNoiseLine } from "../../../shared/workerRuntimeNoise";
import type { Logger } from "../logging/logger";
import type { AdeDb } from "../state/kvDb";
import type { createMissionService } from "../missions/missionService";
import { failedStepInterventionsMatch } from "../missions/failedStepInterventionMatching";
import type { createOrchestratorService } from "./orchestratorService";
import type { createProjectConfigService } from "../config/projectConfigService";
import type { createAiIntegrationService } from "../ai/aiIntegrationService";
Expand Down Expand Up @@ -7056,15 +7057,25 @@ Check all worker statuses and continue managing the mission from here. Read work
if (!mission) return;

const missionStepById = new Map(mission.steps.map((step) => [step.id, step]));
const readMissionStepKey = (
missionStep: MissionDetail["steps"][number] | null,
): string | null => {
if (!missionStep) return null;
const metadata = isRecord(missionStep.metadata) ? missionStep.metadata : null;
return stableString(metadata?.stepKey);
};
const hasMatchingOpenIntervention = (
failedRunStep: OrchestratorRunGraph["steps"][number] | null,
failedMissionStep: MissionDetail["steps"][number] | null,
) => mission.interventions.some((entry) => {
if (entry.status !== "open" || entry.interventionType !== "failed_step") return false;
const metadata = isRecord(entry.metadata) ? entry.metadata : null;
if (!metadata) return false;
if (failedMissionStep?.id && metadata.stepId === failedMissionStep.id) return true;
return Boolean(!failedMissionStep && failedRunStep?.id && metadata.orchestratorStepId === failedRunStep.id);
return failedStepInterventionsMatch(metadata, {
missionStepId: failedMissionStep?.id ?? null,
orchestratorStepId: failedRunStep?.id ?? null,
stepKey: stableString(failedRunStep?.stepKey) ?? readMissionStepKey(failedMissionStep),
runId: graph.run.id,
});
});
const failedRunPair = graph.steps
.filter((step) => step.status === "failed")
Expand All @@ -7082,6 +7093,8 @@ Check all worker statuses and continue managing the mission from here. Read work
const failedMissionStep = failedRunPair?.missionStep ?? fallbackMissionStep;
if (!failedRunStep && !failedMissionStep) return;
const failedStepTitle = failedMissionStep?.title ?? failedRunStep?.title ?? "Run step";
const failedStepKey =
stableString(failedRunStep?.stepKey) ?? readMissionStepKey(failedMissionStep);

missionService.addIntervention({
missionId: mission.id,
Expand All @@ -7092,7 +7105,7 @@ Check all worker statuses and continue managing the mission from here. Read work
metadata: {
runId: graph.run.id,
stepId: failedMissionStep?.id ?? null,
stepKey: failedRunStep?.stepKey ?? null,
stepKey: failedStepKey,
orchestratorStepId: failedRunStep?.id ?? null,
reasonCode: "terminal_run_failed_step",
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ export function routeMissionIntervention(
const interventionStepId = typeof metadata.stepId === "string" && metadata.stepId.trim().length > 0
? metadata.stepId.trim()
: null;
const interventionOrchestratorStepId = typeof metadata.orchestratorStepId === "string" && metadata.orchestratorStepId.trim().length > 0
? metadata.orchestratorStepId.trim()
: null;
const interventionStepKey = typeof metadata.stepKey === "string" && metadata.stepKey.trim().length > 0
? metadata.stepKey.trim()
: null;
Expand All @@ -61,7 +64,11 @@ export function routeMissionIntervention(
&& interventionStepId
&& store.runGraph?.steps.some((step) => step.id === interventionStepId)
? interventionStepId
: null;
: sameRun
&& interventionOrchestratorStepId
&& store.runGraph?.steps.some((step) => step.id === interventionOrchestratorStepId)
? interventionOrchestratorStepId
: null;

if (reasonCode === "coordinator_unavailable" || reasonCode === "coordinator_recovery_failed") {
store.setLogsFocusInterventionId(null);
Expand Down
Loading