Skip to content

Commit ac85739

Browse files
committed
Change the ConnectionPool0Rtt to return a ConnectionRef
1 parent 501f2ee commit ac85739

File tree

3 files changed

+335
-274
lines changed

3 files changed

+335
-274
lines changed

iroh-connection-pool/src/connection_pool.rs

Lines changed: 10 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -10,30 +10,27 @@
1010
//! connect, and don't clone it out of the future.
1111
use std::{
1212
collections::{HashMap, VecDeque},
13-
ops::Deref,
14-
sync::{
15-
Arc,
16-
atomic::{AtomicUsize, Ordering},
17-
},
13+
sync::Arc,
1814
time::Duration,
1915
};
2016

2117
use iroh::{
2218
Endpoint, NodeId,
2319
endpoint::{ConnectError, Connection},
2420
};
25-
use n0_future::{MaybeFuture, Stream, StreamExt};
21+
use n0_future::{MaybeFuture, StreamExt};
2622
use snafu::Snafu;
2723
use tokio::{
2824
sync::{
29-
Notify,
3025
mpsc::{self, error::SendError as TokioSendError},
3126
oneshot,
3227
},
3328
task::JoinError,
3429
};
3530
use tokio_util::time::FutureExt as TimeFutureExt;
36-
use tracing::{debug, error, info, trace};
31+
use tracing::{debug, error, trace};
32+
33+
use crate::{ConnectionCounter, ConnectionRef};
3734

3835
/// Configuration options for the connection pool
3936
#[derive(Debug, Clone, Copy)]
@@ -53,30 +50,6 @@ impl Default for Options {
5350
}
5451
}
5552

56-
/// A reference to a connection that is owned by a connection pool.
57-
#[derive(Debug)]
58-
pub struct ConnectionRef {
59-
connection: iroh::endpoint::Connection,
60-
_permit: OneConnection,
61-
}
62-
63-
impl Deref for ConnectionRef {
64-
type Target = iroh::endpoint::Connection;
65-
66-
fn deref(&self) -> &Self::Target {
67-
&self.connection
68-
}
69-
}
70-
71-
impl ConnectionRef {
72-
fn new(connection: iroh::endpoint::Connection, counter: OneConnection) -> Self {
73-
Self {
74-
connection,
75-
_permit: counter,
76-
}
77-
}
78-
}
79-
8053
struct Context {
8154
options: Options,
8255
endpoint: Endpoint,
@@ -127,67 +100,6 @@ struct RequestRef {
127100
tx: oneshot::Sender<Result<ConnectionRef, PoolConnectError>>,
128101
}
129102

130-
#[derive(Debug)]
131-
struct ConnectionCounterInner {
132-
count: AtomicUsize,
133-
notify: Notify,
134-
}
135-
136-
#[derive(Debug, Clone)]
137-
struct ConnectionCounter {
138-
inner: Arc<ConnectionCounterInner>,
139-
}
140-
141-
impl ConnectionCounter {
142-
fn new() -> Self {
143-
Self {
144-
inner: Arc::new(ConnectionCounterInner {
145-
count: Default::default(),
146-
notify: Notify::new(),
147-
}),
148-
}
149-
}
150-
151-
fn get_one(&self) -> OneConnection {
152-
self.inner.count.fetch_add(1, Ordering::SeqCst);
153-
OneConnection {
154-
inner: self.inner.clone(),
155-
}
156-
}
157-
158-
fn is_idle(&self) -> bool {
159-
self.inner.count.load(Ordering::SeqCst) == 0
160-
}
161-
162-
/// Infinite stream that yields when the connection is briefly idle.
163-
///
164-
/// Note that you still have to check if the connection is still idle when
165-
/// you get the notification.
166-
///
167-
/// Also note that this stream is triggered on [OneConnection::drop], so it
168-
/// won't trigger initially even though a [ConnectionCounter] starts up as
169-
/// idle.
170-
fn idle_stream(self) -> impl Stream<Item = ()> {
171-
n0_future::stream::unfold(self, |c| async move {
172-
c.inner.notify.notified().await;
173-
Some(((), c))
174-
})
175-
}
176-
}
177-
178-
#[derive(Debug)]
179-
struct OneConnection {
180-
inner: Arc<ConnectionCounterInner>,
181-
}
182-
183-
impl Drop for OneConnection {
184-
fn drop(&mut self) {
185-
if self.inner.count.fetch_sub(1, Ordering::SeqCst) == 1 {
186-
self.inner.notify.notify_waiters();
187-
}
188-
}
189-
}
190-
191103
/// Run a connection actor for a single node
192104
async fn run_connection_actor(
193105
node_id: NodeId,
@@ -270,11 +182,7 @@ async fn run_connection_actor(
270182
}
271183

272184
if let Ok(connection) = state {
273-
let reason = if counter.inner.count.load(Ordering::SeqCst) > 0 {
274-
b"drop"
275-
} else {
276-
b"idle"
277-
};
185+
let reason = if counter.is_idle() { b"idle" } else { b"drop" };
278186
connection.close(0u32.into(), reason);
279187
}
280188

@@ -411,6 +319,10 @@ impl ConnectionPool {
411319
Self { tx }
412320
}
413321

322+
/// Returns either a fresh connection or a reference to an existing one.
323+
///
324+
/// This is guaranteed to return after approximately [Options::connect_timeout]
325+
/// with either an error or a connection.
414326
pub async fn connect(
415327
&self,
416328
id: NodeId,

0 commit comments

Comments
 (0)