Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions web/oss/src/components/AgentChatSlice/assets/AgentChatTransport.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import {agentChannelModeAtom} from "@agenta/playground"
import {createNegotiatingFetch, type NegotiatingFetch} from "@agenta/playground"
import {DefaultChatTransport, type UIMessage, type UIMessageChunk} from "ai"
import {getDefaultStore} from "jotai"

/**
* Agent chat transport.
*
* `useChat` only renders a stream of `UIMessageChunk`s — it has no "batch" mode. So when the
* playground's channel mode is `batch`, the backend returns a single `WorkflowBatchResponse`
* (JSON, because `buildAgentRequest` sent `Accept: application/json`) and this transport replays
* `useChat` only renders a stream of `UIMessageChunk`s — it has no "batch" mode. So when the run
* resolves to a batch (the toggle forced it, or the backend fell back because the handler can't
* stream), the backend returns a single `WorkflowBatchResponse` (JSON) and this transport replays
* it as a ONE-SHOT UIMessage stream — the same chunk sequence the SSE path emits — so the reply
* lands in a single frame. Stream mode delegates to the default SSE parser unchanged.
* lands in a single frame. A real stream delegates to the default SSE parser unchanged.
*
* The mode is read from the shared `agentChannelModeAtom` (the same atom that set the request's
* Accept header), so the request channel and the response handling always agree. The main
* playground uses the default Jotai store, which is what `buildAgentRequest` reads too.
* Which channel resolved is decided by the `createNegotiatingFetch` middleware, NOT a fixed
* toggle: it requests the stream, falls back to a batch re-request on a 406 (handler can't
* stream), and passes any other error through so `useChat` surfaces it inline. The transport
* parses the body according to the channel that fetch actually resolved (`resolvedMode`), so the
* request and the response handling can never disagree.
*/
type AnyChunk = UIMessageChunk

Expand Down Expand Up @@ -190,9 +191,22 @@ function batchJsonToUiMessageStream(
}

export class AgentChatTransport extends DefaultChatTransport<UIMessage> {
private readonly negotiator: NegotiatingFetch

constructor(options: ConstructorParameters<typeof DefaultChatTransport<UIMessage>>[0] = {}) {
// Own the transport's `fetch` so every request goes through stream→batch negotiation;
// any caller-supplied fetch becomes the negotiator's base (tests inject one here).
super({...options, fetch: undefined})
this.negotiator = createNegotiatingFetch(options.fetch)
this.fetch = this.negotiator.fetch
}

protected processResponseStream(stream: ReadableStream<Uint8Array>): ReadableStream<AnyChunk> {
const mode = getDefaultStore().get(agentChannelModeAtom)
if (mode === "batch") return batchJsonToUiMessageStream(stream)
// Parse by the channel the request actually resolved to, not the requested one — a stream
// request can come back as a batch via the 406 fallback. The mode is keyed off this exact
// body stream (`resolvedMode(stream)`), so request and parse stay in lockstep.
if (this.negotiator.resolvedMode(stream) === "batch")
return batchJsonToUiMessageStream(stream)
return super.processResponseStream(stream)
}
}
3 changes: 2 additions & 1 deletion web/packages/agenta-playground/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ export {
buildAgentRequest,
buildAgentReferences,
agentChannelModeAtom,
createNegotiatingFetch,
} from "./state"
export type {AgentRequest, AgentChannelMode} from "./state"
export type {AgentRequest, AgentChannelMode, NegotiatingFetch} from "./state"
// HITL resume predicate for `useChat`'s `sendAutomaticallyWhen` (approve AND deny resume).
export {agentShouldResumeAfterApproval} from "./state"
// Queued-message release gate for the agent chat composer (HITL-safe, one-by-one).
Expand Down
134 changes: 134 additions & 0 deletions web/packages/agenta-playground/src/state/execution/agentNegotiation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import type {AgentChannelMode} from "./channelMode"

