Skip to content

Commit c267ede

Browse files
authored
feat: Add plumbing to support unary calls for client side metric collection (#1631)
## Description This PR adds source code changes with plumbing so that client side metric collection can be done for unary calls. No client side metrics are actually collected yet for readModifyWriteRow or checkAndMutate calls, but future PRs will use this plumbing to collect metrics for those calls. A test is provided that makes a fake method on the table to demonstrate how a method would collect these metrics to serve as an example. ## Impact Doesn't change the way the client library works. Just provides support for when we open a PR that collects client side metrics for unary calls. ## Testing A unit test is created that adds a fake method to a table that does a basic unary readModifyWriteRow call. It ensures that the metrics make it to the test metrics handler correctly. ## Additional Information **Changes:** `src/interceptor.ts`: This file contains code for attaching interceptors to a call that will call the correct metrics collector methods when data arrives `src/client-side-metrics/operation-metrics-collector.ts`: The types are cleaned up in this file. None of the functionality has changed in this file `system-test/read-modify-write-row-interceptors.ts`: This adds a test that ensures the plumbing for collecting metadata trailers and headers works as intended for unary calls.
1 parent ca490e8 commit c267ede

File tree

2 files changed

+380
-0
lines changed

2 files changed

+380
-0
lines changed

src/interceptor.ts

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import {CallOptions} from 'google-gax';
16+
import {OperationMetricsCollector} from './client-side-metrics/operation-metrics-collector';
17+
18+
// Mock Server Implementation
19+
import * as grpcJs from '@grpc/grpc-js';
20+
import {status as GrpcStatus} from '@grpc/grpc-js';
21+
22+
export type ServerStatus = {
23+
metadata: {internalRepr: Map<string, Uint8Array[]>; options: {}};
24+
code: number;
25+
details: string;
26+
};
27+
28+
// Helper to create interceptor provider for OperationMetricsCollector
29+
function createMetricsInterceptorProvider(
30+
collector: OperationMetricsCollector,
31+
) {
32+
return (options: grpcJs.InterceptorOptions, nextCall: grpcJs.NextCall) => {
33+
// savedReceiveMetadata and savedReceiveStatus are not strictly needed here anymore for the interceptor's own state
34+
// OperationStart and AttemptStart will be called by the calling code (`fakeReadModifyWriteRow`)
35+
return new grpcJs.InterceptingCall(nextCall(options), {
36+
start: (metadata, listener, next) => {
37+
// AttemptStart is called by the orchestrating code
38+
const newListener: grpcJs.Listener = {
39+
onReceiveMetadata: (metadata, nextMd) => {
40+
collector.onMetadataReceived(
41+
metadata as unknown as {
42+
internalRepr: Map<string, string[]>;
43+
options: {};
44+
},
45+
);
46+
nextMd(metadata);
47+
},
48+
onReceiveStatus: (status, nextStat) => {
49+
collector.onStatusMetadataReceived(
50+
status as unknown as ServerStatus,
51+
);
52+
nextStat(status);
53+
},
54+
};
55+
next(metadata, newListener);
56+
},
57+
});
58+
};
59+
}
60+
61+
export function withInterceptors(
62+
gaxOptions: CallOptions,
63+
metricsCollector?: OperationMetricsCollector,
64+
) {
65+
if (metricsCollector) {
66+
const interceptor = createMetricsInterceptorProvider(metricsCollector);
67+
if (!gaxOptions.otherArgs) {
68+
gaxOptions.otherArgs = {};
69+
}
70+
if (!gaxOptions.otherArgs.options) {
71+
gaxOptions.otherArgs.options = {};
72+
}
73+
if (!gaxOptions.otherArgs.options.interceptors) {
74+
gaxOptions.otherArgs.options.interceptors = [interceptor];
75+
} else {
76+
if (Array.isArray(gaxOptions.otherArgs.options.interceptors)) {
77+
// We check that interceptors is an array so that the code has no
78+
// chance of throwing an error.
79+
// Then, if the interceptors is an array, make sure it also includes the
80+
// client side metrics interceptor.
81+
gaxOptions.otherArgs.options.interceptors.push(interceptor);
82+
}
83+
}
84+
}
85+
return gaxOptions;
86+
}
Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import {describe, it, before, after} from 'mocha';
16+
import {Bigtable} from '../src';
17+
import {ServiceError} from 'google-gax';
18+
import {ClientSideMetricsConfigManager} from '../src/client-side-metrics/metrics-config-manager';
19+
import {TestMetricsHandler} from '../test-common/test-metrics-handler';
20+
import {
21+
OnAttemptCompleteData,
22+
OnOperationCompleteData,
23+
} from '../src/client-side-metrics/metrics-handler';
24+
import {OperationMetricsCollector} from '../src/client-side-metrics/operation-metrics-collector';
25+
import {
26+
MethodName,
27+
StreamingState,
28+
} from '../src/client-side-metrics/client-side-metrics-attributes';
29+
import * as assert from 'assert';
30+
import {status as GrpcStatus} from '@grpc/grpc-js';
31+
import {withInterceptors} from '../src/interceptor';
32+
33+
const INSTANCE_ID = 'isolated-rmw-instance';
34+
const TABLE_ID = 'isolated-rmw-table';
35+
const ZONE = 'us-central1-a';
36+
const CLUSTER = 'fake-cluster';
37+
const COLUMN_FAMILY = 'traits';
38+
const COLUMN_FAMILIES = [COLUMN_FAMILY];
39+
const ROW_KEY = 'gwashington';
40+
const COLUMN = 'teeth';
41+
42+
/**
43+
* Creates a Bigtable instance if it does not already exist.
44+
*
45+
* @param bigtable - The Bigtable client.
46+
* @param instanceId - The ID of the instance to create.
47+
* @param clusterId - The ID of the initial cluster in the instance.
48+
* @param locationId - The location (region) for the initial cluster.
49+
* @returns The created instance object if successful, otherwise logs a message and returns the existing instance.
50+
*/
51+
async function createInstance(
52+
bigtable: Bigtable,
53+
instanceId: string,
54+
clusterId: string,
55+
locationId: string,
56+
) {
57+
const instance = bigtable.instance(instanceId);
58+
59+
const [exists] = await instance.exists();
60+
if (exists) {
61+
console.log(`Instance ${instanceId} already exists.`);
62+
return instance;
63+
}
64+
65+
const [i, operation] = await instance.create({
66+
clusters: [
67+
{
68+
id: clusterId,
69+
location: locationId,
70+
nodes: 3,
71+
},
72+
],
73+
labels: {
74+
time_created: Date.now(),
75+
},
76+
});
77+
await operation.promise();
78+
console.log(`Created instance ${instanceId}`);
79+
return i;
80+
}
81+
82+
/**
83+
* Creates a Bigtable table if it does not already exist.
84+
*
85+
* @param bigtable - The Bigtable client.
86+
* @param instanceId - The ID of the instance containing the table.
87+
* @param tableId - The ID of the table to create.
88+
* @param families - An array of column family names to create in the table.
89+
* @returns A promise that resolves with the created Table object.
90+
*/
91+
async function createTable(
92+
bigtable: Bigtable,
93+
instanceId: string,
94+
tableId: string,
95+
families: string[],
96+
) {
97+
const instance = bigtable.instance(instanceId);
98+
const table = instance.table(tableId);
99+
100+
const [exists] = await table.exists();
101+
if (exists) {
102+
console.log(`Table ${tableId} already exists.`);
103+
return table;
104+
}
105+
106+
const [t] = await table.create({
107+
families: families,
108+
});
109+
const row = table.row(ROW_KEY);
110+
await row.save({
111+
[COLUMN_FAMILY]: {
112+
[COLUMN]: 'shiny',
113+
},
114+
});
115+
console.log(`Created table ${tableId}`);
116+
return t;
117+
}
118+
119+
/**
120+
* Creates and returns a TestMetricsHandler instance for testing purposes.
121+
*
122+
* @returns A TestMetricsHandler instance with the projectId set to 'test-project-id'.
123+
*/
124+
function getTestMetricsHandler() {
125+
const testMetricsHandler = new TestMetricsHandler();
126+
testMetricsHandler.projectId = 'test-project-id';
127+
return testMetricsHandler;
128+
}
129+
130+
/**
131+
* Asynchronously retrieves the project ID associated with the Bigtable client.
132+
*
133+
* @param bigtable - The Bigtable client instance.
134+
* @returns A promise that resolves with the project ID as a string.
135+
* @throws An error if the project ID cannot be retrieved.
136+
*/
137+
async function getProjectIdFromClient(bigtable: Bigtable): Promise<string> {
138+
return new Promise((resolve, reject) => {
139+
bigtable.getProjectId_((err, projectId) => {
140+
if (err) {
141+
reject(err);
142+
} else {
143+
resolve(projectId!);
144+
}
145+
});
146+
});
147+
}
148+
149+
describe('Bigtable/ReadModifyWriteRowInterceptorMetrics', () => {
150+
let bigtable: Bigtable;
151+
let testMetricsHandler: TestMetricsHandler;
152+
153+
before(async () => {
154+
bigtable = new Bigtable();
155+
await getProjectIdFromClient(bigtable);
156+
await createInstance(bigtable, INSTANCE_ID, CLUSTER, ZONE);
157+
await createTable(bigtable, INSTANCE_ID, TABLE_ID, COLUMN_FAMILIES);
158+
testMetricsHandler = getTestMetricsHandler();
159+
bigtable._metricsConfigManager = new ClientSideMetricsConfigManager([
160+
testMetricsHandler,
161+
]);
162+
});
163+
164+
after(async () => {
165+
const instance = bigtable.instance(INSTANCE_ID);
166+
await instance.delete();
167+
});
168+
169+
it('should record and export correct metrics for ReadModifyWriteRow via interceptors', async () => {
170+
const instance = bigtable.instance(INSTANCE_ID);
171+
172+
const table = instance.table(TABLE_ID);
173+
174+
/*
175+
fakeReadModifyWriteRowMethod is just a fake method on a table that makes a
176+
call to the readWriteModifyRow grpc endpoint. It demonstrates what a method
177+
might look like when trying to make a unary call while extracting
178+
information from the headers and trailers that the server returns so that
179+
the extracted information can be recorded in client side metrics.
180+
*/
181+
(table as any).fakeReadModifyWriteRowMethod = async () => {
182+
// 1. Create a metrics collector.
183+
const metricsCollector = new OperationMetricsCollector(
184+
table,
185+
MethodName.READ_MODIFY_WRITE_ROW,
186+
StreamingState.UNARY,
187+
(table as any).bigtable._metricsConfigManager!.metricsHandlers,
188+
);
189+
// 2. Tell the metrics collector an attempt has been started.
190+
metricsCollector.onOperationStart();
191+
metricsCollector.onAttemptStart();
192+
// 3. Make a unary call with gax options that include interceptors. The
193+
// interceptors are built from a method that hooks them up to the
194+
// metrics collector
195+
const responseArray = await new Promise((resolve, reject) => {
196+
bigtable.request(
197+
{
198+
client: 'BigtableClient',
199+
method: 'readModifyWriteRow',
200+
reqOpts: {
201+
tableName: table.name,
202+
rowKey: Buffer.from(ROW_KEY),
203+
rules: [
204+
{
205+
familyName: COLUMN_FAMILY,
206+
columnQualifier: Buffer.from(COLUMN),
207+
appendValue: Buffer.from('-wood'),
208+
},
209+
],
210+
appProfileId: undefined,
211+
},
212+
gaxOpts: withInterceptors({}, metricsCollector),
213+
},
214+
(err: ServiceError | null, resp?: any) => {
215+
if (err) {
216+
reject(err);
217+
} else {
218+
resolve(resp);
219+
}
220+
},
221+
);
222+
});
223+
// 4. Tell the metrics collector the attempt is over
224+
metricsCollector.onAttemptComplete(GrpcStatus.OK);
225+
metricsCollector.onOperationComplete(GrpcStatus.OK);
226+
// 5. Return results of method call to the user
227+
return responseArray;
228+
};
229+
230+
await (table as any).fakeReadModifyWriteRowMethod();
231+
232+
assert.strictEqual(testMetricsHandler.requestsHandled.length, 2);
233+
234+
const attemptCompleteData = testMetricsHandler.requestsHandled.find(
235+
m => (m as {attemptLatency?: number}).attemptLatency !== undefined,
236+
) as OnAttemptCompleteData | undefined;
237+
const operationCompleteData = testMetricsHandler.requestsHandled.find(
238+
m => (m as {operationLatency?: number}).operationLatency !== undefined,
239+
) as OnOperationCompleteData | undefined;
240+
241+
assert.ok(attemptCompleteData, 'OnAttemptCompleteData should be present');
242+
assert.ok(
243+
operationCompleteData,
244+
'OnOperationCompleteData should be present',
245+
);
246+
if (!attemptCompleteData || !operationCompleteData) {
247+
throw new Error('Metrics data is missing'); // Should be caught by asserts above
248+
}
249+
assert.strictEqual(
250+
attemptCompleteData.metricsCollectorData.method,
251+
MethodName.READ_MODIFY_WRITE_ROW,
252+
);
253+
assert.strictEqual(attemptCompleteData.status, '0');
254+
assert.strictEqual(
255+
attemptCompleteData.metricsCollectorData.table,
256+
TABLE_ID,
257+
);
258+
assert.strictEqual(
259+
attemptCompleteData.metricsCollectorData.instanceId,
260+
INSTANCE_ID,
261+
);
262+
assert.ok(attemptCompleteData.attemptLatency >= 0);
263+
assert(attemptCompleteData.serverLatency);
264+
assert.ok(attemptCompleteData.serverLatency >= 0);
265+
assert.strictEqual(attemptCompleteData.metricsCollectorData.zone, ZONE);
266+
assert.strictEqual(
267+
attemptCompleteData.metricsCollectorData.cluster,
268+
CLUSTER,
269+
);
270+
assert.strictEqual(attemptCompleteData.streaming, StreamingState.UNARY);
271+
272+
assert.strictEqual(
273+
operationCompleteData.metricsCollectorData.method,
274+
MethodName.READ_MODIFY_WRITE_ROW,
275+
);
276+
assert.strictEqual(operationCompleteData.status, '0');
277+
assert.strictEqual(
278+
operationCompleteData.metricsCollectorData.table,
279+
TABLE_ID,
280+
);
281+
assert.strictEqual(
282+
operationCompleteData.metricsCollectorData.instanceId,
283+
INSTANCE_ID,
284+
);
285+
assert.ok(operationCompleteData.operationLatency >= 0);
286+
assert.strictEqual(operationCompleteData.retryCount, 0);
287+
assert.strictEqual(operationCompleteData.metricsCollectorData.zone, ZONE);
288+
assert.strictEqual(
289+
operationCompleteData.metricsCollectorData.cluster,
290+
CLUSTER,
291+
);
292+
assert.strictEqual(operationCompleteData.streaming, StreamingState.UNARY);
293+
});
294+
});

0 commit comments

Comments
 (0)