Skip to content

feat(broker): node-only delivery for relaycast v5.0.1#1201

Merged
willwashburn merged 21 commits into
mainfrom
feat/broker-node-delivery
Jun 26, 2026
Merged

feat(broker): node-only delivery for relaycast v5.0.1#1201
willwashburn merged 21 commits into
mainfrom
feat/broker-node-delivery

Conversation

@willwashburn

@willwashburn willwashburn commented Jun 25, 2026

Copy link
Copy Markdown
Member

✅ Fully validated end-to-end on prod (cast.agentrelay.com, v5.0.1)

The complete chain was exercised live against the deployed v5.0.1 engine with this branch's broker: enroll as a role:broker node → spawn an agent (node-bound to the fleet node) → DM from another agent → engine delivered it over /v1/node/ws → broker injected it into the agent's PTY → acked. The delivered event_id matched the sent DM. This is the exact scenario that was broken ("messages never inject between broker-spawned agents") — now working.

What this does

Migrates agent-relay-broker to relaycast v5.0.1's node-only delivery model. In v5.0.1 the workspace firehose (/v1/ws) is observer-only; realtime delivery happens exclusively over the node channel (/v1/node/ws). The broker now:

  • Enrolls as a node at startup — node id derived from machine-id + hash(cwd) (so multiple brokers/workspaces on one host don't collide on create_node); mints + persists a workspace-scoped node token (create_node); connects /v1/node/ws unconditionally and sends node.register. A missing token is a loud hard-fault; a node-WS 401 invalidates the cached token and re-mints (bounded).
  • Binds every spawned agent to the broker's node (both /api/spawn and engine action.invoke spawns) so the engine routes their messages back.
  • Delivers over /v1/node/wsdeliver frames (route by agent_id, branch on payload.type: inject messages + action.completed/action.failed/action.denied results, ack-only for reactions/receipts), cumulative delivery.ack {up_to_seq}, bounded dedup, seq:0 fan-out surfaced.
  • Runs spawn/release as action.invokeaction.result.
  • Removes the firehose — fleet-mode drop, fleet_mode_enabled, and the rejected /v1/ws workspace-stream connection are gone (HTTP client retained for registration/API).

How it was built & validated

  • Implement (3 increments) → per-increment adversarial review + fix → comprehensive review + fixes → /v1/ws teardown → workspace-scoped token cache + re-mint-on-401 + cwd-scoped node id + agent.register reply correlation (robustness bugs the review and prod test surfaced).
  • 785+ unit tests pass, clippy clean. Runtime-validated end-to-end on prod (above).

Architecture

The node / broker / broker-pty-engine / @agent-relay/harness-driver / action-runner vocabulary + delivery model is documented in agentrelay.com docs PR #9 (architecture.mdx).

🤖 Generated with Claude Code

willwashburn and others added 7 commits June 25, 2026 14:11
Bump crates/broker to relaycast =5.0.1 (#214, increment 1 of the broker
node-only delivery migration).

- Remove the workspace-stream toggle (ensure_workspace_stream_enabled and
  its RelaycastWsClient::run call); RelayCast::workspace_stream_set is gone
  in v5.
- Add node-frame fields the v5.0.1 engine sends (structs are
  deny_unknown_fields, so missing fields drop the frame): Deliver gains
  agent_id and delivery_id; ActionInvoke gains optional agent_id and
  agent_name.
- Extract the AgentReleaseRequested / AgentSpawnRequested firehose match
  arms into reusable release_worker_locally / spawn_worker_from_request
  async fns and drop the arms (those WsEvent variants no longer exist in
  v5.0.1). Increment 2 will call these from action.invoke.
- Point register-flow test mocks at the new /v1/agents endpoint with the
  CreateAgentResponse body (v5.0.1 register_agent_token registers via
  /v1/agents instead of /v1/agents/spawn).
- Add the new fields to fleet-wire deliver/action.invoke fixtures.

cargo build and cargo test -p agent-relay-broker both green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… by ws_type

In relaycast v5.0.1 WsEvent ends in #[serde(other)] Unknown, so an
agent.spawn_requested frame deserializes to Ok(WsEvent::Unknown), not Err.
The firehose handler gated its raw-JSON spawn fallback (and a deser-warning)
on from_value::<WsEvent>(..).is_ok(), which is now always true — making both
paths dead code, contrary to the prior "preserved untouched" claim.

Node control owns spawn/release via action.invoke (the extracted
spawn_worker_from_request / release_worker_locally helpers); the workspace
firehose no longer drives these events. Replace the meaningless is_ok() gate
with an explicit ws_type match that ignores agent.spawn_requested /
agent.release_requested (already deduped) instead of letting them fall through
to map_ws_event. Remove the dead fallback, dead warning, and now-unused local
bindings. Add regression tests pinning that both control frames decode to
WsEvent::Unknown so future dispatch must classify by ws_type.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…/v1/node/ws

Increment 2 of the broker node-only delivery migration (relaycast v5.0.1). The
broker now enrolls as a relaycast node, binds its agents through node control,
and delivers/injects solely over /v1/node/ws.

- NODE-TOKEN BOOTSTRAP (init.rs): resolve_node_token prefers RELAY_NODE_TOKEN,
  then a token cached for this exact node id, otherwise mints one via
  RelayCast::create_node (kind=ws, role=broker) with the workspace key and
  persists it next to the node id (scoped to node_id so id rotation
  invalidates it). node_control gains load/persist_node_token helpers.
- UNCONDITIONAL node.register (init.rs): push FleetControlCommand::RegisterNode
  with a broker self-manifest (spawn capability) right after spawning the
  node-control client, so the broker enrolls every startup regardless of any
  sidecar.
- BIND EVERY SPAWNED AGENT via node-control agent.register: extracted
  register_node_agent_token; both /api/spawn (api.rs) and the Inc1
  spawn_worker_from_request now mint the agent token over node control (HTTP
  pre-registration only as fallback when node binding is unavailable). The
  minted token injects RELAY_AGENT_TOKEN + RELAY_SKIP_BOOTSTRAP via snippets,
  so the worker MCP never re-registers over HTTP.
- INBOUND NODE FRAMES (fleet.rs): handle_fleet_deliver now uses the real
  delivery_id (no longer derived from msg_id) and branches on payload.type —
  message.created/thread.reply/dm.received/group_dm.received (and legacy empty
  type) inject into the worker PTY; message.reacted/message.read are acked with
  a tracing log only (PTY surfacing deferred); unknown types are acked without
  surfacing. action.invoke routes spawn/spawn:* and release to the Inc1 spawn/
  release fns, replying action.result {output} on success or {error} on
  failure.
- fleet_mode_enabled flips on FleetControlEvent::Connected (and is not cleared
  on disconnect) so workspace-firehose delivery is suppressed once node
  delivery is live, avoiding double-delivery while honoring at-least-once
  resume.

Runtime delivery cannot be exercised without a live engine; added unit tests
for the delivery-classification and action.invoke identity/field helpers.
cargo build and cargo test -p agent-relay-broker both green (779 passing).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…lias classification

Node /v1/node/ws deliver frames nest the message under payload.data per
relaycast 5.0.1 normalize_node_deliver (data.text/channel_name/agent_name/
from_name/thread_id). fleet_relay_delivery read only flat /text,/from,/channel
paths, so every node-delivered message injected the raw JSON blob attributed to
"relaycast". Extract from the data envelope (data.* first, legacy flat paths as
fallback) via a testable fleet_delivery_fields helper.

classify_fleet_delivery only injected message.created|thread.reply|dm.received|
group_dm.received; the engine may emit any relaycast parse_inbound_kind alias
(message.received/new/sent/delivered, dm.created/new/sent/message.created,
direct_message.*, thread.message.created/sent, group_dm.*). Those were
acked-and-dropped (permanent loss under at-least-once). Widen the Inject arm to
the full alias set. Update the deliver.json fixture to the real {type,data}
shape.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ery)

