Skip to content

Add SQLite bus transport#90

Merged
patrickleet merged 4 commits into
mainfrom
codex/sqlite-bus-transport
Jun 14, 2026
Merged

Add SQLite bus transport#90
patrickleet merged 4 commits into
mainfrom
codex/sqlite-bus-transport

Conversation

@patrickleet

@patrickleet patrickleet commented Jun 13, 2026

Copy link
Copy Markdown
Collaborator

Summary

  • Add feature-gated SqliteBus with durable command queue semantics and event log fan-out over SqlitePool.
  • Export the transport from distributed::bus behind feature = "sqlite".
  • Add SQLite transport tests for point-to-point delivery, fan-out, named groups, redelivery, busy/locked retry classification, and corrupt-row failure policy behavior.
  • Document SqliteBus in the async transports guide.

Rebase / DRY Follow-up

  • Rebased onto main after refactor: reduce duplicated source code #91 (ff5cb35).
  • Kept the new transport aligned with the shared test helper pattern from refactor: reduce duplicated source code #91 by reusing tests/transport_conformance recording helpers.
  • Reduced local duplication in the new code by sharing SQLite message insert binding, decoded received-row state, and common test message/id helpers.
  • Did not introduce a Postgres/SQLite shared bus SQL layer in this PR; the dialect-specific claim/read SQL still differs enough that a cross-dialect abstraction would have widened the change beyond this transport.

CLI Integration Follow-up

  • Add the reusable integration-distributed-cli workflow to PR quality checks, matching the push-to-main gate.
  • Fix the generated dsvc schema manifest harness to reference distributed::table::TableSqlDialect, since refactor: reduce duplicated source code #91 intentionally removed the root distributed::TableSqlDialect re-export.
  • Add a unit assertion for the generated harness path so the regression is caught before the ignored e2e harness tests.

Verification

  • cargo fmt --check
  • cargo test --test sqlite_transport --features sqlite --quiet
  • cargo test --features sqlite --quiet
  • cargo test --all-features --quiet
  • cargo test -p distributed_cli --verbose -- --include-ignored
  • git diff --check
  • cargo clippy --test sqlite_transport --features sqlite -- -D warnings -A clippy::doc_lazy_continuation -A clippy::needless_question_mark

Note: cargo clippy --test sqlite_transport --features sqlite -- -D warnings currently fails on base-code warnings outside this PR (src/repository/traits.rs:99, src/snapshot/repository.rs:307).

GitKB task: tasks/sqlite-bus-transport-impl completed.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added SQLite as a durable message transport, including command delivery (competing consumers) and event fan-out across consumer groups, with configurable leasing and retry behavior.
  • Documentation

    • Updated transport docs with SQLite production wiring, namespace handling guidance, adapter topology details, and an overview of the intended local durable scope.
  • Tests

    • Added SQLite integration tests covering delivery, retries, busy/locking scenarios, and dead-letter handling.

@coderabbitai

coderabbitai Bot commented Jun 13, 2026

Copy link
Copy Markdown

Review Change Stack

Warning

Review limit reached

@patrickleet, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 14 minutes and 46 seconds. Learn how PR review limits work.

Your organization has used up its prepaid credits, and credit purchases are no longer available. Enable the review add-on in the billing tab to keep reviews running — you're only billed for reviews past your plan's rate limits ($0.25/file).

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 4191e03e-71cb-46b1-b085-5a7c37b86e9e

📥 Commits

Reviewing files that changed from the base of the PR and between d2c1834 and ba71afc.

📒 Files selected for processing (5)
  • .github/workflows/on-pr-quality.yaml
  • distributed_cli/src/cli.rs
  • docs/async-transports.md
  • src/bus/sqlite_bus.rs
  • tests/sqlite_transport/main.rs
📝 Walkthrough

Walkthrough

This PR adds SQLite as a durable message bus transport, providing a local alternative to PostgreSQL for tests, demos, and small deployments. The implementation includes three-table schema for competing-consumer commands and fan-out events with per-consumer offset tracking, trait implementations for send/publish and listen/subscribe patterns, comprehensive integration tests validating core and edge-case behavior, and updated documentation describing the transport's intended scope and topology.

Changes

SQLite Durable Transport

