Skip to content

Commit 631dd81

Browse files
committed
pass durable frame_no to libsql-injector
1 parent d554082 commit 631dd81

4 files changed

Lines changed: 18 additions & 0 deletions

File tree

libsql-replication/src/injector/libsql_injector.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,8 @@ impl super::Injector for LibsqlInjector {
4949
.map_err(|e| Error::FatalInjectError(e.into()))?;
5050
Ok(None)
5151
}
52+
53+
fn durable_frame_no(&mut self, frame_no: u64) {
54+
self.injector.set_durable(frame_no);
55+
}
5256
}

libsql-replication/src/injector/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,6 @@ pub trait Injector {
2929
/// Trigger a dummy write, and flush the cache to trigger a call to xFrame. The buffer's frame
3030
/// are then injected into the wal.
3131
fn flush(&mut self) -> impl Future<Output = Result<Option<FrameNo>>> + Send;
32+
33+
fn durable_frame_no(&mut self, frame_no: u64);
3234
}

libsql-replication/src/injector/sqlite_injector/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ impl Injector for SqliteInjector {
4646
let inner = self.inner.clone();
4747
spawn_blocking(move || inner.lock().flush()).await.unwrap()
4848
}
49+
50+
#[inline]
51+
fn durable_frame_no(&mut self, _frame_no: u64) { }
4952
}
5053

5154
impl SqliteInjector {

libsql-wal/src/replication/injector.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! The injector is the module in charge of injecting frames into a replica database.
22
3+
use std::sync::atomic::Ordering;
34
use std::sync::Arc;
45

56
use crate::error::Result;
@@ -36,6 +37,14 @@ impl<IO: Io> Injector<IO> {
3637
})
3738
}
3839

40+
pub fn set_durable(&self, durable_frame_no: u64) {
41+
let mut old = self.wal.durable_frame_no.lock();
42+
if *old < durable_frame_no {
43+
*old = durable_frame_no
44+
} {
45+
todo!("primary reported older frameno than current");
46+
}
47+
}
3948
pub async fn insert_frame(&mut self, frame: Box<Frame>) -> Result<Option<u64>> {
4049
let size_after = frame.size_after();
4150
self.max_tx_frame_no = self.max_tx_frame_no.max(frame.header().frame_no());

0 commit comments

Comments
 (0)