Skip to content

Commit e89ca8c

Browse files
authored
allow forwarding of cancellation reasons within computations/queues (#4636)
This is currently not directly testable (I think!) without dipping into Executor internals because resolvers currently use a shared abort signal, and so at this point it's just for polish. Although that may change in the future and this hardens the shape of our abort semantics.
1 parent 6c0733e commit e89ca8c

File tree

7 files changed

+147
-36
lines changed

7 files changed

+147
-36
lines changed

src/execution/incremental/Computation.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@ type MaybePromise<T> =
99
/** @internal **/
1010
export class Computation<T> {
1111
private _fn: () => PromiseOrValue<T>;
12-
private _onCancel: (() => void) | undefined;
12+
private _onCancel: ((reason?: unknown) => void) | undefined;
1313
private _maybePromise?: MaybePromise<T>;
14-
constructor(fn: () => PromiseOrValue<T>, onCancel?: () => void) {
14+
constructor(
15+
fn: () => PromiseOrValue<T>,
16+
onCancel?: (reason?: unknown) => void,
17+
) {
1518
this._fn = fn;
1619
this._onCancel = onCancel;
1720
}
@@ -51,21 +54,21 @@ export class Computation<T> {
5154
}
5255
}
5356
}
54-
cancel(): void {
57+
cancel(reason?: unknown): void {
5558
const maybePromise = this._maybePromise;
5659
if (!maybePromise) {
5760
this._maybePromise = {
5861
status: 'rejected',
59-
reason: new Error('Cancelled!'),
62+
reason,
6063
};
6164
return;
6265
}
6366
const status = maybePromise.status;
6467
if (status === 'pending' && this._onCancel) {
65-
this._onCancel();
68+
this._onCancel(reason);
6669
this._maybePromise = {
6770
status: 'rejected',
68-
reason: new Error('Cancelled!'),
71+
reason,
6972
};
7073
}
7174
}

src/execution/incremental/IncrementalExecutor.ts

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -321,10 +321,10 @@ export class IncrementalExecutor<
321321
override cancel(reason?: unknown): void {
322322
super.cancel(reason);
323323
for (const task of this.tasks) {
324-
task.computation.cancel();
324+
task.computation.cancel(reason);
325325
}
326326
for (const stream of this.streams) {
327-
stream.queue.abort();
327+
stream.queue.abort(reason);
328328
}
329329
}
330330

@@ -503,7 +503,7 @@ export class IncrementalExecutor<
503503
groupedFieldSet,
504504
deliveryGroupMap,
505505
),
506-
() => executor.cancel(),
506+
(reason) => executor.cancel(reason),
507507
),
508508
};
509509

@@ -581,10 +581,14 @@ export class IncrementalExecutor<
581581
return { groups, tasks, streams };
582582
}
583583

