Skip to content

Commit 8eee22b

Browse files
Add Initial Version of Fleet Awareness Plugin
1 parent 4653320 commit 8eee22b

File tree

5 files changed

+229
-16
lines changed

5 files changed

+229
-16
lines changed

gateway-js/package.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,20 @@
3737
"@apollo/utils.logger": "^2.0.0",
3838
"@josephg/resolvable": "^1.0.1",
3939
"@opentelemetry/api": "^1.0.1",
40+
"@opentelemetry/exporter-metrics-otlp-http": "^0.203.0",
41+
"@opentelemetry/node": "^0.24.0",
42+
"@opentelemetry/resources": "^2.0.1",
43+
"@opentelemetry/resource-detector-alibaba-cloud": "^0.31.3",
44+
"@opentelemetry/resource-detector-aws": "^2.3.0",
45+
"@opentelemetry/resource-detector-azure": "^0.10.0",
46+
"@opentelemetry/resource-detector-gcp": "^0.37.0",
47+
"@opentelemetry/sdk-metrics": "^2.0.1",
4048
"@types/node-fetch": "^2.6.2",
4149
"async-retry": "^1.3.3",
4250
"loglevel": "^1.6.1",
4351
"make-fetch-happen": "^11.0.0",
4452
"node-abort-controller": "^3.0.1",
53+
"node-cpu-count": "^0.1.1",
4554
"node-fetch": "^2.6.7"
4655
},
4756
"peerDependencies": {

gateway-js/src/datasources/RemoteGraphQLDataSource.ts

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,17 @@ import { Headers as NodeFetchHeaders, Request as NodeFetchRequest } from 'node-f
88
import { Fetcher, FetcherRequestInit, FetcherResponse } from '@apollo/utils.fetcher';
99
import { GraphQLError, GraphQLErrorExtensions } from 'graphql';
1010
import { GatewayCacheHint, GatewayCachePolicy, GatewayGraphQLRequest, GatewayGraphQLRequestContext, GatewayGraphQLResponse } from '@apollo/server-gateway-interface';
11+
import { MeterProvider } from '@opentelemetry/sdk-metrics';
12+
import { Counter } from '@opentelemetry/api';
1113

1214
export class RemoteGraphQLDataSource<
1315
TContext extends Record<string, any> = Record<string, any>,
1416
> implements GraphQLDataSource<TContext>
1517
{
1618
fetcher: Fetcher;
19+
fetch_request_size_counter: (Counter | undefined);
20+
fetch_response_size_counter: (Counter | undefined);
21+
meterProvider?: MeterProvider
1722

1823
constructor(
1924
config?: Partial<RemoteGraphQLDataSource<TContext>> &
@@ -34,6 +39,15 @@ export class RemoteGraphQLDataSource<
3439
if (config) {
3540
return Object.assign(this, config);
3641
}
42+
if (this.meterProvider) {
43+
const meter = this.meterProvider.getMeter('apollo/gateway');
44+
this.fetch_request_size_counter = meter.createCounter("apollo.gateway.operations.fetch.request_size", {
45+
unit: 'bytes',
46+
});
47+
this.fetch_response_size_counter = meter.createCounter("apollo.gateway.operations.fetch.response_size", {
48+
unit: 'bytes',
49+
});
50+
}
3751
}
3852

3953
url!: string;
@@ -149,7 +163,7 @@ export class RemoteGraphQLDataSource<
149163
request: requestWithoutQuery,
150164
context,
151165
overallCachePolicy,
152-
pathInIncomingRequest
166+
pathInIncomingRequest,
153167
});
154168
}
155169
}
@@ -167,7 +181,7 @@ export class RemoteGraphQLDataSource<
167181
request: requestWithQuery,
168182
context,
169183
overallCachePolicy,
170-
pathInIncomingRequest
184+
pathInIncomingRequest,
171185
});
172186
}
173187

@@ -201,6 +215,9 @@ export class RemoteGraphQLDataSource<
201215
let fetchResponse: FetcherResponse | undefined;
202216

