feat(factory): dispatch relayflows from integration events#47
Conversation
📝 WalkthroughWalkthroughReplaces the ChangesRelayflow SDK & Dispatch Integration
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request integrates a policy-driven relayflow dispatch registry into the factory orchestrator, allowing workflows to be dispatched in response to normalized integration events. It also updates the workflow execution model to run the Relayflows SDK in-process rather than shelling out to a CLI, and ensures that relayfile mounts are running at each configured clone path. Key feedback points include addressing a potential TypeError when config.clonePaths is undefined, avoiding non-concurrency-safe mutations of global process.env during concurrent workflow runs, preventing blocking of the live event drain loop by dispatching relayflows asynchronously, forwarding custom options in the provider client wrappers, and correcting edge cases in the plural-to-singular resource name normalization logic.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| const mountFn = deps.ensureLocalMount ?? ensureLocalMount | ||
| const mountOpts = { acceptableWorkspaceIds: acceptableMountIds } | ||
| const daemonCwd = resolve(process.cwd()) | ||
| for (const clonePath of new Set(Object.values(config.clonePaths))) { |
There was a problem hiding this comment.
If config.clonePaths is undefined or null (which is common if no custom checkouts are configured), calling Object.values(config.clonePaths) will throw a TypeError. Guard this with a nullish coalescing operator to prevent CLI crashes.
| for (const clonePath of new Set(Object.values(config.clonePaths))) { | |
| for (const clonePath of new Set(Object.values(config.clonePaths ?? {}))) { |
| async function withProcessEnv<T>( | ||
| values: Record<string, string>, | ||
| fn: () => Promise<T>, | ||
| ): Promise<T> { | ||
| const previous = new Map<string, string | undefined>() | ||
| for (const [key, value] of Object.entries(values)) { | ||
| previous.set(key, process.env[key]) | ||
| process.env[key] = value | ||
| } | ||
| try { | ||
| const packageJsonPath = requireForResolve.resolve('@relayflows/cli/package.json') | ||
| return { command: process.execPath, args: [join(dirname(packageJsonPath), 'dist', 'cli.js'), 'run', workflow] } | ||
| } catch { | ||
| return { command: 'relayflows', args: ['run', workflow] } | ||
| return await fn() | ||
| } finally { | ||
| for (const [key, value] of previous) { | ||
| if (value === undefined) { | ||
| delete process.env[key] | ||
| } else { | ||
| process.env[key] = value | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Mutating the global process.env is not concurrency-safe. Since a factory node can run multiple workflows concurrently (as indicated by active_agents and max_agents), concurrent executions of runRelayflowsWorkflow will overwrite each other's environment variables (such as RELAYFLOWS_INPUTS_JSON and RELAY_INVOCATION_ID). Consider passing these variables directly to the execution context or isolating the environment (e.g., by spawning a child process with a custom environment map or using worker threads).
| const batch = events.splice(0, LIVE_EVENT_DRAIN_BATCH_SIZE) | ||
| const paths: string[] = [] | ||
| for (const event of batch) { | ||
| await this.#dispatchRelayflowEvent(event) |
There was a problem hiding this comment.
Awaiting this.#dispatchRelayflowEvent(event) inside the live event drain loop will block the processing of subsequent events in the batch. Since running a relayflow workflow can be a long-running operation (involving network requests or multi-step execution), this can severely degrade the responsiveness of the factory orchestrator. Consider dispatching the relayflow asynchronously (e.g., using void or floating the promise) similar to how it is handled in the subscription callback.
| relayClient: (provider) => relayClient(provider, clientOpts), | ||
| providerClient: (provider) => providerClient(provider, clientOpts), |
There was a problem hiding this comment.
The wrapper functions for relayClient and providerClient currently ignore any custom options passed by the caller (such as a custom mountRoot). Forward the optional arguments to the underlying helper functions to ensure the full signature and behavior are preserved.
| relayClient: (provider) => relayClient(provider, clientOpts), | |
| providerClient: (provider) => providerClient(provider, clientOpts), | |
| relayClient: (provider, options) => relayClient(provider, { ...clientOpts, ...options }), | |
| providerClient: (provider, options) => providerClient(provider, { ...clientOpts, ...options }), |
| function normalizeResourceName(value: string | undefined): string | undefined { | ||
| if (!value) return undefined | ||
| if (value === 'pulls' || value === 'pull-requests' || value === 'pull_requests') return 'pull_request' | ||
| if (value.endsWith('ies')) return `${value.slice(0, -3)}y` | ||
| if (value.endsWith('s')) return value.slice(0, -1) | ||
| return value | ||
| } |
There was a problem hiding this comment.
The naive plural-to-singular normalization logic has two failure modes:
- Words ending in
es(likebranchesorboxes) will be incorrectly normalized tobrancheorboxebecause only the trailingsis stripped. - Singular words ending in
s(likestatusorprocess) will have their trailingsstripped (e.g.,statuorproces).
Update the helper to handle these cases correctly.
| function normalizeResourceName(value: string | undefined): string | undefined { | |
| if (!value) return undefined | |
| if (value === 'pulls' || value === 'pull-requests' || value === 'pull_requests') return 'pull_request' | |
| if (value.endsWith('ies')) return `${value.slice(0, -3)}y` | |
| if (value.endsWith('s')) return value.slice(0, -1) | |
| return value | |
| } | |
| function normalizeResourceName(value: string | undefined): string | undefined { | |
| if (!value) return undefined | |
| if (value === 'pulls' || value === 'pull-requests' || value === 'pull_requests') return 'pull_request' | |
| if (value.endsWith('ies')) return `${value.slice(0, -3)}y` | |
| if (value.endsWith('ches') || value.endsWith('shes') || value.endsWith('xes')) return value.slice(0, -2) | |
| if (value.endsWith('s') && !['status', 'process', 'address', 'access', 'analysis'].includes(value)) return value.slice(0, -1) | |
| return value | |
| } |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3df469abec
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| const batch = events.splice(0, LIVE_EVENT_DRAIN_BATCH_SIZE) | ||
| const paths: string[] = [] | ||
| for (const event of batch) { | ||
| await this.#dispatchRelayflowEvent(event) |
There was a problem hiding this comment.
Widen the live subscription before dispatching relayflows
When live transport uses subscribe/subscribe-and-poll, this call only sees events enqueued by the subscription registered with [LIVE_ISSUE_GLOB, LIVE_GITHUB_ISSUE_GLOB] at src/orchestrator/factory.ts:529; the real event client checks those globs before invoking onChange, so paths like /slack/... or /notion/... never reach this dispatch. Registered policies for non-issue integration events therefore silently do not run unless poll transport is selected; subscribe to the registered trigger roots or all integration paths before dispatching.
Useful? React with 👍 / 👎.
| const batch = events.splice(0, LIVE_EVENT_DRAIN_BATCH_SIZE) | ||
| const paths: string[] = [] | ||
| for (const event of batch) { | ||
| await this.#dispatchRelayflowEvent(event) |
There was a problem hiding this comment.
Suppress replayed events before dispatching relayflows
This dispatch runs before #prepareLiveEventForDrain applies the high-watermark, replay-skew, and duplicate checks. In live subscribe reconnect/replay or duplicate-delivery scenarios, events that the factory later suppresses for issue processing still invoke registered relayflows, which can repeat writebacks or other side effects; apply the same live-event identity checks before invoking policies.
Useful? React with 👍 / 👎.
| const previous = new Map<string, string | undefined>() | ||
| for (const [key, value] of Object.entries(values)) { | ||
| previous.set(key, process.env[key]) | ||
| process.env[key] = value |
There was a problem hiding this comment.
Avoid sharing workflow inputs through process.env
withProcessEnv mutates global process.env across an awaited SDK run; if two workflow:run actions or relayflow dispatches overlap in the node process, the second invocation can overwrite RELAYFLOWS_INPUTS_JSON/RELAY_INVOCATION_ID while the first workflow or its child commands are still reading them, and either finally block can restore/delete the other's values. The previous shell-out passed env per child, so keep inputs in per-run SDK options or serialize/isolated-spawn the env-dependent path.
Useful? React with 👍 / 👎.
|
Working tree is clean — only the untracked Let me make one final review-quality pass on a potential semantic concern worth flagging: the The test
That matches the test expectation Now I have a complete picture. Let me finalize the review. Review Summary — PR #47:
|
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/node/factory-node.ts`:
- Around line 295-355: The env-backed workflow execution in factory-node is
mutating process.env inside withProcessEnv, so overlapping workflow:run calls
can clobber each other’s RELAYFLOWS_INPUTS_JSON and RELAY_INVOCATION_ID. Add a
serialization guard around the workflow execution path in the main handler (the
branch that calls runWorkflow and runScriptWorkflowWithCwd) so only one
env-mutating run can happen at a time. Keep the env restore logic in
withProcessEnv, but make sure concurrent requests cannot enter that block
simultaneously.
In `@src/orchestrator/factory.ts`:
- Line 418: The subscribe transport in factory.ts still filters events to only
the Linear/GitHub trees, so non-issue integration paths like Notion never reach
the new dispatch hooks. Update the subscription matching logic around
dispatchRelayflowEvent and the related subscribe path handling so it includes
all Relayfile provider event namespaces, not just issue-based ones, and verify
the same fix is applied at the other referenced hook location as well.
- Around line 740-741: Relayflow dispatch is happening before the live-event
suppression gate, so replayed/stale events can still trigger duplicate external
workflows. Update the flow in the orchestrator factory around
`#dispatchRelayflowEvent` and `#prepareLiveEventForDrain` so relayflow is only
dispatched after the event passes the duplicate/high-watermark filtering, or
otherwise guard `#dispatchRelayflowEvent` with the same suppression result used by
`#prepareLiveEventForDrain`.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 930ec489-f162-4dac-a2e5-c676f7102c24
⛔ Files ignored due to path filters (1)
package-lock.jsonis excluded by!**/package-lock.json
📒 Files selected for processing (18)
package.jsonplanning/factory-cloud-watches-local-node-linear-issue.mdplanning/factory-unified-node-architecture-linear-issue.mdplanning/linear-issue-factory-phase-3-fleet-client.mdplanning/linear-issue-factory-phase-4-node-registration.mdsrc/cli/fleet.test.tssrc/cli/fleet.tssrc/dispatch/relayflow-registry.test.tssrc/dispatch/relayflow-registry.tssrc/index.tssrc/node/factory-node.test.tssrc/node/factory-node.tssrc/orchestrator/factory.test.tssrc/orchestrator/factory.tssrc/ports/fleet.tssrc/subscriptions/__tests__/event-client.test.tssrc/subscriptions/event-client.tssrc/types.ts
|
I made no file edits — the diff is clean, has no lint/format tooling, and the full CI command set passes. My findings are behavioral observations for the author. Here is my review. Review — PR #47: dispatch relayflows from integration eventsSummaryThis PR moves the CI verification (ran the canonical CI command set)
One transient failure on the first cold run: I made no file edits — the diff is internally consistent, there is no lint/format tooling or lint CI step, and I found no mechanical (typo/import-order/format) issues to apply. Review comments (for author judgment — left as comments, not changed)
Addressed commentsNo bot or human review comments were provided in the PR metadata ( Advisory Notes
CI is green across the full command set, the change is well-covered by new and updated tests, and the remaining items are author-judgment behavioral questions rather than defects. Not printing READY: the three review comments above are genuine open questions for the author (dedup asymmetry, subscription breadth, global-env mutation) that should be resolved or consciously accepted before merge, so this is not yet a clean hand-off requiring only human sign-off. |
Summary
FactoryPorts.relayflowsinto live subscription handling without pollingrelayClient(provider)/providerClient(provider)while keeping named Linear/GitHub/Slack clientsfile.createdmaps to triggers likelinear.issue.createdwhile maintainingrelayfile.changedcompatibilityworkflow:runthrough the embedded@relayflows/coreSDK instead of shelling out to a CLI>=20.18.1to match runtime depsVerification
npm test -- src/subscriptions/__tests__/event-client.test.ts src/dispatch/relayflow-registry.test.ts src/orchestrator/factory.test.ts src/node/factory-node.test.ts src/linear/state-resolver.test.tsnpm run buildnpm test>=22engine declarations remain in the relevant runtime treeSummary by cubic
Dispatch Relayflows from integration events using a registry and the embedded
@relayflows/corerunner. Live subscriptions now trigger workflows immediately (no polling), with provider clients from@relayfile/relay-helpers.New Features
RelayflowPolicyRegistryand dispatch from live integration events; workflows run in-process via@relayflows/core.file.createdmaps to triggers likelinear.issue.created,filesystemEventTypeis forwarded, andrelayfile.changedstays compatible.relayClient(provider)andproviderClient(provider)alongside named Linear/GitHub/Slack clients.FactoryPorts.relayflowsto wire policy dispatch into the factory loop.clonePathso integration paths resolve from the checkout.Dependencies
@relayflows/coreand@relayfile/relay-helpers.relayflowsCLI; workflows run via the SDK.>=20.18.1.@scalar/postman-to-openapiandlistr2.Written for commit 92a18ae. Summary will update on new commits.