Skip to content

Commit 90433d1

Browse files
committed
feat(execution): publish on graphql:subscribe tracing channel
Wraps the public subscribe() entry point with maybeTraceMixed using the same ExecutionArgs-based context factory as graphql:execute. The subscription setup path may return sync (when the subscribe resolver is synchronous) or async (when the resolver returns a promise resolving to an AsyncIterable), so maybeTraceMixed's sync / promise branching fits exactly. Per-subscription-event executions continue to publish on graphql:execute via executeSubscriptionEvent, which was wired in the previous commit. The graphql:subscribe context therefore owns the document reference and covers the setup span; subscribers that need to correlate per-event execute spans to their parent subscription use AsyncLocalStorage context propagation provided by Channel.runStores.
1 parent 6b3f282 commit 90433d1

File tree

3 files changed

+261
-72
lines changed

3 files changed

+261
-72
lines changed

integrationTests/diagnostics/test.js

Lines changed: 115 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -10,68 +10,70 @@ import {
1010
enableDiagnosticsChannel,
1111
execute,
1212
parse,
13+
subscribe,
1314
validate,
1415
} from 'graphql';
1516

1617
enableDiagnosticsChannel(dc);
1718

18-
// graphql:parse - synchronous
19-
{
20-
const events = [];
21-
const handler = {
22-
start: (msg) => events.push({ kind: 'start', source: msg.source }),
23-
end: (msg) => events.push({ kind: 'end', source: msg.source }),
24-
asyncStart: (msg) =>
25-
events.push({ kind: 'asyncStart', source: msg.source }),
26-
asyncEnd: (msg) => events.push({ kind: 'asyncEnd', source: msg.source }),
27-
error: (msg) =>
28-
events.push({ kind: 'error', source: msg.source, error: msg.error }),
29-
};
30-
31-
const channel = dc.tracingChannel('graphql:parse');
32-
channel.subscribe(handler);
33-
34-
try {
35-
const doc = parse('{ field }');
36-
assert.equal(doc.kind, 'Document');
37-
assert.deepEqual(
38-
events.map((e) => e.kind),
39-
['start', 'end'],
40-
);
41-
assert.equal(events[0].source, '{ field }');
42-
assert.equal(events[1].source, '{ field }');
43-
} finally {
44-
channel.unsubscribe(handler);
19+
function runParseCases() {
20+
// graphql:parse - synchronous.
21+
{
22+
const events = [];
23+
const handler = {
24+
start: (msg) => events.push({ kind: 'start', source: msg.source }),
25+
end: (msg) => events.push({ kind: 'end', source: msg.source }),
26+
asyncStart: (msg) =>
27+
events.push({ kind: 'asyncStart', source: msg.source }),
28+
asyncEnd: (msg) => events.push({ kind: 'asyncEnd', source: msg.source }),
29+
error: (msg) =>
30+
events.push({ kind: 'error', source: msg.source, error: msg.error }),
31+
};
32+
33+
const channel = dc.tracingChannel('graphql:parse');
34+
channel.subscribe(handler);
35+
36+
try {
37+
const doc = parse('{ field }');
38+
assert.equal(doc.kind, 'Document');
39+
assert.deepEqual(
40+
events.map((e) => e.kind),
41+
['start', 'end'],
42+
);
43+
assert.equal(events[0].source, '{ field }');
44+
assert.equal(events[1].source, '{ field }');
45+
} finally {
46+
channel.unsubscribe(handler);
47+
}
4548
}
46-
}
4749

48-
// graphql:parse - error path fires start, error, end (traceSync finally-emits end)
49-
{
50-
const events = [];
51-
const handler = {
52-
start: (msg) => events.push({ kind: 'start', source: msg.source }),
53-
end: (msg) => events.push({ kind: 'end', source: msg.source }),
54-
error: (msg) =>
55-
events.push({ kind: 'error', source: msg.source, error: msg.error }),
56-
};
57-
58-
const channel = dc.tracingChannel('graphql:parse');
59-
channel.subscribe(handler);
60-
61-
try {
62-
assert.throws(() => parse('{ '));
63-
assert.deepEqual(
64-
events.map((e) => e.kind),
65-
['start', 'error', 'end'],
66-
);
67-
assert.ok(events[1].error instanceof Error);
68-
} finally {
69-
channel.unsubscribe(handler);
50+
// graphql:parse - error path fires start, error, end.
51+
{
52+
const events = [];
53+
const handler = {
54+
start: (msg) => events.push({ kind: 'start', source: msg.source }),
55+
end: (msg) => events.push({ kind: 'end', source: msg.source }),
56+
error: (msg) =>
57+
events.push({ kind: 'error', source: msg.source, error: msg.error }),
58+
};
59+
60+
const channel = dc.tracingChannel('graphql:parse');
61+
channel.subscribe(handler);
62+
63+
try {
64+
assert.throws(() => parse('{ '));
65+
assert.deepEqual(
66+
events.map((e) => e.kind),
67+
['start', 'error', 'end'],
68+
);
69+
assert.ok(events[1].error instanceof Error);
70+
} finally {
71+
channel.unsubscribe(handler);
72+
}
7073
}
7174
}
7275

73-
// graphql:validate - synchronous, with schema/document context
74-
{
76+
function runValidateCase() {
7577
const schema = buildSchema(`type Query { field: String }`);
7678
const doc = parse('{ field }');
7779

@@ -104,9 +106,7 @@ enableDiagnosticsChannel(dc);
104106
}
105107
}
106108

107-
// graphql:execute - sync path, ctx carries operationType, operationName,
108-
// document, schema.
109-
{
109+
function runExecuteCase() {
110110
const schema = buildSchema(`type Query { hello: String }`);
111111
const document = parse('query Greeting { hello }');
112112

@@ -149,10 +149,67 @@ enableDiagnosticsChannel(dc);
149149
}
150150
}
151151

152-
// No-op when nothing is subscribed - parse still succeeds.
153-
{
152+
async function runSubscribeCase() {
153+
async function* ticks() {
154+
yield { tick: 'one' };
155+
}
156+
157+
const schema = buildSchema(`
158+
type Query { dummy: String }
159+
type Subscription { tick: String }
160+
`);
161+
// buildSchema doesn't attach a subscribe resolver to fields; inject one.
162+
schema.getSubscriptionType().getFields().tick.subscribe = () => ticks();
163+
164+
const document = parse('subscription Tick { tick }');
165+
166+
const events = [];
167+
const handler = {
168+
start: (msg) =>
169+
events.push({
170+
kind: 'start',
171+
operationType: msg.operationType,
172+
operationName: msg.operationName,
173+
}),
174+
end: () => events.push({ kind: 'end' }),
175+
asyncStart: () => events.push({ kind: 'asyncStart' }),
176+
asyncEnd: () => events.push({ kind: 'asyncEnd' }),
177+
error: (msg) => events.push({ kind: 'error', error: msg.error }),
178+
};
179+
180+
const channel = dc.tracingChannel('graphql:subscribe');
181+
channel.subscribe(handler);
182+
183+
try {
184+
const result = subscribe({ schema, document });
185+
const stream = typeof result.then === 'function' ? await result : result;
186+
if (stream[Symbol.asyncIterator]) {
187+
await stream.return?.();
188+
}
189+
// Subscription setup is synchronous here; start/end fire, no async tail.
190+
assert.deepEqual(
191+
events.map((e) => e.kind),
192+
['start', 'end'],
193+
);
194+
assert.equal(events[0].operationType, 'subscription');
195+
assert.equal(events[0].operationName, 'Tick');
196+
} finally {
197+
channel.unsubscribe(handler);
198+
}
199+
}
200+
201+
function runNoSubscriberCase() {
154202
const doc = parse('{ field }');
155203
assert.equal(doc.kind, 'Document');
156204
}
157205

158-
console.log('diagnostics integration test passed');
206+
async function main() {
207+
runParseCases();
208+
runValidateCase();
209+
runExecuteCase();
210+
await runSubscribeCase();
211+
runNoSubscriberCase();
212+
console.log('diagnostics integration test passed');
213+
}
214+
215+
main();
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import { expect } from 'chai';
2+
import { afterEach, beforeEach, describe, it } from 'mocha';
3+
4+
import {
5+
collectEvents,
6+
FakeDc,
7+
} from '../../__testUtils__/fakeDiagnosticsChannel.js';
8+
9+
import { isPromise } from '../../jsutils/isPromise.js';
10+
11+
import { parse } from '../../language/parser.js';
12+
13+
import { GraphQLObjectType } from '../../type/definition.js';
14+
import { GraphQLString } from '../../type/scalars.js';
15+
import { GraphQLSchema } from '../../type/schema.js';
16+
17+
import { enableDiagnosticsChannel } from '../../diagnostics.js';
18+
19+
import { subscribe } from '../execute.js';
20+
21+
function buildSubscriptionSchema(
22+
subscribeFn: () => AsyncIterable<{ tick: string }>,
23+
): GraphQLSchema {
24+
return new GraphQLSchema({
25+
query: new GraphQLObjectType({
26+
name: 'Query',
27+
fields: { dummy: { type: GraphQLString } },
28+
}),
29+
subscription: new GraphQLObjectType({
30+
name: 'Subscription',
31+
fields: {
32+
tick: {
33+
type: GraphQLString,
34+
subscribe: subscribeFn,
35+
},
36+
},
37+
}),
38+
});
39+
}
40+
41+
async function* twoTicks(): AsyncIterable<{ tick: string }> {
42+
await Promise.resolve();
43+
yield { tick: 'one' };
44+
yield { tick: 'two' };
45+
}
46+
47+
const fakeDc = new FakeDc();
48+
const subscribeChannel = fakeDc.tracingChannel('graphql:subscribe');
49+
50+
describe('subscribe diagnostics channel', () => {
51+
let active: ReturnType<typeof collectEvents> | undefined;
52+
53+
beforeEach(() => {
54+
enableDiagnosticsChannel(fakeDc);
55+
});
56+
57+
afterEach(() => {
58+
active?.unsubscribe();
59+
active = undefined;
60+
});
61+
62+
it('emits start and end for a synchronous subscription setup', async () => {
63+
active = collectEvents(subscribeChannel);
64+
65+
const schema = buildSubscriptionSchema(twoTicks);
66+
const document = parse('subscription S { tick }');
67+
68+
const result = subscribe({ schema, document });
69+
const resolved = isPromise(result) ? await result : result;
70+
if (!(Symbol.asyncIterator in resolved)) {
71+
throw new Error('Expected an async iterator');
72+
}
73+
await resolved.return?.();
74+
75+
expect(active.events.map((e) => e.kind)).to.deep.equal(['start', 'end']);
76+
expect(active.events[0].ctx.operationType).to.equal('subscription');
77+
expect(active.events[0].ctx.operationName).to.equal('S');
78+
expect(active.events[0].ctx.document).to.equal(document);
79+
expect(active.events[0].ctx.schema).to.equal(schema);
80+
});
81+
82+
it('emits the full async lifecycle when subscribe resolver returns a promise', async () => {
83+
active = collectEvents(subscribeChannel);
84+
85+
const asyncResolver = (): Promise<AsyncIterable<{ tick: string }>> =>
86+
Promise.resolve(twoTicks());
87+
const schema = buildSubscriptionSchema(
88+
asyncResolver as unknown as () => AsyncIterable<{ tick: string }>,
89+
);
90+
const document = parse('subscription { tick }');
91+
92+
const result = subscribe({ schema, document });
93+
const resolved = isPromise(result) ? await result : result;
94+
if (!(Symbol.asyncIterator in resolved)) {
95+
throw new Error('Expected an async iterator');
96+
}
97+
await resolved.return?.();
98+
99+
expect(active.events.map((e) => e.kind)).to.deep.equal([
100+
'start',
101+
'end',
102+
'asyncStart',
103+
'asyncEnd',
104+
]);
105+
});
106+
107+
it('emits only start and end for a synchronous validation failure', () => {
108+
active = collectEvents(subscribeChannel);
109+
110+
const schema = buildSubscriptionSchema(twoTicks);
111+
// Invalid: no operation.
112+
const document = parse('fragment F on Subscription { tick }');
113+
114+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
115+
subscribe({ schema, document });
116+
117+
expect(active.events.map((e) => e.kind)).to.deep.equal(['start', 'end']);
118+
});
119+
120+
it('does nothing when no subscribers are attached', async () => {
121+
const schema = buildSubscriptionSchema(twoTicks);
122+
const document = parse('subscription { tick }');
123+
124+
const result = subscribe({ schema, document });
125+
const resolved = isPromise(result) ? await result : result;
126+
if (Symbol.asyncIterator in resolved) {
127+
await resolved.return?.();
128+
}
129+
});
130+
});

src/execution/execute.ts

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -282,24 +282,26 @@ export function subscribe(
282282
): PromiseOrValue<
283283
AsyncGenerator<ExecutionResult, void, void> | ExecutionResult
284284
> {
285-
// If a valid execution context cannot be created due to incorrect arguments,
286-
// a "Response" with only errors is returned.
287-
const validatedExecutionArgs = validateExecutionArgs(args);
285+
return maybeTraceMixed('subscribe', buildExecuteCtxFromArgs(args), () => {
286+
// If a valid execution context cannot be created due to incorrect
287+
// arguments, a "Response" with only errors is returned.
288+
const validatedExecutionArgs = validateExecutionArgs(args);
288289

289-
// Return early errors if execution context failed.
290-
if (!('schema' in validatedExecutionArgs)) {
291-
return { errors: validatedExecutionArgs };
292-
}
290+
// Return early errors if execution context failed.
291+
if (!('schema' in validatedExecutionArgs)) {
292+
return { errors: validatedExecutionArgs };
293+
}
293294

294-
const resultOrStream = createSourceEventStreamImpl(validatedExecutionArgs);
295+
const resultOrStream = createSourceEventStreamImpl(validatedExecutionArgs);
295296

296-
if (isPromise(resultOrStream)) {
297-
return resultOrStream.then((resolvedResultOrStream) =>
298-
mapSourceToResponse(validatedExecutionArgs, resolvedResultOrStream),
299-
);
300-
}
297+
if (isPromise(resultOrStream)) {
298+
return resultOrStream.then((resolvedResultOrStream) =>
299+
mapSourceToResponse(validatedExecutionArgs, resolvedResultOrStream),
300+
);
301+
}
301302

302-
return mapSourceToResponse(validatedExecutionArgs, resultOrStream);
303+
return mapSourceToResponse(validatedExecutionArgs, resultOrStream);
304+
});
303305
}
304306

305307
/**

0 commit comments

Comments
 (0)