Inc3 of the broker node-only delivery migration. Message delivery now
flows solely over /v1/node/ws via handle_fleet_deliver, so the workspace
firehose delivery path in the broker fleet runtime is dead.

- Drop the fleet_mode_enabled field and all its assignments; its only
  reader was the firehose drop in handle_relaycast_message.
- Reduce handle_relaycast_message to log-and-discard. The map_ws_event
  injection block, self-echo filtering, DM resolution, dashboard
  rebroadcast, and the fleet-mode drop are all gone.
- Remove now-dead firehose-only helpers and their tests:
  relaycast_ws_control_dedup_key; routing is_self_echo,
  resolve_delivery_targets, worker_names_for_dm_participants,
  display_target_for_dashboard, DeliveryPlan; queue_and_try_delivery;
  WorkerRegistry::has_any_worker / has_worker_by_name_ignoring_case;
  the unused dm_participants_cache runtime field.
- Rewrite routing tests to cover the surviving
  worker_names_for_channel_delivery / worker_names_for_direct_target
  (sender exclusion, case-insensitivity, workspace-id filtering).

Deliberately left intact: RelaycastWsClient::run and map_ws_event are
still used by `agent-relay-broker wrap` (single-agent PTY mode), which
legitimately consumes the workspace firehose. Spawn/release stay owned
by node control (spawn_worker_from_request / release_worker_locally).
No observer/observer-token path added (separate follow-up).

cargo build and clippy clean; 781 broker tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Apply targeted adversarial-review fixes to the broker's node-only
delivery migration:

- HTTP-register spawn fallback now binds the agent to the broker's node
  via SDK bind_agent_to_node so it becomes via_node (the only kind the
  engine delivers to); a failed bind emits a loud registration_warning.
  A missing node token is logged as a hard fault, not a quiet warning.
- seq:0 fan-out frames are no longer dropped: special-cased in
  FleetDeliveryBook::observe/commit to always surface-and-ack with
  msg_id dedup; action.completed/action.failed/action.denied route to
  Inject (delivered to the caller), message.reacted/message.read stay
  ack-only (PTY surfacing deferred).
- Remove deny_unknown_fields from inbound Deliver/ActionInvoke so a
  future engine field no longer drops the frame without an ack
  (infinite redelivery). Outbound frames keep it.
- Bound AgentDeliveryCursor.seen_msg_ids with a FIFO cap (512).
- release action.result is now faithful: genuinely-unknown worker
  returns an error; already-exited worker still reports success.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
In v5.0.1 node-only delivery the /v1/ws workspace-stream WebSocket is
observer-only and rejects the broker's workspace key with HTTP 401. The
broker kept opening it anyway, 401-looping every 3s and burning reconnects
even though delivery already flows over /v1/node/ws. The earlier teardown
removed firehose message handling but left the connection itself.

MultiWorkspaceSession::new no longer spawns the workspace-stream WS task;
it drains the WsControl channel to a no-op and keeps the inbound channel
open as an inert empty source (kept alive by a sender clone so the wrap
action consumer and runtime no-op handler never busy-loop on a closed
receiver). RelaycastWsClient (the 401-looping run loop) is deleted. The
workspace HTTP client, WsControl plumbing, and ws_control_tx senders are
all kept intact.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

The broker now uses node-scoped token minting and recovery, node-only delivery surfacing with seq:0 dedup changes, node-bound spawn/release routing, and removal of the legacy workspace websocket and routing surface.

Changes

Broker node-only delivery

