Skip to content

Commit f313563

Browse files
committed
fix(otel): optimize metrics tracking by eliminating promise chaining overhead
1 parent ab39c57 commit f313563

File tree

3 files changed

+33
-37
lines changed

3 files changed

+33
-37
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, Pu
66
import { AbortError, ErrorReply, CommandTimeoutDuringMaintenanceError, TimeoutError } from '../errors';
77
import { MonitorCallback } from '.';
88
import { dbgMaintenance } from './enterprise-maintenance-manager';
9+
import { OTelClientAttributes, OTelMetrics } from '../opentelemetry';
910

1011
export interface CommandOptions<T = TypeMapping> {
1112
chainId?: symbol;
@@ -199,23 +200,35 @@ export default class RedisCommandsQueue {
199200

200201
addCommand<T>(
201202
args: ReadonlyArray<RedisArgument>,
202-
options?: CommandOptions
203+
options?: CommandOptions,
204+
otelAttributes?: OTelClientAttributes
203205
): Promise<T> {
204206
if (this.#maxLength && this.#toWrite.length + this.#waitingForReply.length >= this.#maxLength) {
205207
return Promise.reject(new Error('The queue is full'));
206208
} else if (options?.abortSignal?.aborted) {
207209
return Promise.reject(new AbortError());
208210
}
209211

212+
const recordOperation = OTelMetrics.instance.createRecordOperationDuration(args, otelAttributes);
213+
OTelMetrics.instance.recordPendingRequests(1, otelAttributes);
214+
210215
return new Promise((resolve, reject) => {
211216
let node: DoublyLinkedNode<CommandToWrite>;
212217
const value: CommandToWrite = {
213218
args,
214219
chainId: options?.chainId,
215220
abort: undefined,
216221
timeout: undefined,
217-
resolve,
218-
reject,
222+
resolve: reply => {
223+
recordOperation()
224+
OTelMetrics.instance.recordPendingRequests(-1, otelAttributes);
225+
resolve(reply as T);
226+
},
227+
reject: (err) => {
228+
recordOperation(err as Error);
229+
OTelMetrics.instance.recordPendingRequests(-1, otelAttributes);
230+
reject(err);
231+
},
219232
channelsCounter: undefined,
220233
typeMapping: options?.typeMapping
221234
};

packages/client/lib/client/index.ts

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { BasicCommandParser, CommandParser } from './parser';
2121
import SingleEntryCache from '../single-entry-cache';
2222
import { version } from '../../package.json'
2323
import EnterpriseMaintenanceManager, { MaintenanceUpdate, MovingEndpointType } from './enterprise-maintenance-manager';
24-
import { OTelClientAttributes, OTelMetrics } from '../opentelemetry';
24+
import { OTelClientAttributes } from '../opentelemetry';
2525

2626
export interface RedisClientOptions<
2727
M extends RedisModules = RedisModules,
@@ -652,7 +652,7 @@ export default class RedisClient<
652652
.addCommand(cmd, {
653653
chainId,
654654
asap
655-
})
655+
}, this._getClientOTelAttributes())
656656
.catch(errorHandler)
657657
);
658658
}
@@ -1076,17 +1076,12 @@ export default class RedisClient<
10761076
args: ReadonlyArray<RedisArgument>,
10771077
options?: CommandOptions
10781078
): Promise<T> {
1079-
const clientAttributes = this._self._getClientOTelAttributes();
1080-
const recordOperation = OTelMetrics.instance.createRecordOperationDuration(args, clientAttributes);
1081-
10821079
if (!this._self.#socket.isOpen) {
1083-
recordOperation(new ClientClosedError());
10841080
return Promise.reject(new ClientClosedError());
10851081
} else if (
10861082
!this._self.#socket.isReady &&
10871083
this._self.#options.disableOfflineQueue
10881084
) {
1089-
recordOperation(new ClientOfflineError());
10901085
return Promise.reject(new ClientOfflineError());
10911086
}
10921087

@@ -1096,29 +1091,11 @@ export default class RedisClient<
10961091
...options,
10971092
};
10981093

