Skip to content

Commit 06e5bd0

Browse files
authored
fix(incremental): await async incremental cleanup (#4642)
1 parent ce68f97 commit 06e5bd0

File tree

8 files changed

+776
-49
lines changed

8 files changed

+776
-49
lines changed

src/execution/Executor.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,12 @@ export class Executor<
251251
let removeExternalAbortListener: (() => void) | undefined;
252252
if (externalAbortSignal) {
253253
externalAbortSignal.throwIfAborted();
254-
const onExternalAbort = () => this.cancel(externalAbortSignal.reason);
254+
const onExternalAbort = () => {
255+
const aborted = this.abort(externalAbortSignal.reason);
256+
if (isPromise(aborted)) {
257+
aborted.catch(() => undefined);
258+
}
259+
};
255260
removeExternalAbortListener = () =>
256261
externalAbortSignal.removeEventListener('abort', onExternalAbort);
257262
externalAbortSignal.addEventListener('abort', onExternalAbort);
@@ -322,7 +327,7 @@ export class Executor<
322327
}
323328
}
324329

325-
cancel(reason?: unknown): void {
330+
abort(reason?: unknown): PromiseOrValue<void> {
326331
if (!this.finished) {
327332
this.finish();
328333
this.internalAbortController.abort(reason);
@@ -826,13 +831,15 @@ export class Executor<
826831
index++;
827832
}
828833
} catch (error) {
834+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
829835
returnIteratorCatchingErrors(asyncIterator);
830836
throw error;
831837
}
832838

833839
// Throwing on completion outside of the loop may allow engines to better optimize
834840
if (this.finished) {
835841
if (!iteration?.done) {
842+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
836843
returnIteratorCatchingErrors(asyncIterator);
837844
}
838845
throw new Error('Execution has already completed.');
@@ -959,6 +966,7 @@ export class Executor<
959966
index++;
960967
}
961968
} catch (error) {
969+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
962970
returnIteratorCatchingErrors(iterator);
963971
throw error;
964972
}

src/execution/ExecutorThrowingOnIncremental.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { invariant } from '../jsutils/invariant.js';
2+
import { isPromise } from '../jsutils/isPromise.js';
23
import type { ObjMap } from '../jsutils/ObjMap.js';
34
import type { Path } from '../jsutils/Path.js';
45
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
@@ -38,7 +39,8 @@ export class ExecutorThrowingOnIncremental extends Executor {
3839
'`@defer` directive not supported on subscription operations. Disable `@defer` by setting the `if` argument to `false`.',
3940
);
4041
const reason = new Error(UNEXPECTED_MULTIPLE_PAYLOADS);
41-
this.cancel(reason);
42+
const aborted = this.abort(reason);
43+
invariant(!isPromise(aborted));
4244
throw reason;
4345
}
4446
return this.executeRootGroupedFieldSet(
@@ -64,7 +66,8 @@ export class ExecutorThrowingOnIncremental extends Executor {
6466
'`@defer` directive not supported on subscription operations. Disable `@defer` by setting the `if` argument to `false`.',
6567
);
6668
const reason = new Error(UNEXPECTED_MULTIPLE_PAYLOADS);
67-
this.cancel(reason);
69+
const aborted = this.abort(reason);
70+
invariant(!isPromise(aborted));
6871
throw reason;
6972
}
7073

@@ -98,7 +101,8 @@ export class ExecutorThrowingOnIncremental extends Executor {
98101
);
99102

100103
const reason = new Error(UNEXPECTED_MULTIPLE_PAYLOADS);
101-
this.cancel(reason);
104+
const aborted = this.abort(reason);
105+
invariant(!isPromise(aborted));
102106
throw reason;
103107
}
104108

src/execution/incremental/IncrementalExecutor.ts

Lines changed: 45 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -318,25 +318,25 @@ export class IncrementalExecutor<
318318
);
319319
}
320320

321-
override cancel(reason?: unknown): void {
322-
super.cancel(reason);
321+
override abort(reason?: unknown): PromiseOrValue<void> {
322+
const abortPromises: Array<Promise<unknown>> = [];
323+
const superAborted = super.abort(reason);
324+
// Executor.abort is currently synchronous
325+
invariant(!isPromise(superAborted));
323326
for (const task of this.tasks) {
324327
const aborted = task.computation.abort(reason);
325-
/* c8 ignore start */
326-
// TODO: add coverage
327328
if (isPromise(aborted)) {
328-
aborted.catch(() => undefined);
329+
abortPromises.push(aborted);
329330
}
330-
/* c8 ignore stop */
331331
}
332332
for (const stream of this.streams) {
333333
const aborted = stream.queue.abort(reason);
334-
/* c8 ignore start */
335-
// TODO: add coverage
336334
if (isPromise(aborted)) {
337-
aborted.catch(() => undefined);
335+
abortPromises.push(aborted);
338336
}
339-
/* c8 ignore stop */
337+
}
338+
if (abortPromises.length > 0) {
339+
return Promise.allSettled(abortPromises).then(() => undefined);
340340
}
341341
}
342342

@@ -515,7 +515,7 @@ export class IncrementalExecutor<
515515
groupedFieldSet,
516516
deliveryGroupMap,
517517
),
518-
(reason) => executor.cancel(reason),
518+
(reason) => executor.abort(reason),
519519
),
520520
};
521521

