Skip to content

Commit 67b200b

Browse files
committed
Use snafu for errors, and track pool tasks explicitly
1 parent cbeb15a commit 67b200b

File tree

3 files changed

+173
-127
lines changed

3 files changed

+173
-127
lines changed

iroh-connection-pool/src/connection_pool.rs

Lines changed: 87 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use tokio::{
2626
mpsc::{self, error::SendError as TokioSendError},
2727
oneshot,
2828
},
29-
task::JoinError,
29+
task::JoinSet,
3030
};
3131
use tokio_util::time::FutureExt as TimeFutureExt;
3232
use tracing::{debug, error, trace};
@@ -86,7 +86,8 @@ struct Context {
8686
///
8787
/// This includes the normal iroh connection errors as well as pool specific
8888
/// errors such as timeouts and connection limits.
89-
#[derive(Debug, Clone)]
89+
#[derive(Debug, Clone, Snafu)]
90+
#[snafu(module)]
9091
pub enum PoolConnectError {
9192
/// Connection pool is shut down
9293
Shutdown,
@@ -95,23 +96,27 @@ pub enum PoolConnectError {
9596
/// Too many connections
9697
TooManyConnections,
9798
/// Error during connect
98-
ConnectError(Arc<ConnectError>),
99-
/// Handler actor panicked
100-
JoinError(Arc<JoinError>),
99+
ConnectError { source: Arc<ConnectError> },
101100
}
102101

103-
impl std::fmt::Display for PoolConnectError {
104-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105-
match self {
106-
PoolConnectError::Shutdown => write!(f, "Connection pool is shut down"),
107-
PoolConnectError::Timeout => write!(f, "Connection timed out"),
108-
PoolConnectError::TooManyConnections => write!(f, "Too many connections"),
109-
PoolConnectError::ConnectError(e) => write!(f, "Connection error: {e}"),
110-
PoolConnectError::JoinError(e) => write!(f, "Join error: {e}"),
102+
impl From<ConnectError> for PoolConnectError {
103+
fn from(e: ConnectError) -> Self {
104+
PoolConnectError::ConnectError {
105+
source: Arc::new(e),
111106
}
112107
}
113108
}
114109

110+
/// Error when calling a fn on the [`ConnectionPool`].
111+
///
112+
/// The only thing that can go wrong is that the connection pool is shut down.
113+
#[derive(Debug, Snafu)]
114+
#[snafu(module)]
115+
pub enum ConnectionPoolError {
116+
/// The connection pool has been shut down
117+
Shutdown,
118+
}
119+
115120
pub type PoolConnectResult = std::result::Result<Connection, PoolConnectError>;
116121

117122
enum ActorMessage {
@@ -125,7 +130,7 @@ struct RequestRef {
125130
tx: oneshot::Sender<Result<ConnectionRef, PoolConnectError>>,
126131
}
127132

128-
/// Run a connection actor for a single node
133+
/// Run a connection actor for a single remote node id
129134
async fn run_connection_actor(
130135
node_id: NodeId,
131136
mut rx: mpsc::Receiver<RequestRef>,
@@ -139,7 +144,7 @@ async fn run_connection_actor(
139144
.await
140145
{
141146
Ok(Ok(conn)) => Ok(conn),
142-
Ok(Err(e)) => Err(PoolConnectError::ConnectError(Arc::new(e))),
147+
Ok(Err(e)) => Err(PoolConnectError::from(e)),
143148
Err(_) => Err(PoolConnectError::Timeout),
144149
};
145150
if let Err(e) = &state {
@@ -221,6 +226,8 @@ struct Actor {
221226
// idle set (most recent last)
222227
// todo: use a better data structure if this becomes a performance issue
223228
idle: VecDeque<NodeId>,
229+
// per connection tasks
230+
tasks: JoinSet<()>,
224231
}
225232

226233
impl Actor {
@@ -241,6 +248,7 @@ impl Actor {
241248
endpoint,
242249
owner: ConnectionPool { tx: tx.clone() },
243250
}),
251+
tasks: JoinSet::new(),
244252
},
245253
tx,
246254
)
@@ -264,70 +272,84 @@ impl Actor {
264272
self.remove_idle(id);
265273
}
266274

267-
pub async fn run(mut self) {
268-
while let Some(msg) = self.rx.recv().await {
269-
match msg {
270-
ActorMessage::RequestRef(mut msg) => {
271-
let id = msg.id;
272-
self.remove_idle(id);
273-
// Try to send to existing connection actor
274-
if let Some(conn_tx) = self.connections.get(&id) {
275-
if let Err(TokioSendError(e)) = conn_tx.send(msg).await {
276-
msg = e;
277-
} else {
278-
continue;
279-
}
280-
// Connection actor died, remove it
281-
self.remove_connection(id);
275+
async fn handle_msg(&mut self, msg: ActorMessage) {
276+
match msg {
277+
ActorMessage::RequestRef(mut msg) => {
278+
let id = msg.id;
279+
self.remove_idle(id);
280+
// Try to send to existing connection actor
281+
if let Some(conn_tx) = self.connections.get(&id) {
282+
if let Err(TokioSendError(e)) = conn_tx.send(msg).await {
283+
msg = e;
284+
} else {
285+
return;
282286
}
287+
// Connection actor died, remove it
288+
self.remove_connection(id);
289+
}
283290

284-
// No connection actor or it died - check limits
285-
if self.connections.len() >= self.context.options.max_connections {
286-
if let Some(idle) = self.pop_oldest_idle() {
287-
// remove the oldest idle connection to make room for one more
288-
trace!("removing oldest idle connection {}", idle);
289-
self.connections.remove(&idle);
290-
} else {
291-
msg.tx.send(Err(PoolConnectError::TooManyConnections)).ok();
292-
continue;
293-
}
291+
// No connection actor or it died - check limits
292+
if self.connections.len() >= self.context.options.max_connections {
293+
if let Some(idle) = self.pop_oldest_idle() {
294+
// remove the oldest idle connection to make room for one more
295+
trace!("removing oldest idle connection {}", idle);
296+
self.connections.remove(&idle);
297+
} else {
298+
msg.tx.send(Err(PoolConnectError::TooManyConnections)).ok();
299+
return;
294300
}
295-
let (conn_tx, conn_rx) = mpsc::channel(100);
296-
self.connections.insert(id, conn_tx.clone());
301+
}
302+
let (conn_tx, conn_rx) = mpsc::channel(100);
303+
self.connections.insert(id, conn_tx.clone());
297304

298-
let context = self.context.clone();
305+
let context = self.context.clone();
299306

300-
tokio::spawn(run_connection_actor(id, conn_rx, context));
307+
self.tasks.spawn(run_connection_actor(id, conn_rx, context));
301308

302-
// Send the handler to the new actor
303-
if conn_tx.send(msg).await.is_err() {
304-
error!(%id, "Failed to send handler to new connection actor");
305-
self.connections.remove(&id);
306-
}
309+
// Send the handler to the new actor
310+
if conn_tx.send(msg).await.is_err() {
311+
error!(%id, "Failed to send handler to new connection actor");
312+
self.connections.remove(&id);
307313
}
308-
ActorMessage::ConnectionIdle { id } => {
309-
self.add_idle(id);
310-
trace!(%id, "connection idle");
314+
}
315+
ActorMessage::ConnectionIdle { id } => {
316+
self.add_idle(id);
317+
trace!(%id, "connection idle");
318+
}
319+
ActorMessage::ConnectionShutdown { id } => {
320+
// Remove the connection from our map - this closes the channel
321+
self.remove_connection(id);
322+
trace!(%id, "removed connection");
323+
}
324+
}
325+
}
326+
327+
pub async fn run(mut self) {
328+
loop {
329+
tokio::select! {
330+
biased;
331+
332+
msg = self.rx.recv() => {
333+
if let Some(msg) = msg {
334+
self.handle_msg(msg).await;
335+
} else {
336+
break;
337+
}
311338
}
312-
ActorMessage::ConnectionShutdown { id } => {
313-
// Remove the connection from our map - this closes the channel
314-
self.remove_connection(id);
315-
trace!(%id, "removed connection");
339+
340+
res = self.tasks.join_next(), if !self.tasks.is_empty() => {
341+
if let Some(Err(e)) = res {
342+
// panic during either connection establishment or
343+
// timeout. Message handling is outside the actor's
344+
// control, so we should hopefully never get this.
345+
error!("Connection actor failed: {e}");
346+
}
316347
}
317348
}
318349
}
319350
}
320351
}
321352

322-
/// Error when calling a fn on the [`ConnectionPool`].
323-
///
324-
/// The only thing that can go wrong is that the connection pool is shut down.
325-
#[derive(Debug, Snafu)]
326-
pub enum ConnectionPoolError {
327-
/// The connection pool has been shut down
328-
Shutdown,
329-
}
330-
331353
/// A connection pool
332354
#[derive(Debug, Clone)]
333355
pub struct ConnectionPool {

0 commit comments

Comments
 (0)