Skip to content

Commit d3914ef

Browse files
committed
store libsql-wal segments in the db directory
Instead of having a `wals` directory to manage all the wals for all the dbs, we now store the wals for a specific db alongside the db itself, in a `{db_name}-wal` directory. This serves multiple purposes:
- it's easier to clean up a db: just remove the dir
- the directory is placed where sqlite would expect the wal to be, but since it's a directory, sqlite will refuse to open it, thus preventing accidental interference between libsql-wal and sqlite
- when forking, we can create the fork in a separate tmp dir, and then swap the whole dir in place, with all the wal contents
- the registry doesn't manage any of its own files anymore, so it'll be easier to integrate in embedded replicas.
1 parent 5129424 commit d3914ef

9 files changed

Lines changed: 37 additions & 77 deletions

File tree

libsql-server/src/bottomless_migrate.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ pub async fn bottomless_migrate(
5050
let tmp = TempDir::new()?;
5151

5252
tokio::fs::create_dir_all(tmp.path().join("dbs")).await?;
53-
tokio::fs::create_dir_all(tmp.path().join("wals")).await?;
5453

5554
let configs_stream = meta_store.namespaces();
5655
tokio::pin!(configs_stream);
@@ -68,7 +67,6 @@ pub async fn bottomless_migrate(
6867
});
6968

7069
let tmp_registry = Arc::new(WalRegistry::new(
71-
tmp.path().join("wals"),
7270
NoStorage.into(),
7371
sender,
7472
)?);
@@ -110,7 +108,6 @@ pub async fn bottomless_migrate(
110108
// doesn't exist, then we restore it, otherwise, we delete it.
111109
tokio::fs::rename(&base_dbs_dir, &base_dbs_dir_tmp).await?;
112110
tokio::fs::rename(tmp.path().join("dbs"), base_dbs_dir).await?;
113-
tokio::fs::rename(tmp.path().join("wals"), base_config.base_path.join("wals")).await?;
114111
tokio::fs::remove_dir_all(base_config.base_path.join("_dbs")).await?;
115112

116113
Ok(())

libsql-server/src/lib.rs

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -835,20 +835,6 @@ where
835835
scripted_backup: Option<ScriptBackupManager>,
836836
meta_store: MetaStore,
837837
) -> anyhow::Result<(NamespaceConfigurators, MakeReplicationSvc)> {
838-
let wal_path = base_config.base_path.join("wals");
839-
let enable_libsql_wal_test = {
840-
let is_primary = self.rpc_server_config.is_some();
841-
let is_libsql_wal_test = std::env::var("LIBSQL_WAL_TEST").is_ok();
842-
is_primary && is_libsql_wal_test
843-
};
844-
let use_libsql_wal =
845-
self.use_custom_wal == Some(CustomWAL::LibsqlWal) || enable_libsql_wal_test;
846-
if !use_libsql_wal {
847-
if wal_path.try_exists()? {
848-
anyhow::bail!("database was previously setup to use libsql-wal");
849-
}
850-
}
851-
852838
#[cfg(feature = "durable-wal")]
853839
if let Some(CustomWAL::DurableWal) = self.use_custom_wal {
854840
if self.db_config.bottomless_replication.is_some() {
@@ -864,7 +850,6 @@ where
864850
task_manager,
865851
migration_scheduler_handle,
866852
scripted_backup,
867-
wal_path,
868853
meta_store,
869854
)
870855
.await
@@ -895,7 +880,6 @@ where
895880
task_manager: &mut TaskManager,
896881
migration_scheduler_handle: SchedulerHandle,
897882
scripted_backup: Option<ScriptBackupManager>,
898-
wal_path: PathBuf,
899883
meta_store: MetaStore,
900884
) -> anyhow::Result<(NamespaceConfigurators, MakeReplicationSvc)> {
901885
tracing::info!("using libsql wal");
@@ -964,7 +948,7 @@ where
964948
anyhow::bail!("replication without bottomless not supported yet");
965949
}
966950

967-
let registry = Arc::new(WalRegistry::new(wal_path, storage, sender)?);
951+
let registry = Arc::new(WalRegistry::new(storage, sender)?);
968952
let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 8);
969953
task_manager.spawn_with_shutdown_notify(|_| async move {
970954
checkpointer.run().await;
@@ -1288,15 +1272,16 @@ where
12881272
}
12891273

12901274
fn check_previous_migration_success(&self) -> anyhow::Result<bool> {
1291-
let wals_path = self.path.join("wals");
1292-
if !wals_path.try_exists()? {
1293-
return Ok(false);
1294-
}
1295-
1296-
let dir = std::fs::read_dir(&wals_path)?;
1297-
1298-
// wals dir exist and is not empty
1299-
Ok(dir.count() != 0)
1275+
todo!("not usings wal directory anymore");
1276+
// let wals_path = self.path.join("wals");
1277+
// if !wals_path.try_exists()? {
1278+
// return Ok(false);
1279+
// }
1280+
//
1281+
// let dir = std::fs::read_dir(&wals_path)?;
1282+
//
1283+
// // wals dir exist and is not empty
1284+
// Ok(dir.count() != 0)
13001285
}
13011286
}
13021287

