Skip to content

Commit 3ea5c10

Browse files
authored
refactor(queue): replace stopped promise with onStop handlers (#4637)
motivation: - makes stop reason propagation explicit. - makes cleanup settling explicit and awaitable across stop/cancel/abort paths.
1 parent e89ca8c commit 3ea5c10

File tree

7 files changed

+694
-159
lines changed

7 files changed

+694
-159
lines changed

src/execution/incremental/IncrementalExecutor.ts

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,13 @@ export class IncrementalExecutor<
324324
task.computation.cancel(reason);
325325
}
326326
for (const stream of this.streams) {
327-
stream.queue.abort(reason);
327+
const aborted = stream.queue.abort(reason);
328+
/* c8 ignore start */
329+
// TODO: add coverage
330+
if (isPromise(aborted)) {
331+
aborted.catch(() => undefined);
332+
}
333+
/* c8 ignore stop */
328334
}
329335
}
330336

@@ -597,7 +603,13 @@ export class IncrementalExecutor<
597603
const filteredStreams: Array<ItemStream> = [];
598604
for (const stream of streams) {
599605
if (collectedErrors.hasNulledPosition(stream.path)) {
600-
stream.queue.abort(cancellationReason);
606+
const aborted = stream.queue.abort(cancellationReason);
607+
/* c8 ignore start */
608+
// TODO: add coverage
609+
if (isPromise(aborted)) {
610+
aborted.catch(() => undefined);
611+
}
612+
/* c8 ignore stop */
601613
} else {
602614
filteredStreams.push(stream);
603615
}
@@ -716,18 +728,22 @@ export class IncrementalExecutor<
716728
): Queue<StreamItemResult> {
717729
const { enableEarlyExecution } = this.validatedExecutionArgs;
718730
const queue = new Queue<StreamItemResult>(
719-
async ({ push, stop, started, stopped }) => {
731+
async ({ push, stop, onStop, started }) => {
720732
const cancelStreamItems = new Set<(reason?: unknown) => void>();
721-
722-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
723-
stopped.then((reason) => {
724-
cancelStreamItems.forEach((cancelStreamItem) =>
725-
cancelStreamItem(reason),
726-
);
733+
let finishedNormally = false;
734+
let stopRequested = false;
735+
736+
onStop((reason) => {
737+
stopRequested = true;
738+
if (!finishedNormally) {
739+
cancelStreamItems.forEach((cancelStreamItem) =>
740+
cancelStreamItem(reason),
741+
);
742+
}
727743
returnIteratorCatchingErrors(iterator);
728744
});
729745
await (enableEarlyExecution ? Promise.resolve() : started);
730-
if (queue.isStopped()) {
746+
if (stopRequested) {
731747
return;
732748
}
733749
let index = initialIndex;
@@ -737,7 +753,7 @@ export class IncrementalExecutor<
737753
if (isAsync) {
738754
// eslint-disable-next-line no-await-in-loop
739755
iteration = await iterator.next();
740-
if (queue.isStopped()) {
756+
if (stopRequested) {
741757
return;
742758
}
743759
} else {
@@ -752,7 +768,14 @@ export class IncrementalExecutor<
752768
}
753769

754770
if (iteration.done) {
755-
stop();
771+
finishedNormally = true;
772+
const stopped = stop();
773+
/* c8 ignore start */
774+
// TODO: add coverage
775+
if (isPromise(stopped)) {
776+
stopped.catch(() => undefined);
777+
}
778+
/* c8 ignore stop */
756779
return;
757780
}
758781

@@ -778,7 +801,7 @@ export class IncrementalExecutor<
778801
} else {
779802
// eslint-disable-next-line no-await-in-loop
780803
streamItemResult = await streamItemResult;
781-
if (queue.isStopped()) {
804+
if (stopRequested) {
782805
return;
783806
}
784807
}
@@ -787,7 +810,7 @@ export class IncrementalExecutor<
787810
if (isPromise(pushResult)) {
788811
// eslint-disable-next-line no-await-in-loop
789812
await pushResult;
790-
if (queue.isStopped()) {
813+
if (stopRequested) {
791814
return;
792815
}
793816
}

src/execution/incremental/Queue.ts

Lines changed: 105 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ interface BatchRequest<T> {
2727

2828
interface QueueExecutorOptions<T> {
2929
push: (item: PromiseOrValue<T>) => PromiseOrValue<void>;
30-
stop: (reason?: unknown) => void;
30+
stop: (reason?: unknown) => PromiseOrValue<void>;
31+
onStop: (cleanup: (reason?: unknown) => PromiseOrValue<void>) => void;
3132
started: Promise<void>;
32-
stopped: Promise<unknown>;
3333
}
3434

3535
/**
@@ -40,12 +40,12 @@ interface QueueExecutorOptions<T> {
4040
* live somewhere in between.
4141
*
4242
* The constructor takes an executor function and an optional `initialCapacity`.
43-
* Executors receive `{ push, stop, started, stopped }` and may return `void` or
43+
* Executors receive `{ push, stop, onStop, started }` and may return `void` or
4444
* a promise if they perform asynchronous setup. They call `push` whenever
4545
* another item is ready, call `stop` when no more values will be produced
46-
* (optionally supplying an error), await `started` when setup should run only
47-
* after iteration begins, and await `stopped` to observe when the queue
48-
* terminates. Because `push` and `stop` are plain functions, executors can
46+
* (optionally supplying an error), register stop-time cleanup via `onStop`,
47+
* and await `started` when setup should run only after iteration begins.
48+
* Because `push`, `stop`, and `onStop` are plain functions, executors can
4949
* hoist them into outside scopes or pass them to helpers. If the executor
5050
* throws or its returned promise rejects, the queue treats it as `stop(error)`
5151
* and propagates the failure.
@@ -84,17 +84,20 @@ export class Queue<T> {
8484
private _entries: Array<Entry<T>> = [];
8585
private _isStopped = false;
8686
private _stopRequested = false;
87+
private _stopCleanupCallbacks: Array<
88+
(reason?: unknown) => PromiseOrValue<void>
89+
> = [];
90+
private _stopCompletion: Promise<void> | undefined;
8791
private _batchRequests = new Set<BatchRequest<T>>();
8892

8993
private _resolveStarted: () => void;
90-
private _resolveStopped: (reason?: unknown) => void;
9194

9295
constructor(
9396
executor: ({
9497
push,
9598
stop,
99+
onStop,
96100
started,
97-
stopped,
98101
}: QueueExecutorOptions<T>) => PromiseOrValue<void>,
99102
initialCapacity = 1,
100103
) {
@@ -105,22 +108,25 @@ export class Queue<T> {
105108
promiseWithResolvers<void>();
106109

107110
this._resolveStarted = resolveStarted;
108-
const { promise: stopped, resolve: resolveStopped } =
109-
promiseWithResolvers<unknown>();
110-
this._resolveStopped = resolveStopped;
111111

112112
try {
113113
const result = executor({
114114
push: this._push.bind(this),
115115
stop: this._stop.bind(this),
116+
onStop: this._onStop.bind(this),
116117
started,
117-
stopped,
118118
});
119119
if (isPromise(result)) {
120120
result.catch((error: unknown) => this._stop(error));
121121
}
122122
} catch (error) {
123-
this._stop(error);
123+
const stopped = this._stop(error);
124+
/* c8 ignore start */
125+
// TODO: add coverage
126+
if (isPromise(stopped)) {
127+
stopped.catch(() => undefined);
128+
}
129+
/* c8 ignore stop */
124130
}
125131
}
126132

@@ -138,29 +144,33 @@ export class Queue<T> {
138144
);
139145
}
140146

141-
cancel(): void {
142-
if (this._isStopped) {
143-
return;
147+
cancel(): PromiseOrValue<void> {
148+
if (this._stopRequested) {
149+
return this._stopCompletion;
144150
}
145-
this._terminate();
146-
this._batchRequests.forEach((request) => request.resolve(undefined));
147-
this._batchRequests.clear();
151+
return this._terminate(undefined, () => {
152+
this._isStopped = true;
153+
this._batchRequests.forEach((request) => request.resolve(undefined));
154+
this._batchRequests.clear();
155+
});
148156
}
149157

150-
abort(reason?: unknown): void {
151-
if (this._isStopped) {
152-
return;
153-
}
154-
this._terminate(reason);
155-
if (this._batchRequests.size) {
156-
this._batchRequests.forEach((request) => request.reject(reason));
157-
this._batchRequests.clear();
158-
return;
158+
abort(reason?: unknown): PromiseOrValue<void> {
159+
if (this._stopRequested) {
160+
return this._stopCompletion;
159161
}
160-
// save rejection for later batch requests
161-
this._entries.push({
162-
kind: 'item',
163-
settled: { status: 'rejected', reason },
162+
return this._terminate(reason, () => {
163+
this._isStopped = true;
164+
if (this._batchRequests.size) {
165+
this._batchRequests.forEach((request) => request.reject(reason));
166+
this._batchRequests.clear();
167+
return;
168+
}
169+
// save rejection for later batch requests
170+
this._entries.push({
171+
kind: 'item',
172+
settled: { status: 'rejected', reason },
173+
});
164174
});
165175
}
166176

@@ -226,6 +236,44 @@ export class Queue<T> {
226236
this._flush();
227237
}
228238

239+
private _onStop(cleanup: (reason?: unknown) => PromiseOrValue<void>): void {
240+
if (this._stopRequested) {
241+
throw new Error(
242+
'Cannot register onStop cleanup after stop has been requested.',
243+
);
244+
}
245+
this._stopCleanupCallbacks.push(cleanup);
246+
}
247+
248+
private _runStopCleanup(
249+
reason: unknown,
250+
afterCleanup: () => void,
251+
): PromiseOrValue<void> {
252+
this._stopRequested = true;
253+
const cleanupPromises = this._stopCleanupCallbacks.flatMap(
254+
(cleanupCallback): Array<Promise<unknown>> => {
255+
try {
256+
const result = cleanupCallback(reason);
257+
return isPromise(result) ? [result] : [];
258+
} /* c8 ignore start */ catch {
259+
// ignore errors
260+
return [];
261+
} /* c8 ignore stop */
262+
},
263+
);
264+
const cleanup =
265+
cleanupPromises.length > 0
266+
? Promise.allSettled(cleanupPromises).then(() => undefined)
267+
: undefined;
268+
if (isPromise(cleanup)) {
269+
this._stopCompletion = cleanup
270+
.then(afterCleanup, afterCleanup)
271+
.then(() => undefined);
272+
return this._stopCompletion;
273+
}
274+
afterCleanup();
275+
}
276+
229277
private async *_iteratorLoop<U>(
230278
reducer: (
231279
generator: Generator<T, void, void>,
@@ -284,42 +332,48 @@ export class Queue<T> {
284332
return maybePushPromise;
285333
}
286334

287-
private _terminate(reason?: unknown): void {
335+
private _terminate(
336+
reason: unknown,
337+
afterCleanup: () => void,
338+
): PromiseOrValue<void> {
288339
for (const entry of this._entries) {
289340
if (entry.kind === 'item') {
290341
this._release();
291342
}
292343
}
293344
this._entries.length = 0;
294-
this._stopRequested = true;
295-
this._isStopped = true;
296-
this._resolveStopped(reason);
345+
return this._runStopCleanup(reason, afterCleanup);
297346
}
298347

299-
private _stop(reason?: unknown): void {
348+
private _stop(reason?: unknown): PromiseOrValue<void> {
300349
if (this._stopRequested) {
301-
return;
350+
return this._stopCompletion;
302351
}
303-
this._stopRequested = true;
304-
if (reason === undefined) {
305-
if (this._entries.length === 0) {
306-
this._isStopped = true;
307-
this._resolveStopped();
352+
const stopCompletion = this._runStopCleanup(reason, () => {
353+
if (reason === undefined) {
354+
if (this._entries.length === 0) {
355+
this._isStopped = true;
356+
this._deliverBatchIfReady();
357+
return;
358+
}
359+
360+
this._entries.push({ kind: 'stop' });
308361
this._deliverBatchIfReady();
309362
return;
310363
}
311364

365+
this._entries.push({
366+
kind: 'item',
367+
settled: { status: 'rejected', reason },
368+
});
312369
this._entries.push({ kind: 'stop' });
313370
this._deliverBatchIfReady();
314-
return;
315-
}
316-
317-
this._entries.push({
318-
kind: 'item',
319-
settled: { status: 'rejected', reason },
320371
});
321-
this._entries.push({ kind: 'stop' });
322-
this._deliverBatchIfReady();
372+
373+
if (isPromise(stopCompletion)) {
374+
stopCompletion.catch(() => undefined);
375+
}
376+
return stopCompletion;
323377
}
324378

325379
private _deliverBatchIfReady(): void {
@@ -342,7 +396,6 @@ export class Queue<T> {
342396
this._entries.shift();
343397
this._release();
344398
this._isStopped = true;
345-
this._resolveStopped(settled.reason);
346399
this._batchRequests = new Set();
347400
requests.forEach((request) => request.reject(settled.reason));
348401
}
@@ -361,7 +414,6 @@ export class Queue<T> {
361414
if (entry.kind === 'stop') {
362415
this._isStopped = true;
363416
this._entries.shift();
364-
this._resolveStopped();
365417
return;
366418
}
367419
const settled = entry.settled;

0 commit comments

Comments
 (0)