diff --git a/.changeset/live-websocket-seam.md b/.changeset/live-websocket-seam.md new file mode 100644 index 0000000..6735827 --- /dev/null +++ b/.changeset/live-websocket-seam.md @@ -0,0 +1,5 @@ +--- +"sideshow": minor +--- + +Add a server `onEvent` feed tap for hosts and a `liveTransport: "ws"` viewer-embed option so hosted wrappers can provide hibernation-friendly WebSocket live updates while self-hosted sideshow keeps using SSE by default. diff --git a/e2e/embed-ws.spec.ts b/e2e/embed-ws.spec.ts new file mode 100644 index 0000000..f8b070e --- /dev/null +++ b/e2e/embed-ws.spec.ts @@ -0,0 +1,157 @@ +// End-to-end proof that an embedding host can opt the viewer engine into the +// WebSocket live-update transport while keeping the same event payloads and +// reconciliation behavior as the default EventSource path. +import { expect, publish, serveEmbedBundle, test } from "./fixtures.ts"; + +const embedHtml = (sessionId: string) => ` +
+ + +`; + +test("embedded engine: liveTransport:'ws' applies events, reconnects, and heartbeats", async ({ + page, + server, +}) => { + const first = await publish( + server.url, + { html: "first websocket card
", title: "First WS", agent: "e2e" }, + "", + ); + + page.on("pageerror", (e) => console.error("[pageerror]", e.message)); + page.on("console", (m) => m.type() === "error" && console.error("[console]", m.text())); + + await page.route("**/__embedtest", (route) => + route.fulfill({ contentType: "text/html", body: embedHtml(first.sessionId) }), + ); + await serveEmbedBundle(page); + + await page.goto(`${server.url}/__embedtest`); + await expect(page.locator(".card-title")).toContainText("First WS"); + + await expect + .poll(() => page.evaluate(() => window.__wsHarness.urls[0])) + .toContain(`/api/events?session=${encodeURIComponent(first.sessionId)}`); + await expect.poll(() => page.evaluate(() => window.__wsHarness.sent.includes("ping"))).toBe(true); + + const second = await publish( + server.url, + { + html: "second websocket card
", + title: "Second WS", + agent: "e2e", + session: first.sessionId, + }, + "", + ); + await page.evaluate((event) => window.__wsHarness.deliver(event), { + type: "post-created", + id: second.id, + sessionId: second.sessionId, + version: second.version, + }); + await expect(page.locator(".card-title")).toContainText(["First WS", "Second WS"]); + + await page.evaluate(() => window.__wsHarness.closeLatest()); + const third = await publish( + server.url, + { + html: "third websocket card
", + title: "Third WS", + agent: "e2e", + session: first.sessionId, + }, + "", + ); + expect(third.sessionId).toBe(first.sessionId); + + await expect.poll(() => page.evaluate(() => window.__wsHarness.urls.length)).toBeGreaterThan(1); + await expect(page.locator(".card-title")).toContainText(["First WS", "Second WS", "Third WS"]); + + const socketsBeforeDispose = await page.evaluate(() => window.__wsHarness.urls.length); + await page.evaluate(() => window.__viewerHandle.dispose()); + await expect.poll(() => page.evaluate(() => window.__wsHarness.closedCount())).toBeGreaterThan(1); + await page.waitForTimeout(1100); + expect(await page.evaluate(() => window.__wsHarness.urls.length)).toBe(socketsBeforeDispose); +}); + +declare global { + interface Window { + __viewerHandle: { dispose(): void }; + __wsHarness: { + sent: string[]; + urls: string[]; + closedCount(): number; + deliver(event: unknown): void; + closeLatest(): void; + }; + } +} diff --git a/server/app.ts b/server/app.ts index 5d957cb..f208132 100644 --- a/server/app.ts +++ b/server/app.ts @@ -3,7 +3,7 @@ import { bodyLimit } from "hono/body-limit"; import { getCookie, setCookie } from "hono/cookie"; import { streamSSE } from "hono/streaming"; import { decodeBase64 } from "./base64.ts"; -import { EventBus } from "./events.ts"; +import { EventBus, type FeedEvent } from "./events.ts"; import { kitSummaries } from "./kits.ts"; import { registerMcp } from "./mcpHttp.ts"; import { @@ -34,6 +34,8 @@ import { } from "./types.ts"; import { validateSurfaces } from "./postSurfaces.ts"; +export type { FeedEvent } from "./events.ts"; + const MAX_SURFACE_BYTES = 2 * 1024 * 1024; const MAX_WAIT_SECONDS = 300; // Hard ceiling on any request body, applied globally. Every write endpoint @@ -157,6 +159,10 @@ export interface AppOptions { upgradeCommand?: string; // Test seam: replaces the npm-registry/GitHub lookup for the latest release. fetchLatestRelease?: () => Promisehi
", agent: "pi", title: "First" }), + ); + assert.equal(res.status, 201); + const post = (await res.json()) as { id: string; sessionId: string; version: number }; + + assert.deepEqual(events, [ + { type: "session-created", id: post.sessionId }, + { type: "post-created", id: post.id, sessionId: post.sessionId, version: post.version }, + ]); +}); + +test("onEvent errors do not fail writes", async () => { + const warn = console.warn; + console.warn = () => {}; + try { + const app = makeApp(undefined, { + onEvent: () => { + throw new Error("fanout failed"); + }, + }); + + const res = await app.request( + "/api/snippets", + json({ html: "hi
", agent: "pi", title: "First" }), + ); + assert.equal(res.status, 201); + const post = (await res.json()) as { sessionId: string }; + const posts = (await ( + await app.request(`/api/sessions/${post.sessionId}/posts`) + ).json()) as unknown[]; + assert.equal(posts.length, 1); + } finally { + console.warn = warn; + } +}); + test("publish into an existing session groups snippets", async () => { const app = makeApp(); const first = (await ( diff --git a/viewer/embed.d.ts b/viewer/embed.d.ts index cb0b72a..41b973e 100644 --- a/viewer/embed.d.ts +++ b/viewer/embed.d.ts @@ -3,6 +3,7 @@ // surface so hosts get types without depending on the viewer source. export type Route = { sessionId?: string | null; surfaceId?: string | null }; +export type LiveTransport = "sse" | "ws"; export interface HostRouter { /** The route the engine should render. */ @@ -30,6 +31,11 @@ export interface SideshowHost { * Orthogonal to `layout`. Self-hosted drives the same flag via a window global. */ readonly?: boolean; + /** + * Live-update transport. Defaults to SSE; embedders can opt into WebSocket + * when their host implements `/api/events` as a hibernatable socket. + */ + liveTransport?: LiveTransport; /** * Whether this deployment can render a surface as a PNG (the /s/:id.png route). * That route needs Cloudflare Browser Rendering, so it exists only on a Workers diff --git a/viewer/src/App.tsx b/viewer/src/App.tsx index f6153ad..e2dddac 100644 --- a/viewer/src/App.tsx +++ b/viewer/src/App.tsx @@ -117,7 +117,8 @@ export default function App() { setInitialLoaded(true); host().onReady?.(); }); - connect(); + const disconnect = connect(); + onCleanup(disconnect); checkVersion(); void initTheme(); const timer = setInterval(() => { diff --git a/viewer/src/embed.tsx b/viewer/src/embed.tsx index 223b3e6..5fbb68d 100644 --- a/viewer/src/embed.tsx +++ b/viewer/src/embed.tsx @@ -9,7 +9,7 @@ import App from "./App.tsx"; import { createDefaultHost, setEngine, type SideshowHost } from "./host.ts"; import stylesCss from "./styles.css?inline"; -export type { SideshowHost, HostRouter, Route, SlotName } from "./host.ts"; +export type { SideshowHost, HostRouter, Route, SlotName, LiveTransport } from "./host.ts"; // Runtime registry of host-overridable slot names (embedders project light DOM // with these `slot=` attributes). Exported as a value so embedders share one // source of truth instead of hardcoding the strings. diff --git a/viewer/src/host.ts b/viewer/src/host.ts index 7b2e588..2bac1f8 100644 --- a/viewer/src/host.ts +++ b/viewer/src/host.ts @@ -12,6 +12,7 @@ import type { ThemeTokens } from "../../server/theme-tokens.ts"; export type Route = { sessionId?: string | null; surfaceId?: string | null }; +export type LiveTransport = "sse" | "ws"; export interface HostRouter { // The current route the engine should render. @@ -39,6 +40,9 @@ export interface SideshowHost { // connect action). Orthogonal to `layout` — a host can have either without the // other. Self-hosted drives the same flag via window.__SIDESHOW_READONLY__. readonly?: boolean; + // Live-update transport. Self-hosted defaults to SSE; embedders can opt into + // WebSocket when their host implements `/api/events` as a hibernatable socket. + liveTransport?: LiveTransport; // Whether this deployment can render a surface as a PNG (the /s/:id.png route). // That route is served only by a Cloudflare Worker with the Browser Rendering // binding; a plain Node server (local dev, `npm start`) has no way to drive a diff --git a/viewer/src/state.ts b/viewer/src/state.ts index 2976ea8..44064e0 100644 --- a/viewer/src/state.ts +++ b/viewer/src/state.ts @@ -448,14 +448,63 @@ interface FeedEvent { surfaceId?: string | null; } -export function connect() { +const WS_HEARTBEAT_MS = 30_000; +const WS_RECONNECT_MS = 1000; + +function eventsPath(): string { const route = host().router.get(); const sessionId = route.sessionId ?? selected(); - const eventsPath = - isReadonly() && publicReadMode() === "session" && sessionId - ? `/api/events?session=${encodeURIComponent(sessionId)}` - : "/api/events"; - const es = new EventSource(appPath(eventsPath)); + return isReadonly() && publicReadMode() === "session" && sessionId + ? `/api/events?session=${encodeURIComponent(sessionId)}` + : "/api/events"; +} + +function wsAppUrl(path: string): string { + const url = new URL(appPath(path), window.location.href); + url.protocol = url.protocol === "https:" ? "wss:" : "ws:"; + return url.href; +} + +async function handleFeedData(data: string) { + if (data === "pong") return; + const e = JSON.parse(data) as FeedEvent; + // activity the user isn't looking at — other session or hidden tab — + // marks the session unread, which also badges the tab title + const away = e.sessionId != null && (e.sessionId !== selected() || document.hidden); + if (e.type === "theme-changed") { + applyTheme(e.id); + } else if (e.type.startsWith("session-")) { + await refreshSessions(); + } else if (e.type === "post-created" || e.type === "post-updated") { + if (away && e.sessionId) markUnread(e.sessionId); + if (e.sessionId === selected()) await upsertPost(e.id); + await refreshSessionsQuiet(); + } else if (e.type === "post-deleted") { + const idx = posts.findIndex((s) => s.id === e.id); + if (idx >= 0) setPostsInternal(produce((arr) => arr.splice(idx, 1))); + await refreshSessionsQuiet(); + } else if (e.type === "trace-updated") { + // the agent working is ambient, not an alert — refetch quietly, no badge + if (e.sessionId === selected()) await fetchTrace(e.sessionId); + } else if (e.type === "comment-created") { + if (away && e.sessionId) markUnread(e.sessionId); + if (e.sessionId === selected()) { + const query = e.surfaceId ? `surface=${e.surfaceId}` : `session=${e.sessionId}`; + const res = await api<{ comments: Comment[] }>(`/api/comments?${query}`); + mergeComments(res.comments); + } + } else if (e.type === "comment-deleted") { + setCommentsInternal((prev) => prev.filter((c) => c.id !== e.id)); + } +} + +export function connect(): () => void { + if (host().liveTransport === "ws") return connectWebSocket(); + return connectSse(); +} + +function connectSse(): () => void { + const es = new EventSource(appPath(eventsPath())); let everConnected = false; es.onopen = async () => { setLiveInternal(true); @@ -465,40 +514,63 @@ export function connect() { everConnected = true; }; es.onerror = () => setLiveInternal(false); - es.onmessage = async (ev) => { - const e = JSON.parse(ev.data) as FeedEvent; - // activity the user isn't looking at — other session or hidden tab — - // marks the session unread, which also badges the tab title - const away = e.sessionId != null && (e.sessionId !== selected() || document.hidden); - if (e.type === "theme-changed") { - applyTheme(e.id); - } else if (e.type.startsWith("session-")) { - await refreshSessions(); - } else if (e.type === "post-created" || e.type === "post-updated") { - if (away && e.sessionId) markUnread(e.sessionId); - if (e.sessionId === selected()) await upsertPost(e.id); - await refreshSessionsQuiet(); - } else if (e.type === "post-deleted") { - const idx = posts.findIndex((s) => s.id === e.id); - if (idx >= 0) setPostsInternal(produce((arr) => arr.splice(idx, 1))); - await refreshSessionsQuiet(); - } else if (e.type === "trace-updated") { - // the agent working is ambient, not an alert — refetch quietly, no badge - if (e.sessionId === selected()) await fetchTrace(e.sessionId); - } else if (e.type === "comment-created") { - if (away && e.sessionId) markUnread(e.sessionId); - if (e.sessionId === selected()) { - const query = e.surfaceId ? `surface=${e.surfaceId}` : `session=${e.sessionId}`; - const res = await api<{ comments: Comment[] }>(`/api/comments?${query}`); - mergeComments(res.comments); - } - } else if (e.type === "comment-deleted") { - setCommentsInternal((prev) => prev.filter((c) => c.id !== e.id)); - } + es.onmessage = (ev) => void handleFeedData(ev.data); + return () => { + es.close(); + setLiveInternal(false); + }; +} + +function connectWebSocket(): () => void { + const url = wsAppUrl(eventsPath()); + let everConnected = false; + let closed = false; + let ws: WebSocket | undefined; + let heartbeat: ReturnType