Skip to content

Commit 842242b

Browse files
committed
implement timeout relaxation
1 parent 78491fe commit 842242b

File tree

6 files changed

+143
-28
lines changed

6 files changed

+143
-28
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ import encodeCommand from '../RESP/encoder';
33
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
44
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
55
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
6-
import { AbortError, ErrorReply, TimeoutError } from '../errors';
6+
import { AbortError, ErrorReply, TimeoutDuringMaintanance, TimeoutError } from '../errors';
77
import { MonitorCallback } from '.';
88
import EventEmitter from 'events';
9+
import assert from 'assert';
910

1011
export interface CommandOptions<T = TypeMapping> {
1112
chainId?: symbol;
@@ -31,6 +32,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
3132
timeout: {
3233
signal: AbortSignal;
3334
listener: () => unknown;
35+
originalTimeout: number | undefined;
3436
} | undefined;
3537
}
3638

@@ -62,6 +64,50 @@ export default class RedisCommandsQueue {
6264
readonly #pubSub = new PubSub();
6365
readonly events = new EventEmitter();
6466

67+
// If this value is set, we are in a maintenance mode.
68+
// This means any existing commands should have their timeout
69+
// overwritten to the new timeout. And all new commands should
70+
// have their timeout set as the new timeout.
71+
#maintenanceCommandTimeout: number | undefined
72+
73+
setMaintenanceCommandTimeout(ms: number | undefined) {
74+
// Prevent possible api misuse
75+
if (this.#maintenanceCommandTimeout === ms) return;
76+
77+
this.#maintenanceCommandTimeout = ms;
78+
79+
// Overwrite timeouts of all eligible toWrite commands
80+
this.#toWrite.forEachNode(node => {
81+
const command = node.value;
82+
83+
// If the command didnt have a timeout, skip it
84+
if (!command.timeout) return;
85+
86+
// Remove existing timeout listener
87+
RedisCommandsQueue.#removeTimeoutListener(command)
88+
89+
//TODO see if this is needed
90+
// // Keep a flag to know if we were in maintenance at this point in time.
91+
// // To be used in the timeout listener, which needs to know which exact error to use.
92+
// const wasMaintenance = !!this.#maintenanceCommandTimeout
93+
94+
// Determine newTimeout
95+
const newTimeout = this.#maintenanceCommandTimeout ?? command.timeout?.originalTimeout;
96+
assert(newTimeout !== undefined, 'Trying to reset timeout to `undefined`')
97+
98+
const signal = AbortSignal.timeout(newTimeout);
99+
command.timeout = {
100+
signal,
101+
listener: () => {
102+
this.#toWrite.remove(node);
103+
command.reject(this.#maintenanceCommandTimeout ? new TimeoutDuringMaintanance(newTimeout) : new TimeoutError());
104+
},
105+
originalTimeout: command.timeout.originalTimeout
106+
};
107+
signal.addEventListener('abort', command.timeout.listener, { once: true });
108+
});
109+
}
110+
65111
get isPubSubActive() {
66112
return this.#pubSub.isActive;
67113
}
@@ -139,7 +185,16 @@ export default class RedisCommandsQueue {
139185
case 'MOVING': {
140186
const [_, afterMs, url] = push;
141187
const [host, port] = url.toString().split(':');
142-
this.events.emit('moving', afterMs, host, Number(port))
188+
this.events.emit('moving', afterMs, host, Number(port));
189+
break;
190+
}
191+
case 'MIGRATING': {
192+
console.log('GOT MIGRATING', push.map(p => p.toString()));
193+
this.events.emit('migrating');
194+
break;
195+
}
196+
case 'MIGRATED': {
197+
this.events.emit('migrated');
143198
break;
144199
}
145200
}
@@ -187,15 +242,25 @@ export default class RedisCommandsQueue {
187242
typeMapping: options?.typeMapping
188243
};
189244

190-
const timeout = options?.timeout;
245+
// If #commandTimeout was explicitly set, this
246+
// means we are in maintenance mode and should
247+
// use it instead of the timeout provided by the command
248+
const timeout = this.#maintenanceCommandTimeout || options?.timeout
191249
if (timeout) {
250+
251+
//TODO see if this is needed
252+
// // Keep a flag to know if we were in maintenance at this point in time.
253+
// // To be used in the timeout listener, which needs to know which exact error to use.
254+
// const wasMaintenance = !!this.#maintenanceCommandTimeout
255+
192256
const signal = AbortSignal.timeout(timeout);
193257
value.timeout = {
194258
signal,
195259
listener: () => {
196260
this.#toWrite.remove(node);
197-
value.reject(new TimeoutError());
198-
}
261+
value.reject(this.#maintenanceCommandTimeout ? new TimeoutDuringMaintanance(timeout) : new TimeoutError());
262+
},
263+
originalTimeout: options?.timeout
199264
};
200265
signal.addEventListener('abort', value.timeout.listener, { once: true });
201266
}
@@ -451,7 +516,7 @@ export default class RedisCommandsQueue {
451516
}
452517

453518
static #removeTimeoutListener(command: CommandToWrite) {
454-
command.timeout!.signal.removeEventListener('abort', command.timeout!.listener);
519+
command.timeout?.signal.removeEventListener('abort', command.timeout!.listener);
455520
}
456521

