Skip to content

Commit 4855e1e

Browse files
committed
fix(diagnostics): preserve AsyncLocalStorage across async lifecycle
maybeTraceMixed previously delegated start / end to traceSync and then attached its own .then / .finally outside that scope. That worked for event ordering but broke AsyncLocalStorage propagation: a subscriber that called channel.start.bindStore(als, ...) could read that store in its start and end handlers but not in asyncStart or asyncEnd, because those fired in a .then attached outside the start.runStores scope. Promise continuations inherit the AsyncLocalStorage context of the frame that attaches them, so the fix is to attach .then inside the start.runStores block. This mirrors Node's own TracingChannel.tracePromise structure exactly, just with an additional sync branch that short- circuits to [start, end] when fn returns synchronously. Also updates the FakeChannel helper to match: its runStores now publishes the context on entry (the behavior Node's Channel.runStores has), so the fake traceSync / tracePromise implementations match real Node's event counts without the old end.runStores workaround. Adds an integration test that binds a store on graphql:execute's start sub-channel and asserts every lifecycle handler sees it when a resolver returns a promise.
1 parent 3381a40 commit 4855e1e

File tree

3 files changed

+131
-54
lines changed

3 files changed

+131
-54
lines changed

integrationTests/diagnostics/test.js

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
/* eslint-disable n/no-unsupported-features/node-builtins */
44

55
import assert from 'node:assert/strict';
6+
import { AsyncLocalStorage } from 'node:async_hooks';
67
import dc from 'node:diagnostics_channel';
78

