Skip to content

Commit 2e97665

Browse files
authored
refactor(execution): track pending work (#4657)
canonize async work tracking so promises started during execution are still observed after early errors or abort paths route defaultTypeResolver promise handling through tracked async helpers motivation: - #4658
1 parent c2cee1b commit 2e97665

File tree

11 files changed

+269
-81
lines changed

11 files changed

+269
-81
lines changed

src/execution/AsyncWorkTracker.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import { isPromise } from '../jsutils/isPromise.js';
2+
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
3+
4+
/** @internal */
5+
export class AsyncWorkTracker {
6+
pendingAsyncWork: Set<Promise<void>>;
7+
8+
constructor() {
9+
this.pendingAsyncWork = new Set<Promise<void>>();
10+
}
11+
12+
add(promise: Promise<unknown>): void {
13+
const pendingAsyncWork = this.pendingAsyncWork;
14+
const promiseToSettle = promise.then(
15+
() => {
16+
pendingAsyncWork.delete(promiseToSettle);
17+
},
18+
() => {
19+
pendingAsyncWork.delete(promiseToSettle);
20+
},
21+
);
22+
pendingAsyncWork.add(promiseToSettle);
23+
}
24+
25+
addValues(values: ReadonlyArray<PromiseOrValue<unknown>>): void {
26+
for (const value of values) {
27+
if (isPromise(value)) {
28+
this.add(value);
29+
}
30+
}
31+
}
32+
33+
promiseAllTrackOnReject<T>(
34+
values: ReadonlyArray<PromiseOrValue<T>>,
35+
): Promise<Array<T>> {
36+
const promise = Promise.all(values);
37+
promise.then(undefined, () => {
38+
this.addValues(values);
39+
});
40+
return promise;
41+
}
42+
}

src/execution/Executor.ts

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import type {
3232
GraphQLObjectType,
3333
GraphQLOutputType,
3434
GraphQLResolveInfo,
35+
GraphQLResolveInfoHelpers,
3536
GraphQLTypeResolver,
3637
} from '../type/definition.js';
3738
import {
@@ -231,6 +232,10 @@ export class Executor<
231232
abortResultPromise: ((reason?: unknown) => void) | undefined;
232233
resolverAbortController: AbortController | undefined;
233234
getAbortSignal: () => AbortSignal | undefined;
235+
getAsyncHelpers: () => GraphQLResolveInfoHelpers;
236+
promiseAll: <T>(
237+
values: ReadonlyArray<PromiseOrValue<T>>,
238+
) => Promise<Array<T>>;
234239

235240
constructor(
236241
validatedExecutionArgs: ValidatedExecutionArgs,
@@ -249,8 +254,11 @@ export class Executor<
249254
} else {
250255
this.sharedExecutionContext = sharedExecutionContext;
251256
}
252-
const { getAbortSignal } = this.sharedExecutionContext;
257+
const { getAbortSignal, getAsyncHelpers, promiseAll } =
258+
this.sharedExecutionContext;
253259
this.getAbortSignal = getAbortSignal;
260+
this.getAsyncHelpers = getAsyncHelpers;
261+
this.promiseAll = promiseAll;
254262
}
255263

256264
executeQueryOrMutationOrSubscriptionEvent(): PromiseOrValue<
@@ -261,10 +269,7 @@ export class Executor<
261269
if (externalAbortSignal) {
262270
externalAbortSignal.throwIfAborted();
263271
const onExternalAbort = () => {
264-
const aborted = this.abort(externalAbortSignal.reason);
265-
if (isPromise(aborted)) {
266-
aborted.catch(() => undefined);
267-
}
272+
this.abort(externalAbortSignal.reason);
268273
};
269274
removeExternalAbortListener = () =>
270275
externalAbortSignal.removeEventListener('abort', onExternalAbort);
@@ -324,6 +329,7 @@ export class Executor<
324329
return this.buildResponse(null);
325330
},
326331
);
332+
this.sharedExecutionContext.asyncWorkTracker.add(promise);
327333
const { promise: cancellablePromise, abort: abortResultPromise } =
328334
withCancellation(promise);
329335
this.abortResultPromise = abortResultPromise;
@@ -347,7 +353,7 @@ export class Executor<
347353
}
348354
}
349355

350-
abort(reason?: unknown): PromiseOrValue<void> {
356+
abort(reason?: unknown): void {
351357
if (this.aborted) {
352358
return;
353359
}
@@ -506,8 +512,9 @@ export class Executor<
506512
}
507513
} catch (error) {
508514
if (containsPromise) {
509-
// Ensure that any promises returned by other fields are handled, as they may also reject.
510-
promiseForObject(results).catch(() => undefined);
515+
this.sharedExecutionContext.asyncWorkTracker.addValues(
516+
Object.values(results),
517+
);
511518
}
512519
throw error;
513520
}
@@ -520,7 +527,7 @@ export class Executor<
520527
// Otherwise, results is a map from field name to the result of resolving that
521528
// field, which is possibly a promise. Return a promise that will return this
522529
// same map, but with any promises replaced with the values they resolved to.
523-
return promiseForObject(results);
530+
return promiseForObject(results, this.promiseAll);
524531
}
525532

