Skip to content

Commit 4d7a535

Browse files
refactor(relay): use oneshot's to track requested streams
This is much cleaner as it allows us to construct a single `Future` that expresses the entire outbound protocol from stream opening to finish. Pull-Request: #4900.
1 parent ee17df9 commit 4d7a535

File tree

2 files changed

+99
-125
lines changed

2 files changed

+99
-125
lines changed

protocols/relay/src/priv_client/handler.rs

Lines changed: 97 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,29 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21+
use crate::client::Connection;
2122
use crate::priv_client::transport;
23+
use crate::priv_client::transport::ToListenerMsg;
2224
use crate::protocol::{self, inbound_stop, outbound_hop};
2325
use crate::{priv_client, proto, HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME};
26+
use futures::channel::mpsc::Sender;
2427
use futures::channel::{mpsc, oneshot};
2528
use futures::future::FutureExt;
2629
use futures_timer::Delay;
2730
use libp2p_core::multiaddr::Protocol;
2831
use libp2p_core::upgrade::ReadyUpgrade;
2932
use libp2p_core::Multiaddr;
3033
use libp2p_identity::PeerId;
31-
use libp2p_swarm::handler::{
32-
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
33-
};
34+
use libp2p_swarm::handler::{ConnectionEvent, FullyNegotiatedInbound};
3435
use libp2p_swarm::{
35-
ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError,
36+
ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
3637
SubstreamProtocol,
3738
};
3839
use std::collections::VecDeque;
3940
use std::task::{Context, Poll};
4041
use std::time::Duration;
4142
use std::{fmt, io};
43+
use void::Void;
4244

4345
/// The maximum number of circuits being denied concurrently.
4446
///
@@ -104,8 +106,7 @@ pub struct Handler {
104106
>,
105107
>,
106108

107-
/// We issue a stream upgrade for each pending request.
108-
pending_requests: VecDeque<PendingRequest>,
109+
pending_streams: VecDeque<oneshot::Sender<Result<Stream, StreamUpgradeError<Void>>>>,
109110

110111
inflight_reserve_requests: futures_bounded::FuturesTupleSet<
111112
Result<outbound_hop::Reservation, outbound_hop::ReserveError>,
@@ -133,7 +134,7 @@ impl Handler {
133134
remote_peer_id,
134135
remote_addr,
135136
queued_events: Default::default(),
136-
pending_requests: Default::default(),
137+
pending_streams: Default::default(),
137138
inflight_reserve_requests: futures_bounded::FuturesTupleSet::new(
138139
STREAM_TIMEOUT,
139140
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
@@ -154,57 +155,6 @@ impl Handler {
154155
}
155156
}
156157

157-
fn on_dial_upgrade_error(
158-
&mut self,
159-
DialUpgradeError { error, .. }: DialUpgradeError<
160-
<Self as ConnectionHandler>::OutboundOpenInfo,
161-
<Self as ConnectionHandler>::OutboundProtocol,
162-
>,
163-
) {
164-
let pending_request = self
165-
.pending_requests
166-
.pop_front()
167-
.expect("got a stream error without a pending request");
168-
169-
match pending_request {
170-
PendingRequest::Reserve { mut to_listener } => {
171-
let error = match error {
172-
StreamUpgradeError::Timeout => {
173-
outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into())
174-
}
175-
StreamUpgradeError::Apply(never) => void::unreachable(never),
176-
StreamUpgradeError::NegotiationFailed => {
177-
outbound_hop::ReserveError::Unsupported
178-
}
179-
StreamUpgradeError::Io(e) => outbound_hop::ReserveError::Io(e),
180-
};
181-
182-
if let Err(e) =
183-
to_listener.try_send(transport::ToListenerMsg::Reservation(Err(error)))
184-
{
185-
tracing::debug!("Unable to send error to listener: {}", e.into_send_error())
186-
}
187-
self.reservation.failed();
188-
}
189-
PendingRequest::Connect {
190-
to_dial: send_back, ..
191-
} => {
192-
let error = match error {
193-
StreamUpgradeError::Timeout => {
194-
outbound_hop::ConnectError::Io(io::ErrorKind::TimedOut.into())
195-
}
196-
StreamUpgradeError::NegotiationFailed => {
197-
outbound_hop::ConnectError::Unsupported
198-
}
199-
StreamUpgradeError::Io(e) => outbound_hop::ConnectError::Io(e),
200-
StreamUpgradeError::Apply(v) => void::unreachable(v),
201-
};
202-
203-
let _ = send_back.send(Err(error));
204-
}
205-
}
206-
}
207-
208158
fn insert_to_deny_futs(&mut self, circuit: inbound_stop::Circuit) {
209159
let src_peer_id = circuit.src_peer_id();
210160

@@ -219,6 +169,62 @@ impl Handler {
219169
)
220170
}
221171
}
172+
173+
fn make_new_reservation(&mut self, to_listener: Sender<ToListenerMsg>) {
174+
let (sender, receiver) = oneshot::channel();
175+
176+
self.pending_streams.push_back(sender);
177+
self.queued_events
178+
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
179+
protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
180+
});
181+
let result = self.inflight_reserve_requests.try_push(
182+
async move {
183+
let stream = receiver
184+
.await
185+
.map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))?
186+
.map_err(into_reserve_error)?;
187+
188+
let reservation = outbound_hop::make_reservation(stream).await?;
189+
190+
Ok(reservation)
191+
},
192+
to_listener,
193+
);
194+
195+
if result.is_err() {
196+
tracing::warn!("Dropping in-flight reservation request because we are at capacity");
197+
}
198+
}
199+
200+
fn establish_new_circuit(
201+
&mut self,
202+
to_dial: oneshot::Sender<Result<Connection, outbound_hop::ConnectError>>,
203+
dst_peer_id: PeerId,
204+
) {
205+
let (sender, receiver) = oneshot::channel();
206+
207+
self.pending_streams.push_back(sender);
208+
self.queued_events
209+
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
210+
protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
211+
});
212+
let result = self.inflight_outbound_connect_requests.try_push(
213+
async move {
214+
let stream = receiver
215+
.await
216+
.map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))?
217+
.map_err(into_connect_error)?;
218+
219+
outbound_hop::open_circuit(stream, dst_peer_id).await
220+
},
221+
to_dial,
222+
);
223+
224+
if result.is_err() {
225+
tracing::warn!("Dropping in-flight connect request because we are at capacity")
226+
}
227+
}
222228
}
223229

