Skip to content

Commit 2bcb04a

Browse files
committed
implement Storage for Either
1 parent e35f877 commit 2bcb04a

2 files changed

Lines changed: 126 additions & 2 deletions

File tree

libsql-wal/src/replication/storage.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub trait ReplicateFromStorage: Sync + Send + 'static {
1818
seen: &'a mut RoaringBitmap,
1919
current: u64,
2020
until: u64,
21-
) -> Pin<Box<dyn Stream<Item = Result<Box<Frame>>> + 'a>>;
21+
) -> Pin<Box<dyn Stream<Item = Result<Box<Frame>>> + 'a + Send>>;
2222
}
2323

2424
pub struct StorageReplicator<S> {
@@ -41,7 +41,7 @@ where
4141
seen: &'a mut roaring::RoaringBitmap,
4242
mut current: u64,
4343
until: u64,
44-
) -> Pin<Box<dyn Stream<Item = Result<Box<Frame>>> + 'a>> {
44+
) -> Pin<Box<dyn Stream<Item = Result<Box<Frame>>> + Send + 'a>> {
4545
Box::pin(async_stream::try_stream! {
4646
loop {
4747
let key = self.storage.find_segment(&self.namespace, current, None).await?;

libsql-wal/src/storage/mod.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use chrono::{DateTime, Utc};
1010
use fst::Map;
1111
use hashbrown::HashMap;
1212
use libsql_sys::name::NamespaceName;
13+
use libsql_sys::wal::either::Either;
1314
use tempfile::{tempdir, TempDir};
1415

1516
use crate::io::{FileExt, Io, StdIO};
@@ -188,12 +189,135 @@ pub trait Storage: Send + Sync + 'static {
188189
fn shutdown(&self) -> impl Future<Output = ()> + Send {
189190
async { () }
190191
}
192+
}
193+
194+
/// special zip function for Either storage implementation
195+
fn zip<A, B, C, D>(
196+
x: &Either<A, B>,
197+
y: Option<Either<C, D>>,
198+
) -> Either<(&A, Option<C>), (&B, Option<D>)> {
199+
match (x, y) {
200+
(Either::A(a), Some(Either::A(c))) => Either::A((a, Some(c))),
201+
(Either::B(b), Some(Either::B(d))) => Either::B((b, Some(d))),
202+
(Either::A(a), None) => Either::A((a, None)),
203+
(Either::B(b), None) => Either::B((b, None)),
204+
_ => panic!("incompatible options"),
205+
}
206+
}
207+
208+
impl<A, B, S> Storage for Either<A, B>
209+
where
210+
A: Storage<Segment = S>,
211+
B: Storage<Segment = S>,
212+
S: Segment,
213+
{
214+
type Segment = S;
215+
type Config = Either<A::Config, B::Config>;
216+
217+
fn store(
218+
&self,
219+
namespace: &NamespaceName,
220+
seg: Self::Segment,
221+
config_override: Option<Self::Config>,
222+
on_store: OnStoreCallback,
223+
) {
224+
match zip(self, config_override) {
225+
Either::A((s, c)) => s.store(namespace, seg, c, on_store),
226+
Either::B((s, c)) => s.store(namespace, seg, c, on_store),
227+
}
228+
}
229+
230+
fn durable_frame_no_sync(
231+
&self,
232+
namespace: &NamespaceName,
233+
config_override: Option<Self::Config>,
234+
) -> u64 {
235+
match zip(self, config_override) {
236+
Either::A((s, c)) => s.durable_frame_no_sync(namespace, c),
237+
Either::B((s, c)) => s.durable_frame_no_sync(namespace, c),
238+
}
239+
}
240+
241+
async fn durable_frame_no(
242+
&self,
243+
namespace: &NamespaceName,
244+
config_override: Option<Self::Config>,
245+
) -> u64 {
246+
match zip(self, config_override) {
247+
Either::A((s, c)) => s.durable_frame_no(namespace, c).await,
248+
Either::B((s, c)) => s.durable_frame_no(namespace, c).await,
249+
}
250+
}
251+
252+
async fn restore(
253+
&self,
254+
file: impl FileExt,
255+
namespace: &NamespaceName,
256+
restore_options: RestoreOptions,
257+
config_override: Option<Self::Config>,
258+
) -> Result<()> {
259+
match zip(self, config_override) {
260+
Either::A((s, c)) => s.restore(file, namespace, restore_options, c).await,
261+
Either::B((s, c)) => s.restore(file, namespace, restore_options, c).await,
262+
}
263+
}
264+
265+
fn find_segment(
266+
&self,
267+
namespace: &NamespaceName,
268+
frame_no: u64,
269+
config_override: Option<Self::Config>,
270+
) -> impl Future<Output = Result<SegmentKey>> + Send {
271+
async move {
272+
match zip(self, config_override) {
273+
Either::A((s, c)) => s.find_segment(namespace, frame_no, c).await,
274+
Either::B((s, c)) => s.find_segment(namespace, frame_no, c).await,
275+
}
276+
}
277+
}
278+
191279
fn fetch_segment_index(
192280
&self,
193281
namespace: &NamespaceName,
194282
key: &SegmentKey,
195283
config_override: Option<Self::Config>,
196284
) -> impl Future<Output = Result<Map<Arc<[u8]>>>> + Send {
285+
async move {
286+
match zip(self, config_override) {
287+
Either::A((s, c)) => s.fetch_segment_index(namespace, key, c).await,
288+
Either::B((s, c)) => s.fetch_segment_index(namespace, key, c).await,
289+
}
290+
}
291+
}
292+
293+
fn fetch_segment_data(
294+
&self,
295+
namespace: &NamespaceName,
296+
key: &SegmentKey,
297+
config_override: Option<Self::Config>,
298+
) -> impl Future<Output = Result<CompactedSegment<impl FileExt>>> + Send {
299+
async move {
300+
match zip(self, config_override) {
301+
Either::A((s, c)) => {
302+
let seg = s.fetch_segment_data(namespace, key, c).await?;
303+
let seg = seg.remap_file_type(Either::A);
304+
Ok(seg)
305+
}
306+
Either::B((s, c)) => {
307+
let seg = s.fetch_segment_data(namespace, key, c).await?;
308+
let seg = seg.remap_file_type(Either::B);
309+
Ok(seg)
310+
}
311+
}
312+
}
313+
}
314+
315+
async fn shutdown(&self) {
316+
match self {
317+
Either::A(a) => a.shutdown().await,
318+
Either::B(b) => b.shutdown().await,
319+
}
320+
}
197321
}
198322

199323
/// a placeholder storage that doesn't store segment

0 commit comments

Comments
 (0)