Layer / File(s) Summary
Wire shapes and /v1/agents mocks
crates/broker/src/fleet_wire.rs, crates/broker/src/cli_mcp_args.rs, crates/broker/src/runtime/tests.rs, crates/broker/tests/fixtures/fleet-wire/*, CHANGELOG.md
Deliver and ActionInvoke accept the new agent fields, and the registration/read-ack tests and fixtures use /v1/agents with updated payload shapes.
Node token resolution and startup
crates/broker/src/node_control.rs, crates/broker/src/runtime/init.rs, crates/broker/src/runtime/event_loop.rs
Node IDs derive from a machine seed plus cwd, persisted node tokens are scoped by node/workspace/base URL, fleet_node_name is added to runtime state, and /v1/node/ws 401s trigger bounded re-minting and registration correlation updates.
Delivery surfacing and seq:0 dedup
crates/broker/src/runtime/delivery.rs, crates/broker/src/runtime/fleet.rs, crates/broker/src/node_control.rs, crates/broker/src/runtime/relaycast_events.rs
Node Deliver frames are surfaced before ack, seq:0 frames dedupe by msg_id with capped memory, and payload helpers read typed envelopes with legacy fallbacks.
Spawn and release control routing
crates/broker/src/runtime/api.rs, crates/broker/src/runtime/fleet.rs, crates/broker/src/runtime/relaycast_events.rs, crates/broker/src/runtime/tests.rs
/spawn now prefers node-bound registration and HTTP fallback rebinding, node-control spawn/release paths route through local helpers, and the control-dedup tests follow the new relaycast v5 shapes.
Legacy websocket, routing, and worker cleanup
crates/broker/Cargo.toml, crates/broker/src/relaycast/workspace.rs, crates/broker/src/relaycast/ws.rs, crates/broker/src/routing.rs, crates/broker/src/runtime/messages.rs, crates/broker/src/runtime/mod.rs, crates/broker/src/worker.rs
Legacy workspace websocket fanout, routing helpers, worker query helpers, and related imports are removed, and the relaycast dependency pin is updated.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~90+ minutes

Possibly related issues

Possibly related PRs

Suggested reviewers

  • khaliqgant

Poem

I tunneled through the node-token snow,
With seq:0 crumbs to help me go.
I bound the spawn right to its nest,
Then waved goodbye to websocket rest.
🐇✨ The broker burrow glows aglow.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarizes the main change: broker node-only delivery for relaycast v5.0.1.
Description check ✅ Passed It covers the PR summary and validation details, matching the template’s required content despite different headings.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/broker-node-delivery

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request transitions the broker to node-only delivery by upgrading to relaycast v5.0.1, removing the dead workspace-stream firehose delivery path, and routing message delivery solely over /v1/node/ws. It introduces node enrollment, agent-to-node binding, support for seq:0 fan-out frames, and bounded seen message ID tracking. Feedback on these changes highlights two key issues: first, receiving a delivery for an unknown or released agent will fail injection and withhold the acknowledgment, leading to infinite redelivery retries; second, extracting non-string action result outputs or errors via first_string fails and falls back to stringifying the entire envelope, which can be improved by specifically stringifying the /data/output or /data/error pointers first.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +388 to +391
FleetDeliverySurfacing::Inject => {
let relay_delivery = self.fleet_relay_delivery(deliver);
self.workers.deliver(&deliver.agent, relay_delivery).await
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If a delivery is received for an agent that has already been released or does not exist in the registry, self.workers.deliver will fail. Under the current implementation, any failure in surface_fleet_deliver causes the broker to withhold the acknowledgment (delivery.ack), leading to infinite redelivery retries by the engine.

To prevent this, we should check if the agent exists in self.workers.workers before attempting delivery. If the agent is missing, we should log a warning and return Ok(()) so the message is committed and acknowledged rather than retried indefinitely.

            FleetDeliverySurfacing::Inject => {
                if !self.workers.workers.contains_key(&deliver.agent) {
                    tracing::warn!(
                        target = "relay_broker::fleet",
                        agent = %deliver.agent,
                        delivery_id = %deliver.delivery_id,
                        "received delivery for unknown or released agent; acking to prevent infinite redelivery"
                    );
                    return Ok(());
                }
                let relay_delivery = self.fleet_relay_delivery(deliver);
                self.workers.deliver(&deliver.agent, relay_delivery).await
            }

Comment on lines +1162 to +1177
&[
"/data/text",
"/text",
"/body",
"/content",
"/message/text",
"/payload/text",
// Action-result fan-out (action.completed/failed/denied) carries the
// result under data.output / data.error rather than a text field.
"/data/output",
"/data/error",
],
)
.unwrap_or_else(|| payload.to_string());
let from = first_string(
payload,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When extracting fields from a node deliver payload, if the payload represents an action result (such as action.completed or action.failed), the result/output is typically carried under data.output or data.error as a JSON object or other non-string type.

Currently, first_string only extracts string values, so it will return None for non-string outputs. The fallback unwrap_or_else(|| payload.to_string()) then stringifies the entire deliver frame (including all envelope metadata), which is injected into the agent's PTY.

We can improve this by specifically stringifying /data/output, /data/error, or /data if they exist, before falling back to the entire payload.

    let body = first_string(
        payload,
        &[
            "/data/text",
            "/text",
            "/body",
            "/content",
            "/message/text",
            "/payload/text",
            "/data/output",
            "/data/error",
        ],
    )
    .unwrap_or_else(|| {
        payload
            .pointer("/data/output")
            .or_else(|| payload.pointer("/data/error"))
            .or_else(|| payload.pointer("/data"))
            .map(|v| v.to_string())
            .unwrap_or_else(|| payload.to_string())
    });

@willwashburn willwashburn marked this pull request as ready for review June 26, 2026 17:08
@willwashburn willwashburn requested a review from khaliqgant as a code owner June 26, 2026 17:08

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 63037a3ae4

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +496 to +500
let cli = match action_invoke_string(&invoke.input, &["cli", "command", "provider"]) {
Some(cli) => cli,
None => {
self.reply_action_error(&invoke.invocation_id, "spawn_missing_cli")
.await;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Derive the CLI from spawn action suffix

For node action.invoke spawns where the engine encodes the requested harness in the action name (for example the added fixture uses "action": "spawn:codex" with no input.cli), this branch returns spawn_missing_cli before spawning anything. Since the comment above says spawn:<harness> is a supported form, the handler needs to fall back to the suffix (codex here) before rejecting the invoke.

Useful? React with 👍 / 👎.

Comment on lines +127 to +132
Ok(token) => {
tracing::info!(
worker = %name,
error = %error,
"continuing spawn without pre-registration after retries exhausted"
"bound agent to node via agent.register for HTTP spawn"
);
preregistration_warning = Some(message);
None
Some(token.token)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Seed node-minted agent tokens locally

When /api/spawn succeeds through the new node-control agent.register path, this branch only passes the minted token to the worker and never seeds RelaycastHttpClient's registration cache. After that worker confirms a delivered Relaycast message, mark_delivery_read_ack calls mark_read_as_agent, which will see no cached token and re-register the same agent over the HTTP /v1/agents path; in the node-only model that HTTP registration is not node-bound and can rotate the identity away from the working via_node registration. Seed the returned token the same way the supplied-token path does, and apply the same fix to the duplicated node-register spawn paths.

Useful? React with 👍 / 👎.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

🧹 Nitpick comments (2)
CHANGELOG.md (1)

21-23: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Condense these changelog bullets to user-facing outcomes.

These entries read like release notes plus implementation notes (agent.register, registration_warning, seq:0, PTY surfacing). Please shorten them to one impact-first bullet per visible behavior change.

Suggested rewrite
-- `agent-relay-broker` node-only delivery: agents spawned via the HTTP-register fallback (when node-control `agent.register` is unavailable) are now bound to the broker's node so the engine delivers to them; a failed bind surfaces a loud `registration_warning` instead of silently producing an undeliverable agent. A missing node token is now logged as a hard fault (realtime delivery disabled) rather than a quiet warning.
-- `agent-relay-broker` node-only delivery: `seq:0` fan-out frames (reactions, read receipts, and `action.completed`/`action.failed`/`action.denied` results) are no longer dropped — action results are injected into the calling agent's PTY; reaction/read receipts are acked (PTY surfacing deferred). Inbound `deliver`/`action.invoke` frames tolerate unknown future engine fields instead of being dropped without an ack (which caused infinite redelivery), and the broker's per-agent delivery-dedup memory is now bounded.
-- `agent-relay-broker` node `release` action reports a faithful `action.result`: a genuinely unknown worker returns an error while an already-exited worker still reports success.
+- `agent-relay-broker` no longer leaves fallback-spawned agents unreachable, and missing node tokens now fail loudly when realtime delivery is unavailable.
+- `agent-relay-broker` now delivers node fan-out action results correctly, acknowledges receipt/read frames without redelivery loops, and bounds delivery dedup state.
+- `agent-relay-broker` `release` actions now distinguish unknown workers from already-exited ones.

As per coding guidelines, CHANGELOG.md: "Changelog entries should be concise and impact-first, with one short bullet per user-visible change. Omit issue/PR links, internal notes, and implementation details."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@CHANGELOG.md` around lines 21 - 23, Condense the three changelog bullets into
one short, user-facing bullet per behavior change, removing implementation
details like agent.register, registration_warning, seq:0, PTY surfacing, and
other internal mechanics. Rephrase the entries around visible outcomes only:
reliable delivery to newly registered agents, fan-out results/reactions being
handled correctly instead of dropped, and release actions reporting accurate
success/error results. Keep the wording concise and impact-first in
CHANGELOG.md.

Source: Coding guidelines

crates/broker/src/fleet_wire.rs (1)

895-915: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Pin the new top-level forward-compatibility behavior.

This test still round-trips the exact serialized shape. Since the core change is accepting future top-level fields, add an extra unknown field before deserialization so deny_unknown_fields cannot be reintroduced unnoticed.

🧪 Proposed regression guard
-        let value: Value = serde_json::to_value(&msg).unwrap();
+        let mut value: Value = serde_json::to_value(&msg).unwrap();
         assert_eq!(value["payload"]["metadata"]["channel"], "general");
+        value["engine_future_field"] = json!({ "ignored": true });
         let decoded: RelaycastToBroker = serde_json::from_value(value).unwrap();
         assert_eq!(decoded, msg);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/fleet_wire.rs` around lines 895 - 915, The regression test
in deliver_accepts_open_payloads only verifies a normal round-trip and does not
lock in forward-compatibility for unknown top-level fields. Update this test to
add an extra unknown field to the serialized Value before calling
serde_json::from_value, so RelaycastToBroker/Deliver continues to accept future
top-level fields and reintroducing deny_unknown_fields will fail the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@crates/broker/src/node_control.rs`:
- Around line 466-484: The cached node token in
`load_node_token`/`PersistedNodeToken` is only scoped by `node_id`, so it can be
reused after switching Relaycast bases or engines. Update `PersistedNodeToken`
and the token persistence/loading flow in `node_control.rs` to also store the
relay base or node WS URL, and have `load_node_token` validate that value
alongside `node_id` before returning the token. Make sure the code that writes
tokens and the callers that read them both use the same identifying symbol for
the base URL so stale tokens are rejected when the host changes.
- Around line 490-501: The node token persistence in persist_node_token
currently uses a plain write path that can leave node-token.json with overly
permissive permissions. Update persist_node_token in node_control.rs to
create/write the file through OpenOptions, set restrictive Unix permissions to
0o600 at creation time, and ensure existing files are also chmod’d back to 0o600
after writing; keep the existing serialization and error context intact while
making the permission handling part of the write flow.

In `@crates/broker/src/runtime/api.rs`:
- Around line 127-132: The HTTP spawn path in api::spawn currently keeps only
token.token after agent.register, but it also needs to retain token.agent_id so
failed spawns can clean up the node-bound agent. Update the spawn flow to store
the agent_id alongside the token, and in the workers.spawn(...) error branch use
that agent_id to deregister/prune the agent before returning the error. Focus
the change around the agent.register handling and the workers.spawn failure
path.

In `@crates/broker/src/runtime/fleet.rs`:
- Around line 388-391: In FleetDeliverySurfacing::Inject, missing workers are
treated like retryable delivery failures, which causes handle_fleet_deliver to
withhold the ack and loop forever. Update the inject path in fleet.rs to detect
a permanently missing worker before or after
self.workers.deliver(&deliver.agent, relay_delivery) by checking WorkerRegistry
state or matching the unknown worker error from send_to_worker/deliver, and
convert that case into the terminal AckUnknown behavior so the ack advances
immediately.
- Around line 548-562: The node-action spawn flow in handle_fleet_action_spawn
updates the worker set but never registers the new agent in fleet_inventory or
syncs the engine state. After spawn_worker_from_request succeeds and
self.workers.workers contains the new name, insert the agent into
self.fleet_inventory and immediately call self.publish_fleet_inventory().await,
matching the behavior already used in handle_fleet_spawn_agent so spawned agents
are visible and later prune_fleet_agent_state can find them.

In `@crates/broker/src/runtime/init.rs`:
- Around line 245-263: The node-token resolution path in init() only logs a
fatal configuration message but still continues startup with no token, leaving
the broker ready while unreachable. Update the node-only bootstrap flow around
node_token handling and the node-control client initialization to fail fast when
the token is missing: stop before API readiness, return an error or exit from
the init path, and ensure the control client is never started with None. Apply
the same behavior to the corresponding fallback branch referenced by the
duplicate block so both paths enforce the same hard-fail semantics.

In `@crates/broker/src/runtime/relaycast_events.rs`:
- Around line 421-423: The action spawn flow is dropping the warning returned by
bind_http_registered_agent_to_node, so handle_fleet_action_spawn can treat an
undeliverable node binding as success. Update the relaycast_events spawn path to
inspect the return from bind_http_registered_agent_to_node and, when it yields
Some(warning), stop the spawn or return an error/outcome that lets the caller
emit action.result with error. Use handle_fleet_action_spawn and
bind_http_registered_agent_to_node as the key points to wire this failure
through instead of unconditionally continuing with Some(token).
- Around line 392-398: The local spawn flow in relaycast_events keeps only
`token.token` after a successful `agent.register`, so failed
`workers.spawn(...)` calls cannot clean up the relaycast registration. Update
the spawn path around the `agent.register` handling to retain `token.agent_id`
alongside the token, then in the `workers.spawn` error branch use that saved
agent id to deregister or prune the node-bound agent. Apply the same fix in both
affected spawn blocks so failed local spawns do not leave stale registrations.
- Around line 211-230: The unknown-worker duplicate release path in
relaycast_events should mirror the normal success cleanup before returning
Released. In the Err(error) branch of the release handling logic, after
is_unknown_worker_error_message(...) succeeds, make sure the same cleanup
performed on the success path runs for pending deliveries, pending requests,
delivery state, and result tokens, so stale retry/timeout state is cleared for
already-exited workers. Use the existing release handling flow and related state
helpers around ReleaseOutcome::Released to locate and reuse the success-path
cleanup rather than duplicating ad hoc logic.

---

Nitpick comments:
In `@CHANGELOG.md`:
- Around line 21-23: Condense the three changelog bullets into one short,
user-facing bullet per behavior change, removing implementation details like
agent.register, registration_warning, seq:0, PTY surfacing, and other internal
mechanics. Rephrase the entries around visible outcomes only: reliable delivery
to newly registered agents, fan-out results/reactions being handled correctly
instead of dropped, and release actions reporting accurate success/error
results. Keep the wording concise and impact-first in CHANGELOG.md.

In `@crates/broker/src/fleet_wire.rs`:
- Around line 895-915: The regression test in deliver_accepts_open_payloads only
verifies a normal round-trip and does not lock in forward-compatibility for
unknown top-level fields. Update this test to add an extra unknown field to the
serialized Value before calling serde_json::from_value, so
RelaycastToBroker/Deliver continues to accept future top-level fields and
reintroducing deny_unknown_fields will fail the test.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 3fe7a9d0-e176-4c73-9618-186f8a9604af

📥 Commits

Reviewing files that changed from the base of the PR and between 54672ae and 63037a3.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (32)
  • .agentworkforce/trajectories/completed/2026-06/traj_2mv3mylgd1vh/summary.md
  • .agentworkforce/trajectories/completed/2026-06/traj_2mv3mylgd1vh/trajectory.json
  • .agentworkforce/trajectories/completed/2026-06/traj_4yeya7a3y7c6.trace.json
  • .agentworkforce/trajectories/completed/2026-06/traj_4yeya7a3y7c6/summary.md
  • .agentworkforce/trajectories/completed/2026-06/traj_4yeya7a3y7c6/trajectory.json
  • .agentworkforce/trajectories/completed/2026-06/traj_8ogwhkndee1j/summary.md
  • .agentworkforce/trajectories/completed/2026-06/traj_8ogwhkndee1j/trajectory.json
  • .agentworkforce/trajectories/completed/2026-06/traj_af15b3zk02a6/summary.md
  • .agentworkforce/trajectories/completed/2026-06/traj_af15b3zk02a6/trajectory.json
  • .agentworkforce/trajectories/completed/2026-06/traj_nda08hijihz5.trace.json
  • .agentworkforce/trajectories/completed/2026-06/traj_nda08hijihz5/summary.md
  • .agentworkforce/trajectories/completed/2026-06/traj_nda08hijihz5/trajectory.json
  • CHANGELOG.md
  • crates/broker/Cargo.toml
  • crates/broker/src/cli_mcp_args.rs
  • crates/broker/src/fleet_wire.rs
  • crates/broker/src/node_control.rs
  • crates/broker/src/relaycast/workspace.rs
  • crates/broker/src/relaycast/ws.rs
  • crates/broker/src/routing.rs
  • crates/broker/src/runtime/api.rs
  • crates/broker/src/runtime/delivery.rs
  • crates/broker/src/runtime/event_loop.rs
  • crates/broker/src/runtime/fleet.rs
  • crates/broker/src/runtime/init.rs
  • crates/broker/src/runtime/messages.rs
  • crates/broker/src/runtime/mod.rs
  • crates/broker/src/runtime/relaycast_events.rs
  • crates/broker/src/runtime/tests.rs
  • crates/broker/src/worker.rs
  • crates/broker/tests/fixtures/fleet-wire/action.invoke.json
  • crates/broker/tests/fixtures/fleet-wire/deliver.json
💤 Files with no reviewable changes (3)
  • crates/broker/src/runtime/delivery.rs
  • crates/broker/src/runtime/messages.rs
  • crates/broker/src/worker.rs

Comment thread crates/broker/src/node_control.rs
Comment thread crates/broker/src/node_control.rs Outdated
Comment on lines +127 to +132
Ok(token) => {
tracing::info!(
worker = %name,
error = %error,
"continuing spawn without pre-registration after retries exhausted"
"bound agent to node via agent.register for HTTP spawn"
);
preregistration_warning = Some(message);
None
Some(token.token)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift

Clean up node-registered agents when HTTP spawn fails.

This path registers/binds the agent before workers.spawn(...), but it discards token.agent_id. If the later spawn fails, the broker replies with an error while relaycast still has a node-bound agent that cannot be served locally. Keep the agent_id and deregister/prune it in the spawn error branch.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/runtime/api.rs` around lines 127 - 132, The HTTP spawn path
in api::spawn currently keeps only token.token after agent.register, but it also
needs to retain token.agent_id so failed spawns can clean up the node-bound
agent. Update the spawn flow to store the agent_id alongside the token, and in
the workers.spawn(...) error branch use that agent_id to deregister/prune the
agent before returning the error. Focus the change around the agent.register
handling and the workers.spawn failure path.

Comment on lines +388 to +391
FleetDeliverySurfacing::Inject => {
let relay_delivery = self.fleet_relay_delivery(deliver);
self.workers.deliver(&deliver.agent, relay_delivery).await
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -nP -C6 'fn send_to_worker' crates/broker/src/worker.rs
rg -nP -C3 'unknown worker|has_worker|workers\.get' crates/broker/src/worker.rs

Repository: AgentWorkforce/relay

Length of output: 2130


Ack missing workers immediately to prevent infinite redelivery loops.

When FleetDeliverySurfacing::Inject is processed and the target worker is missing (e.g., released between enqueue and delivery), self.workers.deliver fails. This error propagates to handle_fleet_deliver, which withholds the ack (Line 358), causing the engine to retry the same sequence indefinitely because the delivery book never advances.

WorkerRegistry::send_to_worker explicitly returns an error with the context unknown worker '{name}' when the target is missing (crates/broker/src/worker.rs, line 810). Distinguish this permanent failure from transient channel errors:

  • Check self.workers.has_worker(&deliver.agent) before attempting delivery, or
  • Pattern match the error from deliver to treat "unknown worker" as a terminal failure (equivalent to AckUnknown), releasing the ack immediately.

This prevents the broker from stalling on redelivery attempts for workers that no longer exist.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/runtime/fleet.rs` around lines 388 - 391, In
FleetDeliverySurfacing::Inject, missing workers are treated like retryable
delivery failures, which causes handle_fleet_deliver to withhold the ack and
loop forever. Update the inject path in fleet.rs to detect a permanently missing
worker before or after self.workers.deliver(&deliver.agent, relay_delivery) by
checking WorkerRegistry state or matching the unknown worker error from
send_to_worker/deliver, and convert that case into the terminal AckUnknown
behavior so the ack advances immediately.

Comment on lines +548 to +562

self.publish_fleet_load(true).await;

// `spawn_worker_from_request` does not return a result; treat presence of
// the worker as success so the engine's invocation resolves.
if self.workers.workers.contains_key(&name) {
self.reply_action_output(
&invoke.invocation_id,
json!({ "spawned": true, "name": name.as_str() }),
)
.await;
} else {
self.reply_action_error(&invoke.invocation_id, "spawn_failed")
.await;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -nP -C4 'fn publish_fleet_inventory_snapshot|UpdateInventory|fleet_inventory' crates/broker/src/runtime/fleet.rs crates/broker/src/node_control.rs

Repository: AgentWorkforce/relay

Length of output: 17652


Node-action spawn path omits fleet_inventory registration and sync

handle_fleet_action_spawn (Lines 548-562) creates an agent via spawn_worker_from_request but fails to register it in self.fleet_inventory or propagate the change via publish_fleet_inventory().

In contrast, the sidecar spawn path (handle_fleet_spawn_agent, Lines 677-686) correctly updates the inventory map and calls publish_fleet_inventory() to sync the broker's state with the engine.

Without this update:

  • Agents spawned via the node-only action.invoke path are invisible to the engine's inventory view.
  • Subsequent prune_fleet_agent_state calls during release will be no-ops for these agents, as they cannot be found in fleet_inventory.

Insert the agent into self.fleet_inventory and call self.publish_fleet_inventory().await immediately after successful spawn, mirroring the sidecar path.

Current code state
        self.publish_fleet_load(true).await;

        // `spawn_worker_from_request` does not return a result; treat presence of
        // the worker as success so the engine's invocation resolves.
        if self.workers.workers.contains_key(&name) {
            self.reply_action_output(
                &invoke.invocation_id,
                json!({ "spawned": true, "name": name.as_str() }),
            )
            .await;
        } else {
            self.reply_action_error(&invoke.invocation_id, "spawn_failed")
                .await;
        }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/runtime/fleet.rs` around lines 548 - 562, The node-action
spawn flow in handle_fleet_action_spawn updates the worker set but never
registers the new agent in fleet_inventory or syncs the engine state. After
spawn_worker_from_request succeeds and self.workers.workers contains the new
name, insert the agent into self.fleet_inventory and immediately call
self.publish_fleet_inventory().await, matching the behavior already used in
handle_fleet_spawn_agent so spawned agents are visible and later
prune_fleet_agent_state can find them.

Comment thread crates/broker/src/runtime/init.rs
Comment on lines +211 to +230
Err(error) => {
let message = error.to_string();
if is_unknown_worker_error_message(&message) {
workspace_http.forget_agent_registration(&name);
state.agents.remove(&name);
if paths.persist {
if let Err(save_error) = state.save(&paths.state) {
tracing::warn!(
path = %paths.state.display(),
error = %save_error,
"failed to persist broker state"
);
}
}
tracing::debug!(
child = %name,
"ignoring duplicate relaycast release for already exited worker"
);
// An already-exited worker is still a successful release.
ReleaseOutcome::Released

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Clear stale release state on idempotent unknown-worker releases.

This branch returns Released, but it skips the success-path cleanup for pending deliveries, pending requests, delivery state, and result tokens. Duplicate releases for an already-exited worker can leave retries or pending request timeouts behind.

🧹 Proposed cleanup mirror
             if is_unknown_worker_error_message(&message) {
                 workspace_http.forget_agent_registration(&name);
+                let dropped = take_pending_for_worker(pending_deliveries, &name);
+                if !dropped.is_empty() {
+                    let _ = send_event(
+                        sdk_out_tx,
+                        json!({"kind":"delivery_dropped","name":&name,"count":dropped.len(),"reason":"agent_released"}),
+                    )
+                    .await;
+                    let _ =
+                        emit_dropped_delivery_failures(sdk_out_tx, &dropped, "agent_released")
+                            .await;
+                }
+                fail_pending_requests_for_worker(pending_requests, &name, "relaycast_release");
+                delivery_states.remove(&name);
+                agent_result_tokens.retain(|_, agent| agent != &name);
                 state.agents.remove(&name);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Err(error) => {
let message = error.to_string();
if is_unknown_worker_error_message(&message) {
workspace_http.forget_agent_registration(&name);
state.agents.remove(&name);
if paths.persist {
if let Err(save_error) = state.save(&paths.state) {
tracing::warn!(
path = %paths.state.display(),
error = %save_error,
"failed to persist broker state"
);
}
}
tracing::debug!(
child = %name,
"ignoring duplicate relaycast release for already exited worker"
);
// An already-exited worker is still a successful release.
ReleaseOutcome::Released
Err(error) => {
let message = error.to_string();
if is_unknown_worker_error_message(&message) {
workspace_http.forget_agent_registration(&name);
let dropped = take_pending_for_worker(pending_deliveries, &name);
if !dropped.is_empty() {
let _ = send_event(
sdk_out_tx,
json!({"kind":"delivery_dropped","name":&name,"count":dropped.len(),"reason":"agent_released"}),
)
.await;
let _ =
emit_dropped_delivery_failures(sdk_out_tx, &dropped, "agent_released")
.await;
}
fail_pending_requests_for_worker(pending_requests, &name, "relaycast_release");
delivery_states.remove(&name);
agent_result_tokens.retain(|_, agent| agent != &name);
state.agents.remove(&name);
if paths.persist {
if let Err(save_error) = state.save(&paths.state) {
tracing::warn!(
path = %paths.state.display(),
error = %save_error,
"failed to persist broker state"
);
}
}
tracing::debug!(
child = %name,
"ignoring duplicate relaycast release for already exited worker"
);
// An already-exited worker is still a successful release.
ReleaseOutcome::Released
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/runtime/relaycast_events.rs` around lines 211 - 230, The
unknown-worker duplicate release path in relaycast_events should mirror the
normal success cleanup before returning Released. In the Err(error) branch of
the release handling logic, after is_unknown_worker_error_message(...) succeeds,
make sure the same cleanup performed on the success path runs for pending
deliveries, pending requests, delivery state, and result tokens, so stale
retry/timeout state is cleared for already-exited workers. Use the existing
release handling flow and related state helpers around ReleaseOutcome::Released
to locate and reuse the success-path cleanup rather than duplicating ad hoc
logic.

Comment on lines +392 to +398
Ok(token) => {
tracing::info!(
worker = %name,
"bound agent to node via agent.register for action.invoke spawn"
);
Some(token.token)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift

Keep the registered agent_id so failed local spawns can be deregistered.

After agent.register succeeds, only token.token is kept. If workers.spawn(...) fails later, the node-bound agent remains registered in relaycast with no local worker to receive delivery. Preserve token.agent_id and deregister/prune it in the spawn error branch.

Also applies to: 534-540

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/runtime/relaycast_events.rs` around lines 392 - 398, The
local spawn flow in relaycast_events keeps only `token.token` after a successful
`agent.register`, so failed `workers.spawn(...)` calls cannot clean up the
relaycast registration. Update the spawn path around the `agent.register`
handling to retain `token.agent_id` alongside the token, then in the
`workers.spawn` error branch use that saved agent id to deregister or prune the
node-bound agent. Apply the same fix in both affected spawn blocks so failed
local spawns do not leave stale registrations.

Comment on lines +421 to +423
bind_http_registered_agent_to_node(workspace_http, node_name, &name)
.await;
Some(token)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | 🏗️ Heavy lift

Do not ignore failed HTTP-to-node binding for action spawns.

bind_http_registered_agent_to_node returns Some(warning) when the agent remains undeliverable, but this path discards it and still spawns with the token. handle_fleet_action_spawn can then report success solely because the local worker exists, while node-only delivery will not reach it. Propagate this as a spawn failure or return an outcome that lets the caller send action.result { error }.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/runtime/relaycast_events.rs` around lines 421 - 423, The
action spawn flow is dropping the warning returned by
bind_http_registered_agent_to_node, so handle_fleet_action_spawn can treat an
undeliverable node binding as success. Update the relaycast_events spawn path to
inspect the return from bind_http_registered_agent_to_node and, when it yields
Some(warning), stop the spawn or return an error/outcome that lets the caller
emit action.result with error. Use handle_fleet_action_spawn and
bind_http_registered_agent_to_node as the key points to wire this failure
through instead of unconditionally continuing with Some(token).

github-actions Bot and others added 2 commits June 26, 2026 17:27
The workflow/fix sub-agents' `git add -A` committed this session's
.agentworkforce/trajectories/ records into the code branch. Untrack them
(kept on disk) so the PR is a reviewable code-only diff.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@willwashburn willwashburn changed the title feat(broker): node-only delivery for relaycast v5.0.1 [blocked on engine v5.0.1 deploy] feat(broker): node-only delivery for relaycast v5.0.1 Jun 26, 2026
willwashburn and others added 2 commits June 26, 2026 14:27
The broker persisted its minted node token at a single global path and
load_node_token only checked node_id, so a token minted for workspace A
(or a local engine) was reused against workspace B / prod and rejected
with HTTP 401 on /v1/node/ws, and the connect loop looped forever on the
rejected cached token.

- Scope the persisted token to workspace_id (and engine base_url):
  add both to PersistedNodeToken; load_node_token returns the cached
  token only when node_id AND workspace_id match (and base_url when both
  sides know it); legacy caches without base_url still reuse on a
  workspace match. persist_node_token / load_node_token / resolve_node_token
  signatures and call sites in runtime/init.rs updated accordingly.
- Re-mint on 401: detect an HTTP 401 handshake rejection on the
  node-control connect, discard the cached file + in-memory token, and
  re-mint via RelayCast::create_node (wired through a NodeTokenMinter)
  before retrying. Bounded to MAX_UNAUTHORIZED_BEFORE_GIVING_UP (5)
  consecutive 401s, after which a loud hard error is surfaced instead of
  spinning.
- Tests: load_node_token workspace/node/base_url mismatch + round-trip,
  legacy-cache reuse, and 401 connect-error detection.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/broker/src/node_control.rs (1)

734-777: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

consecutive_unauthorized is not reset on non-401 outcomes, contradicting its documented contract.

The comment at Lines 653-656 states the counter is reset "on a successful mint (or any non-401 outcome)", but the only resets are in the Deregistered branch (Line 739) and after a successful re-mint (Line 750). A plain ControlRunResult::Disconnected — which includes a healthy session that later drops, or any non-401 connect failure (DNS/TLS/5xx) — falls through to Line 774 without resetting.

Consequently the count is not truly "consecutive": 401s separated by successful sessions accumulate, and once the running total exceeds MAX_UNAUTHORIZED_BEFORE_GIVING_UP the broker stops re-minting permanently and only logs the "delivery DISABLED" error on every subsequent reconnect, never recovering even if a fresh mint would now succeed.

🐛 Reset the counter on any non-401 result
         if matches!(result, ControlRunResult::Unauthorized) {
             // The engine rejected our current node token. Re-mint a fresh one
             // (bounded) rather than reconnecting forever on the rejected token.
             consecutive_unauthorized = consecutive_unauthorized.saturating_add(1);
@@
             }
+        } else {
+            // A non-401 outcome breaks the consecutive-401 chain.
+            consecutive_unauthorized = 0;
         }
         let _ = event_tx.send(FleetControlEvent::Disconnected).await;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/node_control.rs` around lines 734 - 777, The
consecutive_unauthorized counter is only being cleared for Deregistered and
successful remint, but the contract says it should reset on any non-401 outcome.
Update the node_control retry loop so ControlRunResult::Disconnected (and any
other non-Unauthorized result) also resets consecutive_unauthorized, ideally in
the main result-handling path around ControlRunResult and before the reconnect
sleep; keep the existing Unauthorized branch behavior unchanged.
♻️ Duplicate comments (1)
crates/broker/src/node_control.rs (1)

628-641: 🔒 Security & Privacy | 🟠 Major | ⚡ Quick win

node-token.json is still written with default permissions despite the prior fix being marked resolved.

persist_node_token writes a bearer credential for /v1/node/ws via fs::write, which creates the file with the process umask (commonly 0o644, i.e. world-readable). The 0o600 restriction from the earlier review is not present in the current code. NodeTokenMinter::remint re-persists through the same path, so re-minted tokens inherit the permissive mode too.

🔒 Restrict permissions on create and on existing files
-    fs::write(path, body)
-        .with_context(|| format!("failed to write node token file {}", path.display()))?;
+    let mut options = fs::OpenOptions::new();
+    options.create(true).truncate(true).write(true);
+    #[cfg(unix)]
+    {
+        use std::os::unix::fs::OpenOptionsExt;
+        options.mode(0o600);
+    }
+    {
+        use std::io::Write;
+        let mut file = options
+            .open(path)
+            .with_context(|| format!("failed to write node token file {}", path.display()))?;
+        file.write_all(body.as_bytes())
+            .with_context(|| format!("failed to write node token file {}", path.display()))?;
+    }
+    #[cfg(unix)]
+    {
+        use std::os::unix::fs::PermissionsExt;
+        fs::set_permissions(path, fs::Permissions::from_mode(0o600))
+            .with_context(|| format!("failed to restrict node token file {}", path.display()))?;
+    }
     Ok(())
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/node_control.rs` around lines 628 - 641,
`persist_node_token` is still writing `node-token.json` with default umask-based
permissions, so the bearer token can be world-readable. Update the write path in
`persist_node_token` to create or rewrite the file with restrictive `0o600`
permissions, and make sure `NodeTokenMinter::remint` inherits the same behavior
since it calls this same persistence flow. If the file already exists,
explicitly tighten its mode after writing or use a permission-setting
create/write approach so both initial creation and re-persistence stay private.
🧹 Nitpick comments (1)
crates/broker/src/node_control.rs (1)

742-773: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

No test exercises the live re-mint / 401 recovery path.

All FleetControlConfig fixtures set token_minter: None, and the new unit tests cover only load_node_token/persist_node_token scoping and connect_error_is_unauthorized. The actual Unauthorizedremint() → reconnect loop in run_node_control_client (including the counter behavior above and the give-up branch) is untested. Given the PR notes a live last-mile test still pending, consider adding a mock-engine test that returns 401 then accepts after a re-mint.

Want me to draft a tokio::test that wires a token_minter, rejects the first handshake with 401, and asserts a fresh token is used on reconnect?

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/node_control.rs` around lines 742 - 773, The live
Unauthorized recovery path in run_node_control_client is untested, especially
the remint() retry loop and the give-up logging branch. Add a tokio::test that
uses a FleetControlConfig with a mock token_minter, forces an initial
401/ControlRunResult::Unauthorized from the engine, then verifies remint() is
called, config.node_token is replaced, the reconnect continues with the fresh
token, and the consecutive_unauthorized counter resets; also cover the branch
where repeated unauthorized responses exceed MAX_UNAUTHORIZED_BEFORE_GIVING_UP
and the error path is emitted.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@crates/broker/src/node_control.rs`:
- Around line 734-777: The consecutive_unauthorized counter is only being
cleared for Deregistered and successful remint, but the contract says it should
reset on any non-401 outcome. Update the node_control retry loop so
ControlRunResult::Disconnected (and any other non-Unauthorized result) also
resets consecutive_unauthorized, ideally in the main result-handling path around
ControlRunResult and before the reconnect sleep; keep the existing Unauthorized
branch behavior unchanged.

---

Duplicate comments:
In `@crates/broker/src/node_control.rs`:
- Around line 628-641: `persist_node_token` is still writing `node-token.json`
with default umask-based permissions, so the bearer token can be world-readable.
Update the write path in `persist_node_token` to create or rewrite the file with
restrictive `0o600` permissions, and make sure `NodeTokenMinter::remint`
inherits the same behavior since it calls this same persistence flow. If the
file already exists, explicitly tighten its mode after writing or use a
permission-setting create/write approach so both initial creation and
re-persistence stay private.

---

Nitpick comments:
In `@crates/broker/src/node_control.rs`:
- Around line 742-773: The live Unauthorized recovery path in
run_node_control_client is untested, especially the remint() retry loop and the
give-up logging branch. Add a tokio::test that uses a FleetControlConfig with a
mock token_minter, forces an initial 401/ControlRunResult::Unauthorized from the
engine, then verifies remint() is called, config.node_token is replaced, the
reconnect continues with the fresh token, and the consecutive_unauthorized
counter resets; also cover the branch where repeated unauthorized responses
exceed MAX_UNAUTHORIZED_BEFORE_GIVING_UP and the error path is emitted.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: fef5e896-e87e-4585-8c80-45dddecf044b

📥 Commits

Reviewing files that changed from the base of the PR and between e30f53c and 2215ecb.

📒 Files selected for processing (3)
  • CHANGELOG.md
  • crates/broker/src/node_control.rs
  • crates/broker/src/runtime/init.rs
✅ Files skipped from review due to trivial changes (1)
  • CHANGELOG.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • crates/broker/src/runtime/init.rs

willwashburn and others added 4 commits June 26, 2026 14:50
The engine scopes relaycast nodes globally, so a host running brokers for
two different workspaces (each in its own working directory) collided on a
single global machine-id node, failing create_node and enrollment.

Derive the node id deterministically from (machine_seed, canonical_cwd) via
sha2, keeping it stable across restarts in the same directory but distinct
across directories. The machine-id file stays the per-machine seed; cwd is
read from current_dir (canonicalized when possible) with a seed-only
fallback when cwd is unreadable. derive_node_id is a pure testable fn.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ent reply WARN

The engine replies to every node-control request (node.register,
inventory.sync) with a fresh snowflake reply id, since the broker sends
those frames without an id. The broker routed every reply frame to
complete_agent_registration, so these non-agent replies never matched a
pending agent registration and produced a spurious WARN
'agent.register reply did not match a pending registration id=<snowflake>'.

Resolve the pending registration by request id first, then fall back to
matching the validated reply data.name against a pending entry (robust
against an engine that drops/regenerates the id). Replies that resolve to
neither are treated as non-agent replies and logged at debug, not warn.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/broker/src/node_control.rs (1)

549-612: 🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Propagate identity filesystem errors instead of silently changing the node id.

Line 550 treats every seed read failure like a missing file, which can rotate the machine seed on permission/UTF-8/transient I/O errors. Line 611 also collapses current_dir failures to "", reintroducing one node id per host. Since this function already returns Result, fail fast instead of minting a potentially colliding identity.

Proposed fix
 pub(crate) fn load_or_create_machine_seed(path: &Path) -> Result<String> {
-    if let Ok(existing) = fs::read_to_string(path) {
-        let existing = existing.trim();
-        if !existing.is_empty() {
-            return Ok(existing.to_string());
-        }
+    match fs::read_to_string(path) {
+        Ok(existing) => {
+            let existing = existing.trim();
+            if !existing.is_empty() {
+                return Ok(existing.to_string());
+            }
+        }
+        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
+        Err(error) => {
+            return Err(error)
+                .with_context(|| format!("failed to read node id file {}", path.display()));
+        }
     }
@@
 pub(crate) fn load_or_create_node_id(seed_path: &Path) -> Result<String> {
     let seed = load_or_create_machine_seed(seed_path)?;
-    let cwd = std::env::current_dir()
-        .map(|path| {
-            std::fs::canonicalize(&path)
-                .unwrap_or(path)
-                .to_string_lossy()
-                .into_owned()
-        })
-        .unwrap_or_default();
+    let cwd_path = std::env::current_dir()
+        .context("failed to read current working directory for node id")?;
+    let cwd = std::fs::canonicalize(&cwd_path)
+        .unwrap_or(cwd_path)
+        .to_string_lossy()
+        .into_owned();
     Ok(derive_node_id(&seed, &cwd))
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/node_control.rs` around lines 549 - 612,
load_or_create_machine_seed and load_or_create_node_id are swallowing real
filesystem/environment errors and can silently change or collapse the node
identity. Update load_or_create_machine_seed to only generate a new seed when
the file is truly absent, and propagate read/parse/UTF-8/I/O failures instead of
treating them as empty. In load_or_create_node_id, avoid defaulting current_dir
failures to an empty string; return the error so callers can fail fast rather
than deriving a shared host-wide node id. Keep the existing derive_node_id
behavior, but only call it after successfully resolving both seed and cwd.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@crates/broker/src/node_control.rs`:
- Around line 1121-1128: The fallback in complete_agent_registration currently
matches pending_agent_registrations by name using a linear scan, which can
non-deterministically pick the wrong pending request when names are duplicated.
Update the correlation logic to use a stable unique identifier already available
in the flow, such as request_id or a propagated agent_id from the registration
reply, and remove the lookup-by-name fallback. If name must remain part of the
logic, enforce uniqueness before inserting into pending_agent_registrations so
the request maps to the correct oneshot channel every time.

---

Outside diff comments:
In `@crates/broker/src/node_control.rs`:
- Around line 549-612: load_or_create_machine_seed and load_or_create_node_id
are swallowing real filesystem/environment errors and can silently change or
collapse the node identity. Update load_or_create_machine_seed to only generate
a new seed when the file is truly absent, and propagate read/parse/UTF-8/I/O
failures instead of treating them as empty. In load_or_create_node_id, avoid
defaulting current_dir failures to an empty string; return the error so callers
can fail fast rather than deriving a shared host-wide node id. Keep the existing
derive_node_id behavior, but only call it after successfully resolving both seed
and cwd.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: d7289b04-4b75-434e-a338-d6b2a9a33762

📥 Commits

Reviewing files that changed from the base of the PR and between 2215ecb and 4c85611.

📒 Files selected for processing (2)
  • CHANGELOG.md
  • crates/broker/src/node_control.rs
✅ Files skipped from review due to trivial changes (1)
  • CHANGELOG.md

Comment on lines +1121 to +1128
let name = data.as_ref()?.name.as_deref()?;
let key = pending_agent_registrations
.iter()
.find(|(_, pending)| pending.name == name)
.map(|(key, _)| key.clone())?;
pending_agent_registrations
.remove(&key)
.map(|pending| (key, pending))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify whether RegisterAgent insertion enforces unique pending names before complete_agent_registration's name fallback.
rg -n -C4 'pending_agent_registrations\.(insert|entry)|FleetControlCommand::RegisterAgent|PendingAgentRegistration' crates/broker/src/node_control.rs crates/broker/src/runtime

Repository: AgentWorkforce/relay

Length of output: 11941


Fix nondeterministic correlation of agent registration replies.

The code fallback in complete_agent_registration searches pending_agent_registrations by name via linear iteration. Since the map is keyed by request_id (inserted at line 1010) and does not enforce unique name values, concurrent registrations for multiple agents with the same name will cause the fallback to arbitrarily pick the first match. This risks sending the registration response to the wrong oneshot channel.

Ensure the correlation logic uses a deterministic key (e.g., propagate a unique agent_id or request_id in the reply) or enforce a uniqueness constraint on name before insertion.

Current logic at lines 1121-1128
            let name = data.as_ref()?.name.as_deref()?;
            let key = pending_agent_registrations
                .iter()
                .find(|(_, pending)| pending.name == name)
                .map(|(key, _)| key.clone())?;
            pending_agent_registrations
                .remove(&key)
                .map(|pending| (key, pending))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/node_control.rs` around lines 1121 - 1128, The fallback in
complete_agent_registration currently matches pending_agent_registrations by
name using a linear scan, which can non-deterministically pick the wrong pending
request when names are duplicated. Update the correlation logic to use a stable
unique identifier already available in the flow, such as request_id or a
propagated agent_id from the registration reply, and remove the lookup-by-name
fallback. If name must remain part of the logic, enforce uniqueness before
inserting into pending_agent_registrations so the request maps to the correct
oneshot channel every time.

willwashburn and others added 5 commits June 26, 2026 15:46
…on ref, bound re-mint loop

Three node-only delivery fixes:

- Node token cache is now scoped per node_id (node-tokens/{node_id}.json,
  filename-sanitized) instead of one host-wide file, so two brokers in
  different cwds on a host no longer overwrite each other's token.
- node action.invoke spawns forward invocation_id and the harness session
  ref into agent.register (was hardcoded None,None), restoring invocation
  correlation and session resume. HTTP /api/spawn derives session ref from
  its spec too.
- node-control re-mint loop only resets the consecutive-401 counter when a
  connection actually establishes (not on a successful mint) and drops the
  post-mint `continue` so retries honor the reconnect backoff, making the
  give-up cap reachable and stopping a tight POST /v1/nodes loop.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Operator-enrolled / fleet nodes pin their node id (via the machine-id
file) to match an engine-issued node token. The cwd-hash derivation broke
that: the broker registered a derived id, so the engine rejected
node.register with node_id_mismatch and the node never came online
(two-node fleet E2E timed out on online+handlers_live). Only derive when
auto-minting (no RELAY_NODE_TOKEN); otherwise use the pinned id verbatim.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The broker's pre-sidecar bootstrap node.register advertised a generic
capability literally named "spawn". The relaycast engine does not treat
bare "spawn" as a placement capability (only spawn:* is), so its
ensureCapabilityActions materialized a regular `spawn` ACTION pinned to
whichever node bootstrapped first. From then on every spawn invoke
resolved that action and was dispatched to the bootstrapping node,
short-circuiting capability-based spawn placement for the whole
workspace — cli/target_node/least-loaded routing were all ignored.

This regressed the two-node fleet E2E: spawn:codex/spawn:claude landed on
the wrong node, target_node placement-mismatch returned 201 instead of
409, and least-loaded scheduling misrouted.

The bootstrap descriptor now carries no capabilities; the node's real
spawn:*/action capabilities arrive on the sidecar's node.register. The
node is still registered (online) before the sidecar connects, but claims
no handler until the authoritative capability set is reported.

Extracts the manifest into `bootstrap_node_manifest` with a unit test
asserting the empty capability set.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A `spawn:<harness>` node action registered by a sidecar was being run by
the broker directly with the literal `cli` from the action input
(`handle_fleet_action_spawn`), launching the real CLI instead of the
node's declared harness. In the fleet/sidecar model the sidecar owns the
harness: its `spawn(<harness>)` handler resolves the declared harness
spec and calls `ctx.spawnAgent` (-> `spawn_agent` -> handle_fleet_spawn_agent).

Route `spawn:*` to the sidecar's registered handler (same path as
echo/work) whenever the sidecar declared a handler for that capability,
gated by a new `HandlerDispatchState::has_handler`. The broker-direct
raw-`cli` spawn is reserved for the direct / no-sidecar path where no
sidecar handler is registered.

Fixes the fleet-E2E least-loaded scheduling flake: heavyweight real
processes (instead of the lightweight stub PTY) lingered/exited and
triggered broker re-init, collapsing per-node active_agents.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@willwashburn willwashburn merged commit b8dfd7d into main Jun 26, 2026
40 checks passed
@willwashburn willwashburn deleted the feat/broker-node-delivery branch June 26, 2026 23:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant