11
11
use std:: {
12
12
collections:: { HashMap , VecDeque } ,
13
13
ops:: Deref ,
14
- sync:: Arc ,
14
+ sync:: {
15
+ Arc ,
16
+ atomic:: { AtomicUsize , Ordering } ,
17
+ } ,
15
18
time:: Duration ,
16
19
} ;
17
20
18
21
use iroh:: {
19
22
Endpoint , NodeId ,
20
23
endpoint:: { ConnectError , Connection } ,
21
24
} ;
22
- use n0_future:: MaybeFuture ;
25
+ use n0_future:: { MaybeFuture , Stream , StreamExt } ;
23
26
use snafu:: Snafu ;
24
27
use tokio:: {
25
28
sync:: {
26
- OwnedSemaphorePermit ,
29
+ Notify ,
27
30
mpsc:: { self , error:: SendError as TokioSendError } ,
28
31
oneshot,
29
32
} ,
30
33
task:: JoinError ,
31
34
} ;
32
- use tokio_util:: time:: FutureExt ;
33
- use tracing:: { debug, error, trace} ;
35
+ use tokio_util:: time:: FutureExt as TimeFutureExt ;
36
+ use tracing:: { debug, error, info , trace} ;
34
37
35
38
/// Configuration options for the connection pool
36
39
#[ derive( Debug , Clone , Copy ) ]
@@ -54,7 +57,7 @@ impl Default for Options {
54
57
#[ derive( Debug ) ]
55
58
pub struct ConnectionRef {
56
59
connection : iroh:: endpoint:: Connection ,
57
- _permit : OwnedSemaphorePermit ,
60
+ _permit : OneConnection ,
58
61
}
59
62
60
63
impl Deref for ConnectionRef {
@@ -66,10 +69,10 @@ impl Deref for ConnectionRef {
66
69
}
67
70
68
71
impl ConnectionRef {
69
- fn new ( connection : iroh:: endpoint:: Connection , permit : OwnedSemaphorePermit ) -> Self {
72
+ fn new ( connection : iroh:: endpoint:: Connection , counter : OneConnection ) -> Self {
70
73
Self {
71
74
connection,
72
- _permit : permit ,
75
+ _permit : counter ,
73
76
}
74
77
}
75
78
}
@@ -124,6 +127,67 @@ struct RequestRef {
124
127
tx : oneshot:: Sender < Result < ConnectionRef , PoolConnectError > > ,
125
128
}
126
129
130
+ #[ derive( Debug ) ]
131
+ struct ConnectionCounterInner {
132
+ count : AtomicUsize ,
133
+ notify : Notify ,
134
+ }
135
+
136
+ #[ derive( Debug , Clone ) ]
137
+ struct ConnectionCounter {
138
+ inner : Arc < ConnectionCounterInner > ,
139
+ }
140
+
141
+ impl ConnectionCounter {
142
+ fn new ( ) -> Self {
143
+ Self {
144
+ inner : Arc :: new ( ConnectionCounterInner {
145
+ count : Default :: default ( ) ,
146
+ notify : Notify :: new ( ) ,
147
+ } ) ,
148
+ }
149
+ }
150
+
151
+ fn get_one ( & self ) -> OneConnection {
152
+ self . inner . count . fetch_add ( 1 , Ordering :: SeqCst ) ;
153
+ OneConnection {
154
+ inner : self . inner . clone ( ) ,
155
+ }
156
+ }
157
+
158
+ fn is_idle ( & self ) -> bool {
159
+ self . inner . count . load ( Ordering :: SeqCst ) == 0
160
+ }
161
+
162
+ /// Infinite stream that yields when the connection is briefly idle.
163
+ ///
164
+ /// Note that you still have to check if the connection is still idle when
165
+ /// you get the notification.
166
+ ///
167
+ /// Also note that this stream is triggered on [OneConnection::drop], so it
168
+ /// won't trigger initially even though a [ConnectionCounter] starts up as
169
+ /// idle.
170
+ fn idle_stream ( self ) -> impl Stream < Item = ( ) > {
171
+ n0_future:: stream:: unfold ( self , |c| async move {
172
+ c. inner . notify . notified ( ) . await ;
173
+ Some ( ( ( ) , c) )
174
+ } )
175
+ }
176
+ }
177
+
178
+ #[ derive( Debug ) ]
179
+ struct OneConnection {
180
+ inner : Arc < ConnectionCounterInner > ,
181
+ }
182
+
183
+ impl Drop for OneConnection {
184
+ fn drop ( & mut self ) {
185
+ if self . inner . count . fetch_sub ( 1 , Ordering :: SeqCst ) == 1 {
186
+ self . inner . notify . notify_waiters ( ) ;
187
+ }
188
+ }
189
+ }
190
+
127
191
/// Run a connection actor for a single node
128
192
async fn run_connection_actor (
129
193
node_id : NodeId ,
@@ -147,10 +211,11 @@ async fn run_connection_actor(
147
211
return ;
148
212
}
149
213
}
150
- let semaphore = Arc :: new ( tokio :: sync :: Semaphore :: new ( u32 :: MAX as usize ) ) ;
214
+ let counter = ConnectionCounter :: new ( ) ;
151
215
let idle_timer = MaybeFuture :: default ( ) ;
152
- let idle_fut = MaybeFuture :: default ( ) ;
153
- tokio:: pin!( idle_timer, idle_fut) ;
216
+ let idle_stream = counter. clone ( ) . idle_stream ( ) ;
217
+
218
+ tokio:: pin!( idle_timer, idle_stream) ;
154
219
155
220
loop {
156
221
tokio:: select! {
@@ -161,15 +226,9 @@ async fn run_connection_actor(
161
226
match handler {
162
227
Some ( RequestRef { id, tx } ) => {
163
228
assert!( id == node_id, "Not for me!" ) ;
164
- trace!( %node_id, "Received new request" ) ;
165
229
match & state {
166
230
Ok ( state) => {
167
- // first acquire a permit for the op, then aquire all permits for idle
168
- let permit = semaphore. clone( ) . acquire_owned( ) . await . expect( "semaphore closed" ) ;
169
- let res = ConnectionRef :: new( state. clone( ) , permit) ;
170
- if idle_fut. is_none( ) {
171
- idle_fut. as_mut( ) . set_future( semaphore. clone( ) . acquire_many_owned( u32 :: MAX ) ) ;
172
- }
231
+ let res = ConnectionRef :: new( state. clone( ) , counter. get_one( ) ) ;
173
232
174
233
// clear the idle timer
175
234
idle_timer. as_mut( ) . set_none( ) ;
@@ -187,7 +246,10 @@ async fn run_connection_actor(
187
246
}
188
247
}
189
248
190
- _ = & mut idle_fut => {
249
+ _ = idle_stream. next( ) => {
250
+ if !counter. is_idle( ) {
251
+ continue ;
252
+ } ;
191
253
// notify the pool that we are idle.
192
254
trace!( %node_id, "Idle" ) ;
193
255
if context. owner. idle( node_id) . await . is_err( ) {
@@ -200,20 +262,21 @@ async fn run_connection_actor(
200
262
201
263
// Idle timeout - request shutdown
202
264
_ = & mut idle_timer => {
203
- debug !( %node_id, "Connection idle , requesting shutdown" ) ;
265
+ trace !( %node_id, "Idle timer expired , requesting shutdown" ) ;
204
266
context. owner. close( node_id) . await . ok( ) ;
205
267
// Don't break here - wait for main actor to close our channel
206
268
}
207
269
}
208
270
}
209
271
210
272
if let Ok ( connection) = state {
211
- let reason = if semaphore . available_permits ( ) == u32 :: MAX as usize {
212
- "idle "
273
+ let reason = if counter . inner . count . load ( Ordering :: SeqCst ) > 0 {
274
+ b"drop "
213
275
} else {
214
- "drop "
276
+ b"idle "
215
277
} ;
216
- connection. close ( 0u32 . into ( ) , reason. as_bytes ( ) ) ;
278
+ info ! ( %node_id, "Connection actor shutting down, closing connection" ) ;
279
+ connection. close ( 0u32 . into ( ) , reason) ;
217
280
}
218
281
219
282
debug ! ( %node_id, "Connection actor shutting down" ) ;
0 commit comments