457522
static #flushToWrite(toBeSent: CommandToWrite, err: Error) {

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,16 @@ import RedisCommandsQueue from "./commands-queue";
44
import RedisSocket from "./socket";
55

66
export default class EnterpriseMaintenanceManager extends EventEmitter {
7-
commandsQueue: RedisCommandsQueue;
8-
options: RedisClientOptions;
9-
constructor(
10-
commandsQueue: RedisCommandsQueue,
11-
options: RedisClientOptions,
12-
) {
7+
#commandsQueue: RedisCommandsQueue;
8+
#options: RedisClientOptions;
9+
constructor(commandsQueue: RedisCommandsQueue, options: RedisClientOptions) {
1310
super();
14-
this.commandsQueue = commandsQueue;
15-
this.options = options;
11+
this.#commandsQueue = commandsQueue;
12+
this.#options = options;
1613

17-
this.commandsQueue.events.on("moving", this.#onMoving);
14+
this.#commandsQueue.events.on("moving", this.#onMoving);
15+
this.#commandsQueue.events.on("migrating", this.#onMigrating);
16+
this.#commandsQueue.events.on("migrated", this.#onMigrated);
1817
}
1918

2019
// Queue:
@@ -36,21 +35,44 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
3635
): Promise<void> => {
3736
// 1 [EVENT] MOVING PN received
3837
// 2 [ACTION] Pause writing
39-
this.emit('pause')
38+
this.emit("pause");
4039

4140
const newSocket = new RedisSocket({
42-
...this.options.socket,
41+
...this.#options.socket,
4342
host,
4443
port,
4544
});
45+
//todo
46+
newSocket.setMaintenanceTimeout();
4647
await newSocket.connect();
4748
// 3 [EVENT] New socket connected
4849

49-
await this.commandsQueue.waitForInflightCommandsToComplete();
50+
await this.#commandsQueue.waitForInflightCommandsToComplete();
5051
// 4 [EVENT] In-flight commands completed
5152

5253
// 5 + 6
53-
this.emit('resume', newSocket);
54+
this.emit("resume", newSocket);
55+
};
5456

57+
#onMigrating = async () => {
58+
this.#commandsQueue.setMaintenanceCommandTimeout(this.#getCommandTimeout());
59+
this.emit("maintenance", this.#getSocketTimeout());
5560
};
61+
62+
#onMigrated = async () => {
63+
this.#commandsQueue.setMaintenanceCommandTimeout(undefined);
64+
this.emit("maintenance", undefined);
65+
}
66+
67+
#getSocketTimeout(): number | undefined {
68+
return this.#options.gracefulMaintenance?.handleTimeouts === "error"
69+
? this.#options.socket?.socketTimeout
70+
: this.#options.gracefulMaintenance?.handleTimeouts;
71+
}
72+
73+
#getCommandTimeout(): number | undefined {
74+
return this.#options.gracefulMaintenance?.handleTimeouts === "error"
75+
? this.#options.commandOptions?.timeout
76+
: this.#options.gracefulMaintenance?.handleTimeouts;
77+
}
5678
}

