Skip to content

Commit 970aacd

Browse files
committed
move subscription helpers from Executor to execute (#4540)
1 parent e9de165 commit 970aacd

2 files changed

Lines changed: 182 additions & 174 deletions

File tree

src/execution/Executor.ts

Lines changed: 0 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ import {
5151
collectFields,
5252
collectSubfields as _collectSubfields,
5353
} from './collectFields.js';
54-
import { mapAsyncIterable } from './mapAsyncIterable.js';
5554
import { ResolveInfo } from './ResolveInfo.js';
5655
import type { VariableValues } from './values.js';
5756
import { getArgumentValues } from './values.js';
@@ -1241,172 +1240,6 @@ function collectAndExecuteSubfields(
12411240
);
12421241
}
12431242

1244-
export function mapSourceToResponse(
1245-
validatedExecutionArgs: ValidatedExecutionArgs,
1246-
resultOrStream: ExecutionResult | AsyncIterable<unknown>,
1247-
): AsyncGenerator<ExecutionResult, void, void> | ExecutionResult {
1248-
if (!isAsyncIterable(resultOrStream)) {
1249-
return resultOrStream;
1250-
}
1251-
1252-
// For each payload yielded from a subscription, map it over the normal
1253-
// GraphQL `execute` function, with `payload` as the rootValue.
1254-
// This implements the "MapSourceToResponseEvent" algorithm described in
1255-
// the GraphQL specification..
1256-
function mapFn(payload: unknown): PromiseOrValue<ExecutionResult> {
1257-
const perEventExecutionArgs: ValidatedExecutionArgs = {
1258-
...validatedExecutionArgs,
1259-
rootValue: payload,
1260-
};
1261-
return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs);
1262-
}
1263-
1264-
const externalAbortSignal = validatedExecutionArgs.externalAbortSignal;
1265-
if (externalAbortSignal) {
1266-
const generator = mapAsyncIterable(resultOrStream, mapFn);
1267-
return {
1268-
...generator,
1269-
next: () => cancellablePromise(generator.next(), externalAbortSignal),
1270-
};
1271-
}
1272-
return mapAsyncIterable(resultOrStream, mapFn);
1273-
}
1274-
1275-
export function createSourceEventStreamImpl(
1276-
validatedExecutionArgs: ValidatedExecutionArgs,
1277-
): PromiseOrValue<AsyncIterable<unknown> | ExecutionResult> {
1278-
try {
1279-
const eventStream = executeSubscription(validatedExecutionArgs);
1280-
if (isPromise(eventStream)) {
1281-
return eventStream.then(undefined, (error: unknown) => ({
1282-
errors: [error as GraphQLError],
1283-
}));
1284-
}
1285-
1286-
return eventStream;
1287-
} catch (error) {
1288-
return { errors: [error] };
1289-
}
1290-
}
1291-
1292-
function executeSubscription(
1293-
validatedExecutionArgs: ValidatedExecutionArgs,
1294-
): PromiseOrValue<AsyncIterable<unknown>> {
1295-
const {
1296-
schema,
1297-
fragments,
1298-
rootValue,
1299-
contextValue,
1300-
operation,
1301-
variableValues,
1302-
hideSuggestions,
1303-
externalAbortSignal,
1304-
} = validatedExecutionArgs;
1305-
1306-
const rootType = schema.getSubscriptionType();
1307-
if (rootType == null) {
1308-
throw new GraphQLError(
1309-
'Schema is not configured to execute subscription operation.',
1310-
{ nodes: operation },
1311-
);
1312-
}
1313-
1314-
const { groupedFieldSet } = collectFields(
1315-
schema,
1316-
fragments,
1317-
variableValues,
1318-
rootType,
1319-
operation.selectionSet,
1320-
hideSuggestions,
1321-
);
1322-
1323-
const firstRootField = groupedFieldSet.entries().next().value as [
1324-
string,
1325-
FieldDetailsList,
1326-
];
1327-
const [responseName, fieldDetailsList] = firstRootField;
1328-
const firstFieldDetails = fieldDetailsList[0];
1329-
const firstFieldNode = firstFieldDetails.node;
1330-
const fieldName = firstFieldNode.name.value;
1331-
const fieldDef = schema.getField(rootType, fieldName);
1332-
1333-
if (!fieldDef) {
1334-
throw new GraphQLError(
1335-
`The subscription field "${fieldName}" is not defined.`,
1336-
{ nodes: toNodes(fieldDetailsList) },
1337-
);
1338-
}
1339-
1340-
const path = addPath(undefined, responseName, rootType.name);
1341-
const info = new ResolveInfo(
1342-
validatedExecutionArgs,
1343-
fieldDef,
1344-
fieldDetailsList,
1345-
rootType,
1346-
path,
1347-
() => ({ abortSignal: externalAbortSignal }),
1348-
);
1349-
1350-
try {
1351-
// Implements the "ResolveFieldEventStream" algorithm from GraphQL specification.
1352-
// It differs from "ResolveFieldValue" due to providing a different `resolveFn`.
1353-
1354-
// Build a JS object of arguments from the field.arguments AST, using the
1355-
// variables scope to fulfill any variable references.
1356-
const args = getArgumentValues(
1357-
fieldDef,
1358-
firstFieldNode,
1359-
variableValues,
1360-
firstFieldDetails.fragmentVariableValues,
1361-
hideSuggestions,
1362-
);
1363-
1364-
// Call the `subscribe()` resolver or the default resolver to produce an
1365-
// AsyncIterable yielding raw payloads.
1366-
const resolveFn =
1367-
fieldDef.subscribe ?? validatedExecutionArgs.subscribeFieldResolver;
1368-
1369-
// The resolve function's optional third argument is a context value that
1370-
// is provided to every resolve function within an execution. It is commonly
1371-
// used to represent an authenticated user, or request-specific caches.
1372-
const result = resolveFn(rootValue, args, contextValue, info);
1373-
1374-
if (isPromise(result)) {
1375-
const promise = externalAbortSignal
1376-
? cancellablePromise(result, externalAbortSignal)
1377-
: result;
1378-
return promise
1379-
.then(assertEventStream)
1380-
.then(undefined, (error: unknown) => {
1381-
throw locatedError(
1382-
error,
1383-
toNodes(fieldDetailsList),
1384-
pathToArray(path),
1385-
);
1386-
});
1387-
}
1388-
return assertEventStream(result);
1389-
} catch (error) {
1390-
throw locatedError(error, toNodes(fieldDetailsList), pathToArray(path));
1391-
}
1392-
}
1393-
1394-
function assertEventStream(result: unknown): AsyncIterable<unknown> {
1395-
if (result instanceof Error) {
1396-
throw result;
1397-
}
1398-
1399-
// Assert field returned an event stream, otherwise yield an error.
1400-
if (!isAsyncIterable(result)) {
1401-
throw new GraphQLError(
1402-
'Subscription field must return Async Iterable. ' +
1403-
`Received: ${inspect(result)}.`,
1404-
);
1405-
}
1406-
1407-
return result;
1408-
}
1409-
14101243
function returnIteratorCatchingErrors(
14111244
iterator: Iterator<unknown> | AsyncIterator<unknown>,
14121245
): void {

src/execution/execute.ts

Lines changed: 182 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
1+
import { inspect } from '../jsutils/inspect.js';
2+
import { isAsyncIterable } from '../jsutils/isAsyncIterable.js';
13
import { isObjectLike } from '../jsutils/isObjectLike.js';
24
import { isPromise } from '../jsutils/isPromise.js';
35
import type { Maybe } from '../jsutils/Maybe.js';
46
import type { ObjMap } from '../jsutils/ObjMap.js';
7+
import { addPath, pathToArray } from '../jsutils/Path.js';
58
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
69

710
import { GraphQLError } from '../error/GraphQLError.js';
11+
import { locatedError } from '../error/locatedError.js';
812

913
import type {
1014
DocumentNode,
15+
FieldNode,
1116
FragmentDefinitionNode,
1217
OperationDefinitionNode,
1318
} from '../language/ast.js';
@@ -21,15 +26,15 @@ import type {
2126
import { assertValidSchema } from '../type/index.js';
2227
import type { GraphQLSchema } from '../type/schema.js';
2328

24-
import type { FragmentDetails } from './collectFields.js';
29+
import { cancellablePromise } from './cancellablePromise.js';
30+
import type { FieldDetailsList, FragmentDetails } from './collectFields.js';
31+
import { collectFields } from './collectFields.js';
2532
import type { ExecutionResult, ValidatedExecutionArgs } from './Executor.js';
26-
import {
27-
createSourceEventStreamImpl,
28-
executeQueryOrMutationOrSubscriptionEvent,
29-
mapSourceToResponse,
30-
} from './Executor.js';
33+
import { executeQueryOrMutationOrSubscriptionEvent } from './Executor.js';
3134
import { getVariableSignature } from './getVariableSignature.js';
32-
import { getVariableValues } from './values.js';
35+
import { mapAsyncIterable } from './mapAsyncIterable.js';
36+
import { ResolveInfo } from './ResolveInfo.js';
37+
import { getArgumentValues, getVariableValues } from './values.js';
3338

3439
/**
3540
* Implements the "Executing requests" section of the GraphQL specification.
@@ -373,3 +378,173 @@ export const defaultFieldResolver: GraphQLFieldResolver<unknown, unknown> =
373378
return property;
374379
}
375380
};
381+
382+
function mapSourceToResponse(
383+
validatedExecutionArgs: ValidatedExecutionArgs,
384+
resultOrStream: ExecutionResult | AsyncIterable<unknown>,
385+
): AsyncGenerator<ExecutionResult, void, void> | ExecutionResult {
386+
if (!isAsyncIterable(resultOrStream)) {
387+
return resultOrStream;
388+
}
389+
390+
// For each payload yielded from a subscription, map it over the normal
391+
// GraphQL `execute` function, with `payload` as the rootValue.
392+
// This implements the "MapSourceToResponseEvent" algorithm described in
393+
// the GraphQL specification..
394+
function mapFn(payload: unknown): PromiseOrValue<ExecutionResult> {
395+
const perEventExecutionArgs: ValidatedExecutionArgs = {
396+
...validatedExecutionArgs,
397+
rootValue: payload,
398+
};
399+
return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs);
400+
}
401+
402+
const externalAbortSignal = validatedExecutionArgs.externalAbortSignal;
403+
if (externalAbortSignal) {
404+
const generator = mapAsyncIterable(resultOrStream, mapFn);
405+
return {
406+
...generator,
407+
next: () => cancellablePromise(generator.next(), externalAbortSignal),
408+
};
409+
}
410+
return mapAsyncIterable(resultOrStream, mapFn);
411+
}
412+
413+
function createSourceEventStreamImpl(
414+
validatedExecutionArgs: ValidatedExecutionArgs,
415+
): PromiseOrValue<AsyncIterable<unknown> | ExecutionResult> {
416+
try {
417+
const eventStream = executeSubscription(validatedExecutionArgs);
418+
if (isPromise(eventStream)) {
419+
return eventStream.then(undefined, (error: unknown) => ({
420+
errors: [error as GraphQLError],
421+
}));
422+
}
423+
424+
return eventStream;
425+
} catch (error) {
426+
return { errors: [error] };
427+
}
428+
}
429+
430+
function executeSubscription(
431+
validatedExecutionArgs: ValidatedExecutionArgs,
432+
): PromiseOrValue<AsyncIterable<unknown>> {
433+
const {
434+
schema,
435+
fragments,
436+
rootValue,
437+
contextValue,
438+
operation,
439+
variableValues,
440+
hideSuggestions,
441+
externalAbortSignal,
442+
} = validatedExecutionArgs;
443+
444+
const rootType = schema.getSubscriptionType();
445+
if (rootType == null) {
446+
throw new GraphQLError(
447+
'Schema is not configured to execute subscription operation.',
448+
{ nodes: operation },
449+
);
450+
}
451+
452+
const { groupedFieldSet } = collectFields(
453+
schema,
454+
fragments,
455+
variableValues,
456+
rootType,
457+
operation.selectionSet,
458+
hideSuggestions,
459+
);
460+
461+
const firstRootField = groupedFieldSet.entries().next().value as [
462+
string,
463+
FieldDetailsList,
464+
];
465+
const [responseName, fieldDetailsList] = firstRootField;
466+
const firstFieldDetails = fieldDetailsList[0];
467+
const firstNode = firstFieldDetails.node;
468+
const fieldName = firstNode.name.value;
469+
const fieldDef = schema.getField(rootType, fieldName);
470+
471+
if (!fieldDef) {
472+
throw new GraphQLError(
473+
`The subscription field "${fieldName}" is not defined.`,
474+
{ nodes: toNodes(fieldDetailsList) },
475+
);
476+
}
477+
478+
const path = addPath(undefined, responseName, rootType.name);
479+
const info = new ResolveInfo(
480+
validatedExecutionArgs,
481+
fieldDef,
482+
fieldDetailsList,
483+
rootType,
484+
path,
485+
() => ({ abortSignal: externalAbortSignal }),
486+
);
487+
488+
try {
489+
// Implements the "ResolveFieldEventStream" algorithm from GraphQL specification.
490+
// It differs from "ResolveFieldValue" due to providing a different `resolveFn`.
491+
492+
// Build a JS object of arguments from the field.arguments AST, using the
493+
// variables scope to fulfill any variable references.
494+
const args = getArgumentValues(
495+
fieldDef,
496+
firstNode,
497+
variableValues,
498+
firstFieldDetails.fragmentVariableValues,
499+
hideSuggestions,
500+
);
501+
502+
// Call the `subscribe()` resolver or the default resolver to produce an
503+
// AsyncIterable yielding raw payloads.
504+
const resolveFn =
505+
fieldDef.subscribe ?? validatedExecutionArgs.subscribeFieldResolver;
506+
507+
// The resolve function's optional third argument is a context value that
508+
// is provided to every resolve function within an execution. It is commonly
509+
// used to represent an authenticated user, or request-specific caches.
510+
const result = resolveFn(rootValue, args, contextValue, info);
511+
512+
if (isPromise(result)) {
513+
const promise = externalAbortSignal
514+
? cancellablePromise(result, externalAbortSignal)
515+
: result;
516+
return promise
517+
.then(assertEventStream)
518+
.then(undefined, (error: unknown) => {
519+
throw locatedError(
520+
error,
521+
toNodes(fieldDetailsList),
522+
pathToArray(path),
523+
);
524+
});
525+
}
526+
return assertEventStream(result);
527+
} catch (error) {
528+
throw locatedError(error, toNodes(fieldDetailsList), pathToArray(path));
529+
}
530+
}
531+
532+
function assertEventStream(result: unknown): AsyncIterable<unknown> {
533+
if (result instanceof Error) {
534+
throw result;
535+
}
536+
537+
// Assert field returned an event stream, otherwise yield an error.
538+
if (!isAsyncIterable(result)) {
539+
throw new GraphQLError(
540+
'Subscription field must return Async Iterable. ' +
541+
`Received: ${inspect(result)}.`,
542+
);
543+
}
544+
545+
return result;
546+
}
547+
548+
function toNodes(fieldDetailsList: FieldDetailsList): ReadonlyArray<FieldNode> {
549+
return fieldDetailsList.map((fieldDetails) => fieldDetails.node);
550+
}

0 commit comments

Comments
 (0)