Skip to content

Commit ce63363

Browse files
committed
fmt
1 parent e9c292d commit ce63363

7 files changed

Lines changed: 54 additions & 37 deletions

File tree

libsql-server/src/lib.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -979,10 +979,12 @@ where
979979
let namespace = conf.namespace().clone();
980980
let path = dbs_path.join(namespace.as_str());
981981
tokio::fs::create_dir_all(&path).await?;
982-
tokio::task::spawn_blocking(move || registry.open(&path.join("data"), &namespace.into()))
983-
.await
984-
.unwrap()?;
985-
}
982+
tokio::task::spawn_blocking(move || {
983+
registry.open(&path.join("data"), &namespace.into())
984+
})
985+
.await
986+
.unwrap()?;
987+
}
986988

987989
if self.should_sync_from_storage {
988990
registry.sync_all(self.sync_conccurency).await?;
@@ -1256,9 +1258,8 @@ where
12561258
let is_previous_migration_successful = self.check_previous_migration_success()?;
12571259
let is_libsql_wal = matches!(self.use_custom_wal, Some(CustomWAL::LibsqlWal));
12581260
let is_bottomless_enabled = self.db_config.bottomless_replication.is_some();
1259-
let should_attempt_migration = is_bottomless_enabled
1260-
&& !is_previous_migration_successful
1261-
&& is_libsql_wal;
1261+
let should_attempt_migration =
1262+
is_bottomless_enabled && !is_previous_migration_successful && is_libsql_wal;
12621263

12631264
if should_attempt_migration {
12641265
bottomless_migrate(meta_store, base_config.clone(), primary_config.clone()).await?;
@@ -1274,7 +1275,6 @@ where
12741275

12751276
tracing::info!("bottomless already migrated, skipping...");
12761277
}
1277-
12781278
}
12791279

12801280
Ok(false)

libsql-server/src/main.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,14 +285,13 @@ struct Cli {
285285
#[clap(
286286
long,
287287
env = "LIBSQL_SYNC_FROM_STORAGE",
288-
requires = "enable_bottomless_replication",
288+
requires = "enable_bottomless_replication"
289289
)]
290290
sync_from_storage: bool,
291291
/// Whether to force loading all WAL at startup, with libsql-wal
292292
/// By default, WALs are loaded lazily, as the databases are openned.
293-
#[clap(
294-
long,
295-
)]
293+
/// Whether to force loading all wal at startup
294+
#[clap(long)]
296295
force_load_wals: bool,
297296
/// Sync conccurency
298297
#[clap(

