diff --git a/web/oss/src/components/AgentChatSlice/assets/AgentChatTransport.ts b/web/oss/src/components/AgentChatSlice/assets/AgentChatTransport.ts index 61fef53242..102924ff59 100644 --- a/web/oss/src/components/AgentChatSlice/assets/AgentChatTransport.ts +++ b/web/oss/src/components/AgentChatSlice/assets/AgentChatTransport.ts @@ -1,19 +1,20 @@ -import {agentChannelModeAtom} from "@agenta/playground" +import {createNegotiatingFetch, type NegotiatingFetch} from "@agenta/playground" import {DefaultChatTransport, type UIMessage, type UIMessageChunk} from "ai" -import {getDefaultStore} from "jotai" /** * Agent chat transport. * - * `useChat` only renders a stream of `UIMessageChunk`s — it has no "batch" mode. So when the - * playground's channel mode is `batch`, the backend returns a single `WorkflowBatchResponse` - * (JSON, because `buildAgentRequest` sent `Accept: application/json`) and this transport replays + * `useChat` only renders a stream of `UIMessageChunk`s — it has no "batch" mode. So when the run + * resolves to a batch (the toggle forced it, or the backend fell back because the handler can't + * stream), the backend returns a single `WorkflowBatchResponse` (JSON) and this transport replays * it as a ONE-SHOT UIMessage stream — the same chunk sequence the SSE path emits — so the reply - * lands in a single frame. Stream mode delegates to the default SSE parser unchanged. + * lands in a single frame. A real stream delegates to the default SSE parser unchanged. * - * The mode is read from the shared `agentChannelModeAtom` (the same atom that set the request's - * Accept header), so the request channel and the response handling always agree. The main - * playground uses the default Jotai store, which is what `buildAgentRequest` reads too. + * Which channel resolved is decided by the `createNegotiatingFetch` middleware, NOT a fixed + * toggle: it requests the stream, falls back to a batch re-request on a 406 (handler can't + * stream), and passes any other error through so `useChat` surfaces it inline. The transport + * parses the body according to the channel that fetch actually resolved (`resolvedMode`), so the + * request and the response handling can never disagree. */ type AnyChunk = UIMessageChunk @@ -190,9 +191,22 @@ function batchJsonToUiMessageStream( } export class AgentChatTransport extends DefaultChatTransport { + private readonly negotiator: NegotiatingFetch + + constructor(options: ConstructorParameters>[0] = {}) { + // Own the transport's `fetch` so every request goes through stream→batch negotiation; + // any caller-supplied fetch becomes the negotiator's base (tests inject one here). + super({...options, fetch: undefined}) + this.negotiator = createNegotiatingFetch(options.fetch) + this.fetch = this.negotiator.fetch + } + protected processResponseStream(stream: ReadableStream): ReadableStream { - const mode = getDefaultStore().get(agentChannelModeAtom) - if (mode === "batch") return batchJsonToUiMessageStream(stream) + // Parse by the channel the request actually resolved to, not the requested one — a stream + // request can come back as a batch via the 406 fallback. The mode is keyed off this exact + // body stream (`resolvedMode(stream)`), so request and parse stay in lockstep. + if (this.negotiator.resolvedMode(stream) === "batch") + return batchJsonToUiMessageStream(stream) return super.processResponseStream(stream) } } diff --git a/web/packages/agenta-playground/src/index.ts b/web/packages/agenta-playground/src/index.ts index 42b8cddb98..962fcd1db0 100644 --- a/web/packages/agenta-playground/src/index.ts +++ b/web/packages/agenta-playground/src/index.ts @@ -75,8 +75,9 @@ export { buildAgentRequest, buildAgentReferences, agentChannelModeAtom, + createNegotiatingFetch, } from "./state" -export type {AgentRequest, AgentChannelMode} from "./state" +export type {AgentRequest, AgentChannelMode, NegotiatingFetch} from "./state" // HITL resume predicate for `useChat`'s `sendAutomaticallyWhen` (approve AND deny resume). export {agentShouldResumeAfterApproval} from "./state" // Queued-message release gate for the agent chat composer (HITL-safe, one-by-one). diff --git a/web/packages/agenta-playground/src/state/execution/agentNegotiation.ts b/web/packages/agenta-playground/src/state/execution/agentNegotiation.ts new file mode 100644 index 0000000000..51496a2966 --- /dev/null +++ b/web/packages/agenta-playground/src/state/execution/agentNegotiation.ts @@ -0,0 +1,134 @@ +import type {AgentChannelMode} from "./channelMode" + +/** + * Client-side transport negotiation for the agent `/invoke` lane. + * + * The backend negotiates the response channel off the request `Accept` header + * (sdk/agents/adapters/vercel routing): `text/event-stream` asks for the v6 SSE + * UI-message stream, `application/json` for a single `WorkflowBatchResponse`. A + * handler that cannot stream answers a stream request with **406 Not Acceptable** + * (not an SSE body); a handler that errors answers with JSON carrying the real + * status code. + * + * JP's first cut sent a fixed `Accept` from the kebab toggle and trusted it. This + * adds the real negotiation the toggle's `stream` default implies: + * + * 1. **stream** — request the SSE stream. + * 2. **fall back to batch** — if the server can't stream (406), re-issue the same + * request as `application/json` and replay the batch as a one-shot stream. + * 3. **error gracefully** — any other non-OK response (or a failed batch fallback) + * is returned untouched so the AI-SDK transport throws its body text, which + * `useChat`'s `onError` surfaces inline (`parseAgentRunError`). + * + * An explicit `batch` toggle skips the stream attempt entirely. + */ +export interface NegotiatingFetch { + /** A `fetch` middleware to hand the AI-SDK transport. */ + fetch: typeof globalThis.fetch + /** + * The channel a response actually resolved to — drives how the transport parses the body + * (SSE stream vs. one-shot batch replay). + * + * Pass the response body the transport is about to parse (`response.body`); the mode is bound + * to that exact stream, so request and parse can never disagree even if requests overlap. With + * no argument it returns the LAST negotiated mode (kept for callers that don't have the stream). + */ + resolvedMode: (stream?: ReadableStream | null) => AgentChannelMode +} + +type Headersish = HeadersInit | undefined + +/** Read a header value tolerant of the three `HeadersInit` shapes the caller may pass. */ +const headerValue = (headers: Headersish, name: string): string => { + const lower = name.toLowerCase() + if (!headers) return "" + if (headers instanceof Headers) return headers.get(name) ?? "" + if (Array.isArray(headers)) { + const hit = headers.find(([k]) => k.toLowerCase() === lower) + return hit?.[1] ?? "" + } + for (const [k, v] of Object.entries(headers)) { + if (k.toLowerCase() === lower) return String(v) + } + return "" +} + +/** Clone `headers` with `Accept` replaced — preserves every other header (auth, UA, format). */ +const withAccept = (headers: Headersish, accept: string): Record => { + const out: Record = {} + if (headers instanceof Headers) { + headers.forEach((v, k) => { + out[k] = v + }) + } else if (Array.isArray(headers)) { + for (const [k, v] of headers) out[k] = v + } else if (headers) { + for (const [k, v] of Object.entries(headers)) out[k] = String(v) + } + // Drop any existing Accept (case-insensitive) before setting the new one. + for (const k of Object.keys(out)) { + if (k.toLowerCase() === "accept") delete out[k] + } + out.Accept = accept + return out +} + +/** HTTP status the SDK route returns when a stream was asked of a handler that can only batch. */ +const NOT_ACCEPTABLE = 406 + +export function createNegotiatingFetch(baseFetch?: typeof globalThis.fetch): NegotiatingFetch { + const base = baseFetch ?? globalThis.fetch.bind(globalThis) + + // Bind the resolved mode to the response body STREAM, not a shared variable: the transport + // reads it back when it parses that body, so keying on the stream keeps request and parse in + // lockstep even if two requests on this transport ever overlap. `last` is the no-arg fallback. + const modes = new WeakMap, AgentChannelMode>() + let last: AgentChannelMode = "stream" + + const remember = (res: Response, mode: AgentChannelMode): Response => { + last = mode + if (res.body) modes.set(res.body, mode) + return res + } + + const fetch: typeof globalThis.fetch = async (input, init) => { + const wantsStream = headerValue(init?.headers, "accept").includes("text/event-stream") + + // Explicit batch request (toggle = batch) — no stream attempt. + if (!wantsStream) { + return remember(await base(input, init), "batch") + } + + const res = await base(input, init) + + if (res.ok) { + // The server may honour the stream or, having no preference path, answer batch JSON. + // Trust the response Content-Type over our request intent. + const isStream = (res.headers.get("content-type") ?? "").includes("text/event-stream") + return remember(res, isStream ? "stream" : "batch") + } + + // Negotiation failed because the handler can't stream — fall back to batch. + if (res.status === NOT_ACCEPTABLE) { + const batch = await base(input, { + ...init, + headers: withAccept(init?.headers, "application/json"), + }) + return remember(batch, "batch") + } + + // A real error (4xx/5xx with the run's JSON envelope). Hand it back untouched so the + // AI-SDK transport throws its body text and `useChat` renders it inline. + return remember(res, "stream") + } + + const resolvedMode = (stream?: ReadableStream | null): AgentChannelMode => { + if (stream) { + const mode = modes.get(stream) + if (mode) return mode + } + return last + } + + return {fetch, resolvedMode} +} diff --git a/web/packages/agenta-playground/src/state/execution/channelMode.ts b/web/packages/agenta-playground/src/state/execution/channelMode.ts index aa389659c0..2ceab6aca3 100644 --- a/web/packages/agenta-playground/src/state/execution/channelMode.ts +++ b/web/packages/agenta-playground/src/state/execution/channelMode.ts @@ -4,11 +4,13 @@ export type AgentChannelMode = "stream" | "batch" /** * How the agent playground talks to the agent `/invoke` endpoint: - * - `stream` (default): the real-time SSE UIMessage stream `useChat` renders token-by-token. - * - `batch`: a single JSON response (`WorkflowBatchResponse`); the transport replays it as a - * one-shot UIMessage stream, so the reply lands in one frame instead of streaming. + * - `stream` (default): request the real-time SSE UIMessage stream `useChat` renders + * token-by-token. If the backend can't stream (the handler can only batch → 406), the + * transport's `createNegotiatingFetch` middleware transparently falls back to a batch. + * - `batch`: skip the stream attempt and request a single JSON `WorkflowBatchResponse` up + * front; the transport replays it as a one-shot UIMessage stream so it lands in one frame. * - * This is a transport/controller concern (which channel the playground speaks), NOT revision + * This is a transport/controller concern (which channel the playground PREFERS), NOT revision * config — it is never persisted on the agent revision. `buildAgentRequest` reads it to set the * `Accept` header; the playground kebab menu writes it. Stream is the default for agents. */ diff --git a/web/packages/agenta-playground/src/state/execution/index.ts b/web/packages/agenta-playground/src/state/execution/index.ts index f1995badac..5aa3e8a306 100644 --- a/web/packages/agenta-playground/src/state/execution/index.ts +++ b/web/packages/agenta-playground/src/state/execution/index.ts @@ -354,6 +354,8 @@ export { export {buildAgentRequest, buildAgentReferences, type AgentRequest} from "./agentRequest" // Stream vs batch response channel for the agent lane (read by buildAgentRequest's Accept header). export {agentChannelModeAtom, type AgentChannelMode} from "./channelMode" +// Transport negotiation: try stream, fall back to batch on 406, error gracefully otherwise. +export {createNegotiatingFetch, type NegotiatingFetch} from "./agentNegotiation" // Agent-lane HITL resume predicate (approve AND deny both resume the conversation). export {agentShouldResumeAfterApproval} from "./agentApprovalResume" // Agent-lane queued-message release gate (never releases mid-HITL or pre-resume). diff --git a/web/packages/agenta-playground/src/state/index.ts b/web/packages/agenta-playground/src/state/index.ts index 58b0d495c2..dd10323a39 100644 --- a/web/packages/agenta-playground/src/state/index.ts +++ b/web/packages/agenta-playground/src/state/index.ts @@ -177,6 +177,7 @@ export {appTypeAtom, isChatModeAtom, type AppType} from "./execution" export {isAgentModeAtomFamily} from "./execution" export {buildAgentRequest, buildAgentReferences, type AgentRequest} from "./execution" export {agentChannelModeAtom, type AgentChannelMode} from "./execution" +export {createNegotiatingFetch, type NegotiatingFetch} from "./execution" export {agentShouldResumeAfterApproval} from "./execution" export {canReleaseQueuedMessage, isHitlPending} from "./execution" diff --git a/web/packages/agenta-playground/tests/unit/agentNegotiation.test.ts b/web/packages/agenta-playground/tests/unit/agentNegotiation.test.ts new file mode 100644 index 0000000000..2fdbac4056 --- /dev/null +++ b/web/packages/agenta-playground/tests/unit/agentNegotiation.test.ts @@ -0,0 +1,97 @@ +/** + * Unit tests for `createNegotiatingFetch` — the agent lane's stream→batch→error negotiation. + * + * Guards the three outcomes JP's fixed-toggle cut couldn't express, each of which fails + * SILENTLY (a stream parser fed batch JSON, or a swallowed error) if untested: + * 1. stream honoured → parse as SSE (resolvedMode "stream") + * 2. 406 (can't stream)→ re-request as batch (resolvedMode "batch", Accept flips to json) + * 3. real error (5xx) → pass through untouched so useChat surfaces it inline + * Plus: an explicit batch request skips the stream attempt, and a server that answers a + * stream request with batch JSON is trusted by Content-Type. + */ +import {describe, expect, it, vi} from "vitest" + +import {createNegotiatingFetch} from "../../src/state/execution/agentNegotiation" + +const res = (body: string, init: {status?: number; contentType?: string}) => + new Response(body, { + status: init.status ?? 200, + headers: init.contentType ? {"content-type": init.contentType} : undefined, + }) + +const acceptOf = (init: RequestInit | undefined) => + new Headers(init?.headers as HeadersInit).get("accept") ?? "" + +describe("createNegotiatingFetch", () => { + it("honours a stream when the server answers text/event-stream", async () => { + const base = vi.fn(async () => res("data: {}\n\n", {contentType: "text/event-stream"})) + const {fetch, resolvedMode} = createNegotiatingFetch( + base as unknown as typeof globalThis.fetch, + ) + + const r = await fetch("/invoke", {headers: {Accept: "text/event-stream"}}) + + expect(r.status).toBe(200) + expect(resolvedMode()).toBe("stream") + expect(base).toHaveBeenCalledTimes(1) + }) + + it("falls back to a batch re-request when the server 406s a stream request", async () => { + const base = vi + .fn() + .mockResolvedValueOnce(res("not acceptable", {status: 406})) + .mockResolvedValueOnce(res('{"data":{}}', {contentType: "application/json"})) + const {fetch, resolvedMode} = createNegotiatingFetch( + base as unknown as typeof globalThis.fetch, + ) + + const r = await fetch("/invoke", {headers: {Accept: "text/event-stream"}}) + + expect(r.status).toBe(200) + expect(resolvedMode()).toBe("batch") + expect(base).toHaveBeenCalledTimes(2) + // the retry asked for batch JSON, dropping the stream Accept + expect(acceptOf(base.mock.calls[1][1])).toBe("application/json") + }) + + it("passes a real error (5xx) through untouched without a second request", async () => { + const base = vi.fn(async () => + res('{"status":{"code":500,"message":"boom"}}', { + status: 500, + contentType: "application/json", + }), + ) + const {fetch} = createNegotiatingFetch(base as unknown as typeof globalThis.fetch) + + const r = await fetch("/invoke", {headers: {Accept: "text/event-stream"}}) + + expect(r.status).toBe(500) + expect(await r.text()).toContain("boom") + expect(base).toHaveBeenCalledTimes(1) // no batch fallback on a non-406 error + }) + + it("skips the stream attempt when batch is requested explicitly", async () => { + const base = vi.fn(async () => res('{"data":{}}', {contentType: "application/json"})) + const {fetch, resolvedMode} = createNegotiatingFetch( + base as unknown as typeof globalThis.fetch, + ) + + await fetch("/invoke", {headers: {Accept: "application/json"}}) + + expect(resolvedMode()).toBe("batch") + expect(base).toHaveBeenCalledTimes(1) + expect(acceptOf(base.mock.calls[0][1])).toBe("application/json") + }) + + it("trusts the response Content-Type: a stream request answered with JSON resolves to batch", async () => { + const base = vi.fn(async () => res('{"data":{}}', {contentType: "application/json"})) + const {fetch, resolvedMode} = createNegotiatingFetch( + base as unknown as typeof globalThis.fetch, + ) + + await fetch("/invoke", {headers: {Accept: "text/event-stream"}}) + + expect(resolvedMode()).toBe("batch") + expect(base).toHaveBeenCalledTimes(1) + }) +})