Skip to content

Commit 4b5baac

Browse files
committed
introduce libsql injector
1 parent 7c4ea18 commit 4b5baac

20 files changed

Lines changed: 196 additions & 34 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ rusqlite = { package = "libsql-rusqlite", path = "vendored/rusqlite", version =
4545
] }
4646
hyper = { version = "0.14" }
4747
tower = { version = "0.4.13" }
48+
zerocopy = { version = "0.7.32", features = ["derive", "alloc"] }
4849

4950
# Config for 'cargo dist'
5051
[workspace.metadata.dist]

libsql-replication/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ license = "MIT"
1212
tonic = { version = "0.11", features = ["tls"] }
1313
prost = "0.12"
1414
libsql-sys = { version = "0.7", path = "../libsql-sys", default-features = false, features = ["wal", "rusqlite", "api"] }
15+
libsql-wal = { path = "../libsql-wal/" }
1516
rusqlite = { workspace = true }
1617
parking_lot = "0.12.1"
1718
bytes = { version = "1.5.0", features = ["serde"] }

libsql-replication/proto/replication_log.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ message LogOffset {
99

1010
message HelloRequest {
1111
optional uint64 handshake_version = 1;
12+
enum WalFlavor {
13+
Sqlite = 0;
14+
Libsql = 1;
15+
}
16+
// the type of wal that the client is expecting
17+
optional WalFlavor wal_flavor = 2;
1218
}
1319

1420
message HelloResponse {

libsql-replication/src/frame.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use crate::LIBSQL_PAGE_SIZE;
1313
pub type FrameNo = u64;
1414

1515
/// The file header for the WAL log. All fields are represented in little-endian ordering.
16-
/// See `encode` and `decode` for actual layout.
1716
// repr C for stable sizing
1817
#[repr(C)]
1918
#[derive(Debug, Clone, Copy, zerocopy::FromZeroes, zerocopy::FromBytes, zerocopy::AsBytes)]
@@ -22,7 +21,7 @@ pub struct FrameHeader {
2221
pub frame_no: lu64,
2322
/// Rolling checksum of all the previous frames, including this one.
2423
pub checksum: lu64,
25-
/// page number, if frame_type is FrameType::Page
24+
/// page number
2625
pub page_no: lu32,
2726
/// Size of the database (in page) after committing the transaction. This is passed from sqlite,
2827
/// and serves as commit transaction boundary

libsql-replication/src/generated/wal_log.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,48 @@ pub struct LogOffset {
1010
pub struct HelloRequest {
1111
#[prost(uint64, optional, tag = "1")]
1212
pub handshake_version: ::core::option::Option<u64>,
13+
/// the type of wal that the client is expecting
14+
#[prost(enumeration = "hello_request::WalFlavor", optional, tag = "2")]
15+
pub wal_flavor: ::core::option::Option<i32>,
16+
}
17+
/// Nested message and enum types in `HelloRequest`.
18+
pub mod hello_request {
19+
#[derive(
20+
Clone,
21+
Copy,
22+
Debug,
23+
PartialEq,
24+
Eq,
25+
Hash,
26+
PartialOrd,
27+
Ord,
28+
::prost::Enumeration
29+
)]
30+
#[repr(i32)]
31+
pub enum WalFlavor {
32+
Sqlite = 0,
33+
Libsql = 1,
34+
}
35+
impl WalFlavor {
36+
/// String value of the enum field names used in the ProtoBuf definition.
37+
///
38+
/// The values are not transformed in any way and thus are considered stable
39+
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
40+
pub fn as_str_name(&self) -> &'static str {
41+
match self {
42+
WalFlavor::Sqlite => "Sqlite",
43+
WalFlavor::Libsql => "Libsql",
44+
}
45+
}
46+
/// Creates an enum from field names used in the ProtoBuf definition.
47+
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
48+
match value {
49+
"Sqlite" => Some(Self::Sqlite),
50+
"Libsql" => Some(Self::Libsql),
51+
_ => None,
52+
}
53+
}
54+
}
1355
}
1456
#[allow(clippy::derive_partial_eq_without_eq)]
1557
#[derive(Clone, PartialEq, ::prost::Message)]
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
pub type Result<T, E=Error> = std::result::Result<T, E>;
2+
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
23

