Add SQLite bus transport#90
Conversation
|
Warning Review limit reached
More reviews will be available in 14 minutes and 46 seconds. Learn how PR review limits work. Your organization has used up its prepaid credits, and credit purchases are no longer available. Enable the review add-on in the billing tab to keep reviews running — you're only billed for reviews past your plan's rate limits ($0.25/file). ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (5)
📝 WalkthroughWalkthroughThis PR adds SQLite as a durable message bus transport, providing a local alternative to PostgreSQL for tests, demos, and small deployments. The implementation includes three-table schema for competing-consumer commands and fan-out events with per-consumer offset tracking, trait implementations for send/publish and listen/subscribe patterns, comprehensive integration tests validating core and edge-case behavior, and updated documentation describing the transport's intended scope and topology. ChangesSQLite Durable Transport
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes The PR introduces substantial new transport logic across schema design, dual messaging patterns (competing consumer and fan-out), message reconstruction with error handling, and a comprehensive test suite covering core functionality and edge cases (concurrency, corruption, retryability). While changes are contained within a feature-gated transport module, the logic density and multiple interacting patterns require careful review of atomicity guarantees, state transitions, offset tracking semantics, and error propagation.
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
docs/async-transports.md (1)
208-217:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winStatus section does not mention
SqliteBusamong implemented/verified transports.The Status section (lines 208–217) still lists only
Postgres/NATS/RabbitMQ/Kafkaadapters andInMemoryBus/NatsBus/PostgresBus/RabbitBus/KafkaBus/KnativeBus. Since this PR implements and verifiesSqliteBus(with passing tests confirmed in the PR objectives), the Status section should be updated to includeSqliteBusin both the adapter list and the bus facade list.📝 Proposed update to Status section
Implemented and verified: the core contracts, the source runner, the publisher / outbox dispatcher, the conformance harness, the Postgres / NATS / RabbitMQ / -Kafka adapters, the Knative ingress, and the **bus facade** (`Bus` + -`BusConsumer` with `InMemoryBus` / `NatsBus` / `PostgresBus` / `RabbitBus` / -`KafkaBus` / `KnativeBus`, each with real-broker competing-vs-fan-out tests). +Kafka / SQLite adapters, the Knative ingress, and the **bus facade** (`Bus` + +`BusConsumer` with `InMemoryBus` / `NatsBus` / `PostgresBus` / `SqliteBus` / `RabbitBus` / +`KafkaBus` / `KnativeBus`, each with real-broker competing-vs-fan-out tests).🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@docs/async-transports.md` around lines 208 - 217, Update the "Status" section to include the implemented Sqlite transport: add "Sqlite" to the adapters list and add "SqliteBus" to the bus-facade list so the sentence that currently lists Postgres / NATS / RabbitMQ / Kafka adapters and InMemoryBus / NatsBus / PostgresBus / RabbitBus / KafkaBus / KnativeBus also mentions Sqlite and SqliteBus (preserving existing wording and punctuation around the bus facade and adapter lists).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/bus/sqlite_bus.rs`:
- Around line 41-49: The schema and deserialization currently normalize
corrupted rows into valid messages; tighten the CREATE TABLE CHECK for the kind
column to only allow the supported enum values (replace generic CHECK (kind <>
'') with a CHECK that matches the explicit set of MessageKind variants) and stop
lossy decoding: change message_from_row to use strict parsing for MessageKind
(do not use MessageKind::from_str_lossy) and make metadata and content_type
decoding return an error that flows into decode_error instead of falling back to
defaults so corrupted durable rows fail permanently and go through the
poison-row policy.
- Around line 319-320: The SQL RETURNING currently emits "seq, name, message_id,
kind, payload, content_type, metadata" and settlements identify rows by seq
only, which allows stale workers to settle newer claims; change the claiming SQL
to also generate and RETURN a per-claim token (e.g., claim_token or lease_token)
and include that token in the returned row payload, persist it with the claim,
and then require that token in every settlement WHERE clause (every
ack/nack/dead_letter/delete/unlock statement that currently uses only seq) so
settlements use WHERE seq = ? AND claim_token = ?; update the code paths that
receive the RETURNING values to capture the token and pass it into the
ack/nack/dead_letter functions and their SQL, and ensure token is
cleared/updated when the claim is released or expires.
In `@tests/sqlite_transport/main.rs`:
- Line 63: The code calls sidecar.file_name().unwrap() when building name (let
name = format!("{}{suffix}", sidecar.file_name().unwrap().to_string_lossy())),
which can panic; change this to defensively handle None by checking the
file_name() result first (e.g. if let Some(fname) = sidecar.file_name() { let
name = format!("{}{}", fname.to_string_lossy(), suffix); /* existing cleanup
using name */ } else { return; } ) so the Drop/cleanup code returns safely
instead of panicking.
---
Outside diff comments:
In `@docs/async-transports.md`:
- Around line 208-217: Update the "Status" section to include the implemented
Sqlite transport: add "Sqlite" to the adapters list and add "SqliteBus" to the
bus-facade list so the sentence that currently lists Postgres / NATS / RabbitMQ
/ Kafka adapters and InMemoryBus / NatsBus / PostgresBus / RabbitBus / KafkaBus
/ KnativeBus also mentions Sqlite and SqliteBus (preserving existing wording and
punctuation around the bus facade and adapter lists).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 126dae92-a8ae-4c29-a06d-fb111e9a0091
📒 Files selected for processing (4)
docs/async-transports.mdsrc/bus/mod.rssrc/bus/sqlite_bus.rstests/sqlite_transport/main.rs
4db21e8 to
d2c1834
Compare
Implements [[tasks/sqlite-bus-transport-impl]]
Implements [[tasks/sqlite-bus-transport-impl]]
|
CodeRabbit follow-up for non-inline items:
|
Summary
SqliteBuswith durable command queue semantics and event log fan-out overSqlitePool.distributed::busbehindfeature = "sqlite".SqliteBusin the async transports guide.Rebase / DRY Follow-up
mainafter refactor: reduce duplicated source code #91 (ff5cb35).tests/transport_conformancerecording helpers.CLI Integration Follow-up
integration-distributed-cliworkflow to PR quality checks, matching the push-to-main gate.dsvc schemamanifest harness to referencedistributed::table::TableSqlDialect, since refactor: reduce duplicated source code #91 intentionally removed the rootdistributed::TableSqlDialectre-export.Verification
cargo fmt --checkcargo test --test sqlite_transport --features sqlite --quietcargo test --features sqlite --quietcargo test --all-features --quietcargo test -p distributed_cli --verbose -- --include-ignoredgit diff --checkcargo clippy --test sqlite_transport --features sqlite -- -D warnings -A clippy::doc_lazy_continuation -A clippy::needless_question_markNote:
cargo clippy --test sqlite_transport --features sqlite -- -D warningscurrently fails on base-code warnings outside this PR (src/repository/traits.rs:99,src/snapshot/repository.rs:307).GitKB task:
tasks/sqlite-bus-transport-implcompleted.Summary by CodeRabbit
Release Notes
New Features
Documentation
namespacehandling guidance, adapter topology details, and an overview of the intended local durable scope.Tests