Skip to content

Commit 35acdf8

Browse files
authored
feat: don't dial peers that failed before, make dialer use dial queue (#2478)
* make dialer use dial queue * skip undiable peers * cover new cases in tests * expose dialer config at connection manager * update tests with new config * add more tests
1 parent 27292ed commit 35acdf8

File tree

6 files changed

+243
-28
lines changed

6 files changed

+243
-28
lines changed

.cspell.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
"ineed",
6262
"IPAM",
6363
"ipfs",
64+
"cooldown",
6465
"iwant",
6566
"jdev",
6667
"jswaku",

packages/core/src/lib/connection_manager/connection_limiter.spec.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ describe("ConnectionLimiter", () => {
4848
const defaultOptions = {
4949
maxBootstrapPeers: 2,
5050
pingKeepAlive: 300,
51-
relayKeepAlive: 300
51+
relayKeepAlive: 300,
52+
maxDialingPeers: 3,
53+
failedDialCooldown: 60,
54+
dialCooldown: 10
5255
};
5356

5457
beforeEach(() => {
@@ -761,7 +764,10 @@ describe("ConnectionLimiter", () => {
761764
const customOptions = {
762765
maxBootstrapPeers: 1,
763766
pingKeepAlive: 300,
764-
relayKeepAlive: 300
767+
relayKeepAlive: 300,
768+
maxDialingPeers: 3,
769+
failedDialCooldown: 60,
770+
dialCooldown: 10
765771
};
766772

767773
connectionLimiter = new ConnectionLimiter({
@@ -834,7 +840,10 @@ describe("ConnectionLimiter", () => {
834840
const customOptions = {
835841
maxBootstrapPeers: 1,
836842
pingKeepAlive: 300,
837-
relayKeepAlive: 300
843+
relayKeepAlive: 300,
844+
maxDialingPeers: 3,
845+
failedDialCooldown: 60,
846+
dialCooldown: 10
838847
};
839848

840849
connectionLimiter = new ConnectionLimiter({
@@ -882,7 +891,10 @@ describe("ConnectionLimiter", () => {
882891
const customOptions = {
883892
maxBootstrapPeers: 1,
884893
pingKeepAlive: 300,
885-
relayKeepAlive: 300
894+
relayKeepAlive: 300,
895+
maxDialingPeers: 3,
896+
failedDialCooldown: 60,
897+
dialCooldown: 10
886898
};
887899

888900
connectionLimiter = new ConnectionLimiter({
@@ -928,7 +940,10 @@ describe("ConnectionLimiter", () => {
928940
const customOptions = {
929941
maxBootstrapPeers: 10,
930942
pingKeepAlive: 300,
931-
relayKeepAlive: 300
943+
relayKeepAlive: 300,
944+
maxDialingPeers: 3,
945+
failedDialCooldown: 60,
946+
dialCooldown: 10
932947
};
933948

934949
connectionLimiter = new ConnectionLimiter({

packages/core/src/lib/connection_manager/connection_manager.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ const log = new Logger("connection-manager");
2424
const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
2525
const DEFAULT_PING_KEEP_ALIVE_SEC = 5 * 60;
2626
const DEFAULT_RELAY_KEEP_ALIVE_SEC = 5 * 60;
27+
const DEFAULT_MAX_DIALING_PEERS = 3;
28+
const DEFAULT_FAILED_DIAL_COOLDOWN_SEC = 60;
29+
const DEFAULT_DIAL_COOLDOWN_SEC = 10;
2730

2831
type ConnectionManagerConstructorOptions = {
2932
libp2p: Libp2p;
@@ -55,6 +58,9 @@ export class ConnectionManager implements IConnectionManager {
5558
maxBootstrapPeers: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED,
5659
pingKeepAlive: DEFAULT_PING_KEEP_ALIVE_SEC,
5760
relayKeepAlive: DEFAULT_RELAY_KEEP_ALIVE_SEC,
61+
maxDialingPeers: DEFAULT_MAX_DIALING_PEERS,
62+
failedDialCooldown: DEFAULT_FAILED_DIAL_COOLDOWN_SEC,
63+
dialCooldown: DEFAULT_DIAL_COOLDOWN_SEC,
5864
...options.config
5965
};
6066

@@ -74,7 +80,8 @@ export class ConnectionManager implements IConnectionManager {
7480

7581
this.dialer = new Dialer({
7682
libp2p: options.libp2p,
77-
shardReader: this.shardReader
83+
shardReader: this.shardReader,
84+
options: this.options
7885
});
7986

8087
this.discoveryDialer = new DiscoveryDialer({
@@ -97,13 +104,15 @@ export class ConnectionManager implements IConnectionManager {
97104
}
98105

99106
public start(): void {
107+
this.dialer.start();
100108
this.networkMonitor.start();
101109
this.discoveryDialer.start();
102110
this.keepAliveManager.start();
103111
this.connectionLimiter.start();
104112
}
105113

106114
public stop(): void {
115+
this.dialer.stop();
107116
this.networkMonitor.stop();
108117
this.discoveryDialer.stop();
109118
this.keepAliveManager.stop();

packages/core/src/lib/connection_manager/dialer.spec.ts

Lines changed: 131 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { PeerId } from "@libp2p/interface";
2-
import { Libp2p } from "@waku/interfaces";
2+
import { ConnectionManagerOptions, Libp2p } from "@waku/interfaces";
33
import { expect } from "chai";
44
import sinon from "sinon";
55

@@ -13,6 +13,7 @@ describe("Dialer", () => {
1313
let mockPeerId: PeerId;
1414
let mockPeerId2: PeerId;
1515
let clock: sinon.SinonFakeTimers;
16+
let mockOptions: ConnectionManagerOptions;
1617

1718
const createMockPeerId = (id: string): PeerId =>
1819
({
@@ -31,10 +32,21 @@ describe("Dialer", () => {
3132
isPeerOnNetwork: sinon.stub().resolves(true)
3233
} as unknown as sinon.SinonStubbedInstance<ShardReader>;
3334

35+
mockOptions = {
36+
maxBootstrapPeers: 1,
37+
pingKeepAlive: 300,
38+
relayKeepAlive: 300,
39+
maxDialingPeers: 3,
40+
failedDialCooldown: 60,
41+
dialCooldown: 10
42+
};
43+
3444
mockPeerId = createMockPeerId("12D3KooWTest1");
3545
mockPeerId2 = createMockPeerId("12D3KooWTest2");
3646

37-
clock = sinon.useFakeTimers();
47+
clock = sinon.useFakeTimers({
48+
now: 1000000000000
49+
});
3850
});
3951

4052
afterEach(() => {
@@ -49,7 +61,8 @@ describe("Dialer", () => {
4961
it("should create dialer with libp2p and shardReader", () => {
5062
dialer = new Dialer({
5163
libp2p,
52-
shardReader: mockShardReader
64+
shardReader: mockShardReader,
65+
options: mockOptions
5366
});
5467

5568
expect(dialer).to.be.instanceOf(Dialer);
@@ -60,7 +73,8 @@ describe("Dialer", () => {
6073
beforeEach(() => {
6174
dialer = new Dialer({
6275
libp2p,
63-
shardReader: mockShardReader
76+
shardReader: mockShardReader,
77+
options: mockOptions
6478
});
6579
});
6680

@@ -98,7 +112,8 @@ describe("Dialer", () => {
98112
beforeEach(() => {
99113
dialer = new Dialer({
100114
libp2p,
101-
shardReader: mockShardReader
115+
shardReader: mockShardReader,
116+
options: mockOptions
102117
});
103118
dialer.start();
104119
});
@@ -135,7 +150,8 @@ describe("Dialer", () => {
135150
beforeEach(() => {
136151
dialer = new Dialer({
137152
libp2p,
138-
shardReader: mockShardReader
153+
shardReader: mockShardReader,
154+
options: mockOptions
139155
});
140156
dialer.start();
141157
});
@@ -152,14 +168,28 @@ describe("Dialer", () => {
152168

153169
it("should add peer to queue when queue is not empty", async () => {
154170
const dialStub = libp2p.dial as sinon.SinonStub;
155-
dialStub.resolves();
156171

157-
void dialer.dial(mockPeerId);
172+
let resolveFirstDial: () => void;
173+
const firstDialPromise = new Promise<void>((resolve) => {
174+
resolveFirstDial = resolve;
175+
});
176+
dialStub.onFirstCall().returns(firstDialPromise);
177+
dialStub.onSecondCall().resolves();
178+
179+
const firstDialCall = dialer.dial(mockPeerId);
158180

159-
dialStub.resetHistory();
160181
await dialer.dial(mockPeerId2);
161182

162-
expect(dialStub.called).to.be.true;
183+
expect(dialStub.calledOnce).to.be.true;
184+
expect(dialStub.calledWith(mockPeerId)).to.be.true;
185+
186+
resolveFirstDial!();
187+
await firstDialCall;
188+
189+
clock.tick(500);
190+
await Promise.resolve();
191+
192+
expect(dialStub.calledTwice).to.be.true;
163193
expect(dialStub.calledWith(mockPeerId2)).to.be.true;
164194
});
165195

@@ -186,7 +216,54 @@ describe("Dialer", () => {
186216
clock.tick(5000);
187217
await dialer.dial(mockPeerId);
188218

189-
expect(dialStub.called).to.be.true;
219+
expect(dialStub.called).to.be.false;
220+
});
221+
222+
it("should skip peer when failed to dial recently", async () => {
223+
const dialStub = libp2p.dial as sinon.SinonStub;
224+
dialStub.rejects(new Error("Dial failed"));
225+
226+
await dialer.dial(mockPeerId);
227+
expect(dialStub.calledOnce).to.be.true;
228+
229+
dialStub.resetHistory();
230+
dialStub.resolves();
231+
232+
clock.tick(30000);
233+
234+
await dialer.dial(mockPeerId);
235+
expect(dialStub.called).to.be.false;
236+
});
237+
238+
it("should populate queue if has active dial", async () => {
239+
const dialStub = libp2p.dial as sinon.SinonStub;
240+
const mockPeerId3 = createMockPeerId("12D3KooWTest3");
241+
242+
let resolveFirstDial: () => void;
243+
const firstDialPromise = new Promise<void>((resolve) => {
244+
resolveFirstDial = resolve;
245+
});
246+
dialStub.onFirstCall().returns(firstDialPromise);
247+
dialStub.onSecondCall().resolves();
248+
dialStub.onThirdCall().resolves();
249+
250+
const firstDialCall = dialer.dial(mockPeerId);
251+
252+
await dialer.dial(mockPeerId2);
253+
await dialer.dial(mockPeerId3);
254+
255+
expect(dialStub.calledOnce).to.be.true;
256+
expect(dialStub.calledWith(mockPeerId)).to.be.true;
257+
258+
resolveFirstDial!();
259+
await firstDialCall;
260+
261+
clock.tick(500);
262+
await Promise.resolve();
263+
264+
expect(dialStub.callCount).to.equal(3);
265+
expect(dialStub.calledWith(mockPeerId2)).to.be.true;
266+
expect(dialStub.calledWith(mockPeerId3)).to.be.true;
190267
});
191268

192269
it("should allow redial after cooldown period", async () => {
@@ -252,13 +329,49 @@ describe("Dialer", () => {
252329
expect(dialStub.calledOnce).to.be.true;
253330
expect(dialStub.calledWith(mockPeerId)).to.be.true;
254331
});
332+
333+
it("should allow redial after failed dial cooldown expires", async () => {
334+
const dialStub = libp2p.dial as sinon.SinonStub;
335+
dialStub.onFirstCall().rejects(new Error("Dial failed"));
336+
dialStub.onSecondCall().resolves();
337+
await dialer.dial(mockPeerId);
338+
expect(dialStub.calledOnce).to.be.true;
339+
clock.tick(60001);
340+
await dialer.dial(mockPeerId);
341+
expect(dialStub.calledTwice).to.be.true;
342+
});
343+
344+
it("should handle queue overflow by adding peers to queue", async () => {
345+
const dialStub = libp2p.dial as sinon.SinonStub;
346+
const peers = [];
347+
for (let i = 0; i < 100; i++) {
348+
peers.push(createMockPeerId(`12D3KooWTest${i}`));
349+
}
350+
let resolveFirstDial: () => void;
351+
const firstDialPromise = new Promise<void>((resolve) => {
352+
resolveFirstDial = resolve;
353+
});
354+
dialStub.onFirstCall().returns(firstDialPromise);
355+
dialStub.resolves();
356+
const firstDialCall = dialer.dial(peers[0]);
357+
for (let i = 1; i < 100; i++) {
358+
await dialer.dial(peers[i]);
359+
}
360+
expect(dialStub.calledOnce).to.be.true;
361+
resolveFirstDial!();
362+
await firstDialCall;
363+
clock.tick(500);
364+
await Promise.resolve();
365+
expect(dialStub.callCount).to.be.greaterThan(1);
366+
});
255367
});
256368

257369
describe("queue processing", () => {
258370
beforeEach(() => {
259371
dialer = new Dialer({
260372
libp2p,
261-
shardReader: mockShardReader
373+
shardReader: mockShardReader,
374+
options: mockOptions
262375
});
263376
dialer.start();
264377
});
@@ -334,7 +447,8 @@ describe("Dialer", () => {
334447
beforeEach(() => {
335448
dialer = new Dialer({
336449
libp2p,
337-
shardReader: mockShardReader
450+
shardReader: mockShardReader,
451+
options: mockOptions
338452
});
339453
dialer.start();
340454
});
@@ -368,7 +482,8 @@ describe("Dialer", () => {
368482
it("should handle complete dial lifecycle", async () => {
369483
dialer = new Dialer({
370484
libp2p,
371-
shardReader: mockShardReader
485+
shardReader: mockShardReader,
486+
options: mockOptions
372487
});
373488
dialer.start();
374489

@@ -386,7 +501,8 @@ describe("Dialer", () => {
386501
it("should handle multiple peers with different shard configurations", async () => {
387502
dialer = new Dialer({
388503
libp2p,
389-
shardReader: mockShardReader
504+
shardReader: mockShardReader,
505+
options: mockOptions
390506
});
391507
dialer.start();
392508

0 commit comments

Comments
 (0)