libsql-server/src/namespace/configurator/helpers.rs

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -494,28 +494,6 @@ pub async fn cleanup_libsql(
494494
let _ = tokio::fs::remove_dir_all(ns_db_path).await;
495495
}
496496

497-
let ns_wals_path = base_path.join("wals").join(namespace.as_str());
498-
if ns_wals_path.try_exists()? {
499-
tracing::debug!("removing database directory: {}", ns_wals_path.display());
500-
if let Err(e) = tokio::fs::remove_dir_all(ns_wals_path).await {
501-
// what can go wrong?:
502-
match e.kind() {
503-
// alright, there's nothing to delete anyway
504-
std::io::ErrorKind::NotFound => (),
505-
_ => {
506-
// something unexpected happened, this namespaces is in a bad state.
507-
// The entry will not be removed from the registry to prevent another
508-
// namespace with the same name to be reuse the same wal files. a
509-
// manual intervention is necessary
510-
// FIXME: on namespace creation, we could ensure that this directory is
511-
// clean.
512-
tracing::error!("error deleting `{namespace}` wal directory, manual intervention may be necessary: {e}");
513-
return Err(e.into());
514-
}
515-
}
516-
}
517-
}
518-
519497
// when all is cleaned, leave place for next one
520498
registry.remove(&namespace).await;
521499