89
import {
@@ -247,12 +248,48 @@ function runNoSubscriberCase() {
247248
assert.equal(doc.kind, 'Document');
248249
}
249250

251+
async function runAlsPropagationCase() {
252+
// A subscriber that binds a store on the `start` sub-channel should be able
253+
// to read it in every lifecycle handler (start, end, asyncStart, asyncEnd).
254+
// This is what APMs use to parent child spans to the current operation
255+
// without threading state through the ctx object.
256+
const als = new AsyncLocalStorage();
257+
const channel = dc.tracingChannel('graphql:execute');
258+
channel.start.bindStore(als, (ctx) => ({ operationName: ctx.operationName }));
259+
260+
const seen = {};
261+
const handler = {
262+
start: () => (seen.start = als.getStore()),
263+
end: () => (seen.end = als.getStore()),
264+
asyncStart: () => (seen.asyncStart = als.getStore()),
265+
asyncEnd: () => (seen.asyncEnd = als.getStore()),
266+
};
267+
channel.subscribe(handler);
268+
269+
try {
270+
const schema = buildSchema(`type Query { slow: String }`);
271+
const document = parse('query Slow { slow }');
272+
const rootValue = { slow: () => Promise.resolve('done') };
273+
274+
await execute({ schema, document, rootValue });
275+
276+
assert.deepEqual(seen.start, { operationName: 'Slow' });
277+
assert.deepEqual(seen.end, { operationName: 'Slow' });
278+
assert.deepEqual(seen.asyncStart, { operationName: 'Slow' });
279+
assert.deepEqual(seen.asyncEnd, { operationName: 'Slow' });
280+
} finally {
281+
channel.unsubscribe(handler);
282+
channel.start.unbindStore(als);
283+
}
284+
}
285+
250286
async function main() {
251287
runParseCases();
252288
runValidateCase();
253289
runExecuteCase();
254290
await runSubscribeCase();
255291
runResolveCase();
292+
await runAlsPropagationCase();
256293
runNoSubscriberCase();
257294
console.log('diagnostics integration test passed');
258295
}

src/__testUtils__/fakeDiagnosticsChannel.ts

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,15 @@ export class FakeChannel implements MinimalChannel {
2929
}
3030

3131
runStores<T, ContextType extends object>(
32-
_ctx: ContextType,
32+
ctx: ContextType,
3333
fn: (this: ContextType, ...args: Array<unknown>) => T,
3434
thisArg?: unknown,
3535
...args: Array<unknown>
3636
): T {
37+
// Node's Channel.runStores publishes the context on the channel before
38+
// invoking fn. Mirror that here so traceSync / tracePromise fake exactly
39+
// matches real Node's start / end event counts.
40+
this.publish(ctx);
3741
return fn.apply(thisArg as ContextType, args);
3842
}
3943

@@ -81,23 +85,24 @@ export class FakeTracingChannel implements MinimalTracingChannel {
8185
thisArg?: unknown,
8286
...args: Array<unknown>
8387
): T {
84-
this.start.publish(ctx);
85-
let result: T;
86-
try {
87-
result = this.end.runStores(ctx, fn, thisArg, ...args);
88-
} catch (err) {
89-
(ctx as { error: unknown }).error = err;
90-
this.error.publish(ctx);
88+
return this.start.runStores(ctx, () => {
89+
let result: T;
90+
try {
91+
result = fn.apply(thisArg as object, args);
92+
} catch (err) {
93+
(ctx as { error: unknown }).error = err;
94+
this.error.publish(ctx);
95+
this.end.publish(ctx);
96+
throw err;
97+
}
98+
// Node's real traceSync sets `ctx.result` before publishing `end`, so
99+
// subscribers can inspect `isPromise(ctx.result)` inside their `end`
100+
// handler to decide whether the operation is complete or async events
101+
// will follow. Match that semantic here.
102+
(ctx as { result: unknown }).result = result;
91103
this.end.publish(ctx);
92-
throw err;
93-
}
94-
// Node's real traceSync sets `ctx.result` before publishing `end`, so
95-
// subscribers can inspect `isPromise(ctx.result)` inside their `end`
96-
// handler to decide whether the operation is complete or async events
97-
// will follow. Match that semantic here.
98-
(ctx as { result: unknown }).result = result;
99-
this.end.publish(ctx);
100-
return result;
104+
return result;
105+
});
101106
}
102107

103108
tracePromise<T>(
@@ -106,33 +111,39 @@ export class FakeTracingChannel implements MinimalTracingChannel {
106111
thisArg?: unknown,
107112
...args: Array<unknown>
108113
): Promise<T> {
109-
this.start.publish(ctx);
110-
let promise: Promise<T>;
111-
try {
112-
promise = this.end.runStores(ctx, fn, thisArg, ...args);
113-
} catch (err) {
114-
(ctx as { error: unknown }).error = err;
115-
this.error.publish(ctx);
114+
return this.start.runStores(ctx, () => {
115+
let promise: Promise<T>;
116+
try {
117+
promise = fn.apply(thisArg as object, args);
118+
} catch (err) {
119+
(ctx as { error: unknown }).error = err;
120+
this.error.publish(ctx);
121+
this.end.publish(ctx);
122+
throw err;
123+
}
116124
this.end.publish(ctx);
117-
throw err;
118-
}
119-
this.end.publish(ctx);
120-
this.asyncStart.publish(ctx);
121-
return promise
122-
.then(
125+
return promise.then(
123126
(result) => {
124127
(ctx as { result: unknown }).result = result;
125-
return result;
128+
this.asyncStart.publish(ctx);
129+
try {
130+
return result;
131+
} finally {
132+
this.asyncEnd.publish(ctx);
133+
}
126134
},
127135
(err: unknown) => {
128136
(ctx as { error: unknown }).error = err;
129137
this.error.publish(ctx);
130-
throw err;
138+
this.asyncStart.publish(ctx);
139+
try {
140+
throw err;
141+
} finally {
142+
this.asyncEnd.publish(ctx);
143+
}
131144
},
132-
)
133-
.finally(() => {
134-
this.asyncEnd.publish(ctx);
135-
});
145+
);
146+
});
136147
}
137148
}
138149

src/diagnostics.ts

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,21 @@ export function maybeTracePromise<T>(
185185
}
186186

187187
/**
188-
* Publish a mixed sync-or-promise operation through the named graphql tracing channel.
188+
* Publish a mixed sync-or-promise operation through the named graphql tracing
189+
* channel.
190+
*
191+
* Mirrors Node's own `TracingChannel.tracePromise` for the async branch while
192+
* handling sync returns without the cost of a promise wrap. The entire
193+
* lifecycle runs inside `start.runStores`, which is what lets subscribers
194+
* that call `channel.start.bindStore(als, ...)` read that store in every
195+
* sub-channel handler: promise continuations attached inside a `runStores`
196+
* block inherit the AsyncLocalStorage context via async_hooks, so
197+
* `asyncStart` and `asyncEnd` fire with the same store active as `start`
198+
* and `end`.
199+
*
200+
* Subscribers can inspect `isPromise(ctx.result)` inside their `end` handler
201+
* to know whether `asyncEnd` will follow or the operation is complete. This
202+
* matches Node's convention.
189203
*
190204
* @internal
191205
*/
@@ -203,29 +217,44 @@ export function maybeTraceMixed<T>(
203217
result?: unknown;
204218
};
205219

206-
// traceSync fires start/end (and error, if fn throws synchronously)
207-
const result = channel.traceSync(fn, ctx);
208-
if (!isPromise(result)) {
209-
return result;
210-
}
220+
return channel.start.runStores(ctx, () => {
221+
let result: T | Promise<T>;
222+
try {
223+
result = fn();
224+
} catch (err) {
225+
ctx.error = err;
226+
channel.error.publish(ctx);
227+
channel.end.publish(ctx);
228+
throw err;
229+
}
211230

212-
// Fires off `asyncStart` and `asyncEnd` lifecycle events.
213-
channel.asyncStart.publish(ctx);
214-
return result
215-
.then(
231+
if (!isPromise(result)) {
232+
ctx.result = result;
233+
channel.end.publish(ctx);
234+
return result;
235+
}
236+
237+
channel.end.publish(ctx);
238+
return result.then(
216239
(value) => {
217240
ctx.result = value;
218-
219-
return value;
241+
channel.asyncStart.publish(ctx);
242+
try {
243+
return value;
244+
} finally {
245+
channel.asyncEnd.publish(ctx);
246+
}
220247
},
221248
(err: unknown) => {
222249
ctx.error = err;
223250
channel.error.publish(ctx);
224-
225-
throw err;
251+
channel.asyncStart.publish(ctx);
252+
try {
253+
throw err;
254+
} finally {
255+
channel.asyncEnd.publish(ctx);
256+
}
226257
},
227-
)
228-
.finally(() => {
229-
channel.asyncEnd.publish(ctx);
230-
});
258+
);
259+
});
231260
}

0 commit comments

Comments
 (0)