Skip to content

Commit 41a671c

Browse files
authored
fix(withConcurrentAbruptClose): do not close unnecessarily (#4644)
do not call abrupt close repeatedly or if closed naturally
1 parent 06e5bd0 commit 41a671c

File tree

4 files changed

+300
-25
lines changed

4 files changed

+300
-25
lines changed

src/execution/__tests__/withConcurrentAbruptClose-test.ts

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import { expectPromise } from '../../__testUtils__/expectPromise.js';
55

66
import { withConcurrentAbruptClose } from '../withConcurrentAbruptClose.js';
77

8+
const asyncDispose: typeof Symbol.asyncDispose =
9+
Symbol.asyncDispose ?? Symbol.for('Symbol.asyncDispose');
10+
811
/* eslint-disable @typescript-eslint/require-await */
912
describe('withConcurrentAbruptClose', () => {
1013
it('calls function when returned', async () => {
@@ -164,6 +167,166 @@ describe('withConcurrentAbruptClose', () => {
164167
expect(returned).to.equal(true);
165168
});
166169

170+
it('calls the abrupt-close function at most once before completion is observed', async () => {
171+
let resolveNext!: (result: IteratorResult<number, void>) => void;
172+
const nextPromise = new Promise<IteratorResult<number, void>>((resolve) => {
173+
resolveNext = resolve;
174+
});
175+
176+
const source: AsyncGenerator<number, void, void> = {
177+
[Symbol.asyncIterator]() {
178+
return this;
179+
},
180+
next(): Promise<IteratorResult<number, void>> {
181+
return nextPromise;
182+
},
183+
return(): Promise<IteratorResult<number, void>> {
184+
return Promise.resolve({ done: true, value: undefined });
185+
},
186+
throw(reason?: unknown): Promise<IteratorResult<number, void>> {
187+
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
188+
return Promise.reject(reason);
189+
},
190+
async [asyncDispose]() {
191+
await this.return();
192+
},
193+
};
194+
195+
let cleanupCalls = 0;
196+
const generator = withConcurrentAbruptClose(source, () => {
197+
cleanupCalls += 1;
198+
});
199+
200+
const pendingNext = generator.next();
201+
await generator.return();
202+
await generator[asyncDispose]();
203+
204+
resolveNext({ done: true, value: undefined });
205+
await pendingNext;
206+
207+
expect(cleanupCalls).to.equal(1);
208+
});
209+
210+
it('does not call cleanup function again when returned after completion', async () => {
211+
let returned = false;
212+
213+
const source: AsyncGenerator<number, void, void> = {
214+
[Symbol.asyncIterator]() {
215+
return this;
216+
},
217+
next(): Promise<IteratorResult<number, void>> {
218+
return Promise.resolve({ done: true, value: undefined });
219+
},
220+
return(): Promise<IteratorResult<number, void>> {
221+
returned = true;
222+
return Promise.resolve({ done: true, value: undefined });
223+
},
224+
throw(reason?: unknown): Promise<IteratorResult<number, void>> {
225+
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
226+
return Promise.reject(reason);
227+
},
228+
async [asyncDispose]() {
229+
await this.return();
230+
},
231+
};
232+
233+
let called = false;
234+
const generator = withConcurrentAbruptClose(source, () => {
235+
called = true;
236+
});
237+
238+
expect(await generator.next()).to.deep.equal({
239+
value: undefined,
240+
done: true,
241+
});
242+
expect(await generator.return()).to.deep.equal({
243+
value: undefined,
244+
done: true,
245+
});
246+
247+
expect(called).to.equal(false);
248+
expect(returned).to.equal(true);
249+
});
250+
251+
it('does not call cleanup function again when thrown after completion', async () => {
252+
let thrownReason: unknown;
253+
254+
const source: AsyncGenerator<number, void, void> = {
255+
[Symbol.asyncIterator]() {
256+
return this;
257+
},
258+
next(): Promise<IteratorResult<number, void>> {
259+
return Promise.resolve({ done: true, value: undefined });
260+
},
261+
return(): Promise<IteratorResult<number, void>> {
262+
return Promise.resolve({ done: true, value: undefined });
263+
},
264+
throw(reason?: unknown): Promise<IteratorResult<number, void>> {
265+
thrownReason = reason;
266+
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
267+
return Promise.reject(reason);
268+
},
269+
async [Symbol.asyncDispose]() {
270+
await this.return();
271+
},
272+
};
273+
274+
let called = false;
275+
const generator = withConcurrentAbruptClose(source, () => {
276+
called = true;
277+
});
278+
279+
expect(await generator.next()).to.deep.equal({
280+
value: undefined,
281+
done: true,
282+
});
283+
284+
const oops = new Error('Oops');
285+
await expectPromise(generator.throw(oops)).toRejectWith('Oops');
286+
287+
expect(called).to.equal(false);
288+
expect(thrownReason).to.equal(oops);
289+
});
290+
291+
it('does not call cleanup function again when disposed after completion', async () => {
292+
let returned = false;
293+
294+
const source: AsyncGenerator<number, void, void> = {
295+
[Symbol.asyncIterator]() {
296+
return this;
297+
},
298+
next(): Promise<IteratorResult<number, void>> {
299+
return Promise.resolve({ done: true, value: undefined });
300+
},
301+
return(): Promise<IteratorResult<number, void>> {
302+
returned = true;
303+
return Promise.resolve({ done: true, value: undefined });
304+
},
305+
throw(reason?: unknown): Promise<IteratorResult<number, void>> {
306+
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
307+
return Promise.reject(reason);
308+
},
309+
async [Symbol.asyncDispose]() {
310+
await this.return();
311+
},
312+
};
313+
314+
let called = false;
315+
{
316+
await using generator = withConcurrentAbruptClose(source, () => {
317+
called = true;
318+
});
319+
320+
expect(await generator.next()).to.deep.equal({
321+
value: undefined,
322+
done: true,
323+
});
324+
}
325+
326+
expect(called).to.equal(false);
327+
expect(returned).to.equal(true);
328+
});
329+
167330
it('returns the generator itself when the `Symbol.asyncIterator` method is called', async () => {
168331
async function* source() {
169332
yield 1;

src/execution/incremental/__tests__/stream-test.ts

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2778,18 +2778,66 @@ describe('Execute: stream directive', () => {
27782778

27792779
assert(returned);
27802780
});
2781-
it('Returns underlying async iterables when uses resource is disposed', async () => {
2781+
it('Returns underlying async iterables when resource is disposed before source completion', async () => {
2782+
let returned = false;
2783+
const iterable = {
2784+
[Symbol.asyncIterator]: () => ({
2785+
next: () =>
2786+
new Promise(() => {
2787+
/* never resolves */
2788+
}),
2789+
return: () => {
2790+
returned = true;
2791+
},
2792+
}),
2793+
};
2794+
2795+
const document = parse(`
2796+
query {
2797+
friendList @stream(initialCount: 0) {
2798+
id
2799+
}
2800+
}
2801+
`);
2802+
2803+
const executeResult = await experimentalExecuteIncrementally({
2804+
schema,
2805+
document,
2806+
rootValue: {
2807+
friendList: iterable,
2808+
},
2809+
});
2810+
assert('initialResult' in executeResult);
2811+
2812+
{
2813+
await using iterator =
2814+
executeResult.subsequentResults[Symbol.asyncIterator]();
2815+
assert(iterator != null);
2816+
2817+
const result1 = executeResult.initialResult;
2818+
expectJSON(result1).toDeepEqual({
2819+
data: {
2820+
friendList: [],
2821+
},
2822+
pending: [{ id: '0', path: ['friendList'] }],
2823+
hasNext: true,
2824+
});
2825+
}
2826+
2827+
assert(returned);
2828+
});
2829+
2830+
it('Does not return underlying async iterables when resource is disposed after source completion', async () => {
27822831
let index = 0;
27832832
let returned = false;
2833+
const values = [friends[0]];
27842834
const iterable = {
27852835
[Symbol.asyncIterator]: () => ({
27862836
next: () => {
2787-
if (index > 1) {
2788-
return new Promise(() => {
2789-
// never resolves
2790-
});
2837+
const friend = values[index++];
2838+
if (friend == null) {
2839+
return Promise.resolve({ done: true, value: undefined });
27912840
}
2792-
const friend = friends[index++];
27932841
return Promise.resolve({ done: false, value: friend });
27942842
},
27952843
return: () => {
@@ -2833,16 +2881,17 @@ describe('Execute: stream directive', () => {
28332881
value: {
28342882
incremental: [
28352883
{
2836-
items: [{ id: '1' }, { id: '2' }],
2884+
items: [{ id: '1' }],
28372885
id: '0',
28382886
},
28392887
],
2840-
hasNext: true,
2888+
completed: [{ id: '0' }],
2889+
hasNext: false,
28412890
},
28422891
});
28432892
}
28442893

2845-
assert(returned);
2894+
assert(!returned);
28462895
});
28472896
it('limits stream batches to the default capacity (100)', async () => {
28482897
const document = parse(`

src/execution/legacyIncremental/__tests__/legacy-stream-test.ts

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2697,18 +2697,65 @@ describe('Execute: stream directive (legacy)', () => {
26972697

26982698
assert(returned);
26992699
});
2700-
it('Returns underlying async iterables when uses resource is disposed', async () => {
2700+
it('Returns underlying async iterables when resource is disposed before source completion', async () => {
2701+
let returned = false;
2702+
const iterable = {
2703+
[Symbol.asyncIterator]: () => ({
2704+
next: () =>
2705+
new Promise(() => {
2706+
/* never resolves */
2707+
}),
2708+
return: () => {
2709+
returned = true;
2710+
},
2711+
}),
2712+
};
2713+
2714+
const document = parse(`
2715+
query {
2716+
friendList @stream(initialCount: 0) {
2717+
id
2718+
}
2719+
}
2720+
`);
2721+
2722+
const executeResult = await legacyExecuteIncrementally({
2723+
schema,
2724+
document,
2725+
rootValue: {
2726+
friendList: iterable,
2727+
},
2728+
});
2729+
assert('initialResult' in executeResult);
2730+
2731+
{
2732+
await using iterator =
2733+
executeResult.subsequentResults[Symbol.asyncIterator]();
2734+
assert(iterator != null);
2735+
2736+
const result1 = executeResult.initialResult;
2737+
expectJSON(result1).toDeepEqual({
2738+
data: {
2739+
friendList: [],
2740+
},
2741+
hasNext: true,
2742+
});
2743+
}
2744+
2745+
assert(returned);
2746+
});
2747+
2748+
it('Does not return underlying async iterables when resource is disposed after source completion', async () => {
27012749
let index = 0;
27022750
let returned = false;
2751+
const values = [friends[0]];
27032752
const iterable = {
27042753
[Symbol.asyncIterator]: () => ({
27052754
next: () => {
2706-
if (index > 1) {
2707-
return new Promise(() => {
2708-
// never resolves
2709-
});
2755+
const friend = values[index++];
2756+
if (friend == null) {
2757+
return Promise.resolve({ done: true, value: undefined });
27102758
}
2711-
const friend = friends[index++];
27122759
return Promise.resolve({ done: false, value: friend });
27132760
},
27142761
return: () => {
@@ -2751,16 +2798,16 @@ describe('Execute: stream directive (legacy)', () => {
27512798
value: {
27522799
incremental: [
27532800
{
2754-
items: [{ id: '1' }, { id: '2' }],
2801+
items: [{ id: '1' }],
27552802
path: ['friendList', 0],
27562803
},
27572804
],
2758-
hasNext: true,
2805+
hasNext: false,
27592806
},
27602807
});
27612808
}
27622809

2763-
assert(returned);
2810+
assert(!returned);
27642811
});
27652812
it('limits stream batches to the default capacity (100)', async () => {
27662813
const document = parse(`

0 commit comments

Comments
 (0)