Skip to content

Commit 4c69a0d

Browse files
committed
refactor(execution): extract withCleanup from mapAsyncIterable (#4496)
motivations: 1. separation of concerns (`mapAsyncIterable` should have single function) 2. `withCleanUp` is useful independently, i.e. it removes the need to "hand-roll" AsyncGenerators across the codebase Some background for Point 2: Built in async generators have a feature/quirk/annoyance that all of their calls are sequential. If a call to `gen.next()` is pending, i.e. waiting on the return of a promise (`... const result = await fetchExternalData; yield result ... `), then the consumer cannot call `gen.return()` manually to abort the generator, the call to `gen.return()` will not initiate until the external fetch is awaited. This is because Async Generators (and promises more broadly) do not have cancellation as a built-in-feature. Hand-rolled Async Generators can allow a call to `gen.return()` to be used to abort the generator; they can start immediately. Technically, they can even return out of order with that pending `gen.next()`, although it's helpful if they always return in order, like with Repeaters (https://repeater.js.org/). We were previously using "hand-rolled" Async Generators for exactly this purpose. With this change, only `withCleanUp` and `mapAsyncIterable` stay as "hand-rolled" Async Generators, but we can use actual Async Generators in our actual incremental delivery code (within `Incremental Publisher` ) and subscription test code (`simplePubSub`). This is good for two reasons: A. Using JavaScript primitives is more modern/ergonomic B. Hopefully will reduce churn and bug surface. In terms of `B`, this PR also fixes a potential bug introduced when we upgraded TS to 5.2. That version of TypeScript requires "hand-rolled" Async Generators to include an `[Symbol.asyncDispose]` property method, but the runtime is responsible for supplying the functionality polyfill. When we upgraded, we by accident required our users to have that functionality (which is present in all of our tested versions) but not guaranteed. This change updates `mapAsyncIterable` and `withCleanUp` to fall back to `Symbol.for("asyncDispose")` which should avoid a crash. The broader codebase should not have to be aware of this level of detail when using these helpers!
1 parent 5253821 commit 4c69a0d

6 files changed

Lines changed: 254 additions & 114 deletions

File tree

src/execution/__tests__/mapAsyncIterable-test.ts

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -89,54 +89,6 @@ describe('mapAsyncIterable', () => {
8989
});
9090
});
9191