34
#[derive(Debug, thiserror::Error)]
45
pub enum Error {
56
#[error("IO error: {0}")]
67
Io(#[from] std::io::Error),
78
#[error("SQLite error: {0}")]
89
Sqlite(#[from] rusqlite::Error),
9-
#[error("A fatal error occured injecting frames")]
10-
FatalInjectError,
10+
#[error("A fatal error occured injecting frames: {0}")]
11+
FatalInjectError(BoxError),
1112
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use std::mem::size_of;
2+
3+
use libsql_wal::io::StdIO;
4+
use libsql_wal::replication::injector::Injector;
5+
use libsql_wal::segment::Frame as WalFrame;
6+
use zerocopy::{AsBytes, FromZeroes};
7+
8+
use crate::frame::{Frame, FrameNo};
9+
10+
use super::error::{Error, Result};
11+
12+
pub struct LibsqlInjector {
13+
injector: Injector<StdIO>,
14+
}
15+
16+
impl super::Injector for LibsqlInjector {
17+
async fn inject_frame(&mut self, frame: Frame) -> Result<Option<FrameNo>> {
18+
// this is a bit annoying be we want to read the frame, and it has to be aligned, so we
19+
// must copy it...
20+
// FIXME: optimize this.
21+
let mut wal_frame = WalFrame::new_box_zeroed();
22+
if frame.bytes().len() != size_of::<WalFrame>() {
23+
todo!("invalid frame");
24+
}
25+
wal_frame.as_bytes_mut().copy_from_slice(&frame.bytes()[..]);
26+
Ok(self
27+
.injector
28+
.insert_frame(wal_frame)
29+
.await
30+
.map_err(|e| Error::FatalInjectError(e.into()))?)
31+
}
32+
33+
async fn rollback(&mut self) {
34+
self.injector.rollback();
35+
}
36+
37+
async fn flush(&mut self) -> Result<Option<FrameNo>> {
38+
self.injector
39+
.flush(None)
40+
.await
41+
.map_err(|e| Error::FatalInjectError(e.into()))?;
42+
Ok(None)
43+
}
44+
}

libsql-replication/src/injector/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::future::Future;
22

33
pub use sqlite_injector::SqliteInjector;
4+
pub use libsql_injector::LibsqlInjector;
45

56
use crate::frame::{Frame, FrameNo};
67

@@ -9,6 +10,7 @@ pub use error::Error;
910

1011
mod error;
1112
mod sqlite_injector;
13+
mod libsql_injector;
1214

1315
pub trait Injector {
1416
/// Inject a singular frame.

libsql-replication/src/injector/sqlite_injector/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,8 @@ impl SqliteInjectorInner {
192192
match stmt.execute(()).and_then(|_| connection.cache_flush()) {
193193
Ok(_) => panic!("replication hook was not called"),
194194
Err(e) => {
195-
if let Some(e) = e.sqlite_error() {
196-
if e.extended_code == LIBSQL_INJECT_OK {
195+
if let Some(err) = e.sqlite_error() {
196+
if err.extended_code == LIBSQL_INJECT_OK {
197197
// refresh schema
198198
connection.pragma_update(None, "writable_schema", "reset")?;
199199
let mut rollback = connection.prepare_cached("ROLLBACK")?;
@@ -203,16 +203,16 @@ impl SqliteInjectorInner {
203203
let commit_frame_no = self.biggest_uncommitted_seen;
204204
self.biggest_uncommitted_seen = 0;
205205
return Ok(Some(commit_frame_no));
206-
} else if e.extended_code == LIBSQL_INJECT_OK_TXN {
206+
} else if err.extended_code == LIBSQL_INJECT_OK_TXN {
207207
self.is_txn = true;
208208
assert!(self.buffer.lock().is_empty());
209209
return Ok(None);
210-
} else if e.extended_code == LIBSQL_INJECT_FATAL {
211-
return Err(Error::FatalInjectError);
210+
} else if err.extended_code == LIBSQL_INJECT_FATAL {
211+
return Err(Error::FatalInjectError(e.into()));
212212
}
213213
}
214214

215-
Err(Error::FatalInjectError)
215+
Err(Error::FatalInjectError(e.into()))
216216
}
217217
}
218218
}

0 commit comments

Comments
 (0)