Skip to content

Commit 6eee8ff

Browse files
authored
polish: clean up abort listener without checking hasNext (#4542)
1 parent 3ed4e11 commit 6eee8ff

1 file changed

Lines changed: 10 additions & 10 deletions

File tree

src/execution/incremental/IncrementalPublisher.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ interface SubsequentIncrementalExecutionResultContext {
3636
export class IncrementalPublisher {
3737
private _ids: Map<DeliveryGroup | ItemStream, string>;
3838
private _nextId: number;
39-
private _cleanUp: (() => void) | undefined;
4039

4140
constructor() {
4241
this._ids = new Map();
@@ -62,11 +61,11 @@ export class IncrementalPublisher {
6261
});
6362
}
6463

64+
let onWorkQueueFinished: (() => void) | undefined;
6565
if (abortSignal) {
6666
abortSignal.addEventListener('abort', abort);
67-
this._cleanUp = () => {
67+
onWorkQueueFinished = () =>
6868
abortSignal.removeEventListener('abort', abort);
69-
};
7069
}
7170

7271
const pending = this._toPendingResults(initialGroups, initialStreams);
@@ -76,8 +75,10 @@ export class IncrementalPublisher {
7675
: { data, pending, hasNext: true };
7776

7877
const subsequentResults = withConcurrentAbruptClose(
79-
mapAsyncIterable(events, (batch) => this._handleBatch(batch)),
80-
() => this._cleanUp?.(),
78+
mapAsyncIterable(events, (batch) =>
79+
this._handleBatch(batch, onWorkQueueFinished),
80+
),
81+
() => onWorkQueueFinished?.(),
8182
);
8283

8384
return {
@@ -128,6 +129,7 @@ export class IncrementalPublisher {
128129
ItemStream
129130
>
130131
>,
132+
onWorkQueueFinished: (() => void) | undefined,
131133
): SubsequentIncrementalExecutionResult {
132134
const context: SubsequentIncrementalExecutionResultContext = {
133135
pending: [],
@@ -137,7 +139,7 @@ export class IncrementalPublisher {
137139
};
138140

139141
for (const event of batch) {
140-
this._handleWorkQueueEvent(event, context);
142+
this._handleWorkQueueEvent(event, context, onWorkQueueFinished);
141143
}
142144

143145
const { incremental, completed, pending, hasNext } = context;
@@ -153,10 +155,6 @@ export class IncrementalPublisher {
153155
result.completed = completed;
154156
}
155157

156-
if (!hasNext) {
157-
this._cleanUp?.();
158-
}
159-
160158
return result;
161159
}
162160

@@ -168,6 +166,7 @@ export class IncrementalPublisher {
168166
ItemStream
169167
>,
170168
context: SubsequentIncrementalExecutionResultContext,
169+
onWorkQueueFinished: (() => void) | undefined,
171170
): void {
172171
switch (event.kind) {
173172
case 'GROUP_VALUES': {
@@ -255,6 +254,7 @@ export class IncrementalPublisher {
255254
break;
256255
}
257256
case 'WORK_QUEUE_TERMINATION': {
257+
onWorkQueueFinished?.();
258258
context.hasNext = false;
259259
break;
260260
}

0 commit comments

Comments
 (0)