-
Notifications
You must be signed in to change notification settings - Fork 555
feat(frontend): negotiate agent stream→batch fallback on /invoke #4875
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
9ce1236
feat(frontend): negotiate agent stream→batch fallback on /invoke
ardaerzin b551ef4
Merge branch 'big-agents' into fe-feat/agent-invoke-endpoint
ardaerzin 6883e80
fix(frontend): bind agent stream/batch mode to the response body stream
ardaerzin 16b9767
Merge branch 'big-agents' into fe-feat/agent-invoke-endpoint
ardaerzin 8231240
Merge branch 'big-agents' into fe-feat/agent-invoke-endpoint
ardaerzin File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
134 changes: 134 additions & 0 deletions
134
web/packages/agenta-playground/src/state/execution/agentNegotiation.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,134 @@ | ||
| import type {AgentChannelMode} from "./channelMode" | ||
|
|
||
| /** | ||
| * Client-side transport negotiation for the agent `/invoke` lane. | ||
| * | ||
| * The backend negotiates the response channel off the request `Accept` header | ||
| * (sdk/agents/adapters/vercel routing): `text/event-stream` asks for the v6 SSE | ||
| * UI-message stream, `application/json` for a single `WorkflowBatchResponse`. A | ||
| * handler that cannot stream answers a stream request with **406 Not Acceptable** | ||
| * (not an SSE body); a handler that errors answers with JSON carrying the real | ||
| * status code. | ||
| * | ||
| * JP's first cut sent a fixed `Accept` from the kebab toggle and trusted it. This | ||
| * adds the real negotiation the toggle's `stream` default implies: | ||
| * | ||
| * 1. **stream** — request the SSE stream. | ||
| * 2. **fall back to batch** — if the server can't stream (406), re-issue the same | ||
| * request as `application/json` and replay the batch as a one-shot stream. | ||
| * 3. **error gracefully** — any other non-OK response (or a failed batch fallback) | ||
| * is returned untouched so the AI-SDK transport throws its body text, which | ||
| * `useChat`'s `onError` surfaces inline (`parseAgentRunError`). | ||
| * | ||
| * An explicit `batch` toggle skips the stream attempt entirely. | ||
| */ | ||
| export interface NegotiatingFetch { | ||
| /** A `fetch` middleware to hand the AI-SDK transport. */ | ||
| fetch: typeof globalThis.fetch | ||
| /** | ||
| * The channel a response actually resolved to — drives how the transport parses the body | ||
| * (SSE stream vs. one-shot batch replay). | ||
| * | ||
| * Pass the response body the transport is about to parse (`response.body`); the mode is bound | ||
| * to that exact stream, so request and parse can never disagree even if requests overlap. With | ||
| * no argument it returns the LAST negotiated mode (kept for callers that don't have the stream). | ||
| * | ||
| * A stream that was never recorded (or `null`) also yields the last negotiated mode, which | ||
| * starts out as `"stream"` until a response has been recorded. | ||
| */ | ||
| resolvedMode: (stream?: ReadableStream<Uint8Array> | null) => AgentChannelMode | ||
| } | ||
|
|
||
type Headersish = HeadersInit | undefined

/**
 * Read a request header value, tolerant of the three `HeadersInit` shapes the caller may pass
 * (a `Headers` instance, an array of `[name, value]` pairs, or a plain record).
 *
 * The input is normalised through the platform `Headers` class so the lookup agrees with what
 * `fetch` will actually send: names match case-insensitively and repeated names are combined
 * into one comma-separated value. A first-match scan of the pair array would miss a later
 * `Accept` entry and misclassify the request.
 *
 * @param headers - Request headers in any `HeadersInit` shape, or `undefined`.
 * @param name - Header name; matched case-insensitively.
 * @returns The header value, or `""` when the header (or `headers` itself) is absent.
 */
const headerValue = (headers: Headersish, name: string): string => {
    if (!headers) return ""
    // Reuse an existing `Headers` instance; otherwise let the platform parse the other shapes.
    const normalised = headers instanceof Headers ? headers : new Headers(headers)
    return normalised.get(name) ?? ""
}
|
|
||
/**
 * Clone `headers` with `Accept` replaced — preserves every other header (auth, UA, format).
 *
 * The input is copied through the platform `Headers` class, so it is never mutated and all
 * three `HeadersInit` shapes are handled uniformly. Header names in the result are lower-cased
 * (HTTP header names are case-insensitive) and repeated names are combined into one
 * comma-separated value instead of the last pair silently winning.
 *
 * @param headers - Request headers in any `HeadersInit` shape, or `undefined`.
 * @param accept - The media type to send as the sole `Accept` header.
 * @returns A fresh plain record carrying every original header plus the new `Accept`.
 */
const withAccept = (headers: Headersish, accept: string): Record<string, string> => {
    const out: Record<string, string> = {}
    // `Headers` iteration yields lower-cased names, so one comparison drops any prior Accept.
    new Headers(headers).forEach((value, key) => {
        if (key !== "accept") out[key] = value
    })
    out.Accept = accept
    return out
}
|
|
||
/** HTTP status the SDK route returns when a stream was asked of a handler that can only batch. */
const NOT_ACCEPTABLE = 406

