Skip to content

Commit d554082

Browse files
committed
add durable frame_no to proto
1 parent 07d05f7 commit d554082

7 files changed

Lines changed: 12 additions & 2 deletions

File tree

bottomless/src/replicator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1652,6 +1652,7 @@ impl Replicator {
16521652
let frame = RpcFrame {
16531653
data: frame_to_inject.bytes(),
16541654
timestamp: None,
1655+
durable_frame_no: None,
16551656
};
16561657
injector.inject_frame(frame).await?;
16571658
applied_wal_frame = true;

libsql-replication/proto/replication_log.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ message Frame {
3636
// if this frames is a commit frame, then this can be set
3737
// to the time when the transaction was commited
3838
optional int64 timestamp = 2;
39+
optional int64 durable_frame_no = 3;
3940
}
4041

4142
message Frames {

libsql-replication/src/generated/wal_log.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ pub struct Frame {
8383
/// to the time when the transaction was commited
8484
#[prost(int64, optional, tag = "2")]
8585
pub timestamp: ::core::option::Option<i64>,
86+
#[prost(int64, optional, tag = "3")]
87+
pub durable_frame_no: ::core::option::Option<i64>,
8688
}
8789
#[allow(clippy::derive_partial_eq_without_eq)]
8890
#[derive(Clone, PartialEq, ::prost::Message)]

libsql-replication/src/replicator.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,7 @@ mod test {
772772
.map(|f| RpcFrame {
773773
data: f.bytes(),
774774
timestamp: None,
775+
durable_frame_no: None,
775776
})
776777
.take(2)
777778
.map(Ok)
@@ -785,6 +786,7 @@ mod test {
785786
.map(|f| RpcFrame {
786787
data: f.bytes(),
787788
timestamp: None,
789+
durable_frame_no: None,
788790
})
789791
.map(Ok)
790792
.collect::<Vec<_>>();

libsql-server/src/rpc/replication/libsql_replicator.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ where
102102
Poll::Ready(Some(Ok(RpcFrame {
103103
data,
104104
timestamp: None,
105+
durable_frame_no: None,
105106
})))
106107
}
107108
WalFlavor::Sqlite => {
@@ -116,6 +117,7 @@ where
116117
Poll::Ready(Some(Ok(RpcFrame {
117118
data: frame.bytes(),
118119
timestamp: None,
120+
durable_frame_no: None,
119121
})))
120122
}
121123
}

libsql-server/src/rpc/replication/replication_log.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ fn map_frame_stream_output(
187187
Ok((frame, ts)) => Ok(Frame {
188188
data: frame.bytes(),
189189
timestamp: ts.map(|ts| ts.timestamp_millis()),
190+
durable_frame_no: None,
190191
}),
191192
Err(LogReadError::SnapshotRequired) => Err(Status::new(
192193
tonic::Code::FailedPrecondition,
@@ -431,6 +432,7 @@ mod snapshot_stream {
431432
yield Ok(Frame {
432433
data: libsql_replication::frame::Frame::from(frame).bytes(),
433434
timestamp: None,
435+
durable_frame_no: None,
434436
});
435437
}
436438
Err(e) => {

libsql/src/replication/local_client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ impl ReplicatorClient for LocalClient {
4747
async fn next_frames(&mut self) -> Result<Self::FrameStream, Error> {
4848
match self.frames.take() {
4949
Some(Frames::Vec(f)) => {
50-
let iter = f.into_iter().map(|f| RpcFrame { data: f.bytes(), timestamp: None }).map(Ok);
50+
let iter = f.into_iter().map(|f| RpcFrame { data: f.bytes(), timestamp: None, durable_frame_no: None }).map(Ok);
5151
Ok(Box::pin(tokio_stream::iter(iter)))
5252
}
5353
Some(f @ Frames::Snapshot(_)) => {
@@ -72,7 +72,7 @@ impl ReplicatorClient for LocalClient {
7272
next.header_mut().size_after = size_after.into();
7373
}
7474
let frame = Frame::from(next);
75-
yield RpcFrame { data: frame.bytes(), timestamp: None };
75+
yield RpcFrame { data: frame.bytes(), timestamp: None, durable_frame_no: None };
7676
}
7777
};
7878

0 commit comments

Comments
 (0)