Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
Expand Down Expand Up @@ -7019,6 +7020,76 @@ describe("aiOrchestratorService", () => {
}
});

it("does not sync unlinked run steps onto mission steps already claimed by missionStepId", async () => {
const fixture = await createFixture();
try {
const mission = fixture.missionService.create({
prompt: "Avoid ambiguous stepKey sync collisions.",
laneId: fixture.laneId,
plannedSteps: [
{
index: 0,
title: "Review change",
detail: "Primary review step",
kind: "review",
metadata: { stepType: "review", stepKey: "review" },
},
],
});
const missionStep = fixture.missionService.get(mission.id)?.steps[0];
if (!missionStep) throw new Error("Expected mission step");

const started = fixture.orchestratorService.startRun({
missionId: mission.id,
steps: [
{
stepKey: "review",
title: missionStep.title,
stepIndex: 0,
dependencyStepKeys: [],
executorKind: "manual",
missionStepId: missionStep.id,
metadata: { stepType: "review" },
},
],
});
const runId = started.run.id;
fixture.missionService.update({
missionId: mission.id,
status: "in_progress",
});

const spawnedStepId = randomUUID();
const now = new Date().toISOString();
fixture.db.run(
`
insert into orchestrator_steps (
id,
run_id,
project_id,
mission_step_id,
step_key,
title,
step_index,
status,
lane_id,
metadata_json,
created_at,
updated_at
) values (?, ?, ?, null, 'review', 'Coordinator review worker', 1, 'failed', ?, '{}', ?, ?)
`,
[spawnedStepId, runId, fixture.projectId, fixture.laneId, now, now],
);

await fixture.aiOrchestratorService.syncMissionFromRun(runId, "coordinator_worker_failed");

const refreshed = fixture.missionService.get(mission.id);
expect(refreshed?.steps[0]?.status).not.toBe("failed");
} finally {
fixture.dispose();
}
});