libsql-wal/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ pub mod test {
179179
loop {
180180
{
181181
if *shared.durable_frame_no.lock() >= current {
182-
break
182+
break;
183183
}
184184
}
185185

libsql-wal/src/registry.rs

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ fn maybe_store_segment<S: Storage>(
151151
notifier: &tokio::sync::mpsc::Sender<CheckpointMessage>,
152152
namespace: &NamespaceName,
153153
durable_frame_no: &Arc<Mutex<u64>>,
154-
seg: S::Segment
154+
seg: S::Segment,
155155
) {
156156
if seg.is_storable() {
157157
let cb: OnStoreCallback = Box::new({
@@ -339,14 +339,14 @@ where
339339
let header = segment.header();
340340
(header.size_after(), header.next_frame_no())
341341
})
342-
.unwrap_or_else(|| match header {
343-
Some(header) => (
344-
header.db_size.get(),
345-
NonZeroU64::new(checkpointed_frame_no + 1)
346-
.unwrap_or(NonZeroU64::new(1).unwrap()),
347-
),
348-
None => (0, NonZeroU64::new(1).unwrap()),
349-
});
342+
.unwrap_or_else(|| match header {
343+
Some(header) => (
344+
header.db_size.get(),
345+
NonZeroU64::new(checkpointed_frame_no + 1)
346+
.unwrap_or(NonZeroU64::new(1).unwrap()),
347+
),
348+
None => (0, NonZeroU64::new(1).unwrap()),
349+
});
350350

351351
let current_segment_path = path.join(format!("{namespace}:{next_frame_no:020}.seg"));
352352

@@ -405,15 +405,18 @@ where
405405

406406
/// Attempts to sync all loaded dbs with durable storage
407407
pub async fn sync_all(&self, conccurency: usize) -> Result<()>
408-
where S: Storage,
408+
where
409+
S: Storage,
409410
{
410411
let mut join_set = JoinSet::new();
411412
tracing::info!("syncing {} namespaces", self.opened.len());
412413
// FIXME: arbitrary value, maybe use something like numcpu * 2?
413414
let before_sync = Instant::now();
414415
let sem = Arc::new(Semaphore::new(conccurency));
415416
for entry in self.opened.iter() {
416-
let Slot::Wal(shared) = entry.value() else { panic!("all wals should already be opened") };
417+
let Slot::Wal(shared) = entry.value() else {
418+
panic!("all wals should already be opened")
419+
};
417420
let storage = self.storage.clone();
418421
let shared = shared.clone();
419422
let sem = sem.clone();
@@ -499,14 +502,22 @@ where
499502

500503
#[tracing::instrument(skip_all, fields(namespace = shared.namespace().as_str()))]
501504
async fn sync_one<IO, S>(shared: Arc<SharedWal<IO>>, storage: Arc<S>) -> Result<()>
502-
where IO: Io,
503-
S: Storage
505+
where
506+
IO: Io,
507+
S: Storage,
504508
{
505-
let remote_durable_frame_no = storage.durable_frame_no(shared.namespace(), None).await.map_err(Box::new)?;
509+
let remote_durable_frame_no = storage
510+
.durable_frame_no(shared.namespace(), None)
511+
.await
512+
.map_err(Box::new)?;
506513
let local_current_frame_no = shared.current.load().next_frame_no().get() - 1;
507514

508515
if remote_durable_frame_no > local_current_frame_no {
509-
tracing::info!(remote_durable_frame_no, local_current_frame_no, "remote storage has newer segments");
516+
tracing::info!(
517+
remote_durable_frame_no,
518+
local_current_frame_no,
519+
"remote storage has newer segments"
520+
);
510521
let mut seen = RoaringBitmap::new();
511522
let replicator = StorageReplicator::new(storage, shared.namespace().clone());
512523
let stream = replicator
@@ -523,7 +534,7 @@ where IO: Io,
523534
frame.header_mut().frame_no();
524535
frame.header_mut().set_size_after(seen.len() as _);
525536
injector.insert_frame(frame).await?;
526-
break
537+
break;
527538
} else {
528539
injector.insert_frame(frame).await?;
529540
}

libsql-wal/src/replication/injector.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ pub struct Injector<IO: Io> {
2020
}
2121

2222
impl<IO: Io> Injector<IO> {
23-
pub fn new(
24-
wal: Arc<SharedWal<IO>>,
25-
buffer_capacity: usize,
26-
) -> Result<Self> {
23+
pub fn new(wal: Arc<SharedWal<IO>>, buffer_capacity: usize) -> Result<Self> {
2724
let mut tx = Transaction::Read(wal.begin_read(u64::MAX));
2825
wal.upgrade(&mut tx)?;
29-
let tx = tx.into_write().unwrap_or_else(|_| unreachable!()).into_lock_owned();
26+
let tx = tx
27+
.into_write()
28+
.unwrap_or_else(|_| unreachable!())
29+
.into_lock_owned();
3030
Ok(Self {
3131
wal,
3232
buffer: Vec::with_capacity(buffer_capacity),

libsql-wal/src/segment/current.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,9 @@ impl<F> CurrentSegment<F> {
195195
header.set_flags(header.flags().union(SegmentFlags::FRAME_UNORDERED));
196196
{
197197
let savepoint = tx.savepoints.first().unwrap();
198-
header.frame_count = (header.frame_count.get() + (tx.next_offset - savepoint.next_offset) as u64).into();
198+
header.frame_count = (header.frame_count.get()
199+
+ (tx.next_offset - savepoint.next_offset) as u64)
200+
.into();
199201
}
200202
header.recompute_checksum();
201203

@@ -335,7 +337,9 @@ impl<F> CurrentSegment<F> {
335337
// offset
336338
let tx = tx.deref_mut();
337339
let savepoint = tx.savepoints.first().unwrap();
338-
header.frame_count = (header.frame_count.get() + (tx.next_offset - savepoint.next_offset) as u64).into();
340+
header.frame_count = (header.frame_count.get()
341+
+ (tx.next_offset - savepoint.next_offset) as u64)
342+
.into();
339343
header.recompute_checksum();
340344

341345
self.file.write_all_at(header.as_bytes(), 0)?;

libsql-wal/src/segment/sealed.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,10 @@ where
201201
// - in a replica: no need for storage
202202
// - in a primary, on recovery from storage: we don't want to override remote
203203
// segment.
204-
!self.header().flags().contains(SegmentFlags::FRAME_UNORDERED)
204+
!self
205+
.header()
206+
.flags()
207+
.contains(SegmentFlags::FRAME_UNORDERED)
205208
}
206209
}
207210

0 commit comments

Comments
 (0)