feat(broker): node-only delivery for relaycast v5.0.1#1201
Bump crates/broker to relaycast =5.0.1 (#214, increment 1 of the broker node-only delivery migration).
- Remove the workspace-stream toggle (ensure_workspace_stream_enabled and its RelaycastWsClient::run call); RelayCast::workspace_stream_set is gone in v5.
- Add node-frame fields the v5.0.1 engine sends (structs are deny_unknown_fields, so missing fields drop the frame): Deliver gains agent_id and delivery_id; ActionInvoke gains optional agent_id and agent_name.
- Extract the AgentReleaseRequested / AgentSpawnRequested firehose match arms into reusable release_worker_locally / spawn_worker_from_request async fns and drop the arms (those WsEvent variants no longer exist in v5.0.1). Increment 2 will call these from action.invoke.
- Point register-flow test mocks at the new /v1/agents endpoint with the CreateAgentResponse body (v5.0.1 register_agent_token registers via /v1/agents instead of /v1/agents/spawn).
- Add the new fields to fleet-wire deliver/action.invoke fixtures.
cargo build and cargo test -p agent-relay-broker both green.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… by ws_type
In relaycast v5.0.1 WsEvent ends in #[serde(other)] Unknown, so an agent.spawn_requested frame deserializes to Ok(WsEvent::Unknown), not Err. The firehose handler gated its raw-JSON spawn fallback (and a deser-warning) on from_value::<WsEvent>(..).is_ok(), which is now always true, making both paths dead code, contrary to the prior "preserved untouched" claim.
Node control owns spawn/release via action.invoke (the extracted spawn_worker_from_request / release_worker_locally helpers); the workspace firehose no longer drives these events.
Replace the meaningless is_ok() gate with an explicit ws_type match that ignores agent.spawn_requested / agent.release_requested (already deduped) instead of letting them fall through to map_ws_event. Remove the dead fallback, dead warning, and now-unused local bindings. Add regression tests pinning that both control frames decode to WsEvent::Unknown so future dispatch must classify by ws_type.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
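The ws_type dispatch described in this commit can be sketched roughly as follows. This is an illustrative stand-in, not the broker's code: `FirehoseDispatch` and `classify_ws_type` are hypothetical names, and only the two frame type strings come from the commit message.

```rust
/// What the firehose handler should do with a raw frame, decided from its
/// `type` string rather than from whether the frame deserialized.
/// (Hypothetical enum, for illustration only.)
#[derive(Debug, PartialEq, Eq)]
enum FirehoseDispatch {
    /// Spawn/release control frames are owned by node control; skip them here.
    IgnoreControl,
    /// Everything else goes on to the regular event mapping.
    MapEvent,
}

/// Hypothetical helper: classify a frame by its `type` field. Because an
/// unrecognized frame now decodes to an `Unknown` variant instead of an
/// error, the string is the only reliable discriminator.
fn classify_ws_type(ws_type: &str) -> FirehoseDispatch {
    match ws_type {
        "agent.spawn_requested" | "agent.release_requested" => FirehoseDispatch::IgnoreControl,
        _ => FirehoseDispatch::MapEvent,
    }
}
```

Matching on the string keeps the decision independent of how the SDK's catch-all variant evolves.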
…/v1/node/ws
Increment 2 of the broker node-only delivery migration (relaycast v5.0.1). The
broker now enrolls as a relaycast node, binds its agents through node control,
and delivers/injects solely over /v1/node/ws.
- NODE-TOKEN BOOTSTRAP (init.rs): resolve_node_token prefers RELAY_NODE_TOKEN,
then a token cached for this exact node id, otherwise mints one via
RelayCast::create_node (kind=ws, role=broker) with the workspace key and
persists it next to the node id (scoped to node_id so id rotation
invalidates it). node_control gains load/persist_node_token helpers.
- UNCONDITIONAL node.register (init.rs): push FleetControlCommand::RegisterNode
with a broker self-manifest (spawn capability) right after spawning the
node-control client, so the broker enrolls every startup regardless of any
sidecar.
- BIND EVERY SPAWNED AGENT via node-control agent.register: extracted
register_node_agent_token; both /api/spawn (api.rs) and the Inc1
spawn_worker_from_request now mint the agent token over node control (HTTP
pre-registration only as fallback when node binding is unavailable). The
minted token injects RELAY_AGENT_TOKEN + RELAY_SKIP_BOOTSTRAP via snippets,
so the worker MCP never re-registers over HTTP.
- INBOUND NODE FRAMES (fleet.rs): handle_fleet_deliver now uses the real
delivery_id (no longer derived from msg_id) and branches on payload.type —
message.created/thread.reply/dm.received/group_dm.received (and legacy empty
type) inject into the worker PTY; message.reacted/message.read are acked with
a tracing log only (PTY surfacing deferred); unknown types are acked without
surfacing. action.invoke routes spawn/spawn:* and release to the Inc1 spawn/
release fns, replying action.result {output} on success or {error} on
failure.
- fleet_mode_enabled flips on FleetControlEvent::Connected (and is not cleared
on disconnect) so workspace-firehose delivery is suppressed once node
delivery is live, avoiding double-delivery while honoring at-least-once
resume.
Runtime delivery cannot be exercised without a live engine; added unit tests
for the delivery-classification and action.invoke identity/field helpers.
cargo build and cargo test -p agent-relay-broker both green (779 passing).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
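As a rough sketch of the node-token resolution order in this commit (environment variable, then a cache entry for the exact node id, then minting): `CachedNodeToken` and the closure-based `mint` parameter are hypothetical stand-ins for the real cache file and the create_node call, and treating a blank environment value as unset is an assumption.

```rust
/// A token cached on disk together with the node id it was minted for.
/// (Hypothetical shape, for illustration only.)
struct CachedNodeToken {
    node_id: String,
    token: String,
}

/// Sketch of the resolution order. `mint` stands in for the create_node
/// call and is only invoked as a last resort.
fn resolve_node_token<F>(
    env_token: Option<String>,
    cached: Option<CachedNodeToken>,
    node_id: &str,
    mint: F,
) -> Result<String, String>
where
    F: FnOnce() -> Result<String, String>,
{
    // 1. An explicit RELAY_NODE_TOKEN wins (assumption: blank counts as unset).
    if let Some(token) = env_token.filter(|t| !t.trim().is_empty()) {
        return Ok(token);
    }
    // 2. A cached token is valid only for the exact node id it was minted for,
    //    so rotating the node id invalidates it.
    if let Some(cached) = cached {
        if cached.node_id == node_id {
            return Ok(cached.token);
        }
    }
    // 3. Otherwise mint a fresh token; the caller would persist it afterwards.
    mint()
}
```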
…lias classification
Node /v1/node/ws deliver frames nest the message under payload.data per
relaycast 5.0.1 normalize_node_deliver (data.text/channel_name/agent_name/
from_name/thread_id). fleet_relay_delivery read only flat /text,/from,/channel
paths, so every node-delivered message injected the raw JSON blob attributed to
"relaycast". Extract from the data envelope (data.* first, legacy flat paths as
fallback) via a testable fleet_delivery_fields helper.
classify_fleet_delivery only injected message.created|thread.reply|dm.received|
group_dm.received; the engine may emit any relaycast parse_inbound_kind alias
(message.received/new/sent/delivered, dm.created/new/sent/message.created,
direct_message.*, thread.message.created/sent, group_dm.*). Those were
acked-and-dropped (permanent loss under at-least-once). Widen the Inject arm to
the full alias set. Update the deliver.json fixture to the real {type,data}
shape.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
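A minimal sketch of the widened classification, assuming the alias list in this commit message is complete. `FleetDeliverySurfacing::Inject` and `AckUnknown` appear elsewhere in this PR; the `AckOnly` variant name and the function signature are assumptions made for illustration.

```rust
#[derive(Debug, PartialEq, Eq)]
enum FleetDeliverySurfacing {
    /// Inject the message into the worker PTY.
    Inject,
    /// Ack with a log line only (reactions, read receipts).
    AckOnly,
    /// Ack without surfacing anything.
    AckUnknown,
}

/// Sketch: classify a node deliver frame by its payload type string.
fn classify_fleet_delivery(payload_type: &str) -> FleetDeliverySurfacing {
    match payload_type {
        // Legacy frames with an empty type are still injected.
        ""
        | "message.created"
        | "message.received"
        | "message.new"
        | "message.sent"
        | "message.delivered"
        | "thread.reply"
        | "thread.message.created"
        | "thread.message.sent"
        | "dm.received"
        | "dm.created"
        | "dm.new"
        | "dm.sent"
        | "dm.message.created" => FleetDeliverySurfacing::Inject,
        // Whole alias families listed in the commit as direct_message.* and group_dm.*
        t if t.starts_with("direct_message.") || t.starts_with("group_dm.") => {
            FleetDeliverySurfacing::Inject
        }
        "message.reacted" | "message.read" => FleetDeliverySurfacing::AckOnly,
        _ => FleetDeliverySurfacing::AckUnknown,
    }
}
```

Under at-least-once delivery an alias that falls into the last arm is acked and never seen again, which is why the inject arm errs on the side of being wide.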
…ery)
Inc3 of the broker node-only delivery migration. Message delivery now flows solely over /v1/node/ws via handle_fleet_deliver, so the workspace firehose delivery path in the broker fleet runtime is dead.
- Drop the fleet_mode_enabled field and all its assignments; its only reader was the firehose drop in handle_relaycast_message.
- Reduce handle_relaycast_message to log-and-discard. The map_ws_event injection block, self-echo filtering, DM resolution, dashboard rebroadcast, and the fleet-mode drop are all gone.
- Remove now-dead firehose-only helpers and their tests: relaycast_ws_control_dedup_key; routing is_self_echo, resolve_delivery_targets, worker_names_for_dm_participants, display_target_for_dashboard, DeliveryPlan; queue_and_try_delivery; WorkerRegistry::has_any_worker / has_worker_by_name_ignoring_case; the unused dm_participants_cache runtime field.
- Rewrite routing tests to cover the surviving worker_names_for_channel_delivery / worker_names_for_direct_target (sender exclusion, case-insensitivity, workspace-id filtering).
Deliberately left intact: RelaycastWsClient::run and map_ws_event are still used by `agent-relay-broker wrap` (single-agent PTY mode), which legitimately consumes the workspace firehose. Spawn/release stay owned by node control (spawn_worker_from_request / release_worker_locally). No observer/observer-token path added (separate follow-up).
cargo build and clippy clean; 781 broker tests pass.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Apply targeted adversarial-review fixes to the broker's node-only delivery migration:
- HTTP-register spawn fallback now binds the agent to the broker's node via SDK bind_agent_to_node so it becomes via_node (the only kind the engine delivers to); a failed bind emits a loud registration_warning. A missing node token is logged as a hard fault, not a quiet warning.
- seq:0 fan-out frames are no longer dropped: special-cased in FleetDeliveryBook::observe/commit to always surface-and-ack with msg_id dedup; action.completed/action.failed/action.denied route to Inject (delivered to the caller), message.reacted/message.read stay ack-only (PTY surfacing deferred).
- Remove deny_unknown_fields from inbound Deliver/ActionInvoke so a future engine field no longer drops the frame without an ack (infinite redelivery). Outbound frames keep it.
- Bound AgentDeliveryCursor.seen_msg_ids with a FIFO cap (512).
- release action.result is now faithful: genuinely-unknown worker returns an error; already-exited worker still reports success.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
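One way to picture the FIFO-capped dedup memory mentioned above is a set paired with an insertion-order queue. This is a sketch under assumptions: `SeenMsgIds` is a hypothetical type, and the real AgentDeliveryCursor may be structured differently.

```rust
use std::collections::{HashSet, VecDeque};

/// Bounded memory of recently seen message ids: the oldest id is evicted
/// once the cap is reached. (Hypothetical type, for illustration only.)
struct SeenMsgIds {
    cap: usize,
    order: VecDeque<String>,
    members: HashSet<String>,
}

impl SeenMsgIds {
    fn new(cap: usize) -> Self {
        Self { cap, order: VecDeque::new(), members: HashSet::new() }
    }

    /// Returns true when `msg_id` has not been seen (surface it) and false
    /// for a duplicate that should only be acked.
    fn insert(&mut self, msg_id: &str) -> bool {
        if self.members.contains(msg_id) {
            return false;
        }
        if self.cap == 0 {
            // A zero cap disables tracking entirely; nothing is remembered.
            return true;
        }
        if self.order.len() == self.cap {
            // Evict the oldest id so memory stays bounded.
            if let Some(oldest) = self.order.pop_front() {
                self.members.remove(&oldest);
            }
        }
        self.order.push_back(msg_id.to_string());
        self.members.insert(msg_id.to_string());
        true
    }

    fn len(&self) -> usize {
        self.order.len()
    }
}
```

With a cap of 512 as in the commit, `SeenMsgIds::new(512)` would forget an id only after 512 newer ids have arrived, so a very late redelivery could be surfaced twice; that is the trade-off for bounded memory.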
In v5.0.1 node-only delivery the /v1/ws workspace-stream WebSocket is observer-only and rejects the broker's workspace key with HTTP 401. The broker kept opening it anyway, 401-looping every 3s and burning reconnects even though delivery already flows over /v1/node/ws. The earlier teardown removed firehose message handling but left the connection itself.
MultiWorkspaceSession::new no longer spawns the workspace-stream WS task; it drains the WsControl channel to a no-op and keeps the inbound channel open as an inert empty source (kept alive by a sender clone so the wrap action consumer and runtime no-op handler never busy-loop on a closed receiver). RelaycastWsClient (the 401-looping run loop) is deleted. The workspace HTTP client, WsControl plumbing, and ws_control_tx senders are all kept intact.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
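The "kept alive by a sender clone" detail can be illustrated with the standard library's channel, used here as a stand-in (the broker presumably uses an async channel, which is an assumption). `InertSource` and `inert_source` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// An inbound source that never yields anything but also never reports a
/// hang-up, because it owns a sender purely as a keepalive.
struct InertSource<T> {
    _keepalive: Sender<T>,
    rx: Receiver<T>,
}

/// Build an empty source whose receiver stays open for as long as the
/// returned value lives.
fn inert_source<T>() -> InertSource<T> {
    let (tx, rx) = channel();
    InertSource { _keepalive: tx, rx }
}
```

A receiver whose senders have all been dropped reports disconnection immediately on every poll, so a consumer loop that simply retries would spin. Holding one sender keeps the receiver in the "empty but open" state, where a blocking or awaited receive just waits.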
📝 Walkthrough
The broker now uses node-scoped token minting and recovery, node-only delivery surfacing with …
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~90+ minutes
Code Review
This pull request transitions the broker to node-only delivery by upgrading to relaycast v5.0.1, removing the dead workspace-stream firehose delivery path, and routing message delivery solely over /v1/node/ws. It introduces node enrollment, agent-to-node binding, support for seq:0 fan-out frames, and bounded seen message ID tracking. Feedback on these changes highlights two key issues: first, receiving a delivery for an unknown or released agent will fail injection and withhold the acknowledgment, leading to infinite redelivery retries; second, extracting non-string action result outputs or errors via first_string fails and falls back to stringifying the entire envelope, which can be improved by specifically stringifying the /data/output or /data/error pointers first.
```rust
FleetDeliverySurfacing::Inject => {
    let relay_delivery = self.fleet_relay_delivery(deliver);
    self.workers.deliver(&deliver.agent, relay_delivery).await
}
```
If a delivery is received for an agent that has already been released or does not exist in the registry, self.workers.deliver will fail. Under the current implementation, any failure in surface_fleet_deliver causes the broker to withhold the acknowledgment (delivery.ack), leading to infinite redelivery retries by the engine.
To prevent this, we should check if the agent exists in self.workers.workers before attempting delivery. If the agent is missing, we should log a warning and return Ok(()) so the message is committed and acknowledged rather than retried indefinitely.
```rust
FleetDeliverySurfacing::Inject => {
    if !self.workers.workers.contains_key(&deliver.agent) {
        tracing::warn!(
            target = "relay_broker::fleet",
            agent = %deliver.agent,
            delivery_id = %deliver.delivery_id,
            "received delivery for unknown or released agent; acking to prevent infinite redelivery"
        );
        return Ok(());
    }
    let relay_delivery = self.fleet_relay_delivery(deliver);
    self.workers.deliver(&deliver.agent, relay_delivery).await
}
```

```rust
    &[
        "/data/text",
        "/text",
        "/body",
        "/content",
        "/message/text",
        "/payload/text",
        // Action-result fan-out (action.completed/failed/denied) carries the
        // result under data.output / data.error rather than a text field.
        "/data/output",
        "/data/error",
    ],
)
.unwrap_or_else(|| payload.to_string());
let from = first_string(
    payload,
```
When extracting fields from a node deliver payload, if the payload represents an action result (such as action.completed or action.failed), the result/output is typically carried under data.output or data.error as a JSON object or other non-string type.
Currently, first_string only extracts string values, so it will return None for non-string outputs. The fallback unwrap_or_else(|| payload.to_string()) then stringifies the entire deliver frame (including all envelope metadata), which is injected into the agent's PTY.
We can improve this by specifically stringifying /data/output, /data/error, or /data if they exist, before falling back to the entire payload.
```rust
let body = first_string(
    payload,
    &[
        "/data/text",
        "/text",
        "/body",
        "/content",
        "/message/text",
        "/payload/text",
        "/data/output",
        "/data/error",
    ],
)
.unwrap_or_else(|| {
    payload
        .pointer("/data/output")
        .or_else(|| payload.pointer("/data/error"))
        .or_else(|| payload.pointer("/data"))
        .map(|v| v.to_string())
        .unwrap_or_else(|| payload.to_string())
});
```
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 63037a3ae4
```rust
let cli = match action_invoke_string(&invoke.input, &["cli", "command", "provider"]) {
    Some(cli) => cli,
    None => {
        self.reply_action_error(&invoke.invocation_id, "spawn_missing_cli")
            .await;
```
Derive the CLI from spawn action suffix
For node action.invoke spawns where the engine encodes the requested harness in the action name (for example the added fixture uses "action": "spawn:codex" with no input.cli), this branch returns spawn_missing_cli before spawning anything. Since the comment above says spawn:<harness> is a supported form, the handler needs to fall back to the suffix (codex here) before rejecting the invoke.
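The suffix fallback suggested here might look like the sketch below. `spawn_cli` is a hypothetical helper, and giving an explicit input value precedence over the action suffix is an assumption; the review only asks that the suffix be consulted before rejecting.

```rust
/// Sketch: pick the CLI for a spawn invoke. Prefer an explicit input value,
/// then fall back to the `<harness>` part of a `spawn:<harness>` action name.
fn spawn_cli(action: &str, input_cli: Option<&str>) -> Option<String> {
    // Explicit input (cli/command/provider in the real handler), if non-empty.
    let from_input = input_cli.map(str::trim).filter(|s| !s.is_empty());
    // Suffix after "spawn:", if the action uses that form and it is non-empty.
    let from_suffix = action
        .strip_prefix("spawn:")
        .map(str::trim)
        .filter(|s| !s.is_empty());
    from_input.or(from_suffix).map(str::to_string)
}
```

With this shape, the fixture's `"action": "spawn:codex"` with no `input.cli` resolves to `codex`, and only a bare `spawn` with no input would still produce `spawn_missing_cli`.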
```diff
 Ok(token) => {
     tracing::info!(
         worker = %name,
-        error = %error,
-        "continuing spawn without pre-registration after retries exhausted"
+        "bound agent to node via agent.register for HTTP spawn"
     );
-    preregistration_warning = Some(message);
-    None
+    Some(token.token)
```
Seed node-minted agent tokens locally
When /api/spawn succeeds through the new node-control agent.register path, this branch only passes the minted token to the worker and never seeds RelaycastHttpClient's registration cache. After that worker confirms a delivered Relaycast message, mark_delivery_read_ack calls mark_read_as_agent, which will see no cached token and re-register the same agent over the HTTP /v1/agents path; in the node-only model that HTTP registration is not node-bound and can rotate the identity away from the working via_node registration. Seed the returned token the same way the supplied-token path does, and apply the same fix to the duplicated node-register spawn paths.
Actionable comments posted: 9
🧹 Nitpick comments (2)
CHANGELOG.md (1)
21-23: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Condense these changelog bullets to user-facing outcomes.
These entries read like release notes plus implementation notes (`agent.register`, `registration_warning`, `seq:0`, PTY surfacing). Please shorten them to one impact-first bullet per visible behavior change.
Suggested rewrite
```diff
-- `agent-relay-broker` node-only delivery: agents spawned via the HTTP-register fallback (when node-control `agent.register` is unavailable) are now bound to the broker's node so the engine delivers to them; a failed bind surfaces a loud `registration_warning` instead of silently producing an undeliverable agent. A missing node token is now logged as a hard fault (realtime delivery disabled) rather than a quiet warning.
-- `agent-relay-broker` node-only delivery: `seq:0` fan-out frames (reactions, read receipts, and `action.completed`/`action.failed`/`action.denied` results) are no longer dropped — action results are injected into the calling agent's PTY; reaction/read receipts are acked (PTY surfacing deferred). Inbound `deliver`/`action.invoke` frames tolerate unknown future engine fields instead of being dropped without an ack (which caused infinite redelivery), and the broker's per-agent delivery-dedup memory is now bounded.
-- `agent-relay-broker` node `release` action reports a faithful `action.result`: a genuinely unknown worker returns an error while an already-exited worker still reports success.
+- `agent-relay-broker` no longer leaves fallback-spawned agents unreachable, and missing node tokens now fail loudly when realtime delivery is unavailable.
+- `agent-relay-broker` now delivers node fan-out action results correctly, acknowledges receipt/read frames without redelivery loops, and bounds delivery dedup state.
+- `agent-relay-broker` `release` actions now distinguish unknown workers from already-exited ones.
```
As per coding guidelines, `CHANGELOG.md`: "Changelog entries should be concise and impact-first, with one short bullet per user-visible change. Omit issue/PR links, internal notes, and implementation details."
Source: Coding guidelines
crates/broker/src/fleet_wire.rs (1)
895-915: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Pin the new top-level forward-compatibility behavior.
This test still round-trips the exact serialized shape. Since the core change is accepting future top-level fields, add an extra unknown field before deserialization so `deny_unknown_fields` cannot be reintroduced unnoticed.
🧪 Proposed regression guard
```diff
-    let value: Value = serde_json::to_value(&msg).unwrap();
+    let mut value: Value = serde_json::to_value(&msg).unwrap();
     assert_eq!(value["payload"]["metadata"]["channel"], "general");
+    value["engine_future_field"] = json!({ "ignored": true });
     let decoded: RelaycastToBroker = serde_json::from_value(value).unwrap();
     assert_eq!(decoded, msg);
```
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/broker/src/node_control.rs`:
- Around line 466-484: The cached node token in
`load_node_token`/`PersistedNodeToken` is only scoped by `node_id`, so it can be
reused after switching Relaycast bases or engines. Update `PersistedNodeToken`
and the token persistence/loading flow in `node_control.rs` to also store the
relay base or node WS URL, and have `load_node_token` validate that value
alongside `node_id` before returning the token. Make sure the code that writes
tokens and the callers that read them both use the same identifying symbol for
the base URL so stale tokens are rejected when the host changes.
- Around line 490-501: The node token persistence in persist_node_token
currently uses a plain write path that can leave node-token.json with overly
permissive permissions. Update persist_node_token in node_control.rs to
create/write the file through OpenOptions, set restrictive Unix permissions to
0o600 at creation time, and ensure existing files are also chmod’d back to 0o600
after writing; keep the existing serialization and error context intact while
making the permission handling part of the write flow.
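The two node_control.rs suggestions above could be sketched as follows, using only the standard library. The field names on `PersistedNodeToken` (including `base_url`), the `token_for` method, and `write_private` are hypothetical; serialization is left out, and the permission handling is Unix-only.

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Write};
use std::path::Path;

/// Cached token scoped by both node id and relay base URL.
/// (Hypothetical shape; the real struct may differ.)
struct PersistedNodeToken {
    node_id: String,
    base_url: String,
    token: String,
}

impl PersistedNodeToken {
    /// Return the token only when it was minted for this exact node id AND
    /// this exact base URL, so switching hosts rejects the stale entry.
    fn token_for(&self, node_id: &str, base_url: &str) -> Option<&str> {
        (self.node_id == node_id && self.base_url == base_url).then_some(self.token.as_str())
    }
}

/// Write `contents` to `path` so that only the owner can read or write it.
#[cfg(unix)]
fn write_private(path: &Path, contents: &[u8]) -> io::Result<()> {
    use std::os::unix::fs::{OpenOptionsExt, PermissionsExt};

    let mut file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .mode(0o600) // only takes effect when the file is newly created
        .open(path)?;
    file.write_all(contents)?;
    // A pre-existing file keeps its old mode, so tighten it explicitly.
    fs::set_permissions(path, fs::Permissions::from_mode(0o600))
}
```

Setting the mode at creation avoids a window in which a new token file is briefly readable by other users; the trailing `set_permissions` covers files written by older broker versions.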
In `@crates/broker/src/runtime/api.rs`:
- Around line 127-132: The HTTP spawn path in api::spawn currently keeps only
token.token after agent.register, but it also needs to retain token.agent_id so
failed spawns can clean up the node-bound agent. Update the spawn flow to store
the agent_id alongside the token, and in the workers.spawn(...) error branch use
that agent_id to deregister/prune the agent before returning the error. Focus
the change around the agent.register handling and the workers.spawn failure
path.
In `@crates/broker/src/runtime/fleet.rs`:
- Around line 388-391: In FleetDeliverySurfacing::Inject, missing workers are
treated like retryable delivery failures, which causes handle_fleet_deliver to
withhold the ack and loop forever. Update the inject path in fleet.rs to detect
a permanently missing worker before or after
self.workers.deliver(&deliver.agent, relay_delivery) by checking WorkerRegistry
state or matching the unknown worker error from send_to_worker/deliver, and
convert that case into the terminal AckUnknown behavior so the ack advances
immediately.
- Around line 548-562: The node-action spawn flow in handle_fleet_action_spawn
updates the worker set but never registers the new agent in fleet_inventory or
syncs the engine state. After spawn_worker_from_request succeeds and
self.workers.workers contains the new name, insert the agent into
self.fleet_inventory and immediately call self.publish_fleet_inventory().await,
matching the behavior already used in handle_fleet_spawn_agent so spawned agents
are visible and later prune_fleet_agent_state can find them.
In `@crates/broker/src/runtime/init.rs`:
- Around line 245-263: The node-token resolution path in init() only logs a
fatal configuration message but still continues startup with no token, leaving
the broker ready while unreachable. Update the node-only bootstrap flow around
node_token handling and the node-control client initialization to fail fast when
the token is missing: stop before API readiness, return an error or exit from
the init path, and ensure the control client is never started with None. Apply
the same behavior to the corresponding fallback branch referenced by the
duplicate block so both paths enforce the same hard-fail semantics.
In `@crates/broker/src/runtime/relaycast_events.rs`:
- Around line 421-423: The action spawn flow is dropping the warning returned by
bind_http_registered_agent_to_node, so handle_fleet_action_spawn can treat an
undeliverable node binding as success. Update the relaycast_events spawn path to
inspect the return from bind_http_registered_agent_to_node and, when it yields
Some(warning), stop the spawn or return an error/outcome that lets the caller
emit action.result with error. Use handle_fleet_action_spawn and
bind_http_registered_agent_to_node as the key points to wire this failure
through instead of unconditionally continuing with Some(token).
- Around line 392-398: The local spawn flow in relaycast_events keeps only
`token.token` after a successful `agent.register`, so failed
`workers.spawn(...)` calls cannot clean up the relaycast registration. Update
the spawn path around the `agent.register` handling to retain `token.agent_id`
alongside the token, then in the `workers.spawn` error branch use that saved
agent id to deregister or prune the node-bound agent. Apply the same fix in both
affected spawn blocks so failed local spawns do not leave stale registrations.
- Around line 211-230: The unknown-worker duplicate release path in
relaycast_events should mirror the normal success cleanup before returning
Released. In the Err(error) branch of the release handling logic, after
is_unknown_worker_error_message(...) succeeds, make sure the same cleanup
performed on the success path runs for pending deliveries, pending requests,
delivery state, and result tokens, so stale retry/timeout state is cleared for
already-exited workers. Use the existing release handling flow and related state
helpers around ReleaseOutcome::Released to locate and reuse the success-path
cleanup rather than duplicating ad hoc logic.
---
Nitpick comments:
In `@CHANGELOG.md`:
- Around line 21-23: Condense the three changelog bullets into one short,
user-facing bullet per behavior change, removing implementation details like
agent.register, registration_warning, seq:0, PTY surfacing, and other internal
mechanics. Rephrase the entries around visible outcomes only: reliable delivery
to newly registered agents, fan-out results/reactions being handled correctly
instead of dropped, and release actions reporting accurate success/error
results. Keep the wording concise and impact-first in CHANGELOG.md.
In `@crates/broker/src/fleet_wire.rs`:
- Around line 895-915: The regression test in deliver_accepts_open_payloads only
verifies a normal round-trip and does not lock in forward-compatibility for
unknown top-level fields. Update this test to add an extra unknown field to the
serialized Value before calling serde_json::from_value, so
RelaycastToBroker/Deliver continues to accept future top-level fields and
reintroducing deny_unknown_fields will fail the test.
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (32)
- .agentworkforce/trajectories/completed/2026-06/traj_2mv3mylgd1vh/summary.md
- .agentworkforce/trajectories/completed/2026-06/traj_2mv3mylgd1vh/trajectory.json
- .agentworkforce/trajectories/completed/2026-06/traj_4yeya7a3y7c6.trace.json
- .agentworkforce/trajectories/completed/2026-06/traj_4yeya7a3y7c6/summary.md
- .agentworkforce/trajectories/completed/2026-06/traj_4yeya7a3y7c6/trajectory.json
- .agentworkforce/trajectories/completed/2026-06/traj_8ogwhkndee1j/summary.md
- .agentworkforce/trajectories/completed/2026-06/traj_8ogwhkndee1j/trajectory.json
- .agentworkforce/trajectories/completed/2026-06/traj_af15b3zk02a6/summary.md
- .agentworkforce/trajectories/completed/2026-06/traj_af15b3zk02a6/trajectory.json
- .agentworkforce/trajectories/completed/2026-06/traj_nda08hijihz5.trace.json
- .agentworkforce/trajectories/completed/2026-06/traj_nda08hijihz5/summary.md
- .agentworkforce/trajectories/completed/2026-06/traj_nda08hijihz5/trajectory.json
- CHANGELOG.md
- crates/broker/Cargo.toml
- crates/broker/src/cli_mcp_args.rs
- crates/broker/src/fleet_wire.rs
- crates/broker/src/node_control.rs
- crates/broker/src/relaycast/workspace.rs
- crates/broker/src/relaycast/ws.rs
- crates/broker/src/routing.rs
- crates/broker/src/runtime/api.rs
- crates/broker/src/runtime/delivery.rs
- crates/broker/src/runtime/event_loop.rs
- crates/broker/src/runtime/fleet.rs
- crates/broker/src/runtime/init.rs
- crates/broker/src/runtime/messages.rs
- crates/broker/src/runtime/mod.rs
- crates/broker/src/runtime/relaycast_events.rs
- crates/broker/src/runtime/tests.rs
- crates/broker/src/worker.rs
- crates/broker/tests/fixtures/fleet-wire/action.invoke.json
- crates/broker/tests/fixtures/fleet-wire/deliver.json
💤 Files with no reviewable changes (3)
- crates/broker/src/runtime/delivery.rs
- crates/broker/src/runtime/messages.rs
- crates/broker/src/worker.rs
```diff
 Ok(token) => {
     tracing::info!(
         worker = %name,
-        error = %error,
-        "continuing spawn without pre-registration after retries exhausted"
+        "bound agent to node via agent.register for HTTP spawn"
     );
-    preregistration_warning = Some(message);
-    None
+    Some(token.token)
```
🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift
Clean up node-registered agents when HTTP spawn fails.
This path registers/binds the agent before workers.spawn(...), but it discards token.agent_id. If the later spawn fails, the broker replies with an error while relaycast still has a node-bound agent that cannot be served locally. Keep the agent_id and deregister/prune it in the spawn error branch.
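The cleanup being asked for amounts to "register, try to spawn, undo the registration on failure". A minimal sketch, with closures standing in for the real spawn and deregister calls (both hypothetical here, as is the `spawn_with_cleanup` name):

```rust
/// Sketch: run `spawn`; if it fails, deregister the already-registered
/// agent before returning the error so no node-bound agent is left behind.
fn spawn_with_cleanup<S, D>(agent_id: &str, spawn: S, mut deregister: D) -> Result<(), String>
where
    S: FnOnce() -> Result<(), String>,
    D: FnMut(&str),
{
    match spawn() {
        Ok(()) => Ok(()),
        Err(error) => {
            // The registration succeeded earlier, so it has to be rolled back.
            deregister(agent_id);
            Err(error)
        }
    }
}
```

The key point is that the agent id has to survive past the registration step; keeping only the token, as the current code does, leaves nothing to deregister with.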
```rust
FleetDeliverySurfacing::Inject => {
    let relay_delivery = self.fleet_relay_delivery(deliver);
    self.workers.deliver(&deliver.agent, relay_delivery).await
}
```
🩺 Stability & Availability | 🟠 Major
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
rg -nP -C6 'fn send_to_worker' crates/broker/src/worker.rs
rg -nP -C3 'unknown worker|has_worker|workers\.get' crates/broker/src/worker.rs
```
Repository: AgentWorkforce/relay
Length of output: 2130
Ack missing workers immediately to prevent infinite redelivery loops.
When FleetDeliverySurfacing::Inject is processed and the target worker is missing (e.g., released between enqueue and delivery), self.workers.deliver fails. This error propagates to handle_fleet_deliver, which withholds the ack (Line 358), causing the engine to retry the same sequence indefinitely because the delivery book never advances.
WorkerRegistry::send_to_worker explicitly returns an error with the context unknown worker '{name}' when the target is missing (crates/broker/src/worker.rs, line 810). Distinguish this permanent failure from transient channel errors:
- Check `self.workers.has_worker(&deliver.agent)` before attempting delivery, or
- Pattern match the error from `deliver` to treat "unknown worker" as a terminal failure (equivalent to `AckUnknown`), releasing the ack immediately.
This prevents the broker from stalling on redelivery attempts for workers that no longer exist.
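The permanent-versus-transient distinction can be sketched as a small decision function. `DeliverError`, `AckDecision`, and `ack_decision` are hypothetical; the real code would classify whatever error `send_to_worker` actually returns.

```rust
/// Illustrative error split: a worker that does not exist will never
/// accept the message, while a busy channel may recover.
#[derive(Debug, PartialEq, Eq)]
enum DeliverError {
    UnknownWorker(String),
    ChannelBusy,
}

#[derive(Debug, PartialEq, Eq)]
enum AckDecision {
    /// Advance the delivery book and ack the frame.
    Ack,
    /// Withhold the ack so the engine redelivers.
    Withhold,
}

/// Sketch: ack on success and on permanent failure; withhold only when a
/// retry could plausibly succeed.
fn ack_decision(result: &Result<(), DeliverError>) -> AckDecision {
    match result {
        Ok(()) => AckDecision::Ack,
        Err(DeliverError::UnknownWorker(_)) => AckDecision::Ack,
        Err(DeliverError::ChannelBusy) => AckDecision::Withhold,
    }
}
```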
| self.publish_fleet_load(true).await; | ||
| // `spawn_worker_from_request` does not return a result; treat presence of | ||
| // the worker as success so the engine's invocation resolves. | ||
| if self.workers.workers.contains_key(&name) { | ||
| self.reply_action_output( | ||
| &invoke.invocation_id, | ||
| json!({ "spawned": true, "name": name.as_str() }), | ||
| ) | ||
| .await; | ||
| } else { | ||
| self.reply_action_error(&invoke.invocation_id, "spawn_failed") | ||
| .await; | ||
| } |
🗄️ Data Integrity & Integration | 🟠 Major
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
rg -nP -C4 'fn publish_fleet_inventory_snapshot|UpdateInventory|fleet_inventory' crates/broker/src/runtime/fleet.rs crates/broker/src/node_control.rs
Repository: AgentWorkforce/relay
Length of output: 17652
Node-action spawn path omits fleet_inventory registration and sync
handle_fleet_action_spawn (Lines 548-562) creates an agent via spawn_worker_from_request but fails to register it in self.fleet_inventory or propagate the change via publish_fleet_inventory().
In contrast, the sidecar spawn path (handle_fleet_spawn_agent, Lines 677-686) correctly updates the inventory map and calls publish_fleet_inventory() to sync the broker's state with the engine.
Without this update:
- Agents spawned via the node-only action.invoke path are invisible to the engine's inventory view.
- Subsequent prune_fleet_agent_state calls during release will be no-ops for these agents, as they cannot be found in fleet_inventory.
Insert the agent into self.fleet_inventory and call self.publish_fleet_inventory().await immediately after successful spawn, mirroring the sidecar path.
Current code state
self.publish_fleet_load(true).await;
// `spawn_worker_from_request` does not return a result; treat presence of
// the worker as success so the engine's invocation resolves.
if self.workers.workers.contains_key(&name) {
self.reply_action_output(
&invoke.invocation_id,
json!({ "spawned": true, "name": name.as_str() }),
)
.await;
} else {
self.reply_action_error(&invoke.invocation_id, "spawn_failed")
.await;
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/broker/src/runtime/fleet.rs` around lines 548 - 562, The node-action
spawn flow in handle_fleet_action_spawn updates the worker set but never
registers the new agent in fleet_inventory or syncs the engine state. After
spawn_worker_from_request succeeds and self.workers.workers contains the new
name, insert the agent into self.fleet_inventory and immediately call
self.publish_fleet_inventory().await, matching the behavior already used in
handle_fleet_spawn_agent so spawned agents are visible and later
prune_fleet_agent_state can find them.
| Err(error) => { | ||
| let message = error.to_string(); | ||
| if is_unknown_worker_error_message(&message) { | ||
| workspace_http.forget_agent_registration(&name); | ||
| state.agents.remove(&name); | ||
| if paths.persist { | ||
| if let Err(save_error) = state.save(&paths.state) { | ||
| tracing::warn!( | ||
| path = %paths.state.display(), | ||
| error = %save_error, | ||
| "failed to persist broker state" | ||
| ); | ||
| } | ||
| } | ||
| tracing::debug!( | ||
| child = %name, | ||
| "ignoring duplicate relaycast release for already exited worker" | ||
| ); | ||
| // An already-exited worker is still a successful release. | ||
| ReleaseOutcome::Released |
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Clear stale release state on idempotent unknown-worker releases.
This branch returns Released, but it skips the success-path cleanup for pending deliveries, pending requests, delivery state, and result tokens. Duplicate releases for an already-exited worker can leave retries or pending request timeouts behind.
🧹 Proposed cleanup mirror
if is_unknown_worker_error_message(&message) {
workspace_http.forget_agent_registration(&name);
+ let dropped = take_pending_for_worker(pending_deliveries, &name);
+ if !dropped.is_empty() {
+ let _ = send_event(
+ sdk_out_tx,
+ json!({"kind":"delivery_dropped","name":&name,"count":dropped.len(),"reason":"agent_released"}),
+ )
+ .await;
+ let _ =
+ emit_dropped_delivery_failures(sdk_out_tx, &dropped, "agent_released")
+ .await;
+ }
+ fail_pending_requests_for_worker(pending_requests, &name, "relaycast_release");
+ delivery_states.remove(&name);
+ agent_result_tokens.retain(|_, agent| agent != &name);
state.agents.remove(&name);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| Err(error) => { | |
| let message = error.to_string(); | |
| if is_unknown_worker_error_message(&message) { | |
| workspace_http.forget_agent_registration(&name); | |
| state.agents.remove(&name); | |
| if paths.persist { | |
| if let Err(save_error) = state.save(&paths.state) { | |
| tracing::warn!( | |
| path = %paths.state.display(), | |
| error = %save_error, | |
| "failed to persist broker state" | |
| ); | |
| } | |
| } | |
| tracing::debug!( | |
| child = %name, | |
| "ignoring duplicate relaycast release for already exited worker" | |
| ); | |
| // An already-exited worker is still a successful release. | |
| ReleaseOutcome::Released | |
| Err(error) => { | |
| let message = error.to_string(); | |
| if is_unknown_worker_error_message(&message) { | |
| workspace_http.forget_agent_registration(&name); | |
| let dropped = take_pending_for_worker(pending_deliveries, &name); | |
| if !dropped.is_empty() { | |
| let _ = send_event( | |
| sdk_out_tx, | |
| json!({"kind":"delivery_dropped","name":&name,"count":dropped.len(),"reason":"agent_released"}), | |
| ) | |
| .await; | |
| let _ = | |
| emit_dropped_delivery_failures(sdk_out_tx, &dropped, "agent_released") | |
| .await; | |
| } | |
| fail_pending_requests_for_worker(pending_requests, &name, "relaycast_release"); | |
| delivery_states.remove(&name); | |
| agent_result_tokens.retain(|_, agent| agent != &name); | |
| state.agents.remove(&name); | |
| if paths.persist { | |
| if let Err(save_error) = state.save(&paths.state) { | |
| tracing::warn!( | |
| path = %paths.state.display(), | |
| error = %save_error, | |
| "failed to persist broker state" | |
| ); | |
| } | |
| } | |
| tracing::debug!( | |
| child = %name, | |
| "ignoring duplicate relaycast release for already exited worker" | |
| ); | |
| // An already-exited worker is still a successful release. | |
| ReleaseOutcome::Released |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/broker/src/runtime/relaycast_events.rs` around lines 211 - 230, The
unknown-worker duplicate release path in relaycast_events should mirror the
normal success cleanup before returning Released. In the Err(error) branch of
the release handling logic, after is_unknown_worker_error_message(...) succeeds,
make sure the same cleanup performed on the success path runs for pending
deliveries, pending requests, delivery state, and result tokens, so stale
retry/timeout state is cleared for already-exited workers. Use the existing
release handling flow and related state helpers around ReleaseOutcome::Released
to locate and reuse the success-path cleanup rather than duplicating ad hoc
logic.
| Ok(token) => { | ||
| tracing::info!( | ||
| worker = %name, | ||
| "bound agent to node via agent.register for action.invoke spawn" | ||
| ); | ||
| Some(token.token) | ||
| } |
🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift
Keep the registered agent_id so failed local spawns can be deregistered.
After agent.register succeeds, only token.token is kept. If workers.spawn(...) fails later, the node-bound agent remains registered in relaycast with no local worker to receive delivery. Preserve token.agent_id and deregister/prune it in the spawn error branch.
Also applies to: 534-540
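One way the cleanup could look, as a sketch under stated assumptions. `RegisteredToken`, `AgentRegistry`, `RecordingRegistry`, and `spawn_with_cleanup` are hypothetical stand-ins; the real broker types and the actual deregister call are not shown in this review.

```rust
/// Hypothetical stand-in for the value returned by `agent.register`.
struct RegisteredToken {
    token: String,
    agent_id: String,
}

/// Hypothetical registry interface; only the deregister step is modeled.
trait AgentRegistry {
    fn deregister(&mut self, agent_id: &str);
}

/// Test double that records which agent ids were deregistered.
#[derive(Default)]
struct RecordingRegistry {
    deregistered: Vec<String>,
}

impl AgentRegistry for RecordingRegistry {
    fn deregister(&mut self, agent_id: &str) {
        self.deregistered.push(agent_id.to_string());
    }
}

/// Runs `spawn` with the registered token. On failure, deregisters the
/// node-bound agent via the retained `agent_id` before returning the error.
fn spawn_with_cleanup<R, F>(
    registry: &mut R,
    registered: RegisteredToken,
    spawn: F,
) -> Result<(), String>
where
    R: AgentRegistry,
    F: FnOnce(&str) -> Result<(), String>,
{
    let RegisteredToken { token, agent_id } = registered;
    match spawn(&token) {
        Ok(()) => Ok(()),
        Err(error) => {
            registry.deregister(&agent_id);
            Err(error)
        }
    }
}
```

The point of the sketch is only that `agent_id` has to outlive the `agent.register` match arm so the error branch can still reach it.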
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/broker/src/runtime/relaycast_events.rs` around lines 392 - 398, The
local spawn flow in relaycast_events keeps only `token.token` after a successful
`agent.register`, so failed `workers.spawn(...)` calls cannot clean up the
relaycast registration. Update the spawn path around the `agent.register`
handling to retain `token.agent_id` alongside the token, then in the
`workers.spawn` error branch use that saved agent id to deregister or prune the
node-bound agent. Apply the same fix in both affected spawn blocks so failed
local spawns do not leave stale registrations.
| bind_http_registered_agent_to_node(workspace_http, node_name, &name) | ||
| .await; | ||
| Some(token) |
🩺 Stability & Availability | 🟠 Major | 🏗️ Heavy lift
Do not ignore failed HTTP-to-node binding for action spawns.
bind_http_registered_agent_to_node returns Some(warning) when the agent remains undeliverable, but this path discards it and still spawns with the token. handle_fleet_action_spawn can then report success solely because the local worker exists, while node-only delivery will not reach it. Propagate this as a spawn failure or return an outcome that lets the caller send action.result { error }.
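A rough sketch of wiring the warning through, assuming the binding step yields `Option<String>` as described above. `SpawnOutcome` and `spawn_after_binding` are hypothetical names; the real spawn path is async and takes more context than is modeled here.

```rust
/// Hypothetical outcome the caller can turn into `action.result`.
#[derive(Debug, PartialEq, Eq)]
enum SpawnOutcome {
    Spawned { name: String },
    Failed { reason: String },
}

/// `bind_warning` models the `Some(warning)` returned when the agent
/// remains undeliverable after the HTTP-to-node binding attempt.
/// The spawn closure is not run at all when binding failed.
fn spawn_after_binding<F>(name: &str, bind_warning: Option<String>, spawn: F) -> SpawnOutcome
where
    F: FnOnce() -> Result<(), String>,
{
    if let Some(warning) = bind_warning {
        return SpawnOutcome::Failed {
            reason: format!("node binding failed: {warning}"),
        };
    }
    match spawn() {
        Ok(()) => SpawnOutcome::Spawned { name: name.to_string() },
        Err(error) => SpawnOutcome::Failed { reason: error },
    }
}
```

Returning an explicit outcome, rather than inferring success from the worker map, lets the caller reply with an error even when a local worker exists.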
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/broker/src/runtime/relaycast_events.rs` around lines 421 - 423, The
action spawn flow is dropping the warning returned by
bind_http_registered_agent_to_node, so handle_fleet_action_spawn can treat an
undeliverable node binding as success. Update the relaycast_events spawn path to
inspect the return from bind_http_registered_agent_to_node and, when it yields
Some(warning), stop the spawn or return an error/outcome that lets the caller
emit action.result with error. Use handle_fleet_action_spawn and
bind_http_registered_agent_to_node as the key points to wire this failure
through instead of unconditionally continuing with Some(token).
The workflow/fix sub-agents' `git add -A` committed this session's .agentworkforce/trajectories/ records into the code branch. Untrack them (kept on disk) so the PR is a reviewable code-only diff. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The broker persisted its minted node token at a single global path and load_node_token only checked node_id, so a token minted for workspace A (or a local engine) was reused against workspace B / prod and rejected with HTTP 401 on /v1/node/ws, and the connect loop looped forever on the rejected cached token. - Scope the persisted token to workspace_id (and engine base_url): add both to PersistedNodeToken; load_node_token returns the cached token only when node_id AND workspace_id match (and base_url when both sides know it); legacy caches without base_url still reuse on a workspace match. persist_node_token / load_node_token / resolve_node_token signatures and call sites in runtime/init.rs updated accordingly. - Re-mint on 401: detect an HTTP 401 handshake rejection on the node-control connect, discard the cached file + in-memory token, and re-mint via RelayCast::create_node (wired through a NodeTokenMinter) before retrying. Bounded to MAX_UNAUTHORIZED_BEFORE_GIVING_UP (5) consecutive 401s, after which a loud hard error is surfaced instead of spinning. - Tests: load_node_token workspace/node/base_url mismatch + round-trip, legacy-cache reuse, and 401 connect-error detection. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
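The scoping rule in this commit message can be sketched as follows. The struct fields follow the names in the message, but the layout, the `reusable_token` helper, and its signature are assumptions for illustration, not the broker's actual `load_node_token`.

```rust
/// Hypothetical shape of the cached token record.
#[derive(Debug, Clone)]
struct PersistedNodeToken {
    node_id: String,
    workspace_id: String,
    /// `None` models a legacy cache written before base_url was recorded.
    base_url: Option<String>,
    token: String,
}

/// Returns the cached token only when node_id and workspace_id both match,
/// and base_url matches when both sides know it.
fn reusable_token<'a>(
    cached: &'a PersistedNodeToken,
    node_id: &str,
    workspace_id: &str,
    base_url: Option<&str>,
) -> Option<&'a str> {
    if cached.node_id != node_id || cached.workspace_id != workspace_id {
        return None;
    }
    match (cached.base_url.as_deref(), base_url) {
        // Both sides know the engine URL and they disagree: do not reuse.
        (Some(cached_url), Some(wanted)) if cached_url != wanted => None,
        // Matching URLs, or at least one side unknown (legacy cache): reuse.
        _ => Some(cached.token.as_str()),
    }
}
```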
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/broker/src/node_control.rs (1)
734-777: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
consecutive_unauthorized is not reset on non-401 outcomes, contradicting its documented contract.
The comment at Lines 653-656 states the counter is reset "on a successful mint (or any non-401 outcome)", but the only resets are in the Deregistered branch (Line 739) and after a successful re-mint (Line 750). A plain ControlRunResult::Disconnected, which includes a healthy session that later drops or any non-401 connect failure (DNS/TLS/5xx), falls through to Line 774 without resetting.
Consequently the count is not truly "consecutive": 401s separated by successful sessions accumulate, and once the running total exceeds MAX_UNAUTHORIZED_BEFORE_GIVING_UP the broker stops re-minting permanently and only logs the "delivery DISABLED" error on every subsequent reconnect, never recovering even if a fresh mint would now succeed.
🐛 Reset the counter on any non-401 result
if matches!(result, ControlRunResult::Unauthorized) {
    // The engine rejected our current node token. Re-mint a fresh one
    // (bounded) rather than reconnecting forever on the rejected token.
    consecutive_unauthorized = consecutive_unauthorized.saturating_add(1);
@@
    }
+ } else {
+     // A non-401 outcome breaks the consecutive-401 chain.
+     consecutive_unauthorized = 0;
}
let _ = event_tx.send(FleetControlEvent::Disconnected).await;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/broker/src/node_control.rs` around lines 734 - 777, The consecutive_unauthorized counter is only being cleared for Deregistered and successful remint, but the contract says it should reset on any non-401 outcome. Update the node_control retry loop so ControlRunResult::Disconnected (and any other non-Unauthorized result) also resets consecutive_unauthorized, ideally in the main result-handling path around ControlRunResult and before the reconnect sleep; keep the existing Unauthorized branch behavior unchanged.
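The counter contract described in this finding can be modeled in isolation, as a sketch. `RunResult`, `NextStep`, and `UnauthorizedTracker` are hypothetical; the cap of 5 comes from the commit message, and treating "exceeds the cap" as the give-up condition is an assumption.

```rust
/// Cap taken from the commit message (MAX_UNAUTHORIZED_BEFORE_GIVING_UP = 5).
const MAX_UNAUTHORIZED_BEFORE_GIVING_UP: u32 = 5;

/// Simplified stand-in for the connect-loop result.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RunResult {
    Unauthorized,
    Disconnected,
}

/// What the retry loop should do next.
#[derive(Debug, PartialEq, Eq)]
enum NextStep {
    Remint,
    GiveUp,
    Reconnect,
}

#[derive(Default)]
struct UnauthorizedTracker {
    consecutive: u32,
}

impl UnauthorizedTracker {
    /// Counts 401s and resets on any other outcome, so only an unbroken
    /// run of 401s can reach the give-up branch.
    fn observe(&mut self, result: RunResult) -> NextStep {
        match result {
            RunResult::Unauthorized => {
                self.consecutive = self.consecutive.saturating_add(1);
                // Assumption: give up once the run exceeds the cap.
                if self.consecutive > MAX_UNAUTHORIZED_BEFORE_GIVING_UP {
                    NextStep::GiveUp
                } else {
                    NextStep::Remint
                }
            }
            RunResult::Disconnected => {
                self.consecutive = 0;
                NextStep::Reconnect
            }
        }
    }
}
```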
♻️ Duplicate comments (1)
crates/broker/src/node_control.rs (1)
628-641: 🔒 Security & Privacy | 🟠 Major | ⚡ Quick win
node-token.json is still written with default permissions despite the prior fix being marked resolved.
persist_node_token writes a bearer credential for /v1/node/ws via fs::write, which creates the file with permissions determined by the process umask (commonly 0o644, i.e. world-readable). The 0o600 restriction from the earlier review is not present in the current code. NodeTokenMinter::remint re-persists through the same path, so re-minted tokens inherit the permissive mode too.
🔒 Restrict permissions on create and on existing files
- fs::write(path, body)
-     .with_context(|| format!("failed to write node token file {}", path.display()))?;
+ let mut options = fs::OpenOptions::new();
+ options.create(true).truncate(true).write(true);
+ #[cfg(unix)]
+ {
+     use std::os::unix::fs::OpenOptionsExt;
+     options.mode(0o600);
+ }
+ {
+     use std::io::Write;
+     let mut file = options
+         .open(path)
+         .with_context(|| format!("failed to write node token file {}", path.display()))?;
+     file.write_all(body.as_bytes())
+         .with_context(|| format!("failed to write node token file {}", path.display()))?;
+ }
+ #[cfg(unix)]
+ {
+     use std::os::unix::fs::PermissionsExt;
+     fs::set_permissions(path, fs::Permissions::from_mode(0o600))
+         .with_context(|| format!("failed to restrict node token file {}", path.display()))?;
+ }
Ok(())
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/broker/src/node_control.rs` around lines 628 - 641, `persist_node_token` is still writing `node-token.json` with default umask-based permissions, so the bearer token can be world-readable. Update the write path in `persist_node_token` to create or rewrite the file with restrictive `0o600` permissions, and make sure `NodeTokenMinter::remint` inherits the same behavior since it calls this same persistence flow. If the file already exists, explicitly tighten its mode after writing or use a permission-setting create/write approach so both initial creation and re-persistence stay private.
🧹 Nitpick comments (1)
crates/broker/src/node_control.rs (1)
742-773: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
No test exercises the live re-mint / 401 recovery path.
All FleetControlConfig fixtures set token_minter: None, and the new unit tests cover only load_node_token / persist_node_token scoping and connect_error_is_unauthorized. The actual Unauthorized → remint() → reconnect loop in run_node_control_client (including the counter behavior above and the give-up branch) is untested. Given the PR notes a live last-mile test still pending, consider adding a mock-engine test that returns 401 then accepts after a re-mint.
Want me to draft a tokio::test that wires a token_minter, rejects the first handshake with 401, and asserts a fresh token is used on reconnect?
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/broker/src/node_control.rs` around lines 742 - 773, The live Unauthorized recovery path in run_node_control_client is untested, especially the remint() retry loop and the give-up logging branch. Add a tokio::test that uses a FleetControlConfig with a mock token_minter, forces an initial 401/ControlRunResult::Unauthorized from the engine, then verifies remint() is called, config.node_token is replaced, the reconnect continues with the fresh token, and the consecutive_unauthorized counter resets; also cover the branch where repeated unauthorized responses exceed MAX_UNAUTHORIZED_BEFORE_GIVING_UP and the error path is emitted.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@crates/broker/src/node_control.rs`:
- Around line 734-777: The consecutive_unauthorized counter is only being
cleared for Deregistered and successful remint, but the contract says it should
reset on any non-401 outcome. Update the node_control retry loop so
ControlRunResult::Disconnected (and any other non-Unauthorized result) also
resets consecutive_unauthorized, ideally in the main result-handling path around
ControlRunResult and before the reconnect sleep; keep the existing Unauthorized
branch behavior unchanged.
---
Duplicate comments:
In `@crates/broker/src/node_control.rs`:
- Around line 628-641: `persist_node_token` is still writing `node-token.json`
with default umask-based permissions, so the bearer token can be world-readable.
Update the write path in `persist_node_token` to create or rewrite the file with
restrictive `0o600` permissions, and make sure `NodeTokenMinter::remint`
inherits the same behavior since it calls this same persistence flow. If the
file already exists, explicitly tighten its mode after writing or use a
permission-setting create/write approach so both initial creation and
re-persistence stay private.
---
Nitpick comments:
In `@crates/broker/src/node_control.rs`:
- Around line 742-773: The live Unauthorized recovery path in
run_node_control_client is untested, especially the remint() retry loop and the
give-up logging branch. Add a tokio::test that uses a FleetControlConfig with a
mock token_minter, forces an initial 401/ControlRunResult::Unauthorized from the
engine, then verifies remint() is called, config.node_token is replaced, the
reconnect continues with the fresh token, and the consecutive_unauthorized
counter resets; also cover the branch where repeated unauthorized responses
exceed MAX_UNAUTHORIZED_BEFORE_GIVING_UP and the error path is emitted.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: fef5e896-e87e-4585-8c80-45dddecf044b
📒 Files selected for processing (3)
- CHANGELOG.md
- crates/broker/src/node_control.rs
- crates/broker/src/runtime/init.rs
✅ Files skipped from review due to trivial changes (1)
- CHANGELOG.md
🚧 Files skipped from review as they are similar to previous changes (1)
- crates/broker/src/runtime/init.rs
The engine scopes relaycast nodes globally, so a host running brokers for two different workspaces (each in its own working directory) collided on a single global machine-id node, failing create_node and enrollment. Derive the node id deterministically from (machine_seed, canonical_cwd) via sha2, keeping it stable across restarts in the same directory but distinct across directories. The machine-id file stays the per-machine seed; cwd is read from current_dir (canonicalized when possible) with a seed-only fallback when cwd is unreadable. derive_node_id is a pure testable fn. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
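To illustrate the derivation described here, a dependency-free sketch follows. It swaps in 64-bit FNV-1a for sha2 so the block has no external crates; the `seed-hex` output format, the NUL separator, and returning the bare seed as the fallback are all assumptions, and this is not the broker's `derive_node_id`.

```rust
/// 64-bit FNV-1a, used only as a dependency-free stand-in for sha2.
fn fnv1a64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
    for byte in bytes {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
    }
    hash
}

/// Pure derivation: the same (seed, cwd) pair always yields the same id,
/// and different directories yield different ids.
/// Assumption: an empty cwd models the "cwd unreadable" seed-only fallback.
fn derive_node_id(machine_seed: &str, canonical_cwd: &str) -> String {
    if canonical_cwd.is_empty() {
        return machine_seed.to_string();
    }
    // A NUL separator keeps the seed and cwd boundaries unambiguous.
    let mut input = Vec::with_capacity(machine_seed.len() + canonical_cwd.len() + 1);
    input.extend_from_slice(machine_seed.as_bytes());
    input.push(0);
    input.extend_from_slice(canonical_cwd.as_bytes());
    format!("{machine_seed}-{:016x}", fnv1a64(&input))
}
```

Keeping the function pure (no filesystem or environment reads) is what makes the stability and distinctness properties easy to test.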
…ent reply WARN The engine replies to every node-control request (node.register, inventory.sync) with a fresh snowflake reply id, since the broker sends those frames without an id. The broker routed every reply frame to complete_agent_registration, so these non-agent replies never matched a pending agent registration and produced a spurious WARN 'agent.register reply did not match a pending registration id=<snowflake>'. Resolve the pending registration by request id first, then fall back to matching the validated reply data.name against a pending entry (robust against an engine that drops/regenerates the id). Replies that resolve to neither are treated as non-agent replies and logged at debug, not warn. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/broker/src/node_control.rs (1)
549-612: 🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
Propagate identity filesystem errors instead of silently changing the node id.
Line 550 treats every seed read failure like a missing file, which can rotate the machine seed on permission/UTF-8/transient I/O errors. Line 611 also collapses current_dir failures to "", reintroducing one node id per host. Since this function already returns Result, fail fast instead of minting a potentially colliding identity.
Proposed fix
pub(crate) fn load_or_create_machine_seed(path: &Path) -> Result<String> {
-     if let Ok(existing) = fs::read_to_string(path) {
-         let existing = existing.trim();
-         if !existing.is_empty() {
-             return Ok(existing.to_string());
-         }
+     match fs::read_to_string(path) {
+         Ok(existing) => {
+             let existing = existing.trim();
+             if !existing.is_empty() {
+                 return Ok(existing.to_string());
+             }
+         }
+         Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
+         Err(error) => {
+             return Err(error)
+                 .with_context(|| format!("failed to read node id file {}", path.display()));
+         }
    }
@@
pub(crate) fn load_or_create_node_id(seed_path: &Path) -> Result<String> {
    let seed = load_or_create_machine_seed(seed_path)?;
-     let cwd = std::env::current_dir()
-         .map(|path| {
-             std::fs::canonicalize(&path)
-                 .unwrap_or(path)
-                 .to_string_lossy()
-                 .into_owned()
-         })
-         .unwrap_or_default();
+     let cwd_path = std::env::current_dir()
+         .context("failed to read current working directory for node id")?;
+     let cwd = std::fs::canonicalize(&cwd_path)
+         .unwrap_or(cwd_path)
+         .to_string_lossy()
+         .into_owned();
    Ok(derive_node_id(&seed, &cwd))
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/broker/src/node_control.rs` around lines 549 - 612, load_or_create_machine_seed and load_or_create_node_id are swallowing real filesystem/environment errors and can silently change or collapse the node identity. Update load_or_create_machine_seed to only generate a new seed when the file is truly absent, and propagate read/parse/UTF-8/I/O failures instead of treating them as empty. In load_or_create_node_id, avoid defaulting current_dir failures to an empty string; return the error so callers can fail fast rather than deriving a shared host-wide node id. Keep the existing derive_node_id behavior, but only call it after successfully resolving both seed and cwd.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/broker/src/node_control.rs`:
- Around line 1121-1128: The fallback in complete_agent_registration currently
matches pending_agent_registrations by name using a linear scan, which can
non-deterministically pick the wrong pending request when names are duplicated.
Update the correlation logic to use a stable unique identifier already available
in the flow, such as request_id or a propagated agent_id from the registration
reply, and remove the lookup-by-name fallback. If name must remain part of the
logic, enforce uniqueness before inserting into pending_agent_registrations so
the request maps to the correct oneshot channel every time.
---
Outside diff comments:
In `@crates/broker/src/node_control.rs`:
- Around line 549-612: load_or_create_machine_seed and load_or_create_node_id
are swallowing real filesystem/environment errors and can silently change or
collapse the node identity. Update load_or_create_machine_seed to only generate
a new seed when the file is truly absent, and propagate read/parse/UTF-8/I/O
failures instead of treating them as empty. In load_or_create_node_id, avoid
defaulting current_dir failures to an empty string; return the error so callers
can fail fast rather than deriving a shared host-wide node id. Keep the existing
derive_node_id behavior, but only call it after successfully resolving both seed
and cwd.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: d7289b04-4b75-434e-a338-d6b2a9a33762
📒 Files selected for processing (2)
- CHANGELOG.md
- crates/broker/src/node_control.rs
✅ Files skipped from review due to trivial changes (1)
- CHANGELOG.md
| let name = data.as_ref()?.name.as_deref()?; | ||
| let key = pending_agent_registrations | ||
| .iter() | ||
| .find(|(_, pending)| pending.name == name) | ||
| .map(|(key, _)| key.clone())?; | ||
| pending_agent_registrations | ||
| .remove(&key) | ||
| .map(|pending| (key, pending)) |
🗄️ Data Integrity & Integration | 🟡 Minor
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify whether RegisterAgent insertion enforces unique pending names before complete_agent_registration's name fallback.
rg -n -C4 'pending_agent_registrations\.(insert|entry)|FleetControlCommand::RegisterAgent|PendingAgentRegistration' crates/broker/src/node_control.rs crates/broker/src/runtime
Repository: AgentWorkforce/relay
Length of output: 11941
Fix nondeterministic correlation of agent registration replies.
The code fallback in complete_agent_registration searches pending_agent_registrations by name via linear iteration. Since the map is keyed by request_id (inserted at line 1010) and does not enforce unique name values, concurrent registrations for multiple agents with the same name will cause the fallback to arbitrarily pick the first match. This risks sending the registration response to the wrong oneshot channel.
Ensure the correlation logic uses a deterministic key (e.g., propagate a unique agent_id or request_id in the reply) or enforce a uniqueness constraint on name before insertion.
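A sketch of the uniqueness-constraint option, assuming names can be rejected at insertion time. `PendingRegistrations` and its methods are hypothetical; the real map stores oneshot senders rather than plain names, which are omitted here to keep the block self-contained.

```rust
use std::collections::HashMap;

/// Hypothetical pending-registration table with a secondary name index,
/// so a name fallback can never match more than one entry.
#[derive(Default)]
struct PendingRegistrations {
    name_by_request_id: HashMap<String, String>,
    request_id_by_name: HashMap<String, String>,
}

impl PendingRegistrations {
    /// Rejects a second in-flight registration for the same agent name.
    fn insert(&mut self, request_id: &str, name: &str) -> Result<(), String> {
        if self.request_id_by_name.contains_key(name) {
            return Err(format!("registration already pending for '{name}'"));
        }
        self.name_by_request_id
            .insert(request_id.to_string(), name.to_string());
        self.request_id_by_name
            .insert(name.to_string(), request_id.to_string());
        Ok(())
    }

    /// Resolves by request id first, then by the (now unique) name.
    /// Returns the matched (request_id, name) pair and removes it.
    fn resolve(
        &mut self,
        reply_id: Option<&str>,
        reply_name: Option<&str>,
    ) -> Option<(String, String)> {
        let request_id = reply_id
            .filter(|id| self.name_by_request_id.contains_key(*id))
            .map(str::to_string)
            .or_else(|| reply_name.and_then(|name| self.request_id_by_name.get(name).cloned()))?;
        let name = self.name_by_request_id.remove(&request_id)?;
        self.request_id_by_name.remove(&name);
        Some((request_id, name))
    }
}
```

With the index in place the fallback is an exact lookup instead of a linear scan, so the result no longer depends on map iteration order.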
Current logic at lines 1121-1128
let name = data.as_ref()?.name.as_deref()?;
let key = pending_agent_registrations
.iter()
.find(|(_, pending)| pending.name == name)
.map(|(key, _)| key.clone())?;
pending_agent_registrations
.remove(&key)
.map(|pending| (key, pending))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/broker/src/node_control.rs` around lines 1121 - 1128, The fallback in
complete_agent_registration currently matches pending_agent_registrations by
name using a linear scan, which can non-deterministically pick the wrong pending
request when names are duplicated. Update the correlation logic to use a stable
unique identifier already available in the flow, such as request_id or a
propagated agent_id from the registration reply, and remove the lookup-by-name
fallback. If name must remain part of the logic, enforce uniqueness before
inserting into pending_agent_registrations so the request maps to the correct
oneshot channel every time.
…on ref, bound re-mint loop
Three node-only delivery fixes:
- Node token cache is now scoped per node_id (node-tokens/{node_id}.json,
filename-sanitized) instead of one host-wide file, so two brokers in
different cwds on a host no longer overwrite each other's token.
- node action.invoke spawns forward invocation_id and the harness session
ref into agent.register (was hardcoded None,None), restoring invocation
correlation and session resume. HTTP /api/spawn derives session ref from
its spec too.
- node-control re-mint loop only resets the consecutive-401 counter when a
connection actually establishes (not on a successful mint) and drops the
post-mint `continue` so retries honor the reconnect backoff, making the
give-up cap reachable and stopping a tight POST /v1/nodes loop.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Operator-enrolled / fleet nodes pin their node id (via the machine-id file) to match an engine-issued node token. The cwd-hash derivation broke that: the broker registered a derived id, so the engine rejected node.register with node_id_mismatch and the node never came online (two-node fleet E2E timed out on online+handlers_live). Only derive when auto-minting (no RELAY_NODE_TOKEN); otherwise use the pinned id verbatim. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The broker's pre-sidecar bootstrap node.register advertised a generic capability literally named "spawn". The relaycast engine does not treat bare "spawn" as a placement capability (only spawn:* is), so its ensureCapabilityActions materialized a regular `spawn` ACTION pinned to whichever node bootstrapped first. From then on every spawn invoke resolved that action and was dispatched to the bootstrapping node, short-circuiting capability-based spawn placement for the whole workspace — cli/target_node/least-loaded routing were all ignored. This regressed the two-node fleet E2E: spawn:codex/spawn:claude landed on the wrong node, target_node placement-mismatch returned 201 instead of 409, and least-loaded scheduling misrouted. The bootstrap descriptor now carries no capabilities; the node's real spawn:*/action capabilities arrive on the sidecar's node.register. The node is still registered (online) before the sidecar connects, but claims no handler until the authoritative capability set is reported. Extracts the manifest into `bootstrap_node_manifest` with a unit test asserting the empty capability set. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A `spawn:<harness>` node action registered by a sidecar was being run by the broker directly with the literal `cli` from the action input (`handle_fleet_action_spawn`), launching the real CLI instead of the node's declared harness. In the fleet/sidecar model the sidecar owns the harness: its `spawn(<harness>)` handler resolves the declared harness spec and calls `ctx.spawnAgent` (-> `spawn_agent` -> handle_fleet_spawn_agent). Route `spawn:*` to the sidecar's registered handler (same path as echo/work) whenever the sidecar declared a handler for that capability, gated by a new `HandlerDispatchState::has_handler`. The broker-direct raw-`cli` spawn is reserved for the direct / no-sidecar path where no sidecar handler is registered. Fixes the fleet-E2E least-loaded scheduling flake: heavyweight real processes (instead of the lightweight stub PTY) lingered/exited and triggered broker re-init, collapsing per-node active_agents. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
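The routing decision described here can be sketched as below. `HandlerDispatchState::has_handler` is named in the commit message, but its signature, the `register` method, the `SpawnRoute` enum, and `route_spawn` are assumptions made for this example rather than the broker's actual API.

```rust
use std::collections::HashSet;

/// Hypothetical model of the capabilities a sidecar has declared handlers for.
#[derive(Default)]
struct HandlerDispatchState {
    handlers: HashSet<String>,
}

impl HandlerDispatchState {
    fn register(&mut self, capability: &str) {
        self.handlers.insert(capability.to_string());
    }

    fn has_handler(&self, capability: &str) -> bool {
        self.handlers.contains(capability)
    }
}

#[derive(Debug, PartialEq)]
enum SpawnRoute {
    /// The sidecar resolves the declared harness and calls back to spawn.
    SidecarHandler,
    /// Direct / no-sidecar path: the broker spawns the raw `cli` itself.
    BrokerDirect,
}

/// Returns `None` for actions that are not `spawn:*` at all.
fn route_spawn(state: &HandlerDispatchState, action: &str) -> Option<SpawnRoute> {
    if !action.starts_with("spawn:") {
        return None;
    }
    if state.has_handler(action) {
        Some(SpawnRoute::SidecarHandler)
    } else {
        Some(SpawnRoute::BrokerDirect)
    }
}
```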
✅ Fully validated end-to-end on prod (cast.agentrelay.com, v5.0.1)
The complete chain was exercised live against the deployed v5.0.1 engine with this branch's broker: enroll as a `role:broker` node → spawn an agent (node-bound to the fleet node) → DM from another agent → engine delivered it over `/v1/node/ws` → broker injected it into the agent's PTY → acked. The delivered `event_id` matched the sent DM. This is the exact scenario that was broken ("messages never inject between broker-spawned agents"), now working.
What this does
Migrates `agent-relay-broker` to relaycast v5.0.1's node-only delivery model. In v5.0.1 the workspace firehose (`/v1/ws`) is observer-only; realtime delivery happens exclusively over the node channel (`/v1/node/ws`). The broker now:
- Derives its node id from `machine-id + hash(cwd)` (so multiple brokers/workspaces on one host don't collide on `create_node`); mints + persists a workspace-scoped node token (`create_node`); connects `/v1/node/ws` unconditionally and sends `node.register`. A missing token is a loud hard-fault; a node-WS 401 invalidates the cached token and re-mints (bounded).
- […] (`/api/spawn` and engine `action.invoke` spawns) so the engine routes their messages back.
- […] `/v1/node/ws`: `deliver` frames (route by `agent_id`, branch on `payload.type`: inject messages + `action.completed`/`action.failed`/`action.denied` results, ack-only for reactions/receipts), cumulative `delivery.ack {up_to_seq}`, bounded dedup, `seq:0` fan-out surfaced.
- […] `action.invoke` → `action.result`.
- […] `fleet_mode_enabled`, and the rejected `/v1/ws` workspace-stream connection are gone (HTTP client retained for registration/API).
How it was built & validated
[…] `/v1/ws` teardown → workspace-scoped token cache + re-mint-on-401 + cwd-scoped node id + `agent.register` reply correlation (robustness bugs the review and prod test surfaced).
Architecture
The node / broker / broker-pty-engine / `@agent-relay/harness-driver` / action-runner vocabulary + delivery model is documented in agentrelay.com docs PR #9 (`architecture.mdx`).
🤖 Generated with Claude Code