Skip to content

Commit c8e37c6

Browse files
authored
wasip3: Implement read/write timeouts (WebAssembly#762)
This is needed for socket options with TCP/UDP to implement timed-out reads/writes. This plumbs the relevant timeout to the waitable-set.wait invocation and handles the timeout outwards from there. This is a bit tricky since timeouts/cancellation can all race with actual completion of the operation, which is handled throughout and along the way, too.
1 parent 4bf849c commit c8e37c6

15 files changed

Lines changed: 289 additions & 148 deletions

File tree

cmake/bindings.cmake

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ add_custom_target(
131131
"--async=-wasi:filesystem/types@${wasip3-version}#[method]descriptor.set-times-at"
132132
"--async=-wasi:filesystem/types@${wasip3-version}#[method]descriptor.set-times"
133133
"--async=-wasi:clocks/monotonic-clock@${wasip3-version}#wait-until"
134-
"--async=-wasi:clocks/monotonic-clock@${wasip3-version}#wait-for"
135134
"--async=-wasi:sockets/ip-name-lookup@${wasip3-version}#resolve-addresses"
136135
${CMAKE_SOURCE_DIR}/wasi/p3/wit
137136
COMMAND cmake -E copy wasip3.h ${bottom_half}/headers/public/wasi/__generated_wasip3.h

expected/wasm32-wasip3/defined-symbols.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ __wasilibc_fd_renumber
302302
__wasilibc_find_abspath
303303
__wasilibc_find_relpath
304304
__wasilibc_find_relpath_alloc
305-
__wasilibc_future_block_on
305+
__wasilibc_future_block_on_timeout
306306
__wasilibc_get_environ
307307
__wasilibc_get_service_entry_by_name
308308
__wasilibc_get_service_entry_by_port
@@ -340,7 +340,7 @@ __wasilibc_rmdirat
340340
__wasilibc_sockaddr_to_wasi
341341
__wasilibc_sockaddr_validate
342342
__wasilibc_stat
343-
__wasilibc_stream_block_on
343+
__wasilibc_stream_block_on_timeout
344344
__wasilibc_subtask_await_nonblocking
345345
__wasilibc_subtask_block_on_and_drop
346346
__wasilibc_tell

libc-bottom-half/cloudlibc/src/libc/time/clock_nanosleep.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <wasi/api.h>
99
#include <errno.h>
1010
#include <time.h>
11+
#include <wasi/wasip3_block.h>
1112

1213
#ifdef __wasip1__
1314
static_assert(TIMER_ABSTIME == __WASI_SUBCLOCKFLAGS_SUBSCRIPTION_CLOCK_ABSTIME,
@@ -61,7 +62,9 @@ int clock_nanosleep(clockid_t clock_id, int flags, const struct timespec *rqtp,
6162
if (flags & TIMER_ABSTIME) {
6263
monotonic_clock_wait_until(duration);
6364
} else {
64-
monotonic_clock_wait_for(duration);
65+
wasip3_subtask_status_t status = monotonic_clock_wait_for(duration);
66+
if (WASIP3_SUBTASK_STATE(status) != WASIP3_SUBTASK_RETURNED)
67+
__wasilibc_subtask_block_on_and_drop(WASIP3_SUBTASK_HANDLE(status));
6568
}
6669
#endif
6770
return 0;

libc-bottom-half/headers/private/wasi/descriptor_table.h

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,58 @@
44
#include <wasi/api.h>
55

66
#ifndef __wasip1__
7+
#include <assert.h>
78
#include <netinet/in.h>
89
#include <sys/stat.h>
910
#include <wasi/poll.h>
1011

12+
#ifdef __wasip3__
13+
14+
/// Helper structure to package up state related to a wasip3 `stream<u8>`.
15+
///
16+
/// This is used by various helpers to coordinate reading/writing/etc on a
17+
/// stream. This simultaneously represents both readers and writers.
18+
typedef struct wasip3_io_state_t {
19+
uint32_t stream;
20+
bool done;
21+
} wasip3_io_state_t;
22+
23+
/// Initializes `state` with the `stream` provided.
24+
static inline void wasip3_io_state_init(wasip3_io_state_t *state,
25+
uint32_t stream) {
26+
assert(stream != 0);
27+
state->stream = stream;
28+
state->done = false;
29+
}
30+
31+
/// Tests whether `state` has been initialized with a stream yet.
32+
static inline bool wasip3_io_state_present(wasip3_io_state_t *state) {
33+
return state->stream != 0;
34+
}
35+
36+
/// Closes out the streams/etc internal to `state`.
37+
///
38+
/// Internally the stream must be a reader-half of a `stream<u8>`.
39+
static inline void wasip3_read_state_close(wasip3_io_state_t *state) {
40+
if (state->stream != 0) {
41+
filesystem_stream_u8_drop_readable(state->stream);
42+
state->stream = 0;
43+
}
44+
state->done = false;
45+
}
46+
47+
/// Closes out the streams/etc internal to `state`.
48+
///
49+
/// Internally the stream must be a writer-half of a `stream<u8>`.
50+
static inline void wasip3_write_state_close(wasip3_io_state_t *state) {
51+
if (state->stream != 0) {
52+
filesystem_stream_u8_drop_writable(state->stream);
53+
state->stream = 0;
54+
}
55+
state->done = false;
56+
}
57+
#endif
58+
1159
// Metadata for WASI reads which is used to delegate to `__wasilibc_read(...)`
1260
// to perform the actual read of a stream.
1361
typedef struct wasi_read_t {
@@ -25,8 +73,7 @@ typedef struct wasi_read_t {
2573
// initialized as-necessary.
2674
poll_own_pollable_t *pollable;
2775
#else
28-
// The `stream<u8>` that's being read.
29-
filesystem_stream_u8_t stream;
76+
wasip3_io_state_t *state;
3077

3178
// A callback/ptr pair to invoke when EOF is reached to set errno and return
3279
// an error code.
@@ -45,22 +92,14 @@ typedef struct wasi_write_t {
4592
streams_borrow_output_stream_t output;
4693
poll_own_pollable_t *pollable;
4794
#else
48-
// The actual stream that is being written to.
49-
filesystem_stream_u8_writer_t output;
50-
// An indicator if `output` has been closed/dropped.
51-
bool *done;
95+
wasip3_io_state_t *state;
5296
// A callback/ptr pair to invoke when EOF is reached to set errno and return
5397
// an error code.
5498
int (*eof)(void *);
5599
void *eof_data;
56100
#endif
57101
} wasi_write_t;
58102

59-
#ifdef __wasip3__
60-
// create an alias to distinguish the handle type in the API
61-
typedef uint32_t waitable_t;
62-
#endif
63-
64103
/**
65104
* Operations that are required of all descriptors registered as file
66105
* descriptors.

libc-bottom-half/headers/private/wasi/tcp.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,16 @@ typedef struct {
5555
#else
5656
// The bytes that are being received on this socket in addition to the future
5757
// of the result to read when the socket hits EOF to see if there's an error.
58-
sockets_stream_u8_t receive;
58+
wasip3_io_state_t receive;
5959
sockets_future_result_void_error_code_t receive_result;
6060

6161
// The stream to write to in order to send out bytes, along with the `send`
6262
// subtask that was created connected to the other half of `send`. The
6363
// result of the subtask at `send_result` will get filled in when
6464
// `send_subtask` is finished.
65-
sockets_stream_u8_writer_t send;
65+
wasip3_io_state_t send;
6666
wasip3_subtask_t send_subtask;
6767
sockets_result_void_error_code_t send_result;
68-
bool send_done;
6968
#endif
7069
} tcp_socket_state_connected_t;
7170

libc-bottom-half/headers/private/wasi/wasip3_block.h

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
#include <stdint.h>
88
#include <wasi/descriptor_table.h> // for waitable_t
99

10+
typedef wasip3_waitable_status_t (*wasip3_cancel_t)(uint32_t);
11+
1012
// Waits for a subtask to return
1113
void __wasilibc_subtask_block_on_and_drop(wasip3_subtask_t subtask);
1214

@@ -37,8 +39,22 @@ int __wasilibc_subtask_await_nonblocking(wasip3_subtask_status_t status);
3739
/// This function will block if `status` indicates that the future is blocked,
3840
/// and this function won't return until the future has become ready again. This
3941
/// is suitable, for example, for waiting for future reads to complete.
40-
void __wasilibc_future_block_on(wasip3_subtask_status_t status,
41-
uint32_t future);
42+
///
43+
/// If `timeout` is nonzero then this function will block for at most that
44+
/// amount of time. The `cancel` callback must be set when `timeout` is set and
45+
/// that will be used to cancel the in-flight operation on this future.
46+
int __wasilibc_future_block_on_timeout(wasip3_waitable_status_t status,
47+
uint32_t future,
48+
monotonic_clock_duration_t timeout,
49+
wasip3_cancel_t cancel);
50+
51+
// Convenience wrapper around `*_timeout` above which doesn't take a timeout.
52+
static inline void __wasilibc_future_block_on(wasip3_waitable_status_t status,
53+
uint32_t future) {
54+
ssize_t ret = __wasilibc_future_block_on_timeout(status, future, 0, NULL);
55+
(void)ret;
56+
assert(ret >= 0);
57+
}
4258

4359
/// Waits for `stream` to be resolved after a previous operation yielded
4460
/// `status` as a result.
@@ -49,8 +65,24 @@ void __wasilibc_future_block_on(wasip3_subtask_status_t status,
4965
///
5066
/// The `closed` variable is set based on the result of the operation to
5167
/// communicate what was received from the component model.
52-
size_t __wasilibc_stream_block_on(wasip3_subtask_status_t status,
53-
uint32_t stream, bool *closed);
54-
#endif
68+
///
69+
/// If `timeout` is nonzero then this function will block for at most that
70+
/// amount of time. The `cancel` callback must be set when `timeout` is set and
71+
/// that will be used to cancel the in-flight operation on this stream.
72+
ssize_t __wasilibc_stream_block_on_timeout(wasip3_waitable_status_t status,
73+
uint32_t stream, bool *closed,
74+
monotonic_clock_duration_t timeout,
75+
wasip3_cancel_t cancel);
76+
77+
// Convenience wrapper around `*_timeout` above which doesn't take a timeout.
78+
static inline size_t __wasilibc_stream_block_on(wasip3_waitable_status_t status,
79+
uint32_t stream, bool *closed) {
80+
ssize_t ret =
81+
__wasilibc_stream_block_on_timeout(status, stream, closed, 0, NULL);
82+
assert(ret >= 0);
83+
return ret;
84+
}
85+
86+
#endif // __wasip3__
5587

5688
#endif // WASI_WASIP3_BLOCK_H

libc-bottom-half/headers/public/wasi/__generated_wasip3.h

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libc-bottom-half/sources/file.c

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ typedef struct {
2626
streams_own_pollable_t read_pollable;
2727
streams_own_pollable_t write_pollable;
2828
#else
29-
filesystem_tuple2_stream_u8_future_result_void_error_code_t read;
30-
filesystem_stream_u8_writer_t write;
29+
wasip3_io_state_t read;
30+
filesystem_future_result_void_error_code_t read_result;
31+
wasip3_io_state_t write;
3132
wasip3_subtask_t write_subtask;
3233
filesystem_result_void_error_code_t write_pending_result;
33-
bool write_done;
3434
#endif
3535
} file_t;
3636

@@ -54,17 +54,11 @@ static void file_close_streams(void *data) {
5454
file->write_stream.__handle = 0;
5555
}
5656
#else
57-
if (file->read.f0 != 0) {
58-
filesystem_stream_u8_drop_readable(file->read.f0);
59-
file->read.f0 = 0;
60-
}
61-
if (file->read.f1 != 0) {
62-
filesystem_future_result_void_error_code_drop_readable(file->read.f1);
63-
file->read.f1 = 0;
64-
}
65-
if (file->write != 0) {
66-
filesystem_stream_u8_drop_writable(file->write);
67-
file->write = 0;
57+
wasip3_read_state_close(&file->read);
58+
wasip3_write_state_close(&file->write);
59+
if (file->read_result != 0) {
60+
filesystem_future_result_void_error_code_drop_readable(file->read_result);
61+
file->read_result = 0;
6862
}
6963
if (file->write_subtask != 0) {
7064
// TODO: this should use `wasip3_subtask_cancel` but right now that's buggy
@@ -76,7 +70,6 @@ static void file_close_streams(void *data) {
7670
__wasilibc_subtask_block_on_and_drop(file->write_subtask);
7771
file->write_subtask = 0;
7872
}
79-
file->write_done = false;
8073
#endif
8174
}
8275

@@ -91,13 +84,13 @@ static void file_free(void *data) {
9184
static int file_read_eof(void *data) {
9285
file_t *file = (file_t *)data;
9386

94-
if (file->read.f1 != 0) {
87+
if (file->read_result != 0) {
9588
filesystem_result_void_error_code_t result;
96-
__wasilibc_future_block_on(
97-
filesystem_future_result_void_error_code_read(file->read.f1, &result),
98-
file->read.f1);
99-
filesystem_future_result_void_error_code_drop_readable(file->read.f1);
100-
file->read.f1 = 0;
89+
__wasilibc_future_block_on(filesystem_future_result_void_error_code_read(
90+
file->read_result, &result),
91+
file->read_result);
92+
filesystem_future_result_void_error_code_drop_readable(file->read_result);
93+
file->read_result = 0;
10194
if (result.is_err) {
10295
translate_error(result.val.err);
10396
return -1;
@@ -123,12 +116,15 @@ static int file_get_read_stream(void *data, wasi_read_t *read) {
123116
read->input = streams_borrow_input_stream(file->read_stream);
124117
read->pollable = &file->read_pollable;
125118
#else
126-
if (file->read.f0 == 0) {
119+
if (!wasip3_io_state_present(&file->read)) {
120+
assert(!file->read_result);
121+
filesystem_tuple2_stream_u8_future_result_void_error_code_t result;
127122
filesystem_method_descriptor_read_via_stream(
128-
filesystem_borrow_descriptor(file->file_handle), file->offset,
129-
&file->read);
123+
filesystem_borrow_descriptor(file->file_handle), file->offset, &result);
124+
wasip3_io_state_init(&file->read, result.f0);
125+
file->read_result = result.f1;
130126
}
131-
read->stream = file->read.f0;
127+
read->state = &file->read;
132128
read->eof = file_read_eof;
133129
read->eof_data = data;
134130
#endif
@@ -178,9 +174,10 @@ static int file_get_write_stream(void *data, wasi_write_t *write) {
178174
write->output = streams_borrow_output_stream(file->write_stream);
179175
write->pollable = &file->write_pollable;
180176
#else
181-
if (file->write == 0) {
177+
if (!wasip3_io_state_present(&file->write)) {
182178
assert(file->write_subtask == 0);
183-
filesystem_stream_u8_t write_read = filesystem_stream_u8_new(&file->write);
179+
filesystem_stream_u8_writer_t writer;
180+
filesystem_stream_u8_t write_read = filesystem_stream_u8_new(&writer);
184181
wasip3_subtask_status_t status;
185182
if (file->oflag & O_APPEND) {
186183
status = filesystem_method_descriptor_append_via_stream(
@@ -193,11 +190,11 @@ static int file_get_write_stream(void *data, wasi_write_t *write) {
193190
}
194191
if (WASIP3_SUBTASK_STATE(status) != WASIP3_SUBTASK_RETURNED)
195192
file->write_subtask = WASIP3_SUBTASK_HANDLE(status);
193+
wasip3_io_state_init(&file->write, writer);
196194
}
197-
write->output = file->write;
195+
write->state = &file->write;
198196
write->eof = file_write_eof;
199197
write->eof_data = data;
200-
write->done = &file->write_done;
201198
#endif
202199
write->offset = &file->offset;
203200
write->timeout = 0;

libc-bottom-half/sources/file_utils.c

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
1-
// TODO(wasip3) these are load-bearing assertions functionality-wise, but once
2-
// the assertions below are removed and the implementation is filled in then
3-
// this should be removed.
4-
#undef NDEBUG
5-
61
#include <assert.h>
72
#include <common/errors.h>
83
#include <errno.h>
@@ -164,15 +159,15 @@ ssize_t __wasilibc_write(wasi_write_t *write, const void *buffer,
164159
}
165160
}
166161
#elif defined(__wasip3__)
167-
assert(write->blocking); // TODO(wasip3)
168-
assert(write->timeout == 0); // TODO(wasip3)
162+
assert(write->blocking); // TODO(wasip3)
169163

170164
// Perform writes until either a non-zero-length write completes or the stream
171165
// is closed.
172-
while (!*write->done) {
173-
size_t amount = __wasilibc_stream_block_on(
174-
filesystem_stream_u8_write(write->output, buffer, length),
175-
write->output, write->done);
166+
while (!write->state->done) {
167+
ssize_t amount = __wasilibc_stream_block_on_timeout(
168+
filesystem_stream_u8_write(write->state->stream, buffer, length),
169+
write->state->stream, &write->state->done, write->timeout,
170+
filesystem_stream_u8_cancel_write);
176171
if (amount > 0 || length == 0) {
177172
if (write->offset)
178173
*write->offset += amount;
@@ -250,16 +245,15 @@ ssize_t __wasilibc_read(wasi_read_t *read, void *buffer, size_t length) {
250245
}
251246
}
252247
#elif defined(__wasip3__)
253-
assert(read->blocking); // TODO(wasip3)
254-
assert(read->timeout == 0); // TODO(wasip3)
248+
assert(read->blocking); // TODO(wasip3)
255249

256250
// Attempt reads until a nonzero-length read is encountered or the stream is
257251
// closed.
258-
bool closed = false;
259-
while (!closed) {
260-
size_t amount = __wasilibc_stream_block_on(
261-
filesystem_stream_u8_read(read->stream, buffer, length), read->stream,
262-
&closed);
252+
while (!read->state->done) {
253+
size_t amount = __wasilibc_stream_block_on_timeout(
254+
filesystem_stream_u8_read(read->state->stream, buffer, length),
255+
read->state->stream, &read->state->done, read->timeout,
256+
filesystem_stream_u8_cancel_read);
263257
if (amount > 0 || length == 0) {
264258
if (read->offset)
265259
*read->offset += amount;

0 commit comments

Comments
 (0)