-
Notifications
You must be signed in to change notification settings - Fork 1
feat(runtime): ctx.relay — agent-to-agent messaging over the relay (#254 pt.2) #256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| import test from 'node:test'; | ||
| import assert from 'node:assert/strict'; | ||
|
|
||
| import { buildRelayContext, DEFAULT_RELAYCAST_URL } from './relay.js'; | ||
|
|
||
| const noopLog = () => {}; | ||
|
|
||
| type Captured = { url: string; init: RequestInit }; | ||
|
|
||
| /** Install a capturing fetch stub; returns the captured calls + a restore fn. */ | ||
| function stubFetch(response: () => Response): { calls: Captured[]; restore: () => void } { | ||
| const calls: Captured[] = []; | ||
| const original = globalThis.fetch; | ||
| globalThis.fetch = (async (url: string | URL, init: RequestInit = {}) => { | ||
| calls.push({ url: String(url), init }); | ||
| return response(); | ||
| }) as typeof fetch; | ||
| return { calls, restore: () => { globalThis.fetch = original; } }; | ||
| } | ||
|
|
||
| function withEnv(vars: Record<string, string | undefined>, fn: () => Promise<void>): Promise<void> { | ||
| const keys = ['RELAYCAST_URL', 'RELAY_BASE_URL', 'WORKFORCE_AGENT_TOKEN', 'RELAY_AGENT_TOKEN', 'RELAY_API_KEY']; | ||
| const saved: Record<string, string | undefined> = {}; | ||
| for (const k of keys) { saved[k] = process.env[k]; delete process.env[k]; } | ||
| for (const [k, v] of Object.entries(vars)) { if (v !== undefined) process.env[k] = v; } | ||
| return fn().finally(() => { | ||
| for (const k of keys) { saved[k] === undefined ? delete process.env[k] : (process.env[k] = saved[k]); } | ||
| }); | ||
| } | ||
|
|
||
| test('dm posts /v1/dm with bearer agent token + {to,text}, unwraps {ok,data} id', async () => { | ||
| await withEnv({ RELAY_AGENT_TOKEN: 'tok_agent' }, async () => { | ||
| const { calls, restore } = stubFetch(() => | ||
| new Response(JSON.stringify({ ok: true, data: { message: { id: 'm1' } } }), { status: 200 }) | ||
| ); | ||
| try { | ||
| const relay = buildRelayContext(noopLog); | ||
| const res = await relay.dm('peer-agent', 'hello over relay'); | ||
| assert.deepEqual(res, { ok: true, messageId: 'm1' }); | ||
| assert.equal(calls.length, 1); | ||
| assert.equal(calls[0].url, `${DEFAULT_RELAYCAST_URL}/v1/dm`); | ||
| assert.equal(calls[0].init.method, 'POST'); | ||
| assert.equal((calls[0].init.headers as Record<string, string>).authorization, 'Bearer tok_agent'); | ||
| assert.deepEqual(JSON.parse(String(calls[0].init.body)), { to: 'peer-agent', text: 'hello over relay' }); | ||
| } finally { | ||
| restore(); | ||
| } | ||
| }); | ||
| }); | ||
|
|
||
| test('agent token precedence: WORKFORCE_AGENT_TOKEN over RELAY_API_KEY', async () => { | ||
| await withEnv({ WORKFORCE_AGENT_TOKEN: 'tok_wf', RELAY_API_KEY: 'rk_live_x' }, async () => { | ||
| const { calls, restore } = stubFetch(() => new Response(JSON.stringify({ ok: true, data: { id: 'm2' } }), { status: 200 })); | ||
| try { | ||
| await buildRelayContext(noopLog).dm('p', 'hi'); | ||
| assert.equal((calls[0].init.headers as Record<string, string>).authorization, 'Bearer tok_wf'); | ||
| } finally { | ||
| restore(); | ||
| } | ||
| }); | ||
| }); | ||
|
|
||
| test('RELAYCAST_URL overrides the default gateway (trailing slash trimmed)', async () => { | ||
| await withEnv({ RELAY_API_KEY: 'rk', RELAYCAST_URL: 'https://cast.example.com/' }, async () => { | ||
| const { calls, restore } = stubFetch(() => new Response(JSON.stringify({ ok: true, data: { id: 'm' } }), { status: 200 })); | ||
| try { | ||
| await buildRelayContext(noopLog).post('general', 'yo'); | ||
| assert.equal(calls[0].url, 'https://cast.example.com/v1/channels/general/messages'); | ||
| assert.deepEqual(JSON.parse(String(calls[0].init.body)), { text: 'yo' }); | ||
| } finally { | ||
| restore(); | ||
| } | ||
| }); | ||
| }); | ||
|
|
||
| test('no agent token → {ok:false} and no fetch', async () => { | ||
| await withEnv({}, async () => { | ||
| const { calls, restore } = stubFetch(() => new Response('{}', { status: 200 })); | ||
| try { | ||
| const res = await buildRelayContext(noopLog).dm('p', 'hi'); | ||
| assert.deepEqual(res, { ok: false }); | ||
| assert.equal(calls.length, 0); | ||
| } finally { | ||
| restore(); | ||
| } | ||
| }); | ||
| }); | ||
|
|
||
| test('non-2xx response → {ok:false}', async () => { | ||
| await withEnv({ RELAY_AGENT_TOKEN: 'tok' }, async () => { | ||
| const { restore } = stubFetch(() => new Response('nope', { status: 401 })); | ||
| try { | ||
| assert.deepEqual(await buildRelayContext(noopLog).dm('p', 'hi'), { ok: false }); | ||
| } finally { | ||
| restore(); | ||
| } | ||
| }); | ||
| }); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| import type { RelayContext, RelaySendResult, WorkforceCtx } from './types.js'; | ||
|
|
||
| /** | ||
| * Canonical relaycast gateway — SINGLE SOURCE OF TRUTH for the runtime relay | ||
| * client. Override per-env via `RELAYCAST_URL` (preferred) or `RELAY_BASE_URL` | ||
| * (the cloud launcher injects the latter, minted-against value into the box). | ||
| * Mirrors `@agentworkforce/delivery`'s default; kept inline here because | ||
| * delivery depends on runtime (importing it back would be circular). | ||
| */ | ||
| export const DEFAULT_RELAYCAST_URL = 'https://cast.agentrelay.com'; | ||
|
|
||
| type Log = WorkforceCtx['log']; | ||
|
|
||
| function resolveRelaycastUrl(env: NodeJS.ProcessEnv): string { | ||
| const raw = env.RELAYCAST_URL?.trim() || env.RELAY_BASE_URL?.trim() || DEFAULT_RELAYCAST_URL; | ||
| return raw.replace(/\/+$/, ''); | ||
| } | ||
|
|
||
| /** | ||
| * Relaycast agent actions (`/v1/dm`, channel posts) are authenticated with the | ||
| * AGENT token, not the workspace key — prefer it, falling back to the workspace | ||
| * `RELAY_API_KEY` so single-identity boxes and tests still work. | ||
| */ | ||
| function resolveAgentToken(env: NodeJS.ProcessEnv): string | undefined { | ||
| return ( | ||
| env.WORKFORCE_AGENT_TOKEN?.trim() || | ||
| env.RELAY_AGENT_TOKEN?.trim() || | ||
| env.RELAY_API_KEY?.trim() || | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When the sandbox provides per-agent credentials via Useful? React with 👍 / 👎. |
||
| undefined | ||
| ); | ||
| } | ||
|
Comment on lines
+24
to
+31
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The token resolution logic here is missing several fallback environment variables that are supported by the rest of the runtime (specifically in function tokenFromAgentTokenMap(raw: string | undefined, agentName: string | undefined): string | undefined {
const value = raw?.trim();
if (!value) return undefined;
if (!value.startsWith('{')) return value;
try {
const parsed = JSON.parse(value) as unknown;
if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) return undefined;
const tokens = parsed as Record<string, unknown>;
const namedToken = agentName ? tokens[agentName] : undefined;
if (typeof namedToken === 'string' && namedToken.trim()) return namedToken.trim();
const singleToken = Object.values(tokens).find((entry): entry is string => typeof entry === 'string' && entry.trim().length > 0);
return singleToken?.trim();
} catch {
return undefined;
}
}
function resolveAgentToken(env: NodeJS.ProcessEnv): string | undefined {
return (
env.WORKFORCE_AGENT_TOKEN?.trim() ||
env.RELAY_AGENT_TOKEN?.trim() ||
env.RELAYFILE_TOKEN?.trim() ||
tokenFromAgentTokenMap(env.RELAY_AGENT_TOKENS, env.RELAY_AGENT_NAME) ||
env.RELAY_API_KEY?.trim() ||
env.WORKFORCE_WORKSPACE_TOKEN?.trim() ||
undefined
);
} |
||
|
|
||
| const RELAY_TIMEOUT_MS = 8_000; | ||
|
|
||
| async function fetchWithTimeout(url: string, init: RequestInit): Promise<Response | undefined> { | ||
| const controller = new AbortController(); | ||
| const timer = setTimeout(() => controller.abort(), RELAY_TIMEOUT_MS); | ||
| try { | ||
| return await fetch(url, { ...init, signal: controller.signal }); | ||
| } catch { | ||
| return undefined; | ||
| } finally { | ||
| clearTimeout(timer); | ||
| } | ||
| } | ||
|
Comment on lines
+35
to
+45
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Comment on lines
+35
to
+45
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🩺 Stability & Availability | 🟠 Major | ⚡ Quick win Timeout does not cover response-body reading.
Consider keeping the abort signal armed until the body is consumed, e.g. read the body inside the timeout-guarded scope (or only clear the timer after 🔧 One option: parse inside the guarded scope-async function fetchWithTimeout(url: string, init: RequestInit): Promise<Response | undefined> {
- const controller = new AbortController();
- const timer = setTimeout(() => controller.abort(), RELAY_TIMEOUT_MS);
- try {
- return await fetch(url, { ...init, signal: controller.signal });
- } catch {
- return undefined;
- } finally {
- clearTimeout(timer);
- }
-}
+async function fetchWithTimeout(
+ url: string,
+ init: RequestInit
+): Promise<{ status: number; ok: boolean; json: () => Promise<unknown> } | undefined> {
+ const controller = new AbortController();
+ const timer = setTimeout(() => controller.abort(), RELAY_TIMEOUT_MS);
+ try {
+ const res = await fetch(url, { ...init, signal: controller.signal });
+ // Buffer the body while the abort signal is still armed.
+ const text = await res.text();
+ return {
+ status: res.status,
+ ok: res.ok,
+ json: async () => JSON.parse(text)
+ };
+ } catch {
+ return undefined;
+ } finally {
+ clearTimeout(timer);
+ }
+}(Adjust 🤖 Prompt for AI Agents |
||
|
|
||
| /** | ||
| * Build the relay context from the box environment. Always safe to call: when | ||
| * no agent token is present every send returns `{ ok: false }` (logged). | ||
| */ | ||
| export function buildRelayContext(log: Log, env: NodeJS.ProcessEnv = process.env): RelayContext { | ||
| const token = resolveAgentToken(env); | ||
| const baseUrl = resolveRelaycastUrl(env); | ||
|
|
||
| async function send(path: string, body: unknown, action: string): Promise<RelaySendResult> { | ||
| if (!token) { | ||
| log('warn', `relay.${action}.no-token`, { | ||
| reason: 'no agent token (WORKFORCE_AGENT_TOKEN/RELAY_AGENT_TOKEN/RELAY_API_KEY) in the box' | ||
| }); | ||
| return { ok: false }; | ||
| } | ||
| const res = await fetchWithTimeout(`${baseUrl}${path}`, { | ||
| method: 'POST', | ||
| headers: { authorization: `Bearer ${token}`, 'content-type': 'application/json' }, | ||
| body: JSON.stringify(body) | ||
| }); | ||
| if (!res) { | ||
| log('warn', `relay.${action}.failed`, { reason: 'timeout or network error' }); | ||
| return { ok: false }; | ||
| } | ||
| if (!res.ok) { | ||
| log('warn', `relay.${action}.failed`, { status: res.status }); | ||
| return { ok: false }; | ||
| } | ||
| // Relaycast REST wraps success as `{ ok, data }`; ids live under | ||
| // data.message.id / data.id. Unwrap before reading. | ||
| const json = (await res.json().catch(() => null)) as Record<string, unknown> | null; | ||
| const data = | ||
| json && typeof json.data === 'object' && json.data !== null | ||
| ? (json.data as Record<string, unknown>) | ||
| : json; | ||
| const message = | ||
| data && typeof data.message === 'object' && data.message !== null | ||
| ? (data.message as Record<string, unknown>) | ||
| : undefined; | ||
| const rawId = message?.id ?? data?.messageId ?? data?.id; | ||
| return { ok: true, messageId: rawId != null ? String(rawId) : undefined }; | ||
| } | ||
|
Comment on lines
+55
to
+88
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inline the timeout and abort controller logic here to ensure the 8-second timeout covers the entire request lifecycle, including reading and parsing the response body via async function send(path: string, body: unknown, action: string): Promise<RelaySendResult> {
if (!token) {
log('warn', `relay.${action}.no-token`, {
reason: 'no agent token (WORKFORCE_AGENT_TOKEN/RELAY_AGENT_TOKEN/RELAY_API_KEY) in the box'
});
return { ok: false };
}
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), RELAY_TIMEOUT_MS);
try {
const res = await fetch(`${baseUrl}${path}`, {
method: 'POST',
headers: { authorization: `Bearer ${token}`, 'content-type': 'application/json' },
body: JSON.stringify(body),
signal: controller.signal
});
if (!res.ok) {
log('warn', `relay.${action}.failed`, { status: res.status });
return { ok: false };
}
const json = (await res.json().catch(() => null)) as Record<string, unknown> | null;
const data =
json && typeof json.data === 'object' && json.data !== null
? (json.data as Record<string, unknown>)
: json;
const message =
data && typeof data.message === 'object' && data.message !== null
? (data.message as Record<string, unknown>)
: undefined;
const rawId = message?.id ?? data?.messageId ?? data?.id;
return { ok: true, messageId: rawId != null ? String(rawId) : undefined };
} catch (err) {
const isTimeout = err instanceof Error && err.name === 'AbortError';
log('warn', `relay.${action}.failed`, { reason: isTimeout ? 'timeout' : 'network error' });
return { ok: false };
} finally {
clearTimeout(timer);
}
} |
||
|
|
||
| return { | ||
| dm: (to: string, text: string) => send('/v1/dm', { to, text }, 'dm'), | ||
| post: (channel: string, text: string) => | ||
| send(`/v1/channels/${encodeURIComponent(channel)}/messages`, { text }, 'post') | ||
| }; | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are expanding
resolveAgentTokento support additional environment variables (RELAYFILE_TOKEN,RELAY_AGENT_TOKENS,RELAY_AGENT_NAME, andWORKFORCE_WORKSPACE_TOKEN), we must also clean them up in the test helperwithEnv. Otherwise, if any of these variables are set in the local development environment, tests like'no agent token → {ok:false} and no fetch'will fail due to unexpected token resolution.