92-
it('calls done when completes', async () => {
93-
async function* source() {
94-
yield 1;
95-
yield 2;
96-
yield 3;
97-
}
98-
99-
let done = false;
100-
const doubles = mapAsyncIterable(
101-
source(),
102-
(x) => Promise.resolve(x + x),
103-
() => {
104-
done = true;
105-
},
106-
);
107-
108-
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
109-
expect(await doubles.next()).to.deep.equal({ value: 4, done: false });
110-
expect(await doubles.next()).to.deep.equal({ value: 6, done: false });
111-
expect(done).to.equal(false);
112-
expect(await doubles.next()).to.deep.equal({
113-
value: undefined,
114-
done: true,
115-
});
116-
expect(done).to.equal(true);
117-
});
118-
119-
it('calls done when completes with error', async () => {
120-
async function* source() {
121-
yield 1;
122-
throw new Error('Oops');
123-
}
124-
125-
let done = false;
126-
const doubles = mapAsyncIterable(
127-
source(),
128-
(x) => Promise.resolve(x + x),
129-
() => {
130-
done = true;
131-
},
132-
);
133-
134-
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
135-
expect(done).to.equal(false);
136-
await expectPromise(doubles.next()).toRejectWith('Oops');
137-
expect(done).to.equal(true);
138-
});
139-
14092
it('allows returning early from mapped async generator', async () => {
14193
async function* source() {
14294
try {

src/execution/__tests__/simplePubSub.ts

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
import { assert } from 'chai';
1+
import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js';
2+
3+
import { withCleanup } from '../withCleanup.js';
24

35
/**
46
* Create an AsyncIterator from an EventEmitter. Useful for mocking a
@@ -21,57 +23,49 @@ export class SimplePubSub<T> {
2123
}
2224

2325
getSubscriber<R>(transform: (value: T) => R): AsyncGenerator<R, void, void> {
24-
const pullQueue: Array<(result: IteratorResult<R, void>) => void> = [];
26+
let pendingNext: ((result: R) => void) | undefined;
2527
const pushQueue: Array<R> = [];
2628
let listening = true;
2729
this._subscribers.add(pushValue);
2830

2931
const emptyQueue = () => {
3032
listening = false;
3133
this._subscribers.delete(pushValue);
32-
for (const resolve of pullQueue) {
33-
resolve({ value: undefined, done: true });
34+
if (pendingNext) {
35+
pendingNext(undefined as R);
3436
}
35-
pullQueue.length = 0;
37+
pendingNext = undefined;
3638
pushQueue.length = 0;
3739
};
3840

39-
return {
40-
next(): Promise<IteratorResult<R, void>> {
41-
if (!listening) {
42-
return Promise.resolve({ value: undefined, done: true });
43-
}
44-
41+
async function* getSubscriberImpl(): AsyncGenerator<R, void, void> {
42+
// eslint-disable-next-line no-unmodified-loop-condition
43+
while (listening) {
4544
if (pushQueue.length > 0) {
4645
const value = pushQueue[0];
4746
pushQueue.shift();
48-
return Promise.resolve({ value, done: false });
47+
yield value;
48+
continue;
4949
}
50-
return new Promise((resolve) => pullQueue.push(resolve));
51-
},
52-
return(): Promise<IteratorResult<R, void>> {
53-
emptyQueue();
54-
return Promise.resolve({ value: undefined, done: true });
55-
},
56-
throw(error: unknown) {
57-
emptyQueue();
58-
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
59-
return Promise.reject(error);
60-
},
61-
[Symbol.asyncIterator]() {
62-
return this;
63-
},
64-
async [Symbol.asyncDispose]() {
65-
await this.return();
66-
},
67-
};
50+
51+
const { promise, resolve } = promiseWithResolvers<R>();
52+
pendingNext = resolve;
53+
// eslint-disable-next-line no-await-in-loop
54+
const value = await promise;
55+
if (!listening) {
56+
return;
57+
}
58+
yield value;
59+
}
60+
}
61+
62+
return withCleanup(getSubscriberImpl(), emptyQueue);
6863

6964
function pushValue(event: T): void {
7065
const value: R = transform(event);
71-
if (pullQueue.length > 0) {
72-
const receiver = pullQueue.shift();
73-
assert(receiver != null);
74-
receiver({ value, done: false });
66+
if (pendingNext) {
67+
pendingNext(value);
68+
pendingNext = undefined;
7569
} else {
7670
pushQueue.push(value);
7771
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import { expect } from 'chai';
2+
import { describe, it } from 'mocha';
3+
4+
import { expectPromise } from '../../__testUtils__/expectPromise.js';
5+
6+
import { withCleanup } from '../withCleanup.js';
7+
8+
/* eslint-disable @typescript-eslint/require-await */
9+
describe('withCleanup', () => {
10+
it('calls cleanup function when completes', async () => {
11+
async function* source() {
12+
yield 1;
13+
}
14+
15+
let done = false;
16+
const generator = withCleanup(source(), () => {
17+
done = true;
18+
});
19+
20+
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
21+
expect(done).to.equal(false);
22+
expect(await generator.next()).to.deep.equal({
23+
value: undefined,
24+
done: true,
25+
});
26+
expect(done).to.equal(true);
27+
});
28+
29+
it('calls cleanup function when completes with error', async () => {
30+
async function* source() {
31+
yield 1;
32+
throw new Error('Oops');
33+
}
34+
35+
let done = false;
36+
const generator = withCleanup(source(), () => {
37+
done = true;
38+
});
39+
40+
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
41+
expect(done).to.equal(false);
42+
await expectPromise(generator.next()).toRejectWith('Oops');
43+
expect(done).to.equal(true);
44+
});
45+
46+
it('calls cleanup function when returned', async () => {
47+
async function* source() {
48+
yield 1;
49+
}
50+
51+
let done = false;
52+
const generator = withCleanup(source(), () => {
53+
done = true;
54+
});
55+
56+
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
57+
expect(done).to.equal(false);
58+
expect(await generator.return()).to.deep.equal({
59+
value: undefined,
60+
done: true,
61+
});
62+
expect(done).to.equal(true);
63+
});
64+
65+
it('calls cleanup function when thrown', async () => {
66+
async function* source() {
67+
yield 1;
68+
}
69+
70+
let done = false;
71+
const generator = withCleanup(source(), () => {
72+
done = true;
73+
});
74+
75+
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
76+
expect(done).to.equal(false);
77+
await expectPromise(generator.throw(new Error('Oops'))).toRejectWith(
78+
'Oops',
79+
);
80+
expect(done).to.equal(true);
81+
});
82+
83+
it('calls cleanup function when disposed', async () => {
84+
let returned = false;
85+
86+
const items = [1, 2, 3];
87+
const source: AsyncGenerator<number, void, void> = {
88+
[Symbol.asyncIterator]() {
89+
return this;
90+
},
91+
next(): Promise<IteratorResult<number, void>> {
92+
const value = items.shift();
93+
if (value !== undefined) {
94+
return Promise.resolve({ done: false, value });
95+
}
96+
97+
return Promise.resolve({ done: true, value: undefined });
98+
},
99+
return(): Promise<IteratorResult<number, void>> {
100+
returned = true;
101+
return Promise.resolve({ done: true, value: undefined });
102+
},
103+
throw(): Promise<IteratorResult<number, void>> {
104+
returned = true;
105+
return Promise.reject(new Error());
106+
},
107+
async [Symbol.asyncDispose]() {
108+
await this.return();
109+
},
110+
};
111+
112+
let cleanedUp = false;
113+
{
114+
await using generator = withCleanup(source, () => {
115+
cleanedUp = true;
116+
});
117+
118+
expect(await generator.next()).to.deep.equal({ value: 1, done: false });
119+
expect(await generator.next()).to.deep.equal({ value: 2, done: false });
120+
}
121+
122+
expect(cleanedUp).to.equal(true);
123+
expect(returned).to.equal(true);
124+
});
125+
126+
it('returns the generator itself when the `Symbol.asyncIterator` method is called', async () => {
127+
async function* source() {
128+
yield 1;
129+
}
130+
131+
const generator = withCleanup(source(), () => {
132+
/* noop */
133+
});
134+
135+
expect(generator[Symbol.asyncIterator]()).to.equal(generator);
136+
});
137+
});

src/execution/execute.ts

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ import { getVariableSignature } from './getVariableSignature.js';
6666
import { mapAsyncIterable } from './mapAsyncIterable.js';
6767
import type { VariableValues } from './values.js';
6868
import { getArgumentValues, getVariableValues } from './values.js';
69+
import { withCleanup } from './withCleanup.js';
6970

7071
/* eslint-disable max-params */
7172
// This file contains a lot of such errors but we plan to refactor it anyway
@@ -467,10 +468,7 @@ export function validateExecutionArgs(
467468
schema,
468469
variableDefinitions,
469470
rawVariableValues ?? {},
470-
{
471-
maxErrors: options?.maxCoercionErrors ?? 50,
472-
hideSuggestions,
473-
},
471+
{ maxErrors: options?.maxCoercionErrors ?? 50, hideSuggestions },
474472
);
475473

476474
if (variableValuesOrErrors.errors) {
@@ -1553,19 +1551,23 @@ function mapSourceToResponse(
15531551
// GraphQL `execute` function, with `payload` as the rootValue.
15541552
// This implements the "MapSourceToResponseEvent" algorithm described in
15551553
// the GraphQL specification..
1556-
return mapAsyncIterable(
1557-
abortSignalListener
1558-
? cancellableIterable(resultOrStream, abortSignalListener)
1559-
: resultOrStream,
1560-
(payload: unknown) => {
1561-
const perEventExecutionArgs: ValidatedExecutionArgs = {
1562-
...validatedExecutionArgs,
1563-
rootValue: payload,
1564-
};
1565-
return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs);
1566-
},
1567-
() => abortSignalListener?.disconnect(),
1568-
);
1554+
function mapFn(payload: unknown): PromiseOrValue<ExecutionResult> {
1555+
const perEventExecutionArgs: ValidatedExecutionArgs = {
1556+
...validatedExecutionArgs,
1557+
rootValue: payload,
1558+
};
1559+
return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs);
1560+
}
1561+
1562+
return abortSignalListener
1563+
? withCleanup(
1564+
mapAsyncIterable(
1565+
cancellableIterable(resultOrStream, abortSignalListener),
1566+
mapFn,
1567+
),
1568+
() => abortSignalListener.disconnect(),
1569+
)
1570+
: mapAsyncIterable(resultOrStream, mapFn);
15691571
}
15701572

15711573
export function executeSubscriptionEvent(

src/execution/mapAsyncIterable.ts

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,18 @@ import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
77
export function mapAsyncIterable<T, U, R = undefined>(
88
iterable: AsyncGenerator<T, R, void> | AsyncIterable<T>,
99
callback: (value: T) => PromiseOrValue<U>,
10-
onDone?: () => void,
1110
): AsyncGenerator<U, R, void> {
1211
const iterator = iterable[Symbol.asyncIterator]();
1312

1413
async function mapResult(
1514
promise: Promise<IteratorResult<T, R>>,
1615
): Promise<IteratorResult<U, R>> {
17-
let value: T;
18-
try {
19-
const result = await promise;
20-
if (result.done) {
21-
onDone?.();
22-
return result;
23-
}
24-
value = result.value;
25-
} catch (error) {
26-
onDone?.();
27-
throw error;
16+
const result = await promise;
17+
if (result.done) {
18+
return result;
2819
}
2920

21+
const value = result.value;
3022
try {
3123
return { value: await callback(value), done: false };
3224
} catch (error) {
@@ -46,6 +38,10 @@ export function mapAsyncIterable<T, U, R = undefined>(
4638
}
4739
}
4840

41+
const asyncDispose: typeof Symbol.asyncDispose =
42+
Symbol.asyncDispose /* c8 ignore start */ ??
43+
Symbol.for('Symbol.asyncDispose'); /* c8 ignore stop */
44+
4945
return {
5046
async next() {
5147
return mapResult(iterator.next());
@@ -70,13 +66,13 @@ export function mapAsyncIterable<T, U, R = undefined>(
7066
[Symbol.asyncIterator]() {
7167
return this;
7268
},
73-
async [Symbol.asyncDispose]() {
69+
async [asyncDispose]() {
7470
await this.return(undefined as R);
7571
if (
76-
typeof (iterable as AsyncGenerator<T, R, void>)[Symbol.asyncDispose] ===
72+
typeof (iterable as AsyncGenerator<T, R, void>)[asyncDispose] ===
7773
'function'
7874
) {
79-
await (iterable as AsyncGenerator<T, R, void>)[Symbol.asyncDispose]();
75+
await (iterable as AsyncGenerator<T, R, void>)[asyncDispose]();
8076
}
8177
},
8278
};

0 commit comments

Comments
 (0)