Skip to content

Commit 8deac0f

Browse files
committed
introduce BoxReplicationService type
for dynamic replication service selection
1 parent 83f3b67 commit 8deac0f

1 file changed

Lines changed: 46 additions & 0 deletions

File tree

libsql-replication/src/rpc.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,14 @@ pub mod proxy {
2323

2424
pub mod replication {
2525
#![allow(clippy::all)]
26+
use std::pin::Pin;
2627

28+
use tokio_stream::Stream;
2729
use uuid::Uuid;
2830

31+
pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
32+
33+
use self::replication_log_server::ReplicationLog;
2934
include!("generated/wal_log.rs");
3035

3136
pub const NO_HELLO_ERROR_MSG: &str = "NO_HELLO";
@@ -53,6 +58,47 @@ pub mod replication {
5358
}
5459
}
5560
}
61+
62+
pub type BoxReplicationService = Box<
63+
dyn ReplicationLog<
64+
LogEntriesStream = BoxStream<'static, Result<Frame, tonic::Status>>,
65+
SnapshotStream = BoxStream<'static, Result<Frame, tonic::Status>>,
66+
>,
67+
>;
68+
69+
#[tonic::async_trait]
70+
impl ReplicationLog for BoxReplicationService {
71+
type LogEntriesStream = BoxStream<'static, Result<Frame, tonic::Status>>;
72+
type SnapshotStream = BoxStream<'static, Result<Frame, tonic::Status>>;
73+
74+
async fn log_entries(
75+
&self,
76+
req: tonic::Request<LogOffset>,
77+
) -> Result<tonic::Response<Self::LogEntriesStream>, tonic::Status> {
78+
self.as_ref().log_entries(req).await
79+
}
80+
81+
async fn batch_log_entries(
82+
&self,
83+
req: tonic::Request<LogOffset>,
84+
) -> Result<tonic::Response<Frames>, tonic::Status> {
85+
self.as_ref().batch_log_entries(req).await
86+
}
87+
88+
async fn hello(
89+
&self,
90+
req: tonic::Request<HelloRequest>,
91+
) -> Result<tonic::Response<HelloResponse>, tonic::Status> {
92+
self.as_ref().hello(req).await
93+
}
94+
95+
async fn snapshot(
96+
&self,
97+
req: tonic::Request<LogOffset>,
98+
) -> Result<tonic::Response<Self::SnapshotStream>, tonic::Status> {
99+
self.as_ref().snapshot(req).await
100+
}
101+
}
56102
}
57103

58104
pub mod metadata {

0 commit comments

Comments
 (0)