Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,10 @@ name = "time_timeout"
path = "time_timeout.rs"
harness = false

[[bench]]
name = "time_many_timers"
path = "time_many_timers.rs"
harness = false

[lints]
workspace = true
128 changes: 128 additions & 0 deletions benches/time_many_timers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
/// Benchmark measuring timer lifecycle performance (Issue #6504)
///
/// This benchmark creates many timers, polls them once to register with the timer
/// system, then drops them before they fire. This simulates the common case of
/// timeouts that don't fire (e.g., operations completing before timeout).
///
/// The benchmark compares single-threaded vs multi-threaded performance to reveal
/// contention in timer registration and deregistration.
use std::future::{poll_fn, Future};
use std::iter::repeat;
use std::time::{Duration, Instant};
use tokio::time::sleep;

const TIMER_COUNT: usize = 1_000_000;

struct TimerDistribution {
duration: Duration,
percentage: f64,
}

const fn from_secs(s: u64) -> Duration {
Duration::from_secs(s)
}

const TIMER_DISTRIBUTIONS: &[TimerDistribution] = &[
TimerDistribution {
duration: from_secs(1),
percentage: 0.40,
},
TimerDistribution {
duration: from_secs(10),
percentage: 0.30,
},
TimerDistribution {
duration: from_secs(60),
percentage: 0.20,
},
TimerDistribution {
duration: from_secs(300),
percentage: 0.10,
},
];

/// Each timer is polled once to register, then dropped before firing.
async fn create_and_drop_timers_instrumented(count: usize, concurrent_tasks: usize) -> Duration {
let handles: Vec<_> = (0..concurrent_tasks)
.map(|_| {
tokio::spawn(async move {
// Create all sleep futures with realistic distribution
let sleeps: Vec<_> = TIMER_DISTRIBUTIONS
.iter()
.flat_map(|td| {
repeat(td.duration).take((TIMER_COUNT as f64 * td.percentage) as usize)
})
.cycle()
.take(count / concurrent_tasks)
.map(|timeout| Box::pin(sleep(timeout)))
.collect();

// Start timing - poll and drop (METERED)
let start = Instant::now();
for mut sleep in sleeps {
// Poll once to register
poll_fn(|cx| {
let _ = sleep.as_mut().poll(cx);
std::task::Poll::Ready(())
})
.await;

// Drop to deregister
black_box(drop(sleep));
}
let elapsed = start.elapsed();

elapsed
})
})
.collect();

let wall_clock_start = Instant::now();

for handle in handles {
handle.await.unwrap();
}

wall_clock_start.elapsed()
}

fn bench_many_timers(c: &mut Criterion) {
let mut group = c.benchmark_group("many_timers");

// Single-threaded baseline
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
group.bench_function("single_thread", |b| {
b.iter_custom(|_iters| {
let wall_clock = runtime
.block_on(async { create_and_drop_timers_instrumented(TIMER_COUNT, 1).await });

wall_clock
})
});

// Multi-threaded with 8 workers
let runtime_multi = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(8)
.build()
.unwrap();

group.bench_function("multi_thread", |b| {
b.iter_custom(|_iters| {
let wall_clock = runtime_multi
.block_on(async { create_and_drop_timers_instrumented(TIMER_COUNT, 8).await });

wall_clock
})
});

group.finish();
}

criterion_group!(many_timers, bench_many_timers);

criterion_main!(many_timers);
73 changes: 68 additions & 5 deletions tokio/src/runtime/time/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ pub(crate) struct TimerHandle {
inner: NonNull<TimerShared>,
}

// SAFETY: TimerHandle is a pointer to TimerShared, which is Send + Sync.
// The handle can be safely sent across threads.
unsafe impl Send for TimerHandle {}
unsafe impl Sync for TimerHandle {}

pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>;

/// The shared state structure of a timer. This structure is shared between the
Expand Down Expand Up @@ -356,6 +361,12 @@ pub(crate) struct TimerShared {
/// complete, fired, error, etc).
state: StateCell,

/// Tracks whether this timer is in the buckets (true) or wheel (false).
/// This is used during cleanup to determine where to remove the timer from.
/// Accessed with relaxed ordering as it's only modified during registration
/// under either the bucket lock or driver lock.
in_buckets: crate::loom::sync::atomic::AtomicBool,

_p: PhantomPinned,
}