packages/client/lib/client/index.ts

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,9 @@ export interface RedisClientOptions<
179179
*/
180180
handleFailedCommands: 'exception' | 'retry',
181181
/**
182-
* Specify whether we should throw a MaintenanceTimeout exception or provide more relaxed timeout, in order to minimize command timeouts during maintenance.
182+
* Specify whether we should throw a TimeoutDuringMaintanance exception or provide more relaxed timeout, in order to minimize command timeouts during maintenance.
183183
*/
184-
handleTimeouts: 'exception' | number,
184+
handleTimeouts: 'error' | number,
185185
}
186186
}
187187

@@ -461,10 +461,6 @@ export default class RedisClient<
461461
return this._self.#dirtyWatch !== undefined
462462
}
463463

464-
#pauseForMaintenance() {
465-
this._self.#pausedForMaintenance = true;
466-
}
467-
468464
#resumeFromMaintenance(newSocket: RedisSocket) {
469465
this._self.#socket.removeAllListeners();
470466
this._self.#socket.destroy();
@@ -492,8 +488,9 @@ export default class RedisClient<
492488

493489
if(options?.gracefulMaintenance) {
494490
new EnterpriseMaintenanceManager(this.#queue, this.#options!)
495-
.on('pause', this.#pauseForMaintenance.bind(this))
491+
.on('pause', () => this._self.#pausedForMaintenance = true )
496492
.on('resume', this.#resumeFromMaintenance.bind(this))
493+
.on('maintenance', (mtm: number | undefined) => this._self.#socket.setMaintenanceTimeout(mtm))
497494
}
498495

499496
if (options?.clientSideCache) {

packages/client/lib/client/linked-list.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,15 @@ export class DoublyLinkedList<T> {
111111
node = node.next;
112112
}
113113
}
114+
115+
forEachNode(fn: (node: DoublyLinkedNode<T>) => void) {
116+
let node = this.#head;
117+
while(node) {
118+
fn(node);
119+
node = node.next;
120+
}
121+
}
122+
114123
}
115124

116125
export interface SinglyLinkedNode<T> {

packages/client/lib/client/socket.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { EventEmitter, once } from 'node:events';
22
import net from 'node:net';
33
import tls from 'node:tls';
4-
import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError, SocketTimeoutError } from '../errors';
4+
import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError, SocketTimeoutError, TimeoutDuringMaintanance } from '../errors';
55
import { setTimeout } from 'node:timers/promises';
66
import { RedisArgument } from '../RESP/types';
77

@@ -57,6 +57,8 @@ export default class RedisSocket extends EventEmitter {
5757
readonly #socketFactory;
5858
readonly #socketTimeout;
5959

60+
#maintenanceTimeout: number | undefined;
61+
6062
#socket?: net.Socket | tls.TLSSocket;
6163

6264
#isOpen = false;
@@ -234,6 +236,16 @@ export default class RedisSocket extends EventEmitter {
234236
} while (this.#isOpen && !this.#isReady);
235237
}
236238

239+
setMaintenanceTimeout(ms?: number) {
240+
this.#maintenanceTimeout = ms;
241+
242+
if(ms !== undefined) {
243+
this.#socket?.setTimeout(ms);
244+
} else if (this.#socketTimeout !== undefined) {
245+
this.#socket?.setTimeout(this.#socketTimeout);
246+
}
247+
}
248+
237249
async #createSocket(): Promise<net.Socket | tls.TLSSocket> {
238250
const socket = this.#socketFactory.create();
239251

@@ -256,7 +268,10 @@ export default class RedisSocket extends EventEmitter {
256268

257269
if (this.#socketTimeout) {
258270
socket.once('timeout', () => {
259-
socket.destroy(new SocketTimeoutError(this.#socketTimeout!));
271+
const error = this.#maintenanceTimeout
272+
? new TimeoutDuringMaintanance(this.#socketTimeout!)
273+
: new SocketTimeoutError(this.#socketTimeout!)
274+
socket.destroy(error);
260275
});
261276
socket.setTimeout(this.#socketTimeout);
262277
}

packages/client/lib/errors.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ export class BlobError extends ErrorReply {}
7676

7777
export class TimeoutError extends Error {}
7878

79+
export class TimeoutDuringMaintanance extends Error {
80+
constructor(timeout: number) {
81+
super(`Socket timeout during maintenance. Expecting data, but didn't receive any in ${timeout}ms.`);
82+
}
83+
}
84+
85+
7986
export class MultiErrorReply extends ErrorReply {
8087
replies: Array<ErrorReply>;
8188
errorIndexes: Array<number>;

0 commit comments

Comments
 (0)