Skip to content

Commit ebbd2bb

Browse files
committed
feat: add more framing utilities
- Replace `Invocation` by `Oneshot`, which now also supports `Accept` - Add `Oneshot::duplex` - Add `Oneshot::unix_pair` - Add `Accept{Stream,Receiver,UnboundedReceiver}` Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent c619faa commit ebbd2bb

18 files changed

Lines changed: 377 additions & 102 deletions

File tree

Cargo.lock

Lines changed: 12 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
description = "WebAssembly component-native RPC framework based on WIT"
33
name = "wrpc"
4-
version = "0.16.0"
4+
version = "0.17.0"
55

66
authors.workspace = true
77
categories.workspace = true
@@ -167,22 +167,22 @@ wasmtime-wasi = { version = "38", default-features = false }
167167
wasmtime-wasi-http = { version = "38", default-features = false }
168168
wit-bindgen = { version = "0.45", default-features = false }
169169
wit-bindgen-core = { version = "0.36", default-features = false }
170-
wit-bindgen-wrpc = { version = "0.10", default-features = false, path = "./crates/wit-bindgen" }
170+
wit-bindgen-wrpc = { version = "0.11", default-features = false, path = "./crates/wit-bindgen" }
171171
wit-bindgen-wrpc-go = { version = "0.12", default-features = false, path = "./crates/wit-bindgen-go" }
172-
wit-bindgen-wrpc-rust = { version = "0.10", default-features = false, path = "./crates/wit-bindgen-rust" }
173-
wit-bindgen-wrpc-rust-macro = { version = "0.10", default-features = false, path = "./crates/wit-bindgen-rust-macro" }
172+
wit-bindgen-wrpc-rust = { version = "0.11", default-features = false, path = "./crates/wit-bindgen-rust" }
173+
wit-bindgen-wrpc-rust-macro = { version = "0.11", default-features = false, path = "./crates/wit-bindgen-rust-macro" }
174174
wit-component = { version = "0.239", default-features = false }
175175
wit-parser = { version = "0.220", default-features = false }
176176
wrpc-cli = { version = "0.7", path = "./crates/cli", default-features = false }
177177
wrpc-introspect = { version = "0.7", default-features = false, path = "./crates/introspect" }
178-
wrpc-runtime-wasmtime = { version = "0.30", path = "./crates/runtime-wasmtime", default-features = false }
178+
wrpc-runtime-wasmtime = { version = "0.31", path = "./crates/runtime-wasmtime", default-features = false }
179179
wrpc-test = { path = "./crates/test", default-features = false }
180-
wrpc-transport = { version = "0.28.3", path = "./crates/transport", default-features = false }
181-
wrpc-transport-nats = { version = "0.30", path = "./crates/transport-nats", default-features = false }
182-
wrpc-transport-quic = { version = "0.5", path = "./crates/transport-quic", default-features = false }
183-
wrpc-transport-web = { version = "0.2", path = "./crates/transport-web", default-features = false }
180+
wrpc-transport = { version = "0.29", path = "./crates/transport", default-features = false }
181+
wrpc-transport-nats = { version = "0.31", path = "./crates/transport-nats", default-features = false }
182+
wrpc-transport-quic = { version = "0.6", path = "./crates/transport-quic", default-features = false }
183+
wrpc-transport-web = { version = "0.3", path = "./crates/transport-web", default-features = false }
184184
wrpc-wasi-keyvalue = { version = "0.1.1", path = "./crates/wasi-keyvalue", default-features = false }
185-
wrpc-wasi-keyvalue-mem = { version = "0.1", path = "./crates/wasi-keyvalue-mem", default-features = false }
186-
wrpc-wasi-keyvalue-redis = { version = "0.1", path = "./crates/wasi-keyvalue-redis", default-features = false }
187-
wrpc-wasmtime-cli = { version = "0.8.1", path = "./crates/wasmtime-cli", default-features = false }
185+
wrpc-wasi-keyvalue-mem = { version = "0.2", path = "./crates/wasi-keyvalue-mem", default-features = false }
186+
wrpc-wasi-keyvalue-redis = { version = "0.2", path = "./crates/wasi-keyvalue-redis", default-features = false }
187+
wrpc-wasmtime-cli = { version = "0.9", path = "./crates/wasmtime-cli", default-features = false }
188188
wtransport = { version = "0.6.1", default-features = false }

crates/runtime-wasmtime/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "wrpc-runtime-wasmtime"
3-
version = "0.30.0"
3+
version = "0.31.0"
44
description = "wRPC wasmtime integration"
55

66
authors.workspace = true

crates/transport-nats/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "wrpc-transport-nats"
3-
version = "0.30.0"
3+
version = "0.31.0"
44
description = "wRPC NATS transport"
55

66
authors.workspace = true

crates/transport-quic/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "wrpc-transport-quic"
3-
version = "0.5.0"
3+
version = "0.6.0"
44
description = "wRPC QUIC transport"
55

66
authors.workspace = true

crates/transport-web/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "wrpc-transport-web"
3-
version = "0.2.0"
3+
version = "0.3.0"
44
description = "wRPC WebTransport transport"
55

66
authors.workspace = true

crates/transport/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "wrpc-transport"
3-
version = "0.28.4"
3+
version = "0.29.0"
44
description = "wRPC core transport functionality"
55

66
authors.workspace = true

crates/transport/src/frame/conn/accept.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use core::future::Future;
22
use core::ops::{Deref, DerefMut};
33

4+
use futures::{Stream, StreamExt as _};
45
use tokio::io::{AsyncRead, AsyncWrite};
6+
use tokio::sync::mpsc;
57