526533
/**
@@ -557,6 +564,7 @@ export class Executor<
557564
parentType,
558565
path,
559566
this.getAbortSignal,
567+
this.getAsyncHelpers,
560568
);
561569

562570
// Get the resolve function, regardless of if its result is normal or abrupt (error).
@@ -853,24 +861,30 @@ export class Executor<
853861
index++;
854862
}
855863
} catch (error) {
856-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
857-
returnIteratorCatchingErrors(asyncIterator);
864+
this.sharedExecutionContext.asyncWorkTracker.add(
865+
returnIteratorCatchingErrors(asyncIterator),
866+
);
858867
if (containsPromise) {
859-
Promise.all(completedResults).catch(() => undefined);
868+
this.sharedExecutionContext.asyncWorkTracker.addValues(
869+
completedResults,
870+
);
860871
}
861872
throw error;
862873
}
863874

864875
// Throwing on completion outside of the loop may allow engines to better optimize
865876
if (this.aborted) {
866877
if (!iteration?.done) {
867-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
868-
returnIteratorCatchingErrors(asyncIterator);
878+
this.sharedExecutionContext.asyncWorkTracker.add(
879+
returnIteratorCatchingErrors(asyncIterator),
880+
);
869881
}
870882
throw new Error('Aborted!');
871883
}
872884

873-
return containsPromise ? Promise.all(completedResults) : completedResults;
885+
return containsPromise
886+
? this.promiseAll(completedResults)
887+
: completedResults;
874888
}
875889

876890
/* c8 ignore next 12 */
@@ -991,15 +1005,17 @@ export class Executor<
9911005
index++;
9921006
}
9931007
} catch (error) {
994-
const maybePromises = containsPromise ? completedResults : [];
995-
maybePromises.push(...collectIteratorPromises(iterator));
996-
if (maybePromises.length) {
997-
Promise.all(maybePromises).catch(() => undefined);
1008+
const asyncWorkTracker = this.sharedExecutionContext.asyncWorkTracker;
1009+
if (containsPromise) {
1010+
asyncWorkTracker.addValues(completedResults);
9981011
}
1012+
asyncWorkTracker.addValues(collectIteratorPromises(iterator));
9991013
throw error;
10001014
}
10011015

1002-
return containsPromise ? Promise.all(completedResults) : completedResults;
1016+
return containsPromise
1017+
? this.promiseAll(completedResults)
1018+
: completedResults;
10031019
}
10041020

