Skip to content

Commit 710ae01

Browse files
committed
libsql: Update max_write_replication_index after every write
Signed-off-by: Piotr Jastrzebski <piotr@chiselstrike.com>
1 parent 6c5193d commit 710ae01

1 file changed

Lines changed: 16 additions & 1 deletion

File tree

libsql/src/replication/connection.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ pub struct RemoteConnection {
2828
pub(self) local: LibsqlConnection,
2929
writer: Option<Writer>,
3030
inner: Arc<Mutex<Inner>>,
31-
#[allow(dead_code)]
3231
max_write_replication_index: Arc<AtomicU64>,
3332
}
3433

@@ -178,6 +177,18 @@ impl RemoteConnection {
178177
}
179178
}
180179

180+
fn update_max_write_replication_index(&self, index: Option<u64>) {
181+
if let Some(index) = index {
182+
let mut current = self.max_write_replication_index.load(std::sync::atomic::Ordering::SeqCst);
183+
while index > current {
184+
match self.max_write_replication_index.compare_exchange(current, index, std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst) {
185+
Ok(_) => break,
186+
Err(new_current) => current = new_current,
187+
}
188+
}
189+
}
190+
}
191+
181192
fn is_state_init(&self) -> bool {
182193
matches!(self.inner.lock().state, State::Init)
183194
}
@@ -204,6 +215,8 @@ impl RemoteConnection {
204215
.into();
205216
}
206217

218+
self.update_max_write_replication_index(res.current_frame_no);
219+
207220
if let Some(replicator) = writer.replicator() {
208221
replicator.sync_oneshot().await?;
209222
}
@@ -229,6 +242,8 @@ impl RemoteConnection {
229242
.into();
230243
}
231244

245+
self.update_max_write_replication_index(res.current_frame_no);
246+
232247
if let Some(replicator) = writer.replicator() {
233248
replicator.sync_oneshot().await?;
234249
}

0 commit comments

Comments
 (0)