Skip to content

Commit b02054a

Browse files
committed
fix: coalesce tsfn send message (#210)
1 parent 71cd534 commit b02054a

7 files changed

Lines changed: 121 additions & 16 deletions

File tree

.github/workflows/ci.yml

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ jobs:
6868

6969
wasm32-wasip1-threads:
7070
timeout-minutes: 15
71-
name: CI wasm32-wasip1-threads
71+
name: CI wasm32-wasip1-threads (libemnapi-mt)
7272
runs-on: ubuntu-latest
7373

7474
steps:
@@ -93,6 +93,28 @@ jobs:
9393
run: EMNAPI_TEST_4GB=1 npm run test:wt -w packages/test
9494
timeout-minutes: 3
9595

96+
wasm32-wasip1-threads-use-js:
97+
timeout-minutes: 15
98+
name: CI wasm32-wasip1-threads (libemnapi-basic-mt)
99+
runs-on: ubuntu-latest
100+
101+
steps:
102+
- uses: actions/checkout@v6
103+
- uses: ./.github/actions/setup-target-ci
104+
with:
105+
target: 'wasm32-wasip1-threads-js'
106+
needs-wasi-sdk: 'true'
107+
- name: NPM Build
108+
run: npm run build --workspaces --if-present
109+
- name: Build target
110+
run: EMNAPI_WASI_THREADS_LINK_BASIC_MT=1 npm run rebuild:wt -w packages/test
111+
- name: Test target
112+
run: npm run test:wt -w packages/test
113+
timeout-minutes: 3
114+
- name: Test target (4GB)
115+
run: EMNAPI_TEST_4GB=1 npm run test:wt -w packages/test
116+
timeout-minutes: 3
117+
96118
wasm32-wasip1:
97119
timeout-minutes: 15
98120
name: CI wasm32-wasip1

packages/emnapi/script/generate_struct_info.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ void print_napi_threadsafe_function_info(FILE* f) {
4141
fprintf(f, " queue_size = %zu,\n", offsetof(struct napi_threadsafe_function__, queue_size));
4242
fprintf(f, " queue = %zu,\n", offsetof(struct napi_threadsafe_function__, queue));
4343
fprintf(f, " async = %zu,\n", offsetof(struct napi_threadsafe_function__, async));
44+
fprintf(f, " async_pending = %zu,\n", offsetof(struct napi_threadsafe_function__, async) + offsetof(uv_async_t, pending));
45+
fprintf(f, " async_u_fd = %zu,\n", offsetof(struct napi_threadsafe_function__, async) + offsetof(uv_async_t, u.fd));
4446
fprintf(f, " thread_count = %zu,\n", offsetof(struct napi_threadsafe_function__, thread_count));
4547
fprintf(f, " state = %zu,\n", offsetof(struct napi_threadsafe_function__, state));
4648
fprintf(f, " dispatch_state = %zu,\n", offsetof(struct napi_threadsafe_function__, dispatch_state));

packages/emnapi/src/threadsafe-function.ts

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ export const emnapiTSFN = {
3030
/* size_t */ queue_size: 0,
3131
/* bool */ is_some: 0,
3232
/* void* */ queue: 0,
33+
// Reuse uv_async_t storage as JS-side wakeup state: pending event + scheduled drain.
34+
async_pending: 0,
35+
async_u_fd: 0,
3336
/* size_t */ thread_count: 0,
3437
/* int32_t */ state: 0,
3538
/* atomic_uchar */ dispatch_state: 0,
@@ -54,6 +57,8 @@ export const emnapiTSFN = {
5457
emnapiTSFN.offset.queue_size = NapiTSFNOffset64.queue_size
5558
emnapiTSFN.offset.is_some = NapiTSFNOffset64.async_resource_is_some
5659
emnapiTSFN.offset.queue = NapiTSFNOffset64.queue
60+
emnapiTSFN.offset.async_pending = NapiTSFNOffset64.async_pending
61+
emnapiTSFN.offset.async_u_fd = NapiTSFNOffset64.async_u_fd
5762
emnapiTSFN.offset.thread_count = NapiTSFNOffset64.thread_count
5863
emnapiTSFN.offset.state = NapiTSFNOffset64.state
5964
emnapiTSFN.offset.dispatch_state = NapiTSFNOffset64.dispatch_state
@@ -76,6 +81,8 @@ export const emnapiTSFN = {
7681
emnapiTSFN.offset.queue_size = NapiTSFNOffset32.queue_size
7782
emnapiTSFN.offset.is_some = NapiTSFNOffset32.async_resource_is_some
7883
emnapiTSFN.offset.queue = NapiTSFNOffset32.queue
84+
emnapiTSFN.offset.async_pending = NapiTSFNOffset32.async_pending
85+
emnapiTSFN.offset.async_u_fd = NapiTSFNOffset32.async_u_fd
7986
emnapiTSFN.offset.thread_count = NapiTSFNOffset32.thread_count
8087
emnapiTSFN.offset.state = NapiTSFNOffset32.state
8188
emnapiTSFN.offset.dispatch_state = NapiTSFNOffset32.dispatch_state
@@ -113,7 +120,10 @@ export const emnapiTSFN = {
113120
const type = __emnapi__.type
114121
const payload = __emnapi__.payload
115122
if (type === 'tsfn-send') {
116-
emnapiTSFN.dispatch(payload.tsfn)
123+
const pendng = payload.tsfn + emnapiTSFN.offset.async_pending
124+
if (Atomics.load(new Int32Array(wasmMemory.buffer), pendng >>> 2) !== 0) {
125+
emnapiTSFN.enqueue(payload.tsfn)
126+
}
117127
}
118128
}
119129
}
@@ -708,24 +718,77 @@ export const emnapiTSFN = {
708718
emnapiTSFN.send(func)
709719
}
710720
},
721+
enqueue (func: number): void {
722+
// `pending` means a send() call (from a worker thread or the main thread)
723+
// has requested a wakeup that has not been drained on the main thread yet.
724+
const pending = func + emnapiTSFN.offset.async_pending
725+
// `scheduled` prevents queueing the same main-thread drain chain more than
726+
// once while a previous wakeup is still in flight.
727+
const scheduled = func + emnapiTSFN.offset.async_u_fd
728+
const i32a = new Int32Array(wasmMemory.buffer)
729+
if (Atomics.exchange(i32a, scheduled >>> 2, 1) !== 0) {
730+
return
731+
}
732+
733+
// Match uv_async_send-style coalescing in JS: the first turn represents
734+
// the wakeup reaching the main thread, and the second turn performs the
735+
// actual TSFN drain after nearby Send/Signal calls have had a chance to
736+
// collapse into the shared AsyncProgressWorker state.
737+
emnapiCtx.feature.setImmediate(() => {
738+
if (Atomics.load(i32a, pending >>> 2) === 0) {
739+
Atomics.store(i32a, scheduled >>> 2, 0)
740+
return
741+
}
742+
743+
emnapiCtx.feature.setImmediate(() => {
744+
try {
745+
// Consume the coalesced wakeup once, then let dispatch() observe any
746+
// queue mutations through dispatch_state like the C implementation.
747+
if (Atomics.exchange(i32a, pending >>> 2, 0) === 0) {
748+
return
749+
}
750+
emnapiTSFN.dispatch(func)
751+
} finally {
752+
// Allow a later wakeup to schedule a new drain chain. If another
753+
// worker-thread send raced with this drain, enqueue one more turn.
754+
Atomics.store(i32a, scheduled >>> 2, 0)
755+
if (Atomics.load(i32a, pending >>> 2) !== 0) {
756+
emnapiTSFN.enqueue(func)
757+
}
758+
}
759+
})
760+
})
761+
},
711762
send (func: number): void {
712763
const current_state = Atomics.or(new Uint32Array(wasmMemory.buffer), (func + emnapiTSFN.offset.dispatch_state) >>> 2, 1 << 1)
713764
if ((current_state & 1) === 1) {
714765
return
715766
}
716-
if ((typeof ENVIRONMENT_IS_PTHREAD !== 'undefined') && ENVIRONMENT_IS_PTHREAD) {
717-
postMessage({
718-
__emnapi__: {
719-
type: 'tsfn-send',
720-
payload: {
721-
tsfn: func
767+
768+
const pendng = func + emnapiTSFN.offset.async_pending
769+
// A wakeup is already pending, so this send only needs to leave the queued
770+
// work in the TSFN queue and let the existing drain pick it up.
771+
if (Atomics.load(new Int32Array(wasmMemory.buffer), pendng >>> 2) !== 0) {
772+
return
773+
}
774+
775+
if (Atomics.exchange(new Int32Array(wasmMemory.buffer), pendng >>> 2, 1) === 0) {
776+
if ((typeof ENVIRONMENT_IS_PTHREAD !== 'undefined') && ENVIRONMENT_IS_PTHREAD) {
777+
// Worker threads only post a wakeup token. Main-thread draining is
778+
// serialized by enqueue() once the message is received.
779+
postMessage({
780+
__emnapi__: {
781+
type: 'tsfn-send',
782+
payload: {
783+
tsfn: func
784+
}
722785
}
723-
}
724-
})
725-
} else {
726-
emnapiCtx.feature.setImmediate(() => {
727-
emnapiTSFN.dispatch(func)
728-
})
786+
})
787+
} else {
788+
// On the main thread we can skip the cross-thread hop and schedule the
789+
// coalesced drain chain directly.
790+
emnapiTSFN.enqueue(func)
791+
}
729792
}
730793
}
731794
}

packages/emnapi/src/typings/struct-wasm32.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ declare const enum NapiTSFNOffset32 {
2727
queue_size = 60,
2828
queue = 64,
2929
async = 72,
30+
async_pending = 132,
31+
async_u_fd = 96,
3032
thread_count = 136,
3133
state = 140,
3234
dispatch_state = 144,

packages/emnapi/src/typings/struct-wasm64.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ declare const enum NapiTSFNOffset64 {
2727
queue_size = 88,
2828
queue = 96,
2929
async = 112,
30+
async_pending = 232,
31+
async_u_fd = 160,
3032
thread_count = 240,
3133
state = 248,
3234
dispatch_state = 252,

packages/test/CMakeLists.txt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ else()
7575
set(IS_MEMORY64 OFF)
7676
endif()
7777

78+
if(DEFINED ENV{EMNAPI_WASI_THREADS_LINK_BASIC_MT})
79+
set(_EMNAPI_WASI_THREADS_LINK_BASIC_MT $ENV{EMNAPI_WASI_THREADS_LINK_BASIC_MT})
80+
else()
81+
set(_EMNAPI_WASI_THREADS_LINK_BASIC_MT OFF)
82+
endif()
83+
7884
if(IS_EMSCRIPTEN)
7985
if(IS_MEMORY64)
8086
add_compile_options("-sMEMORY64=1")
@@ -169,7 +175,7 @@ function(add_test NAME SOURCE_LIST PTHREAD)
169175
target_link_libraries(${NAME} PRIVATE "emnapi-basic")
170176
endif()
171177
else()
172-
if(IS_WASI_THREADS)
178+
if(IS_WASI_THREADS AND _EMNAPI_WASI_THREADS_LINK_BASIC_MT)
173179
target_link_libraries(${NAME} PRIVATE "emnapi-basic-mt")
174180
target_link_options(${NAME} PRIVATE "-Wl,--import-memory,--shared-memory,--export=emnapi_async_worker_create,--export=emnapi_async_worker_init")
175181
else()
@@ -227,7 +233,7 @@ function(add_naa_test NAME SOURCE_LIST DEFINES ENABLE_EXCEPTION)
227233
target_link_libraries(${NAME} PRIVATE "emnapi-basic")
228234
endif()
229235
else()
230-
if(IS_WASI_THREADS)
236+
if(IS_WASI_THREADS AND _EMNAPI_WASI_THREADS_LINK_BASIC_MT)
231237
target_link_libraries(${NAME} PRIVATE "emnapi-basic-mt")
232238
target_link_options(${NAME} PRIVATE "-Wl,--import-memory,--shared-memory,--export=emnapi_async_worker_create,--export=emnapi_async_worker_init")
233239
else()

packages/test/script/test.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
if (process.env.EMNAPI_TEST_WASI_THREADS) {
22
process.env.EMNAPI_TEST_WASI = 1
3+
4+
const { spawnSync } = require('child_process')
5+
const path = require('path')
6+
const wasmPath = path.join(__dirname, '../.build/wasm32-wasi-threads/Debug/async.wasm')
7+
const module = new WebAssembly.Module(require('fs').readFileSync(wasmPath))
8+
console.log('libemnapi-basic-mt: ' + (WebAssembly.Module.imports(module).filter(d => {
9+
return d.name === 'napi_create_async_work'
10+
}).length > 0))
311
}
412

513
const { spawnSync } = require('child_process')

0 commit comments

Comments
 (0)