From c1e880ca25151664a5cb392479918ffa73b5460e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 16 Dec 2022 15:35:56 +1100 Subject: [PATCH 1/2] Refactor dialling to have a more linear flow --- swarm/src/dial_opts.rs | 100 +++++++++++++++++++++++ swarm/src/lib.rs | 179 +++++++++++++---------------------------- 2 files changed, 155 insertions(+), 124 deletions(-) diff --git a/swarm/src/dial_opts.rs b/swarm/src/dial_opts.rs index edc69484b68..acc1b69a617 100644 --- a/swarm/src/dial_opts.rs +++ b/swarm/src/dial_opts.rs @@ -20,6 +20,8 @@ // DEALINGS IN THE SOFTWARE. use libp2p_core::connection::Endpoint; +use libp2p_core::multiaddr::Protocol; +use libp2p_core::multihash::Multihash; use libp2p_core::{Multiaddr, PeerId}; use std::num::NonZeroU8; @@ -79,6 +81,104 @@ impl DialOpts { DialOpts(Opts::WithoutPeerIdWithAddress(_)) => None, } } + + /// Retrieves the [`PeerId`] from the [`DialOpts`] if specified or otherwise tries to parse it + /// from the multihash in the `/p2p` part of the address, if present. + /// + /// Note: A [`Multiaddr`] with something else other than a [`PeerId`] within the `/p2p` protocol is invalid as per specification. + /// Unfortunately, we are not making good use of the type system here. + /// Really, this function should be merged with [`DialOpts::get_peer_id`] above. + /// If it weren't for the parsing error, the function signatures would be the same. + /// + /// See . + pub(crate) fn get_or_parse_peer_id(&self) -> Result, Multihash> { + match self { + DialOpts(Opts::WithPeerId(WithPeerId { peer_id, .. })) => Ok(Some(*peer_id)), + DialOpts(Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses { + peer_id, .. + })) => Ok(Some(*peer_id)), + DialOpts(Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress { + address, .. + })) => { + let peer_id = address + .iter() + .last() + .and_then(|p| { + if let Protocol::P2p(ma) = p { + Some(PeerId::try_from(ma)) + } else { + None + } + }) + .transpose()?; + + Ok(peer_id) + } + } + } + + pub(crate) fn get_addresses(&self) -> Vec { + match self { + DialOpts(Opts::WithPeerId(WithPeerId { .. })) => vec![], + DialOpts(Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses { + addresses, .. + })) => addresses.clone(), + DialOpts(Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress { + address, .. + })) => vec![address.clone()], + } + } + + pub(crate) fn extend_addresses_through_behaviour(&self) -> bool { + match self { + DialOpts(Opts::WithPeerId(WithPeerId { .. })) => true, + DialOpts(Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses { + extend_addresses_through_behaviour, + .. + })) => *extend_addresses_through_behaviour, + DialOpts(Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress { .. })) => true, + } + } + + pub(crate) fn peer_condition(&self) -> PeerCondition { + match self { + DialOpts( + Opts::WithPeerId(WithPeerId { condition, .. }) + | Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses { condition, .. }), + ) => *condition, + DialOpts(Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress { .. })) => { + PeerCondition::Always + } + } + } + + pub(crate) fn dial_concurrency_override(&self) -> Option { + match self { + DialOpts(Opts::WithPeerId(WithPeerId { + dial_concurrency_factor_override, + .. + })) => *dial_concurrency_factor_override, + DialOpts(Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses { + dial_concurrency_factor_override, + .. + })) => *dial_concurrency_factor_override, + DialOpts(Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress { .. })) => None, + } + } + + pub(crate) fn role_override(&self) -> Endpoint { + match self { + DialOpts(Opts::WithPeerId(WithPeerId { role_override, .. })) => *role_override, + DialOpts(Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses { + role_override, + .. + })) => *role_override, + DialOpts(Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress { + role_override, + .. + })) => *role_override, + } + } } impl From for DialOpts { diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index cf6051e1e85..ed0a89dea35 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -122,7 +122,6 @@ pub use registry::{AddAddressResult, AddressRecord, AddressScore}; use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent}; use connection::IncomingInfo; use dial_opts::{DialOpts, PeerCondition}; -use either::Either; use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream}; use libp2p_core::connection::ConnectionId; use libp2p_core::muxing::SubstreamBox; @@ -138,7 +137,6 @@ use libp2p_core::{ use registry::{AddressIntoIter, Addresses}; use smallvec::SmallVec; use std::collections::{HashMap, HashSet}; -use std::iter; use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize}; use std::{ convert::TryFrom, @@ -507,139 +505,72 @@ where fn dial_with_handler( &mut self, - swarm_dial_opts: DialOpts, + dial_opts: DialOpts, handler: ::ConnectionHandler, ) -> Result<(), DialError> { - let (peer_id, addresses, dial_concurrency_factor_override, role_override) = - match swarm_dial_opts.0 { - // Dial a known peer. - dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { - peer_id, - condition, - role_override, - dial_concurrency_factor_override, - }) - | dial_opts::Opts::WithPeerIdWithAddresses(dial_opts::WithPeerIdWithAddresses { - peer_id, - condition, - role_override, - dial_concurrency_factor_override, - .. - }) => { - // Check [`PeerCondition`] if provided. - let condition_matched = match condition { - PeerCondition::Disconnected => !self.is_connected(&peer_id), - PeerCondition::NotDialing => !self.pool.is_dialing(peer_id), - PeerCondition::Always => true, - }; - if !condition_matched { - #[allow(deprecated)] - self.behaviour.inject_dial_failure( - Some(peer_id), - handler, - &DialError::DialPeerConditionFalse(condition), - ); - - return Err(DialError::DialPeerConditionFalse(condition)); - } + let peer_id = dial_opts + .get_or_parse_peer_id() + .map_err(DialError::InvalidPeerId)?; + let condition = dial_opts.peer_condition(); + + let should_dial = match (condition, peer_id) { + (PeerCondition::Always, _) => true, + (PeerCondition::Disconnected, None) => true, + (PeerCondition::NotDialing, None) => true, + (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id), + (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id), + }; - // Check if peer is banned. - if self.banned_peers.contains(&peer_id) { - let error = DialError::Banned; - #[allow(deprecated)] - self.behaviour - .inject_dial_failure(Some(peer_id), handler, &error); - return Err(error); - } + if !should_dial { + let e = DialError::DialPeerConditionFalse(condition); - // Retrieve the addresses to dial. - let addresses = { - let mut addresses = match swarm_dial_opts.0 { - dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { .. }) => { - self.behaviour.addresses_of_peer(&peer_id) - } - dial_opts::Opts::WithPeerIdWithAddresses( - dial_opts::WithPeerIdWithAddresses { - peer_id, - mut addresses, - extend_addresses_through_behaviour, - .. - }, - ) => { - if extend_addresses_through_behaviour { - addresses.extend(self.behaviour.addresses_of_peer(&peer_id)) - } - addresses - } - dial_opts::Opts::WithoutPeerIdWithAddress { .. } => { - unreachable!("Due to outer match.") - } - }; + #[allow(deprecated)] + self.behaviour.inject_dial_failure(peer_id, handler, &e); - let mut unique_addresses = HashSet::new(); - addresses.retain(|addr| { - !self.listened_addrs.values().flatten().any(|a| a == addr) - && unique_addresses.insert(addr.clone()) - }); + return Err(e); + } - if addresses.is_empty() { - let error = DialError::NoAddresses; - #[allow(deprecated)] - self.behaviour - .inject_dial_failure(Some(peer_id), handler, &error); - return Err(error); - }; + if let Some(peer_id) = peer_id { + // Check if peer is banned. + if self.banned_peers.contains(&peer_id) { + let error = DialError::Banned; + #[allow(deprecated)] + self.behaviour + .inject_dial_failure(Some(peer_id), handler, &error); + return Err(error); + } + } - addresses - }; + let addresses = { + let mut addresses = dial_opts.get_addresses(); - ( - Some(peer_id), - Either::Left(addresses.into_iter()), - dial_concurrency_factor_override, - role_override, - ) + if let Some(peer_id) = peer_id { + if dial_opts.extend_addresses_through_behaviour() { + addresses.extend(self.behaviour.addresses_of_peer(&peer_id)); } - // Dial an unknown peer. - dial_opts::Opts::WithoutPeerIdWithAddress( - dial_opts::WithoutPeerIdWithAddress { - address, - role_override, - }, - ) => { - // If the address ultimately encapsulates an expected peer ID, dial that peer - // such that any mismatch is detected. We do not "pop off" the `P2p` protocol - // from the address, because it may be used by the `Transport`, i.e. `P2p` - // is a protocol component that can influence any transport, like `libp2p-dns`. - let peer_id = match address - .iter() - .last() - .and_then(|p| { - if let Protocol::P2p(ma) = p { - Some(PeerId::try_from(ma)) - } else { - None - } - }) - .transpose() - { - Ok(peer_id) => peer_id, - Err(multihash) => return Err(DialError::InvalidPeerId(multihash)), - }; + } - ( - peer_id, - Either::Right(iter::once(address)), - None, - role_override, - ) - } + let mut unique_addresses = HashSet::new(); + addresses.retain(|addr| { + !self.listened_addrs.values().flatten().any(|a| a == addr) + && unique_addresses.insert(addr.clone()) + }); + + if addresses.is_empty() { + let error = DialError::NoAddresses; + #[allow(deprecated)] + self.behaviour.inject_dial_failure(peer_id, handler, &error); + return Err(error); }; + addresses + }; + let dials = addresses + .into_iter() .map(|a| match p2p_addr(peer_id, a) { Ok(address) => { - let dial = match role_override { + let dial = match dial_opts.role_override() { Endpoint::Dialer => self.transport.dial(address.clone()), Endpoint::Listener => self.transport.dial_as_listener(address.clone()), }; @@ -662,8 +593,8 @@ where dials, peer_id, handler, - role_override, - dial_concurrency_factor_override, + dial_opts.role_override(), + dial_opts.dial_concurrency_override(), ) { Ok(_connection_id) => Ok(()), Err((connection_limit, handler)) => { @@ -1088,9 +1019,9 @@ where return Some(SwarmEvent::Behaviour(event)) } NetworkBehaviourAction::Dial { opts, handler } => { - let peer_id = opts.get_peer_id(); + let peer_id = opts.get_or_parse_peer_id(); if let Ok(()) = self.dial_with_handler(opts, handler) { - if let Some(peer_id) = peer_id { + if let Ok(Some(peer_id)) = peer_id { return Some(SwarmEvent::Dialing(peer_id)); } } From 6a734543eae8409e22591188758052791140038d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 19 Dec 2022 17:58:06 +1100 Subject: [PATCH 2/2] Add hack to clear listen addresses --- swarm/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index ed0a89dea35..5b09bd2cc38 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -2447,6 +2447,8 @@ mod tests { _ => panic!("Was expecting the listen address to be reported"), })); + swarm.listened_addrs.clear(); // This is a hack to actually execute the dial to ourselves which would otherwise be filtered. + swarm.dial(local_address.clone()).unwrap(); let mut got_dial_err = false;