Skip to content

Commit 2d309c6

Browse files
authored
use single phase of resolver abortSignal cancellation (#4554)
Previously, we used a lazily created abortSignal per resolver so as to shift triggering the abortSignal for a resolver that returned a streamed asyncIterable from the original executor to the stream. This is expensive and difficult to reason about and has been replaced by a single phase of cancellation when the entire execution finishes. Note: when executing a subscription, we are still able to cancel resolver abortSignals after execution of each subscription event. Additional note: this is labelled "bug fix" rather than "polish" because it is technically observable.
1 parent b52b05e commit 2d309c6

5 files changed

Lines changed: 62 additions & 84 deletions

File tree

src/execution/Executor.ts

Lines changed: 35 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -217,40 +217,44 @@ export class Executor<
217217
> {
218218
validatedExecutionArgs: ValidatedExecutionArgs;
219219
finished: boolean;
220-
initialResponseAbortController: AbortController | undefined;
221-
resolverAbortControllers: Map<Path, AbortController>;
222220
collectedErrors: CollectedErrors;
221+
internalAbortController: AbortController;
222+
resolverAbortController: AbortController | undefined;
223+
sharedResolverAbortSignal: AbortSignal;
223224

224-
constructor(validatedExecutionArgs: ValidatedExecutionArgs) {
225+
constructor(
226+
validatedExecutionArgs: ValidatedExecutionArgs,
227+
sharedResolverAbortSignal?: AbortSignal,
228+
) {
225229
this.validatedExecutionArgs = validatedExecutionArgs;
226230
this.finished = false;
227-
this.resolverAbortControllers = new Map();
228231
this.collectedErrors = new CollectedErrors();
232+
this.internalAbortController = new AbortController();
233+
234+
if (sharedResolverAbortSignal === undefined) {
235+
this.resolverAbortController = new AbortController();
236+
this.sharedResolverAbortSignal = this.resolverAbortController.signal;
237+
} else {
238+
this.sharedResolverAbortSignal = sharedResolverAbortSignal;
239+
}
229240
}
230241

231242
executeQueryOrMutationOrSubscriptionEvent(): PromiseOrValue<
232243
ExecutionResult | TAlternativeInitialResponse
233244
> {
234-
const abortController = (this.initialResponseAbortController =
235-
new AbortController());
236-
237-
const validatedExecutionArgs = this.validatedExecutionArgs;
238-
const externalAbortSignal = validatedExecutionArgs.externalAbortSignal;
239-
let removeAbortListener: (() => void) | undefined;
245+
const externalAbortSignal = this.validatedExecutionArgs.externalAbortSignal;
246+
let removeExternalAbortListener: (() => void) | undefined;
240247
if (externalAbortSignal) {
241-
if (externalAbortSignal.aborted) {
242-
throw new Error(externalAbortSignal.reason);
243-
}
248+
externalAbortSignal.throwIfAborted();
244249
const onExternalAbort = () => this.cancel(externalAbortSignal.reason);
245-
removeAbortListener = () =>
250+
removeExternalAbortListener = () =>
246251
externalAbortSignal.removeEventListener('abort', onExternalAbort);
247252
externalAbortSignal.addEventListener('abort', onExternalAbort);
248253
}
249254

250255
const onFinish = () => {
251-
removeAbortListener?.();
252256
this.finish();
253-
abortController.signal.throwIfAborted();
257+
removeExternalAbortListener?.();
254258
};
255259

256260
try {
@@ -261,7 +265,7 @@ export class Executor<
261265
operation,
262266
variableValues,
263267
hideSuggestions,
264-
} = validatedExecutionArgs;
268+
} = this.validatedExecutionArgs;
265269

266270
const { operation: operationType, selectionSet } = operation;
267271

@@ -302,53 +306,30 @@ export class Executor<
302306
return this.buildResponse(null);
303307
},
304308
);
305-
return externalAbortSignal
306-
? cancellablePromise(promise, abortController.signal)
307-
: promise;
309+
return cancellablePromise(promise, this.internalAbortController.signal);
308310
}
309311
onFinish();
310312
return this.buildResponse(result);
311313
} catch (error) {
312-
this.collectedErrors.add(error as GraphQLError, undefined);
313314
onFinish();
315+
this.collectedErrors.add(error as GraphQLError, undefined);
314316
return this.buildResponse(null);
315317
}
316318
}
317319

