Skip to content

Commit e5c8ce6

Browse files
Adapter to impl futures::Stream (#1575)
* Adapter to impl futures::Stream Signed-off-by: Brian Hardock <brian.hardock@fermyon.com> * Move to futures_stream mod Signed-off-by: Brian Hardock <brian.hardock@fermyon.com> * Add tests and build check Signed-off-by: Brian Hardock <brian.hardock@fermyon.com> --------- Signed-off-by: Brian Hardock <brian.hardock@fermyon.com>
1 parent 6a13b24 commit e5c8ce6

File tree

8 files changed

+175
-1
lines changed

8 files changed

+175
-1
lines changed

.github/workflows/main.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ jobs:
226226
- run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features async-spawn
227227
- run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features async,macros
228228
- run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features inter-task-wakeup
229+
- run: cargo build --target wasm32-wasip1 -p wit-bindgen --no-default-features --features futures-stream
229230

230231
# Verity that documentation can be generated for the rust bindings crate.
231232
- run: rustup update nightly --no-self-update

crates/guest-rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ std = []
3535
async = ["std", "wit-bindgen-rust-macro?/async"]
3636
bitflags = ["dep:bitflags"]
3737
async-spawn = ['async', 'dep:futures']
38+
futures-stream = ['async', 'dep:futures']
3839
macro-string = ["wit-bindgen-rust-macro?/macro-string"]
3940

4041
# Unstable feature to support being a libstd dependency

crates/guest-rust/src/rt/async_support.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ mod abi_buffer;
6161
mod cabi;
6262
mod error_context;
6363
mod future_support;
64+
#[cfg(feature = "futures-stream")]
65+
mod futures_stream;
6466
#[cfg(feature = "inter-task-wakeup")]
6567
mod inter_task_wakeup;
6668
mod stream_support;
@@ -79,6 +81,8 @@ use self::waitable_set::WaitableSet;
7981
pub use abi_buffer::*;
8082
pub use error_context::*;
8183
pub use future_support::*;
84+
#[cfg(feature = "futures-stream")]
85+
pub use futures_stream::*;
8286
pub use stream_support::*;
8387
#[doc(hidden)]
8488
pub use subtask::Subtask;
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
use super::stream_support::{RawStreamReader, StreamOps, StreamVtable};
2+
use std::boxed::Box;
3+
use std::{
4+
future::Future,
5+
pin::Pin,
6+
task::{Context, Poll},
7+
};
8+
9+
/// A wrapper around [`RawStreamReader`] that implements [`futures::Stream`].
10+
///
11+
/// Obtain one via [`RawStreamReader::into_stream`] or
12+
/// [`RawStreamReaderStream::new`].
13+
pub struct RawStreamReaderStream<O: StreamOps + 'static> {
14+
state: StreamAdapterState<O>,
15+
}
16+
17+
// SAFETY: No field is structurally pinned. The inner `Pin<Box<dyn Future>>`
18+
// is itself `Unpin`, and `RawStreamReader` is only stored when idle.
19+
impl<O: StreamOps + 'static> Unpin for RawStreamReaderStream<O> {}
20+
21+
/// Convenience alias for the common vtable-based case.
22+
pub type StreamReaderStream<T> = RawStreamReaderStream<&'static StreamVtable<T>>;
23+
24+
type ReadNextFut<O> =
25+
Pin<Box<dyn Future<Output = (RawStreamReader<O>, Option<<O as StreamOps>::Payload>)>>>;
26+
27+
enum StreamAdapterState<O: StreamOps + 'static> {
28+
/// The reader is idle and ready for the next read.
29+
Idle(RawStreamReader<O>),
30+
/// A read is in progress.
31+
Reading(ReadNextFut<O>),
32+
/// The stream has been exhausted.
33+
Complete,
34+
}
35+
36+
impl<O: StreamOps + 'static> RawStreamReaderStream<O> {
37+
/// Create a new [`futures::Stream`] wrapper from a [`RawStreamReader`].
38+
pub fn new(reader: RawStreamReader<O>) -> Self {
39+
Self {
40+
state: StreamAdapterState::Idle(reader),
41+
}
42+
}
43+
44+
/// Recover the underlying [`RawStreamReader`], if no read is in flight.
45+
///
46+
/// Returns `None` when a read is currently in progress or the stream has
47+
/// already finished.
48+
pub fn into_inner(self) -> Option<RawStreamReader<O>> {
49+
match self.state {
50+
StreamAdapterState::Idle(reader) => Some(reader),
51+
_ => None,
52+
}
53+
}
54+
}
55+
56+
impl<O: StreamOps + 'static> futures::stream::Stream for RawStreamReaderStream<O> {
57+
type Item = O::Payload;
58+
59+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
60+
// All variants of `StreamAdapterState` are `Unpin`, so `Pin<&mut Self>`
61+
// can be freely projected.
62+
loop {
63+
match std::mem::replace(&mut self.state, StreamAdapterState::Complete) {
64+
StreamAdapterState::Idle(mut reader) => {
65+
let fut: ReadNextFut<O> = Box::pin(async move {
66+
let item = reader.next().await;
67+
(reader, item)
68+
});
69+
self.state = StreamAdapterState::Reading(fut);
70+
// Loop to immediately poll the new future.
71+
}
72+
StreamAdapterState::Reading(mut fut) => match fut.as_mut().poll(cx) {
73+
Poll::Pending => {
74+
self.state = StreamAdapterState::Reading(fut);
75+
return Poll::Pending;
76+
}
77+
Poll::Ready((reader, Some(item))) => {
78+
self.state = StreamAdapterState::Idle(reader);
79+
return Poll::Ready(Some(item));
80+
}
81+
Poll::Ready((_reader, None)) => {
82+
self.state = StreamAdapterState::Complete;
83+
return Poll::Ready(None);
84+
}
85+
},
86+
StreamAdapterState::Complete => {
87+
self.state = StreamAdapterState::Complete;
88+
return Poll::Ready(None);
89+
}
90+
}
91+
}
92+
}
93+
}
94+
95+
impl<O: StreamOps + 'static> RawStreamReader<O> {
96+
/// Convert this reader into a [`futures::Stream`].
97+
pub fn into_stream(self) -> RawStreamReaderStream<O> {
98+
RawStreamReaderStream::new(self)
99+
}
100+
}