68
/// Accepts connections on a transport
79
pub trait Accept {
@@ -80,3 +82,134 @@ where
8082
Ok(((self.f)(cx), tx, rx))
8183
}
8284
}
85+
86+
/// A wrapper around a [Stream] of connections
87+
pub struct AcceptStream<T>(tokio::sync::Mutex<T>);
88+
89+
impl<T> From<T> for AcceptStream<T> {
90+
fn from(stream: T) -> Self {
91+
Self(tokio::sync::Mutex::new(stream))
92+
}
93+
}
94+
95+
impl<T, C, O, I> Accept for AcceptStream<T>
96+
where
97+
T: Stream<Item = (C, O, I)> + Unpin,
98+
C: Send + Sync + 'static,
99+
O: AsyncWrite + Send + Sync + Unpin + 'static,
100+
I: AsyncRead + Send + Sync + Unpin + 'static,
101+
{
102+
type Context = C;
103+
type Outgoing = O;
104+
type Incoming = I;
105+
106+
async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
107+
(&self).accept().await
108+
}
109+
}
110+
111+
impl<T, C, O, I> Accept for &AcceptStream<T>
112+
where
113+
T: Stream<Item = (C, O, I)> + Unpin,
114+
C: Send + Sync + 'static,
115+
O: AsyncWrite + Send + Sync + Unpin + 'static,
116+
I: AsyncRead + Send + Sync + Unpin + 'static,
117+
{
118+
type Context = C;
119+
type Outgoing = O;
120+
type Incoming = I;
121+
122+
async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
123+
let mut stream = self.0.lock().await;
124+
let Some((cx, tx, rx)) = stream.next().await else {
125+
return Err(std::io::ErrorKind::UnexpectedEof.into());
126+
};
127+
Ok((cx, tx, rx))
128+
}
129+
}
130+
131+
/// A wrapper around an [mpsc::Receiver] of connections
132+
pub struct AcceptReceiver<C, O, I>(tokio::sync::Mutex<mpsc::Receiver<(C, O, I)>>);
133+
134+
impl<C, O, I> From<mpsc::Receiver<(C, O, I)>> for AcceptReceiver<C, O, I> {
135+
fn from(stream: mpsc::Receiver<(C, O, I)>) -> Self {
136+
Self(tokio::sync::Mutex::new(stream))
137+
}
138+
}
139+
140+
impl<C, O, I> Accept for AcceptReceiver<C, O, I>
141+
where
142+
C: Send + Sync + 'static,
143+
O: AsyncWrite + Send + Sync + Unpin + 'static,
144+
I: AsyncRead + Send + Sync + Unpin + 'static,
145+
{
146+
type Context = C;
147+
type Outgoing = O;
148+
type Incoming = I;
149+
150+
async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
151+
(&self).accept().await
152+
}
153+
}
154+
155+
impl<C, O, I> Accept for &AcceptReceiver<C, O, I>
156+
where
157+
C: Send + Sync + 'static,
158+
O: AsyncWrite + Send + Sync + Unpin + 'static,
159+
I: AsyncRead + Send + Sync + Unpin + 'static,
160+
{
161+
type Context = C;
162+
type Outgoing = O;
163+
type Incoming = I;
164+
165+
async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
166+
let mut stream = self.0.lock().await;
167+
let Some((cx, tx, rx)) = stream.recv().await else {
168+
return Err(std::io::ErrorKind::UnexpectedEof.into());
169+
};
170+
Ok((cx, tx, rx))
171+
}
172+
}
173+
174+
/// A wrapper around an [mpsc::UnboundedReceiver] of connections
175+
pub struct AcceptUnboundedReceiver<C, O, I>(tokio::sync::Mutex<mpsc::UnboundedReceiver<(C, O, I)>>);
176+
177+
impl<C, O, I> From<mpsc::UnboundedReceiver<(C, O, I)>> for AcceptUnboundedReceiver<C, O, I> {
178+
fn from(stream: mpsc::UnboundedReceiver<(C, O, I)>) -> Self {
179+
Self(tokio::sync::Mutex::new(stream))
180+
}
181+
}
182+
183+
impl<C, O, I> Accept for AcceptUnboundedReceiver<C, O, I>
184+
where
185+
C: Send + Sync + 'static,
186+
O: AsyncWrite + Send + Sync + Unpin + 'static,
187+
I: AsyncRead + Send + Sync + Unpin + 'static,
188+
{
189+
type Context = C;
190+
type Outgoing = O;
191+
type Incoming = I;
192+
193+
async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
194+
(&self).accept().await
195+
}
196+
}
197+
198+
impl<C, O, I> Accept for &AcceptUnboundedReceiver<C, O, I>
199+
where
200+
C: Send + Sync + 'static,
201+
O: AsyncWrite + Send + Sync + Unpin + 'static,
202+
I: AsyncRead + Send + Sync + Unpin + 'static,
203+
{
204+
type Context = C;
205+
type Outgoing = O;
206+
type Incoming = I;
207+
208+
async fn accept(&self) -> std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)> {
209+
let mut stream = self.0.lock().await;
210+
let Some((cx, tx, rx)) = stream.recv().await else {
211+
return Err(std::io::ErrorKind::UnexpectedEof.into());
212+
};
213+
Ok((cx, tx, rx))
214+
}
215+
}

crates/transport/src/frame/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use bytes::Bytes;
66

77
mod codec;
88
mod conn;
9+
mod oneshot;
910

1011
#[cfg(any(target_family = "wasm", feature = "net"))]
1112
pub mod tcp;
@@ -15,6 +16,7 @@ pub mod unix;
1516

1617
pub use codec::*;
1718
pub use conn::*;
19+
pub use oneshot::*;
1820

1921
/// Framing protocol version
2022
pub const PROTOCOL: u8 = 0;

0 commit comments

Comments
 (0)