1099-
const promise = this._self.#queue.addCommand<T>(args, opts);
1100-
1101-
if (OTelMetrics.isInitialized()) {
1102-
OTelMetrics.instance.recordPendingRequests(1, clientAttributes);
1094+
const promise = this._self.#queue.addCommand<T>(args, opts, this._self._getClientOTelAttributes());
11031095

1104-
const trackedPromise = promise
1105-
.then((reply) => {
1106-
recordOperation();
1107-
OTelMetrics.instance.recordPendingRequests(-1, clientAttributes);
1108-
return reply;
1109-
})
1110-
.catch((err) => {
1111-
recordOperation(err);
1112-
OTelMetrics.instance.recordPendingRequests(-1, clientAttributes);
1113-
throw err;
1114-
});
11151096

1116-
this._self.#scheduleWrite();
1117-
return trackedPromise;
1118-
} else {
1119-
this._self.#scheduleWrite();
1120-
return promise;
1121-
}
1097+
this._self.#scheduleWrite();
1098+
return promise;
11221099
}
11231100

11241101
async SELECT(db: number): Promise<void> {
@@ -1354,20 +1331,20 @@ export default class RedisClient<
13541331
const typeMapping = this._commandOptions?.typeMapping;
13551332
const chainId = Symbol('MULTI Chain');
13561333
const promises = [
1357-
this._self.#queue.addCommand(['MULTI'], { chainId }),
1334+
this._self.#queue.addCommand(['MULTI'], { chainId }, this._self._getClientOTelAttributes()),
13581335
];
13591336

13601337
for (const { args } of commands) {
13611338
promises.push(
13621339
this._self.#queue.addCommand(args, {
13631340
chainId,
13641341
typeMapping
1365-
})
1342+
}, this._self._getClientOTelAttributes())
13661343
);
13671344
}
13681345

13691346
promises.push(
1370-
this._self.#queue.addCommand(['EXEC'], { chainId })
1347+
this._self.#queue.addCommand(['EXEC'], { chainId }, this._self._getClientOTelAttributes())
13711348
);
13721349

13731350
this._self.#scheduleWrite();
@@ -1543,7 +1520,7 @@ export default class RedisClient<
15431520
this._self.#credentialsSubscription = null;
15441521
return this._self.#socket.quit(async () => {
15451522
clearTimeout(this._self.#pingTimer);
1546-
const quitPromise = this._self.#queue.addCommand<string>(['QUIT']);
1523+
const quitPromise = this._self.#queue.addCommand<string>(['QUIT'], undefined, this._self._getClientOTelAttributes());
15471524
this._self.#scheduleWrite();
15481525
return quitPromise;
15491526
});

packages/client/lib/opentelemetry/metrics.spec.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
import { spy } from "sinon";
1313

1414
import { OTelMetrics } from "./metrics";
15-
import { METRIC_NAMES, ObservabilityConfig } from "./types";
15+
import { METRIC_NAMES, ObservabilityConfig, OTEL_ATTRIBUTES } from "./types";
1616
import { NOOP_UP_DOWN_COUNTER_METRIC } from "./noop-meter";
1717
import { noopFunction, waitForMetrics } from "./utils";
1818
import testUtils, { GLOBAL } from "../test-utils";
@@ -333,7 +333,13 @@ describe("OTel Metrics E2E", function () {
333333

334334
const dataPoints = metric.dataPoints as DataPoint<Histogram>[];
335335

336-
assert.strictEqual(dataPoints[0].value.count, 3);
336+
const setDataPoint = dataPoints.find(
337+
(dp) => dp.attributes[OTEL_ATTRIBUTES.dbOperationName] === "SET"
338+
);
339+
340+
assert.ok(setDataPoint, "expected SET data point to be present");
341+
342+
assert.strictEqual(setDataPoint.value.count, 3);
337343
},
338344
{
339345
client: GLOBAL.SERVERS.OPEN,

0 commit comments

Comments
 (0)