Skip to content

Commit eec79f4

Browse files
committed
replicate from storage test
1 parent b90ccea commit eec79f4

1 file changed

Lines changed: 51 additions & 0 deletions

File tree

libsql-wal/src/replication/replicator.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ impl<IO: Io> Replicator<IO> {
134134

135135
#[cfg(test)]
136136
mod test {
137+
use std::time::Duration;
138+
137139
use tempfile::NamedTempFile;
138140
use tokio_stream::StreamExt;
139141

@@ -252,4 +254,53 @@ mod test {
252254
.unwrap();
253255
}
254256
}
257+
258+
#[tokio::test]
259+
async fn stream_from_storage() {
260+
let env = TestEnv::new_store(true);
261+
let conn = env.open_conn("test");
262+
let shared = env.shared("test");
263+
264+
conn.execute("create table test (x)", ()).unwrap();
265+
266+
conn.execute("insert into test values (randomblob(128))", ())
267+
.unwrap();
268+
269+
tokio::task::spawn_blocking({
270+
let shared = shared.clone();
271+
move || seal_current_segment(&shared)
272+
}).await.unwrap();
273+
274+
conn.execute("create table test2 (x)", ()).unwrap();
275+
conn.execute("insert into test2 values (randomblob(128))", ())
276+
.unwrap();
277+
278+
tokio::task::spawn_blocking({
279+
let shared = shared.clone();
280+
move || seal_current_segment(&shared)
281+
}).await.unwrap();
282+
283+
while !shared.current.load().tail().is_empty() {
284+
tokio::time::sleep(Duration::from_millis(50)).await;
285+
}
286+
287+
let db_content = std::fs::read(&env.db_path("test").join("data")).unwrap();
288+
289+
let mut replicator = Replicator::new(shared, 1);
290+
let stream = replicator.frame_stream().take(3);
291+
292+
tokio::pin!(stream);
293+
294+
let tmp = NamedTempFile::new().unwrap();
295+
let mut replica_content = vec![0u8; db_content.len()];
296+
while let Some(f) = stream.next().await {
297+
let frame = f.unwrap();
298+
dbg!(frame.header().page_no());
299+
let offset = (frame.header().page_no() as usize - 1) * 4096;
300+
tmp.as_file().write_all_at(frame.data(), offset as u64).unwrap();
301+
replica_content[offset..offset+4096].copy_from_slice(frame.data());
302+
}
303+
304+
assert_eq!(replica_content, db_content);
305+
}
255306
}

0 commit comments

Comments
 (0)