@@ -552,7 +552,7 @@ export class IncrementalExecutor<
552552
deliveryGroupMap,
553553
);
554554
} catch (error) {
555-
this.cancel();
555+
ignoreAbortCleanup(this.abort());
556556
throw error;
557557
}
558558

@@ -561,7 +561,7 @@ export class IncrementalExecutor<
561561
(resolved) =>
562562
this.buildExecutionGroupResult(deliveryGroups, path, resolved),
563563
(error: unknown) => {
564-
this.cancel();
564+
ignoreAbortCleanup(this.abort());
565565
throw error;
566566
},
567567
);
@@ -600,13 +600,7 @@ export class IncrementalExecutor<
600600
const filteredTasks: Array<ExecutionGroup> = [];
601601
for (const task of tasks) {
602602
if (collectedErrors.hasNulledPosition(task.path)) {
603-
const aborted = task.computation.abort(cancellationReason);
604-
/* c8 ignore start */
605-
// TODO: add coverage
606-
if (isPromise(aborted)) {
607-
aborted.catch(() => undefined);
608-
}
609-
/* c8 ignore stop */
603+
ignoreAbortCleanup(task.computation.abort(cancellationReason));
610604
} else {
611605
filteredTasks.push(task);
612606
}
@@ -615,13 +609,7 @@ export class IncrementalExecutor<
615609
const filteredStreams: Array<ItemStream> = [];
616610
for (const stream of streams) {
617611
if (collectedErrors.hasNulledPosition(stream.path)) {
618-
const aborted = stream.queue.abort(cancellationReason);
619-
/* c8 ignore start */
620-
// TODO: add coverage
621-
if (isPromise(aborted)) {
622-
aborted.catch(() => undefined);
623-
}
624-
/* c8 ignore stop */
612+
ignoreAbortCleanup(stream.queue.abort(cancellationReason));
625613
} else {
626614
filteredStreams.push(stream);
627615
}
@@ -741,17 +729,29 @@ export class IncrementalExecutor<
741729
const { enableEarlyExecution } = this.validatedExecutionArgs;
742730
const queue = new Queue<StreamItemResult>(
743731
async ({ push, stop, onStop, started }) => {
744-
const cancelStreamItems = new Set<(reason?: unknown) => void>();
732+
const abortStreamItems = new Set<
733+
(reason?: unknown) => PromiseOrValue<void>
734+
>();
745735
let finishedNormally = false;
746736
let stopRequested = false;
747737

748738
onStop((reason) => {
749739
stopRequested = true;
750740
if (!finishedNormally) {
751-
cancelStreamItems.forEach((cancelStreamItem) =>
752-
cancelStreamItem(reason),
753-
);
754-
returnIteratorCatchingErrors(iterator);
741+
const abortPromises: Array<Promise<unknown>> = [];
742+
for (const abortStreamItem of abortStreamItems) {
743+
const result = abortStreamItem(reason);
744+
if (isPromise(result)) {
745+
abortPromises.push(result);
746+
}
747+
}
748+
const returned = returnIteratorCatchingErrors(iterator);
749+
if (isPromise(returned)) {
750+
abortPromises.push(returned);
751+
}
752+
if (abortPromises.length > 0) {
753+
return Promise.allSettled(abortPromises).then(() => undefined);
754+
}
755755
}
756756
});
757757
await (enableEarlyExecution ? Promise.resolve() : started);
@@ -783,7 +783,6 @@ export class IncrementalExecutor<
783783
finishedNormally = true;
784784
const stopped = stop();
785785
/* c8 ignore start */
786-
// TODO: add coverage
787786
if (isPromise(stopped)) {
788787
stopped.catch(() => undefined);
789788
}
@@ -804,11 +803,11 @@ export class IncrementalExecutor<
804803
);
805804
if (isPromise(streamItemResult)) {
806805
if (enableEarlyExecution) {
807-
const cancelStreamItem = (reason?: unknown) =>
808-
executor.cancel(reason);
809-
cancelStreamItems.add(cancelStreamItem);
806+
const abortStreamItem = (reason?: unknown) =>
807+
executor.abort(reason);
808+
abortStreamItems.add(abortStreamItem);
810809
streamItemResult = streamItemResult.finally(() => {
811-
cancelStreamItems.delete(cancelStreamItem);
810+
abortStreamItems.delete(abortStreamItem);
812811
});
813812
} else {
814813
// eslint-disable-next-line no-await-in-loop
@@ -864,7 +863,7 @@ export class IncrementalExecutor<
864863
},
865864
)
866865
.then(undefined, (error: unknown) => {
867-
this.cancel();
866+
ignoreAbortCleanup(this.abort());
868867
throw error;
869868
});
870869
}
@@ -885,7 +884,7 @@ export class IncrementalExecutor<
885884
return this.buildStreamItemResult(null);
886885
}
887886
} catch (error) {
888-
this.cancel();
887+
ignoreAbortCleanup(this.abort());
889888
throw error;
890889
}
891890

