Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/notion-essay-pr/notion-essay-pr.smoke.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ class MockNotionEssayRuntime {
async error() {},
async done() {}
},
relay: {
async dm() { return { ok: false }; },
async post() { return { ok: false }; }
},
log: () => undefined
};
}
Expand Down
5 changes: 5 additions & 0 deletions packages/runtime/src/ctx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
FilesContext,
CredentialsContext,
MemoryItem,
RelayContext,
RequiredRuntimeCredentials,
ScheduleContext,
SandboxContext,
Expand All @@ -14,6 +15,7 @@ import type {
WorkflowContext
} from './types.js';
import { attachTrajectoryRecorder, createTrajectoryRecorder } from './trajectory.js';
import { buildRelayContext } from './relay.js';

type AgentInputValue = string | number | boolean | null | undefined;

Expand Down Expand Up @@ -53,6 +55,7 @@ export interface CtxBuildOptions {
memory?: MemoryContext;
workflow?: WorkflowContext;
schedule?: ScheduleContext;
relay?: RelayContext;
integrations?: Record<string, unknown>;
log?: WorkforceCtx['log'];
harnessRunner: WorkforceCtx['harness']['run'];
Expand Down Expand Up @@ -166,6 +169,7 @@ export function buildCtx(options: CtxBuildOptions): WorkforceCtx {
memory: options.memory ?? defaultMemoryFor(options.persona.memory, options.workspaceId, log),
workflow: options.workflow ?? UNAVAILABLE_WORKFLOW,
schedule: options.schedule ?? UNAVAILABLE_SCHEDULE,
relay: options.relay ?? buildRelayContext(log),
trajectory: trajectoryRecorder.context,
log
};
Expand Down Expand Up @@ -212,6 +216,7 @@ const CORE_CTX_FIELDS: ReadonlySet<string> = new Set([
'memory',
'workflow',
'schedule',
'relay',
'trajectory',
'log'
]);
Expand Down
6 changes: 6 additions & 0 deletions packages/runtime/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export type {
MemoryItem,
MemoryRecallOptions,
MemorySaveOptions,
RelayContext,
RelaySendResult,
RelayfileCredentials,
RequiredRuntimeCredentials,
SandboxContext,
Expand Down Expand Up @@ -56,6 +58,10 @@ export {
isStartupEvent
} from './types.js';

// Relay (agent-to-agent) client used by ctx.relay; exported for external ctx
// builders and tests.
export { buildRelayContext, DEFAULT_RELAYCAST_URL } from './relay.js';

// Runtime envelope helpers shared by provider-triggered agents.
export {
unwrapResourceRecord
Expand Down
98 changes: 98 additions & 0 deletions packages/runtime/src/relay.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import test from 'node:test';
import assert from 'node:assert/strict';

import { buildRelayContext, DEFAULT_RELAYCAST_URL } from './relay.js';

const noopLog = () => {};

/** One recorded `fetch` invocation: the stringified request URL and the init it was given. */
type Captured = { url: string; init: RequestInit };

/**
 * Replace `globalThis.fetch` with a recording stub.
 *
 * Every call is appended to `calls` (URL coerced with `String`, `init`
 * defaulting to `{}`) and answered with a fresh value from `response()`.
 * Call `restore()` to put the previous `fetch` back.
 */
function stubFetch(response: () => Response): { calls: Captured[]; restore: () => void } {
  const previousFetch = globalThis.fetch;
  const recorded: Captured[] = [];

  const recordingFetch = async (target: string | URL, init: RequestInit = {}): Promise<Response> => {
    recorded.push({ url: String(target), init });
    return response();
  };
  globalThis.fetch = recordingFetch as typeof fetch;

  function restore(): void {
    globalThis.fetch = previousFetch;
  }

  return { calls: recorded, restore };
}

function withEnv(vars: Record<string, string | undefined>, fn: () => Promise<void>): Promise<void> {
const keys = ['RELAYCAST_URL', 'RELAY_BASE_URL', 'WORKFORCE_AGENT_TOKEN', 'RELAY_AGENT_TOKEN', 'RELAY_API_KEY'];

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Since we are expanding resolveAgentToken to support additional environment variables (RELAYFILE_TOKEN, RELAY_AGENT_TOKENS, RELAY_AGENT_NAME, and WORKFORCE_WORKSPACE_TOKEN), we must also clean them up in the test helper withEnv. Otherwise, if any of these variables are set in the local development environment, tests like 'no agent token → {ok:false} and no fetch' will fail due to unexpected token resolution.

Suggested change
const keys = ['RELAYCAST_URL', 'RELAY_BASE_URL', 'WORKFORCE_AGENT_TOKEN', 'RELAY_AGENT_TOKEN', 'RELAY_API_KEY'];
const keys = ['RELAYCAST_URL', 'RELAY_BASE_URL', 'WORKFORCE_AGENT_TOKEN', 'RELAY_AGENT_TOKEN', 'RELAY_API_KEY', 'RELAYFILE_TOKEN', 'RELAY_AGENT_TOKENS', 'RELAY_AGENT_NAME', 'WORKFORCE_WORKSPACE_TOKEN'];

const saved: Record<string, string | undefined> = {};
for (const k of keys) { saved[k] = process.env[k]; delete process.env[k]; }
for (const [k, v] of Object.entries(vars)) { if (v !== undefined) process.env[k] = v; }
return fn().finally(() => {
for (const k of keys) { saved[k] === undefined ? delete process.env[k] : (process.env[k] = saved[k]); }
});
}

// Happy path: one authenticated POST to `/v1/dm` on the default gateway, and
// the message id is read out of the `{ ok, data: { message } }` envelope.
test('dm posts /v1/dm with bearer agent token + {to,text}, unwraps {ok,data} id', async () => {
  const gatewayReply = JSON.stringify({ ok: true, data: { message: { id: 'm1' } } });

  await withEnv({ RELAY_AGENT_TOKEN: 'tok_agent' }, async () => {
    const stub = stubFetch(() => new Response(gatewayReply, { status: 200 }));
    try {
      const result = await buildRelayContext(noopLog).dm('peer-agent', 'hello over relay');
      assert.deepEqual(result, { ok: true, messageId: 'm1' });

      assert.equal(stub.calls.length, 1);
      const request = stub.calls[0];
      assert.equal(request.url, `${DEFAULT_RELAYCAST_URL}/v1/dm`);
      assert.equal(request.init.method, 'POST');

      const sentHeaders = request.init.headers as Record<string, string>;
      assert.equal(sentHeaders.authorization, 'Bearer tok_agent');

      const sentBody = JSON.parse(String(request.init.body));
      assert.deepEqual(sentBody, { to: 'peer-agent', text: 'hello over relay' });
    } finally {
      stub.restore();
    }
  });
});

// Both an agent token and the workspace key are set; the bearer header must
// carry the agent token.
test('agent token precedence: WORKFORCE_AGENT_TOKEN over RELAY_API_KEY', async () => {
  const env = { WORKFORCE_AGENT_TOKEN: 'tok_wf', RELAY_API_KEY: 'rk_live_x' };

  await withEnv(env, async () => {
    const stub = stubFetch(() => {
      const payload = JSON.stringify({ ok: true, data: { id: 'm2' } });
      return new Response(payload, { status: 200 });
    });
    try {
      const relay = buildRelayContext(noopLog);
      await relay.dm('p', 'hi');

      const sentHeaders = stub.calls[0].init.headers as Record<string, string>;
      assert.equal(sentHeaders.authorization, 'Bearer tok_wf');
    } finally {
      stub.restore();
    }
  });
});

// A configured gateway replaces the default, and its trailing slash must not
// leak into the request path.
test('RELAYCAST_URL overrides the default gateway (trailing slash trimmed)', async () => {
  const env = { RELAY_API_KEY: 'rk', RELAYCAST_URL: 'https://cast.example.com/' };

  await withEnv(env, async () => {
    const stub = stubFetch(() => {
      const payload = JSON.stringify({ ok: true, data: { id: 'm' } });
      return new Response(payload, { status: 200 });
    });
    try {
      const relay = buildRelayContext(noopLog);
      await relay.post('general', 'yo');

      const request = stub.calls[0];
      assert.equal(request.url, 'https://cast.example.com/v1/channels/general/messages');

      const sentBody = JSON.parse(String(request.init.body));
      assert.deepEqual(sentBody, { text: 'yo' });
    } finally {
      stub.restore();
    }
  });
});

// With no credential in the environment the send must short-circuit: a
// failure result and zero network calls.
test('no agent token → {ok:false} and no fetch', async () => {
  await withEnv({}, async () => {
    const stub = stubFetch(() => new Response('{}', { status: 200 }));
    try {
      const relay = buildRelayContext(noopLog);
      const result = await relay.dm('p', 'hi');

      assert.deepEqual(result, { ok: false });
      assert.equal(stub.calls.length, 0);
    } finally {
      stub.restore();
    }
  });
});

// A rejected request (HTTP 401 here) must surface as `{ ok: false }`.
test('non-2xx response → {ok:false}', async () => {
  await withEnv({ RELAY_AGENT_TOKEN: 'tok' }, async () => {
    const { calls, restore } = stubFetch(() => new Response('nope', { status: 401 }));
    try {
      assert.deepEqual(await buildRelayContext(noopLog).dm('p', 'hi'), { ok: false });
      // Prove the failure came from the HTTP status: without this check the
      // test would also pass if the token were never resolved and the send
      // short-circuited before issuing any request.
      assert.equal(calls.length, 1);
    } finally {
      restore();
    }
  });
});
95 changes: 95 additions & 0 deletions packages/runtime/src/relay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import type { RelayContext, RelaySendResult, WorkforceCtx } from './types.js';

/**
* Canonical relaycast gateway — SINGLE SOURCE OF TRUTH for the runtime relay
* client. Override per-env via `RELAYCAST_URL` (preferred) or `RELAY_BASE_URL`
* (the cloud launcher injects the latter, minted-against value into the box).
* Mirrors `@agentworkforce/delivery`'s default; kept inline here because
* delivery depends on runtime (importing it back would be circular).
*/
export const DEFAULT_RELAYCAST_URL = 'https://cast.agentrelay.com';

type Log = WorkforceCtx['log'];

/**
 * Pick the relaycast gateway base URL for this environment.
 *
 * Precedence: `RELAYCAST_URL`, then `RELAY_BASE_URL`, then
 * `DEFAULT_RELAYCAST_URL`. Values are trimmed, and unset or blank values are
 * skipped. Trailing slashes are removed so callers can append `/v1/...` paths.
 */
function resolveRelaycastUrl(env: NodeJS.ProcessEnv): string {
  const configured = [env.RELAYCAST_URL, env.RELAY_BASE_URL]
    .map((candidate) => candidate?.trim())
    .find((candidate) => Boolean(candidate));
  const base = configured || DEFAULT_RELAYCAST_URL;

  // Drop every trailing '/' (char code 47).
  let end = base.length;
  while (end > 0 && base.charCodeAt(end - 1) === 47) {
    end -= 1;
  }
  return base.slice(0, end);
}

/**
* Relaycast agent actions (`/v1/dm`, channel posts) are authenticated with the
* AGENT token, not the workspace key — prefer it, falling back to the workspace
* `RELAY_API_KEY` so single-identity boxes and tests still work.
*/
function resolveAgentToken(env: NodeJS.ProcessEnv): string | undefined {
return (
env.WORKFORCE_AGENT_TOKEN?.trim() ||
env.RELAY_AGENT_TOKEN?.trim() ||
env.RELAY_API_KEY?.trim() ||

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Honor RELAY_AGENT_TOKENS when resolving relay auth

When the sandbox provides per-agent credentials via RELAY_AGENT_TOKENS plus RELAY_AGENT_NAME (the existing ctx.memory resolver already supports this in packages/runtime/src/ctx.ts:447-451), ctx.relay falls through to no token or to RELAY_API_KEY, so DM/post either skips the fetch or gets rejected by agent-scoped endpoints even though a valid agent token is present. Please share the existing token resolver or include the token-map fallback here.

Useful? React with 👍 / 👎.

undefined
);
}
Comment on lines +24 to +31

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The token resolution logic here is missing several fallback environment variables that are supported by the rest of the runtime (specifically in ctx.ts's resolveAgentToken), such as RELAYFILE_TOKEN, RELAY_AGENT_TOKENS (with RELAY_AGENT_NAME), and WORKFORCE_WORKSPACE_TOKEN. This will cause ctx.relay to fail in environments that rely on these variables for authentication.

/**
 * Extract one agent token from a raw environment value.
 *
 * - Unset or blank input yields `undefined`.
 * - Input that does not start with `{` is treated as the token itself (trimmed).
 * - Otherwise the input is parsed as a JSON object of tokens: the entry keyed
 *   by `agentName` wins when it is a non-blank string; failing that, the first
 *   non-blank string value in the object is returned.
 * - Unparseable JSON, or an object with no usable string value, yields `undefined`.
 *
 * NOTE(review): when `agentName` has no usable entry, the fallback returns the
 * first usable entry even if the map holds several — confirm that sending
 * with that token is the intended behaviour.
 */
function tokenFromAgentTokenMap(raw: string | undefined, agentName: string | undefined): string | undefined {
  const trimmed = raw?.trim();
  if (!trimmed) return undefined;
  if (trimmed[0] !== '{') return trimmed;

  let decoded: unknown;
  try {
    decoded = JSON.parse(trimmed);
  } catch {
    return undefined;
  }
  if (decoded === null || typeof decoded !== 'object' || Array.isArray(decoded)) {
    return undefined;
  }
  const byAgent = decoded as Record<string, unknown>;

  // Normalise one map entry: non-blank strings only, trimmed.
  const usable = (candidate: unknown): string | undefined => {
    if (typeof candidate !== 'string') return undefined;
    const cleaned = candidate.trim();
    return cleaned.length > 0 ? cleaned : undefined;
  };

  const ownToken = agentName ? usable(byAgent[agentName]) : undefined;
  if (ownToken) return ownToken;

  for (const candidate of Object.values(byAgent)) {
    const token = usable(candidate);
    if (token) return token;
  }
  return undefined;
}

/**
 * Resolve the bearer token for relay sends from the environment.
 *
 * Precedence (first non-blank value wins, each trimmed):
 *   1. `WORKFORCE_AGENT_TOKEN`
 *   2. `RELAY_AGENT_TOKEN`
 *   3. `RELAYFILE_TOKEN`
 *   4. `RELAY_AGENT_TOKENS`, resolved for `RELAY_AGENT_NAME`
 *   5. `RELAY_API_KEY`
 *   6. `WORKFORCE_WORKSPACE_TOKEN`
 *
 * @returns the token, or `undefined` when none of the variables is usable.
 */
function resolveAgentToken(env: NodeJS.ProcessEnv): string | undefined {
  const firstUsable = (candidates: Array<string | undefined>): string | undefined => {
    for (const candidate of candidates) {
      const cleaned = candidate?.trim();
      if (cleaned) return cleaned;
    }
    return undefined;
  };

  const direct = firstUsable([env.WORKFORCE_AGENT_TOKEN, env.RELAY_AGENT_TOKEN, env.RELAYFILE_TOKEN]);
  if (direct) return direct;

  const mapped = tokenFromAgentTokenMap(env.RELAY_AGENT_TOKENS, env.RELAY_AGENT_NAME);
  if (mapped) return mapped;

  return firstUsable([env.RELAY_API_KEY, env.WORKFORCE_WORKSPACE_TOKEN]);
}


/** Default upper bound, in milliseconds, for one complete relay HTTP exchange. */
const RELAY_TIMEOUT_MS = 8_000;

/**
 * `fetch` with a hard deadline that covers the request AND the response body.
 *
 * The body is read into memory while the abort signal is still armed, and an
 * in-memory copy of the response (same status, status text and headers) is
 * returned. A caller's later `res.json()` / `res.text()` therefore resolves
 * from the buffer and cannot stall on a peer that sends headers promptly and
 * then stops sending the body.
 *
 * @param url        Absolute URL to request.
 * @param init       Request options; any `signal` in it is replaced by the
 *                   deadline signal.
 * @param timeoutMs  Deadline in milliseconds (defaults to `RELAY_TIMEOUT_MS`).
 * @returns The buffered response, or `undefined` on timeout or any network
 *          error. Never throws.
 */
async function fetchWithTimeout(
  url: string,
  init: RequestInit,
  timeoutMs: number = RELAY_TIMEOUT_MS
): Promise<Response | undefined> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const res = await fetch(url, { ...init, signal: controller.signal });
    // The Response constructor only accepts statuses 200–599; hand anything
    // else back untouched rather than turning it into a failure.
    if (res.status < 200 || res.status > 599) return res;

    // Drain the body before the timer is cleared so the deadline applies to it.
    const buffered = await res.arrayBuffer();
    // Null-body statuses (204/205/304) reject a non-null body, and they never
    // carry bytes, so an empty buffer is passed as `null`.
    return new Response(buffered.byteLength > 0 ? buffered : null, {
      status: res.status,
      statusText: res.statusText,
      headers: res.headers
    });
  } catch {
    // Abort (timeout) and transport failures are reported the same way; the
    // caller logs the failure.
    return undefined;
  } finally {
    clearTimeout(timer);
  }
}
Comment on lines +35 to +45

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This helper function is no longer needed if we inline the timeout and abort controller logic directly into the send function. Inlining it allows the timeout to cover both the connection phase and the response body parsing phase (res.json()), preventing potential hangs on slow networks.

Comment on lines +35 to +45

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Timeout does not cover response-body reading.

clearTimeout(timer) runs in finally as soon as fetch resolves (i.e., once headers arrive). The subsequent res.json() in send() (Line 77) then runs with no abort guard, so a peer that returns headers quickly but stalls the body stream can hang the handler indefinitely — defeating the documented 8s / never-hang contract.

Consider keeping the abort signal armed until the body is consumed, e.g. read the body inside the timeout-guarded scope (or only clear the timer after res.json()).

🔧 One option: parse inside the guarded scope
-async function fetchWithTimeout(url: string, init: RequestInit): Promise<Response | undefined> {
-  const controller = new AbortController();
-  const timer = setTimeout(() => controller.abort(), RELAY_TIMEOUT_MS);
-  try {
-    return await fetch(url, { ...init, signal: controller.signal });
-  } catch {
-    return undefined;
-  } finally {
-    clearTimeout(timer);
-  }
-}
+async function fetchWithTimeout(
+  url: string,
+  init: RequestInit
+): Promise<{ status: number; ok: boolean; json: () => Promise<unknown> } | undefined> {
+  const controller = new AbortController();
+  const timer = setTimeout(() => controller.abort(), RELAY_TIMEOUT_MS);
+  try {
+    const res = await fetch(url, { ...init, signal: controller.signal });
+    // Buffer the body while the abort signal is still armed.
+    const text = await res.text();
+    return {
+      status: res.status,
+      ok: res.ok,
+      json: async () => JSON.parse(text)
+    };
+  } catch {
+    return undefined;
+  } finally {
+    clearTimeout(timer);
+  }
+}

(Adjust send()'s res.json().catch(...) accordingly.)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/runtime/src/relay.ts` around lines 35 - 45, The timeout in
fetchWithTimeout only guards the network request up to headers, so send() can
still hang while reading the response body via res.json(). Update relay.ts so
the same AbortController/timer remains active until body parsing completes,
either by moving JSON parsing into the timeout-guarded scope or by deferring
clearTimeout(timer) until after res.json() finishes. Keep the fix centered
around fetchWithTimeout and send() so the 8s never-hang behavior applies to both
headers and body consumption.


/**
* Build the relay context from the box environment. Always safe to call: when
* no agent token is present every send returns `{ ok: false }` (logged).
*/
export function buildRelayContext(log: Log, env: NodeJS.ProcessEnv = process.env): RelayContext {
const token = resolveAgentToken(env);
const baseUrl = resolveRelaycastUrl(env);

async function send(path: string, body: unknown, action: string): Promise<RelaySendResult> {
if (!token) {
log('warn', `relay.${action}.no-token`, {
reason: 'no agent token (WORKFORCE_AGENT_TOKEN/RELAY_AGENT_TOKEN/RELAY_API_KEY) in the box'
});
return { ok: false };
}
const res = await fetchWithTimeout(`${baseUrl}${path}`, {
method: 'POST',
headers: { authorization: `Bearer ${token}`, 'content-type': 'application/json' },
body: JSON.stringify(body)
});
if (!res) {
log('warn', `relay.${action}.failed`, { reason: 'timeout or network error' });
return { ok: false };
}
if (!res.ok) {
log('warn', `relay.${action}.failed`, { status: res.status });
return { ok: false };
}
// Relaycast REST wraps success as `{ ok, data }`; ids live under
// data.message.id / data.id. Unwrap before reading.
const json = (await res.json().catch(() => null)) as Record<string, unknown> | null;
const data =
json && typeof json.data === 'object' && json.data !== null
? (json.data as Record<string, unknown>)
: json;
const message =
data && typeof data.message === 'object' && data.message !== null
? (data.message as Record<string, unknown>)
: undefined;
const rawId = message?.id ?? data?.messageId ?? data?.id;
return { ok: true, messageId: rawId != null ? String(rawId) : undefined };
}
Comment on lines +55 to +88

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Inline the timeout and abort controller logic here to ensure the 8-second timeout covers the entire request lifecycle, including reading and parsing the response body via res.json().

  /**
   * POST `body` as JSON to `baseUrl + path` with the agent bearer token.
   *
   * One abort timer (`RELAY_TIMEOUT_MS`) stays armed from the request until
   * the response body has been parsed. Never throws: a missing token, a
   * non-2xx status, a timeout or a network error is logged as a warning and
   * reported as `{ ok: false }`. On success the message id is read from
   * `message.id`, `messageId` or `id`, looked up inside `data` when the
   * payload wraps its result as `{ ok, data }`.
   */
  async function send(path: string, body: unknown, action: string): Promise<RelaySendResult> {
    const isRecord = (value: unknown): value is Record<string, unknown> =>
      typeof value === 'object' && value !== null;

    if (!token) {
      log('warn', `relay.${action}.no-token`, {
        reason: 'no agent token (WORKFORCE_AGENT_TOKEN/RELAY_AGENT_TOKEN/RELAY_API_KEY) in the box'
      });
      return { ok: false };
    }

    const abort = new AbortController();
    const deadline = setTimeout(() => abort.abort(), RELAY_TIMEOUT_MS);
    try {
      const request: RequestInit = {
        method: 'POST',
        headers: { authorization: `Bearer ${token}`, 'content-type': 'application/json' },
        body: JSON.stringify(body),
        signal: abort.signal
      };
      const response = await fetch(baseUrl + path, request);
      if (!response.ok) {
        log('warn', `relay.${action}.failed`, { status: response.status });
        return { ok: false };
      }

      // An unreadable or non-JSON body still counts as a delivered message; it
      // just yields no id.
      let payload: Record<string, unknown> | null;
      try {
        payload = (await response.json()) as Record<string, unknown> | null;
      } catch {
        payload = null;
      }

      const envelope = payload && isRecord(payload.data) ? payload.data : payload;
      const message = envelope && isRecord(envelope.message) ? envelope.message : undefined;
      const rawId = message?.id ?? envelope?.messageId ?? envelope?.id;
      if (rawId == null) {
        return { ok: true, messageId: undefined };
      }
      return { ok: true, messageId: String(rawId) };
    } catch (err) {
      const reason = err instanceof Error && err.name === 'AbortError' ? 'timeout' : 'network error';
      log('warn', `relay.${action}.failed`, { reason });
      return { ok: false };
    } finally {
      clearTimeout(deadline);
    }
  }


return {
dm: (to: string, text: string) => send('/v1/dm', { to, text }, 'dm'),
post: (channel: string, text: string) =>
send(`/v1/channels/${encodeURIComponent(channel)}/messages`, { text }, 'post')
};
}
24 changes: 24 additions & 0 deletions packages/runtime/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,28 @@ export interface TrajectoryContext {
done(summary: string, confidence: number): Promise<void>;
}

/**
 * Result of a relay send (DM or channel post). Sends report failure through
 * this value rather than by throwing.
 */
export interface RelaySendResult {
/**
 * `true` when the gateway answered with a 2xx status; `false` when the send
 * was skipped (no agent token) or failed (timeout, network error, non-2xx).
 */
ok: boolean;
/** Delivered relaycast message id, when the gateway returns one. */
messageId?: string;
}

/**
 * Agent-to-agent messaging over the relay (relaycast). Lets a handler DM a peer
 * agent or post to a relay channel using the box's injected agent token. This
 * is the outbound counterpart to the relay `inbox` trigger, which delivers
 * inbound messages.
 *
 * Never throws: each method resolves to `{ ok: false }` (and logs) on a
 * missing token, a timeout or a non-2xx response, so a relay reply degrades
 * gracefully. To reply to whoever messaged this agent, read the sender off the
 * inbound relay event and pass it as `to`.
 */
export interface RelayContext {
/** DM a peer agent by registered name. */
dm(to: string, text: string): Promise<RelaySendResult>;
/** Post a message to a relay channel by name. */
post(channel: string, text: string): Promise<RelaySendResult>;
}

/**
* The context object handlers receive on every event invocation.
* Provider data is accessed via the VFS helpers exported from the runtime
Expand Down Expand Up @@ -413,6 +435,8 @@ export interface WorkforceCtx {
workflow: WorkflowContext;
/** Schedule one-off follow-up ticks. */
schedule: ScheduleContext;
/** Agent-to-agent messaging over the relay (DM a peer / post to a channel). */
relay: RelayContext;
/**
* Auto-recorded decision trajectory (the WHY). No-op when recording is
* disabled (no `persona.memory.trajectories` opt-in or no resolvable
Expand Down
Loading