Layer / File(s) Summary
Module wiring and SQLite schema foundation
src/bus/mod.rs, src/bus/sqlite_bus.rs (lines 1–95)
Conditionally includes and re-exports sqlite_bus module behind feature gate. Defines bus_queue for competing-consumer commands, bus_log for fan-out events, and bus_offset for per-consumer event offsets, plus helper functions for SQL name filtering.
SqliteBus type and public construction API
src/bus/sqlite_bus.rs (lines 162–262)
Exports SqliteBus struct with pool, topology, and lease duration. Public methods: new, new_with_group, group, with_lease for configuration; ensure_tables to create schema; internal helpers manage enqueueing into queue and appending into log.
Message reconstruction and error handling
src/bus/sqlite_bus.rs (lines 96–161)
Implements message_from_row to decode messages with required-field validation; ReceivedRow struct carries seq, decoded message, and optional decode_error; decode_or_placeholder utility preserves decode failures while returning placeholder messages for failure-policy handling.
Bus trait implementation
src/bus/sqlite_bus.rs (lines 264–282)
Wires Bus trait send/publish methods to enqueue and append helpers for persisting commands to bus_queue and events to bus_log.
BusConsumer trait and source routing
src/bus/sqlite_bus.rs (lines 284–323)
Implements BusConsumer listen/subscribe to build QueueSource for competing-consumer delivery and LogSource for fan-out delivery, resolving consumer group and deriving command/event names from router.
Competing-consumer pattern
src/bus/sqlite_bus.rs (lines 325–428)
Implements QueueSource with atomic UPDATE...RETURNING lease-based claiming; SqliteQueueReceived wraps claimed rows; ack deletes, nack clears lease, dead_letter/park delete to prevent redelivery.
Fan-out event pattern
src/bus/sqlite_bus.rs (lines 430–515)
Implements LogSource selecting next event by seq > last_seq per consumer offset; SqliteLogReceived wraps log rows; ack/dead_letter/park advance offset, nack leaves offset unchanged.
Test infrastructure
tests/sqlite_transport/main.rs (lines 1–185)
TempDb creates isolated databases with unique files and cleanup; helpers (re)create schema with explicit columns, defaults, indexes; corruption utilities inject poison messages.
Core functional tests
tests/sqlite_transport/main.rs (lines 186–339)
Validates point-to-point delivery, fan-out across groups, named service group resolution, and retryable failures with nack-triggered redelivery and offset advancement only after success.
Advanced and edge-case tests
tests/sqlite_transport/main.rs (lines 341–454)
Tests SQLite write contention, dead-letter handling for corrupted queue rows, and dead-letter handling for corrupted log rows with offset progression past corruption.
Documentation updates
docs/async-transports.md
Added SqliteBus to production wiring example; clarified namespace handling for PostgresBus and SqliteBus; extended topology table with queue-claim and log/offset details; explained lease/claim approach and local deployment scope; updated test command.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

The PR introduces substantial new transport logic across schema design, dual messaging patterns (competing consumer and fan-out), message reconstruction with error handling, and a comprehensive test suite covering core functionality and edge cases (concurrency, corruption, retryability). While changes are contained within a feature-gated transport module, the logic density and multiple interacting patterns require careful review of atomicity guarantees, state transitions, offset tracking semantics, and error propagation.

🐰 SQLite brings durability near,
No Postgres required, small & clear,
Lease claims and offsets align,
Poison messages skip in line,
Tests prove it all holds strong and dear!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 48.39% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely summarizes the main change: adding a SQLite bus transport.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch codex/sqlite-bus-transport

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
docs/async-transports.md (1)

208-217: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Status section does not mention SqliteBus among implemented/verified transports.

The Status section (lines 208–217) still lists only Postgres / NATS / RabbitMQ / Kafka adapters and InMemoryBus / NatsBus / PostgresBus / RabbitBus / KafkaBus / KnativeBus. Since this PR implements and verifies SqliteBus (with passing tests confirmed in the PR objectives), the Status section should be updated to include SqliteBus in both the adapter list and the bus facade list.

📝 Proposed update to Status section
 Implemented and verified: the core contracts, the source runner, the publisher /
 outbox dispatcher, the conformance harness, the Postgres / NATS / RabbitMQ /
