Skip to content

Commit 184e438

Browse files
authored
fix: coalesce tsfn send message (#210)
1 parent 119ad4f commit 184e438

7 files changed

Lines changed: 129 additions & 24 deletions

File tree

.github/workflows/ci.yml

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ jobs:
6868

6969
wasm32-wasip1-threads:
7070
timeout-minutes: 15
71-
name: CI wasm32-wasip1-threads
71+
name: CI wasm32-wasip1-threads (libemnapi-mt)
7272
runs-on: ubuntu-latest
7373

7474
steps:
@@ -93,6 +93,28 @@ jobs:
9393
run: EMNAPI_TEST_4GB=1 npm run test:wt -w packages/test
9494
timeout-minutes: 3
9595

96+
wasm32-wasip1-threads-use-js:
97+
timeout-minutes: 15
98+
name: CI wasm32-wasip1-threads (libemnapi-basic-mt)
99+
runs-on: ubuntu-latest
100+
101+
steps:
102+
- uses: actions/checkout@v6
103+
- uses: ./.github/actions/setup-target-ci
104+
with:
105+
target: 'wasm32-wasip1-threads-js'
106+
needs-wasi-sdk: 'true'
107+
- name: NPM Build
108+
run: npm run build --workspaces --if-present
109+
- name: Build target
110+
run: EMNAPI_WASI_THREADS_LINK_BASIC_MT=1 npm run rebuild:wt -w packages/test
111+
- name: Test target
112+
run: npm run test:wt -w packages/test
113+
timeout-minutes: 3
114+
- name: Test target (4GB)
115+
run: EMNAPI_TEST_4GB=1 npm run test:wt -w packages/test
116+
timeout-minutes: 3
117+
96118
wasm32-wasip1:
97119
timeout-minutes: 15
98120
name: CI wasm32-wasip1

packages/emnapi/script/generate_struct_info.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ void print_napi_threadsafe_function_info(FILE* f) {
4141
fprintf(f, " queue_size = %zu,\n", offsetof(struct napi_threadsafe_function__, queue_size));
4242
fprintf(f, " queue = %zu,\n", offsetof(struct napi_threadsafe_function__, queue));
4343
fprintf(f, " async = %zu,\n", offsetof(struct napi_threadsafe_function__, async));
44+
fprintf(f, " async_pending = %zu,\n", offsetof(struct napi_threadsafe_function__, async) + offsetof(uv_async_t, pending));
45+
fprintf(f, " async_u_fd = %zu,\n", offsetof(struct napi_threadsafe_function__, async) + offsetof(uv_async_t, u.fd));
4446
fprintf(f, " thread_count = %zu,\n", offsetof(struct napi_threadsafe_function__, thread_count));
4547
fprintf(f, " state = %zu,\n", offsetof(struct napi_threadsafe_function__, state));
4648
fprintf(f, " dispatch_state = %zu,\n", offsetof(struct napi_threadsafe_function__, dispatch_state));

packages/emnapi/src/threadsafe-function.ts

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ const emnapiTSFN = {
4646
/* size_t */ queue_size: 0,
4747
/* bool */ is_some: 0,
4848
/* void* */ queue: 0,
49+
// Reuse uv_async_t storage as JS-side wakeup state: pending event + scheduled drain.
50+
async_pending: 0,
51+
async_u_fd: 0,
4952
/* size_t */ thread_count: 0,
5053
/* int32_t */ state: 0,
5154
/* atomic_uchar */ dispatch_state: 0,
@@ -70,6 +73,8 @@ const emnapiTSFN = {
7073
emnapiTSFN.offset.queue_size = NapiTSFNOffset64.queue_size
7174
emnapiTSFN.offset.is_some = NapiTSFNOffset64.async_resource_is_some
7275
emnapiTSFN.offset.queue = NapiTSFNOffset64.queue
76+
emnapiTSFN.offset.async_pending = NapiTSFNOffset64.async_pending
77+
emnapiTSFN.offset.async_u_fd = NapiTSFNOffset64.async_u_fd
7378
emnapiTSFN.offset.thread_count = NapiTSFNOffset64.thread_count
7479
emnapiTSFN.offset.state = NapiTSFNOffset64.state
7580
emnapiTSFN.offset.dispatch_state = NapiTSFNOffset64.dispatch_state
@@ -92,6 +97,8 @@ const emnapiTSFN = {
9297
emnapiTSFN.offset.queue_size = NapiTSFNOffset32.queue_size
9398
emnapiTSFN.offset.is_some = NapiTSFNOffset32.async_resource_is_some
9499
emnapiTSFN.offset.queue = NapiTSFNOffset32.queue
100+
emnapiTSFN.offset.async_pending = NapiTSFNOffset32.async_pending
101+
emnapiTSFN.offset.async_u_fd = NapiTSFNOffset32.async_u_fd
95102
emnapiTSFN.offset.thread_count = NapiTSFNOffset32.thread_count
96103
emnapiTSFN.offset.state = NapiTSFNOffset32.state
97104
emnapiTSFN.offset.dispatch_state = NapiTSFNOffset32.dispatch_state
@@ -129,7 +136,10 @@ const emnapiTSFN = {
129136
const type = __emnapi__.type
130137
const payload = __emnapi__.payload
131138
if (type === 'tsfn-send') {
132-
emnapiTSFN.dispatch(payload.tsfn)
139+
const pendng = payload.tsfn + emnapiTSFN.offset.async_pending
140+
if (Atomics.load(new Int32Array(wasmMemory.buffer), pendng >>> 2) !== 0) {
141+
emnapiTSFN.enqueue(payload.tsfn)
142+
}
133143
}
134144
}
135145
}
@@ -721,24 +731,77 @@ const emnapiTSFN = {
721731
emnapiTSFN.send(func)
722732
}
723733
},
734+
enqueue (func: number): void {
735+
// `pending` means a worker thread has requested a wakeup that has not
736+
// been drained on the main thread yet.
737+
const pending = func + emnapiTSFN.offset.async_pending
738+
// `scheduled` prevents queueing the same main-thread drain chain more than
739+
// once while a previous wakeup is still in flight.
740+
const scheduled = func + emnapiTSFN.offset.async_u_fd
741+
const i32a = new Int32Array(wasmMemory.buffer)
742+
if (Atomics.exchange(i32a, scheduled >>> 2, 1) !== 0) {
743+
return
744+
}
745+
746+
// Match uv_async_send-style coalescing in JS: the first turn represents
747+
// the wakeup reaching the main thread, and the second turn performs the
748+
// actual TSFN drain after nearby Send/Signal calls have had a chance to
749+
// collapse into the shared AsyncProgressWorker state.
750+
emnapiCtx.features.setImmediate(() => {
751+
if (Atomics.load(i32a, pending >>> 2) === 0) {
752+
Atomics.store(i32a, scheduled >>> 2, 0)
753+
return
754+
}
755+
756+
emnapiCtx.features.setImmediate(() => {
757+
try {
758+
// Consume the coalesced wakeup once, then let dispatch() observe any
759+
// queue mutations through dispatch_state like the C implementation.
760+
if (Atomics.exchange(i32a, pending >>> 2, 0) === 0) {
761+
return
762+
}
763+
emnapiTSFN.dispatch(func)
764+
} finally {
765+
// Allow a later wakeup to schedule a new drain chain. If another
766+
// worker-thread send raced with this drain, enqueue one more turn.
767+
Atomics.store(i32a, scheduled >>> 2, 0)
768+
if (Atomics.load(i32a, pending >>> 2) !== 0) {
769+
emnapiTSFN.enqueue(func)
770+
}
771+
}
772+
})
773+
})
774+
},
724775
send (func: number): void {
725776
const current_state = Atomics.or(new Uint32Array(wasmMemory.buffer), (func + emnapiTSFN.offset.dispatch_state) >>> 2, 1 << 1)
726777
if ((current_state & 1) === 1) {
727778
return
728779
}
729-
if ((typeof ENVIRONMENT_IS_PTHREAD !== 'undefined') && ENVIRONMENT_IS_PTHREAD) {
730-
postMessage({
731-
__emnapi__: {
732-
type: 'tsfn-send',
733-
payload: {
734-
tsfn: func
780+
781+
const pendng = func + emnapiTSFN.offset.async_pending
782+
// A wakeup is already pending, so this send only needs to leave the queued
783+
// work in the TSFN queue and let the existing drain pick it up.
784+
if (Atomics.load(new Int32Array(wasmMemory.buffer), pendng >>> 2) !== 0) {
785+
return
786+
}
787+
788+
if (Atomics.exchange(new Int32Array(wasmMemory.buffer), pendng >>> 2, 1) === 0) {
789+
if ((typeof ENVIRONMENT_IS_PTHREAD !== 'undefined') && ENVIRONMENT_IS_PTHREAD) {
790+
// Worker threads only post a wakeup token. Main-thread draining is
791+
// serialized by enqueue() once the message is received.
792+
postMessage({
793+
__emnapi__: {
794+
type: 'tsfn-send',
795+
payload: {
796+
tsfn: func
797+
}
735798
}
736-
}
737-
})
738-
} else {
739-
emnapiCtx.features.setImmediate(() => {
740-
emnapiTSFN.dispatch(func)
741-
})
799+
})
800+
} else {
801+
// On the main thread we can skip the cross-thread hop and schedule the
802+
// coalesced drain chain directly.
803+
emnapiTSFN.enqueue(func)
804+
}
742805
}
743806
}
744807
}

packages/emnapi/src/typings/struct-wasm32.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ declare const enum NapiTSFNOffset32 {
2727
queue_size = 60,
2828
queue = 64,
2929
async = 72,
30+
async_pending = 132,
31+
async_u_fd = 96,
3032
thread_count = 136,
3133
state = 140,
3234
dispatch_state = 144,

packages/emnapi/src/typings/struct-wasm64.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ declare const enum NapiTSFNOffset64 {
2727
queue_size = 88,
2828
queue = 96,
2929
async = 112,
30+
async_pending = 232,
31+
async_u_fd = 160,
3032
thread_count = 240,
3133
state = 248,
3234
dispatch_state = 252,

packages/test/CMakeLists.txt

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ else()
7575
set(IS_MEMORY64 OFF)
7676
endif()
7777

78+
if(DEFINED ENV{EMNAPI_WASI_THREADS_LINK_BASIC_MT})
79+
set(_EMNAPI_WASI_THREADS_LINK_BASIC_MT $ENV{EMNAPI_WASI_THREADS_LINK_BASIC_MT})
80+
else()
81+
set(_EMNAPI_WASI_THREADS_LINK_BASIC_MT OFF)
82+
endif()
83+
7884
if(IS_EMSCRIPTEN)
7985
if(IS_MEMORY64)
8086
add_compile_options("-sMEMORY64=1")
@@ -194,12 +200,12 @@ function(add_test NAME SOURCE_LIST PTHREAD)
194200
target_link_libraries(${NAME} PRIVATE "emnapi-basic")
195201
endif()
196202
else()
197-
# if(IS_WASI_THREADS)
198-
# target_link_libraries(${NAME} PRIVATE "emnapi-basic-mt")
199-
# target_link_options(${NAME} PRIVATE "-Wl,--import-memory,--shared-memory,--export=emnapi_async_worker_create,--export=emnapi_async_worker_init")
200-
# else()
203+
if(IS_WASI_THREADS AND _EMNAPI_WASI_THREADS_LINK_BASIC_MT)
204+
target_link_libraries(${NAME} PRIVATE "emnapi-basic-mt")
205+
target_link_options(${NAME} PRIVATE "-Wl,--import-memory,--shared-memory,--export=emnapi_async_worker_create,--export=emnapi_async_worker_init")
206+
else()
201207
target_link_libraries(${NAME} PRIVATE "emnapi-mt")
202-
# endif()
208+
endif()
203209
target_compile_options(${NAME} PRIVATE "-pthread")
204210
target_link_options(${NAME} PRIVATE "-pthread")
205211
endif()
@@ -255,12 +261,12 @@ function(add_naa_test NAME SOURCE_LIST DEFINES ENABLE_EXCEPTION)
255261
target_link_libraries(${NAME} PRIVATE "emnapi-basic")
256262
endif()
257263
else()
258-
# if(IS_WASI_THREADS)
259-
# target_link_libraries(${NAME} PRIVATE "emnapi-basic-mt")
260-
# target_link_options(${NAME} PRIVATE "-Wl,--import-memory,--shared-memory,--export=emnapi_async_worker_create,--export=emnapi_async_worker_init")
261-
# else()
264+
if(IS_WASI_THREADS AND _EMNAPI_WASI_THREADS_LINK_BASIC_MT)
265+
target_link_libraries(${NAME} PRIVATE "emnapi-basic-mt")
266+
target_link_options(${NAME} PRIVATE "-Wl,--import-memory,--shared-memory,--export=emnapi_async_worker_create,--export=emnapi_async_worker_init")
267+
else()
262268
target_link_libraries(${NAME} PRIVATE "emnapi-mt")
263-
# endif()
269+
endif()
264270
target_compile_options(${NAME} PRIVATE "-pthread")
265271
target_link_options(${NAME} PRIVATE "-pthread")
266272
endif()

packages/test/script/test.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
if (process.env.EMNAPI_TEST_WASI_THREADS) {
22
process.env.EMNAPI_TEST_WASI = 1
3+
4+
const { spawnSync } = require('child_process')
5+
const path = require('path')
6+
const wasmPath = path.join(__dirname, '../.build/wasm32-wasi-threads/Debug/async.wasm')
7+
const module = new WebAssembly.Module(require('fs').readFileSync(wasmPath))
8+
console.log('libemnapi-basic-mt: ' + (WebAssembly.Module.imports(module).filter(d => {
9+
return d.name === 'napi_create_async_work'
10+
}).length > 0))
311
}
412

513
const { spawnSync } = require('child_process')

0 commit comments

Comments
 (0)