it("opens terminal failed-step interventions for each matched failed mission step", async () => {
const fixture = await createFixture();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ import {
getMaxCoordinatorRecoveries as getMaxCoordinatorRecoveriesCtx,
dispatchOrchestratorHookCtx,
maybeDispatchTeammateIdleHookCtx,
resolveMissionStepForRunStep,
} from "./missionLifecycle";
import { resolveFirstPostPlanningPhaseKey } from "../missions/phaseEngine";
import type { HookDispatchDeps } from "./missionLifecycle";
Expand Down Expand Up @@ -6970,40 +6971,12 @@ Check all worker statuses and continue managing the mission from here. Read work
return interventionId;
};

const stableString = (value: unknown): string | null => {
if (typeof value !== "string") return null;
const trimmed = value.trim();
return trimmed.length > 0 ? trimmed : null;
};

const resolveMissionStepForRunStep = (
mission: MissionDetail,
runStep: OrchestratorRunGraph["steps"][number],
missionStepById: Map<string, MissionDetail["steps"][number]> = new Map(mission.steps.map((step) => [step.id, step])),
): MissionDetail["steps"][number] | null => {
if (runStep.missionStepId) {
const byId = missionStepById.get(runStep.missionStepId);
if (byId) return byId;
}
const runStepId = stableString(runStep.id);
const runStepKey = stableString(runStep.stepKey);
return mission.steps.find((step) => {
const metadata = isRecord(step.metadata) ? step.metadata : null;
if (!metadata) return false;
const metadataOrchestratorStepId = stableString(metadata.orchestratorStepId);
if (runStepId && metadataOrchestratorStepId === runStepId) return true;
const metadataStepKey = stableString(metadata.stepKey);
return Boolean(runStepKey && metadataStepKey === runStepKey);
}) ?? null;
};

const syncMissionStepsFromRun = (graph: OrchestratorRunGraph) => {
const mission = missionService.get(graph.run.missionId);
if (!mission) return;
const missionStepById = new Map(mission.steps.map((step) => [step.id, step]));

for (const runStep of graph.steps) {
const missionStep = resolveMissionStepForRunStep(mission, runStep, missionStepById);
const missionStep = resolveMissionStepForRunStep(mission, runStep, graph);
if (!missionStep) continue;
const nextStatus = mapOrchestratorStepStatus(runStep.status);
if (missionStep.status === nextStatus) continue;
Expand Down Expand Up @@ -7055,7 +7028,6 @@ Check all worker statuses and continue managing the mission from here. Read work
const mission = missionService.get(graph.run.missionId);
if (!mission) return;

const missionStepById = new Map(mission.steps.map((step) => [step.id, step]));
const hasMatchingOpenIntervention = (
failedRunStep: OrchestratorRunGraph["steps"][number] | null,
failedMissionStep: MissionDetail["steps"][number] | null,
Expand All @@ -7070,7 +7042,7 @@ Check all worker statuses and continue managing the mission from here. Read work
.filter((step) => step.status === "failed")
.map((runStep) => ({
runStep,
missionStep: resolveMissionStepForRunStep(mission, runStep, missionStepById),
missionStep: resolveMissionStepForRunStep(mission, runStep, graph),
}))
.find(({ runStep, missionStep }) => !hasMatchingOpenIntervention(runStep, missionStep));
const fallbackMissionStep = failedRunPair
Expand Down
62 changes: 58 additions & 4 deletions apps/desktop/src/main/services/orchestrator/missionLifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
} from "./orchestratorContext";
import type {
OrchestratorRunGraph,
MissionDetail,
MissionStatus,
MissionStepStatus,
OrchestratorWorkerRole,
Expand Down Expand Up @@ -160,6 +161,62 @@ export const TERMINAL_PHASE_STEP_STATUSES = new Set<OrchestratorStepStatus>([
]);


function stableMissionLinkString(value: unknown): string | null {
if (typeof value !== "string") return null;
const trimmed = value.trim();
return trimmed.length > 0 ? trimmed : null;
}

/**
* Resolve the mission step that corresponds to an orchestrator run step.
* Avoids ambiguous stepKey matches when coordinator-spawned steps reuse a key
* already owned by a mission step linked via missionStepId on another run step.
*/
export function resolveMissionStepForRunStep(
mission: MissionDetail,
runStep: OrchestratorRunGraph["steps"][number],
graph?: Pick<OrchestratorRunGraph, "steps">,
): MissionDetail["steps"][number] | null {
const missionStepById = new Map(mission.steps.map((step) => [step.id, step]));
if (runStep.missionStepId) {
const byId = missionStepById.get(runStep.missionStepId);
if (byId) return byId;
}

const runStepId = stableMissionLinkString(runStep.id);
if (runStepId) {
const byOrchestratorId = mission.steps.find((step) => {
const metadata = isRecord(step.metadata) ? step.metadata : null;
if (!metadata) return false;
return stableMissionLinkString(metadata.orchestratorStepId) === runStepId;
});
if (byOrchestratorId) return byOrchestratorId;
}

const runStepKey = stableMissionLinkString(runStep.stepKey);
if (!runStepKey) return null;

const claimedMissionStepIds = new Set(
(graph?.steps ?? [])
.map((step) => stableMissionLinkString(step.missionStepId))
.filter((id): id is string => Boolean(id)),
);

const stepKeyMatches = mission.steps.filter((step) => {
const metadata = isRecord(step.metadata) ? step.metadata : null;
if (!metadata) return false;
const metadataStepKey = stableMissionLinkString(metadata.stepKey);
if (metadataStepKey !== runStepKey) return false;
if (claimedMissionStepIds.has(step.id) && runStep.missionStepId !== step.id) {
return false;
}
return true;
});

if (stepKeyMatches.length === 1) return stepKeyMatches[0]!;
return null;
}

/**
* Sync mission steps from an orchestrator run graph.
*/
Expand All @@ -169,10 +226,7 @@ export function syncMissionStepsFromRun(ctx: OrchestratorContext, graph: Orchest
if (!mission) return;

for (const step of graph.steps) {
const missionStep = mission.steps.find((ms) => {
const msMeta = isRecord(ms.metadata) ? ms.metadata : {};
return msMeta.orchestratorStepId === step.id || msMeta.stepKey === step.stepKey;
});
const missionStep = resolveMissionStepForRunStep(mission, step, graph);
if (missionStep) {
const apply = () => {
try {
Expand Down
Loading