-Kafka adapters, the Knative ingress, and the **bus facade** (`Bus` +
-`BusConsumer` with `InMemoryBus` / `NatsBus` / `PostgresBus` / `RabbitBus` /
-`KafkaBus` / `KnativeBus`, each with real-broker competing-vs-fan-out tests).
+Kafka / SQLite adapters, the Knative ingress, and the **bus facade** (`Bus` +
+`BusConsumer` with `InMemoryBus` / `NatsBus` / `PostgresBus` / `SqliteBus` / `RabbitBus` /
+`KafkaBus` / `KnativeBus`, each with real-broker competing-vs-fan-out tests).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/async-transports.md` around lines 208 - 217, Update the "Status" section
to include the implemented Sqlite transport: add "Sqlite" to the adapters list
and add "SqliteBus" to the bus-facade list so the sentence that currently lists
Postgres / NATS / RabbitMQ / Kafka adapters and InMemoryBus / NatsBus /
PostgresBus / RabbitBus / KafkaBus / KnativeBus also mentions Sqlite and
SqliteBus (preserving existing wording and punctuation around the bus facade and
adapter lists).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/bus/sqlite_bus.rs`:
- Around line 41-49: The schema and deserialization currently normalize
corrupted rows into valid messages; tighten the CREATE TABLE CHECK for the kind
column to only allow the supported enum values (replace generic CHECK (kind <>
'') with a CHECK that matches the explicit set of MessageKind variants) and stop
lossy decoding: change message_from_row to use strict parsing for MessageKind
(do not use MessageKind::from_str_lossy) and make metadata and content_type
decoding return an error that flows into decode_error instead of falling back to
defaults so corrupted durable rows fail permanently and go through the
poison-row policy.
- Around line 319-320: The SQL RETURNING currently emits "seq, name, message_id,
kind, payload, content_type, metadata" and settlements identify rows by seq
only, which allows stale workers to settle newer claims; change the claiming SQL
to also generate and RETURN a per-claim token (e.g., claim_token or lease_token)
and include that token in the returned row payload, persist it with the claim,
and then require that token in every settlement WHERE clause (every
ack/nack/dead_letter/delete/unlock statement that currently uses only seq) so
settlements use WHERE seq = ? AND claim_token = ?; update the code paths that
receive the RETURNING values to capture the token and pass it into the
ack/nack/dead_letter functions and their SQL, and ensure token is
cleared/updated when the claim is released or expires.

In `@tests/sqlite_transport/main.rs`:
- Line 63: The code calls sidecar.file_name().unwrap() when building name (let
name = format!("{}{suffix}", sidecar.file_name().unwrap().to_string_lossy())),
which can panic; change this to defensively handle None by checking the
file_name() result first (e.g. if let Some(fname) = sidecar.file_name() { let
name = format!("{}{}", fname.to_string_lossy(), suffix); /* existing cleanup
using name */ } else { return; } ) so the Drop/cleanup code returns safely
instead of panicking.

---

Outside diff comments:
In `@docs/async-transports.md`:
- Around line 208-217: Update the "Status" section to include the implemented
Sqlite transport: add "Sqlite" to the adapters list and add "SqliteBus" to the
bus-facade list so the sentence that currently lists Postgres / NATS / RabbitMQ
/ Kafka adapters and InMemoryBus / NatsBus / PostgresBus / RabbitBus / KafkaBus
/ KnativeBus also mentions Sqlite and SqliteBus (preserving existing wording and
punctuation around the bus facade and adapter lists).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 126dae92-a8ae-4c29-a06d-fb111e9a0091

📥 Commits

Reviewing files that changed from the base of the PR and between 9219e95 and 4db21e8.

📒 Files selected for processing (4)
  • docs/async-transports.md
  • src/bus/mod.rs
  • src/bus/sqlite_bus.rs
  • tests/sqlite_transport/main.rs

Comment thread src/bus/sqlite_bus.rs Outdated
Comment thread src/bus/sqlite_bus.rs Outdated
Comment thread tests/sqlite_transport/main.rs Outdated
@patrickleet patrickleet force-pushed the codex/sqlite-bus-transport branch from 4db21e8 to d2c1834 Compare June 13, 2026 23:15
Implements [[tasks/sqlite-bus-transport-impl]]
Implements [[tasks/sqlite-bus-transport-impl]]
@patrickleet

Copy link
Copy Markdown
Collaborator Author

CodeRabbit follow-up for non-inline items:

  • Outside-diff docs/async-transports.md status note: fixed in ba71afc. The status section now lists SQLite among implemented adapters and SqliteBus among bus facade implementations, with wording adjusted to say tests run against each transport’s broker or local database.
  • Docstring Coverage warning: not fixed in this PR. This is a repo-wide coverage threshold warning, not a PR-specific correctness issue or enforced CI failure. Broadly raising the crate to 80% doc coverage would be a separate documentation sweep; this PR keeps the SQLite transport docs focused on the new public surface and behavior.
  • CodeRabbit rate-limit/billing notice: no code change needed; it is informational from the review service rather than a repository issue.

@patrickleet patrickleet merged commit 5d7c622 into main Jun 14, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant