Skip to content

Commit e7f1378

Browse files
committed
time: add timer buckets to reduce lock contention
Introduce GlobalTimerBuckets, a ring buffer of per-bucket locks for timers 0-120 seconds in the future. This reduces the occurrence and impact of global lock contention for short-lived timers, the common case. Timers > 120s fall back to the existing timer wheel. When a timer is dropped, it must be removed from whichever storage it's in before the underlying memory is freed. Add try_remove() to safely remove from buckets, and update clear_entry() to call it. Performance: Preliminary results from a benchmark with one million concurrent timers show an 84x improvement in multi-threaded runs and 25x over single-threaded.
1 parent 0e9a67c commit e7f1378

File tree

5 files changed

+570
-32
lines changed

5 files changed

+570
-32
lines changed

benches/time_many_timers.rs

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use criterion::{black_box, criterion_group, criterion_main, Criterion};
12
/// Benchmark measuring timer lifecycle performance (Issue #6504)
23
///
34
/// This benchmark creates many timers, polls them once to register with the timer
@@ -7,23 +8,55 @@
78
/// The benchmark compares single-threaded vs multi-threaded performance to reveal
89
/// contention in timer registration and deregistration.
910
use std::future::{poll_fn, Future};
11+
use std::iter::repeat;
1012
use std::time::{Duration, Instant};
11-
12-
use criterion::{black_box, criterion_group, criterion_main, Criterion};
1313
use tokio::time::sleep;
1414

1515
const TIMER_COUNT: usize = 1_000_000;
1616