Expand Down Expand Up @@ -388,6 +399,7 @@ impl TimerShared {
registered_when: AtomicU64::new(0),
pointers: linked_list::Pointers::new(),
state: StateCell::default(),
in_buckets: crate::loom::sync::atomic::AtomicBool::new(false),
_p: PhantomPinned,
}
}
Expand Down Expand Up @@ -415,7 +427,7 @@ impl TimerShared {
///
/// SAFETY: Must be called with the driver lock held, and when this entry is
/// not in any timer wheel lists.
unsafe fn set_registered_when(&self, when: u64) {
pub(super) unsafe fn set_registered_when(&self, when: u64) {
self.registered_when.store(when, Ordering::Relaxed);
}

Expand Down Expand Up @@ -453,6 +465,17 @@ impl TimerShared {
pub(super) fn might_be_registered(&self) -> bool {
self.state.might_be_registered()
}

/// Returns true if this timer is registered in the buckets (vs the wheel).
pub(super) fn is_in_buckets(&self) -> bool {
self.in_buckets.load(Ordering::Relaxed)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this atomic load require extra synchronization?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above 👍🏻

}

/// Marks this timer as being in the buckets.
/// SAFETY: Must be called while holding the bucket lock during insertion.
pub(super) unsafe fn mark_in_buckets(&self) {
self.in_buckets.store(true, Ordering::Relaxed);
}
}

unsafe impl linked_list::Link for TimerShared {
Expand Down Expand Up @@ -583,11 +606,19 @@ impl TimerEntry {
}
};

if inner.extend_expiration(tick).is_ok() {
// For bucket timers, we cannot use extend_expiration because the timer handle
// is physically located in a specific bucket. Changing the expiration would
// leave the handle in the wrong bucket. So we skip extend_expiration and go
// straight to reregister, which will insert into the correct new bucket.
//
// NOTE: Even when reregister=false (e.g., reset_without_reregister used by Interval),
// bucket timers MUST still reregister to move the handle to the new bucket.
// The reregister=false case is only valid for wheel timers which can use extend_expiration.
if !inner.is_in_buckets() && inner.extend_expiration(tick).is_ok() {
return;
}

if reregister {
if reregister || inner.is_in_buckets() {
unsafe {
self.driver()
.reregister(&self.driver.driver().io, tick, inner.into());
Expand Down Expand Up @@ -641,8 +672,14 @@ impl TimerHandle {

/// Forcibly sets the true and cached expiration times to the given tick.
///
/// SAFETY: The caller must ensure that the handle remains valid, the driver
/// lock is held, and that the timer is not in any wheel linked lists.
/// SAFETY: The caller must ensure that the handle remains valid and that
/// the timer is not in any wheel linked lists. Additionally, either:
/// - The driver lock is held (for wheel-based timers), OR
/// - The appropriate bucket lock is held (for bucket-based timers)
///
/// The lock requirement ensures proper memory synchronization between the
/// thread setting the expiration and the driver thread that will later
/// fire the timer.
pub(super) unsafe fn set_expiration(&self, tick: u64) {
self.inner.as_ref().set_expiration(tick);
}
Expand Down Expand Up @@ -670,6 +707,32 @@ impl TimerHandle {
}
}

/// Marks this timer as being in the buckets.
/// SAFETY: Must be called while holding the bucket lock during insertion.
pub(super) unsafe fn mark_in_buckets(&self) {
self.inner.as_ref().mark_in_buckets()
}

/// Unmarks this timer as being in the buckets.
pub(super) unsafe fn unmark_in_buckets(&self) {
self.inner
.as_ref()
.in_buckets
.store(false, crate::loom::sync::atomic::Ordering::Relaxed);
}

/// Returns true if this timer is in the buckets (vs the wheel).
/// SAFETY: The handle must be valid.
pub(super) unsafe fn is_in_buckets_unsafe(&self) -> bool {
unsafe { self.inner.as_ref().is_in_buckets() }
}

/// Returns true if this timer might still be registered (not yet fired).
/// SAFETY: The handle must be valid.
pub(super) unsafe fn might_be_registered(&self) -> bool {
unsafe { self.inner.as_ref().might_be_registered() }
}

/// Attempts to transition to a terminal state. If the state is already a
/// terminal state, does nothing.
///
Expand Down
Loading
Loading