/**
 * Build a `fetch` middleware that negotiates the agent response channel.
 *
 * Behaviour of the returned `fetch`:
 * - Request `Accept` (read from `init.headers` only) does not mention `text/event-stream`:
 *   the request is forwarded as-is and recorded as `"batch"`.
 * - Stream requested and the response is OK: the mode follows the response `Content-Type`
 *   (`text/event-stream` → `"stream"`, anything else → `"batch"`).
 * - Stream requested and the response is 406: the 406 body is discarded and the same request
 *   is re-issued once with `Accept: application/json`; whatever comes back is recorded as
 *   `"batch"` and returned.
 * - Any other non-OK response is returned untouched and recorded as `"stream"`.
 *
 * @param baseFetch - The underlying `fetch` to delegate to; defaults to the global `fetch`.
 * @returns The middleware plus `resolvedMode`, which reports the mode bound to a given
 *   response body stream (or the last negotiated mode when the stream is unknown or omitted).
 */
export function createNegotiatingFetch(baseFetch?: typeof globalThis.fetch): NegotiatingFetch {
    const base = baseFetch ?? globalThis.fetch.bind(globalThis)

    // Bind the resolved mode to the response body STREAM, not a shared variable: the transport
    // reads it back when it parses that body, so keying on the stream keeps request and parse in
    // lockstep even if two requests on this transport ever overlap. `last` is the no-arg fallback.
    const modes = new WeakMap<ReadableStream<Uint8Array>, AgentChannelMode>()
    let last: AgentChannelMode = "stream"

    const remember = (res: Response, mode: AgentChannelMode): Response => {
        last = mode
        if (res.body) modes.set(res.body, mode)
        return res
    }

    // Media types are case-insensitive, so compare on a lower-cased copy.
    const isEventStream = (mediaType: string | null | undefined): boolean =>
        (mediaType ?? "").toLowerCase().includes("text/event-stream")

    const fetch: typeof globalThis.fetch = async (input, init) => {
        // Explicit batch request (toggle = batch) — no stream attempt.
        if (!isEventStream(headerValue(init?.headers, "accept"))) {
            return remember(await base(input, init), "batch")
        }

        const res = await base(input, init)

        if (res.ok) {
            // The server may honour the stream or answer with batch JSON regardless of the
            // request; trust the response Content-Type over our request intent.
            return remember(res, isEventStream(res.headers.get("content-type")) ? "stream" : "batch")
        }

        // Negotiation failed because the handler can't stream — fall back to batch.
        if (res.status === NOT_ACCEPTABLE) {
            // Nobody will read the 406 body; cancel it so the underlying stream/connection is
            // released now instead of lingering until garbage collection. Best-effort only.
            void res.body?.cancel().catch(() => undefined)

            const batch = await base(input, {
                ...init,
                headers: withAccept(init?.headers, "application/json"),
            })
            return remember(batch, "batch")
        }

        // A real error (4xx/5xx with the run's JSON envelope). Hand it back untouched so the
        // AI-SDK transport throws its body text and `useChat` renders it inline.
        return remember(res, "stream")
    }

    const resolvedMode = (stream?: ReadableStream<Uint8Array> | null): AgentChannelMode => {
        const bound = stream ? modes.get(stream) : undefined
        return bound ?? last
    }

    return {fetch, resolvedMode}
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
97 changes: 97 additions & 0 deletions
97
web/packages/agenta-playground/tests/unit/agentNegotiation.test.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,97 @@ | ||
| /** | ||
| * Unit tests for `createNegotiatingFetch` — the agent lane's stream→batch→error negotiation. | ||
| * | ||
| * Guards the three outcomes JP's fixed-toggle cut couldn't express, each of which fails | ||
| * SILENTLY (a stream parser fed batch JSON, or a swallowed error) if untested: | ||
| * 1. stream honoured → parse as SSE (resolvedMode "stream") | ||
| * 2. 406 (can't stream)→ re-request as batch (resolvedMode "batch", Accept flips to json) | ||
| * 3. real error (5xx) → pass through untouched so useChat surfaces it inline | ||
| * Plus: an explicit batch request skips the stream attempt, and a server that answers a | ||
| * stream request with batch JSON is trusted by Content-Type. | ||
| */ | ||
| import {describe, expect, it, vi} from "vitest" | ||
|
|
||
| import {createNegotiatingFetch} from "../../src/state/execution/agentNegotiation" | ||
|
|
||
/**
 * Build a canned `Response` for the stubbed base fetch.
 *
 * @param body - Response body text.
 * @param init - Optional overrides: `status` (defaults to 200) and `contentType` (when omitted
 *   no `content-type` header is passed to the `Response` constructor).
 */
const res = (body: string, init: {status?: number; contentType?: string} = {}): Response =>
    new Response(body, {
        status: init.status ?? 200,
        headers: init.contentType ? {"content-type": init.contentType} : undefined,
    })
|
|
||
/** Read the `Accept` header a stubbed call was made with, or `""` when none was sent. */
const acceptOf = (init: RequestInit | undefined): string =>
    // `new Headers(undefined)` is valid, so no type assertion is needed here.
    new Headers(init?.headers).get("accept") ?? ""
|
|
||
describe("createNegotiatingFetch", () => {
    it("honours a stream when the server answers text/event-stream", async () => {
        const base = vi.fn(async () => res("data: {}\n\n", {contentType: "text/event-stream"}))
        const {fetch, resolvedMode} = createNegotiatingFetch(
            base as unknown as typeof globalThis.fetch,
        )

        const r = await fetch("/invoke", {headers: {Accept: "text/event-stream"}})

        expect(r.status).toBe(200)
        expect(resolvedMode()).toBe("stream")
        // the mode is bound to the body the transport is about to parse
        expect(resolvedMode(r.body)).toBe("stream")
        expect(base).toHaveBeenCalledTimes(1)
    })

    it("falls back to a batch re-request when the server 406s a stream request", async () => {
        const base = vi
            .fn()
            .mockResolvedValueOnce(res("not acceptable", {status: 406}))
            .mockResolvedValueOnce(res('{"data":{}}', {contentType: "application/json"}))
        const {fetch, resolvedMode} = createNegotiatingFetch(
            base as unknown as typeof globalThis.fetch,
        )

        const r = await fetch("/invoke", {headers: {Accept: "text/event-stream"}})

        expect(r.status).toBe(200)
        expect(resolvedMode()).toBe("batch")
        expect(resolvedMode(r.body)).toBe("batch")
        expect(base).toHaveBeenCalledTimes(2)
        // the retry asked for batch JSON, dropping the stream Accept
        expect(acceptOf(base.mock.calls[1][1])).toBe("application/json")
    })

    it("passes a real error (5xx) through untouched without a second request", async () => {
        const base = vi.fn(async () =>
            res('{"status":{"code":500,"message":"boom"}}', {
                status: 500,
                contentType: "application/json",
            }),
        )
        const {fetch} = createNegotiatingFetch(base as unknown as typeof globalThis.fetch)

        const r = await fetch("/invoke", {headers: {Accept: "text/event-stream"}})

        expect(r.status).toBe(500)
        expect(await r.text()).toContain("boom")
        expect(base).toHaveBeenCalledTimes(1) // no batch fallback on a non-406 error
    })

    it("skips the stream attempt when batch is requested explicitly", async () => {
        const base = vi.fn(async () => res('{"data":{}}', {contentType: "application/json"}))
        const {fetch, resolvedMode} = createNegotiatingFetch(
            base as unknown as typeof globalThis.fetch,
        )

        const r = await fetch("/invoke", {headers: {Accept: "application/json"}})

        expect(resolvedMode()).toBe("batch")
        expect(resolvedMode(r.body)).toBe("batch")
        expect(base).toHaveBeenCalledTimes(1)
        expect(acceptOf(base.mock.calls[0][1])).toBe("application/json")
    })

    it("trusts the response Content-Type: a stream request answered with JSON resolves to batch", async () => {
        const base = vi.fn(async () => res('{"data":{}}', {contentType: "application/json"}))
        const {fetch, resolvedMode} = createNegotiatingFetch(
            base as unknown as typeof globalThis.fetch,
        )

        const r = await fetch("/invoke", {headers: {Accept: "text/event-stream"}})

        expect(resolvedMode()).toBe("batch")
        expect(resolvedMode(r.body)).toBe("batch")
        expect(base).toHaveBeenCalledTimes(1)
    })

    it("binds the mode to each response body so overlapping requests cannot cross", async () => {
        const base = vi
            .fn()
            .mockResolvedValueOnce(res("data: {}\n\n", {contentType: "text/event-stream"}))
            .mockResolvedValueOnce(res('{"data":{}}', {contentType: "application/json"}))
        const {fetch, resolvedMode} = createNegotiatingFetch(
            base as unknown as typeof globalThis.fetch,
        )

        // Both requests are in flight at once on the same negotiating fetch.
        const [streamed, batched] = await Promise.all([
            fetch("/invoke", {headers: {Accept: "text/event-stream"}}),
            fetch("/invoke", {headers: {Accept: "application/json"}}),
        ])

        expect(resolvedMode(streamed.body)).toBe("stream")
        expect(resolvedMode(batched.body)).toBe("batch")
    })

    it("falls back to the last negotiated mode when no known stream is supplied", async () => {
        const base = vi.fn(async () => res('{"data":{}}', {contentType: "application/json"}))
        const {fetch, resolvedMode} = createNegotiatingFetch(
            base as unknown as typeof globalThis.fetch,
        )

        // before any request the fallback is the default channel
        expect(resolvedMode()).toBe("stream")

        await fetch("/invoke", {headers: {Accept: "application/json"}})

        expect(resolvedMode(null)).toBe("batch")
        // a stream this negotiator never produced carries no binding of its own
        expect(resolvedMode(new ReadableStream<Uint8Array>())).toBe("batch")
    })
})
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.