Skip to content

Commit 7c4ea18

Browse files
committed
abstract replicator injector and introduce SqliteInjector
1 parent 8077948 commit 7c4ea18

8 files changed

Lines changed: 418 additions & 338 deletions

File tree

bottomless/src/replicator.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use aws_sdk_s3::primitives::ByteStream;
1717
use aws_sdk_s3::{Client, Config};
1818
use bytes::{Buf, Bytes};
1919
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
20+
use libsql_replication::injector::Injector as _;
2021
use libsql_sys::{Cipher, EncryptionConfig};
2122
use std::ops::Deref;
2223
use std::path::{Path, PathBuf};
@@ -1449,12 +1450,12 @@ impl Replicator {
14491450
db_path: &Path,
14501451
) -> Result<bool> {
14511452
let encryption_config = self.encryption_config.clone();
1452-
let mut injector = libsql_replication::injector::Injector::new(
1453-
db_path,
1453+
let mut injector = libsql_replication::injector::SqliteInjector::new(
1454+
db_path.to_path_buf(),
14541455
4096,
14551456
libsql_sys::connection::NO_AUTOCHECKPOINT,
14561457
encryption_config,
1457-
)?;
1458+
).await?;
14581459
let prefix = format!("{}-{}/", self.db_name, generation);
14591460
let mut page_buf = {
14601461
let mut v = Vec::with_capacity(page_size);
@@ -1552,7 +1553,7 @@ impl Replicator {
15521553
},
15531554
page_buf.as_slice(),
15541555
);
1555-
injector.inject_frame(frame_to_inject)?;
1556+
injector.inject_frame(frame_to_inject).await?;
15561557
applied_wal_frame = true;
15571558
}
15581559
}

