Skip to content

Commit 464e16e

Browse files
paschal533dozyio
andauthored
fix(registrar): ensure onDisconnect respects notifyOnLimitedConnection (#3376)
* fix(registrar): ensure onDisconnect respects notifyOnLimitedConnection * fix: apply filter check to _onPeerUpdate disconnect notifications and added test --------- Co-authored-by: dozyio <37986489+dozyio@users.noreply.github.com>
1 parent 28af2ff commit 464e16e

File tree

2 files changed

+190
-4
lines changed

2 files changed

+190
-4
lines changed

packages/libp2p/src/registrar.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,11 @@ export class Registrar implements RegistrarInterface {
199199

200200
await Promise.all(
201201
[...topologies.values()].map(async topology => {
202-
if (topology.filter?.has(remotePeer) === false) {
202+
// If the topology has a filter, only call onDisconnect if the peer
203+
// was previously added to the filter (which happens on onConnect).
204+
// This ensures limited connections that were never notified via
205+
// onConnect don't trigger onDisconnect.
206+
if (topology.filter != null && topology.filter.has(remotePeer) !== true) {
203207
return
204208
}
205209

@@ -237,7 +241,11 @@ export class Registrar implements RegistrarInterface {
237241

238242
await Promise.all(
239243
[...topologies.values()].map(async topology => {
240-
if (topology.filter?.has(peer.id) === false) {
244+
// If the topology has a filter, only call onDisconnect if the peer
245+
// was previously added to the filter (which happens on onConnect).
246+
// This ensures limited connections that were never notified via
247+
// onConnect don't trigger onDisconnect.
248+
if (topology.filter != null && topology.filter.has(peer.id) !== true) {
241249
return
242250
}
243251

packages/libp2p/test/registrar/registrar.spec.ts

Lines changed: 180 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ import { peerIdFromPrivateKey } from '@libp2p/peer-id'
55
import { expect } from 'aegir/chai'
66
import { TypedEventEmitter } from 'main-event'
77
import pDefer from 'p-defer'
8+
import sinon from 'sinon'
89
import { stubInterface } from 'sinon-ts'
910
import { Registrar } from '../../src/registrar.js'
10-
import type { Libp2pEvents, PeerId, PeerStore, Topology, Peer, Connection } from '@libp2p/interface'
11+
import type { Libp2pEvents, PeerId, PeerStore, Topology, TopologyFilter, Peer, Connection } from '@libp2p/interface'
1112
import type { TypedEventTarget } from 'main-event'
1213
import type { StubbedInstance } from 'sinon-ts'
1314

@@ -229,6 +230,15 @@ describe('registrar topologies', () => {
229230
// register topology for protocol
230231
await registrar.register(protocol, topology)
231232

233+
// Peer data is in the peer store
234+
peerStore.get.withArgs(remotePeerId).resolves({
235+
id: remotePeerId,
236+
addresses: [],
237+
protocols: [protocol],
238+
metadata: new Map(),
239+
tags: new Map()
240+
})
241+
232242
// remote peer connects
233243
events.safeDispatchEvent('peer:identify', {
234244
detail: {
@@ -238,19 +248,35 @@ describe('registrar topologies', () => {
238248
}
239249
})
240250

251+
// wait a bit to ensure onConnect is not called
241252
await expect(Promise.any([
242253
onConnectDefer.promise,
254+
new Promise<void>((resolve) => {
255+
setTimeout(() => {
256+
resolve()
257+
}, 100)
258+
})
259+
])).to.eventually.not.be.rejected()
260+
261+
// now simulate disconnect
262+
events.safeDispatchEvent('peer:disconnect', {
263+
detail: remotePeerId
264+
})
265+
266+
// wait to ensure onDisconnect is not called
267+
await expect(Promise.any([
243268
onDisconnectDefer.promise,
244269
new Promise<void>((resolve) => {
245270
setTimeout(() => {
246271
resolve()
247-
}, 1000)
272+
}, 100)
248273
})
249274
])).to.eventually.not.be.rejected()
250275
})
251276

252277
it('should call topology onConnect handler for limited connection when explicitly requested', async () => {
253278
const onConnectDefer = pDefer()
279+
const onDisconnectDefer = pDefer()
254280

255281
// setup connections before registrar
256282
const remotePeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
@@ -266,12 +292,24 @@ describe('registrar topologies', () => {
266292
notifyOnLimitedConnection: true,
267293
onConnect: () => {
268294
onConnectDefer.resolve()
295+
},
296+
onDisconnect: () => {
297+
onDisconnectDefer.resolve()
269298
}
270299
}
271300

272301
// register topology for protocol
273302
await registrar.register(protocol, topology)
274303

304+
// Peer data is in the peer store
305+
peerStore.get.withArgs(remotePeerId).resolves({
306+
id: remotePeerId,
307+
addresses: [],
308+
protocols: [protocol],
309+
metadata: new Map(),
310+
tags: new Map()
311+
})
312+
275313
// remote peer connects
276314
events.safeDispatchEvent('peer:identify', {
277315
detail: {
@@ -282,6 +320,146 @@ describe('registrar topologies', () => {
282320
})
283321

284322
await expect(onConnectDefer.promise).to.eventually.be.undefined()
323+
324+
// now simulate disconnect - this should also be called
325+
events.safeDispatchEvent('peer:disconnect', {
326+
detail: remotePeerId
327+
})
328+
329+
await expect(onDisconnectDefer.promise).to.eventually.be.undefined()
330+
})
331+
332+
it('should not call topology onDisconnect when peer was filtered out during connect', async () => {
333+
const onDisconnectDefer = pDefer()
334+
335+
// setup peer
336+
const remotePeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
337+
338+
// topology WITH filter - this is required to track which peers were notified
339+
const filter = stubInterface<TopologyFilter>({
340+
has: sinon.stub().returns(false),
341+
add: sinon.stub(),
342+
remove: sinon.stub()
343+
})
344+
345+
const topology: Topology = {
346+
filter,
347+
onDisconnect: () => {
348+
onDisconnectDefer.reject(new Error('Topology onDisconnect called for peer that was never onConnect\'d'))
349+
}
350+
}
351+
352+
// register topology for protocol
353+
await registrar.register(protocol, topology)
354+
355+
// Peer data is in the peer store
356+
peerStore.get.withArgs(remotePeerId).resolves({
357+
id: remotePeerId,
358+
addresses: [],
359+
protocols: [protocol],
360+
metadata: new Map(),
361+
tags: new Map()
362+
})
363+
364+
// simulate disconnect without the peer ever being in the filter
365+
// (this happens when a limited connection connects and disconnects
366+
// but the topology has notifyOnLimitedConnection: false)
367+
events.safeDispatchEvent('peer:disconnect', {
368+
detail: remotePeerId
369+
})
370+
371+
// wait to ensure onDisconnect is not called
372+
await expect(Promise.any([
373+
onDisconnectDefer.promise,
374+
new Promise<void>((resolve) => {
375+
setTimeout(() => {
376+
resolve()
377+
}, 100)
378+
})
379+
])).to.eventually.not.be.rejected()
380+
})
381+
382+
it('should not call topology onDisconnect on peer update when peer was filtered out during connect', async () => {
383+
const onDisconnectDefer = pDefer()
384+
385+
// setup peer
386+
const remotePeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
387+
388+
// connection is limited
389+
const conn = stubInterface<Connection>({
390+
remotePeer: remotePeerId,
391+
limits: {
392+
bytes: 100n
393+
}
394+
})
395+
396+
// topology WITH filter - this is required to track which peers were notified
397+
const filter = stubInterface<TopologyFilter>({
398+
has: sinon.stub().returns(false),
399+
add: sinon.stub(),
400+
remove: sinon.stub()
401+
})
402+
403+
const topology: Topology = {
404+
filter,
405+
// notifyOnLimitedConnection is NOT set (defaults to false)
406+
onDisconnect: () => {
407+
onDisconnectDefer.reject(new Error('Topology onDisconnect called for peer that was never onConnect\'d'))
408+
}
409+
}
410+
411+
// register topology for protocol
412+
await registrar.register(protocol, topology)
413+
414+
// Peer data is in the peer store with the protocol
415+
peerStore.get.withArgs(remotePeerId).resolves({
416+
id: remotePeerId,
417+
addresses: [],
418+
protocols: [protocol],
419+
metadata: new Map(),
420+
tags: new Map()
421+
})
422+
423+
// remote peer identifies with limited connection
424+
events.safeDispatchEvent('peer:identify', {
425+
detail: {
426+
peerId: remotePeerId,
427+
protocols: [protocol],
428+
connection: conn
429+
}
430+
})
431+
432+
// wait a bit to ensure onConnect is not called (because connection is limited)
433+
await new Promise(resolve => setTimeout(resolve, 100))
434+
435+
// now simulate peer update removing the protocol
436+
// (this triggers onDisconnect in _onPeerUpdate)
437+
events.safeDispatchEvent('peer:update', {
438+
detail: {
439+
peer: {
440+
id: remotePeerId,
441+
protocols: [], // protocol removed
442+
addresses: [],
443+
metadata: new Map()
444+
},
445+
previous: {
446+
id: remotePeerId,
447+
protocols: [protocol], // had protocol before
448+
addresses: [],
449+
metadata: new Map()
450+
}
451+
}
452+
})
453+
454+
// wait to ensure onDisconnect is not called
455+
await expect(Promise.any([
456+
onDisconnectDefer.promise,
457+
new Promise<void>((resolve) => {
458+
setTimeout(() => {
459+
resolve()
460+
}, 100)
461+
})
462+
])).to.eventually.not.be.rejected()
285463
})
286464

287465
it('should call topology handlers for non-limited connection opened after limited connection', async () => {

0 commit comments

Comments
 (0)