224230
impl ConnectionHandler for Handler {
@@ -236,25 +242,13 @@ impl ConnectionHandler for Handler {
236242
fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
237243
match event {
238244
In::Reserve { to_listener } => {
239-
self.pending_requests
240-
.push_back(PendingRequest::Reserve { to_listener });
241-
self.queued_events
242-
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
243-
protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
244-
});
245+
self.make_new_reservation(to_listener);
245246
}
246247
In::EstablishCircuit {
247-
to_dial: send_back,
248+
to_dial,
248249
dst_peer_id,
249250
} => {
250-
self.pending_requests.push_back(PendingRequest::Connect {
251-
dst_peer_id,
252-
to_dial: send_back,
253-
});
254-
self.queued_events
255-
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
256-
protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
257-
});
251+
self.establish_new_circuit(to_dial, dst_peer_id);
258252
}
259253
}
260254
}
@@ -402,12 +396,8 @@ impl ConnectionHandler for Handler {
402396
}
403397

404398
if let Poll::Ready(Some(to_listener)) = self.reservation.poll(cx) {
405-
self.pending_requests
406-
.push_back(PendingRequest::Reserve { to_listener });
407-
408-
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
409-
protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
410-
});
399+
self.make_new_reservation(to_listener);
400+
continue;
411401
}
412402

