Skip to content

Commit b73204e

Browse files
committed
add legacy branching Executor
1 parent 8dafa16 commit b73204e

File tree

6 files changed

+6591
-0
lines changed

6 files changed

+6591
-0
lines changed

src/execution/__tests__/executor-test.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@ import {
3636
experimentalExecuteIncrementally,
3737
} from '../execute.js';
3838
import type { ExecutionResult } from '../Executor.js';
39+
import { legacyExecuteIncrementally } from '../legacyIncremental/legacyExecuteIncrementally.js';
3940

4041
function execute(args: ExecutionArgs): PromiseOrValue<ExecutionResult> {
4142
return expectEqualPromisesOrValues([
4243
executeThrowingOnIncremental(args),
4344
executeIgnoringIncremental(args),
4445
experimentalExecuteIncrementally(args),
46+
legacyExecuteIncrementally(args),
4547
]) as PromiseOrValue<ExecutionResult>;
4648
}
4749

@@ -50,6 +52,7 @@ function executeSync(args: ExecutionArgs): ExecutionResult {
5052
executeSyncWrappingThrowingOnIncremental(args),
5153
executeIgnoringIncremental(args),
5254
experimentalExecuteIncrementally(args),
55+
legacyExecuteIncrementally(args),
5356
]) as ExecutionResult;
5457
}
5558

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import { AccumulatorMap } from '../../jsutils/AccumulatorMap.js';
2+
import { getBySet } from '../../jsutils/getBySet.js';
3+
import { invariant } from '../../jsutils/invariant.js';
4+
import { isSameSet } from '../../jsutils/isSameSet.js';
5+
import type { ObjMap } from '../../jsutils/ObjMap.js';
6+
7+
import type { GraphQLError } from '../../error/GraphQLError.js';
8+
9+
import type {
10+
DeferUsage,
11+
FieldDetails,
12+
GroupedFieldSet,
13+
} from '../collectFields.js';
14+
import type { ExecutionResult } from '../Executor.js';
15+
import type {
16+
DeferUsageSet,
17+
ExecutionPlan,
18+
} from '../incremental/buildExecutionPlan.js';
19+
import { IncrementalExecutor } from '../incremental/IncrementalExecutor.js';
20+
21+
import { BranchingIncrementalPublisher } from './BranchingIncrementalPublisher.js';
22+
23+
export interface ExperimentalIncrementalExecutionResults {
24+
initialResult: InitialIncrementalExecutionResult;
25+
subsequentResults: AsyncGenerator<
26+
SubsequentIncrementalExecutionResult,
27+
void,
28+
void
29+
>;
30+
}
31+
32+
export interface InitialIncrementalExecutionResult<
33+
TData = ObjMap<unknown>,
34+
TExtensions = ObjMap<unknown>,
35+
> extends ExecutionResult<TData, TExtensions> {
36+
data: TData;
37+
hasNext: true;
38+
extensions?: TExtensions;
39+
}
40+
41+
export interface SubsequentIncrementalExecutionResult<
42+
TData = unknown,
43+
TExtensions = ObjMap<unknown>,
44+
> {
45+
incremental?: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
46+
hasNext: boolean;
47+
extensions?: TExtensions;
48+
}
49+
50+
export type IncrementalResult<TData = unknown, TExtensions = ObjMap<unknown>> =
51+
| IncrementalDeferResult<TData, TExtensions>
52+
| IncrementalStreamResult<TData, TExtensions>;
53+
54+
export interface IncrementalDeferResult<
55+
TData = ObjMap<unknown>,
56+
TExtensions = ObjMap<unknown>,
57+
> extends ExecutionResult<TData, TExtensions> {
58+
path: ReadonlyArray<string | number>;
59+
label?: string;
60+
}
61+
62+
export interface IncrementalStreamResult<
63+
TData = ReadonlyArray<unknown>,
64+
TExtensions = ObjMap<unknown>,
65+
> {
66+
errors?: ReadonlyArray<GraphQLError>;
67+
items: TData | null;
68+
path: ReadonlyArray<string | number>;
69+
label?: string;
70+
extensions?: TExtensions;
71+
}
72+
73+
/** @internal */
74+
export class BranchingIncrementalExecutor extends IncrementalExecutor<ExperimentalIncrementalExecutionResults> {
75+
override buildResponse(
76+
data: ObjMap<unknown> | null,
77+
): ExecutionResult | ExperimentalIncrementalExecutionResults {
78+
const errors = this.collectedErrors.errors;
79+
const work = this.getIncrementalWork();
80+
const { tasks, streams } = work;
81+
if (tasks?.length === 0 && streams?.length === 0) {
82+
return errors.length ? { errors, data } : { data };
83+
}
84+
85+
invariant(data !== null);
86+
const incrementalPublisher = new BranchingIncrementalPublisher();
87+
return incrementalPublisher.buildResponse(
88+
data,
89+
errors,
90+
work,
91+
this.validatedExecutionArgs.externalAbortSignal,
92+
);
93+
}
94+
95+
override buildExecutionPlan(
96+
originalGroupedFieldSet: GroupedFieldSet,
97+
): ExecutionPlan {
98+
return buildBranchingExecutionPlan(originalGroupedFieldSet);
99+
}
100+
}
101+
102+
function buildBranchingExecutionPlan(
103+
originalGroupedFieldSet: GroupedFieldSet,
104+
parentDeferUsages: DeferUsageSet = new Set<DeferUsage>(),
105+
): ExecutionPlan {
106+
const groupedFieldSet = new AccumulatorMap<string, FieldDetails>();
107+
108+
const newGroupedFieldSets = new Map<
109+
DeferUsageSet,
110+
AccumulatorMap<string, FieldDetails>
111+
>();
112+
113+
for (const [responseKey, fieldGroup] of originalGroupedFieldSet) {
114+
for (const fieldDetails of fieldGroup) {
115+
const deferUsage = fieldDetails.deferUsage;
116+
const deferUsageSet =
117+
deferUsage === undefined
118+
? new Set<DeferUsage>()
119+
: new Set([deferUsage]);
120+
if (isSameSet(parentDeferUsages, deferUsageSet)) {
121+
groupedFieldSet.add(responseKey, fieldDetails);
122+
} else {
123+
let newGroupedFieldSet = getBySet(newGroupedFieldSets, deferUsageSet);
124+
if (newGroupedFieldSet === undefined) {
125+
newGroupedFieldSet = new AccumulatorMap();
126+
newGroupedFieldSets.set(deferUsageSet, newGroupedFieldSet);
127+
}
128+
newGroupedFieldSet.add(responseKey, fieldDetails);
129+
}
130+
}
131+
}
132+
133+
return {
134+
groupedFieldSet,
135+
newGroupedFieldSets,
136+
};
137+
}
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
import type { ObjMap } from '../../jsutils/ObjMap.js';
2+
import { addPath, pathToArray } from '../../jsutils/Path.js';
3+
4+
import type { GraphQLError } from '../../error/GraphQLError.js';
5+
6+
import type {
7+
DeliveryGroup,
8+
ExecutionGroupValue,
9+
IncrementalWork,
10+
ItemStream,
11+
StreamItemValue,
12+
} from '../incremental/IncrementalExecutor.js';
13+
import type { WorkQueueEvent } from '../incremental/WorkQueue.js';
14+
import { createWorkQueue } from '../incremental/WorkQueue.js';
15+
import { mapAsyncIterable } from '../mapAsyncIterable.js';
16+
import { withConcurrentAbruptClose } from '../withConcurrentAbruptClose.js';
17+
18+
import type {
19+
ExperimentalIncrementalExecutionResults,
20+
IncrementalDeferResult,
21+
IncrementalResult,
22+
IncrementalStreamResult,
23+
InitialIncrementalExecutionResult,
24+
SubsequentIncrementalExecutionResult,
25+
} from './BranchingIncrementalExecutor.js';
26+
27+
interface SubsequentIncrementalExecutionResultContext {
28+
incremental: Array<IncrementalResult>;
29+
hasNext: boolean;
30+
}
31+
32+
/**
33+
* @internal
34+
*/
35+
export class BranchingIncrementalPublisher {
36+
private _indices: Map<ItemStream, number>;
37+
38+
constructor() {
39+
this._indices = new Map();
40+
}
41+
42+
buildResponse(
43+
data: ObjMap<unknown>,
44+
errors: ReadonlyArray<GraphQLError>,
45+
work: IncrementalWork,
46+
abortSignal: AbortSignal | undefined,
47+
): ExperimentalIncrementalExecutionResults {
48+
const { initialStreams, events } = createWorkQueue<
49+
ExecutionGroupValue,
50+
StreamItemValue,
51+
DeliveryGroup,
52+
ItemStream
53+
>(work);
54+
55+
for (const stream of initialStreams) {
56+
this._indices.set(stream, stream.initialCount);
57+
}
58+
59+
function abort(): void {
60+
subsequentResults.throw(abortSignal?.reason).catch(() => {
61+
// Ignore errors
62+
});
63+
}
64+
65+
let onWorkQueueFinished: (() => void) | undefined;
66+
if (abortSignal) {
67+
abortSignal.addEventListener('abort', abort);
68+
onWorkQueueFinished = () =>
69+
abortSignal.removeEventListener('abort', abort);
70+
}
71+
72+
const initialResult: InitialIncrementalExecutionResult = errors.length
73+
? { errors, data, hasNext: true }
74+
: { data, hasNext: true };
75+
76+
const subsequentResults = withConcurrentAbruptClose(
77+
mapAsyncIterable(events, (batch) =>
78+
this._handleBatch(batch, onWorkQueueFinished),
79+
),
80+
() => onWorkQueueFinished?.(),
81+
);
82+
83+
return {
84+
initialResult,
85+
subsequentResults,
86+
};
87+
}
88+
89+
private _handleBatch(
90+
batch: ReadonlyArray<
91+
WorkQueueEvent<
92+
ExecutionGroupValue,
93+
StreamItemValue,
94+
DeliveryGroup,
95+
ItemStream
96+
>
97+
>,
98+
onWorkQueueFinished: (() => void) | undefined,
99+
): SubsequentIncrementalExecutionResult {
100+
const context: SubsequentIncrementalExecutionResultContext = {
101+
incremental: [],
102+
hasNext: true,
103+
};
104+
105+
for (const event of batch) {
106+
this._handleWorkQueueEvent(event, context, onWorkQueueFinished);
107+
}
108+
109+
const { incremental, hasNext } = context;
110+
111+
const result: SubsequentIncrementalExecutionResult = { hasNext };
112+
if (incremental.length > 0) {
113+
result.incremental = incremental;
114+
}
115+
116+
return result;
117+
}
118+
119+
private _handleWorkQueueEvent(
120+
event: WorkQueueEvent<
121+
ExecutionGroupValue,
122+
StreamItemValue,
123+
DeliveryGroup,
124+
ItemStream
125+
>,
126+
context: SubsequentIncrementalExecutionResultContext,
127+
onWorkQueueFinished: (() => void) | undefined,
128+
): void {
129+
switch (event.kind) {
130+
case 'GROUP_VALUES': {
131+
const group = event.group;
132+
for (const value of event.values) {
133+
context.incremental.push(
134+
buildIncrementalResult(
135+
{
136+
data: value.data,
137+
path: pathToArray(group.path),
138+
},
139+
group.label,
140+
value.errors,
141+
),
142+
);
143+
}
144+
break;
145+
}
146+
case 'GROUP_SUCCESS': {
147+
break;
148+
}
149+
case 'GROUP_FAILURE': {
150+
const group = event.group;
151+
context.incremental.push(
152+
buildIncrementalResult(
153+
{
154+
data: null,
155+
path: pathToArray(group.path),
156+
},
157+
group.label,
158+
[event.error as GraphQLError],
159+
),
160+
);
161+
break;
162+
}
163+
case 'STREAM_VALUES': {
164+
const stream = event.stream;
165+
const { values } = event;
166+
const items: Array<unknown> = [];
167+
const errors: Array<GraphQLError> = [];
168+
for (const value of values) {
169+
items.push(value.item);
170+
if (value.errors !== undefined) {
171+
errors.push(...value.errors);
172+
}
173+
}
174+
let index = this._indices.get(stream);
175+
if (index === undefined) {
176+
index = stream.initialCount;
177+
this._indices.set(stream, index);
178+
}
179+
this._indices.set(stream, index + items.length);
180+
context.incremental.push(
181+
buildIncrementalResult(
182+
{
183+
items,
184+
path: pathToArray(addPath(stream.path, index, undefined)),
185+
},
186+
stream.label,
187+
errors.length > 0 ? errors : undefined,
188+
),
189+
);
190+
break;
191+
}
192+
case 'STREAM_SUCCESS': {
193+
this._indices.delete(event.stream);
194+
break;
195+
}
196+
case 'STREAM_FAILURE': {
197+
this._indices.delete(event.stream);
198+
const stream = event.stream;
199+
context.incremental.push(
200+
buildIncrementalResult(
201+
{
202+
items: null,
203+
path: pathToArray(stream.path),
204+
},
205+
stream.label,
206+
[event.error as GraphQLError],
207+
),
208+
);
209+
break;
210+
}
211+
case 'WORK_QUEUE_TERMINATION': {
212+
onWorkQueueFinished?.();
213+
context.hasNext = false;
214+
break;
215+
}
216+
}
217+
}
218+
}
219+
220+
function buildIncrementalResult(
221+
originalIncrementalResult:
222+
| Omit<IncrementalDeferResult, 'label' | 'errors'>
223+
| Omit<IncrementalStreamResult, 'label' | 'errors'>,
224+
label: string | undefined,
225+
errors: ReadonlyArray<GraphQLError> | undefined,
226+
): IncrementalResult {
227+
const incrementalResult: IncrementalResult = originalIncrementalResult;
228+
if (errors !== undefined) {
229+
incrementalResult.errors = errors;
230+
}
231+
if (label !== undefined) {
232+
incrementalResult.label = label;
233+
}
234+
return incrementalResult;
235+
}

0 commit comments

Comments
 (0)