diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index ffde6f70b8..c9d4d67242 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -93,7 +93,7 @@ export interface FormattedSubsequentIncrementalExecutionResult< extensions?: TExtensions; } -interface RawDeferResult> { +interface BareDeferredGroupedFieldSetResult> { errors?: ReadonlyArray; data: TData; } @@ -101,7 +101,7 @@ interface RawDeferResult> { export interface IncrementalDeferResult< TData = ObjMap, TExtensions = ObjMap, -> extends RawDeferResult { +> extends BareDeferredGroupedFieldSetResult { id: string; subPath?: ReadonlyArray; extensions?: TExtensions; @@ -118,7 +118,7 @@ export interface FormattedIncrementalDeferResult< extensions?: TExtensions; } -interface RawStreamItemsResult> { +interface BareStreamItemsResult> { errors?: ReadonlyArray; items: TData; } @@ -126,7 +126,7 @@ interface RawStreamItemsResult> { export interface IncrementalStreamResult< TData = ReadonlyArray, TExtensions = ObjMap, -> extends RawStreamItemsResult { +> extends BareStreamItemsResult { id: string; subPath?: ReadonlyArray; extensions?: TExtensions; @@ -175,10 +175,14 @@ export function buildIncrementalResponse( context: IncrementalPublisherContext, result: ObjMap, errors: ReadonlyArray | undefined, - futures: ReadonlyArray, + incrementalDataRecords: ReadonlyArray, ): ExperimentalIncrementalExecutionResults { const incrementalPublisher = new IncrementalPublisher(context); - return incrementalPublisher.buildResponse(result, errors, futures); + return incrementalPublisher.buildResponse( + result, + errors, + incrementalDataRecords, + ); } interface IncrementalPublisherContext { @@ -195,7 +199,7 @@ class IncrementalPublisher { private _context: IncrementalPublisherContext; private _nextId: number; private _pending: Set; - private _completedResultQueue: Array; + private _completedResultQueue: Array; private _newPending: Set; private _incremental: Array; private _completed: Array; @@ -217,9 +221,9 @@ class IncrementalPublisher { buildResponse( data: ObjMap, errors: ReadonlyArray | undefined, - futures: ReadonlyArray, + incrementalDataRecords: ReadonlyArray, ): ExperimentalIncrementalExecutionResults { - this._addFutures(futures); + this._addIncrementalDataRecords(incrementalDataRecords); this._pruneEmpty(); const pending = this._pendingSourcesToResults(); @@ -235,14 +239,16 @@ class IncrementalPublisher { }; } - private _addFutures(futures: ReadonlyArray): void { - for (const future of futures) { - if (isDeferredGroupedFieldSetRecord(future)) { - for (const deferredFragmentRecord of future.deferredFragmentRecords) { + private _addIncrementalDataRecords( + incrementalDataRecords: ReadonlyArray, + ): void { + for (const incrementalDataRecord of incrementalDataRecords) { + if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { + for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) { this._addDeferredFragmentRecord(deferredFragmentRecord); } - const result = future.result; + const result = incrementalDataRecord.result; if (isPromise(result)) { // eslint-disable-next-line @typescript-eslint/no-floating-promises result.then((resolved) => { @@ -255,12 +261,12 @@ class IncrementalPublisher { continue; } - const streamRecord = future.streamRecord; + const streamRecord = incrementalDataRecord.streamRecord; if (streamRecord.id === undefined) { this._newPending.add(streamRecord); } - const result = future.getResult(); + const result = incrementalDataRecord.getResult(); if (isPromise(result)) { // eslint-disable-next-line @typescript-eslint/no-floating-promises result.then((resolved) => { @@ -383,7 +389,7 @@ class IncrementalPublisher { while (!isDone) { let pending: Array = []; - let completedResult: FutureResult | undefined; + let completedResult: IncrementalDataRecordResult | undefined; while ( (completedResult = this._completedResultQueue.shift()) !== undefined ) { @@ -514,8 +520,10 @@ class IncrementalPublisher { ); } - if (deferredGroupedFieldSetResult.futures) { - this._addFutures(deferredGroupedFieldSetResult.futures); + if (deferredGroupedFieldSetResult.incrementalDataRecords) { + this._addIncrementalDataRecords( + deferredGroupedFieldSetResult.incrementalDataRecords, + ); } for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { @@ -608,8 +616,10 @@ class IncrementalPublisher { this._incremental.push(incrementalEntry); - if (streamItemsResult.futures) { - this._addFutures(streamItemsResult.futures); + if (streamItemsResult.incrementalDataRecords) { + this._addIncrementalDataRecords( + streamItemsResult.incrementalDataRecords, + ); this._pruneEmpty(); } } @@ -655,16 +665,16 @@ export function isDeferredFragmentRecord( } export function isDeferredGroupedFieldSetRecord( - future: Future, -): future is DeferredGroupedFieldSetRecord { - return future instanceof DeferredGroupedFieldSetRecord; + incrementalDataRecord: IncrementalDataRecord, +): incrementalDataRecord is DeferredGroupedFieldSetRecord { + return incrementalDataRecord instanceof DeferredGroupedFieldSetRecord; } export interface IncrementalContext { deferUsageSet: DeferUsageSet | undefined; path: Path | undefined; errors?: Map | undefined; - futures?: Array | undefined; + incrementalDataRecords?: Array | undefined; } export type DeferredGroupedFieldSetResult = @@ -680,8 +690,8 @@ export function isDeferredGroupedFieldSetResult( interface ReconcilableDeferredGroupedFieldSetResult { deferredFragmentRecords: ReadonlyArray; path: Array; - result: RawDeferResult; - futures?: ReadonlyArray | undefined; + result: BareDeferredGroupedFieldSetResult; + incrementalDataRecords?: ReadonlyArray | undefined; sent?: true | undefined; } @@ -785,14 +795,14 @@ interface NonReconcilableStreamItemsResult { interface NonTerminatingStreamItemsResult { streamRecord: StreamRecord; - result: RawStreamItemsResult; - futures?: ReadonlyArray | undefined; + result: BareStreamItemsResult; + incrementalDataRecords?: ReadonlyArray | undefined; } interface TerminatingStreamItemsResult { streamRecord: StreamRecord; result?: never; - futures?: never; + incrementalDataRecords?: never; errors?: never; } @@ -848,14 +858,21 @@ export class StreamItemsRecord { this.nextStreamItems !== undefined ? { ...result, - futures: [this.nextStreamItems, ...(result.futures ?? [])], + incrementalDataRecords: [ + this.nextStreamItems, + ...(result.incrementalDataRecords ?? []), + ], } : result; } } -export type Future = DeferredGroupedFieldSetRecord | StreamItemsRecord; +export type IncrementalDataRecord = + | DeferredGroupedFieldSetRecord + | StreamItemsRecord; -export type FutureResult = DeferredGroupedFieldSetResult | StreamItemsResult; +export type IncrementalDataRecordResult = + | DeferredGroupedFieldSetResult + | StreamItemsResult; type SubsequentResultRecord = DeferredFragmentRecord | StreamRecord; diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 742457be21..53dd1b271f 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -62,8 +62,8 @@ import type { DeferredGroupedFieldSetResult, ExecutionResult, ExperimentalIncrementalExecutionResults, - Future, IncrementalContext, + IncrementalDataRecord, StreamItemsResult, } from './IncrementalPublisher.js'; import { @@ -160,7 +160,7 @@ export interface ExecutionContext { subscribeFieldResolver: GraphQLFieldResolver; errors?: Map | undefined; cancellableStreams?: Set | undefined; - futures?: Array; + incrementalDataRecords?: Array; } export interface ExecutionArgs { @@ -319,7 +319,10 @@ function executeImpl( newDeferMap, ); - addFutures(exeContext, newDeferredGroupedFieldSetRecords); + addIncrementalDataRecords( + exeContext, + newDeferredGroupedFieldSetRecords, + ); } } @@ -338,28 +341,16 @@ function executeImpl( } } -function addFutures( +function addIncrementalDataRecords( context: ExecutionContext | IncrementalContext, - newFutures: ReadonlyArray, + newIncrementalDataRecords: ReadonlyArray, ): void { - const futures = context.futures; - if (futures === undefined) { - context.futures = [...newFutures]; + const incrementalDataRecords = context.incrementalDataRecords; + if (incrementalDataRecords === undefined) { + context.incrementalDataRecords = [...newIncrementalDataRecords]; return; } - futures.push(...newFutures); -} - -function addFuture( - context: ExecutionContext | IncrementalContext, - newFuture: Future, -): void { - const futures = context.futures; - if (futures === undefined) { - context.futures = [newFuture]; - return; - } - futures.push(newFuture); + incrementalDataRecords.push(...newIncrementalDataRecords); } function withError( @@ -373,18 +364,27 @@ function buildDataResponse( exeContext: ExecutionContext, data: ObjMap, ): ExecutionResult | ExperimentalIncrementalExecutionResults { - const { errors, futures } = exeContext; - if (futures === undefined) { + const { errors, incrementalDataRecords } = exeContext; + if (incrementalDataRecords === undefined) { return buildSingleResult(data, errors); } if (errors === undefined) { - return buildIncrementalResponse(exeContext, data, undefined, futures); + return buildIncrementalResponse( + exeContext, + data, + undefined, + incrementalDataRecords, + ); } - const filteredFutures = filterFutures(undefined, errors, futures); + const filteredIncrementalDataRecords = filterIncrementalDataRecords( + undefined, + errors, + incrementalDataRecords, + ); - if (filteredFutures.length === 0) { + if (filteredIncrementalDataRecords.length === 0) { return buildSingleResult(data, errors); } @@ -392,7 +392,7 @@ function buildDataResponse( exeContext, data, Array.from(errors.values()), - filteredFutures, + filteredIncrementalDataRecords, ); } @@ -405,16 +405,18 @@ function buildSingleResult( : { data }; } -function filterFutures( +function filterIncrementalDataRecords( initialPath: Path | undefined, errors: ReadonlyMap, - futures: ReadonlyArray, -): ReadonlyArray { - const filteredFutures: Array = []; - for (const future of futures) { - let currentPath: Path | undefined = isDeferredGroupedFieldSetRecord(future) - ? future.path - : future.streamRecord.path; + incrementalDataRecords: ReadonlyArray, +): ReadonlyArray { + const filteredIncrementalDataRecords: Array = []; + for (const incrementalDataRecord of incrementalDataRecords) { + let currentPath: Path | undefined = isDeferredGroupedFieldSetRecord( + incrementalDataRecord, + ) + ? incrementalDataRecord.path + : incrementalDataRecord.streamRecord.path; if (errors.has(currentPath)) { continue; @@ -437,11 +439,11 @@ function filterFutures( } if (!filtered) { - filteredFutures.push(future); + filteredIncrementalDataRecords.push(incrementalDataRecord); } } - return filteredFutures; + return filteredIncrementalDataRecords; } /** @@ -689,7 +691,7 @@ function executeFields( throw error; } - // If there are no promises, we can just return the object and any futures + // If there are no promises, we can just return the object and any incrementalDataRecords if (!containsPromise) { return results; } @@ -1140,7 +1142,7 @@ async function completeAsyncIteratorValue( ); const context = incrementalContext ?? exeContext; - addFuture(context, firstStreamItems); + addIncrementalDataRecord(context, firstStreamItems); break; } @@ -1177,6 +1179,18 @@ async function completeAsyncIteratorValue( return containsPromise ? Promise.all(completedResults) : completedResults; } +function addIncrementalDataRecord( + context: ExecutionContext | IncrementalContext, + newIncrementalDataRecord: IncrementalDataRecord, +): void { + const incrementalDataRecords = context.incrementalDataRecords; + if (incrementalDataRecords === undefined) { + context.incrementalDataRecords = [newIncrementalDataRecord]; + return; + } + incrementalDataRecords.push(newIncrementalDataRecord); +} + /** * Complete a list value by completing each item in the list with the * inner type @@ -1251,7 +1265,7 @@ function completeListValue( ); const context = incrementalContext ?? exeContext; - addFuture(context, firstStreamItems); + addIncrementalDataRecord(context, firstStreamItems); break; } @@ -1664,7 +1678,7 @@ function collectAndExecuteSubfields( ); const context = incrementalContext ?? exeContext; - addFutures(context, newDeferredGroupedFieldSetRecords); + addIncrementalDataRecords(context, newDeferredGroupedFieldSetRecords); } return subFields; } @@ -1701,7 +1715,7 @@ function collectAndExecuteSubfields( ); const context = incrementalContext ?? exeContext; - addFutures(context, newDeferredGroupedFieldSetRecords); + addIncrementalDataRecords(context, newDeferredGroupedFieldSetRecords); } return subFields; } @@ -2100,8 +2114,8 @@ function buildDeferredGroupedFieldSetResult( path: Path | undefined, data: ObjMap, ): DeferredGroupedFieldSetResult { - const { errors, futures } = incrementalContext; - if (futures === undefined) { + const { errors, incrementalDataRecords } = incrementalContext; + if (incrementalDataRecords === undefined) { return { deferredFragmentRecords, path: pathToArray(path), @@ -2119,7 +2133,7 @@ function buildDeferredGroupedFieldSetResult( deferredFragmentRecords, path: pathToArray(path), result: { data }, - futures, + incrementalDataRecords, }; } @@ -2127,7 +2141,11 @@ function buildDeferredGroupedFieldSetResult( deferredFragmentRecords, path: pathToArray(path), result: { data, errors: [...errors.values()] }, - futures: filterFutures(path, errors, futures), + incrementalDataRecords: filterIncrementalDataRecords( + path, + errors, + incrementalDataRecords, + ), }; } @@ -2412,8 +2430,8 @@ function buildStreamItemsResult( streamRecord: StreamRecord, completedItem: unknown, ): StreamItemsResult { - const { errors, futures } = incrementalContext; - if (futures === undefined) { + const { errors, incrementalDataRecords } = incrementalContext; + if (incrementalDataRecords === undefined) { return { streamRecord, result: @@ -2430,7 +2448,7 @@ function buildStreamItemsResult( return { streamRecord, result: { items: [completedItem] }, - futures, + incrementalDataRecords, }; } @@ -2441,6 +2459,10 @@ function buildStreamItemsResult( items: [completedItem], errors: [...errors.values()], }, - futures: filterFutures(path, errors, futures), + incrementalDataRecords: filterIncrementalDataRecords( + path, + errors, + incrementalDataRecords, + ), }; }