203217
try {
218+
this.fetch_request_size_counter?.add(Buffer.byteLength(stringifiedRequestWithoutHttp), {
219+
"subgraph.name": "foo"
220+
})
204221
// Use our local `fetcher` to allow for fetch injection
205222
// Use the fetcher's `Request` implementation for compatibility
206223
fetchResponse = await this.fetcher(http.url, requestInit);
@@ -215,12 +232,22 @@ export class RemoteGraphQLDataSource<
215232
throw new Error(`Expected JSON response body, but received: ${body}`);
216233
}
217234

235+
this.fetch_response_size_counter?.add(Buffer.byteLength(body.toString()), {
236+
"subgraph.name": "foo"
237+
})
238+
218239
return {
219240
...body,
220241
http: fetchResponse,
221242
};
222243
} catch (error) {
223-
this.didEncounterError(error, fetchRequest, fetchResponse, context, request);
244+
this.didEncounterError(
245+
error,
246+
fetchRequest,
247+
fetchResponse,
248+
context,
249+
request,
250+
);
224251
throw error;
225252
}
226253
}
@@ -234,17 +261,22 @@ export class RemoteGraphQLDataSource<
234261
request,
235262
context,
236263
overallCachePolicy,
237-
pathInIncomingRequest
264+
pathInIncomingRequest,
238265
}: {
239266
response: GatewayGraphQLResponse;
240267
request: GatewayGraphQLRequest;
241268
context: TContext;
242269
overallCachePolicy: GatewayCachePolicy | null;
243-
pathInIncomingRequest?: ResponsePath
270+
pathInIncomingRequest?: ResponsePath;
244271
}): Promise<GatewayGraphQLResponse> {
245272
const processedResponse =
246273
typeof this.didReceiveResponse === 'function'
247-
? await this.didReceiveResponse({ response, request, context, pathInIncomingRequest })
274+
? await this.didReceiveResponse({
275+
response,
276+
request,
277+
context,
278+
pathInIncomingRequest,
279+
})
248280
: response;
249281

250282
if (overallCachePolicy) {
@@ -275,8 +307,11 @@ export class RemoteGraphQLDataSource<
275307

276308
public didReceiveResponse?(
277309
requestContext: Required<
278-
Pick<GatewayGraphQLRequestContext<TContext>, 'request' | 'response' | 'context'>
279-
> & { pathInIncomingRequest?: ResponsePath }
310+
Pick<
311+
GatewayGraphQLRequestContext<TContext>,
312+
'request' | 'response' | 'context'
313+
>
314+
> & { pathInIncomingRequest?: ResponsePath },
280315
): GatewayGraphQLResponse | Promise<GatewayGraphQLResponse>;
281316

282317
public didEncounterError(

gateway-js/src/index.ts

Lines changed: 128 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
GraphQLDataSource,
1717
GraphQLDataSourceRequestKind,
1818
} from './datasources/types';
19-
import { RemoteGraphQLDataSource } from './datasources/RemoteGraphQLDataSource';
19+
import { RemoteGraphQLDataSource } from './datasources';
2020
import { getVariableValues } from 'graphql/execution/values';
2121
import {
2222
QueryPlanner,
@@ -47,7 +47,7 @@ import {
4747
requestContextSpanAttributes,
4848
operationContextSpanAttributes,
4949
recordExceptions,
50-
OpenTelemetryAttributeNames
50+
OpenTelemetryAttributeNames, configureOpenTelemetry,
5151
} from './utilities/opentelemetry';
5252
import { addExtensions } from './schema-helper/addExtensions';
5353
import {
@@ -64,7 +64,16 @@ import {
6464
Supergraph,
6565
} from '@apollo/federation-internals';
6666
import { getDefaultLogger } from './logger';
67-
import {GatewayInterface, GatewayUnsubscriber, GatewayGraphQLRequestContext, GatewayExecutionResult} from '@apollo/server-gateway-interface';
67+
import {
68+
GatewayInterface,
69+
GatewayUnsubscriber,
70+
GatewayGraphQLRequestContext,
71+
GatewayExecutionResult,
72+
GatewayGraphQLRequest, GatewayGraphQLResponse,
73+
} from '@apollo/server-gateway-interface';
74+
import * as os from 'node:os';
75+
import { cpuCountSync } from 'node-cpu-count';
76+
import { MeterProvider } from '@opentelemetry/sdk-metrics';
6877

6978
type DataSourceMap = {
7079
[serviceName: string]: { url?: string; dataSource: GraphQLDataSource };
@@ -148,6 +157,7 @@ export class ApolloGateway implements GatewayInterface {
148157
private compositionId?: string;
149158
private state: GatewayState;
150159
private _supergraphManager?: SupergraphManager;
160+
private openTelemetryMeterProvider: MeterProvider;
151161

152162
// Observe query plan, service info, and operation info prior to execution.
153163
// The information made available here will give insight into the resulting
@@ -215,6 +225,8 @@ export class ApolloGateway implements GatewayInterface {
215225

216226
this.validateConfigAndEmitWarnings();
217227

228+
this.openTelemetryMeterProvider = this.initialiseTelemetry();
229+
218230
this.logger.debug('Gateway successfully initialized (but not yet loaded)');
219231
this.state = { phase: 'initialized' };
220232
}
@@ -281,6 +293,74 @@ export class ApolloGateway implements GatewayInterface {
281293
}
282294
}
283295

296+
private initialiseTelemetry() : MeterProvider {
297+
const meterProvider = configureOpenTelemetry();
298+
299+
const os_type = os.type();
300+
const host_arch = os.arch();
301+
const meter = meterProvider.getMeter("apollo/gateway");
302+
303+
// gateway.instance
304+
305+
const instanceGauge = meter.createObservableGauge("gateway.instance", {
306+
"description": "The number of instances of the gateway running"
307+
})
308+
instanceGauge.addCallback((result) => {
309+
result.observe(1, {
310+
"os.type": os_type,
311+
"host.arch": host_arch,
312+
"deployment_type": "gateway"
313+
})
314+
})
315+
316+
// gateway.instance.cpu_freq
317+
const cpuFreqGauge = meter.createObservableGauge("gateway.instance.cpu_freq", {
318+
"description": "The CPU frequency of the underlying instance the router is deployed to",
319+
"unit": "Mhz"
320+
})
321+
cpuFreqGauge.addCallback((result) => {
322+
const cpus = os.cpus();
323+
const average_frequency = os.cpus().map((a) => a.speed).reduce((partialSum, a) => partialSum + a, 0) / cpus.length
324+
result.observe(average_frequency)
325+
})
326+
327+
// gateway.instance.cpu_count
328+
const cpuCountGauge = meter.createObservableGauge("gateway.instance.cpu_count", {
329+
"description": "The number of CPUs reported by the instance the gateway is running on"
330+
})
331+
cpuCountGauge.addCallback((result) => {
332+
result.observe(cpuCountSync(), {
333+
"host.arch": host_arch,
334+
});
335+
})
336+
337+
// gateway.instance.total_memory
338+
const totalMemoryGauge = meter.createObservableGauge("gateway.instance.total_memory", {
339+
"description": "The amount of memory reported by the instance the router is running on",
340+
"unit": "bytes"
341+
})
342+
totalMemoryGauge.addCallback((result) => {
343+
result.observe(os.totalmem(), {
344+
"host.arch": host_arch,
345+
});
346+
})
347+
348+
// gateway.instance.schema
349+
const schemaGauge = meter.createObservableGauge("gateway.instance.schema", {
350+
"description": "Details about the current in-use schema"
351+
})
352+
schemaGauge.addCallback((result) => {
353+
if (this.supergraphSdl) {
354+
const hash = this.getIdForSupergraphSdl(this.supergraphSdl)
355+
result.observe(1, {
356+
schema_hash: hash
357+
})
358+
}
359+
})
360+
361+
return meterProvider
362+
}
363+
284364
public async load(options?: {
285365
apollo?: ApolloConfigFromAS2Or3;
286366
engine?: GraphQLServiceEngineConfig;
@@ -739,6 +819,7 @@ export class ApolloGateway implements GatewayInterface {
739819
? this.config.buildService(serviceDef)
740820
: new RemoteGraphQLDataSource({
741821
url: serviceDef.url,
822+
meterProvider: this.openTelemetryMeterProvider
742823
});
743824
}
744825

@@ -782,6 +863,7 @@ export class ApolloGateway implements GatewayInterface {
782863
public executor = async (
783864
requestContext: GatewayGraphQLRequestContext,
784865
): Promise<GatewayExecutionResult> => {
866+
785867
return tracer.startActiveSpan(
786868
OpenTelemetrySpanNames.REQUEST,
787869
{ attributes: requestContextSpanAttributes(requestContext, this.config.telemetry) },
@@ -874,6 +956,10 @@ export class ApolloGateway implements GatewayInterface {
874956
});
875957
}
876958

959+
this.openTelemetryMeterProvider.getMeter("apollo/gateway").createCounter("apollo.gateway.operations.fetch.request_size", {
960+
unit: "bytes"
961+
}).add(this.calculate_request_size(request));
962+
877963
const response = await executeQueryPlan(
878964
queryPlan,
879965
serviceMap,
@@ -884,6 +970,10 @@ export class ApolloGateway implements GatewayInterface {
884970
this.config.telemetry
885971
);
886972

973+
this.openTelemetryMeterProvider.getMeter("apollo/gateway").createCounter("apollo.gateway.operations.fetch.response_size", {
974+
unit: "bytes"
975+
}).add(this.calculate_response_size(response));
976+
887977
const shouldShowQueryPlan =
888978
this.config.__exposeQueryPlanExperimental &&
889979
request.http &&
@@ -948,6 +1038,39 @@ export class ApolloGateway implements GatewayInterface {
9481038
);
9491039
};
9501040

1041+
private calculate_request_size(request: GatewayGraphQLRequest) {
1042+
let total = 0;
1043+
1044+
if (request.http?.headers) {
1045+
total += Array.from(request.http.headers).reduce((size, headerPair )=> size + Buffer.byteLength(headerPair[0]) + Buffer.byteLength(headerPair[1]), 0);
1046+
}
1047+
if (request.query) {
1048+
total += Buffer.byteLength(request.query);
1049+
}
1050+
if (request.variables) {
1051+
total += Object.entries(request.variables).reduce((accumulator, variable_pair) =>
1052+
accumulator + Buffer.byteLength(variable_pair[0]) + Buffer.byteLength(variable_pair[1].toString())
1053+
, 0);
1054+
}
1055+
1056+
return total
1057+
}
1058+
1059+
private calculate_response_size(response: GatewayGraphQLResponse) {
1060+
let total = 0;
1061+
1062+
if (response.http?.headers) {
1063+
total += Array.from(response.http.headers).reduce((size, headerPair )=> size + Buffer.byteLength(headerPair[0]) + Buffer.byteLength(headerPair[1]), 0);
1064+
}
1065+
if (response.data) {
1066+
total += Object.entries(response.data).reduce((accumulator, data_pair) =>
1067+
accumulator + Buffer.byteLength(data_pair[0]) + Buffer.byteLength(JSON.stringify(data_pair[1]))
1068+
, 0);
1069+
}
1070+
1071+
return total
1072+
}
1073+
9511074
private validateIncomingRequest(
9521075
requestContext: GatewayGraphQLRequestContext,
9531076
operationContext: OperationContext,
@@ -1044,6 +1167,8 @@ export class ApolloGateway implements GatewayInterface {
10441167
queryPlanner: this.queryPlanner,
10451168
};
10461169
}
1170+
1171+
10471172
}
10481173

10491174
ApolloGateway.prototype.onSchemaChange = deprecate(

0 commit comments

Comments
 (0)