Skip to content

Commit 89dcb74

Browse files
committed
libsql: provide more return info for sync
1 parent d6a380c commit 89dcb74

4 files changed

Lines changed: 123 additions & 23 deletions

File tree

libsql-server/tests/embedded_replica/mod.rs

Lines changed: 89 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,15 @@ fn embedded_replica() {
9898
)
9999
.await?;
100100

101-
let n = db.sync().await?;
101+
let n = db.sync().await?.frame_no();
102102
assert_eq!(n, None);
103103

104104
let conn = db.connect()?;
105105

106106
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
107107
.await?;
108108

109-
let n = db.sync().await?;
109+
let n = db.sync().await?.frame_no();
110110
assert_eq!(n, Some(1));
111111

112112
let err = conn
@@ -171,15 +171,15 @@ fn execute_batch() {
171171
)
172172
.await?;
173173

174-
let n = db.sync().await?;
174+
let n = db.sync().await?.frame_no();
175175
assert_eq!(n, None);
176176

177177
let conn = db.connect()?;
178178

179179
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
180180
.await?;
181181

182-
let n = db.sync().await?;
182+
let n = db.sync().await?.frame_no();
183183
assert_eq!(n, Some(1));
184184

185185
conn.execute_batch(
@@ -224,15 +224,15 @@ fn stream() {
224224
)
225225
.await?;
226226

227-
let n = db.sync().await?;
227+
let n = db.sync().await?.frame_no();
228228
assert_eq!(n, None);
229229

230230
let conn = db.connect()?;
231231

232232
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
233233
.await?;
234234

235-
let n = db.sync().await?;
235+
let n = db.sync().await?.frame_no();
236236
assert_eq!(n, Some(1));
237237

238238
conn.execute_batch(
@@ -299,15 +299,15 @@ fn embedded_replica_with_encryption() {
299299
)
300300
.await?;
301301

302-
let n = db.sync().await?;
302+
let n = db.sync().await?.frame_no();
303303
assert_eq!(n, None);
304304

305305
let conn = db.connect()?;
306306

307307
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
308308
.await?;
309309

310-
let n = db.sync().await?;
310+
let n = db.sync().await?.frame_no();
311311
assert_eq!(n, Some(1));
312312

313313
let err = conn
@@ -461,7 +461,7 @@ fn replica_primary_reset() {
461461
)
462462
.await
463463
.unwrap();
464-
let replica_index = replica.sync().await.unwrap().unwrap();
464+
let replica_index = replica.sync().await.unwrap().frame_no().unwrap();
465465
let primary_index = Client::new()
466466
.get("http://primary:9090/v1/namespaces/default/stats")
467467
.await
@@ -520,7 +520,7 @@ fn replica_primary_reset() {
520520
)
521521
.await
522522
.unwrap();
523-
let replica_index = replica.sync().await.unwrap().unwrap();
523+
let replica_index = replica.sync().await.unwrap().frame_no().unwrap();
524524
let primary_index = Client::new()
525525
.get("http://primary:9090/v1/namespaces/default/stats")
526526
.await
@@ -625,7 +625,7 @@ fn replica_no_resync_on_restart() {
625625
)
626626
.await
627627
.unwrap();
628-
db.sync().await.unwrap().unwrap()
628+
db.sync().await.unwrap().frame_no().unwrap()
629629
};
630630
let first_sync = before.elapsed();
631631

@@ -641,7 +641,7 @@ fn replica_no_resync_on_restart() {
641641
)
642642
.await
643643
.unwrap();
644-
db.sync().await.unwrap().unwrap()
644+
db.sync().await.unwrap().frame_no().unwrap()
645645
};
646646
let second_sync = before.elapsed();
647647

@@ -1226,3 +1226,80 @@ fn txn_bug_issue_1283() {
12261226

12271227
sim.run().unwrap();
12281228
}
1229+
1230+
#[test]
1231+
fn replicated_return() {
1232+
let tmp_embedded = tempdir().unwrap();
1233+
let tmp_host = tempdir().unwrap();
1234+
let tmp_embedded_path = tmp_embedded.path().to_owned();
1235+
let tmp_host_path = tmp_host.path().to_owned();
1236+
1237+
let mut sim = Builder::new()
1238+
.simulation_duration(Duration::from_secs(1000))
1239+
.build();
1240+
1241+
make_primary(&mut sim, tmp_host_path.clone());
1242+
1243+
sim.client("client", async move {
1244+
let client = Client::new();
1245+
client
1246+
.post("http://primary:9090/v1/namespaces/foo/create", json!({}))
1247+
.await?;
1248+
1249+
let path = tmp_embedded_path.join("embedded");
1250+
let db = Database::open_with_remote_sync_connector(
1251+
path.to_str().unwrap(),
1252+
"http://foo.primary:8080",
1253+
"",
1254+
TurmoilConnector,
1255+
false,
1256+
None,
1257+
)
1258+
.await?;
1259+
1260+
let rep = db.sync().await.unwrap();
1261+
assert_eq!(rep.frame_no(), None);
1262+
assert_eq!(rep.start_frame_no(), None);
1263+
1264+
let conn = db.connect()?;
1265+
1266+
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
1267+
.await
1268+
.unwrap();
1269+
1270+
let rep = db.sync().await.unwrap();
1271+
assert_eq!(rep.frame_no(), Some(1));
1272+
assert_eq!(rep.start_frame_no(), None);
1273+
1274+
conn.execute_batch("INSERT into user(id) values (1); INSERT into user(id) values (2); INSERT into user(id) values (3)")
1275+
.await
1276+
.unwrap();
1277+
1278+
let rep = db.sync().await.unwrap();
1279+
assert_eq!(rep.frame_no(), Some(4));
1280+
assert_eq!(rep.start_frame_no(), Some(1));
1281+
1282+
let wal_index_file = format!("{}-client_wal_index", path.to_str().unwrap());
1283+
1284+
std::fs::remove_file(wal_index_file).unwrap();
1285+
1286+
let db = Database::open_with_remote_sync_connector(
1287+
path.to_str().unwrap(),
1288+
"http://foo.primary:8080",
1289+
"",
1290+
TurmoilConnector,
1291+
false,
1292+
None,
1293+
)
1294+
.await?;
1295+
1296+
let rep = db.sync().await.unwrap();
1297+
assert_eq!(rep.frame_no(), Some(4));
1298+
assert_eq!(rep.start_frame_no(), Some(1));
1299+
1300+
1301+
Ok(())
1302+
});
1303+
1304+
sim.run().unwrap();
1305+
}