17-
/// Returns (wall_clock_duration, task_durations)
17+
struct TimerDistribution {
18+
duration: Duration,
19+
percentage: f64,
20+
}
21+
22+
const fn from_secs(s: u64) -> Duration {
23+
Duration::from_secs(s)
24+
}
25+
26+
const TIMER_DISTRIBUTIONS: &[TimerDistribution] = &[
27+
TimerDistribution {
28+
duration: from_secs(1),
29+
percentage: 0.40,
30+
},
31+
TimerDistribution {
32+
duration: from_secs(10),
33+
percentage: 0.30,
34+
},
35+
TimerDistribution {
36+
duration: from_secs(60),
37+
percentage: 0.20,
38+
},
39+
TimerDistribution {
40+
duration: from_secs(300),
41+
percentage: 0.10,
42+
},
43+
];
44+
45+
/// Each timer is polled once to register, then dropped before firing.
1846
async fn create_and_drop_timers_instrumented(count: usize, concurrent_tasks: usize) -> Duration {
1947
let handles: Vec<_> = (0..concurrent_tasks)
2048
.map(|_| {
2149
tokio::spawn(async move {
22-
// Create all sleep futures
23-
let mut sleeps = Vec::with_capacity(count / concurrent_tasks);
24-
for _ in 0..count / concurrent_tasks {
25-
sleeps.push(Box::pin(sleep(Duration::from_secs(60))));
26-
}
50+
// Create all sleep futures with a realistic distribution of durations
51+
let sleeps: Vec<_> = TIMER_DISTRIBUTIONS
52+
.iter()
53+
.flat_map(|td| {
54+
repeat(td.duration).take((TIMER_COUNT as f64 * td.percentage) as usize)
55+
})
56+
.cycle()
57+
.take(count / concurrent_tasks)
58+
.map(|timeout| Box::pin(sleep(timeout)))
59+
.collect();
2760

2861
// Start timing - poll and drop (METERED)
2962
let start = Instant::now();
@@ -47,14 +80,11 @@ async fn create_and_drop_timers_instrumented(count: usize, concurrent_tasks: usi
4780

4881
let wall_clock_start = Instant::now();
4982

50-
let mut task_durations = Vec::with_capacity(concurrent_tasks);
5183
for handle in handles {
52-
task_durations.push(handle.await.unwrap());
84+
handle.await.unwrap();
5385
}
5486

55-
let wall_clock = wall_clock_start.elapsed();
56-
57-
wall_clock
87+
wall_clock_start.elapsed()
5888
}
5989

6090
fn bench_many_timers(c: &mut Criterion) {

tokio/src/runtime/time/entry.rs

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,11 @@ pub(crate) struct TimerHandle {
327327
inner: NonNull<TimerShared>,
328328
}
329329

330+
// SAFETY: TimerHandle is a pointer to TimerShared, which is Send + Sync.
331+
// The handle can be safely sent across threads.
332+
unsafe impl Send for TimerHandle {}
333+
unsafe impl Sync for TimerHandle {}
334+
330335
pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>;
331336

332337
/// The shared state structure of a timer. This structure is shared between the
@@ -356,6 +361,12 @@ pub(crate) struct TimerShared {
356361
/// complete, fired, error, etc).
357362
state: StateCell,
358363

364+
/// Tracks whether this timer is in the buckets (true) or wheel (false).
365+
/// This is used during cleanup to determine where to remove the timer from.
366+
/// Accessed with relaxed ordering as it's only modified during registration
367+
/// under either the bucket lock or driver lock.
368+
in_buckets: crate::loom::sync::atomic::AtomicBool,
369+
359370
_p: PhantomPinned,
360371
}
361372

@@ -388,6 +399,7 @@ impl TimerShared {
388399
registered_when: AtomicU64::new(0),
389400
pointers: linked_list::Pointers::new(),
390401
state: StateCell::default(),
402+
in_buckets: crate::loom::sync::atomic::AtomicBool::new(false),
391403
_p: PhantomPinned,
392404
}
393405
}
@@ -415,7 +427,7 @@ impl TimerShared {
415427
///
416428
/// SAFETY: Must be called with the driver lock held, and when this entry is
417429
/// not in any timer wheel lists.
418-
unsafe fn set_registered_when(&self, when: u64) {
430+
pub(super) unsafe fn set_registered_when(&self, when: u64) {
419431
self.registered_when.store(when, Ordering::Relaxed);
420432
}
421433

@@ -453,6 +465,17 @@ impl TimerShared {
453465
pub(super) fn might_be_registered(&self) -> bool {
454466
self.state.might_be_registered()
455467
}
468+
469+
/// Returns true if this timer is registered in the buckets (vs the wheel).
470+
pub(super) fn is_in_buckets(&self) -> bool {
471+
self.in_buckets.load(Ordering::Relaxed)
472+
}
473+
474+
/// Marks this timer as being in the buckets.
475+
/// SAFETY: Must be called while holding the bucket lock during insertion.
476+
pub(super) unsafe fn mark_in_buckets(&self) {
477+
self.in_buckets.store(true, Ordering::Relaxed);
478+
}
456479
}
457480

458481
unsafe impl linked_list::Link for TimerShared {
@@ -583,11 +606,19 @@ impl TimerEntry {
583606
}
584607
};
585608

586-
if inner.extend_expiration(tick).is_ok() {
609+
// For bucket timers, we cannot use extend_expiration because the timer handle
610+
// is physically located in a specific bucket. Changing the expiration would
611+
// leave the handle in the wrong bucket. So we skip extend_expiration and go
612+
// straight to reregister, which will insert into the correct new bucket.
613+
//
614+
// NOTE: Even when reregister=false (e.g., reset_without_reregister used by Interval),
615+
// bucket timers MUST still reregister to move the handle to the new bucket.
616+
// The reregister=false case is only valid for wheel timers which can use extend_expiration.
617+
if !inner.is_in_buckets() && inner.extend_expiration(tick).is_ok() {
587618
return;
588619
}
589620

590-
if reregister {
621+
if reregister || inner.is_in_buckets() {
591622
unsafe {
592623
self.driver()
593624
.reregister(&self.driver.driver().io, tick, inner.into());
@@ -641,8 +672,14 @@ impl TimerHandle {
641672

642673
/// Forcibly sets the true and cached expiration times to the given tick.
643674
///
644-
/// SAFETY: The caller must ensure that the handle remains valid, the driver
645-
/// lock is held, and that the timer is not in any wheel linked lists.
675+
/// SAFETY: The caller must ensure that the handle remains valid and that
676+
/// the timer is not in any wheel linked lists. Additionally, either:
677+
/// - The driver lock is held (for wheel-based timers), OR
678+
/// - The appropriate bucket lock is held (for bucket-based timers)
679+
///
680+
/// The lock requirement ensures proper memory synchronization between the
681+
/// thread setting the expiration and the driver thread that will later
682+
/// fire the timer.
646683
pub(super) unsafe fn set_expiration(&self, tick: u64) {
647684
self.inner.as_ref().set_expiration(tick);
648685
}
@@ -670,6 +707,29 @@ impl TimerHandle {
670707
}
671708
}
672709

710+
/// Marks this timer as being in the buckets.
711+
/// SAFETY: Must be called while holding the bucket lock during insertion.
712+
pub(super) unsafe fn mark_in_buckets(&self) {
713+
self.inner.as_ref().mark_in_buckets()
714+
}
715+
716+
/// Unmarks this timer as being in the buckets. SAFETY: The handle must be valid.
717+
pub(super) unsafe fn unmark_in_buckets(&self) {
718+
self.inner.as_ref().in_buckets.store(false, crate::loom::sync::atomic::Ordering::Relaxed);
719+
}
720+
721+
/// Returns true if this timer is in the buckets (vs the wheel).
722+
/// SAFETY: The handle must be valid.
723+
pub(super) unsafe fn is_in_buckets_unsafe(&self) -> bool {
724+
unsafe { self.inner.as_ref().is_in_buckets() }
725+
}
726+
727+
/// Returns true if this timer might still be registered (not yet fired).
728+
/// SAFETY: The handle must be valid.
729+
pub(super) unsafe fn might_be_registered(&self) -> bool {
730+
unsafe { self.inner.as_ref().might_be_registered() }
731+
}
732+
673733
/// Attempts to transition to a terminal state. If the state is already a
674734
/// terminal state, does nothing.
675735
///

0 commit comments

Comments
 (0)