584+
const cancellationReason = new Error(
585+
'Cancelled secondary to null within original result',
586+
);
587+
584588
const filteredTasks: Array<ExecutionGroup> = [];
585589
for (const task of tasks) {
586590
if (collectedErrors.hasNulledPosition(task.path)) {
587-
task.computation.cancel();
591+
task.computation.cancel(cancellationReason);
588592
} else {
589593
filteredTasks.push(task);
590594
}
@@ -593,7 +597,7 @@ export class IncrementalExecutor<
593597
const filteredStreams: Array<ItemStream> = [];
594598
for (const stream of streams) {
595599
if (collectedErrors.hasNulledPosition(stream.path)) {
596-
stream.queue.cancel();
600+
stream.queue.abort(cancellationReason);
597601
} else {
598602
filteredStreams.push(stream);
599603
}
@@ -713,11 +717,13 @@ export class IncrementalExecutor<
713717
const { enableEarlyExecution } = this.validatedExecutionArgs;
714718
const queue = new Queue<StreamItemResult>(
715719
async ({ push, stop, started, stopped }) => {
716-
const cancelStreamItems = new Set<() => void>();
720+
const cancelStreamItems = new Set<(reason?: unknown) => void>();
717721

718722
// eslint-disable-next-line @typescript-eslint/no-floating-promises
719-
stopped.then(() => {
720-
cancelStreamItems.forEach((cancelStreamItem) => cancelStreamItem());
723+
stopped.then((reason) => {
724+
cancelStreamItems.forEach((cancelStreamItem) =>
725+
cancelStreamItem(reason),
726+
);
721727
returnIteratorCatchingErrors(iterator);
722728
});
723729
await (enableEarlyExecution ? Promise.resolve() : started);
@@ -763,7 +769,8 @@ export class IncrementalExecutor<
763769
);
764770
if (isPromise(streamItemResult)) {
765771
if (enableEarlyExecution) {
766-
const cancelStreamItem = () => executor.cancel();
772+
const cancelStreamItem = (reason?: unknown) =>
773+
executor.cancel(reason);
767774
cancelStreamItems.add(cancelStreamItem);
768775
streamItemResult = streamItemResult.finally(() => {
769776
cancelStreamItems.delete(cancelStreamItem);

src/execution/incremental/Queue.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ export class Queue<T> {
8787
private _batchRequests = new Set<BatchRequest<T>>();
8888

8989
private _resolveStarted: () => void;
90-
private _resolveStopped: () => void;
90+
private _resolveStopped: (reason?: unknown) => void;
9191

9292
constructor(
9393
executor: ({
@@ -106,8 +106,7 @@ export class Queue<T> {
106106

107107
this._resolveStarted = resolveStarted;
108108
const { promise: stopped, resolve: resolveStopped } =
109-
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
110-
promiseWithResolvers<void>();
109+
promiseWithResolvers<unknown>();
111110
this._resolveStopped = resolveStopped;
112111

113112
try {
@@ -152,7 +151,7 @@ export class Queue<T> {
152151
if (this._isStopped) {
153152
return;
154153
}
155-
this._terminate();
154+
this._terminate(reason);
156155
if (this._batchRequests.size) {
157156
this._batchRequests.forEach((request) => request.reject(reason));
158157
this._batchRequests.clear();
@@ -285,7 +284,7 @@ export class Queue<T> {
285284
return maybePushPromise;
286285
}
287286

288-
private _terminate(): void {
287+
private _terminate(reason?: unknown): void {
289288
for (const entry of this._entries) {
290289
if (entry.kind === 'item') {
291290
this._release();
@@ -294,7 +293,7 @@ export class Queue<T> {
294293
this._entries.length = 0;
295294
this._stopRequested = true;
296295
this._isStopped = true;
297-
this._resolveStopped();
296+
this._resolveStopped(reason);
298297
}
299298

300299
private _stop(reason?: unknown): void {
@@ -343,7 +342,7 @@ export class Queue<T> {
343342
this._entries.shift();
344343
this._release();
345344
this._isStopped = true;
346-
this._resolveStopped();
345+
this._resolveStopped(settled.reason);
347346
this._batchRequests = new Set();
348347
requests.forEach((request) => request.reject(settled.reason));
349348
}

src/execution/incremental/WorkQueue.ts

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ export function createWorkQueue<
238238
}
239239
});
240240
// eslint-disable-next-line @typescript-eslint/no-floating-promises
241-
stopped.then(() => cancel());
241+
stopped.then((reason) => cancel(reason));
242242
},
243243
1,
244244
).subscribe((graphEvents) => handleGraphEvents(graphEvents));
@@ -249,39 +249,39 @@ export function createWorkQueue<
249249
events,
250250
};
251251

252-
function cancel(): void {
252+
function cancel(reason?: unknown): void {
253253
for (const group of rootGroups) {
254-
cancelGroup(group);
254+
cancelGroup(group, reason);
255255
}
256256
for (const stream of rootStreams) {
257-
cancelStream(stream);
257+
cancelStream(stream, reason);
258258
}
259259
}
260260

261-
function cancelGroup(group: G): void {
261+
function cancelGroup(group: G, reason?: unknown): void {
262262
const groupNode = groupNodes.get(group);
263263
if (groupNode) {
264264
for (const task of groupNode.tasks) {
265-
cancelTask(task);
265+
cancelTask(task, reason);
266266
}
267267
for (const childGroup of groupNode.childGroups) {
268-
cancelGroup(childGroup);
268+
cancelGroup(childGroup, reason);
269269
}
270270
}
271271
}
272272

273-
function cancelTask(task: Task<T, I, G, S>): void {
274-
task.computation.cancel();
273+
function cancelTask(task: Task<T, I, G, S>, reason?: unknown): void {
274+
task.computation.cancel(reason);
275275
const taskNode = taskNodes.get(task);
276276
if (taskNode) {
277277
for (const childStream of taskNode.childStreams) {
278-
cancelStream(childStream);
278+
cancelStream(childStream, reason);
279279
}
280280
}
281281
}
282282

283-
function cancelStream(stream: S): void {
284-
stream.queue.cancel();
283+
function cancelStream(stream: S, reason?: unknown): void {
284+
stream.queue.abort(reason);
285285
}
286286

287287
function maybeIntegrateWork(

src/execution/incremental/__tests__/Computation-test.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,4 +168,31 @@ describe('Computation', () => {
168168
expect(onCancelRan).to.equal(true);
169169
expect(() => computation.result()).to.throw('Cancelled!');
170170
});
171+
172+
it('can be cancelled with a provided reason before running', () => {
173+
const abortReason = new Error('aborted');
174+
const computation = new Computation(() => ({ value: 123 }));
175+
176+
computation.cancel(abortReason);
177+
expect(() => computation.result()).to.throw('aborted');
178+
});
179+
180+
it('forwards cancellation reason to onCancel while running asynchronously', () => {
181+
const abortReason = new Error('aborted');
182+
let onCancelReason: unknown;
183+
const computation = new Computation(
184+
() =>
185+
new Promise(() => {
186+
// Never resolves.
187+
}),
188+
(reason) => {
189+
onCancelReason = reason;
190+
},
191+
);
192+
193+
computation.prime();
194+
computation.cancel(abortReason);
195+
expect(onCancelReason).to.equal(abortReason);
196+
expect(() => computation.result()).to.throw('aborted');
197+
});
171198
});

src/execution/incremental/__tests__/Queue-test.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,19 @@ describe('Queue', () => {
157157
expect(await sub.next()).to.deep.equal({ done: true, value: undefined });
158158
});
159159

160+
it('cancel is a no-op after stopping', async () => {
161+
const queue = new Queue(({ stop }) => {
162+
stop();
163+
});
164+
165+
const sub = queue.subscribe();
166+
167+
expect(queue.isStopped()).to.equal(true);
168+
queue.cancel();
169+
170+
expect(await sub.next()).to.deep.equal({ done: true, value: undefined });
171+
});
172+
160173
it('abort is a no-op after stopping', async () => {
161174
const queue = new Queue(({ stop }) => {
162175
stop();
@@ -643,14 +656,17 @@ describe('Queue', () => {
643656
it('stops in an error state when calling stopped with a reason, i.e. the last call to next to reject with that reason', async () => {
644657
let stoppedPromise!: Promise<unknown>;
645658
let stopped = false;
659+
let stoppedReason: unknown;
660+
const stopReason = new Error('Oops');
646661
const sub = new Queue(({ push, stop, stopped: _stoppedPromise }) => {
647662
stoppedPromise = _stoppedPromise;
648663

649-
stoppedPromise.then(() => {
664+
stoppedPromise.then((reason) => {
650665
stopped = true;
666+
stoppedReason = reason;
651667
});
652668
push(1);
653-
stop(new Error('Oops'));
669+
stop(stopReason);
654670
}).subscribe();
655671

656672
expect(stopped).to.equal(false);
@@ -659,6 +675,7 @@ describe('Queue', () => {
659675
await expectPromise(sub.next()).toRejectWith('Oops');
660676

661677
expect(stopped).to.equal(true);
678+
expect(stoppedReason).to.equal(stopReason);
662679
});
663680

664681
it('cancels existing requests when calling cancel', async () => {

src/execution/incremental/__tests__/WorkQueue-test.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { expect } from 'chai';
22
import { describe, it } from 'mocha';
33

4+
import { expectPromise } from '../../../__testUtils__/expectPromise.js';
45
import { resolveOnNextTick } from '../../../__testUtils__/resolveOnNextTick.js';
56

67
import { isPromise } from '../../../jsutils/isPromise.js';
@@ -1266,4 +1267,61 @@ describe('WorkQueue', () => {
12661267
expect(childTaskCancelled).to.equal(true);
12671268
expect(childStreamCancelled).to.equal(true);
12681269
});
1270+
1271+
it('forwards throw reason when cancelling nested work', async () => {
1272+
const root: TestGroup = { parent: undefined };
1273+
const rootStream = streamFrom([{ value: 1 }]);
1274+
const abortReason = new Error('Abort nested work');
1275+
1276+
let childStreamCancelReason: unknown;
1277+
const childStreamQueue = new Queue<TestStreamItem>(({ stopped }) => {
1278+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
1279+
stopped.then((reason) => {
1280+
childStreamCancelReason = reason;
1281+
});
1282+
});
1283+
const childStream: TestStream = { queue: childStreamQueue };
1284+
1285+
let childTaskCancelReason: unknown;
1286+
const childTask: Task<
1287+
TestTaskValue,
1288+
TestStreamItemValue,
1289+
TestGroup,
1290+
TestStream
1291+
> = {
1292+
groups: [root],
1293+
computation: new Computation(
1294+
() =>
1295+
new Promise(() => {
1296+
// never resolves
1297+
}),
1298+
(reason) => {
1299+
childTaskCancelReason = reason;
1300+
},
1301+
),
1302+
};
1303+
1304+
const rootTask = makeTask([root], () => ({
1305+
value: 'root',
1306+
work: { streams: [childStream] },
1307+
}));
1308+
1309+
const workQueue = createWorkQueue({
1310+
groups: [root],
1311+
tasks: [rootTask, childTask],
1312+
streams: [rootStream],
1313+
});
1314+
1315+
const iterator = workQueue.events[Symbol.asyncIterator]();
1316+
await iterator.next();
1317+
1318+
await expectPromise(iterator.throw(abortReason)).toRejectWith(
1319+
'Abort nested work',
1320+
);
1321+
1322+
await resolveOnNextTick();
1323+
1324+
expect(childTaskCancelReason).to.equal(abortReason);
1325+
expect(childStreamCancelReason).to.equal(abortReason);
1326+
});
12691327
});

0 commit comments

Comments
 (0)