Skip to content

Commit 87e2929

Browse files
authored
refactor(workqueue): propagate async computation abort cleanup (#4640)
1 parent 06953e6 commit 87e2929

File tree

5 files changed

+87
-18
lines changed

5 files changed

+87
-18
lines changed

src/execution/incremental/Computation.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ type MaybePromise<T> =
99
/** @internal **/
1010
export class Computation<T> {
1111
private _fn: () => PromiseOrValue<T>;
12-
private _onAbort: ((reason?: unknown) => void) | undefined;
12+
private _onAbort: ((reason?: unknown) => PromiseOrValue<void>) | undefined;
1313
private _maybePromise?: MaybePromise<T>;
1414
constructor(
1515
fn: () => PromiseOrValue<T>,
16-
onAbort?: (reason?: unknown) => void,
16+
onAbort?: (reason?: unknown) => PromiseOrValue<void>,
1717
) {
1818
this._fn = fn;
1919
this._onAbort = onAbort;
@@ -54,7 +54,7 @@ export class Computation<T> {
5454
}
5555
}
5656
}
57-
abort(reason?: unknown): void {
57+
abort(reason?: unknown): PromiseOrValue<void> {
5858
const maybePromise = this._maybePromise;
5959
if (!maybePromise) {
6060
this._maybePromise = {
@@ -64,12 +64,14 @@ export class Computation<T> {
6464
return;
6565
}
6666
const status = maybePromise.status;
67-
if (status === 'pending' && this._onAbort) {
68-
this._onAbort(reason);
67+
if (status === 'pending') {
6968
this._maybePromise = {
7069
status: 'rejected',
7170
reason,
7271
};
72+
if (this._onAbort) {
73+
return this._onAbort(reason);
74+
}
7375
}
7476
}
7577
}

src/execution/incremental/IncrementalExecutor.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,13 @@ export class IncrementalExecutor<
321321
override cancel(reason?: unknown): void {
322322
super.cancel(reason);
323323
for (const task of this.tasks) {
324-
task.computation.abort(reason);
324+
const aborted = task.computation.abort(reason);
325+
/* c8 ignore start */
326+
// TODO: add coverage
327+
if (isPromise(aborted)) {
328+
aborted.catch(() => undefined);
329+
}
330+
/* c8 ignore stop */
325331
}
326332
for (const stream of this.streams) {
327333
const aborted = stream.queue.abort(reason);
@@ -594,7 +600,13 @@ export class IncrementalExecutor<
594600
const filteredTasks: Array<ExecutionGroup> = [];
595601
for (const task of tasks) {
596602
if (collectedErrors.hasNulledPosition(task.path)) {
597-
task.computation.abort(cancellationReason);
603+
const aborted = task.computation.abort(cancellationReason);
604+
/* c8 ignore start */
605+
// TODO: add coverage
606+
if (isPromise(aborted)) {
607+
aborted.catch(() => undefined);
608+
}
609+
/* c8 ignore stop */
598610
} else {
599611
filteredTasks.push(task);
600612
}

src/execution/incremental/WorkQueue.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,10 @@ export function createWorkQueue<
282282
reason: unknown,
283283
cancelPromises: Array<Promise<unknown>>,
284284
): void {
285-
task.computation.abort(reason);
285+
const abortResult = task.computation.abort(reason);
286+
if (isPromise(abortResult)) {
287+
cancelPromises.push(abortResult);
288+
}
286289
const taskNode = taskNodes.get(task);
287290
if (taskNode) {
288291
for (const childStream of taskNode.childStreams) {
@@ -296,10 +299,9 @@ export function createWorkQueue<
296299
reason: unknown,
297300
cancelPromises: Array<Promise<unknown>>,
298301
): void {
299-
const cancelResult =
300-
reason === undefined ? stream.queue.cancel() : stream.queue.abort(reason);
301-
if (isPromise(cancelResult)) {
302-
cancelPromises.push(cancelResult);
302+
const abortResult = stream.queue.abort(reason);
303+
if (isPromise(abortResult)) {
304+
cancelPromises.push(abortResult);
303305
}
304306
}
305307

src/execution/incremental/__tests__/Computation-test.ts

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,20 @@ import { describe, it } from 'mocha';
44
import { expectPromise } from '../../../__testUtils__/expectPromise.js';
55
import { resolveOnNextTick } from '../../../__testUtils__/resolveOnNextTick.js';
66

7+
import { isPromise } from '../../../jsutils/isPromise.js';
8+
79
import { Computation } from '../Computation.js';
810

11+
function abortIgnoringCleanup(
12+
computation: Computation<unknown>,
13+
reason?: unknown,
14+
): void {
15+
const aborted = computation.abort(reason);
16+
if (isPromise(aborted)) {
17+
aborted.catch(() => undefined);
18+
}
19+
}
20+
921
describe('Computation', () => {
1022
it('can return a result', () => {
1123
const computation = new Computation(() => ({ value: 123 }));
@@ -114,7 +126,7 @@ describe('Computation', () => {
114126
onAbortRan = true;
115127
},
116128
);
117-
computation.abort();
129+
abortIgnoringCleanup(computation);
118130
expect(() => computation.result()).to.throw('Cancelled!');
119131
expect(onAbortRan).to.equal(false);
120132
});
@@ -129,7 +141,7 @@ describe('Computation', () => {
129141
);
130142

131143
computation.prime();
132-
computation.abort();
144+
abortIgnoringCleanup(computation);
133145
expect(computation.result()).to.deep.equal({ value: 123 });
134146
expect(onAbortRan).to.equal(false);
135147
});
@@ -146,7 +158,7 @@ describe('Computation', () => {
146158
);
147159

148160
computation.prime();
149-
computation.abort();
161+
abortIgnoringCleanup(computation);
150162
expect(() => computation.result()).to.throw('failure');
151163
expect(onAbortRan).to.equal(false);
152164
});
@@ -164,16 +176,52 @@ describe('Computation', () => {
164176
);
165177

166178
computation.prime();
167-
computation.abort();
179+
abortIgnoringCleanup(computation);
168180
expect(onAbortRan).to.equal(true);
169181
expect(() => computation.result()).to.throw('Cancelled!');
170182
});
171183

184+
it('returns async abort cleanup while running', async () => {
185+
let resolveCleanup!: () => void;
186+
const cleanupPromise = new Promise<void>((resolve) => {
187+
resolveCleanup = resolve;
188+
});
189+
const computation = new Computation(
190+
() =>
191+
new Promise(() => {
192+
// Never resolves.
193+
}),
194+
() => cleanupPromise,
195+
);
196+
197+
computation.prime();
198+
const abortResult = computation.abort();
199+
expect(abortResult).to.equal(cleanupPromise);
200+
expect(isPromise(abortResult)).to.equal(true);
201+
if (!isPromise(abortResult)) {
202+
throw new Error('Expected async abort cleanup promise.');
203+
}
204+
205+
let abortSettled = false;
206+
abortResult.then(
207+
() => {
208+
abortSettled = true;
209+
},
210+
() => {
211+
abortSettled = true;
212+
},
213+
);
214+
expect(abortSettled).to.equal(false);
215+
resolveCleanup();
216+
await abortResult;
217+
expect(abortSettled).to.equal(true);
218+
});
219+
172220
it('can be aborted with a provided reason before running', () => {
173221
const abortReason = new Error('aborted');
174222
const computation = new Computation(() => ({ value: 123 }));
175223

176-
computation.abort(abortReason);
224+
abortIgnoringCleanup(computation, abortReason);
177225
expect(() => computation.result()).to.throw('aborted');
178226
});
179227

@@ -191,7 +239,7 @@ describe('Computation', () => {
191239
);
192240

193241
computation.prime();
194-
computation.abort(abortReason);
242+
abortIgnoringCleanup(computation, abortReason);
195243
expect(onAbortReason).to.equal(abortReason);
196244
expect(() => computation.result()).to.throw('aborted');
197245
});

src/execution/incremental/__tests__/WorkQueue-test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1228,6 +1228,9 @@ describe('WorkQueue', () => {
12281228
const childStream: TestStream = { queue: childStreamQueue };
12291229

12301230
let childTaskCancelled = false;
1231+
const { promise: childTaskCleanup, resolve: resolveChildTaskCleanup } =
1232+
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
1233+
promiseWithResolvers<void>();
12311234
const childTask: Task<
12321235
TestTaskValue,
12331236
TestStreamItemValue,
@@ -1242,6 +1245,7 @@ describe('WorkQueue', () => {
12421245
}),
12431246
() => {
12441247
childTaskCancelled = true;
1248+
return childTaskCleanup;
12451249
},
12461250
),
12471251
};
@@ -1278,6 +1282,7 @@ describe('WorkQueue', () => {
12781282
await resolveOnNextTick();
12791283

12801284
resolveChildStreamCleanup();
1285+
resolveChildTaskCleanup();
12811286

12821287
expect(await returnPromise).to.deep.equal({
12831288
value: undefined,

0 commit comments

Comments
 (0)