Skip to content

Commit 5782a96

Browse files
refactor(swarm)!: don't be generic over Transport (#3272)
Ever since we moved `Pool` into `libp2p-swarm`, we always use it with the same `Transport`: `Boxed`. It is thus unnecessary for us to be overly generic over what kind of `Transport` we are using. This allows us to remove a few type parameters from the implementation which overall simplifies things. This is technically a breaking change because I am removing a type parameter from two exported type aliases: - `PendingInboundConnectionError` - `PendingOutboundConnectionError` Those have always only be used with `std::io::Error` in our API but it is still a breaking change.
1 parent aca3454 commit 5782a96

File tree

7 files changed

+62
-98
lines changed

7 files changed

+62
-98
lines changed

misc/metrics/src/swarm.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -370,10 +370,8 @@ enum PendingInboundConnectionError {
370370
ConnectionLimit,
371371
}
372372

373-
impl<TTransErr> From<&libp2p_swarm::PendingInboundConnectionError<TTransErr>>
374-
for PendingInboundConnectionError
375-
{
376-
fn from(error: &libp2p_swarm::PendingInboundConnectionError<TTransErr>) -> Self {
373+
impl From<&libp2p_swarm::PendingInboundConnectionError> for PendingInboundConnectionError {
374+
fn from(error: &libp2p_swarm::PendingInboundConnectionError) -> Self {
377375
match error {
378376
libp2p_swarm::PendingInboundConnectionError::WrongPeerId { .. } => {
379377
PendingInboundConnectionError::WrongPeerId

swarm/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,13 @@
77

88
- Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134].
99

10+
- Remove type parameter from `PendingOutboundConnectionError` and `PendingInboundConnectionError`.
11+
These two types are always used with `std::io::Error`. See [PR 3272].
12+
1013
[PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170
1114
[PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134
1215
[PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153
16+
[PR 3272]: https://github.com/libp2p/rust-libp2p/pull/3272
1317

1418
# 0.41.1
1519

swarm/src/connection/error.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,11 @@ impl<THandlerErr> From<io::Error> for ConnectionError<THandlerErr> {
7676
/// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to
7777
/// [`PendingInboundConnectionError`], one or more [`TransportError`]s can occur for a single
7878
/// connection.
79-
pub type PendingOutboundConnectionError<TTransErr> =
80-
PendingConnectionError<Vec<(Multiaddr, TransportError<TTransErr>)>>;
79+
pub type PendingOutboundConnectionError =
80+
PendingConnectionError<Vec<(Multiaddr, TransportError<io::Error>)>>;
8181

8282
/// Errors that can occur in the context of a pending incoming `Connection`.
83-
pub type PendingInboundConnectionError<TTransErr> =
84-
PendingConnectionError<TransportError<TTransErr>>;
83+
pub type PendingInboundConnectionError = PendingConnectionError<TransportError<io::Error>>;
8584

8685
/// Errors that can occur in the context of a pending `Connection`.
8786
#[derive(Debug)]

swarm/src/connection/pool.rs

Lines changed: 16 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError,
2727
PendingInboundConnectionError, PendingOutboundConnectionError,
2828
},
29-
transport::{Transport, TransportError},
29+
transport::TransportError,
3030
ConnectedPoint, ConnectionHandler, Executor, IntoConnectionHandler, Multiaddr, PeerId,
3131
};
3232
use concurrent_dial::ConcurrentDial;
@@ -79,9 +79,8 @@ impl ExecSwitch {
7979
}
8080

8181
/// A connection `Pool` manages a set of connections for each peer.
82-
pub struct Pool<THandler, TTrans>
82+
pub struct Pool<THandler>
8383
where
84-
TTrans: Transport,
8584
THandler: IntoConnectionHandler,
8685
{
8786
local_id: PeerId,
@@ -124,10 +123,10 @@ where
124123

125124
/// Sender distributed to pending tasks for reporting events back
126125
/// to the pool.
127-
pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent<TTrans>>,
126+
pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent>,
128127

129128
/// Receiver for events reported from pending tasks.
130-
pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent<TTrans>>,
129+
pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent>,
131130

132131
/// Sender distributed to established tasks for reporting events back
133132
/// to the pool.
@@ -213,7 +212,7 @@ impl<THandler> PendingConnection<THandler> {
213212
}
214213
}
215214

216-
impl<THandler: IntoConnectionHandler, TTrans: Transport> fmt::Debug for Pool<THandler, TTrans> {
215+
impl<THandler: IntoConnectionHandler> fmt::Debug for Pool<THandler> {
217216
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
218217
f.debug_struct("Pool")
219218
.field("counters", &self.counters)
@@ -223,10 +222,7 @@ impl<THandler: IntoConnectionHandler, TTrans: Transport> fmt::Debug for Pool<THa
223222

224223
/// Event that can happen on the `Pool`.
225224
#[derive(Debug)]
226-
pub enum PoolEvent<THandler: IntoConnectionHandler, TTrans>
227-
where
228-
TTrans: Transport,
229-
{
225+
pub enum PoolEvent<THandler: IntoConnectionHandler> {
230226
/// A new connection has been established.
231227
ConnectionEstablished {
232228
id: ConnectionId,
@@ -239,7 +235,7 @@ where
239235
/// [`Some`] when the new connection is an outgoing connection.
240236
/// Addresses are dialed in parallel. Contains the addresses and errors
241237
/// of dial attempts that failed before the one successful dial.
242-
concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<TTrans::Error>)>>,
238+
concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<std::io::Error>)>>,
243239
/// How long it took to establish this connection.
244240
established_in: std::time::Duration,
245241
},
@@ -272,7 +268,7 @@ where
272268
/// The ID of the failed connection.
273269
id: ConnectionId,
274270
/// The error that occurred.
275-
error: PendingOutboundConnectionError<TTrans::Error>,
271+
error: PendingOutboundConnectionError,
276272
/// The handler that was supposed to handle the connection.
277273
handler: THandler,
278274
/// The (expected) peer of the failed connection.
@@ -288,7 +284,7 @@ where
288284
/// Local connection address.
289285
local_addr: Multiaddr,
290286
/// The error that occurred.
291-
error: PendingInboundConnectionError<TTrans::Error>,
287+
error: PendingInboundConnectionError,
292288
/// The handler that was supposed to handle the connection.
293289
handler: THandler,
294290
},
@@ -312,10 +308,9 @@ where
312308
},
313309
}
314310

315-
impl<THandler, TTrans> Pool<THandler, TTrans>
311+
impl<THandler> Pool<THandler>
316312
where
317313
THandler: IntoConnectionHandler,
318-
TTrans: Transport,
319314
{
320315
/// Creates a new empty `Pool`.
321316
pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self {
@@ -429,12 +424,9 @@ where
429424
}
430425
}
431426

432-
impl<THandler, TTrans> Pool<THandler, TTrans>
427+
impl<THandler> Pool<THandler>
433428
where
434429
THandler: IntoConnectionHandler,
435-
TTrans: Transport + 'static,
436-
TTrans::Output: Send + 'static,
437-
TTrans::Error: Send + 'static,
438430
{
439431
/// Adds a pending outgoing connection to the pool in the form of a `Future`
440432
/// that establishes and negotiates the connection.
@@ -448,22 +440,15 @@ where
448440
'static,
449441
(
450442
Multiaddr,
451-
Result<
452-
<TTrans as Transport>::Output,
453-
TransportError<<TTrans as Transport>::Error>,
454-
>,
443+
Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
455444
),
456445
>,
457446
>,
458447
peer: Option<PeerId>,
459448
handler: THandler,
460449
role_override: Endpoint,
461450
dial_concurrency_factor_override: Option<NonZeroU8>,
462-
) -> Result<ConnectionId, (ConnectionLimit, THandler)>
463-
where
464-
TTrans: Send,
465-
TTrans::Dial: Send + 'static,
466-
{
451+
) -> Result<ConnectionId, (ConnectionLimit, THandler)> {
467452
if let Err(limit) = self.counters.check_max_pending_outgoing() {
468453
return Err((limit, handler));
469454
};
@@ -515,7 +500,7 @@ where
515500
info: IncomingInfo<'_>,
516501
) -> Result<ConnectionId, (ConnectionLimit, THandler)>
517502
where
518-
TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static,
503+
TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
519504
{
520505
let endpoint = info.create_connected_point();
521506

@@ -552,9 +537,8 @@ where
552537
}
553538

554539
/// Polls the connection pool for events.
555-
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler, TTrans>>
540+
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler>>
556541
where
557-
TTrans: Transport<Output = (PeerId, StreamMuxerBox)>,
558542
THandler: IntoConnectionHandler + 'static,
559543
THandler::Handler: ConnectionHandler + Send,
560544
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
@@ -677,7 +661,7 @@ where
677661
),
678662
};
679663

680-
let error: Result<(), PendingInboundConnectionError<_>> = self
664+
let error = self
681665
.counters
682666
// Check general established connection limit.
683667
.check_max_established(&endpoint)

swarm/src/connection/pool/concurrent_dial.rs

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,45 +18,38 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21-
use crate::{
22-
transport::{Transport, TransportError},
23-
Multiaddr,
24-
};
21+
use crate::{transport::TransportError, Multiaddr};
2522
use futures::{
2623
future::{BoxFuture, Future},
2724
ready,
2825
stream::{FuturesUnordered, StreamExt},
2926
};
27+
use libp2p_core::muxing::StreamMuxerBox;
28+
use libp2p_core::PeerId;
3029
use std::{
3130
num::NonZeroU8,
3231
pin::Pin,
3332
task::{Context, Poll},
3433
};
3534

36-
type Dial<TTrans> = BoxFuture<
35+
type Dial = BoxFuture<
3736
'static,
3837
(
3938
Multiaddr,
40-
Result<<TTrans as Transport>::Output, TransportError<<TTrans as Transport>::Error>>,
39+
Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
4140
),
4241
>;
4342

44-
pub struct ConcurrentDial<TTrans: Transport> {
45-
dials: FuturesUnordered<Dial<TTrans>>,
46-
pending_dials: Box<dyn Iterator<Item = Dial<TTrans>> + Send>,
47-
errors: Vec<(Multiaddr, TransportError<TTrans::Error>)>,
43+
pub struct ConcurrentDial {
44+
dials: FuturesUnordered<Dial>,
45+
pending_dials: Box<dyn Iterator<Item = Dial> + Send>,
46+
errors: Vec<(Multiaddr, TransportError<std::io::Error>)>,
4847
}
4948

50-
impl<TTrans: Transport> Unpin for ConcurrentDial<TTrans> {}
49+
impl Unpin for ConcurrentDial {}
5150

52-
impl<TTrans> ConcurrentDial<TTrans>
53-
where
54-
TTrans: Transport + Send + 'static,
55-
TTrans::Output: Send,
56-
TTrans::Error: Send,
57-
TTrans::Dial: Send + 'static,
58-
{
59-
pub(crate) fn new(pending_dials: Vec<Dial<TTrans>>, concurrency_factor: NonZeroU8) -> Self {
51+
impl ConcurrentDial {
52+
pub(crate) fn new(pending_dials: Vec<Dial>, concurrency_factor: NonZeroU8) -> Self {
6053
let mut pending_dials = pending_dials.into_iter();
6154

6255
let dials = FuturesUnordered::new();
@@ -75,20 +68,17 @@ where
7568
}
7669
}
7770

78-
impl<TTrans> Future for ConcurrentDial<TTrans>
79-
where
80-
TTrans: Transport,
81-
{
71+
impl Future for ConcurrentDial {
8272
type Output = Result<
8373
// Either one dial succeeded, returning the negotiated [`PeerId`], the address, the
8474
// muxer and the addresses and errors of the dials that failed before.
8575
(
8676
Multiaddr,
87-
TTrans::Output,
88-
Vec<(Multiaddr, TransportError<TTrans::Error>)>,
77+
(PeerId, StreamMuxerBox),
78+
Vec<(Multiaddr, TransportError<std::io::Error>)>,
8979
),
9080
// Or all dials failed, thus returning the address and error for each dial.
91-
Vec<(Multiaddr, TransportError<TTrans::Error>)>,
81+
Vec<(Multiaddr, TransportError<std::io::Error>)>,
9282
>;
9383

9484
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {

swarm/src/connection/pool/task.rs

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
connection::{
2727
self, ConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError,
2828
},
29-
transport::{Transport, TransportError},
29+
transport::TransportError,
3030
ConnectionHandler, Multiaddr, PeerId,
3131
};
3232
use futures::{
@@ -35,6 +35,7 @@ use futures::{
3535
SinkExt, StreamExt,
3636
};
3737
use libp2p_core::connection::ConnectionId;
38+
use libp2p_core::muxing::StreamMuxerBox;
3839
use std::pin::Pin;
3940
use void::Void;
4041

@@ -48,26 +49,19 @@ pub enum Command<T> {
4849
Close,
4950
}
5051

51-
#[derive(Debug)]
52-
pub enum PendingConnectionEvent<TTrans>
53-
where
54-
TTrans: Transport,
55-
{
52+
pub enum PendingConnectionEvent {
5653
ConnectionEstablished {
5754
id: ConnectionId,
58-
output: TTrans::Output,
55+
output: (PeerId, StreamMuxerBox),
5956
/// [`Some`] when the new connection is an outgoing connection.
6057
/// Addresses are dialed in parallel. Contains the addresses and errors
6158
/// of dial attempts that failed before the one successful dial.
62-
outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<TTrans::Error>)>)>,
59+
outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<std::io::Error>)>)>,
6360
},
6461
/// A pending connection failed.
6562
PendingFailed {
6663
id: ConnectionId,
67-
error: Either<
68-
PendingOutboundConnectionError<TTrans::Error>,
69-
PendingInboundConnectionError<TTrans::Error>,
70-
>,
64+
error: Either<PendingOutboundConnectionError, PendingInboundConnectionError>,
7165
},
7266
}
7367

@@ -97,14 +91,12 @@ pub enum EstablishedConnectionEvent<THandler: ConnectionHandler> {
9791
},
9892
}
9993

100-
pub async fn new_for_pending_outgoing_connection<TTrans>(
94+
pub async fn new_for_pending_outgoing_connection(
10195
connection_id: ConnectionId,
102-
dial: ConcurrentDial<TTrans>,
96+
dial: ConcurrentDial,
10397
abort_receiver: oneshot::Receiver<Void>,
104-
mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
105-
) where
106-
TTrans: Transport,
107-
{
98+
mut events: mpsc::Sender<PendingConnectionEvent>,
99+
) {
108100
match futures::future::select(abort_receiver, Box::pin(dial)).await {
109101
Either::Left((Err(oneshot::Canceled), _)) => {
110102
let _ = events
@@ -135,14 +127,13 @@ pub async fn new_for_pending_outgoing_connection<TTrans>(
135127
}
136128
}
137129

138-
pub async fn new_for_pending_incoming_connection<TFut, TTrans>(
130+
pub async fn new_for_pending_incoming_connection<TFut>(
139131
connection_id: ConnectionId,
140132
future: TFut,
141133
abort_receiver: oneshot::Receiver<Void>,
142-
mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
134+
mut events: mpsc::Sender<PendingConnectionEvent>,
143135
) where
144-
TTrans: Transport,
145-
TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static,
136+
TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
146137
{
147138
match futures::future::select(abort_receiver, Box::pin(future)).await {
148139
Either::Left((Err(oneshot::Canceled), _)) => {

0 commit comments

Comments
 (0)