From 9ce12367b1d6184462e669b1c2e8a2b6380607b2 Mon Sep 17 00:00:00 2001 From: Arda Erzin Date: Fri, 26 Jun 2026 13:48:05 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat(frontend):=20negotiate=20agent=20strea?= =?UTF-8?q?m=E2=86=92batch=20fallback=20on=20/invoke?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The agent playground sent a fixed Accept from the kebab toggle and trusted it, with no fallback if the backend could not stream. Add the negotiation the 'stream' default implies, aligned to the SDK /invoke contract: - createNegotiatingFetch (new, @agenta/playground): a fetch middleware that requests the SSE stream, falls back to a batch re-request on a 406 (handler can't stream), and passes any other non-OK response through so useChat surfaces the real error inline. - AgentChatTransport now owns its fetch via the negotiator and parses the body by the channel that actually resolved, not the requested one — fixing the latent case where a stream request that fell back to batch was fed to the SSE parser. - channelMode doc + barrel exports updated; the Stream/Batch toggle is now a preference (Stream = negotiate with fallback, Batch = force batch). - Adds 5 unit tests covering stream, 406 fallback, error pass-through, explicit batch, and Content-Type trust. --- .../assets/AgentChatTransport.ts | 34 ++++-- web/packages/agenta-playground/src/index.ts | 3 +- .../src/state/execution/agentNegotiation.ts | 109 ++++++++++++++++++ .../src/state/execution/channelMode.ts | 10 +- .../src/state/execution/index.ts | 2 + .../agenta-playground/src/state/index.ts | 1 + .../tests/unit/agentNegotiation.test.ts | 97 ++++++++++++++++ 7 files changed, 240 insertions(+), 16 deletions(-) create mode 100644 web/packages/agenta-playground/src/state/execution/agentNegotiation.ts create mode 100644 web/packages/agenta-playground/tests/unit/agentNegotiation.test.ts diff --git a/web/oss/src/components/AgentChatSlice/assets/AgentChatTransport.ts b/web/oss/src/components/AgentChatSlice/assets/AgentChatTransport.ts index 00740c8a7d..b8f84c6e44 100644 --- a/web/oss/src/components/AgentChatSlice/assets/AgentChatTransport.ts +++ b/web/oss/src/components/AgentChatSlice/assets/AgentChatTransport.ts @@ -1,19 +1,20 @@ -import {agentChannelModeAtom} from "@agenta/playground" +import {createNegotiatingFetch, type NegotiatingFetch} from "@agenta/playground" import {DefaultChatTransport, type UIMessage, type UIMessageChunk} from "ai" -import {getDefaultStore} from "jotai" /** * Agent chat transport. * - * `useChat` only renders a stream of `UIMessageChunk`s — it has no "batch" mode. So when the - * playground's channel mode is `batch`, the backend returns a single `WorkflowBatchResponse` - * (JSON, because `buildAgentRequest` sent `Accept: application/json`) and this transport replays + * `useChat` only renders a stream of `UIMessageChunk`s — it has no "batch" mode. So when the run + * resolves to a batch (the toggle forced it, or the backend fell back because the handler can't + * stream), the backend returns a single `WorkflowBatchResponse` (JSON) and this transport replays * it as a ONE-SHOT UIMessage stream — the same chunk sequence the SSE path emits — so the reply - * lands in a single frame. Stream mode delegates to the default SSE parser unchanged. + * lands in a single frame. A real stream delegates to the default SSE parser unchanged. * - * The mode is read from the shared `agentChannelModeAtom` (the same atom that set the request's - * Accept header), so the request channel and the response handling always agree. The main - * playground uses the default Jotai store, which is what `buildAgentRequest` reads too. + * Which channel resolved is decided by the `createNegotiatingFetch` middleware, NOT a fixed + * toggle: it requests the stream, falls back to a batch re-request on a 406 (handler can't + * stream), and passes any other error through so `useChat` surfaces it inline. The transport + * parses the body according to the channel that fetch actually resolved (`resolvedMode`), so the + * request and the response handling can never disagree. */ type AnyChunk = UIMessageChunk @@ -190,9 +191,20 @@ function batchJsonToUiMessageStream( } export class AgentChatTransport extends DefaultChatTransport { + private readonly negotiator: NegotiatingFetch + + constructor(options: ConstructorParameters>[0] = {}) { + // Own the transport's `fetch` so every request goes through stream→batch negotiation; + // any caller-supplied fetch becomes the negotiator's base (tests inject one here). + super({...options, fetch: undefined}) + this.negotiator = createNegotiatingFetch(options.fetch) + this.fetch = this.negotiator.fetch + } + protected processResponseStream(stream: ReadableStream): ReadableStream { - const mode = getDefaultStore().get(agentChannelModeAtom) - if (mode === "batch") return batchJsonToUiMessageStream(stream) + // Parse by the channel the request actually resolved to, not the requested one — a stream + // request can come back as a batch via the 406 fallback. + if (this.negotiator.resolvedMode() === "batch") return batchJsonToUiMessageStream(stream) return super.processResponseStream(stream) } } diff --git a/web/packages/agenta-playground/src/index.ts b/web/packages/agenta-playground/src/index.ts index fa514ca970..4fb179face 100644 --- a/web/packages/agenta-playground/src/index.ts +++ b/web/packages/agenta-playground/src/index.ts @@ -75,8 +75,9 @@ export { buildAgentRequest, buildAgentReferences, agentChannelModeAtom, + createNegotiatingFetch, } from "./state" -export type {AgentRequest, AgentChannelMode} from "./state" +export type {AgentRequest, AgentChannelMode, NegotiatingFetch} from "./state" // HITL resume predicate for `useChat`'s `sendAutomaticallyWhen` (approve AND deny resume). export {agentShouldResumeAfterApproval} from "./state" diff --git a/web/packages/agenta-playground/src/state/execution/agentNegotiation.ts b/web/packages/agenta-playground/src/state/execution/agentNegotiation.ts new file mode 100644 index 0000000000..c43f1c2edc --- /dev/null +++ b/web/packages/agenta-playground/src/state/execution/agentNegotiation.ts @@ -0,0 +1,109 @@ +import type {AgentChannelMode} from "./channelMode" + +/** + * Client-side transport negotiation for the agent `/invoke` lane. + * + * The backend negotiates the response channel off the request `Accept` header + * (sdk/agents/adapters/vercel routing): `text/event-stream` asks for the v6 SSE + * UI-message stream, `application/json` for a single `WorkflowBatchResponse`. A + * handler that cannot stream answers a stream request with **406 Not Acceptable** + * (not an SSE body); a handler that errors answers with JSON carrying the real + * status code. + * + * JP's first cut sent a fixed `Accept` from the kebab toggle and trusted it. This + * adds the real negotiation the toggle's `stream` default implies: + * + * 1. **stream** — request the SSE stream. + * 2. **fall back to batch** — if the server can't stream (406), re-issue the same + * request as `application/json` and replay the batch as a one-shot stream. + * 3. **error gracefully** — any other non-OK response (or a failed batch fallback) + * is returned untouched so the AI-SDK transport throws its body text, which + * `useChat`'s `onError` surfaces inline (`parseAgentRunError`). + * + * An explicit `batch` toggle skips the stream attempt entirely. + */ +export interface NegotiatingFetch { + /** A `fetch` middleware to hand the AI-SDK transport. */ + fetch: typeof globalThis.fetch + /** The channel the LAST request actually resolved to — drives how the transport + * parses the body (SSE stream vs. one-shot batch replay). */ + resolvedMode: () => AgentChannelMode +} + +type Headersish = HeadersInit | undefined + +/** Read a header value tolerant of the three `HeadersInit` shapes the caller may pass. */ +const headerValue = (headers: Headersish, name: string): string => { + const lower = name.toLowerCase() + if (!headers) return "" + if (headers instanceof Headers) return headers.get(name) ?? "" + if (Array.isArray(headers)) { + const hit = headers.find(([k]) => k.toLowerCase() === lower) + return hit?.[1] ?? "" + } + for (const [k, v] of Object.entries(headers)) { + if (k.toLowerCase() === lower) return String(v) + } + return "" +} + +/** Clone `headers` with `Accept` replaced — preserves every other header (auth, UA, format). */ +const withAccept = (headers: Headersish, accept: string): Record => { + const out: Record = {} + if (headers instanceof Headers) { + headers.forEach((v, k) => { + out[k] = v + }) + } else if (Array.isArray(headers)) { + for (const [k, v] of headers) out[k] = v + } else if (headers) { + for (const [k, v] of Object.entries(headers)) out[k] = String(v) + } + // Drop any existing Accept (case-insensitive) before setting the new one. + for (const k of Object.keys(out)) { + if (k.toLowerCase() === "accept") delete out[k] + } + out.Accept = accept + return out +} + +/** HTTP status the SDK route returns when a stream was asked of a handler that can only batch. */ +const NOT_ACCEPTABLE = 406 + +export function createNegotiatingFetch(baseFetch?: typeof globalThis.fetch): NegotiatingFetch { + const base = baseFetch ?? globalThis.fetch.bind(globalThis) + let mode: AgentChannelMode = "stream" + + const fetch: typeof globalThis.fetch = async (input, init) => { + const wantsStream = headerValue(init?.headers, "accept").includes("text/event-stream") + + // Explicit batch request (toggle = batch) — no stream attempt. + if (!wantsStream) { + mode = "batch" + return base(input, init) + } + + const res = await base(input, init) + + if (res.ok) { + // The server may honour the stream or, having no preference path, answer batch JSON. + // Trust the response Content-Type over our request intent. + const isStream = (res.headers.get("content-type") ?? "").includes("text/event-stream") + mode = isStream ? "stream" : "batch" + return res + } + + // Negotiation failed because the handler can't stream — fall back to batch. + if (res.status === NOT_ACCEPTABLE) { + mode = "batch" + return base(input, {...init, headers: withAccept(init?.headers, "application/json")}) + } + + // A real error (4xx/5xx with the run's JSON envelope). Hand it back untouched so the + // AI-SDK transport throws its body text and `useChat` renders it inline. + mode = "stream" + return res + } + + return {fetch, resolvedMode: () => mode} +} diff --git a/web/packages/agenta-playground/src/state/execution/channelMode.ts b/web/packages/agenta-playground/src/state/execution/channelMode.ts index aa389659c0..2ceab6aca3 100644 --- a/web/packages/agenta-playground/src/state/execution/channelMode.ts +++ b/web/packages/agenta-playground/src/state/execution/channelMode.ts @@ -4,11 +4,13 @@ export type AgentChannelMode = "stream" | "batch" /** * How the agent playground talks to the agent `/invoke` endpoint: - * - `stream` (default): the real-time SSE UIMessage stream `useChat` renders token-by-token. - * - `batch`: a single JSON response (`WorkflowBatchResponse`); the transport replays it as a - * one-shot UIMessage stream, so the reply lands in one frame instead of streaming. + * - `stream` (default): request the real-time SSE UIMessage stream `useChat` renders + * token-by-token. If the backend can't stream (the handler can only batch → 406), the + * transport's `createNegotiatingFetch` middleware transparently falls back to a batch. + * - `batch`: skip the stream attempt and request a single JSON `WorkflowBatchResponse` up + * front; the transport replays it as a one-shot UIMessage stream so it lands in one frame. * - * This is a transport/controller concern (which channel the playground speaks), NOT revision + * This is a transport/controller concern (which channel the playground PREFERS), NOT revision * config — it is never persisted on the agent revision. `buildAgentRequest` reads it to set the * `Accept` header; the playground kebab menu writes it. Stream is the default for agents. */ diff --git a/web/packages/agenta-playground/src/state/execution/index.ts b/web/packages/agenta-playground/src/state/execution/index.ts index 87036a71d7..0bb0d1ffe6 100644 --- a/web/packages/agenta-playground/src/state/execution/index.ts +++ b/web/packages/agenta-playground/src/state/execution/index.ts @@ -354,5 +354,7 @@ export { export {buildAgentRequest, buildAgentReferences, type AgentRequest} from "./agentRequest" // Stream vs batch response channel for the agent lane (read by buildAgentRequest's Accept header). export {agentChannelModeAtom, type AgentChannelMode} from "./channelMode" +// Transport negotiation: try stream, fall back to batch on 406, error gracefully otherwise. +export {createNegotiatingFetch, type NegotiatingFetch} from "./agentNegotiation" // Agent-lane HITL resume predicate (approve AND deny both resume the conversation). export {agentShouldResumeAfterApproval} from "./agentApprovalResume" diff --git a/web/packages/agenta-playground/src/state/index.ts b/web/packages/agenta-playground/src/state/index.ts index e0ce0f709f..732c5fa02b 100644 --- a/web/packages/agenta-playground/src/state/index.ts +++ b/web/packages/agenta-playground/src/state/index.ts @@ -177,6 +177,7 @@ export {appTypeAtom, isChatModeAtom, type AppType} from "./execution" export {isAgentModeAtomFamily} from "./execution" export {buildAgentRequest, buildAgentReferences, type AgentRequest} from "./execution" export {agentChannelModeAtom, type AgentChannelMode} from "./execution" +export {createNegotiatingFetch, type NegotiatingFetch} from "./execution" export {agentShouldResumeAfterApproval} from "./execution" export {filterUnreferencedColumnsForSource} from "./execution" diff --git a/web/packages/agenta-playground/tests/unit/agentNegotiation.test.ts b/web/packages/agenta-playground/tests/unit/agentNegotiation.test.ts new file mode 100644 index 0000000000..2fdbac4056 --- /dev/null +++ b/web/packages/agenta-playground/tests/unit/agentNegotiation.test.ts @@ -0,0 +1,97 @@ +/** + * Unit tests for `createNegotiatingFetch` — the agent lane's stream→batch→error negotiation. + * + * Guards the three outcomes JP's fixed-toggle cut couldn't express, each of which fails + * SILENTLY (a stream parser fed batch JSON, or a swallowed error) if untested: + * 1. stream honoured → parse as SSE (resolvedMode "stream") + * 2. 406 (can't stream)→ re-request as batch (resolvedMode "batch", Accept flips to json) + * 3. real error (5xx) → pass through untouched so useChat surfaces it inline + * Plus: an explicit batch request skips the stream attempt, and a server that answers a + * stream request with batch JSON is trusted by Content-Type. + */ +import {describe, expect, it, vi} from "vitest" + +import {createNegotiatingFetch} from "../../src/state/execution/agentNegotiation" + +const res = (body: string, init: {status?: number; contentType?: string}) => + new Response(body, { + status: init.status ?? 200, + headers: init.contentType ? {"content-type": init.contentType} : undefined, + }) + +const acceptOf = (init: RequestInit | undefined) => + new Headers(init?.headers as HeadersInit).get("accept") ?? "" + +describe("createNegotiatingFetch", () => { + it("honours a stream when the server answers text/event-stream", async () => { + const base = vi.fn(async () => res("data: {}\n\n", {contentType: "text/event-stream"})) + const {fetch, resolvedMode} = createNegotiatingFetch( + base as unknown as typeof globalThis.fetch, + ) + + const r = await fetch("/invoke", {headers: {Accept: "text/event-stream"}}) + + expect(r.status).toBe(200) + expect(resolvedMode()).toBe("stream") + expect(base).toHaveBeenCalledTimes(1) + }) + + it("falls back to a batch re-request when the server 406s a stream request", async () => { + const base = vi + .fn() + .mockResolvedValueOnce(res("not acceptable", {status: 406})) + .mockResolvedValueOnce(res('{"data":{}}', {contentType: "application/json"})) + const {fetch, resolvedMode} = createNegotiatingFetch( + base as unknown as typeof globalThis.fetch, + ) + + const r = await fetch("/invoke", {headers: {Accept: "text/event-stream"}}) + + expect(r.status).toBe(200) + expect(resolvedMode()).toBe("batch") + expect(base).toHaveBeenCalledTimes(2) + // the retry asked for batch JSON, dropping the stream Accept + expect(acceptOf(base.mock.calls[1][1])).toBe("application/json") + }) + + it("passes a real error (5xx) through untouched without a second request", async () => { + const base = vi.fn(async () => + res('{"status":{"code":500,"message":"boom"}}', { + status: 500, + contentType: "application/json", + }), + ) + const {fetch} = createNegotiatingFetch(base as unknown as typeof globalThis.fetch) + + const r = await fetch("/invoke", {headers: {Accept: "text/event-stream"}}) + + expect(r.status).toBe(500) + expect(await r.text()).toContain("boom") + expect(base).toHaveBeenCalledTimes(1) // no batch fallback on a non-406 error + }) + + it("skips the stream attempt when batch is requested explicitly", async () => { + const base = vi.fn(async () => res('{"data":{}}', {contentType: "application/json"})) + const {fetch, resolvedMode} = createNegotiatingFetch( + base as unknown as typeof globalThis.fetch, + ) + + await fetch("/invoke", {headers: {Accept: "application/json"}}) + + expect(resolvedMode()).toBe("batch") + expect(base).toHaveBeenCalledTimes(1) + expect(acceptOf(base.mock.calls[0][1])).toBe("application/json") + }) + + it("trusts the response Content-Type: a stream request answered with JSON resolves to batch", async () => { + const base = vi.fn(async () => res('{"data":{}}', {contentType: "application/json"})) + const {fetch, resolvedMode} = createNegotiatingFetch( + base as unknown as typeof globalThis.fetch, + ) + + await fetch("/invoke", {headers: {Accept: "text/event-stream"}}) + + expect(resolvedMode()).toBe("batch") + expect(base).toHaveBeenCalledTimes(1) + }) +}) From 6883e80a0bc6ff097182d05944c034b0ba2cb696 Mon Sep 17 00:00:00 2001 From: Arda Erzin Date: Sun, 28 Jun 2026 17:47:07 +0200 Subject: [PATCH 2/2] fix(frontend): bind agent stream/batch mode to the response body stream The negotiating fetch tracked the resolved channel (stream vs. batch) in a shared mutable variable that AgentChatTransport read back later in processResponseStream. That read-back across the fetch->parse boundary is only safe while requests on a transport never overlap; it is a latent race if they ever do (the parse could pick up a sibling request's mode). Key the mode off the response body ReadableStream via a WeakMap instead, so the parse decision is bound to the exact body being parsed. resolvedMode now takes the stream (with a no-arg fallback to the last negotiation for callers without it). Behaviour is unchanged for the existing single-request paths; the 5 negotiation unit tests stay green. --- .../assets/AgentChatTransport.ts | 6 ++- .../src/state/execution/agentNegotiation.ts | 51 ++++++++++++++----- 2 files changed, 42 insertions(+), 15 deletions(-) diff --git a/web/oss/src/components/AgentChatSlice/assets/AgentChatTransport.ts b/web/oss/src/components/AgentChatSlice/assets/AgentChatTransport.ts index 8edf907639..102924ff59 100644 --- a/web/oss/src/components/AgentChatSlice/assets/AgentChatTransport.ts +++ b/web/oss/src/components/AgentChatSlice/assets/AgentChatTransport.ts @@ -203,8 +203,10 @@ export class AgentChatTransport extends DefaultChatTransport { protected processResponseStream(stream: ReadableStream): ReadableStream { // Parse by the channel the request actually resolved to, not the requested one — a stream - // request can come back as a batch via the 406 fallback. - if (this.negotiator.resolvedMode() === "batch") return batchJsonToUiMessageStream(stream) + // request can come back as a batch via the 406 fallback. The mode is keyed off this exact + // body stream (`resolvedMode(stream)`), so request and parse stay in lockstep. + if (this.negotiator.resolvedMode(stream) === "batch") + return batchJsonToUiMessageStream(stream) return super.processResponseStream(stream) } } diff --git a/web/packages/agenta-playground/src/state/execution/agentNegotiation.ts b/web/packages/agenta-playground/src/state/execution/agentNegotiation.ts index c43f1c2edc..51496a2966 100644 --- a/web/packages/agenta-playground/src/state/execution/agentNegotiation.ts +++ b/web/packages/agenta-playground/src/state/execution/agentNegotiation.ts @@ -25,9 +25,15 @@ import type {AgentChannelMode} from "./channelMode" export interface NegotiatingFetch { /** A `fetch` middleware to hand the AI-SDK transport. */ fetch: typeof globalThis.fetch - /** The channel the LAST request actually resolved to — drives how the transport - * parses the body (SSE stream vs. one-shot batch replay). */ - resolvedMode: () => AgentChannelMode + /** + * The channel a response actually resolved to — drives how the transport parses the body + * (SSE stream vs. one-shot batch replay). + * + * Pass the response body the transport is about to parse (`response.body`); the mode is bound + * to that exact stream, so request and parse can never disagree even if requests overlap. With + * no argument it returns the LAST negotiated mode (kept for callers that don't have the stream). + */ + resolvedMode: (stream?: ReadableStream | null) => AgentChannelMode } type Headersish = HeadersInit | undefined @@ -72,15 +78,25 @@ const NOT_ACCEPTABLE = 406 export function createNegotiatingFetch(baseFetch?: typeof globalThis.fetch): NegotiatingFetch { const base = baseFetch ?? globalThis.fetch.bind(globalThis) - let mode: AgentChannelMode = "stream" + + // Bind the resolved mode to the response body STREAM, not a shared variable: the transport + // reads it back when it parses that body, so keying on the stream keeps request and parse in + // lockstep even if two requests on this transport ever overlap. `last` is the no-arg fallback. + const modes = new WeakMap, AgentChannelMode>() + let last: AgentChannelMode = "stream" + + const remember = (res: Response, mode: AgentChannelMode): Response => { + last = mode + if (res.body) modes.set(res.body, mode) + return res + } const fetch: typeof globalThis.fetch = async (input, init) => { const wantsStream = headerValue(init?.headers, "accept").includes("text/event-stream") // Explicit batch request (toggle = batch) — no stream attempt. if (!wantsStream) { - mode = "batch" - return base(input, init) + return remember(await base(input, init), "batch") } const res = await base(input, init) @@ -89,21 +105,30 @@ export function createNegotiatingFetch(baseFetch?: typeof globalThis.fetch): Neg // The server may honour the stream or, having no preference path, answer batch JSON. // Trust the response Content-Type over our request intent. const isStream = (res.headers.get("content-type") ?? "").includes("text/event-stream") - mode = isStream ? "stream" : "batch" - return res + return remember(res, isStream ? "stream" : "batch") } // Negotiation failed because the handler can't stream — fall back to batch. if (res.status === NOT_ACCEPTABLE) { - mode = "batch" - return base(input, {...init, headers: withAccept(init?.headers, "application/json")}) + const batch = await base(input, { + ...init, + headers: withAccept(init?.headers, "application/json"), + }) + return remember(batch, "batch") } // A real error (4xx/5xx with the run's JSON envelope). Hand it back untouched so the // AI-SDK transport throws its body text and `useChat` renders it inline. - mode = "stream" - return res + return remember(res, "stream") + } + + const resolvedMode = (stream?: ReadableStream | null): AgentChannelMode => { + if (stream) { + const mode = modes.get(stream) + if (mode) return mode + } + return last } - return {fetch, resolvedMode: () => mode} + return {fetch, resolvedMode} }