Skip to content

Commit c718bcc

Browse files
authored
extract getStreamUsage (#4543)
similar to collectSubfields (which also gets deferUsages) this can be a memoized independent helper function
1 parent 6eee8ff commit c718bcc

6 files changed

Lines changed: 206 additions & 150 deletions

File tree

src/execution/Executor.ts

Lines changed: 50 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ import { invariant } from '../jsutils/invariant.js';
33
import { isAsyncIterable } from '../jsutils/isAsyncIterable.js';
44
import { isIterableObject } from '../jsutils/isIterableObject.js';
55
import { isPromise } from '../jsutils/isPromise.js';
6-
import { memoize3 } from '../jsutils/memoize3.js';
6+
import { memoize1 } from '../jsutils/memoize1.js';
7+
import { memoize2 } from '../jsutils/memoize2.js';
78
import type { ObjMap } from '../jsutils/ObjMap.js';
89
import type { Path } from '../jsutils/Path.js';
910
import { addPath, pathToArray } from '../jsutils/Path.js';
@@ -39,7 +40,6 @@ import {
3940
isNonNullType,
4041
isObjectType,
4142
} from '../type/definition.js';
42-
import { GraphQLStreamDirective } from '../type/directives.js';
4343
import type { GraphQLSchema } from '../type/schema.js';
4444

4545
import { cancellablePromise } from './cancellablePromise.js';
@@ -53,6 +53,8 @@ import {
5353
collectFields,
5454
collectSubfields as _collectSubfields,
5555
} from './collectFields.js';
56+
import type { StreamUsage } from './getStreamUsage.js';
57+
import { getStreamUsage } from './getStreamUsage.js';
5658
import type {
5759
DeferUsageSet,
5860
ExecutionPlan,
@@ -64,36 +66,12 @@ import { Queue } from './incremental/Queue.js';
6466
import type { Group, Stream, Task, Work } from './incremental/WorkQueue.js';
6567
import { ResolveInfo } from './ResolveInfo.js';
6668
import type { VariableValues } from './values.js';
67-
import { getArgumentValues, getDirectiveValues } from './values.js';
69+
import { getArgumentValues } from './values.js';
6870

6971
/* eslint-disable max-params */
7072
// This file contains a lot of such errors but we plan to refactor it anyway
7173
// so just disable it for entire file.
7274

73-
/**
74-
* A memoized collection of relevant subfields with regard to the return
75-
* type. Memoizing ensures the subfields are not repeatedly calculated, which
76-
* saves overhead when resolving lists of values.
77-
*/
78-
const collectSubfields = memoize3(
79-
(
80-
validatedExecutionArgs: ValidatedExecutionArgs,
81-
returnType: GraphQLObjectType,
82-
fieldDetailsList: FieldDetailsList,
83-
) => {
84-
const { schema, fragments, variableValues, hideSuggestions } =
85-
validatedExecutionArgs;
86-
return _collectSubfields(
87-
schema,
88-
fragments,
89-
variableValues,
90-
returnType,
91-
fieldDetailsList,
92-
hideSuggestions,
93-
);
94-
},
95-
);
96-
9775
/**
9876
* Terminology
9977
*
@@ -335,12 +313,6 @@ export interface FormattedCompletedResult {
335313
errors?: ReadonlyArray<GraphQLFormattedError>;
336314
}
337315

338-
export interface StreamUsage {
339-
label: string | undefined;
340-
initialCount: number;
341-
fieldDetailsList: FieldDetailsList;
342-
}
343-
344316
/** @internal */
345317
interface ExecutionGroup
346318
extends Task<
@@ -413,6 +385,18 @@ export class Executor {
413385
tasks: Array<ExecutionGroup>;
414386
streams: Array<ItemStream>;
415387

388+
collectSubfields: (
389+
returnType: GraphQLObjectType,
390+
fieldDetailsList: FieldDetailsList,
391+
) => {
392+
groupedFieldSet: GroupedFieldSet;
393+
newDeferUsages: ReadonlyArray<DeferUsage>;
394+
};
395+
396+
getStreamUsage: (
397+
fieldDetailsList: FieldDetailsList,
398+
) => StreamUsage | undefined;
399+
416400
constructor(
417401
validatedExecutionArgs: ValidatedExecutionArgs,
418402
deferUsageSet?: DeferUsageSet,
@@ -426,6 +410,28 @@ export class Executor {
426410
this.groups = [];
427411
this.tasks = [];
428412
this.streams = [];
413+
414+
/**
415+
* A memoized collection of relevant subfields with regard to the return
416+
* type. Memoizing ensures the subfields are not repeatedly calculated, which
417+
* saves overhead when resolving lists of values.
418+
*/
419+
this.collectSubfields = memoize2((returnType, fieldDetailsList) => {
420+
const { schema, fragments, variableValues, hideSuggestions } =
421+
this.validatedExecutionArgs;
422+
return _collectSubfields(
423+
schema,
424+
fragments,
425+
variableValues,
426+
returnType,
427+
fieldDetailsList,
428+
hideSuggestions,
429+
);
430+
});
431+
432+
this.getStreamUsage = memoize1((fieldDetailsList) =>
433+
getStreamUsage(this.validatedExecutionArgs, fieldDetailsList),
434+
);
429435
}
430436

431437
executeQueryOrMutationOrSubscriptionEvent(): PromiseOrValue<
@@ -1046,7 +1052,11 @@ export class Executor {
10461052
items: AsyncIterable<unknown>,
10471053
deliveryGroupMap: ReadonlyMap<DeferUsage, DeliveryGroup> | undefined,
10481054
): Promise<ReadonlyArray<unknown>> {
1049-
const streamUsage = this.getStreamUsage(fieldDetailsList, path);
1055+
// do not stream inner lists of multi-dimensional lists
1056+
const streamUsage = getStreamUsage(
1057+
this.validatedExecutionArgs,
1058+
fieldDetailsList,
1059+
);
10501060

10511061
let containsPromise = false;
10521062
const completedResults: Array<unknown> = [];
@@ -1162,7 +1172,11 @@ export class Executor {
11621172
items: Iterable<unknown>,
11631173
deliveryGroupMap: ReadonlyMap<DeferUsage, DeliveryGroup> | undefined,
11641174
): PromiseOrValue<ReadonlyArray<unknown>> {
1165-
const streamUsage = this.getStreamUsage(fieldDetailsList, path);
1175+
// do not stream inner lists of multi-dimensional lists
1176+
const streamUsage =
1177+
typeof path.key === 'number'
1178+
? undefined
1179+
: this.getStreamUsage(fieldDetailsList);
11661180

11671181
// This is specified as a simple map, however we're optimizing the path
11681182
// where the list contains no Promises by avoiding creating another Promise.
@@ -1534,8 +1548,7 @@ export class Executor {
15341548
deliveryGroupMap: ReadonlyMap<DeferUsage, DeliveryGroup> | undefined,
15351549
): PromiseOrValue<ObjMap<unknown>> {
15361550
// Collect sub-fields to execute to complete this value.
1537-
const { groupedFieldSet, newDeferUsages } = collectSubfields(
1538-
this.validatedExecutionArgs,
1551+
const { groupedFieldSet, newDeferUsages } = this.collectSubfields(
15391552
returnType,
15401553
fieldDetailsList,
15411554
);
@@ -1607,84 +1620,6 @@ export class Executor {
16071620
return data;
16081621
}
16091622

1610-
/**
1611-
* Returns an object containing info for streaming if a field should be
1612-
* streamed based on the experimental flag, stream directive present and
1613-
* not disabled by the "if" argument.
1614-
*/
1615-
getStreamUsage(
1616-
fieldDetailsList: FieldDetailsList,
1617-
path: Path,
1618-
): StreamUsage | undefined {
1619-
// do not stream inner lists of multi-dimensional lists
1620-
if (typeof path.key === 'number') {
1621-
return;
1622-
}
1623-
1624-
// TODO: add test for this case (a streamed list nested under a list).
1625-
/* c8 ignore next 7 */
1626-
if (
1627-
(fieldDetailsList as unknown as { _streamUsage: StreamUsage })
1628-
._streamUsage !== undefined
1629-
) {
1630-
return (fieldDetailsList as unknown as { _streamUsage: StreamUsage })
1631-
._streamUsage;
1632-
}
1633-
1634-
const { operation, variableValues } = this.validatedExecutionArgs;
1635-
// validation only allows equivalent streams on multiple fields, so it is
1636-
// safe to only check the first fieldNode for the stream directive
1637-
const stream = getDirectiveValues(
1638-
GraphQLStreamDirective,
1639-
fieldDetailsList[0].node,
1640-
variableValues,
1641-
fieldDetailsList[0].fragmentVariableValues,
1642-
);
1643-
1644-
if (!stream) {
1645-
return;
1646-
}
1647-
1648-
if (stream.if === false) {
1649-
return;
1650-
}
1651-
1652-
invariant(
1653-
typeof stream.initialCount === 'number',
1654-
'initialCount must be a number',
1655-
);
1656-
1657-
invariant(
1658-
stream.initialCount >= 0,
1659-
'initialCount must be a positive integer',
1660-
);
1661-
1662-
invariant(
1663-
operation.operation !== OperationTypeNode.SUBSCRIPTION,
1664-
'`@stream` directive not supported on subscription operations. Disable `@stream` by setting the `if` argument to `false`.',
1665-
);
1666-
1667-
const streamedFieldDetailsList: FieldDetailsList = fieldDetailsList.map(
1668-
(fieldDetails) => ({
1669-
node: fieldDetails.node,
1670-
deferUsage: undefined,
1671-
fragmentVariableValues: fieldDetails.fragmentVariableValues,
1672-
}),
1673-
);
1674-
1675-
const streamUsage = {
1676-
initialCount: stream.initialCount,
1677-
label: typeof stream.label === 'string' ? stream.label : undefined,
1678-
fieldDetailsList: streamedFieldDetailsList,
1679-
};
1680-
1681-
(
1682-
fieldDetailsList as unknown as { _streamUsage: StreamUsage }
1683-
)._streamUsage = streamUsage;
1684-
1685-
return streamUsage;
1686-
}
1687-
16881623
handleStream(
16891624
index: number,
16901625
path: Path,

src/execution/__tests__/stream-test.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,44 @@ describe('Execute: stream directive', () => {
694694
},
695695
]);
696696
});
697+
it('Can stream multi-dimensional lists from async iterable', async () => {
698+
const document = parse('{ scalarListList @stream(initialCount: 1) }');
699+
const result = await complete(document, {
700+
async *scalarListList() {
701+
yield await Promise.resolve(['apple', 'apple', 'apple']);
702+
yield await Promise.resolve(['banana', 'banana', 'banana']);
703+
yield await Promise.resolve(['coconut', 'coconut', 'coconut']);
704+
},
705+
});
706+
expectJSON(result).toDeepEqual([
707+
{
708+
data: {
709+
scalarListList: [['apple', 'apple', 'apple']],
710+
},
711+
pending: [{ id: '0', path: ['scalarListList'] }],
712+
hasNext: true,
713+
},
714+
{
715+
incremental: [
716+
{
717+
items: [['banana', 'banana', 'banana']],
718+
id: '0',
719+
},
720+
],
721+
hasNext: true,
722+
},
723+
{
724+
incremental: [
725+
{
726+
items: [['coconut', 'coconut', 'coconut']],
727+
id: '0',
728+
},
729+
],
730+
completed: [{ id: '0' }],
731+
hasNext: false,
732+
},
733+
]);
734+
});
697735
it('Can stream a field that returns an async iterable, using a non-zero initialCount', async () => {
698736
const document = parse(`
699737
query {

src/execution/getStreamUsage.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import { invariant } from '../jsutils/invariant.js';
2+
3+
import { OperationTypeNode } from '../language/ast.js';
4+
5+
import { GraphQLStreamDirective } from '../type/directives.js';
6+
7+
import type { FieldDetailsList } from './collectFields.js';
8+
import type { ValidatedExecutionArgs } from './Executor.js';
9+
import { getDirectiveValues } from './values.js';
10+
11+
export interface StreamUsage {
12+
label: string | undefined;
13+
initialCount: number;
14+
fieldDetailsList: FieldDetailsList;
15+
}
16+
17+
/**
18+
* Returns an object containing info for streaming if a field should be
19+
* streamed based on the experimental flag, stream directive present and
20+
* not disabled by the "if" argument.
21+
*/
22+
export function getStreamUsage(
23+
validatedExecutionArgs: ValidatedExecutionArgs,
24+
fieldDetailsList: FieldDetailsList,
25+
): StreamUsage | undefined {
26+
const { operation, variableValues } = validatedExecutionArgs;
27+
// validation only allows equivalent streams on multiple fields, so it is
28+
// safe to only check the first fieldNode for the stream directive
29+
const stream = getDirectiveValues(
30+
GraphQLStreamDirective,
31+
fieldDetailsList[0].node,
32+
variableValues,
33+
fieldDetailsList[0].fragmentVariableValues,
34+
);
35+
36+
if (!stream) {
37+
return;
38+
}
39+
40+
if (stream.if === false) {
41+
return;
42+
}
43+
44+
invariant(
45+
typeof stream.initialCount === 'number',
46+
'initialCount must be a number',
47+
);
48+
49+
invariant(
50+
stream.initialCount >= 0,
51+
'initialCount must be a positive integer',
52+
);
53+
54+
invariant(
55+
operation.operation !== OperationTypeNode.SUBSCRIPTION,
56+
'`@stream` directive not supported on subscription operations. Disable `@stream` by setting the `if` argument to `false`.',
57+
);
58+
59+
const streamedFieldDetailsList: FieldDetailsList = fieldDetailsList.map(
60+
(fieldDetails) => ({
61+
node: fieldDetails.node,
62+
deferUsage: undefined,
63+
fragmentVariableValues: fieldDetails.fragmentVariableValues,
64+
}),
65+
);
66+
67+
return {
68+
initialCount: stream.initialCount,
69+
label: typeof stream.label === 'string' ? stream.label : undefined,
70+
fieldDetailsList: streamedFieldDetailsList,
71+
};
72+
}

src/jsutils/memoize1.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/**
2+
* Memoizes the provided one-argument function.
3+
*/
4+
export function memoize1<A1 extends object, R>(
5+
fn: (a1: A1) => R,
6+
): (a1: A1) => R {
7+
let cache0: WeakMap<A1, R>;
8+
9+
return function memoized(a1) {
10+
cache0 ??= new WeakMap();
11+
12+
let fnResult = cache0.get(a1);
13+
if (fnResult === undefined) {
14+
fnResult = fn(a1);
15+
cache0.set(a1, fnResult);
16+
}
17+
18+
return fnResult;
19+
};
20+
}

0 commit comments

Comments
 (0)