libsql/src/database.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ cfg_replication! {
323323

324324
/// Sync database from remote, and returns the committed frame_no after syncing, if
325325
/// applicable.
326-
pub async fn sync(&self) -> Result<Option<FrameNo>> {
326+
pub async fn sync(&self) -> Result<crate::replication::Replicated> {
327327
if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
328328
db.sync().await
329329
} else {

libsql/src/local/database.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl Database {
8383
encryption_config: Option<EncryptionConfig>,
8484
sync_interval: Option<std::time::Duration>,
8585
http_request_callback: Option<crate::util::HttpRequestCallback>,
86-
namespace: Option<String>
86+
namespace: Option<String>,
8787
) -> Result<Database> {
8888
use std::path::PathBuf;
8989

@@ -260,7 +260,7 @@ impl Database {
260260
#[cfg(feature = "replication")]
261261
/// Perform a sync step, returning the new replication index, or None, if the nothing was
262262
/// replicated yet
263-
pub async fn sync_oneshot(&self) -> Result<Option<FrameNo>> {
263+
pub async fn sync_oneshot(&self) -> Result<crate::replication::Replicated> {
264264
if let Some(ref ctx) = self.replication_ctx {
265265
ctx.replicator.sync_oneshot().await
266266
} else {
@@ -273,7 +273,7 @@ impl Database {
273273

274274
#[cfg(feature = "replication")]
275275
/// Sync with primary
276-
pub async fn sync(&self) -> Result<Option<FrameNo>> {
276+
pub async fn sync(&self) -> Result<crate::replication::Replicated> {
277277
Ok(self.sync_oneshot().await?)
278278
}
279279

libsql/src/replication/mod.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,22 @@ mod connection;
3232
pub(crate) mod local_client;
3333
pub(crate) mod remote_client;
3434

35+
#[derive(Debug)]
36+
pub struct Replicated {
37+
frame_no: Option<FrameNo>,
38+
start_frame_no: Option<FrameNo>,
39+
}
40+
41+
impl Replicated {
42+
pub fn frame_no(&self) -> Option<FrameNo> {
43+
self.frame_no
44+
}
45+
46+
pub fn start_frame_no(&self) -> Option<FrameNo> {
47+
self.start_frame_no
48+
}
49+
}
50+
3551
/// A set of rames to be injected via `sync_frames`.
3652
pub enum Frames {
3753
/// A set of frames, in increasing frame_no.
@@ -75,10 +91,7 @@ impl Writer {
7591
self.execute_steps(steps).await
7692
}
7793

78-
pub(crate) async fn execute_steps(
79-
&self,
80-
steps: Vec<Step>,
81-
) -> anyhow::Result<ExecuteResults> {
94+
pub(crate) async fn execute_steps(&self, steps: Vec<Step>) -> anyhow::Result<ExecuteResults> {
8295
self.client
8396
.execute_program(ProgramReq {
8497
client_id: self.client.client_id(),
@@ -186,7 +199,7 @@ impl EmbeddedReplicator {
186199
})
187200
}
188201

189-
pub async fn sync_oneshot(&self) -> Result<Option<FrameNo>> {
202+
pub async fn sync_oneshot(&self) -> Result<Replicated> {
190203
use libsql_replication::replicator::ReplicatorClient;
191204

192205
let mut replicator = self.replicator.lock().await;
@@ -196,6 +209,8 @@ impl EmbeddedReplicator {
196209
));
197210
}
198211

212+
let start_frame_no = replicator.client_mut().committed_frame_no();
213+
199214
// we force a handshake to get the most up to date replication index from the primary.
200215
replicator.force_handshake();
201216

@@ -218,7 +233,10 @@ impl EmbeddedReplicator {
218233
unreachable!()
219234
};
220235
let Some(primary_index) = client.last_handshake_replication_index() else {
221-
return Ok(None);
236+
return Ok(Replicated {
237+
frame_no: None,
238+
start_frame_no: None,
239+
});
222240
};
223241
if let Some(replica_index) = replicator.client_mut().committed_frame_no() {
224242
if replica_index >= primary_index {
@@ -229,7 +247,12 @@ impl EmbeddedReplicator {
229247
}
230248
}
231249

232-
Ok(replicator.client_mut().committed_frame_no())
250+
let replicated = Replicated {
251+
frame_no: replicator.client_mut().committed_frame_no(),
252+
start_frame_no,
253+
};
254+
255+
Ok(replicated)
233256
}
234257

235258
pub async fn sync_frames(&self, frames: Frames) -> Result<Option<FrameNo>> {

0 commit comments

Comments
 (0)