Skip to content

Commit 9db13fe

Browse files
authored
libsql wal restore (#1569)
* make copy_to_file take ref to file
* turn compacted segment frame_count to u32
* add size after to compacted segment header
* make durable frame_no async
* pass namespace by ref
* add backend to async storage
* add restore to Storage
* add restore to shell
* fix shell bug
* error handling
* fix tests
1 parent 759ffe0 commit 9db13fe

13 files changed

Lines changed: 432 additions & 210 deletions

File tree

libsql-wal/src/bins/shell/main.rs

Lines changed: 98 additions & 67 deletions
Original file line number | Diff line number | Diff line change
@@ -1,27 +1,25 @@
1+
use std::fs::OpenOptions;
12
use std::path::{Path, PathBuf};
23
use std::sync::Arc;
34

45
use aws_config::{BehaviorVersion, Region};
56
use aws_credential_types::Credentials;
67
use aws_sdk_s3::config::SharedCredentialsProvider;
7-
use clap::Parser;
8-
use libsql_wal::storage::backend::Backend;
8+
use clap::{Parser, ValueEnum};
99
use tokio::task::{block_in_place, JoinSet};
1010

1111
use libsql_sys::name::NamespaceName;
12-
use libsql_sys::rusqlite::{OpenFlags, OptionalExtension};
12+
use libsql_sys::rusqlite::OpenFlags;
1313
use libsql_wal::io::StdIO;
1414
use libsql_wal::registry::WalRegistry;
1515
use libsql_wal::segment::sealed::SealedSegment;
1616
use libsql_wal::storage::async_storage::{AsyncStorage, AsyncStorageInitConfig};
17-
use libsql_wal::storage::backend::s3::{S3Backend, S3Config};
17+
use libsql_wal::storage::backend::s3::S3Backend;
1818
use libsql_wal::storage::Storage;
1919
use libsql_wal::wal::LibsqlWalManager;
2020

2121
#[derive(Debug, clap::Parser)]
2222
struct Cli {
23-
#[arg(long, short = 'p')]
24-
db_path: PathBuf,
2523
#[command(flatten)]
2624
s3_args: S3Args,
2725
#[arg(long, short = 'n')]
@@ -58,10 +56,24 @@ struct S3Args {
5856
s3_region_id: Option<String>,
5957
}
6058

59+
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
60+
enum RestoreOptions {
61+
Latest,
62+
}
63+
6164
#[derive(Debug, clap::Subcommand)]
6265
enum Subcommand {
63-
Shell,
66+
Shell {
67+
#[arg(long, short = 'p')]
68+
db_path: PathBuf,
69+
},
6470
Infos,
71+
Restore {
72+
#[arg(long)]
73+
from: RestoreOptions,
74+
#[arg(long, short)]
75+
path: PathBuf,
76+
},
6577
}
6678

6779
#[tokio::main]
@@ -70,63 +82,73 @@ async fn main() {
7082
let mut join_set = JoinSet::new();
7183

7284
if cli.s3_args.enable_s3 {
73-
let registry = setup_s3_registry(
74-
&cli.db_path,
75-
&cli.s3_args.s3_bucket.as_ref().unwrap(),
76-
&cli.s3_args.cluster_id.as_ref().unwrap(),
77-
&cli.s3_args.s3_url.as_ref().unwrap(),
78-
&cli.s3_args.s3_region_id.as_ref().unwrap(),
79-
&cli.s3_args.s3_access_key_id.as_ref().unwrap(),
80-
&cli.s3_args.s3_access_key.as_ref().unwrap(),
81-
&mut join_set,
82-
)
83-
.await;
84-
85-
handle(registry, &cli).await;
85+
let storage = setup_s3_storage(&cli, &mut join_set).await;
86+
handle(&cli, storage).await;
8687
} else {
8788
todo!()
8889
}
8990

9091
while join_set.join_next().await.is_some() {}
9192
}
9293

93-
async fn handle<S, B>(env: Env<S, B>, cli: &Cli)
94+
async fn handle<S>(cli: &Cli, storage: S)
9495
where
9596
S: Storage<Segment = SealedSegment<std::fs::File>>,
96-
B: Backend,
9797
{
98-
match cli.subcommand {
99-
Subcommand::Shell => {
100-
let path = cli.db_path.join("dbs").join(&cli.namespace);
98+
match &cli.subcommand {
99+
Subcommand::Shell { db_path } => {
100+
let registry = WalRegistry::new(db_path.clone(), storage).unwrap();
101101
run_shell(
102-
env.registry,
103-
&path,
102+
registry,
103+
&db_path,
104104
NamespaceName::from_string(cli.namespace.clone()),
105105
)
106-
.await
106+
.await;
107+
}
108+
Subcommand::Infos => handle_infos(&cli.namespace, storage).await,
109+
Subcommand::Restore { from, path } => {
110+
let namespace = NamespaceName::from_string(cli.namespace.clone());
111+
handle_restore(&namespace, storage, *from, path).await
107112
}
108-
Subcommand::Infos => handle_infos(&cli.namespace, env).await,
109113
}
110114
}
111115

112-
async fn handle_infos<B, S>(namespace: &str, env: Env<S, B>)
113-
where
114-
B: Backend,
116+
async fn handle_restore<S>(
117+
namespace: &NamespaceName,
118+
storage: S,
119+
_from: RestoreOptions,
120+
db_path: &Path,
121+
) where
122+
S: Storage,
115123
{
116-
let namespace = NamespaceName::from_string(namespace.to_owned());
117-
let meta = env
118-
.backend
119-
.meta(&env.backend.default_config(), namespace.clone())
124+
let options = libsql_wal::storage::RestoreOptions::Latest;
125+
let file = OpenOptions::new()
126+
.create_new(true)
127+
.write(true)
128+
.open(db_path)
129+
.unwrap();
130+
storage
131+
.restore(file, &namespace, options, None)
120132
.await
121133
.unwrap();
134+
}
135+
136+
async fn handle_infos<S>(namespace: &str, storage: S)
137+
where
138+
S: Storage,
139+
{
140+
let namespace = NamespaceName::from_string(namespace.to_owned());
141+
let durable = storage.durable_frame_no(&namespace, None).await;
122142
println!("namespace: {namespace}");
123-
println!("max durable frame: {}", meta.max_frame_no);
143+
println!("max durable frame: {durable}");
124144
}
125145

126146
async fn run_shell<S>(registry: WalRegistry<StdIO, S>, db_path: &Path, namespace: NamespaceName)
127147
where
128148
S: Storage<Segment = SealedSegment<std::fs::File>>,
129149
{
150+
let db_path = db_path.join("dbs").join(namespace.as_str());
151+
tokio::fs::create_dir_all(&db_path).await.unwrap();
130152
let registry = Arc::new(registry);
131153
let resolver = move |path: &Path| {
132154
NamespaceName::from_string(
@@ -163,14 +185,27 @@ where
163185
continue;
164186
}
165187

166-
if let Err(e) = block_in_place(|| {
167-
conn.query_row(&q, (), |row| {
168-
println!("{row:?}");
169-
Ok(())
170-
})
171-
.optional()
172-
}) {
173-
println!("error: {e}");
188+
match block_in_place(|| conn.prepare(&q)) {
189+
Ok(mut stmt) => {
190+
match block_in_place(|| {
191+
stmt.query_map((), |row| {
192+
println!("{row:?}");
193+
Ok(())
194+
})
195+
}) {
196+
Ok(rows) => block_in_place(|| {
197+
rows.for_each(|_| ());
198+
}),
199+
Err(e) => {
200+
println!("error: {e}");
201+
continue;
202+
}
203+
}
204+
}
205+
Err(e) => {
206+
println!("error: {e}");
207+
continue;
208+
}
174209
}
175210
}
176211
Err(_) => {
@@ -204,33 +239,30 @@ async fn handle_builtin<S>(
204239
false
205240
}
206241

207-
struct Env<S, B: Backend> {
208-
registry: WalRegistry<StdIO, S>,
209-
backend: Arc<B>,
210-
}
211-
212-
async fn setup_s3_registry(
213-
db_path: &Path,
214-
bucket_name: &str,
215-
cluster_id: &str,
216-
url: &str,
217-
region_id: &str,
218-
access_key_id: &str,
219-
secret_access_key: &str,
242+
async fn setup_s3_storage(
243+
cli: &Cli,
220244
join_set: &mut JoinSet<()>,
221-
) -> Env<AsyncStorage<S3Config, SealedSegment<std::fs::File>>, S3Backend<StdIO>> {
222-
let cred = Credentials::new(access_key_id, secret_access_key, None, None, "");
245+
) -> AsyncStorage<S3Backend<StdIO>, SealedSegment<std::fs::File>> {
246+
let cred = Credentials::new(
247+
cli.s3_args.s3_access_key_id.as_ref().unwrap(),
248+
cli.s3_args.s3_access_key.as_ref().unwrap(),
249+
None,
250+
None,
251+
"",
252+
);
223253
let config = aws_config::SdkConfig::builder()
224254
.behavior_version(BehaviorVersion::latest())
225-
.region(Region::new(region_id.to_string()))
255+
.region(Region::new(
256+
cli.s3_args.s3_region_id.as_ref().unwrap().to_string(),
257+
))
226258
.credentials_provider(SharedCredentialsProvider::new(cred))
227-
.endpoint_url(url)
259+
.endpoint_url(cli.s3_args.s3_url.as_ref().unwrap())
228260
.build();
229261
let backend = Arc::new(
230262
S3Backend::from_sdk_config(
231263
config.clone(),
232-
bucket_name.to_string(),
233-
cluster_id.to_string(),
264+
cli.s3_args.s3_bucket.as_ref().unwrap().to_string(),
265+
cli.s3_args.cluster_id.as_ref().unwrap().to_string(),
234266
)
235267
.await
236268
.unwrap(),
@@ -244,7 +276,6 @@ async fn setup_s3_registry(
244276
join_set.spawn(async move {
245277
storage_loop.run().await;
246278
});
247-
let path = db_path.join("wals");
248-
let registry = WalRegistry::new(path, storage).unwrap();
249-
Env { registry, backend }
279+
280+
storage
250281
}

libsql-wal/src/io/compat.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ use super::FileExt;
88

99
/// Copy from src that implements AsyncRead to the destination file, returning how many bytes have
1010
/// been copied
11-
pub async fn copy_to_file<R, F>(mut src: R, dst: F) -> io::Result<usize>
11+
pub async fn copy_to_file<R, F>(mut src: R, dst: &F) -> io::Result<usize>
1212
where
1313
F: FileExt,
1414
R: AsyncRead + Unpin,

libsql-wal/src/lib.rs

Lines changed: 9 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -56,23 +56,26 @@ pub mod test {
5656

5757
pub fn shared(&self, namespace: &str) -> Arc<SharedWal<StdIO>> {
5858
let path = self.tmp.path().join(namespace).join("data");
59-
self.registry
60-
.clone()
61-
.open(path.as_ref(), &NamespaceName::from_string(namespace.into()))
62-
.unwrap()
59+
let registry = self.registry.clone();
60+
let namespace = NamespaceName::from_string(namespace.into());
61+
registry.clone().open(path.as_ref(), &namespace).unwrap()
6362
}
6463

6564
pub fn db_path(&self, namespace: &str) -> PathBuf {
6665
self.tmp.path().join(namespace)
6766
}
6867

69-
pub fn open_conn(&self, namespace: &str) -> libsql_sys::Connection<LibsqlWal<StdIO>> {
68+
pub fn open_conn(
69+
&self,
70+
namespace: &'static str,
71+
) -> libsql_sys::Connection<LibsqlWal<StdIO>> {
7072
let path = self.db_path(namespace);
73+
let wal = self.wal.clone();
7174
std::fs::create_dir_all(&path).unwrap();
7275
libsql_sys::Connection::open(
7376
path.join("data"),
7477
OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE,
75-
self.wal.clone(),
78+
wal,
7679
100000,
7780
None,
7881
)

libsql-wal/src/registry.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -245,7 +245,8 @@ where
245245
let (new_frame_notifier, _) = tokio::sync::watch::channel(next_frame_no.get() - 1);
246246

247247
// TODO: pass config override here
248-
let durable_frame_no = self.storage.durable_frame_no(&namespace, None).into();
248+
let max_frame_no = self.storage.durable_frame_no_sync(&namespace, None);
249+
let durable_frame_no = max_frame_no.into();
249250

250251
let shared = Arc::new(SharedWal {
251252
current,

libsql-wal/src/segment/compacted.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -4,10 +4,11 @@ use zerocopy::{AsBytes, FromBytes, FromZeroes};
44
#[derive(Debug, AsBytes, FromZeroes, FromBytes)]
55
#[repr(C)]
66
pub struct CompactedSegmentDataHeader {
7-
pub(crate) frame_count: lu64,
7+
pub(crate) frame_count: lu32,
88
pub(crate) segment_id: lu128,
99
pub(crate) start_frame_no: lu64,
1010
pub(crate) end_frame_no: lu64,
11+
pub(crate) size_after: lu32,
1112
}
1213

1314
#[derive(Debug, AsBytes, FromZeroes, FromBytes)]

libsql-wal/src/segment/sealed.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -82,10 +82,11 @@ where
8282
let mut hasher = crc32fast::Hasher::new();
8383

8484
let header = CompactedSegmentDataHeader {
85-
frame_count: (self.index().len() as u64).into(),
85+
frame_count: (self.index().len() as u32).into(),
8686
segment_id: id.as_u128().into(),
8787
start_frame_no: self.header().start_frame_no,
8888
end_frame_no: self.header().last_commited_frame_no,
89+
size_after: self.header.size_after,
8990
};
9091

9192
hasher.update(header.as_bytes());

0 commit comments

Comments
 (0)