Skip to content

Commit f8841c8

Browse files
authored
fix(socket): prevent false-ready state when socket errors during handshake (#3128)
* fix(socket): prevent false-ready state when socket errors during handshake Fixes race condition where async socket errors during connection handshake don't trigger reconnection. Validates socket state after initiator completes to catch errors swallowed by command handlers. fixes: #3108 * remove comments
1 parent b1c39fe commit f8841c8

File tree

2 files changed

+98
-1
lines changed

2 files changed

+98
-1
lines changed

packages/client/lib/client/index.spec.ts

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@ import RedisClient, { RedisClientOptions, RedisClientType } from '.';
44
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, TimeoutError, WatchError } from '../errors';
55
import { defineScript } from '../lua-script';
66
import { spy, stub } from 'sinon';
7-
import { once } from 'node:events';
7+
import EventEmitter, { once } from 'node:events';
88
import { MATH_FUNCTION, loadMathFunction } from '../commands/FUNCTION_LOAD.spec';
99
import { RESP_TYPES } from '../RESP/decoder';
1010
import { BlobStringReply, NumberReply } from '../RESP/types';
1111
import { SortedSetMember } from '../commands/generic-transformers';
1212
import { CommandParser } from './parser';
13+
import { RedisSocketOptions } from './socket';
14+
import { getFreePortNumber } from '@redis/test-utils/lib/proxy/redis-proxy';
15+
import { createClient } from '../../';
16+
import net from 'node:net'
1317

1418
export const SQUARE_SCRIPT = defineScript({
1519
SCRIPT:
@@ -1008,6 +1012,89 @@ describe('Client', () => {
10081012
}
10091013
}, GLOBAL.SERVERS.OPEN);
10101014
});
1015+
1016+
describe("socket errors during handshake", () => {
1017+
1018+
it("should successfully connect when server accepts connection immediately", async () => {
1019+
const { log, client, teardown } = await setup({}, 0);
1020+
await client.connect();
1021+
assert.deepEqual(["connect", "ready"], log);
1022+
teardown();
1023+
});
1024+
1025+
it("should reconnect after multiple connection drops during handshake", async () => {
1026+
const { log, client, teardown } = await setup({}, 2);
1027+
await client.connect();
1028+
assert.deepEqual(
1029+
[
1030+
"connect",
1031+
"error",
1032+
"reconnecting",
1033+
"connect",
1034+
"error",
1035+
"reconnecting",
1036+
"connect",
1037+
"ready",
1038+
],
1039+
log,
1040+
);
1041+
teardown();
1042+
});
1043+
1044+
async function setup(
1045+
socketOptions: Partial<RedisSocketOptions>,
1046+
dropCount: number,
1047+
) {
1048+
const port = await getFreePortNumber();
1049+
const server = setupMockServer(dropCount);
1050+
const options = {
1051+
...{
1052+
socket: {
1053+
host: "localhost",
1054+
port,
1055+
},
1056+
...socketOptions,
1057+
},
1058+
};
1059+
const client = createClient(options);
1060+
const log = setupLog(client);
1061+
await once(server.listen(port), "listening");
1062+
return {
1063+
log,
1064+
client,
1065+
server,
1066+
teardown: async function () {
1067+
client.destroy();
1068+
server.close();
1069+
},
1070+
};
1071+
}
1072+
1073+
function setupLog(client: EventEmitter): string[] {
1074+
const log: string[] = [];
1075+
client.on("connect", () => log.push("connect"));
1076+
client.on("ready", () => log.push("ready"));
1077+
client.on("reconnecting", () => log.push("reconnecting"));
1078+
client.on("error", () => log.push("error"));
1079+
return log;
1080+
}
1081+
1082+
// Create a TCP server that accepts connections but immediately drops them <dropImmediately> times
1083+
// This simulates what happens when Docker container is stopped:
1084+
// - TCP connection succeeds (OS accepts it)
1085+
// - But socket is immediately destroyed, causing ECONNRESET during handshake
1086+
function setupMockServer(dropImmediately: number) {
1087+
const server = net.createServer(async (socket) => {
1088+
if (dropImmediately > 0) {
1089+
dropImmediately--;
1090+
socket.destroy();
1091+
}
1092+
socket.write("+OK\r\n+OK\r\n");
1093+
});
1094+
return server;
1095+
}
1096+
1097+
});
10111098
});
10121099

10131100
/**

packages/client/lib/client/socket.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,15 @@ export default class RedisSocket extends EventEmitter {
220220

221221
try {
222222
await this.#initiator();
223+
224+
// Check if socket was closed/destroyed during initiator execution
225+
if (!this.#socket || this.#socket.destroyed || !this.#socket.readable || !this.#socket.writable) {
226+
const retryIn = this.#shouldReconnect(retries++, new SocketClosedUnexpectedlyError());
227+
if (typeof retryIn !== 'number') { throw retryIn; }
228+
await setTimeout(retryIn);
229+
this.emit('reconnecting');
230+
continue;
231+
}
223232
} catch (err) {
224233
this.#socket.destroy();
225234
this.#socket = undefined;
@@ -312,6 +321,7 @@ export default class RedisSocket extends EventEmitter {
312321
});
313322
}
314323

324+
315325
write(iterable: Iterable<ReadonlyArray<RedisArgument>>) {
316326
if (!this.#socket) return;
317327

0 commit comments

Comments
 (0)