Skip to content

Commit 1dc7a08

Browse files
authored
reduce ticks stemming from withCleanup (#4526)
replace withCleanup() with more targeted withConcurrentAbruptClose(). next() calls are no longer wrapped, reducing extra ticks = refactors mapAsyncIterable to use the more targeted function = refactors IncrementalPublisher to also perform required clean up on a normal close. * this change also alters the behavior of a stream test to no longer pass spuriously and allows it to be further streamlined
1 parent cb6abca commit 1dc7a08

10 files changed

Lines changed: 201 additions & 129 deletions

src/execution/IncrementalPublisher.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import type {
2323
SubsequentIncrementalExecutionResult,
2424
} from './types.js';
2525
import { isCompletedExecutionGroup, isFailedExecutionGroup } from './types.js';
26-
import { withCleanup } from './withCleanup.js';
26+
import { withConcurrentAbruptClose } from './withConcurrentAbruptClose.js';
2727

2828
// eslint-disable-next-line max-params
2929
export function buildIncrementalResponse(
@@ -99,13 +99,17 @@ class IncrementalPublisher {
9999

100100
return {
101101
initialResult,
102-
subsequentResults: withCleanup(subsequentResults, async () => {
103-
this._abortSignalListener?.disconnect();
104-
await this._returnAsyncIteratorsIgnoringErrors();
105-
}),
102+
subsequentResults: withConcurrentAbruptClose(subsequentResults, () =>
103+
this._cleanUp(),
104+
),
106105
};
107106
}
108107

108+
private async _cleanUp(): Promise<void> {
109+
this._abortSignalListener?.disconnect();
110+
await this._returnAsyncIteratorsIgnoringErrors();
111+
}
112+
109113
private _ensureId(deliveryGroup: DeliveryGroup): string {
110114
return (deliveryGroup.id ??= String(this._nextId++));
111115
}
@@ -160,6 +164,14 @@ class IncrementalPublisher {
160164
if (completed.length > 0) {
161165
subsequentIncrementalExecutionResult.completed = completed;
162166
}
167+
168+
if (!hasNext) {
169+
this._cleanUp().catch(() => {
170+
/* c8 ignore next 2 */
171+
// ignore error
172+
});
173+
}
174+
163175
return subsequentIncrementalExecutionResult;
164176
}
165177

src/execution/Queue.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { isPromise } from '../jsutils/isPromise.js';
22
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
33
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
44

5-
import { withCleanup } from './withCleanup.js';
5+
import { withConcurrentAbruptClose } from './withConcurrentAbruptClose.js';
66

77
/**
88
* @internal
@@ -42,7 +42,9 @@ export class Queue<T> {
4242
subscribe<U>(
4343
mapFn: (generator: Generator<T>) => U | undefined,
4444
): AsyncGenerator<U, void, void> {
45-
return withCleanup(this.subscribeImpl(mapFn), () => this.stop());
45+
return withConcurrentAbruptClose(this.subscribeImpl(mapFn), () =>
46+
this.stop(),
47+
);
4648
}
4749

4850
private async *subscribeImpl<U>(

src/execution/__tests__/Queue-test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ describe('Queue', () => {
8484
const sub = queue.subscribe((batch) => Array.from(batch)[0]);
8585
expect(await sub.next()).to.deep.equal({ done: false, value: 1 });
8686
expect(await sub.next()).to.deep.equal({ done: false, value: 4 });
87-
expect(await sub.next()).to.deep.equal({ done: false, value: 16 });
88-
expect(await sub.next()).to.deep.equal({ done: false, value: 28 });
87+
expect(await sub.next()).to.deep.equal({ done: false, value: 13 });
88+
expect(await sub.next()).to.deep.equal({ done: false, value: 22 });
8989
});
9090

9191
it('should allow the executor to indicate completion', async () => {
@@ -180,8 +180,8 @@ describe('Queue', () => {
180180
}
181181
});
182182
expect(await sub.next()).to.deep.equal({ done: false, value: [2] });
183-
expect(await sub.next()).to.deep.equal({ done: false, value: [8] });
184-
expect(await sub.next()).to.deep.equal({ done: false, value: [14] });
183+
expect(await sub.next()).to.deep.equal({ done: false, value: [6] });
184+
expect(await sub.next()).to.deep.equal({ done: false, value: [10] });
185185
});
186186

187187
it('should condense pushes during map into the same batch', async () => {

src/execution/__tests__/simplePubSub.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js';
22

3-
import { withCleanup } from '../withCleanup.js';
3+
import { withConcurrentAbruptClose } from '../withConcurrentAbruptClose.js';
44

55
/**
66
* Create an AsyncIterator from an EventEmitter. Useful for mocking a
@@ -59,7 +59,7 @@ export class SimplePubSub<T> {
5959
}
6060
}
6161

62-
return withCleanup(getSubscriberImpl(), emptyQueue);
62+
return withConcurrentAbruptClose(getSubscriberImpl(), emptyQueue);
6363

6464
function pushValue(event: T): void {
6565
const value: R = transform(event);

src/execution/__tests__/stream-test.ts

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1795,23 +1795,18 @@ describe('Execute: stream directive', () => {
17951795

17961796
it('Returns iterator and ignores errors when stream payloads are filtered', async () => {
17971797
let returned = false;
1798-
let requested = false;
17991798
const iterable = {
18001799
[Symbol.asyncIterator]: () => ({
18011800
next: () => {
1802-
/* c8 ignore start */
1803-
if (requested) {
1804-
// stream is filtered, next is not called, and so this is not reached.
1805-
return Promise.reject(new Error('Oops'));
1806-
} /* c8 ignore stop */
1807-
requested = true;
1808-
const friend = friends[0];
1801+
if (returned) {
1802+
return Promise.resolve({
1803+
done: true,
1804+
value: undefined,
1805+
});
1806+
}
18091807
return Promise.resolve({
18101808
done: false,
1811-
value: {
1812-
name: friend.name,
1813-
nonNullName: null,
1814-
},
1809+
value: friends[0],
18151810
});
18161811
},
18171812
return: () => {

src/execution/__tests__/withCleanup-test.ts renamed to src/execution/__tests__/withConcurrentAbruptClose-test.ts

Lines changed: 73 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,81 +3,122 @@ import { describe, it } from 'mocha';
33

44
import { expectPromise } from '../../__testUtils__/expectPromise.js';
55

6-
import { withCleanup } from '../withCleanup.js';
6+
import { withConcurrentAbruptClose } from '../withConcurrentAbruptClose.js';
77

88
/* eslint-disable @typescript-eslint/require-await */
9-
describe('withCleanup', () => {
10-
it('calls cleanup function when completes', async () => {
9+
describe('withConcurrentAbruptClose', () => {
10+
it('calls function when returned', async () => {
1111
async function* source() {
1212
yield 1;
1313
}
1414

1515
let done = false;
16-
const generator = withCleanup(source(), () => {
16+
17+
const generator = withConcurrentAbruptClose(source(), () => {
1718
done = true;
1819
});
1920

2021
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
2122
expect(done).to.equal(false);
22-
expect(await generator.next()).to.deep.equal({
23+
expect(await generator.return()).to.deep.equal({
2324
value: undefined,
2425
done: true,
2526
});
2627
expect(done).to.equal(true);
2728
});
2829

29-
it('calls cleanup function when completes with error', async () => {
30+
it('ignores sync errors when returned', async () => {
3031
async function* source() {
3132
yield 1;
32-
throw new Error('Oops');
3333
}
3434

35-
let done = false;
36-
const generator = withCleanup(source(), () => {
37-
done = true;
35+
const generator = withConcurrentAbruptClose(source(), () => {
36+
throw new Error('Oops');
3837
});
3938

4039
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
41-
expect(done).to.equal(false);
42-
await expectPromise(generator.next()).toRejectWith('Oops');
43-
expect(done).to.equal(true);
40+
expect(await generator.return()).to.deep.equal({
41+
value: undefined,
42+
done: true,
43+
});
4444
});
4545

46-
it('calls cleanup function when returned', async () => {
46+
it('ignores async errors when returned', async () => {
4747
async function* source() {
4848
yield 1;
4949
}
5050

51-
let done = false;
52-
const generator = withCleanup(source(), () => {
53-
done = true;
54-
});
51+
const generator = withConcurrentAbruptClose(source(), () =>
52+
Promise.reject(new Error('Oops')),
53+
);
5554

5655
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
57-
expect(done).to.equal(false);
5856
expect(await generator.return()).to.deep.equal({
5957
value: undefined,
6058
done: true,
6159
});
62-
expect(done).to.equal(true);
6360
});
6461

65-
it('calls cleanup function when thrown', async () => {
62+
it('calls function when thrown', async () => {
6663
async function* source() {
6764
yield 1;
6865
}
6966

7067
let done = false;
71-
const generator = withCleanup(source(), () => {
72-
done = true;
73-
});
68+
let error;
69+
const generator = withConcurrentAbruptClose(
70+
source(),
71+
() => {
72+
done = true;
73+
},
74+
(err) => {
75+
done = true;
76+
error = err;
77+
},
78+
);
7479

7580
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
7681
expect(done).to.equal(false);
77-
await expectPromise(generator.throw(new Error('Oops'))).toRejectWith(
78-
'Oops',
79-
);
82+
const oops = new Error('Oops');
83+
await expectPromise(generator.throw(oops)).toRejectWith('Oops');
8084
expect(done).to.equal(true);
85+
expect(error).to.equal(oops);
86+
});
87+
88+
it('ignores sync errors when thrown', async () => {
89+
async function* source() {
90+
yield 1;
91+
}
92+
93+
const generator = withConcurrentAbruptClose(
94+
source(),
95+
() => {
96+
throw new Error('Ignored');
97+
},
98+
() => {
99+
throw new Error('Ignored');
100+
},
101+
);
102+
103+
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
104+
const oops = new Error('Oops');
105+
await expectPromise(generator.throw(oops)).toRejectWith('Oops');
106+
});
107+
108+
it('ignores async errors when thrown', async () => {
109+
async function* source() {
110+
yield 1;
111+
}
112+
113+
const generator = withConcurrentAbruptClose(
114+
source(),
115+
() => Promise.reject(new Error('Ignored')),
116+
() => Promise.reject(new Error('Ignored')),
117+
);
118+
119+
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
120+
const oops = new Error('Oops');
121+
await expectPromise(generator.throw(oops)).toRejectWith('Oops');
81122
});
82123

83124
it('calls cleanup function when disposed', async () => {
@@ -109,17 +150,17 @@ describe('withCleanup', () => {
109150
},
110151
};
111152

112-
let cleanedUp = false;
153+
let called = false;
113154
{
114-
await using generator = withCleanup(source, () => {
115-
cleanedUp = true;
155+
await using generator = withConcurrentAbruptClose(source, () => {
156+
called = true;
116157
});
117158

118159
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
119160
expect(await generator.next()).to.deep.equal({ value: 2, done: false });
120161
}
121162

122-
expect(cleanedUp).to.equal(true);
163+
expect(called).to.equal(true);
123164
expect(returned).to.equal(true);
124165
});
125166

@@ -128,7 +169,7 @@ describe('withCleanup', () => {
128169
yield 1;
129170
}
130171

131-
const generator = withCleanup(source(), () => {
172+
const generator = withConcurrentAbruptClose(source(), () => {
132173
/* noop */
133174
});
134175

src/execution/execute.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ import {
8888
getDirectiveValues,
8989
getVariableValues,
9090
} from './values.js';
91-
import { withCleanup } from './withCleanup.js';
91+
import { withConcurrentAbruptClose } from './withConcurrentAbruptClose.js';
9292

9393
/* eslint-disable max-params */
9494
// This file contains a lot of such errors but we plan to refactor it anyway
@@ -2264,7 +2264,7 @@ function mapSourceToResponse(
22642264
}
22652265

22662266
return abortSignalListener
2267-
? withCleanup(
2267+
? withConcurrentAbruptClose(
22682268
mapAsyncIterable(
22692269
cancellableIterable(resultOrStream, abortSignalListener),
22702270
mapFn,

src/execution/mapAsyncIterable.ts

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { isPromise } from '../jsutils/isPromise.js';
22
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
33

4-
import { withCleanup } from './withCleanup.js';
4+
import { withConcurrentAbruptClose } from './withConcurrentAbruptClose.js';
55

66
/**
77
* Given an AsyncIterable and a callback function, return an AsyncIterator
@@ -11,17 +11,35 @@ export function mapAsyncIterable<T, U>(
1111
iterable: AsyncGenerator<T> | AsyncIterable<T>,
1212
callback: (value: T) => PromiseOrValue<U>,
1313
): AsyncGenerator<U, void, void> {
14-
return withCleanup(mapAsyncIterableImpl(iterable, callback), async () => {
15-
const iterator = iterable[Symbol.asyncIterator]();
16-
if (typeof iterator.return === 'function') {
17-
try {
18-
await iterator.return(); /* c8 ignore start */
19-
} catch (_error) {
20-
// FIXME: add test case
21-
/* ignore error */
22-
} /* c8 ignore stop */
23-
}
24-
});
14+
const iterator = iterable[Symbol.asyncIterator]();
15+
const returnFn = iterator.return?.bind(iterator);
16+
const throwFn = iterator.throw?.bind(iterator);
17+
18+
const onReturn = returnFn
19+
? async () => {
20+
await callIgnoringErrors(returnFn);
21+
}
22+
: () => Promise.resolve();
23+
24+
const onThrow = throwFn
25+
? async (reason?: unknown) => {
26+
await callIgnoringErrors(() => throwFn(reason));
27+
}
28+
: onReturn;
29+
30+
return withConcurrentAbruptClose(
31+
mapAsyncIterableImpl(iterable, callback),
32+
onReturn,
33+
onThrow,
34+
);
35+
}
36+
37+
async function callIgnoringErrors(fn: () => Promise<unknown>): Promise<void> {
38+
try {
39+
await fn();
40+
} catch {
41+
// ignore error
42+
}
2543
}
2644

2745
async function* mapAsyncIterableImpl<T, U, R = undefined>(

0 commit comments

Comments
 (0)