libsql-replication/src/injector/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
pub type Result<T, E=Error> = std::result::Result<T, E>;
2+
13
#[derive(Debug, thiserror::Error)]
24
pub enum Error {
35
#[error("IO error: {0}")]
Lines changed: 13 additions & 285 deletions
Original file line numberDiff line numberDiff line change
@@ -1,299 +1,27 @@
1-
use std::path::Path;
2-
use std::sync::Arc;
3-
use std::{collections::VecDeque, path::PathBuf};
1+
use std::future::Future;
42

5-
use parking_lot::Mutex;
6-
use rusqlite::OpenFlags;
3+
pub use sqlite_injector::SqliteInjector;
74

85
use crate::frame::{Frame, FrameNo};
96

7+
use error::Result;
108
pub use error::Error;
119

12-
use self::injector_wal::{
13-
InjectorWal, InjectorWalManager, LIBSQL_INJECT_FATAL, LIBSQL_INJECT_OK, LIBSQL_INJECT_OK_TXN,
14-
};
15-
1610
mod error;
17-
mod headers;
18-
mod injector_wal;
19-
20-
#[derive(Debug)]
21-
pub enum InjectError {}
22-
23-
pub type FrameBuffer = Arc<Mutex<VecDeque<Frame>>>;
24-
25-
pub struct Injector {
26-
/// The injector is in a transaction state
27-
is_txn: bool,
28-
/// Buffer for holding current transaction frames
29-
buffer: FrameBuffer,
30-
/// Maximum capacity of the frame buffer
31-
capacity: usize,
32-
/// Injector connection
33-
// connection must be dropped before the hook context
34-
connection: Arc<Mutex<libsql_sys::Connection<InjectorWal>>>,
35-
biggest_uncommitted_seen: FrameNo,
36-
37-
// Connection config items used to recreate the injection connection
38-
path: PathBuf,
39-
encryption_config: Option<libsql_sys::EncryptionConfig>,
40-
auto_checkpoint: u32,
41-
}
42-
43-
/// Methods from this trait are called before and after performing a frame injection.
44-
/// This trait trait is used to record the last committed frame_no to the log.
45-
/// The implementer can persist the pre and post commit frame no, and compare them in the event of
46-
/// a crash; if the pre and post commit frame_no don't match, then the log may be corrupted.
47-
impl Injector {
48-
pub fn new(
49-
path: impl AsRef<Path>,
50-
capacity: usize,
51-
auto_checkpoint: u32,
52-
encryption_config: Option<libsql_sys::EncryptionConfig>,
53-
) -> Result<Self, Error> {
54-
let path = path.as_ref().to_path_buf();
55-
56-
let buffer = FrameBuffer::default();
57-
let wal_manager = InjectorWalManager::new(buffer.clone());
58-
let connection = libsql_sys::Connection::open(
59-
&path,
60-
OpenFlags::SQLITE_OPEN_READ_WRITE
61-
| OpenFlags::SQLITE_OPEN_CREATE
62-
| OpenFlags::SQLITE_OPEN_URI
63-
| OpenFlags::SQLITE_OPEN_NO_MUTEX,
64-
wal_manager,
65-
auto_checkpoint,
66-
encryption_config.clone(),
67-
)?;
68-
69-
Ok(Self {
70-
is_txn: false,
71-
buffer,
72-
capacity,
73-
connection: Arc::new(Mutex::new(connection)),
74-
biggest_uncommitted_seen: 0,
75-
76-
path,
77-
encryption_config,
78-
auto_checkpoint,
79-
})
80-
}
81-
82-
/// Inject a frame into the log. If this was a commit frame, returns Ok(Some(FrameNo)).
83-
pub fn inject_frame(&mut self, frame: Frame) -> Result<Option<FrameNo>, Error> {
84-
let frame_close_txn = frame.header().size_after.get() != 0;
85-
self.buffer.lock().push_back(frame);
86-
if frame_close_txn || self.buffer.lock().len() >= self.capacity {
87-
return self.flush();
88-
}
11+
mod sqlite_injector;
8912

90-
Ok(None)
91-
}
13+
pub trait Injector {
14+
/// Inject a singular frame.
15+
fn inject_frame(
16+
&mut self,
17+
frame: Frame,
18+
) -> impl Future<Output = Result<Option<FrameNo>>> + Send;
9219

93-
pub fn rollback(&mut self) {
94-
let conn = self.connection.lock();
95-
let mut rollback = conn.prepare_cached("ROLLBACK").unwrap();
96-
let _ = rollback.execute(());
97-
self.is_txn = false;
98-
}
20+
/// Discard any uncommintted frames.
21+
fn rollback(&mut self) -> impl Future<Output = ()> + Send;
9922

10023
/// Flush the buffer to libsql WAL.
10124
/// Trigger a dummy write, and flush the cache to trigger a call to xFrame. The buffer's frame
10225
/// are then injected into the wal.
103-
pub fn flush(&mut self) -> Result<Option<FrameNo>, Error> {
104-
match self.try_flush() {
105-
Err(e) => {
106-
// something went wrong, rollback the connection to make sure we can retry in a
107-
// clean state
108-
self.biggest_uncommitted_seen = 0;
109-
self.rollback();
110-
Err(e)
111-
}
112-
Ok(ret) => Ok(ret),
113-
}
114-
}
115-
116-
fn try_flush(&mut self) -> Result<Option<FrameNo>, Error> {
117-
if !self.is_txn {
118-
self.begin_txn()?;
119-
}
120-
121-
let lock = self.buffer.lock();
122-
// the frames in the buffer are either monotonically increasing (log) or decreasing
123-
// (snapshot). Either way, we want to find the biggest frameno we're about to commit, and
124-
// that is either the front or the back of the buffer
125-
let last_frame_no = match lock.back().zip(lock.front()) {
126-
Some((b, f)) => f.header().frame_no.get().max(b.header().frame_no.get()),
127-
None => {
128-
tracing::trace!("nothing to inject");
129-
return Ok(None);
130-
}
131-
};
132-
133-
self.biggest_uncommitted_seen = self.biggest_uncommitted_seen.max(last_frame_no);
134-
135-
drop(lock);
136-
137-
let connection = self.connection.lock();
138-
// use prepare cached to avoid parsing the same statement over and over again.
139-
let mut stmt =
140-
connection.prepare_cached("INSERT INTO libsql_temp_injection VALUES (42)")?;
141-
142-
// We execute the statement, and then force a call to xframe if necesacary. If the execute
143-
// succeeds, then xframe wasn't called, in this case, we call cache_flush, and then process
144-
// the error.
145-
// It is unexpected that execute flushes, but it is possible, so we handle that case.
146-
match stmt.execute(()).and_then(|_| connection.cache_flush()) {
147-
Ok(_) => panic!("replication hook was not called"),
148-
Err(e) => {
149-
if let Some(e) = e.sqlite_error() {
150-
if e.extended_code == LIBSQL_INJECT_OK {
151-
// refresh schema
152-
connection.pragma_update(None, "writable_schema", "reset")?;
153-
let mut rollback = connection.prepare_cached("ROLLBACK")?;
154-
let _ = rollback.execute(());
155-
self.is_txn = false;
156-
assert!(self.buffer.lock().is_empty());
157-
let commit_frame_no = self.biggest_uncommitted_seen;
158-
self.biggest_uncommitted_seen = 0;
159-
return Ok(Some(commit_frame_no));
160-
} else if e.extended_code == LIBSQL_INJECT_OK_TXN {
161-
self.is_txn = true;
162-
assert!(self.buffer.lock().is_empty());
163-
return Ok(None);
164-
} else if e.extended_code == LIBSQL_INJECT_FATAL {
165-
return Err(Error::FatalInjectError);
166-
}
167-
}
168-
169-
Err(Error::FatalInjectError)
170-
}
171-
}
172-
}
173-
174-
fn begin_txn(&mut self) -> Result<(), Error> {
175-
let mut conn = self.connection.lock();
176-
177-
{
178-
let wal_manager = InjectorWalManager::new(self.buffer.clone());
179-
let new_conn = libsql_sys::Connection::open(
180-
&self.path,
181-
OpenFlags::SQLITE_OPEN_READ_WRITE
182-
| OpenFlags::SQLITE_OPEN_CREATE
183-
| OpenFlags::SQLITE_OPEN_URI
184-
| OpenFlags::SQLITE_OPEN_NO_MUTEX,
185-
wal_manager,
186-
self.auto_checkpoint,
187-
self.encryption_config.clone(),
188-
)?;
189-
190-
let _ = std::mem::replace(&mut *conn, new_conn);
191-
}
192-
193-
conn.pragma_update(None, "writable_schema", "true")?;
194-
195-
let mut stmt = conn.prepare_cached("BEGIN IMMEDIATE")?;
196-
stmt.execute(())?;
197-
// we create a dummy table. This table MUST not be persisted, otherwise the replica schema
198-
// would differ with the primary's.
199-
let mut stmt =
200-
conn.prepare_cached("CREATE TABLE IF NOT EXISTS libsql_temp_injection (x)")?;
201-
stmt.execute(())?;
202-
203-
Ok(())
204-
}
205-
206-
pub fn clear_buffer(&mut self) {
207-
self.buffer.lock().clear()
208-
}
209-
210-
#[cfg(test)]
211-
pub fn is_txn(&self) -> bool {
212-
self.is_txn
213-
}
214-
}
215-
216-
#[cfg(test)]
217-
mod test {
218-
use crate::frame::FrameBorrowed;
219-
use std::mem::size_of;
220-
221-
use super::*;
222-
/// this this is generated by creating a table test, inserting 5 rows into it, and then
223-
/// truncating the wal file of it's header.
224-
const WAL: &[u8] = include_bytes!("../../assets/test/test_wallog");
225-
226-
fn wal_log() -> impl Iterator<Item = Frame> {
227-
WAL.chunks(size_of::<FrameBorrowed>())
228-
.map(|b| Frame::try_from(b).unwrap())
229-
}
230-
231-
#[test]
232-
fn test_simple_inject_frames() {
233-
let temp = tempfile::tempdir().unwrap();
234-
235-
let mut injector = Injector::new(temp.path().join("data"), 10, 10000, None).unwrap();
236-
let log = wal_log();
237-
for frame in log {
238-
injector.inject_frame(frame).unwrap();
239-
}
240-
241-
let conn = rusqlite::Connection::open(temp.path().join("data")).unwrap();
242-
243-
conn.query_row("SELECT COUNT(*) FROM test", (), |row| {
244-
assert_eq!(row.get::<_, usize>(0).unwrap(), 5);
245-
Ok(())
246-
})
247-
.unwrap();
248-
}
249-
250-
#[test]
251-
fn test_inject_frames_split_txn() {
252-
let temp = tempfile::tempdir().unwrap();
253-
254-
// inject one frame at a time
255-
let mut injector = Injector::new(temp.path().join("data"), 1, 10000, None).unwrap();
256-
let log = wal_log();
257-
for frame in log {
258-
injector.inject_frame(frame).unwrap();
259-
}
260-
261-
let conn = rusqlite::Connection::open(temp.path().join("data")).unwrap();
262-
263-
conn.query_row("SELECT COUNT(*) FROM test", (), |row| {
264-
assert_eq!(row.get::<_, usize>(0).unwrap(), 5);
265-
Ok(())
266-
})
267-
.unwrap();
268-
}
269-
270-
#[test]
271-
fn test_inject_partial_txn_isolated() {
272-
let temp = tempfile::tempdir().unwrap();
273-
274-
// inject one frame at a time
275-
let mut injector = Injector::new(temp.path().join("data"), 10, 1000, None).unwrap();
276-
let mut frames = wal_log();
277-
278-
assert!(injector
279-
.inject_frame(frames.next().unwrap())
280-
.unwrap()
281-
.is_none());
282-
let conn = rusqlite::Connection::open(temp.path().join("data")).unwrap();
283-
assert!(conn
284-
.query_row("SELECT COUNT(*) FROM test", (), |_| Ok(()))
285-
.is_err());
286-
287-
while injector
288-
.inject_frame(frames.next().unwrap())
289-
.unwrap()
290-
.is_none()
291-
{}
292-
293-
// reset schema
294-
conn.pragma_update(None, "writable_schema", "reset")
295-
.unwrap();
296-
conn.query_row("SELECT COUNT(*) FROM test", (), |_| Ok(()))
297-
.unwrap();
298-
}
26+
fn flush(&mut self) -> impl Future<Output = Result<Option<FrameNo>>> + Send;
29927
}
File renamed without changes.

libsql-replication/src/injector/injector_wal.rs renamed to libsql-replication/src/injector/sqlite_injector/injector_wal.rs

File renamed without changes.

0 commit comments

Comments
 (0)