318-
cancel(reason: unknown): void {
320+
cancel(reason?: unknown): void {
319321
if (!this.finished) {
320-
this.initialResponseAbortController?.abort(reason);
321-
this.finish(reason);
322+
this.finish();
323+
this.internalAbortController.abort(reason);
324+
this.resolverAbortController?.abort(reason);
322325
}
323326
}
324327

325-
finish(reason?: unknown): void {
328+
finish(): void {
326329
if (!this.finished) {
327330
this.finished = true;
328-
this.triggerResolverAbortSignals(reason);
329-
}
330-
}
331-
332-
triggerResolverAbortSignals(reason?: unknown): void {
333-
const { resolverAbortControllers } = this;
334-
const finishReason =
335-
reason ?? new Error('Execution has already completed.');
336-
for (const abortController of resolverAbortControllers.values()) {
337-
abortController.abort(finishReason);
338331
}
339-
}
340-
341-
getAbortSignal(path: Path): AbortSignal {
342-
const resolverAbortSignal = this.resolverAbortControllers.get(path)?.signal;
343-
if (resolverAbortSignal !== undefined) {
344-
return resolverAbortSignal;
345-
}
346-
const abortController = new AbortController();
347-
this.resolverAbortControllers.set(path, abortController);
348-
if (this.finished) {
349-
abortController.abort(new Error('Execution has already completed.'));
350-
}
351-
return abortController.signal;
332+
this.internalAbortController.signal.throwIfAborted();
352333
}
353334

354335
/**
@@ -358,6 +339,7 @@ export class Executor<
358339
buildResponse(
359340
data: ObjMap<unknown> | null,
360341
): ExecutionResult | TAlternativeInitialResponse {
342+
this.resolverAbortController?.abort();
361343
const errors = this.collectedErrors.errors;
362344
return errors.length ? { errors, data } : { data };
363345
}
@@ -542,7 +524,7 @@ export class Executor<
542524
toNodes(fieldDetailsList),
543525
parentType,
544526
path,
545-
() => this.getAbortSignal(path),
527+
() => this.sharedResolverAbortSignal,
546528
);
547529

548530
// Get the resolve function, regardless of if its result is normal or abrupt (error).
@@ -571,7 +553,6 @@ export class Executor<
571553
path,
572554
result,
573555
positionContext,
574-
true,
575556
);
576557
}
577558

@@ -587,22 +568,13 @@ export class Executor<
587568
if (isPromise(completed)) {
588569
// Note: we don't rely on a `catch` method, but we do expect "thenable"
589570
// to take a second callback for the error case.
590-
return completed.then(
591-
(resolved) => {
592-
this.resolverAbortControllers.delete(path);
593-
return resolved;
594-
},
595-
(rawError: unknown) => {
596-
this.resolverAbortControllers.delete(path);
597-
this.handleFieldError(rawError, returnType, fieldDetailsList, path);
598-
return null;
599-
},
600-
);
571+
return completed.then(undefined, (rawError: unknown) => {
572+
this.handleFieldError(rawError, returnType, fieldDetailsList, path);
573+
return null;
574+
});
601575
}
602-
this.resolverAbortControllers.delete(path);
603576
return completed;
604577
} catch (rawError) {
605-
this.resolverAbortControllers.delete(path);
606578
this.handleFieldError(rawError, returnType, fieldDetailsList, path);
607579
return null;
608580
}
@@ -755,7 +727,6 @@ export class Executor<
755727
path: Path,
756728
result: Promise<unknown>,
757729
positionContext: TPositionContext | undefined,
758-
isFieldValue?: boolean,
759730
): Promise<unknown> {
760731
try {
761732
const resolved = await result;
@@ -774,14 +745,8 @@ export class Executor<
774745
if (isPromise(completed)) {
775746
completed = await completed;
776747
}
777-
if (isFieldValue) {
778-
this.resolverAbortControllers.delete(path);
779-
}
780748
return completed;
781749
} catch (rawError) {
782-
if (isFieldValue) {
783-
this.resolverAbortControllers.delete(path);
784-
}
785750
this.handleFieldError(rawError, returnType, fieldDetailsList, path);
786751
return null;
787752
}

src/execution/incremental/IncrementalExecutor.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -264,9 +264,10 @@ export class IncrementalExecutor<
264264

265265
constructor(
266266
validatedExecutionArgs: ValidatedExecutionArgs,
267+
sharedResolverAbortSignal?: AbortSignal,
267268
deferUsageSet?: DeferUsageSet,
268269
) {
269-
super(validatedExecutionArgs);
270+
super(validatedExecutionArgs, sharedResolverAbortSignal);
270271
this.deferUsageSet = deferUsageSet;
271272
this.groups = [];
272273
this.tasks = [];
@@ -276,7 +277,11 @@ export class IncrementalExecutor<
276277
createSubExecutor(
277278
deferUsageSet?: DeferUsageSet,
278279
): IncrementalExecutor<TExperimental> {
279-
return new IncrementalExecutor(this.validatedExecutionArgs, deferUsageSet);
280+
return new IncrementalExecutor(
281+
this.validatedExecutionArgs,
282+
this.sharedResolverAbortSignal,
283+
deferUsageSet,
284+
);
280285
}
281286

282287
override cancel(reason?: unknown): void {
@@ -296,20 +301,21 @@ export class IncrementalExecutor<
296301
override buildResponse(
297302
data: ObjMap<unknown> | null,
298303
): ExecutionResult | TExperimental {
299-
const errors = this.collectedErrors.errors;
300304
const work = this.getIncrementalWork();
301305
const { tasks, streams } = work;
302306
if (tasks?.length === 0 && streams?.length === 0) {
303-
return errors.length ? { errors, data } : { data };
307+
return super.buildResponse(data);
304308
}
305309

310+
const errors = this.collectedErrors.errors;
306311
invariant(data !== null);
307312
const incrementalPublisher = new IncrementalPublisher();
308313
return incrementalPublisher.buildResponse(
309314
data,
310315
errors,
311316
work,
312317
this.validatedExecutionArgs.externalAbortSignal,
318+
() => this.resolverAbortController?.abort(),
313319
) as TExperimental;
314320
}
315321

@@ -509,6 +515,7 @@ export class IncrementalExecutor<
509515
(resolved) =>
510516
this.buildExecutionGroupResult(deliveryGroups, path, resolved),
511517
(error: unknown) => {
518+
this.cancel();
512519
throw error;
513520
},
514521
);

src/execution/incremental/IncrementalPublisher.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ export class IncrementalPublisher {
4747
errors: ReadonlyArray<GraphQLError>,
4848
work: IncrementalWork,
4949
abortSignal: AbortSignal | undefined,
50+
onFinished: () => void,
5051
): ExperimentalIncrementalExecutionResults {
5152
const { initialGroups, initialStreams, events } = createWorkQueue<
5253
ExecutionGroupValue,
@@ -61,12 +62,13 @@ export class IncrementalPublisher {
6162
});
6263
}
6364

64-
let onWorkQueueFinished: (() => void) | undefined;
6565
if (abortSignal) {
6666
abortSignal.addEventListener('abort', abort);
67-
onWorkQueueFinished = () =>
68-
abortSignal.removeEventListener('abort', abort);
6967
}
68+
const onWorkQueueFinished = () => {
69+
onFinished();
70+
abortSignal?.removeEventListener('abort', abort);
71+
};
7072

7173
const pending = this._toPendingResults(initialGroups, initialStreams);
7274

@@ -78,7 +80,7 @@ export class IncrementalPublisher {
7880
mapAsyncIterable(events, (batch) =>
7981
this._handleBatch(batch, onWorkQueueFinished),
8082
),
81-
() => onWorkQueueFinished?.(),
83+
() => onWorkQueueFinished(),
8284
);
8385

8486
return {

src/execution/legacyIncremental/BranchingIncrementalExecutor.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,27 +89,29 @@ export class BranchingIncrementalExecutor extends IncrementalExecutor<Experiment
8989
): IncrementalExecutor<ExperimentalIncrementalExecutionResults> {
9090
return new BranchingIncrementalExecutor(
9191
this.validatedExecutionArgs,
92+
this.sharedResolverAbortSignal,
9293
deferUsageSet,
9394
);
9495
}
9596

9697
override buildResponse(
9798
data: ObjMap<unknown> | null,
9899
): ExecutionResult | ExperimentalIncrementalExecutionResults {
99-
const errors = this.collectedErrors.errors;
100100
const work = this.getIncrementalWork();
101101
const { tasks, streams } = work;
102102
if (tasks?.length === 0 && streams?.length === 0) {
103-
return errors.length ? { errors, data } : { data };
103+
return super.buildResponse(data);
104104
}
105105

106+
const errors = this.collectedErrors.errors;
106107
invariant(data !== null);
107108
const incrementalPublisher = new BranchingIncrementalPublisher();
108109
return incrementalPublisher.buildResponse(
109110
data,
110111
errors,
111112
work,
112113
this.validatedExecutionArgs.externalAbortSignal,
114+
() => this.resolverAbortController?.abort(),
113115
);
114116
}
115117

src/execution/legacyIncremental/BranchingIncrementalPublisher.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export class BranchingIncrementalPublisher {
4444
errors: ReadonlyArray<GraphQLError>,
4545
work: IncrementalWork,
4646
abortSignal: AbortSignal | undefined,
47+
onFinished: () => void,
4748
): ExperimentalIncrementalExecutionResults {
4849
const { initialStreams, events } = createWorkQueue<
4950
ExecutionGroupValue,
@@ -62,12 +63,13 @@ export class BranchingIncrementalPublisher {
6263
});
6364
}
6465

65-
let onWorkQueueFinished: (() => void) | undefined;
6666
if (abortSignal) {
6767
abortSignal.addEventListener('abort', abort);
68-
onWorkQueueFinished = () =>
69-
abortSignal.removeEventListener('abort', abort);
7068
}
69+
const onWorkQueueFinished = () => {
70+
onFinished();
71+
abortSignal?.removeEventListener('abort', abort);
72+
};
7173

7274
const initialResult: InitialIncrementalExecutionResult = errors.length
7375
? { errors, data, hasNext: true }
@@ -77,7 +79,7 @@ export class BranchingIncrementalPublisher {
7779
mapAsyncIterable(events, (batch) =>
7880
this._handleBatch(batch, onWorkQueueFinished),
7981
),
80-
() => onWorkQueueFinished?.(),
82+
() => onWorkQueueFinished(),
8183
);
8284

8385
return {

0 commit comments

Comments
 (0)