crates/test/src/rust.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ name = "tmp"
132132
[workspace]
133133
134134
[dependencies]
135-
wit-bindgen = {{ {wit_bindgen_dep}, features = ['async-spawn', 'inter-task-wakeup'] }}
135+
wit-bindgen = {{ {wit_bindgen_dep}, features = ['async-spawn', 'inter-task-wakeup', 'futures-stream'] }}
136136
futures = "0.3.31"
137137
138138
[lib]
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
//@ wasmtime-flags = '-Wcomponent-model-async'
2+
3+
include!(env!("BINDINGS"));
4+
5+
use crate::my::test::i::*;
6+
use wit_bindgen::StreamResult;
7+
8+
struct Component;
9+
10+
export!(Component);
11+
12+
impl Guest for Component {
13+
async fn run() {
14+
let (mut tx, rx) = wit_stream::new();
15+
let test = async {
16+
let (result, _ret) = tx.write(vec![10, 20, 30]).await;
17+
assert_eq!(result, StreamResult::Complete(3));
18+
19+
// Drop the writer so the reader sees the end of the stream.
20+
drop(tx);
21+
};
22+
let ((), ()) = futures::join!(test, read_stream(rx));
23+
}
24+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use futures::stream::StreamExt;
2+
use wit_bindgen::StreamReader;
3+
4+
include!(env!("BINDINGS"));
5+
6+
struct Component;
7+
8+
export!(Component);
9+
10+
impl crate::exports::my::test::i::Guest for Component {
11+
async fn read_stream(x: StreamReader<u8>) {
12+
// Convert the low-level StreamReader into a futures::Stream
13+
let mut stream = x.into_stream();
14+
15+
// Read all items via StreamExt::next()
16+
let first = stream.next().await;
17+
assert_eq!(first, Some(10));
18+
19+
let second = stream.next().await;
20+
assert_eq!(second, Some(20));
21+
22+
let third = stream.next().await;
23+
assert_eq!(third, Some(30));
24+
25+
// Stream should be exhausted after the writer is dropped
26+
let end = stream.next().await;
27+
assert_eq!(end, None);
28+
}
29+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package my:test;
2+
3+
interface i {
4+
read-stream: async func(x: stream<u8>);
5+
}
6+
7+
world test {
8+
export i;
9+
}
10+
11+
world runner {
12+
import i;
13+
14+
export run: async func();
15+
}

0 commit comments

Comments
 (0)