10051021
completeMaybePromisedListItemValue(
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import { expect } from 'chai';
2+
import { describe, it } from 'mocha';
3+
4+
import { expectEqualPromisesOrValues } from '../../__testUtils__/expectEqualPromisesOrValues.js';
5+
import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js';
6+
7+
import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js';
8+
9+
import { AsyncWorkTracker } from '../AsyncWorkTracker.js';
10+
11+
describe('AsyncWorkTracker', () => {
12+
it('works to track promises', async () => {
13+
const tracker = new AsyncWorkTracker();
14+
const delayed = promiseWithResolvers<number>();
15+
16+
tracker.add(delayed.promise);
17+
expect(tracker.pendingAsyncWork.size).to.equal(1);
18+
delayed.resolve(1);
19+
await resolveOnNextTick();
20+
expect(tracker.pendingAsyncWork.size).to.equal(0);
21+
});
22+
});
23+
24+
describe('promiseAllTrackOnReject', () => {
25+
it('resolves like Promise.all', async () => {
26+
const tracker = new AsyncWorkTracker();
27+
28+
const values = [Promise.resolve(1), Promise.resolve(2), Promise.resolve(3)];
29+
30+
await expectEqualPromisesOrValues([
31+
tracker.promiseAllTrackOnReject(values),
32+
Promise.all(values),
33+
]);
34+
});
35+
36+
it('resolves synchronous values without tracking', async () => {
37+
const tracker = new AsyncWorkTracker();
38+
39+
const result = await tracker.promiseAllTrackOnReject([1, 2, 3]);
40+
41+
expect(result).to.deep.equal([1, 2, 3]);
42+
expect(tracker.pendingAsyncWork.size).to.equal(0);
43+
});
44+
45+
it('does not add an extra microtask on fulfilled promiseAll results', async () => {
46+
const tracker = new AsyncWorkTracker();
47+
let settled = false;
48+
49+
const promise = Promise.resolve(1);
50+
const trackedPromise = tracker.promiseAllTrackOnReject([promise]);
51+
trackedPromise.then(
52+
() => {
53+
settled = true;
54+
},
55+
() => undefined,
56+
);
57+
await Promise.all([promise]);
58+
expect(settled).to.equal(true);
59+
});
60+
61+
it('tracks all promises only after rejection', async () => {
62+
const delayed = promiseWithResolvers<undefined>();
63+
const tracker = new AsyncWorkTracker();
64+
const result = tracker.promiseAllTrackOnReject([
65+
Promise.reject(new Error('bad')),
66+
delayed.promise,
67+
] as const);
68+
expect(tracker.pendingAsyncWork.size).to.equal(0);
69+
70+
await result.catch(() => undefined);
71+
expect(tracker.pendingAsyncWork.size).to.equal(1);
72+
delayed.resolve(undefined);
73+
74+
await resolveOnNextTick();
75+
expect(tracker.pendingAsyncWork.size).to.equal(0);
76+
});
77+
78+
it('tracks promises until they settle and catches later rejections', async () => {
79+
let unhandledRejection: unknown = null;
80+
const unhandledRejectionListener = (reason: unknown) => {
81+
unhandledRejection = reason;
82+
};
83+
// eslint-disable-next-line no-undef
84+
process.on('unhandledRejection', unhandledRejectionListener);
85+
86+
const tracker = new AsyncWorkTracker();
87+
const delayed = promiseWithResolvers<undefined>();
88+
const result = tracker.promiseAllTrackOnReject([
89+
Promise.reject(new Error('bad')),
90+
delayed.promise,
91+
] as const);
92+
93+
await result.catch(() => undefined);
94+
expect(tracker.pendingAsyncWork.size).to.equal(1);
95+
96+
delayed.reject(new Error('late bad'));
97+
await new Promise((resolve) => setTimeout(resolve, 20));
98+
99+
// eslint-disable-next-line no-undef
100+
process.removeListener('unhandledRejection', unhandledRejectionListener);
101+
102+
expect(tracker.pendingAsyncWork.size).to.equal(0);
103+
expect(unhandledRejection).to.equal(null);
104+
});
105+
});

src/execution/__tests__/executor-test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,10 @@ describe('Execute: Handles basic execution tasks', () => {
258258
'operation',
259259
'variableValues',
260260
'getAbortSignal',
261+
'getAsyncHelpers',
261262
);
263+
const asyncHelpers = resolvedInfo?.getAsyncHelpers();
264+
expect(asyncHelpers).to.have.all.keys('track');
262265

263266
const operation = document.definitions[0];
264267
assert(operation.kind === Kind.OPERATION_DEFINITION);
@@ -295,6 +298,13 @@ describe('Execute: Handles basic execution tasks', () => {
295298
expect(abortSignal).to.be.instanceOf(AbortSignal);
296299
expect(resolvedInfo?.getAbortSignal()).to.equal(abortSignal);
297300

301+
expect(resolvedInfo?.getAsyncHelpers()).to.equal(asyncHelpers);
302+
303+
const track = asyncHelpers?.track;
304+
expect(track).to.be.a('function');
305+
expect(resolvedInfo?.getAsyncHelpers().track).to.equal(track);
306+
track?.([Promise.resolve()]);
307+
298308
resolve();
299309

300310
await result;
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,38 @@
1+
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
2+
3+
import type { GraphQLResolveInfoHelpers } from '../type/index.js';
4+
5+
import { AsyncWorkTracker } from './AsyncWorkTracker.js';
6+
17
/** @internal */
28
export interface SharedExecutionContext {
9+
asyncWorkTracker: AsyncWorkTracker;
310
getAbortSignal: () => AbortSignal | undefined;
11+
getAsyncHelpers: () => GraphQLResolveInfoHelpers;
12+
promiseAll: <T>(
13+
values: ReadonlyArray<PromiseOrValue<T>>,
14+
) => Promise<Array<T>>;
415
}
516

617
export function createSharedExecutionContext(
718
abortSignal: AbortSignal | undefined,
819
): SharedExecutionContext {
20+
const asyncWorkTracker = new AsyncWorkTracker();
21+
let resolveInfoHelpers: GraphQLResolveInfoHelpers | undefined;
22+
23+
const promiseAll = <T>(
24+
values: ReadonlyArray<PromiseOrValue<T>>,
25+
): Promise<Array<T>> => asyncWorkTracker.promiseAllTrackOnReject(values);
26+
27+
const getAsyncHelpers = (): GraphQLResolveInfoHelpers =>
28+
(resolveInfoHelpers ??= {
29+
track: (maybePromises) => asyncWorkTracker.addValues(maybePromises),
30+
});
31+
932
return {
33+
asyncWorkTracker,
1034
getAbortSignal: () => abortSignal,
35+
getAsyncHelpers,
36+
promiseAll,
1137
};
1238
}

0 commit comments

Comments
 (0)