Skip to content

Commit 19a5549

Browse files
feat(swarm)!: allow NetworkBehaviours to manage connections
Previously, a `ConnectionHandler` was immediately requested from the `NetworkBehaviour` as soon as a new dial was initiated or a new incoming connection accepted. With this patch, we delay the creation of the handler until the connection is actually established and fully upgraded, i.e authenticated and multiplexed. As a consequence, `NetworkBehaviour::new_handler` is now deprecated in favor of a new set of callbacks: - `NetworkBehaviour::handle_pending_inbound_connection` - `NetworkBehaviour::handle_pending_outbound_connection` - `NetworkBehaviour::handle_established_inbound_connection` - `NetworkBehaviour::handle_established_outbound_connection` All callbacks are fallible, allowing the `NetworkBehaviour` to abort the connection either immediately or after it is fully established. All callbacks also receive a `ConnectionId` parameter which uniquely identifies the connection. For example, in case a `NetworkBehaviour` issues a dial via `NetworkBehaviourAction::Dial`, it can unambiguously detect this dial in these lifecycle callbacks via the `ConnectionId`. Finally, `NetworkBehaviour::handle_pending_outbound_connection` also replaces `NetworkBehaviour::addresses_of_peer` by allowing the behaviour to return more addresses to be used for the dial. Resolves #2824. Pull-Request: #3254.
1 parent 794b2a2 commit 19a5549

File tree

42 files changed

+1543
-540
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1543
-540
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@
4747

4848
# 0.51.0 [unreleased]
4949

50+
- Enable `NetworkBehaviour`s to manage connections.
51+
This deprecates `NetworkBehaviour::new_handler` and `NetworkBehaviour::addresses_of_peer`.
52+
Due to limitations in the Rust compiler, these deprecations may not show up for you, nevertheless they will be removed in a future release.
53+
See [`libp2p-swarm`'s CHANGELOG](swarm/CHANGELOG.md#0420) for details.
54+
5055
- Count bandwidth at the application level. Previously `BandwidthLogging` would implement `Transport` and now implements `StreamMuxer` ([PR 3180](https://github.com/libp2p/rust-libp2p/pull/3180)).
5156
- `BandwidthLogging::new` now requires a 2nd argument: `Arc<BandwidthSinks>`
5257
- Remove `BandwidthFuture`

misc/metrics/src/swarm.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,9 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
244244
libp2p_swarm::DialError::WrongPeerId { .. } => {
245245
record(OutgoingConnectionError::WrongPeerId)
246246
}
247+
libp2p_swarm::DialError::Denied { .. } => {
248+
record(OutgoingConnectionError::Denied)
249+
}
247250
};
248251
}
249252
libp2p_swarm::SwarmEvent::BannedPeer { endpoint, .. } => {
@@ -344,6 +347,7 @@ enum OutgoingConnectionError {
344347
WrongPeerId,
345348
TransportMultiaddrNotSupported,
346349
TransportOther,
350+
Denied,
347351
}
348352

349353
#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
@@ -360,6 +364,7 @@ enum IncomingConnectionError {
360364
TransportErrorOther,
361365
Aborted,
362366
ConnectionLimit,
367+
Denied,
363368
}
364369

365370
impl From<&libp2p_swarm::ListenError> for IncomingConnectionError {
@@ -377,6 +382,7 @@ impl From<&libp2p_swarm::ListenError> for IncomingConnectionError {
377382
libp2p_core::transport::TransportError::Other(_),
378383
) => IncomingConnectionError::TransportErrorOther,
379384
libp2p_swarm::ListenError::Aborted => IncomingConnectionError::Aborted,
385+
libp2p_swarm::ListenError::Denied { .. } => IncomingConnectionError::Denied,
380386
}
381387
}
382388
}

protocols/autonat/src/behaviour.rs

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ use libp2p_swarm::{
3737
AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredExternalAddr,
3838
ExpiredListenAddr, FromSwarm,
3939
},
40-
ConnectionId, ExternalAddresses, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction,
41-
PollParameters, THandlerInEvent, THandlerOutEvent,
40+
ConnectionDenied, ConnectionId, ExternalAddresses, ListenAddresses, NetworkBehaviour,
41+
NetworkBehaviourAction, PollParameters, THandler, THandlerInEvent, THandlerOutEvent,
4242
};
4343
use std::{
4444
collections::{HashMap, VecDeque},
@@ -485,12 +485,55 @@ impl NetworkBehaviour for Behaviour {
485485
}
486486
}
487487