libsql-wal/benches/benchmarks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ fn with_libsql_conn(f: impl FnOnce(&mut Connection<LibsqlWal<StdIO>>)) {
6161

6262
let (sender, _) = tokio::sync::mpsc::channel(12);
6363
let registry =
64-
Arc::new(WalRegistry::new(tmp.path().join("wals"), NoStorage.into(), sender).unwrap());
64+
Arc::new(WalRegistry::new(NoStorage.into(), sender).unwrap());
6565
let wal_manager = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
6666

6767
let mut conn = libsql_sys::Connection::open(

libsql-wal/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ pub mod test {
112112
let registry = Arc::new(
113113
WalRegistry::new_with_io(
114114
io.clone(),
115-
tmp.path().join("test/wals"),
116115
TestStorage::new_io(store, io).into(),
117116
sender,
118117
)

libsql-wal/src/registry.rs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::io;
22
use std::num::NonZeroU64;
3-
use std::path::{Path, PathBuf};
3+
use std::path::Path;
44
use std::sync::atomic::{AtomicBool, Ordering};
55
use std::sync::Arc;
66
use std::time::{Duration, Instant};
@@ -47,7 +47,6 @@ enum Slot<IO: Io> {
4747
/// Wal Registry maintains a set of shared Wal, and their respective set of files.
4848
pub struct WalRegistry<IO: Io, S> {
4949
io: Arc<IO>,
50-
path: PathBuf,
5150
shutdown: AtomicBool,
5251
opened: DashMap<NamespaceName, Slot<IO>>,
5352
storage: Arc<S>,
@@ -56,25 +55,21 @@ pub struct WalRegistry<IO: Io, S> {
5655

5756
impl<S> WalRegistry<StdIO, S> {
5857
pub fn new(
59-
path: PathBuf,
6058
storage: Arc<S>,
6159
checkpoint_notifier: mpsc::Sender<CheckpointMessage>,
6260
) -> Result<Self> {
63-
Self::new_with_io(StdIO(()), path, storage, checkpoint_notifier)
61+
Self::new_with_io(StdIO(()), storage, checkpoint_notifier)
6462
}
6563
}
6664

6765
impl<IO: Io, S> WalRegistry<IO, S> {
6866
pub fn new_with_io(
6967
io: IO,
70-
path: PathBuf,
7168
storage: Arc<S>,
7269
checkpoint_notifier: mpsc::Sender<CheckpointMessage>,
7370
) -> Result<Self> {
74-
io.create_dir_all(&path)?;
7571
let registry = Self {
7672
io: io.into(),
77-
path,
7873
opened: Default::default(),
7974
shutdown: Default::default(),
8075
storage,
@@ -246,10 +241,14 @@ where
246241

247242
let mut checkpointed_frame_no = footer.map(|f| f.replication_index.get()).unwrap_or(0);
248243

249-
let path = self.path.join(namespace.as_str());
250-
self.io.create_dir_all(&path)?;
244+
// the trick here to prevent sqlite to open our db is to create a dir <db-name>-wal. Sqlite
245+
// will think that this is a wal file, but it's in fact a directory and it will not like
246+
// it.
247+
let mut wals_path = db_path.to_owned();
248+
wals_path.set_file_name(format!("{}-wal", db_path.file_name().unwrap().to_str().unwrap()));
249+
self.io.create_dir_all(&wals_path)?;
251250
// TODO: handle that with abstract io
252-
let dir = walkdir::WalkDir::new(&path).sort_by_file_name().into_iter();
251+
let dir = walkdir::WalkDir::new(&wals_path).sort_by_file_name().into_iter();
253252

254253
// we only checkpoint durable frame_no so this is a good first estimate without an actual
255254
// network call.
@@ -323,7 +322,7 @@ where
323322
None => (0, NonZeroU64::new(1).unwrap()),
324323
});
325324

326-
let current_segment_path = path.join(format!("{namespace}:{next_frame_no:020}.seg"));
325+
let current_segment_path = wals_path.join(format!("{next_frame_no:020}.seg"));
327326

328327
let segment_file = self.io.open(true, true, true, &current_segment_path)?;
329328
let salt = self.io.with_rng(|rng| rng.gen());
@@ -368,6 +367,7 @@ where
368367
checkpoint_notifier: self.checkpoint_notifier.clone(),
369368
io: self.io.clone(),
370369
swap_strategy,
370+
wals_path: wals_path.to_owned(),
371371
});
372372

373373
self.opened
@@ -524,10 +524,9 @@ where
524524
return Ok(());
525525
}
526526
let start_frame_no = current.next_frame_no();
527-
let path = self
528-
.path
529-
.join(shared.namespace().as_str())
530-
.join(format!("{}:{start_frame_no:020}.seg", shared.namespace()));
527+
let path = shared
528+
.wals_path
529+
.join(format!("{start_frame_no:020}.seg"));
531530

532531
let segment_file = self.io.open(true, true, true, &path)?;
533532
let salt = self.io.with_rng(|rng| rng.gen());

libsql-wal/src/shared_wal.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::collections::BTreeMap;
2+
use std::path::PathBuf;
23
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
34
use std::sync::Arc;
45
use std::time::Instant;
@@ -55,6 +56,7 @@ pub struct SharedWal<IO: Io> {
5556
pub(crate) checkpoint_notifier: mpsc::Sender<CheckpointMessage>,
5657
pub(crate) io: Arc<IO>,
5758
pub(crate) swap_strategy: Box<dyn SegmentSwapStrategy>,
59+
pub(crate) wals_path: PathBuf,
5860
}
5961

6062
impl<IO: Io> SharedWal<IO> {

libsql-wal/tests/flaky_fs.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,14 +213,13 @@ async fn flaky_fs() {
213213
let registry = Arc::new(
214214
WalRegistry::new_with_io(
215215
io.clone(),
216-
tmp.path().join("test/wals"),
217216
TestStorage::new_io(false, io).into(),
218217
sender,
219218
)
220219
.unwrap(),
221220
);
222221
let wal_manager = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
223-
222+
tokio::fs::create_dir_all(tmp.path().join("test")).await.unwrap();
224223
let conn = libsql_sys::Connection::open(
225224
tmp.path().join("test/data").clone(),
226225
OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE,

libsql-wal/tests/oracle.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::sync::Arc;
1111
use libsql_sys::ffi::{sqlite3_finalize, sqlite3_prepare, Sqlite3DbHeader};
1212
use libsql_sys::name::NamespaceName;
1313
use libsql_sys::rusqlite::OpenFlags;
14-
use libsql_sys::wal::{Sqlite3WalManager, Wal};
14+
use libsql_sys::wal::{Sqlite3WalManager, Wal, WalManager};
1515
use libsql_sys::Connection;
1616
use libsql_wal::registry::WalRegistry;
1717
use libsql_wal::storage::TestStorage;
@@ -67,7 +67,7 @@ async fn run_test_sample(path: &Path) -> Result {
6767

6868
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
6969
let before = std::time::Instant::now();
70-
let sqlite_results = run_script(&sqlite_conn, &script, &mut rng).collect::<Vec<_>>();
70+
let sqlite_results = run_script(&sqlite_conn, &script, &mut rng, Sqlite3WalManager::default()).collect::<Vec<_>>();
7171
println!("ran sqlite in {:?}", before.elapsed());
7272
drop(sqlite_conn);
7373

@@ -95,13 +95,13 @@ async fn run_test_sample(path: &Path) -> Result {
9595
let (sender, _receiver) = tokio::sync::mpsc::channel(64);
9696
let registry = Arc::new(
9797
WalRegistry::new(
98-
tmp.path().join("test/wals"),
9998
TestStorage::new().into(),
10099
sender,
101100
)
102101
.unwrap(),
103102
);
104103
let wal_manager = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
104+
tokio::fs::create_dir_all(tmp.path().join("test")).await.unwrap();
105105
let db_path = tmp.path().join("test/data").clone();
106106
let libsql_conn = libsql_sys::Connection::open(
107107
&db_path,
@@ -114,7 +114,7 @@ async fn run_test_sample(path: &Path) -> Result {
114114

115115
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
116116
let before = std::time::Instant::now();
117-
let libsql_results = run_script(&libsql_conn, &script, &mut rng).collect::<Vec<_>>();
117+
let libsql_results = run_script(&libsql_conn, &script, &mut rng, wal_manager.clone()).collect::<Vec<_>>();
118118
println!("ran libsql in {:?}", before.elapsed());
119119

120120
for ((a, _), (b, _)) in sqlite_results.iter().zip(libsql_results.iter()) {
@@ -204,12 +204,13 @@ fn run_script<'a, T: Wal>(
204204
conn: &'a Connection<T>,
205205
script: &'a str,
206206
rng: &'a mut ChaCha8Rng,
207+
wal_manager: impl WalManager + Clone + 'static,
207208
) -> impl Iterator<Item = (String, String)> + 'a {
208209
let mut stmts = split_statements(conn, script);
209210
std::iter::from_fn(move || {
210211
let stmt_str = patch_randomness(stmts.next()?, rng);
211212
if stmt_str.trim_start().starts_with("ATTACH") {
212-
patch_attach(&stmt_str);
213+
patch_attach(&stmt_str, wal_manager.clone());
213214
}
214215
let mut stmt = conn.prepare(&stmt_str).unwrap();
215216

@@ -227,14 +228,14 @@ fn run_script<'a, T: Wal>(
227228
})
228229
}
229230

230-
fn patch_attach(s: &str) {
231+
fn patch_attach(s: &str, wal: impl WalManager) {
231232
let mut split = s.split_whitespace();
232233
let name = split.nth(1).unwrap();
233234
let name = name.trim_matches('\'');
234235
let _ = libsql_sys::Connection::open(
235236
name,
236237
OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE,
237-
Sqlite3WalManager::default(),
238+
wal,
238239
100000,
239240
None,
240241
)

0 commit comments

Comments
 (0)