/**
* Client-side transport negotiation for the agent `/invoke` lane.
*
* The backend negotiates the response channel off the request `Accept` header
* (sdk/agents/adapters/vercel routing): `text/event-stream` asks for the v6 SSE
* UI-message stream, `application/json` for a single `WorkflowBatchResponse`. A
* handler that cannot stream answers a stream request with **406 Not Acceptable**
* (not an SSE body); a handler that errors answers with JSON carrying the real
* status code.
*
* JP's first cut sent a fixed `Accept` from the kebab toggle and trusted it. This
* adds the real negotiation the toggle's `stream` default implies:
*
* 1. **stream** — request the SSE stream.
* 2. **fall back to batch** — if the server can't stream (406), re-issue the same
* request as `application/json` and replay the batch as a one-shot stream.
* 3. **error gracefully** — any other non-OK response (or a failed batch fallback)
* is returned untouched so the AI-SDK transport throws its body text, which
* `useChat`'s `onError` surfaces inline (`parseAgentRunError`).
*
* An explicit `batch` toggle skips the stream attempt entirely.
*/
export interface NegotiatingFetch {
/** A `fetch` middleware to hand the AI-SDK transport. */
fetch: typeof globalThis.fetch
/**
* The channel a response actually resolved to — drives how the transport parses the body
* (SSE stream vs. one-shot batch replay).
*
* Pass the response body the transport is about to parse (`response.body`); the mode is bound
* to that exact stream, so request and parse can never disagree even if requests overlap. With
* no argument it returns the LAST negotiated mode (kept for callers that don't have the stream).
*/
resolvedMode: (stream?: ReadableStream<Uint8Array> | null) => AgentChannelMode
}

type Headersish = HeadersInit | undefined

/** Read a header value tolerant of the three `HeadersInit` shapes the caller may pass. */
const headerValue = (headers: Headersish, name: string): string => {
const lower = name.toLowerCase()
if (!headers) return ""
if (headers instanceof Headers) return headers.get(name) ?? ""
if (Array.isArray(headers)) {
const hit = headers.find(([k]) => k.toLowerCase() === lower)
return hit?.[1] ?? ""
}
for (const [k, v] of Object.entries(headers)) {
if (k.toLowerCase() === lower) return String(v)
}
return ""
}

/** Clone `headers` with `Accept` replaced — preserves every other header (auth, UA, format). */
const withAccept = (headers: Headersish, accept: string): Record<string, string> => {
const out: Record<string, string> = {}
if (headers instanceof Headers) {
headers.forEach((v, k) => {
out[k] = v
})
} else if (Array.isArray(headers)) {
for (const [k, v] of headers) out[k] = v
} else if (headers) {
for (const [k, v] of Object.entries(headers)) out[k] = String(v)
}
// Drop any existing Accept (case-insensitive) before setting the new one.
for (const k of Object.keys(out)) {
if (k.toLowerCase() === "accept") delete out[k]
}
out.Accept = accept
return out
}

/** HTTP status the SDK route returns when a stream was asked of a handler that can only batch. */
const NOT_ACCEPTABLE = 406

