Skip to content

Commit 82b27ae

Browse files
committed
reduce ticks stemming from withCleanup (#4526)
replace withCleanup() with more targeted withConcurrentAbruptClose(). next() calls are no longer wrapped, reducing extra ticks = refactors mapAsyncIterable to use the more targeted function = refactors IncrementalPublisher to also perform required clean up on a normal close. * this change also alters the behavior of a stream test to no longer pass spuriously and allows it to be further streamlined
1 parent 2b4c668 commit 82b27ae

6 files changed

Lines changed: 169 additions & 106 deletions

File tree

src/execution/__tests__/simplePubSub.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js';
22

3-
import { withCleanup } from '../withCleanup.js';
3+
import { withConcurrentAbruptClose } from '../withConcurrentAbruptClose.js';
44

55
/**
66
* Create an AsyncIterator from an EventEmitter. Useful for mocking a
@@ -59,7 +59,7 @@ export class SimplePubSub<T> {
5959
}
6060
}
6161

62-
return withCleanup(getSubscriberImpl(), emptyQueue);
62+
return withConcurrentAbruptClose(getSubscriberImpl(), emptyQueue);
6363

6464
function pushValue(event: T): void {
6565
const value: R = transform(event);

src/execution/__tests__/withCleanup-test.ts renamed to src/execution/__tests__/withConcurrentAbruptClose-test.ts

Lines changed: 73 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,81 +3,122 @@ import { describe, it } from 'mocha';
33

44
import { expectPromise } from '../../__testUtils__/expectPromise.js';
55

6-
import { withCleanup } from '../withCleanup.js';
6+
import { withConcurrentAbruptClose } from '../withConcurrentAbruptClose.js';
77

88
/* eslint-disable @typescript-eslint/require-await */
9-
describe('withCleanup', () => {
10-
it('calls cleanup function when completes', async () => {
9+
describe('withConcurrentAbruptClose', () => {
10+
it('calls function when returned', async () => {
1111
async function* source() {
1212
yield 1;
1313
}
1414

1515
let done = false;
16-
const generator = withCleanup(source(), () => {
16+
17+
const generator = withConcurrentAbruptClose(source(), () => {
1718
done = true;
1819
});
1920

2021
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
2122
expect(done).to.equal(false);
22-
expect(await generator.next()).to.deep.equal({
23+
expect(await generator.return()).to.deep.equal({
2324
value: undefined,
2425
done: true,
2526
});
2627
expect(done).to.equal(true);
2728
});
2829

29-
it('calls cleanup function when completes with error', async () => {
30+
it('ignores sync errors when returned', async () => {
3031
async function* source() {
3132
yield 1;
32-
throw new Error('Oops');
3333
}
3434

35-
let done = false;
36-
const generator = withCleanup(source(), () => {
37-
done = true;
35+
const generator = withConcurrentAbruptClose(source(), () => {
36+
throw new Error('Oops');
3837
});
3938

4039
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
41-
expect(done).to.equal(false);
42-
await expectPromise(generator.next()).toRejectWith('Oops');
43-
expect(done).to.equal(true);
40+
expect(await generator.return()).to.deep.equal({
41+
value: undefined,
42+
done: true,
43+
});
4444
});
4545

46-
it('calls cleanup function when returned', async () => {
46+
it('ignores async errors when returned', async () => {
4747
async function* source() {
4848
yield 1;
4949
}
5050

51-
let done = false;
52-
const generator = withCleanup(source(), () => {
53-
done = true;
54-
});
51+
const generator = withConcurrentAbruptClose(source(), () =>
52+
Promise.reject(new Error('Oops')),
53+
);
5554

5655
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
57-
expect(done).to.equal(false);
5856
expect(await generator.return()).to.deep.equal({
5957
value: undefined,
6058
done: true,
6159
});
62-
expect(done).to.equal(true);
6360
});
6461

65-
it('calls cleanup function when thrown', async () => {
62+
it('calls function when thrown', async () => {
6663
async function* source() {
6764
yield 1;
6865
}
6966

7067
let done = false;
71-
const generator = withCleanup(source(), () => {
72-
done = true;
73-
});
68+
let error;
69+
const generator = withConcurrentAbruptClose(
70+
source(),
71+
() => {
72+
done = true;
73+
},
74+
(err) => {
75+
done = true;
76+
error = err;
77+
},
78+
);
7479

7580
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
7681
expect(done).to.equal(false);
77-
await expectPromise(generator.throw(new Error('Oops'))).toRejectWith(
78-
'Oops',
79-
);
82+
const oops = new Error('Oops');
83+
await expectPromise(generator.throw(oops)).toRejectWith('Oops');
8084
expect(done).to.equal(true);
85+
expect(error).to.equal(oops);
86+
});
87+
88+
it('ignores sync errors when thrown', async () => {
89+
async function* source() {
90+
yield 1;
91+
}
92+
93+
const generator = withConcurrentAbruptClose(
94+
source(),
95+
() => {
96+
throw new Error('Ignored');
97+
},
98+
() => {
99+
throw new Error('Ignored');
100+
},
101+
);
102+
103+
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
104+
const oops = new Error('Oops');
105+
await expectPromise(generator.throw(oops)).toRejectWith('Oops');
106+
});
107+
108+
it('ignores async errors when thrown', async () => {
109+
async function* source() {
110+
yield 1;
111+
}
112+
113+
const generator = withConcurrentAbruptClose(
114+
source(),
115+
() => Promise.reject(new Error('Ignored')),
116+
() => Promise.reject(new Error('Ignored')),
117+
);
118+
119+
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
120+
const oops = new Error('Oops');
121+
await expectPromise(generator.throw(oops)).toRejectWith('Oops');
81122
});
82123

83124
it('calls cleanup function when disposed', async () => {
@@ -109,17 +150,17 @@ describe('withCleanup', () => {
109150
},
110151
};
111152

112-
let cleanedUp = false;
153+
let called = false;
113154
{
114-
await using generator = withCleanup(source, () => {
115-
cleanedUp = true;
155+
await using generator = withConcurrentAbruptClose(source, () => {
156+
called = true;
116157
});
117158

118159
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
119160
expect(await generator.next()).to.deep.equal({ value: 2, done: false });
120161
}
121162

122-
expect(cleanedUp).to.equal(true);
163+
expect(called).to.equal(true);
123164
expect(returned).to.equal(true);
124165
});
125166

@@ -128,7 +169,7 @@ describe('withCleanup', () => {
128169
yield 1;
129170
}
130171

131-
const generator = withCleanup(source(), () => {
172+
const generator = withConcurrentAbruptClose(source(), () => {
132173
/* noop */
133174
});
134175

src/execution/execute.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ import { getVariableSignature } from './getVariableSignature.js';
6666
import { mapAsyncIterable } from './mapAsyncIterable.js';
6767
import type { VariableValues } from './values.js';
6868
import { getArgumentValues, getVariableValues } from './values.js';
69-
import { withCleanup } from './withCleanup.js';
69+
import { withConcurrentAbruptClose } from './withConcurrentAbruptClose.js';
7070

7171
/* eslint-disable max-params */
7272
// This file contains a lot of such errors but we plan to refactor it anyway
@@ -1557,7 +1557,7 @@ function mapSourceToResponse(
15571557
}
15581558

15591559
return abortSignalListener
1560-
? withCleanup(
1560+
? withConcurrentAbruptClose(
15611561
mapAsyncIterable(
15621562
cancellableIterable(resultOrStream, abortSignalListener),
15631563
mapFn,

src/execution/mapAsyncIterable.ts

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { isPromise } from '../jsutils/isPromise.js';
22
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
33

4-
import { withCleanup } from './withCleanup.js';
4+
import { withConcurrentAbruptClose } from './withConcurrentAbruptClose.js';
55

66
/**
77
* Given an AsyncIterable and a callback function, return an AsyncIterator
@@ -11,17 +11,35 @@ export function mapAsyncIterable<T, U>(
1111
iterable: AsyncGenerator<T> | AsyncIterable<T>,
1212
callback: (value: T) => PromiseOrValue<U>,
1313
): AsyncGenerator<U, void, void> {
14-
return withCleanup(mapAsyncIterableImpl(iterable, callback), async () => {
15-
const iterator = iterable[Symbol.asyncIterator]();
16-
if (typeof iterator.return === 'function') {
17-
try {
18-
await iterator.return(); /* c8 ignore start */
19-
} catch (_error) {
20-
// FIXME: add test case
21-
/* ignore error */
22-
} /* c8 ignore stop */
23-
}
24-
});
14+
const iterator = iterable[Symbol.asyncIterator]();
15+
const returnFn = iterator.return?.bind(iterator);
16+
const throwFn = iterator.throw?.bind(iterator);
17+
18+
const onReturn = returnFn
19+
? async () => {
20+
await callIgnoringErrors(returnFn);
21+
}
22+
: () => Promise.resolve();
23+
24+
const onThrow = throwFn
25+
? async (reason?: unknown) => {
26+
await callIgnoringErrors(() => throwFn(reason));
27+
}
28+
: onReturn;
29+
30+
return withConcurrentAbruptClose(
31+
mapAsyncIterableImpl(iterable, callback),
32+
onReturn,
33+
onThrow,
34+
);
35+
}
36+
37+
async function callIgnoringErrors(fn: () => Promise<unknown>): Promise<void> {
38+
try {
39+
await fn();
40+
} catch {
41+
// ignore error
42+
}
2543
}
2644

2745
async function* mapAsyncIterableImpl<T, U, R = undefined>(

src/execution/withCleanup.ts

Lines changed: 0 additions & 58 deletions
This file was deleted.
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { isPromise } from '../jsutils/isPromise.js';
2+
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
3+
4+
const asyncDispose: typeof Symbol.asyncDispose =
5+
Symbol.asyncDispose /* c8 ignore start */ ??
6+
Symbol.for('Symbol.asyncDispose'); /* c8 ignore stop */
7+
8+
/**
9+
* Given an AsyncGenerator and provided functions, return an AsyncGenerator
10+
* which calls the given functions when the generator is abruptly closed,
11+
* calling the functions immediately even if the generator is paused.
12+
*
13+
* This is useful for allowing return and throw to trigger logic even if the
14+
* generator is paused on a pending await within a `next()` call (including
15+
* if that logic can cause that hanging `next()` call to return early).
16+
*
17+
* Errors from the provided functions are ignored.
18+
*
19+
* The provided functions should be idempotent, as they may be called
20+
* multiple times.
21+
*/
22+
export function withConcurrentAbruptClose<T>(
23+
generator: AsyncGenerator<T, void, void>,
24+
beforeReturn: () => PromiseOrValue<void>,
25+
beforeThrow: (error?: unknown) => PromiseOrValue<void> = beforeReturn,
26+
): AsyncGenerator<T, void, void> {
27+
return {
28+
[Symbol.asyncIterator]() {
29+
return this;
30+
},
31+
next() {
32+
return generator.next();
33+
},
34+
async return() {
35+
ignoreErrors(beforeReturn);
36+
return generator.return();
37+
},
38+
async throw(error?: unknown) {
39+
ignoreErrors(() => beforeThrow(error));
40+
return generator.throw(error);
41+
},
42+
async [asyncDispose]() {
43+
ignoreErrors(beforeReturn);
44+
if (typeof generator[asyncDispose] === 'function') {
45+
await generator[asyncDispose]();
46+
}
47+
},
48+
};
49+
}
50+
51+
function ignoreErrors(fn: () => PromiseOrValue<unknown>): void {
52+
try {
53+
const result = fn();
54+
if (isPromise(result)) {
55+
result.catch(() => {
56+
// ignore error
57+
});
58+
}
59+
} catch {
60+
// ignore error
61+
}
62+
}

0 commit comments

Comments
 (0)