@@ -904,7 +903,7 @@ export class IncrementalExecutor<
904903
},
905904
)
906905
.then(undefined, (error: unknown) => {
907-
this.cancel();
906+
ignoreAbortCleanup(this.abort());
908907
throw error;
909908
});
910909
}
@@ -923,6 +922,12 @@ export class IncrementalExecutor<
923922
}
924923
}
925924

925+
function ignoreAbortCleanup(aborted: PromiseOrValue<void>): void {
926+
if (isPromise(aborted)) {
927+
aborted.catch(() => undefined);
928+
}
929+
}
930+
926931
function toNodes(fieldDetailsList: FieldDetailsList): ReadonlyArray<FieldNode> {
927932
return fieldDetailsList.map((fieldDetails) => fieldDetails.node);
928933
}

src/execution/incremental/__tests__/defer-test.ts

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2989,6 +2989,97 @@ describe('Execute: defer directive', () => {
29892989
);
29902990
});
29912991

2992+
it('cancels pending deferred tasks with async child stream cleanup', async () => {
2993+
const abortController = new AbortController();
2994+
const document = parse(`
2995+
query {
2996+
todo {
2997+
id
2998+
... @defer {
2999+
items @stream(initialCount: 0)
3000+
author {
3001+
id
3002+
}
3003+
}
3004+
}
3005+
blocker
3006+
}
3007+
`);
3008+
3009+
const { promise: blockerPromise, resolve: resolveBlocker } =
3010+
promiseWithResolvers<string>();
3011+
const { promise: blockerStarted, resolve: resolveBlockerStarted } =
3012+
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
3013+
promiseWithResolvers<void>();
3014+
const { promise: authorPromise, resolve: resolveAuthor } =
3015+
promiseWithResolvers<{ id: string }>();
3016+
const { promise: authorStarted, resolve: resolveAuthorStarted } =
3017+
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
3018+
promiseWithResolvers<void>();
3019+
const { promise: itemsStarted, resolve: resolveItemsStarted } =
3020+
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
3021+
promiseWithResolvers<void>();
3022+
const { promise: returnCleanup, resolve: resolveReturnCleanup } =
3023+
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
3024+
promiseWithResolvers<void>();
3025+
let sourceReturnCalls = 0;
3026+
const asyncIterator = {
3027+
[Symbol.asyncIterator]() {
3028+
return this;
3029+
},
3030+
next() {
3031+
return new Promise<IteratorResult<string>>(() => {
3032+
/* never resolves */
3033+
});
3034+
},
3035+
async return() {
3036+
sourceReturnCalls += 1;
3037+
await returnCleanup;
3038+
return { value: undefined, done: true };
3039+
},
3040+
};
3041+
3042+
const resultPromise = experimentalExecuteIncrementally({
3043+
schema: cancellationSchema,
3044+
document,
3045+
enableEarlyExecution: true,
3046+
abortSignal: abortController.signal,
3047+
rootValue: {
3048+
blocker() {
3049+
resolveBlockerStarted();
3050+
return blockerPromise;
3051+
},
3052+
todo: {
3053+
id: 'todo',
3054+
items() {
3055+
resolveItemsStarted();
3056+
return asyncIterator;
3057+
},
3058+
author() {
3059+
resolveAuthorStarted();
3060+
return authorPromise;
3061+
},
3062+
},
3063+
},
3064+
});
3065+
3066+
await blockerStarted;
3067+
await itemsStarted;
3068+
await authorStarted;
3069+
abortController.abort();
3070+
await resolveOnNextTick();
3071+
3072+
expect(sourceReturnCalls).to.equal(1);
3073+
await expectPromise(resultPromise).toRejectWith(
3074+
'This operation was aborted',
3075+
);
3076+
3077+
resolveReturnCleanup();
3078+
resolveAuthor({ id: 'author' });
3079+
resolveBlocker('done');
3080+
await resolveOnNextTick();
3081+
});
3082+
29923083
it('should ignore deferred payloads resolved after cancellation', async () => {
29933084
const abortController = new AbortController();
29943085
const document = parse(`

0 commit comments

Comments
 (0)