Skip to content

Commit c1ebf3f

Browse files
committed
un-erase config type
1 parent 028a418 commit c1ebf3f

4 files changed

Lines changed: 41 additions & 55 deletions

File tree

libsql-wal/src/storage/async_storage.rs

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//! `AsyncStorage` is a `Storage` implementation that defer storage to a background thread. The
22
//! durable frame_no is notified asynchronously.
33
4-
use std::any::Any;
54
use std::sync::Arc;
65

76
use chrono::Utc;
@@ -114,6 +113,7 @@ where
114113
}
115114
}
116115
}
116+
117117
tracing::info!("Storage shutdown");
118118
if let Some(notify) = notify_shutdown {
119119
let _ = notify.send(());
@@ -124,19 +124,12 @@ where
124124
&self,
125125
namespace: NamespaceName,
126126
ret: oneshot::Sender<super::Result<u64>>,
127-
config_override: Option<Arc<dyn Any + Send + Sync>>,
127+
config_override: Option<B::Config>,
128128
) {
129129
let backend = self.backend.clone();
130-
let config = match config_override
131-
.map(|c| c.downcast::<B::Config>())
132-
.transpose()
133-
{
134-
Ok(Some(config)) => config,
135-
Ok(None) => backend.default_config(),
136-
Err(_) => {
137-
let _ = ret.send(Err(super::Error::InvalidConfigType));
138-
return;
139-
}
130+
let config = match config_override {
131+
Some(config) => config,
132+
None => backend.default_config(),
140133
};
141134

142135
tokio::spawn(async move {
@@ -167,7 +160,7 @@ enum StorageLoopMessage<S, C> {
167160
Shutdown(oneshot::Sender<()>),
168161
}
169162

170-
pub struct AsyncStorage<B, S> {
163+
pub struct AsyncStorage<B: Backend, S> {
171164
/// send request to the main loop
172165
job_sender: mpsc::UnboundedSender<StorageLoopMessage<S, B::Config>>,
173166
force_shutdown: oneshot::Sender<()>,
@@ -195,15 +188,11 @@ where
195188
config_override: Option<Self::Config>,
196189
on_store_callback: OnStoreCallback,
197190
) {
198-
fn into_any<T: Sync + Send + 'static>(t: Arc<T>) -> Arc<dyn Any + Sync + Send> {
199-
t
200-
}
201-
202191
let req = StoreSegmentRequest {
203192
namespace: namespace.clone(),
204193
segment,
205194
created_at: Utc::now(),
206-
storage_config_override: config_override.map(into_any),
195+
storage_config_override: config_override,
207196
on_store_callback,
208197
};
209198

libsql-wal/src/storage/job.rs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ use crate::segment::Segment;
99

1010
/// A request, with an id
1111
#[derive(Debug)]
12-
pub(crate) struct IndexedRequest<T> {
13-
pub(crate) request: StoreSegmentRequest<T>,
12+
pub(crate) struct IndexedRequest<T, C> {
13+
pub(crate) request: StoreSegmentRequest<T, C>,
1414
pub(crate) id: u64,
1515
}
1616

17-
impl<T> Deref for IndexedRequest<T> {
18-
type Target = StoreSegmentRequest<T>;
17+
impl<T, C> Deref for IndexedRequest<T, C> {
18+
type Target = StoreSegmentRequest<T, C>;
1919

2020
fn deref(&self) -> &Self::Target {
2121
&self.request
@@ -24,10 +24,10 @@ impl<T> Deref for IndexedRequest<T> {
2424

2525
/// A storage Job to be performed
2626
#[derive(Debug)]
27-
pub(crate) struct Job<T> {
27+
pub(crate) struct Job<T, C> {
2828
/// Segment to store.
2929
// TODO: implement request batching (merge segment and send).
30-
pub(crate) request: IndexedRequest<T>,
30+
pub(crate) request: IndexedRequest<T, C>,
3131
}
3232

3333
// #[repr(transparent)]
@@ -43,13 +43,15 @@ pub(crate) struct Job<T> {
4343
// }
4444
//
4545
impl<Seg> Job<Seg>
46+
impl<Seg, C> Job<Seg, C>
4647
where
4748
Seg: Segment,
49+
C: Clone,
4850
{
4951
/// Perform the job and return the JobResult. This is not allowed to panic.
50-
pub(crate) async fn perform<B, IO>(self, backend: B, io: IO) -> JobResult<Seg>
52+
pub(crate) async fn perform<B, IO>(self, backend: B, io: IO) -> JobResult<Seg, C>
5153
where
52-
B: Backend,
54+
B: Backend<Config = C>,
5355
IO: Io,
5456
{
5557
let result = self.try_perform(backend, io).await;
@@ -58,7 +60,7 @@ where
5860

5961
async fn try_perform<B, IO>(&self, backend: B, io: IO) -> Result<u64>
6062
where
61-
B: Backend,
63+
B: Backend<Config = C>,
6264
IO: Io,
6365
{
6466
let segment = &self.request.segment;
@@ -81,9 +83,6 @@ where
8183
.request
8284
.storage_config_override
8385
.clone()
84-
.map(|c| c.downcast::<B::Config>())
85-
.transpose()
86-
.map_err(|_| super::Error::InvalidConfigType)?
8786
.unwrap_or_else(|| backend.default_config());
8887

8988
backend.store(&config, meta, tmp, new_index).await?;
@@ -93,9 +92,9 @@ where
9392
}
9493

9594
#[derive(Debug)]
96-
pub(crate) struct JobResult<S> {
95+
pub(crate) struct JobResult<S, C> {
9796
/// The job that was performed
98-
pub(crate) job: Job<S>,
97+
pub(crate) job: Job<S, C>,
9998
/// The outcome of the job: the new durable index, or an error.
10099
pub(crate) result: Result<u64>,
101100
}
@@ -457,8 +456,8 @@ mod test {
457456
todo!()
458457
}
459458

460-
fn default_config(&self) -> Arc<Self::Config> {
461-
Arc::new(())
459+
fn default_config(&self) -> Self::Config {
460+
()
462461
}
463462

464463
async fn restore(
@@ -501,7 +500,7 @@ mod test {
501500

502501
async fn fetch_segment_data(
503502
self: Arc<Self>,
504-
_config: Arc<Self::Config>,
503+
_config: Self::Config,
505504
_namespace: NamespaceName,
506505
_key: SegmentKey,
507506
) -> Result<impl FileExt> {

libsql-wal/src/storage/mod.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::any::Any;
21
use std::collections::BTreeMap;
32
use std::fmt;
43
use std::future::Future;
@@ -11,7 +10,6 @@ use chrono::{DateTime, Utc};
1110
use fst::Map;
1211
use hashbrown::HashMap;
1312
use libsql_sys::name::NamespaceName;
14-
use parking_lot::Mutex;
1513
use tempfile::{tempdir, TempDir};
1614

1715
use crate::io::{FileExt, Io, StdIO};
@@ -204,7 +202,7 @@ impl Storage for NoStorage {
204202
&self,
205203
_namespace: &NamespaceName,
206204
_seg: Self::Segment,
207-
_config: Option<Arc<Self::Config>>,
205+
_config: Option<Self::Config>,
208206
_on_store: OnStoreCallback,
209207
) {
210208
}
@@ -239,7 +237,7 @@ impl Storage for NoStorage {
239237
&self,
240238
_namespace: &NamespaceName,
241239
_frame_no: u64,
242-
_config_override: Option<Arc<Self::Config>>,
240+
_config_override: Option<Self::Config>,
243241
) -> Result<SegmentKey> {
244242
unimplemented!()
245243
}
@@ -248,7 +246,7 @@ impl Storage for NoStorage {
248246
&self,
249247
_namespace: &NamespaceName,
250248
_key: &SegmentKey,
251-
_config_override: Option<Arc<Self::Config>>,
249+
_config_override: Option<Self::Config>,
252250
) -> Result<Map<Arc<[u8]>>> {
253251
unimplemented!()
254252
}
@@ -442,7 +440,7 @@ impl<IO: Io> Storage for TestStorage<IO> {
442440
}
443441
}
444442

445-
pub struct StoreSegmentRequest<S> {
443+
pub struct StoreSegmentRequest<S, C> {
446444
namespace: NamespaceName,
447445
/// Path to the segment. Read-only for bottomless
448446
segment: S,
@@ -451,12 +449,12 @@ pub struct StoreSegmentRequest<S> {
451449

452450
/// alternative configuration to use with the storage layer.
453451
/// e.g: S3 overrides
454-
storage_config_override: Option<Arc<dyn Any + Send + Sync>>,
452+
storage_config_override: Option<C>,
455453
/// Called after the segment was stored, with the new durable index
456454
on_store_callback: OnStoreCallback,
457455
}
458456

459-
impl<S> fmt::Debug for StoreSegmentRequest<S>
457+
impl<S, C> fmt::Debug for StoreSegmentRequest<S, C>
460458
where
461459
S: fmt::Debug,
462460
{

libsql-wal/src/storage/scheduler.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ use super::job::{IndexedRequest, Job, JobResult};
55
use super::StoreSegmentRequest;
66
use libsql_sys::name::NamespaceName;
77

8-
struct NamespaceRequests<F> {
9-
requests: VecDeque<IndexedRequest<F>>,
8+
struct NamespaceRequests<F, C> {
9+
requests: VecDeque<IndexedRequest<F, C>>,
1010
/// there's work in flight for this namespace
1111
in_flight: bool,
1212
}
1313

14-
impl<F> Default for NamespaceRequests<F> {
14+
impl<F, C> Default for NamespaceRequests<F, C> {
1515
fn default() -> Self {
1616
Self {
1717
requests: Default::default(),
@@ -28,14 +28,14 @@ impl<F> Default for NamespaceRequests<F> {
2828
/// processed, because only the most recent segment is checked for durability. This property
2929
/// ensures that all segments are present up to the max durable index.
3030
/// It is generic over C: the storage config type (for config overrides), and T, the segment type
31-
pub(crate) struct Scheduler<T> {
31+
pub(crate) struct Scheduler<T, C> {
3232
/// notify new durability index for namespace
33-
requests: HashMap<NamespaceName, NamespaceRequests<T>>,
33+
requests: HashMap<NamespaceName, NamespaceRequests<T, C>>,
3434
queue: priority_queue::PriorityQueue<NamespaceName, Reverse<u64>>,
3535
next_request_id: u64,
3636
}
3737

38-
impl<T> Scheduler<T> {
38+
impl<T, C> Scheduler<T, C> {
3939
pub fn new() -> Self {
4040
Self {
4141
requests: Default::default(),
@@ -46,7 +46,7 @@ impl<T> Scheduler<T> {
4646

4747
/// Register a new request with the scheduler
4848
#[tracing::instrument(skip_all)]
49-
pub fn register(&mut self, request: StoreSegmentRequest<T>) {
49+
pub fn register(&mut self, request: StoreSegmentRequest<T, C>) {
5050
// invariant: new segment comes immediately after the latest segment for that namespace. This means:
5151
// - immediately after the last registered segment, if there is any
5252
// - immediately after the last durable index
@@ -71,7 +71,7 @@ impl<T> Scheduler<T> {
7171
/// be scheduled, and returns description of the job to be performed. No other job for this
7272
/// namespace will be scheduled, until the `JobResult` is reported
7373
#[tracing::instrument(skip_all)]
74-
pub fn schedule(&mut self) -> Option<Job<T>> {
74+
pub fn schedule(&mut self) -> Option<Job<T, C>> {
7575
let (name, _) = self.queue.pop()?;
7676
let requests = self
7777
.requests
@@ -90,7 +90,7 @@ impl<T> Scheduler<T> {
9090
/// Report the job result to the scheduler. If the job result was a success, the request as
9191
/// removed from the queue, else, the job is rescheduled
9292
#[tracing::instrument(skip_all, fields(req_id = result.job.request.id))]
93-
pub async fn report(&mut self, result: JobResult<T>) {
93+
pub async fn report(&mut self, result: JobResult<T, C>) {
9494
// re-schedule, or report new max durable frame_no for segment
9595
let name = result.job.request.request.namespace.clone();
9696
let requests = self
@@ -151,7 +151,7 @@ mod test {
151151

152152
#[tokio::test]
153153
async fn schedule_simple() {
154-
let mut scheduler = Scheduler::<()>::new();
154+
let mut scheduler = Scheduler::<(), ()>::new();
155155

156156
let ns1 = NamespaceName::from("test1");
157157
let ns2 = NamespaceName::from("test2");
@@ -224,7 +224,7 @@ mod test {
224224

225225
#[tokio::test]
226226
async fn job_error_reschedule() {
227-
let mut scheduler = Scheduler::<()>::new();
227+
let mut scheduler = Scheduler::<(), ()>::new();
228228

229229
let ns1 = NamespaceName::from("test1");
230230
let ns2 = NamespaceName::from("test2");
@@ -264,7 +264,7 @@ mod test {
264264

265265
#[tokio::test]
266266
async fn schedule_while_in_flight() {
267-
let mut scheduler = Scheduler::<()>::new();
267+
let mut scheduler = Scheduler::<(), ()>::new();
268268

269269
let ns1 = NamespaceName::from("test1");
270270

0 commit comments

Comments
 (0)