Skip to content

Commit b7ffb05

Browse files
committed
add storage replicator to SharedWal
1 parent 037a75b commit b7ffb05

2 files changed

Lines changed: 9 additions & 3 deletions

File tree

libsql-wal/src/registry.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,10 @@ where
294294
checkpointed_frame_no: header.replication_index.get().into(),
295295
new_frame_notifier,
296296
durable_frame_no,
297+
stored_segments: Box::new(StorageReplicator::new(
298+
self.storage.clone(),
299+
namespace.clone(),
300+
)),
297301
});
298302

299303
opened.with_upgraded(|opened| {

libsql-wal/src/shared_wal.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use parking_lot::{Mutex, MutexGuard};
1111
use crate::error::{Error, Result};
1212
use crate::io::file::FileExt;
1313
use crate::io::Io;
14+
use crate::replication::storage::ReplicateFromStorage;
1415
use crate::segment::current::CurrentSegment;
1516
use crate::transaction::{ReadTransaction, Savepoint, Transaction, TxGuard, WriteTransaction};
1617
use libsql_sys::name::NamespaceName;
@@ -43,8 +44,9 @@ pub struct SharedWal<IO: Io> {
4344
#[allow(dead_code)] // used by replication
4445
pub(crate) checkpointed_frame_no: AtomicU64,
4546
/// max frame_no acknoledged by the durable storage
46-
pub(crate) durable_frame_no: AtomicU64,
47+
pub(crate) durable_frame_no: Arc<Mutex<u64>>,
4748
pub(crate) new_frame_notifier: tokio::sync::watch::Sender<u64>,
49+
pub(crate) stored_segments: Box<dyn ReplicateFromStorage>,
4850
}
4951

5052
impl<IO: Io> SharedWal<IO> {
@@ -265,7 +267,7 @@ impl<IO: Io> SharedWal<IO> {
265267
}
266268

267269
pub async fn checkpoint(&self) -> Result<Option<u64>> {
268-
let durable_frame_no = self.durable_frame_no.load(Ordering::SeqCst);
270+
let durable_frame_no = *self.durable_frame_no.lock();
269271
let checkpointed_frame_no = self
270272
.current
271273
.load()
@@ -312,7 +314,7 @@ mod test {
312314

313315
seal_current_segment(&shared);
314316

315-
shared.durable_frame_no.store(99999, Ordering::Relaxed);
317+
*shared.durable_frame_no.lock() = 999999;
316318

317319
let frame_no = shared.checkpoint().await.unwrap().unwrap();
318320
assert_eq!(frame_no, 4);

0 commit comments

Comments
 (0)