Skip to content

Commit 4aab54e

Browse files
committed
Merge branch 'pool-expired'
2 parents 13741f5 + 727b747 commit 4aab54e

File tree

2 files changed

+129
-3
lines changed

2 files changed

+129
-3
lines changed

src/client/mod.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ impl<C, B> Client<C, B> {
9191
}
9292
}
9393

94-
/// Create a new client with a specific connector.
9594
#[inline]
9695
fn configured(config: Config<C, B>, exec: Exec) -> Client<C, B> {
9796
Client {
@@ -118,6 +117,14 @@ where C: Connect,
118117
/// Send a constructed Request using this Client.
119118
#[inline]
120119
pub fn request(&self, mut req: Request<B>) -> FutureResponse {
120+
// TODO(0.12): do this at construction time.
121+
//
122+
// It cannot be done in the constructor because the Client::configured
123+
// does not have `B: 'static` bounds, which are required to spawn
124+
// the interval. In 0.12, add a static bounds to the constructor,
125+
// and move this.
126+
self.schedule_pool_timer();
127+
121128
match req.version() {
122129
HttpVersion::Http10 |
123130
HttpVersion::Http11 => (),
@@ -249,6 +256,12 @@ where C: Connect,
249256

250257
Box::new(resp)
251258
}
259+
260+
fn schedule_pool_timer(&self) {
261+
if let Exec::Handle(ref h) = self.executor {
262+
self.pool.spawn_expired_interval(h);
263+
}
264+
}
252265
}
253266

254267
impl<C, B> Service for Client<C, B>

src/client/pool.rs

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use std::ops::{Deref, DerefMut, BitAndAssign};
66
use std::rc::{Rc, Weak};
77
use std::time::{Duration, Instant};
88

9-
use futures::{Future, Async, Poll};
9+
use futures::{Future, Async, Poll, Stream};
10+
use tokio::reactor::{Handle, Interval};
1011
use relay;
1112

1213
use proto::{KeepAlive, KA};
@@ -40,6 +41,12 @@ struct PoolInner<T> {
4041
// connection.
4142
parked: HashMap<Rc<String>, VecDeque<relay::Sender<Entry<T>>>>,
4243
timeout: Option<Duration>,
44+
// Used to prevent multiple intervals from being spawned to clear
45+
// expired connections.
46+
//
47+
// TODO(0.12): Remove the need for this when Client::schedule_pool_timer
48+
// can be done in Client::new.
49+
expired_timer_spawned: bool,
4350
}
4451

4552
impl<T: Clone + Ready> Pool<T> {
@@ -50,6 +57,7 @@ impl<T: Clone + Ready> Pool<T> {
5057
idle: HashMap::new(),
5158
parked: HashMap::new(),
5259
timeout: timeout,
60+
expired_timer_spawned: false,
5361
})),
5462
}
5563
}
@@ -194,6 +202,64 @@ impl<T> Pool<T> {
194202
inner.parked.remove(key);
195203
}
196204
}
205+
206+
fn clear_expired(&self) {
207+
let mut inner = self.inner.borrow_mut();
208+
209+
let dur = if let Some(dur) = inner.timeout {
210+
dur
211+
} else {
212+
return
213+
};
214+
215+
let now = Instant::now();
216+
//self.last_idle_check_at = now;
217+
218+
inner.idle.retain(|_key, values| {
219+
220+
values.retain(|val| {
221+
match val.status.get() {
222+
TimedKA::Idle(idle_at) if now - idle_at < dur => {
223+
true
224+
},
225+
_ => false,
226+
}
227+
//now - val.idle_at < dur
228+
});
229+
230+
// returning false evicts this key/val
231+
!values.is_empty()
232+
});
233+
}
234+
}
235+
236+
237+
impl<T: 'static> Pool<T> {
238+
pub(super) fn spawn_expired_interval(&self, handle: &Handle) {
239+
let mut inner = self.inner.borrow_mut();
240+
241+
if !inner.enabled {
242+
return;
243+
}
244+
245+
if inner.expired_timer_spawned {
246+
return;
247+
}
248+
inner.expired_timer_spawned = true;
249+
250+
let dur = if let Some(dur) = inner.timeout {
251+
dur
252+
} else {
253+
return
254+
};
255+
256+
let interval = Interval::new(dur, handle)
257+
.expect("reactor is gone");
258+
handle.spawn(IdleInterval {
259+
interval: interval,
260+
pool: Rc::downgrade(&self.inner),
261+
});
262+
}
197263
}
198264

199265
impl<T> Clone for Pool<T> {
@@ -385,6 +451,28 @@ impl Expiration {
385451
}
386452
}
387453

454+
struct IdleInterval<T> {
455+
interval: Interval,
456+
pool: Weak<RefCell<PoolInner<T>>>,
457+
}
458+
459+
impl<T: 'static> Future for IdleInterval<T> {
460+
type Item = ();
461+
type Error = ();
462+
463+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
464+
loop {
465+
try_ready!(self.interval.poll().map_err(|_| unreachable!("interval cannot error")));
466+
467+
if let Some(inner) = self.pool.upgrade() {
468+
let pool = Pool { inner: inner };
469+
pool.clear_expired();
470+
} else {
471+
return Ok(Async::Ready(()));
472+
}
473+
}
474+
}
475+
}
388476

389477
#[cfg(test)]
390478
mod tests {
@@ -428,7 +516,7 @@ mod tests {
428516
}
429517

430518
#[test]
431-
fn test_pool_removes_expired() {
519+
fn test_pool_checkout_removes_expired() {
432520
let pool = Pool::new(true, Some(Duration::from_secs(1)));
433521
let key = Rc::new("foo".to_string());
434522

@@ -451,6 +539,31 @@ mod tests {
451539
assert!(pool.inner.borrow().idle.get(&key).is_none());
452540
}
453541

542+
#[test]
543+
fn test_pool_timer_removes_expired() {
544+
let mut core = ::tokio::reactor::Core::new().unwrap();
545+
let pool = Pool::new(true, Some(Duration::from_millis(100)));
546+
pool.spawn_expired_interval(&core.handle());
547+
let key = Rc::new("foo".to_string());
548+
549+
let mut pooled1 = pool.pooled(key.clone(), 41);
550+
pooled1.idle();
551+
let mut pooled2 = pool.pooled(key.clone(), 5);
552+
pooled2.idle();
553+
let mut pooled3 = pool.pooled(key.clone(), 99);
554+
pooled3.idle();
555+
556+
assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(3));
557+
558+
let timeout = ::tokio::reactor::Timeout::new(
559+
Duration::from_millis(400), // allow for too-good resolution
560+
&core.handle()
561+
).unwrap();
562+
core.run(timeout).unwrap();
563+
564+
assert!(pool.inner.borrow().idle.get(&key).is_none());
565+
}
566+
454567
#[test]
455568
fn test_pool_checkout_task_unparked() {
456569
let pool = Pool::new(true, Some(Duration::from_secs(10)));

0 commit comments

Comments
 (0)