export function createNegotiatingFetch(baseFetch?: typeof globalThis.fetch): NegotiatingFetch {
const base = baseFetch ?? globalThis.fetch.bind(globalThis)

// Bind the resolved mode to the response body STREAM, not a shared variable: the transport
// reads it back when it parses that body, so keying on the stream keeps request and parse in
// lockstep even if two requests on this transport ever overlap. `last` is the no-arg fallback.
const modes = new WeakMap<ReadableStream<Uint8Array>, AgentChannelMode>()
let last: AgentChannelMode = "stream"

const remember = (res: Response, mode: AgentChannelMode): Response => {
last = mode
if (res.body) modes.set(res.body, mode)
return res
}

const fetch: typeof globalThis.fetch = async (input, init) => {
const wantsStream = headerValue(init?.headers, "accept").includes("text/event-stream")

// Explicit batch request (toggle = batch) — no stream attempt.
if (!wantsStream) {
return remember(await base(input, init), "batch")
}

const res = await base(input, init)

if (res.ok) {
// The server may honour the stream or, having no preference path, answer batch JSON.
// Trust the response Content-Type over our request intent.
const isStream = (res.headers.get("content-type") ?? "").includes("text/event-stream")
return remember(res, isStream ? "stream" : "batch")
}

// Negotiation failed because the handler can't stream — fall back to batch.
if (res.status === NOT_ACCEPTABLE) {
const batch = await base(input, {
...init,
headers: withAccept(init?.headers, "application/json"),
})
return remember(batch, "batch")
}

// A real error (4xx/5xx with the run's JSON envelope). Hand it back untouched so the
// AI-SDK transport throws its body text and `useChat` renders it inline.
return remember(res, "stream")
}

const resolvedMode = (stream?: ReadableStream<Uint8Array> | null): AgentChannelMode => {
if (stream) {
const mode = modes.get(stream)
if (mode) return mode
}
return last
}

return {fetch, resolvedMode}
}
Comment thread
ardaerzin marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ export type AgentChannelMode = "stream" | "batch"

