Skip to content

Commit ec4b2eb

Browse files
committed
delete segment after checkpoint
1 parent 4c9482a commit ec4b2eb

3 files changed

Lines changed: 69 additions & 30 deletions

File tree

libsql-wal/src/registry.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ enum Slot<IO: Io> {
3636

3737
/// Wal Registry maintains a set of shared Wal, and their respective set of files.
3838
pub struct WalRegistry<IO: Io, S> {
39-
io: IO,
39+
io: Arc<IO>,
4040
path: PathBuf,
4141
shutdown: AtomicBool,
4242
opened: DashMap<NamespaceName, Slot<IO>>,
@@ -63,7 +63,7 @@ impl<IO: Io, S> WalRegistry<IO, S> {
6363
) -> Result<Self> {
6464
io.create_dir_all(&path)?;
6565
let registry = Self {
66-
io,
66+
io: io.into(),
6767
path,
6868
opened: Default::default(),
6969
shutdown: Default::default(),
@@ -340,6 +340,7 @@ where
340340
shutdown: false.into(),
341341
checkpoint_notifier: self.checkpoint_notifier.clone(),
342342
max_segment_size: 1000.into(),
343+
io: self.io.clone(),
343344
});
344345

345346
self.opened

libsql-wal/src/segment/list.rs

Lines changed: 64 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
44
use std::sync::Arc;
55

66
use arc_swap::ArcSwapOption;
7-
use fst::map::{OpBuilder, Union};
87
use fst::raw::IndexedValue;
98
use fst::Streamer;
109
use roaring::RoaringBitmap;
1110
use tokio_stream::Stream;
11+
use uuid::Uuid;
1212
use zerocopy::FromZeroes;
1313

1414
use crate::error::Result;
1515
use crate::io::buf::{ZeroCopyBoxIoBuf, ZeroCopyBuf};
16-
use crate::io::FileExt;
16+
use crate::io::{FileExt, Io};
1717
use crate::segment::Frame;
1818
use crate::{LibsqlFooter, LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION};
1919

@@ -78,10 +78,13 @@ where
7878

7979
/// Checkpoints as many segments as possible to the main db file, and return the checkpointed
8080
/// frame_no, if anything was checkpointed
81-
pub async fn checkpoint<F>(&self, db_file: &F, until_frame_no: u64) -> Result<Option<u64>>
82-
where
83-
F: FileExt,
84-
{
81+
pub async fn checkpoint<IO: Io>(
82+
&self,
83+
db_file: &IO::File,
84+
until_frame_no: u64,
85+
log_id: Uuid,
86+
io: &IO,
87+
) -> Result<Option<u64>> {
8588
struct Guard<'a>(&'a AtomicBool);
8689
impl<'a> Drop for Guard<'a> {
8790
fn drop(&mut self) {
@@ -122,24 +125,13 @@ where
122125

123126
let size_after = segs.first().unwrap().size_after();
124127

125-
let union = segs
126-
.iter()
127-
.map(|s| s.index())
128-
.collect::<OpBuilder>()
129-
.union();
130-
131-
/// Safety: Union contains a Box<dyn trait> that doesn't require Send, to it's not send.
132-
/// That's an issue for us, but all the indexes we have are safe to send, so we're good.
133-
/// FIXME: we could implement union ourselves.
134-
unsafe impl Send for SendUnion<'_> {}
135-
unsafe impl Sync for SendUnion<'_> {}
136-
struct SendUnion<'a>(Union<'a>);
128+
let index_iter = segs.iter().map(|s| s.index());
137129

138-
let mut union = SendUnion(union);
130+
let mut union = send_fst_ops::SendUnion::from_index_iter(index_iter);
139131

140132
let mut buf = ZeroCopyBuf::<Frame>::new_uninit();
141133
let mut last_replication_index = 0;
142-
while let Some((k, v)) = union.0.next() {
134+
while let Some((k, v)) = union.next() {
143135
let page_no = u32::from_be_bytes(k.try_into().unwrap());
144136
let v = v.iter().min_by_key(|i| i.index).unwrap();
145137
let offset = v.value as u32;
@@ -163,19 +155,25 @@ where
163155
magic: LIBSQL_MAGIC.into(),
164156
version: LIBSQL_WAL_VERSION.into(),
165157
replication_index: last_replication_index.into(),
158+
log_id: log_id.as_u128().into(),
166159
};
167160

161+
db_file.set_len(size_after as u64 * LIBSQL_PAGE_SIZE as u64)?;
162+
168163
let footer_offset = size_after as usize * LIBSQL_PAGE_SIZE as usize;
169164
let (_, ret) = db_file
170165
.write_all_at_async(ZeroCopyBuf::new_init(footer), footer_offset as u64)
171166
.await;
172167
ret?;
173168

174169
// todo: truncate if necessary
175-
176-
//// todo: make async
170+
//// TODO: make async
177171
db_file.sync_all()?;
178172

173+
for seg in segs.iter() {
174+
seg.destroy(io).await;
175+
}
176+
179177
let mut current = self.head.compare_and_swap(&segs[0], None);
180178
if Arc::ptr_eq(&segs[0], current.as_ref().unwrap()) {
181179
// nothing to do
@@ -196,8 +194,6 @@ where
196194

197195
self.len.fetch_sub(segs.len(), Ordering::Relaxed);
198196

199-
db_file.set_len(size_after as u64 * 4096)?;
200-
201197
Ok(Some(last_replication_index))
202198
}
203199

@@ -241,7 +237,8 @@ where
241237
.max(until_fno);
242238

243239
let stream = async_stream::try_stream! {
244-
let mut union = fst::map::OpBuilder::from_iter(segments.iter().map(|s| s.index())).union();
240+
let index_iter = segments.iter().map(|s| s.index());
241+
let mut union = send_fst_ops::SendUnion::from_index_iter(index_iter);
245242
while let Some((key_bytes, indexes)) = union.next() {
246243
let page_no = u32::from_be_bytes(key_bytes.try_into().unwrap());
247244
// we already have a more recent version of this page.
@@ -337,6 +334,46 @@ impl<T> List<T> {
337334
}
338335
}
339336

337+
mod send_fst_ops {
338+
use std::ops::{Deref, DerefMut};
339+
use std::sync::Arc;
340+
341+
use fst::map::{OpBuilder, Union};
342+
343+
/// Safety: Union contains a Box<dyn trait> that doesn't require Send, to it's not send.
344+
/// That's an issue for us, but all the indexes we have are safe to send, so we're good.
345+
/// FIXME: we could implement union ourselves.
346+
unsafe impl Send for SendUnion<'_> {}
347+
unsafe impl Sync for SendUnion<'_> {}
348+
349+
#[repr(transparent)]
350+
pub(super) struct SendUnion<'a>(Union<'a>);
351+
352+
impl<'a> SendUnion<'a> {
353+
pub fn from_index_iter<I>(iter: I) -> Self
354+
where
355+
I: Iterator<Item = &'a fst::map::Map<Arc<[u8]>>>,
356+
{
357+
let op = iter.collect::<OpBuilder>().union();
358+
Self(op)
359+
}
360+
}
361+
362+
impl<'a> Deref for SendUnion<'a> {
363+
type Target = Union<'a>;
364+
365+
fn deref(&self) -> &Self::Target {
366+
&self.0
367+
}
368+
}
369+
370+
impl<'a> DerefMut for SendUnion<'a> {
371+
fn deref_mut(&mut self) -> &mut Self::Target {
372+
&mut self.0
373+
}
374+
}
375+
}
376+
340377
#[cfg(test)]
341378
mod test {
342379
use std::io::{Read, Seek, Write};
@@ -393,8 +430,8 @@ mod test {
393430
*shared.durable_frame_no.lock() = 999999;
394431
shared.checkpoint().await.unwrap();
395432
file.seek(std::io::SeekFrom::Start(0)).unwrap();
396-
let mut copy_ytes = Vec::new();
397-
file.read_to_end(&mut copy_ytes).unwrap();
433+
let mut copy_bytes = Vec::new();
434+
file.read_to_end(&mut copy_bytes).unwrap();
398435

399436
let mut orig_bytes = Vec::new();
400437
shared

libsql-wal/src/shared_wal.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ pub struct SharedWal<IO: Io> {
5353
pub(crate) checkpoint_notifier: mpsc::Sender<CheckpointMessage>,
5454
/// maximum size the segment is allowed to grow
5555
pub(crate) max_segment_size: AtomicUsize,
56+
pub(crate) io: Arc<IO>,
5657
}
5758

5859
impl<IO: Io> SharedWal<IO> {
@@ -301,7 +302,7 @@ impl<IO: Io> SharedWal<IO> {
301302
.current
302303
.load()
303304
.tail()
304-
.checkpoint(&self.db_file, durable_frame_no)
305+
.checkpoint(&self.db_file, durable_frame_no, self.log_id(), &self.io)
305306
.await?;
306307
if let Some(checkpointed_frame_no) = checkpointed_frame_no {
307308
self.checkpointed_frame_no

0 commit comments

Comments
 (0)