Skip to content

Commit b90ccea

Browse files
committed
replicate from durable storage
1 parent 8af0600 commit b90ccea

1 file changed

Lines changed: 18 additions & 4 deletions

File tree

libsql-wal/src/replication/replicator.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,26 @@ impl<IO: Io> Replicator<IO> {
104104
should_replicate_from_storage.then_some(replicated_until)
105105
};
106106

107+
if let Some(replicated_until) = replicated_until {
108+
let stream = self
109+
.shared
110+
.stored_segments
111+
.stream(&mut seen, replicated_until, self.next_frame_no)
112+
.peekable();
107113

108-
yield frame
109-
}
114+
tokio::pin!(stream);
115+
116+
loop {
117+
let Some(frame) = stream.next().await else { break };
118+
let mut frame = frame?;
119+
commit_frame_no = frame.header().frame_no().max(commit_frame_no);
120+
if stream.peek().await.is_none() {
121+
frame.header_mut().set_size_after(size_after);
122+
self.next_frame_no = commit_frame_no + 1;
123+
}
110124

111-
if should_replicate_from_durable {
112-
todo!("we need to fetch new segments from durable storage");
125+
yield frame
126+
}
113127
}
114128
}
115129
}

0 commit comments

Comments
 (0)