/**
* How the agent playground talks to the agent `/invoke` endpoint:
* - `stream` (default): the real-time SSE UIMessage stream `useChat` renders token-by-token.
* - `batch`: a single JSON response (`WorkflowBatchResponse`); the transport replays it as a
* one-shot UIMessage stream, so the reply lands in one frame instead of streaming.
* - `stream` (default): request the real-time SSE UIMessage stream `useChat` renders
* token-by-token. If the backend can't stream (the handler can only batch → 406), the
* transport's `createNegotiatingFetch` middleware transparently falls back to a batch.
* - `batch`: skip the stream attempt and request a single JSON `WorkflowBatchResponse` up
* front; the transport replays it as a one-shot UIMessage stream so it lands in one frame.
*
* This is a transport/controller concern (which channel the playground speaks), NOT revision
* This is a transport/controller concern (which channel the playground PREFERS), NOT revision
* config — it is never persisted on the agent revision. `buildAgentRequest` reads it to set the
* `Accept` header; the playground kebab menu writes it. Stream is the default for agents.
*/
Expand Down
2 changes: 2 additions & 0 deletions web/packages/agenta-playground/src/state/execution/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ export {
export {buildAgentRequest, buildAgentReferences, type AgentRequest} from "./agentRequest"
// Stream vs batch response channel for the agent lane (read by buildAgentRequest's Accept header).
export {agentChannelModeAtom, type AgentChannelMode} from "./channelMode"
// Transport negotiation: try stream, fall back to batch on 406, error gracefully otherwise.
export {createNegotiatingFetch, type NegotiatingFetch} from "./agentNegotiation"
// Agent-lane HITL resume predicate (approve AND deny both resume the conversation).
export {agentShouldResumeAfterApproval} from "./agentApprovalResume"
// Agent-lane queued-message release gate (never releases mid-HITL or pre-resume).
Expand Down
1 change: 1 addition & 0 deletions web/packages/agenta-playground/src/state/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ export {appTypeAtom, isChatModeAtom, type AppType} from "./execution"
export {isAgentModeAtomFamily} from "./execution"
export {buildAgentRequest, buildAgentReferences, type AgentRequest} from "./execution"
export {agentChannelModeAtom, type AgentChannelMode} from "./execution"
export {createNegotiatingFetch, type NegotiatingFetch} from "./execution"
export {agentShouldResumeAfterApproval} from "./execution"
export {canReleaseQueuedMessage, isHitlPending} from "./execution"

Expand Down
97 changes: 97 additions & 0 deletions web/packages/agenta-playground/tests/unit/agentNegotiation.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/**
* Unit tests for `createNegotiatingFetch` — the agent lane's stream→batch→error negotiation.
*
* Guards the three outcomes JP's fixed-toggle cut couldn't express, each of which fails
* SILENTLY (a stream parser fed batch JSON, or a swallowed error) if untested:
* 1. stream honoured → parse as SSE (resolvedMode "stream")
* 2. 406 (can't stream)→ re-request as batch (resolvedMode "batch", Accept flips to json)
* 3. real error (5xx) → pass through untouched so useChat surfaces it inline
* Plus: an explicit batch request skips the stream attempt, and a server that answers a
* stream request with batch JSON is trusted by Content-Type.
*/
import {describe, expect, it, vi} from "vitest"

import {createNegotiatingFetch} from "../../src/state/execution/agentNegotiation"

const res = (body: string, init: {status?: number; contentType?: string}) =>
new Response(body, {
status: init.status ?? 200,
headers: init.contentType ? {"content-type": init.contentType} : undefined,
})

const acceptOf = (init: RequestInit | undefined) =>
new Headers(init?.headers as HeadersInit).get("accept") ?? ""

describe("createNegotiatingFetch", () => {
it("honours a stream when the server answers text/event-stream", async () => {
const base = vi.fn(async () => res("data: {}\n\n", {contentType: "text/event-stream"}))
const {fetch, resolvedMode} = createNegotiatingFetch(
base as unknown as typeof globalThis.fetch,
)

const r = await fetch("/invoke", {headers: {Accept: "text/event-stream"}})

expect(r.status).toBe(200)
expect(resolvedMode()).toBe("stream")
expect(base).toHaveBeenCalledTimes(1)
})

it("falls back to a batch re-request when the server 406s a stream request", async () => {
const base = vi
.fn()
.mockResolvedValueOnce(res("not acceptable", {status: 406}))
.mockResolvedValueOnce(res('{"data":{}}', {contentType: "application/json"}))
const {fetch, resolvedMode} = createNegotiatingFetch(
base as unknown as typeof globalThis.fetch,
)

const r = await fetch("/invoke", {headers: {Accept: "text/event-stream"}})

expect(r.status).toBe(200)
expect(resolvedMode()).toBe("batch")
expect(base).toHaveBeenCalledTimes(2)
// the retry asked for batch JSON, dropping the stream Accept
expect(acceptOf(base.mock.calls[1][1])).toBe("application/json")
})

it("passes a real error (5xx) through untouched without a second request", async () => {
const base = vi.fn(async () =>
res('{"status":{"code":500,"message":"boom"}}', {
status: 500,
contentType: "application/json",
}),
)
const {fetch} = createNegotiatingFetch(base as unknown as typeof globalThis.fetch)

const r = await fetch("/invoke", {headers: {Accept: "text/event-stream"}})

expect(r.status).toBe(500)
expect(await r.text()).toContain("boom")
expect(base).toHaveBeenCalledTimes(1) // no batch fallback on a non-406 error
})

it("skips the stream attempt when batch is requested explicitly", async () => {
const base = vi.fn(async () => res('{"data":{}}', {contentType: "application/json"}))
const {fetch, resolvedMode} = createNegotiatingFetch(
base as unknown as typeof globalThis.fetch,
)

await fetch("/invoke", {headers: {Accept: "application/json"}})

expect(resolvedMode()).toBe("batch")
expect(base).toHaveBeenCalledTimes(1)
expect(acceptOf(base.mock.calls[0][1])).toBe("application/json")
})

it("trusts the response Content-Type: a stream request answered with JSON resolves to batch", async () => {
const base = vi.fn(async () => res('{"data":{}}', {contentType: "application/json"}))
const {fetch, resolvedMode} = createNegotiatingFetch(
base as unknown as typeof globalThis.fetch,
)

await fetch("/invoke", {headers: {Accept: "text/event-stream"}})

expect(resolvedMode()).toBe("batch")
expect(base).toHaveBeenCalledTimes(1)
})
})
Loading