Skip to content

Commit 037a75b

Browse files
committed
fix current segment stream
1 parent 51c1c26 commit 037a75b

1 file changed

Lines changed: 7 additions & 4 deletions

File tree

libsql-wal/src/segment/current.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -365,13 +365,16 @@ impl<F> CurrentSegment<F> {
365365
&'a self,
366366
start_frame_no: u64,
367367
seen: &'a mut RoaringBitmap,
368-
) -> (impl Stream<Item = Result<Frame>> + 'a, u64, u32)
368+
) -> (impl Stream<Item = Result<Box<Frame>>> + 'a, u64, u32)
369369
where
370370
F: FileExt,
371371
{
372372
let (seg_start_frame_no, last_committed, db_size) =
373373
self.with_header(|h| (h.start_frame_no.get(), h.last_committed(), h.size_after()));
374-
let replicated_until = seg_start_frame_no.max(start_frame_no);
374+
let replicated_until = seg_start_frame_no
375+
// if current is empty, start_frame_no doesn't exist
376+
.min(last_committed)
377+
.max(start_frame_no);
375378

376379
// TODO: optim, we could read less frames if we had a mapping from frame_no to page_no in
377380
// the index
@@ -594,7 +597,7 @@ mod test {
594597
}
595598

596599
seal_current_segment(&shared);
597-
shared.durable_frame_no.store(999999, Ordering::Relaxed);
600+
*shared.durable_frame_no.lock() = 999999;
598601
shared.checkpoint().await.unwrap();
599602

600603
let mut orig = Vec::new();
@@ -673,7 +676,7 @@ mod test {
673676
let current = shared.current.load();
674677
let (stream, replicated_until, size_after) = current.frame_stream_from(1, &mut seen);
675678
tokio::pin!(stream);
676-
assert_eq!(replicated_until, 3);
679+
assert_eq!(replicated_until, 2);
677680
assert_eq!(size_after, 2);
678681
assert_eq!(stream.fold(0, |count, _| count + 1).await, 0);
679682
}

0 commit comments

Comments
 (0)