413403
// Deny incoming circuit requests.
@@ -450,42 +440,16 @@ impl ConnectionHandler for Handler {
450440
tracing::warn!("Dropping inbound stream because we are at capacity")
451441
}
452442
}
453-
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
454-
protocol: stream,
455-
..
456-
}) => {
457-
let pending_request = self.pending_requests.pop_front().expect(
458-
"opened a stream without a pending connection command or a reserve listener",
459-
);
460-
match pending_request {
461-
PendingRequest::Reserve { to_listener } => {
462-
if self
463-
.inflight_reserve_requests
464-
.try_push(outbound_hop::make_reservation(stream), to_listener)
465-
.is_err()
466-
{
467-
tracing::warn!("Dropping outbound stream because we are at capacity");
468-
}
469-
}
470-
PendingRequest::Connect {
471-
dst_peer_id,
472-
to_dial: send_back,
473-
} => {
474-
if self
475-
.inflight_outbound_connect_requests
476-
.try_push(outbound_hop::open_circuit(stream, dst_peer_id), send_back)
477-
.is_err()
478-
{
479-
tracing::warn!("Dropping outbound stream because we are at capacity");
480-
}
481-
}
443+
ConnectionEvent::FullyNegotiatedOutbound(ev) => {
444+
if let Some(next) = self.pending_streams.pop_front() {
445+
let _ = next.send(Ok(ev.protocol));
482446
}
483447
}
484-
ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
485-
void::unreachable(listen_upgrade_error.error)
486-
}
487-
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
488-
self.on_dial_upgrade_error(dial_upgrade_error)
448+
ConnectionEvent::ListenUpgradeError(ev) => void::unreachable(ev.error),
449+
ConnectionEvent::DialUpgradeError(ev) => {
450+
if let Some(next) = self.pending_streams.pop_front() {
451+
let _ = next.send(Err(ev.error));
452+
}
489453
}
490454
_ => {}
491455
}
@@ -614,14 +578,24 @@ impl Reservation {
614578
}
615579
}
616580

617-
pub(crate) enum PendingRequest {
618-
Reserve {
619-
/// A channel into the [`Transport`](priv_client::Transport).
620-
to_listener: mpsc::Sender<transport::ToListenerMsg>,
621-
},
622-
Connect {
623-
dst_peer_id: PeerId,
624-
/// A channel into the future returned by [`Transport::dial`](libp2p_core::Transport::dial).
625-
to_dial: oneshot::Sender<Result<priv_client::Connection, outbound_hop::ConnectError>>,
626-
},
581+
fn into_reserve_error(e: StreamUpgradeError<Void>) -> outbound_hop::ReserveError {
582+
match e {
583+
StreamUpgradeError::Timeout => {
584+
outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into())
585+
}
586+
StreamUpgradeError::Apply(never) => void::unreachable(never),
587+
StreamUpgradeError::NegotiationFailed => outbound_hop::ReserveError::Unsupported,
588+
StreamUpgradeError::Io(e) => outbound_hop::ReserveError::Io(e),
589+
}
590+
}
591+
592+
fn into_connect_error(e: StreamUpgradeError<Void>) -> outbound_hop::ConnectError {
593+
match e {
594+
StreamUpgradeError::Timeout => {
595+
outbound_hop::ConnectError::Io(io::ErrorKind::TimedOut.into())
596+
}
597+
StreamUpgradeError::Apply(never) => void::unreachable(never),
598+
StreamUpgradeError::NegotiationFailed => outbound_hop::ConnectError::Unsupported,
599+
StreamUpgradeError::Io(e) => outbound_hop::ConnectError::Io(e),
600+
}
627601
}

protocols/relay/src/protocol/outbound_hop.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pub enum ConnectError {
4747
#[error("Remote does not support the `{HOP_PROTOCOL_NAME}` protocol")]
4848
Unsupported,
4949
#[error("IO error")]
50-
Io(#[source] io::Error),
50+
Io(#[from] io::Error),
5151
#[error("Protocol error")]
5252
Protocol(#[from] ProtocolViolation),
5353
}
@@ -61,7 +61,7 @@ pub enum ReserveError {
6161
#[error("Remote does not support the `{HOP_PROTOCOL_NAME}` protocol")]
6262
Unsupported,
6363
#[error("IO error")]
64-
Io(#[source] io::Error),
64+
Io(#[from] io::Error),
6565
#[error("Protocol error")]
6666
Protocol(#[from] ProtocolViolation),
6767
}

0 commit comments

Comments
 (0)