488-
fn new_handler(&mut self) -> Self::ConnectionHandler {
489-
self.inner.new_handler()
488+
fn handle_pending_inbound_connection(
489+
&mut self,
490+
connection_id: ConnectionId,
491+
local_addr: &Multiaddr,
492+
remote_addr: &Multiaddr,
493+
) -> Result<(), ConnectionDenied> {
494+
self.inner
495+
.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
496+
}
497+
498+
fn handle_established_inbound_connection(
499+
&mut self,
500+
_connection_id: ConnectionId,
501+
peer: PeerId,
502+
local_addr: &Multiaddr,
503+
remote_addr: &Multiaddr,
504+
) -> Result<THandler<Self>, ConnectionDenied> {
505+
self.inner.handle_established_inbound_connection(
506+
_connection_id,
507+
peer,
508+
local_addr,
509+
remote_addr,
510+
)
490511
}
491512

492-
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
493-
self.inner.addresses_of_peer(peer)
513+
fn handle_pending_outbound_connection(
514+
&mut self,
515+
_connection_id: ConnectionId,
516+
maybe_peer: Option<PeerId>,
517+
_addresses: &[Multiaddr],
518+
_effective_role: Endpoint,
519+
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
520+
self.inner.handle_pending_outbound_connection(
521+
_connection_id,
522+
maybe_peer,
523+
_addresses,
524+
_effective_role,
525+
)
526+
}
527+
528+
fn handle_established_outbound_connection(
529+
&mut self,
530+
_connection_id: ConnectionId,
531+
peer: PeerId,
532+
addr: &Multiaddr,
533+
role_override: Endpoint,
534+
) -> Result<THandler<Self>, ConnectionDenied> {
535+
self.inner
536+
.handle_established_outbound_connection(_connection_id, peer, addr, role_override)
494537
}
495538

496539
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {

protocols/dcutr/src/behaviour_impl.rs

Lines changed: 86 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@ use crate::handler;
2424
use either::Either;
2525
use libp2p_core::connection::ConnectedPoint;
2626
use libp2p_core::multiaddr::Protocol;
27-
use libp2p_core::{Multiaddr, PeerId};
27+
use libp2p_core::{Endpoint, Multiaddr, PeerId};
2828
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
2929
use libp2p_swarm::dial_opts::{self, DialOpts};
30+
use libp2p_swarm::{dummy, ConnectionDenied, ConnectionId, THandler, THandlerOutEvent};
3031
use libp2p_swarm::{
3132
ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NetworkBehaviourAction,
3233
NotifyHandler, PollParameters, THandlerInEvent,
3334
};
34-
use libp2p_swarm::{ConnectionId, THandlerOutEvent};
3535
use std::collections::{HashMap, HashSet, VecDeque};
3636
use std::task::{Context, Poll};
3737
use thiserror::Error;
@@ -64,12 +64,14 @@ pub enum Error {
6464
#[error("Failed to dial peer.")]
6565
Dial,
6666
#[error("Failed to establish substream: {0}.")]
67-
Handler(ConnectionHandlerUpgrErr<void::Void>),
67+
Handler(ConnectionHandlerUpgrErr<Void>),
6868
}
6969

7070
pub struct Behaviour {
7171
/// Queue of actions to return when polled.
72-
queued_events: VecDeque<NetworkBehaviourAction<Event, Either<handler::relayed::Command, Void>>>,
72+
queued_events: VecDeque<
73+
NetworkBehaviourAction<Event, Either<handler::relayed::Command, Either<Void, Void>>>,
74+
>,
7375

7476
/// All direct (non-relayed) connections.
7577
direct_connections: HashMap<PeerId, HashSet<ConnectionId>>,
@@ -237,11 +239,82 @@ impl Behaviour {
237239
}
238240

239241
impl NetworkBehaviour for Behaviour {
240-
type ConnectionHandler = handler::Prototype;
242+
type ConnectionHandler = Either<
243+
handler::relayed::Handler,
244+
Either<handler::direct::Handler, dummy::ConnectionHandler>,
245+
>;
241246
type OutEvent = Event;
242247

243-
fn new_handler(&mut self) -> Self::ConnectionHandler {
244-
handler::Prototype
248+
fn handle_established_inbound_connection(
249+
&mut self,
250+
connection_id: ConnectionId,
251+
peer: PeerId,
252+
local_addr: &Multiaddr,
253+
remote_addr: &Multiaddr,
254+
) -> Result<THandler<Self>, ConnectionDenied> {
255+
match self
256+
.outgoing_direct_connection_attempts
257+
.remove(&(connection_id, peer))
258+
{
259+
None => {
260+
let handler = if is_relayed(local_addr) {
261+
Either::Left(handler::relayed::Handler::new(ConnectedPoint::Listener {
262+
local_addr: local_addr.clone(),
263+
send_back_addr: remote_addr.clone(),
264+
})) // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound.
265+
} else {
266+
Either::Right(Either::Right(dummy::ConnectionHandler))
267+
};
268+
269+
Ok(handler)
270+
}
271+
Some(_) => {
272+
assert!(
273+
!is_relayed(local_addr),
274+
"`Prototype::DirectConnection` is never created for relayed connection."
275+
);
276+
277+
Ok(Either::Right(Either::Left(
278+
handler::direct::Handler::default(),
279+
)))
280+
}
281+
}
282+
}
283+
284+
fn handle_established_outbound_connection(
285+
&mut self,
286+
connection_id: ConnectionId,
287+
peer: PeerId,
288+
addr: &Multiaddr,
289+
role_override: Endpoint,
290+
) -> Result<THandler<Self>, ConnectionDenied> {
291+
match self
292+
.outgoing_direct_connection_attempts
293+
.remove(&(connection_id, peer))
294+
{
295+
None => {
296+
let handler = if is_relayed(addr) {
297+
Either::Left(handler::relayed::Handler::new(ConnectedPoint::Dialer {
298+
address: addr.clone(),
299+
role_override,
300+
})) // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound.
301+
} else {
302+
Either::Right(Either::Right(dummy::ConnectionHandler))
303+
};
304+
305+
Ok(handler)
306+
}
307+
Some(_) => {
308+
assert!(
309+
!is_relayed(addr),
310+
"`Prototype::DirectConnection` is never created for relayed connection."
311+
);
312+
313+
Ok(Either::Right(Either::Left(
314+
handler::direct::Handler::default(),
315+
)))
316+
}
317+
}
245318
}
246319

247320
fn on_connection_handler_event(
@@ -332,7 +405,7 @@ impl NetworkBehaviour for Behaviour {
332405
self.queued_events
333406
.push_back(NetworkBehaviourAction::Dial { opts });
334407
}
335-
Either::Right(handler::direct::Event::DirectConnectionEstablished) => {
408+
Either::Right(Either::Left(handler::direct::Event::DirectConnectionEstablished)) => {
336409
self.queued_events.extend([
337410
NetworkBehaviourAction::NotifyHandler {
338411
peer_id: event_source,
@@ -348,6 +421,7 @@ impl NetworkBehaviour for Behaviour {
348421
),
349422
]);
350423
}
424+
Either::Right(Either::Right(never)) => void::unreachable(never),
351425
};
352426
}
353427

@@ -386,3 +460,7 @@ impl NetworkBehaviour for Behaviour {
386460
}
387461
}
388462
}
463+
464+
fn is_relayed(addr: &Multiaddr) -> bool {
465+
addr.iter().any(|p| p == Protocol::P2pCircuit)
466+
}

protocols/dcutr/src/handler.rs

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,5 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21-
use crate::protocol;
22-
use either::Either;
23-
use libp2p_core::{ConnectedPoint, PeerId};
24-
use libp2p_swarm::handler::SendWrapper;
25-
use libp2p_swarm::{ConnectionHandler, IntoConnectionHandler};
26-
2721
pub mod direct;
2822
pub mod relayed;
29-
30-
pub struct Prototype;
31-
32-
impl IntoConnectionHandler for Prototype {
33-
type Handler = Either<relayed::Handler, direct::Handler>;
34-
35-
fn into_handler(self, _remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
36-
if endpoint.is_relayed() {
37-
Either::Left(relayed::Handler::new(endpoint.clone()))
38-
} else {
39-
Either::Right(direct::Handler::default()) // This is a direct connection. What we don't know is whether it is the one we created or another one that happened accidentally.
40-
}
41-
}
42-
43-
fn inbound_protocol(&self) -> <Self::Handler as ConnectionHandler>::InboundProtocol {
44-
Either::Left(SendWrapper(Either::Left(protocol::inbound::Upgrade {})))
45-
}
46-
}

protocols/floodsub/src/layer.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ use crate::topic::Topic;
2626
use crate::FloodsubConfig;
2727
use cuckoofilter::{CuckooError, CuckooFilter};
2828
use fnv::FnvHashSet;
29-
use libp2p_core::PeerId;
29+
use libp2p_core::{Endpoint, Multiaddr, PeerId};
3030
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm};
3131
use libp2p_swarm::{
32-
dial_opts::DialOpts, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
33-
OneShotHandler, PollParameters, THandlerInEvent, THandlerOutEvent,
32+
dial_opts::DialOpts, ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction,
33+
NotifyHandler, OneShotHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent,
3434
};
3535
use log::warn;
3636
use smallvec::SmallVec;
@@ -334,8 +334,24 @@ impl NetworkBehaviour for Floodsub {
334334
type ConnectionHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
335335
type OutEvent = FloodsubEvent;
336336

337-
fn new_handler(&mut self) -> Self::ConnectionHandler {
338-
Default::default()
337+
fn handle_established_inbound_connection(
338+
&mut self,
339+
_: ConnectionId,
340+
_: PeerId,
341+
_: &Multiaddr,
342+
_: &Multiaddr,
343+
) -> Result<THandler<Self>, ConnectionDenied> {
344+
Ok(Default::default())
345+
}
346+
347+
fn handle_established_outbound_connection(
348+
&mut self,
349+
_: ConnectionId,
350+
_: PeerId,
351+
_: &Multiaddr,
352+
_: Endpoint,
353+
) -> Result<THandler<Self>, ConnectionDenied> {
354+
Ok(Default::default())
339355
}
340356

341357
fn on_connection_handler_event(

protocols/gossipsub/src/behaviour.rs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,14 @@ use prost::Message as _;
3636
use rand::{seq::SliceRandom, thread_rng};
3737

3838
use libp2p_core::{
39-
identity::Keypair, multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, Multiaddr, PeerId,
39+
identity::Keypair, multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, Endpoint, Multiaddr,
40+
PeerId,
4041
};
4142
use libp2p_swarm::{
4243
behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
4344
dial_opts::DialOpts,
44-
ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
45-
THandlerInEvent, THandlerOutEvent,
45+
ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
46+
PollParameters, THandler, THandlerInEvent, THandlerOutEvent,
4647
};
4748
use wasm_timer::Instant;
4849

@@ -3289,11 +3290,30 @@ where
32893290
type ConnectionHandler = Handler;
32903291
type OutEvent = Event;
32913292

3292-
fn new_handler(&mut self) -> Self::ConnectionHandler {
3293-
Handler::new(
3293+
fn handle_established_inbound_connection(
3294+
&mut self,
3295+
_: ConnectionId,
3296+
_: PeerId,
3297+
_: &Multiaddr,
3298+
_: &Multiaddr,
3299+
) -> Result<THandler<Self>, ConnectionDenied> {
3300+
Ok(Handler::new(
32943301
ProtocolConfig::new(&self.config),
32953302
self.config.idle_timeout(),
3296-
)
3303+
))
3304+
}
3305+
3306+
fn handle_established_outbound_connection(
3307+
&mut self,
3308+
_: ConnectionId,
3309+
_: PeerId,
3310+
_: &Multiaddr,
3311+
_: Endpoint,
3312+
) -> Result<THandler<Self>, ConnectionDenied> {
3313+
Ok(Handler::new(
3314+
ProtocolConfig::new(&self.config),
3315+
self.config.idle_timeout(),
3316+
))
32973317
}
32983318

32993319
fn on_connection_handler_event(

0 commit comments

Comments
 (0)