diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 87163be153f..2b70cd0d7f5 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -96,5 +96,10 @@ name = "time_timeout" path = "time_timeout.rs" harness = false +[[bench]] +name = "time_many_timers" +path = "time_many_timers.rs" +harness = false + [lints] workspace = true diff --git a/benches/time_many_timers.rs b/benches/time_many_timers.rs new file mode 100644 index 00000000000..d21c7fbdc73 --- /dev/null +++ b/benches/time_many_timers.rs @@ -0,0 +1,128 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +/// Benchmark measuring timer lifecycle performance (Issue #6504) +/// +/// This benchmark creates many timers, polls them once to register with the timer +/// system, then drops them before they fire. This simulates the common case of +/// timeouts that don't fire (e.g., operations completing before timeout). +/// +/// The benchmark compares single-threaded vs multi-threaded performance to reveal +/// contention in timer registration and deregistration. +use std::future::{poll_fn, Future}; +use std::iter::repeat; +use std::time::{Duration, Instant}; +use tokio::time::sleep; + +const TIMER_COUNT: usize = 1_000_000; + +struct TimerDistribution { + duration: Duration, + percentage: f64, +} + +const fn from_secs(s: u64) -> Duration { + Duration::from_secs(s) +} + +const TIMER_DISTRIBUTIONS: &[TimerDistribution] = &[ + TimerDistribution { + duration: from_secs(1), + percentage: 0.40, + }, + TimerDistribution { + duration: from_secs(10), + percentage: 0.30, + }, + TimerDistribution { + duration: from_secs(60), + percentage: 0.20, + }, + TimerDistribution { + duration: from_secs(300), + percentage: 0.10, + }, +]; + +/// Each timer is polled once to register, then dropped before firing. +async fn create_and_drop_timers_instrumented(count: usize, concurrent_tasks: usize) -> Duration { + let handles: Vec<_> = (0..concurrent_tasks) + .map(|_| { + tokio::spawn(async move { + // Create all sleep futures with realistic distribution + let sleeps: Vec<_> = TIMER_DISTRIBUTIONS + .iter() + .flat_map(|td| { + repeat(td.duration).take((TIMER_COUNT as f64 * td.percentage) as usize) + }) + .cycle() + .take(count / concurrent_tasks) + .map(|timeout| Box::pin(sleep(timeout))) + .collect(); + + // Start timing - poll and drop (METERED) + let start = Instant::now(); + for mut sleep in sleeps { + // Poll once to register + poll_fn(|cx| { + let _ = sleep.as_mut().poll(cx); + std::task::Poll::Ready(()) + }) + .await; + + // Drop to deregister + black_box(drop(sleep)); + } + let elapsed = start.elapsed(); + + elapsed + }) + }) + .collect(); + + let wall_clock_start = Instant::now(); + + for handle in handles { + handle.await.unwrap(); + } + + wall_clock_start.elapsed() +} + +fn bench_many_timers(c: &mut Criterion) { + let mut group = c.benchmark_group("many_timers"); + + // Single-threaded baseline + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + group.bench_function("single_thread", |b| { + b.iter_custom(|_iters| { + let wall_clock = runtime + .block_on(async { create_and_drop_timers_instrumented(TIMER_COUNT, 1).await }); + + wall_clock + }) + }); + + // Multi-threaded with 8 workers + let runtime_multi = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(8) + .build() + .unwrap(); + + group.bench_function("multi_thread", |b| { + b.iter_custom(|_iters| { + let wall_clock = runtime_multi + .block_on(async { create_and_drop_timers_instrumented(TIMER_COUNT, 8).await }); + + wall_clock + }) + }); + + group.finish(); +} + +criterion_group!(many_timers, bench_many_timers); + +criterion_main!(many_timers); diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 627fcbc5ec3..0397b05fd58 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -327,6 +327,11 @@ pub(crate) struct TimerHandle { inner: NonNull, } +// SAFETY: TimerHandle is a pointer to TimerShared, which is Send + Sync. +// The handle can be safely sent across threads. +unsafe impl Send for TimerHandle {} +unsafe impl Sync for TimerHandle {} + pub(super) type EntryList = crate::util::linked_list::LinkedList; /// The shared state structure of a timer. This structure is shared between the @@ -356,6 +361,12 @@ pub(crate) struct TimerShared { /// complete, fired, error, etc). state: StateCell, + /// Tracks whether this timer is in the buckets (true) or wheel (false). + /// This is used during cleanup to determine where to remove the timer from. + /// Accessed with relaxed ordering as it's only modified during registration + /// under either the bucket lock or driver lock. + in_buckets: crate::loom::sync::atomic::AtomicBool, + _p: PhantomPinned, } @@ -388,6 +399,7 @@ impl TimerShared { registered_when: AtomicU64::new(0), pointers: linked_list::Pointers::new(), state: StateCell::default(), + in_buckets: crate::loom::sync::atomic::AtomicBool::new(false), _p: PhantomPinned, } } @@ -415,7 +427,7 @@ impl TimerShared { /// /// SAFETY: Must be called with the driver lock held, and when this entry is /// not in any timer wheel lists. - unsafe fn set_registered_when(&self, when: u64) { + pub(super) unsafe fn set_registered_when(&self, when: u64) { self.registered_when.store(when, Ordering::Relaxed); } @@ -453,6 +465,17 @@ impl TimerShared { pub(super) fn might_be_registered(&self) -> bool { self.state.might_be_registered() } + + /// Returns true if this timer is registered in the buckets (vs the wheel). + pub(super) fn is_in_buckets(&self) -> bool { + self.in_buckets.load(Ordering::Relaxed) + } + + /// Marks this timer as being in the buckets. + /// SAFETY: Must be called while holding the bucket lock during insertion. + pub(super) unsafe fn mark_in_buckets(&self) { + self.in_buckets.store(true, Ordering::Relaxed); + } } unsafe impl linked_list::Link for TimerShared { @@ -583,11 +606,19 @@ impl TimerEntry { } }; - if inner.extend_expiration(tick).is_ok() { + // For bucket timers, we cannot use extend_expiration because the timer handle + // is physically located in a specific bucket. Changing the expiration would + // leave the handle in the wrong bucket. So we skip extend_expiration and go + // straight to reregister, which will insert into the correct new bucket. + // + // NOTE: Even when reregister=false (e.g., reset_without_reregister used by Interval), + // bucket timers MUST still reregister to move the handle to the new bucket. + // The reregister=false case is only valid for wheel timers which can use extend_expiration. + if !inner.is_in_buckets() && inner.extend_expiration(tick).is_ok() { return; } - if reregister { + if reregister || inner.is_in_buckets() { unsafe { self.driver() .reregister(&self.driver.driver().io, tick, inner.into()); @@ -641,8 +672,14 @@ impl TimerHandle { /// Forcibly sets the true and cached expiration times to the given tick. /// - /// SAFETY: The caller must ensure that the handle remains valid, the driver - /// lock is held, and that the timer is not in any wheel linked lists. + /// SAFETY: The caller must ensure that the handle remains valid and that + /// the timer is not in any wheel linked lists. Additionally, either: + /// - The driver lock is held (for wheel-based timers), OR + /// - The appropriate bucket lock is held (for bucket-based timers) + /// + /// The lock requirement ensures proper memory synchronization between the + /// thread setting the expiration and the driver thread that will later + /// fire the timer. pub(super) unsafe fn set_expiration(&self, tick: u64) { self.inner.as_ref().set_expiration(tick); } @@ -670,6 +707,32 @@ impl TimerHandle { } } + /// Marks this timer as being in the buckets. + /// SAFETY: Must be called while holding the bucket lock during insertion. + pub(super) unsafe fn mark_in_buckets(&self) { + self.inner.as_ref().mark_in_buckets() + } + + /// Unmarks this timer as being in the buckets. + pub(super) unsafe fn unmark_in_buckets(&self) { + self.inner + .as_ref() + .in_buckets + .store(false, crate::loom::sync::atomic::Ordering::Relaxed); + } + + /// Returns true if this timer is in the buckets (vs the wheel). + /// SAFETY: The handle must be valid. + pub(super) unsafe fn is_in_buckets_unsafe(&self) -> bool { + unsafe { self.inner.as_ref().is_in_buckets() } + } + + /// Returns true if this timer might still be registered (not yet fired). + /// SAFETY: The handle must be valid. + pub(super) unsafe fn might_be_registered(&self) -> bool { + unsafe { self.inner.as_ref().might_be_registered() } + } + /// Attempts to transition to a terminal state. If the state is already a /// terminal state, does nothing. /// diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 3250dce97f6..b1f4a7b2b7d 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -16,6 +16,9 @@ pub(crate) use self::handle::Handle; mod source; pub(crate) use source::TimeSource; +mod timer_buckets; +use timer_buckets::GlobalTimerBuckets; + mod wheel; use crate::loom::sync::atomic::{AtomicBool, Ordering}; @@ -94,6 +97,10 @@ struct Inner { // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex state: Mutex, + /// Global timer buckets for fast-path timer registration (0-120 seconds). + /// These have their own synchronization and don't need the driver lock. + buckets: GlobalTimerBuckets, + /// True if the driver is being shutdown. is_shutdown: AtomicBool, @@ -112,7 +119,7 @@ struct InnerState { /// The earliest time at which we promise to wake up without unparking. next_wake: Option, - /// Timer wheel. + /// Timer wheel (fallback for timers > 120 seconds). wheel: wheel::Wheel, } @@ -125,6 +132,7 @@ impl Driver { /// Specifying the source of time is useful when testing. pub(crate) fn new(park: IoStack, clock: &Clock) -> (Driver, Handle) { let time_source = TimeSource::new(clock); + let initial_tick = time_source.now(clock); let handle = Handle { time_source, @@ -133,6 +141,7 @@ impl Driver { next_wake: None, wheel: wheel::Wheel::new(), }), + buckets: GlobalTimerBuckets::new(initial_tick), is_shutdown: AtomicBool::new(false), #[cfg(feature = "test-util")] @@ -175,7 +184,16 @@ impl Driver { assert!(!handle.is_shutdown()); - let next_wake = lock.wheel.next_expiration_time(); + // Calculate next_wake from both bucket timers and wheel timers + let maybe_bucket_next = handle.inner.buckets.next_expiration_time(); + let maybe_wheel_next = lock.wheel.next_expiration_time(); + + // Take the minimum of bucket and wheel next expiration times + let next_wake = match (maybe_bucket_next, maybe_wheel_next) { + (Some(bucket_next), Some(wheel_next)) => Some(std::cmp::min(bucket_next, wheel_next)), + (bucket_next, wheel_next) => bucket_next.or(wheel_next), + }; + lock.next_wake = next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); @@ -255,6 +273,19 @@ impl Handle { pub(self) fn process_at_time(&self, mut now: u64) { let mut waker_list = WakeList::new(); + // First, advance the timer buckets and fire all expired timers + // This doesn't require the driver lock - buckets have their own synchronization + // SAFETY: The buckets manage their own thread safety via atomics and per-bucket locks + let bucket_wakers = unsafe { self.inner.buckets.advance(now) }; + for waker in bucket_wakers { + waker_list.push(waker); + + if !waker_list.can_push() { + waker_list.wake_all(); + } + } + + // Now process the timer wheel (for timers > 120s) - this DOES need the driver lock let mut lock = self.inner.lock(); if now < lock.wheel.elapsed() { @@ -285,10 +316,18 @@ impl Handle { } } - lock.next_wake = lock - .wheel - .poll_at() - .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); + // Calculate next_wake from both bucket timers and wheel timers + let maybe_bucket_next = self.inner.buckets.next_expiration_time(); + let maybe_wheel_next = lock.wheel.poll_at(); + + // Take the minimum of bucket and wheel next expiration times + let next_wake = match (maybe_bucket_next, maybe_wheel_next) { + (Some(b), Some(w)) => Some(std::cmp::min(b, w)), + (bucket, wheel) => bucket.or(wheel), + }; + + lock.next_wake = + next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); drop(lock); @@ -307,13 +346,25 @@ impl Handle { /// `add_entry` must not be called concurrently. pub(self) unsafe fn clear_entry(&self, entry: NonNull) { unsafe { - let mut lock = self.inner.lock(); + // Check if this timer is in the buckets or the wheel + let in_buckets = entry.as_ref().is_in_buckets(); + + if in_buckets { + // Timer is in buckets - remove it from the bucket Vec before the entry is freed. + // This prevents use-after-free when the TimerShared (embedded in TimerEntry) is dropped. + let registered_when = entry.as_ref().registered_when(); + let entry_handle = entry.as_ref().handle(); + self.inner.buckets.try_remove(registered_when, entry_handle); + } else { + // Timer is in the wheel - need driver lock for safe removal + let mut lock = self.inner.lock(); - if entry.as_ref().might_be_registered() { - lock.wheel.remove(entry); - } + if entry.as_ref().might_be_registered() { + lock.wheel.remove(entry); + } - entry.as_ref().handle().fire(Ok(())); + entry.as_ref().handle().fire(Ok(())); + } } } @@ -329,11 +380,74 @@ impl Handle { new_tick: u64, entry: NonNull, ) { + // Check if this timer was previously in buckets + let was_in_buckets = unsafe { entry.as_ref().is_in_buckets() }; + + if was_in_buckets { + // Timer is in buckets - keep it in buckets for reset + // Just insert at new deadline - stale copies will be skipped via registered_when check + let entry_handle = entry.as_ref().handle(); + + match self.inner.buckets.try_insert(new_tick, entry_handle) { + timer_buckets::InsertResult::Inserted => { + // Always unpark for bucket insertions - the bucket maintains its own + // next_wake atomic, and we need to ensure the driver wakes up for + // bucket timers. Checking next_wake would require locking, defeating + // the purpose of lock-free bucket insertion. + unpark.unpark(); + return; + } + timer_buckets::InsertResult::Elapsed(handle) => { + // Timer already elapsed - but update registered_when so future resets work + unsafe { + entry.as_ref().set_registered_when(new_tick); + handle.fire(Ok(())); + }; + return; + } + timer_buckets::InsertResult::OutOfRange(_handle) => { + // New deadline is >120s, must move to wheel + unsafe { entry.as_ref().handle().unmark_in_buckets() }; + // Fall through to wheel path below + } + } + } else { + // Timer was NOT in buckets (either new or was in wheel) + // Try buckets first for the new deadline + let entry_handle = entry.as_ref().handle(); + + match self.inner.buckets.try_insert(new_tick, entry_handle) { + timer_buckets::InsertResult::Inserted => { + // Successfully inserted in buckets + // If timer was previously in wheel, it will remain there as a stale entry + // The wheel will skip it when it sees in_buckets = true + + // Always unpark for bucket insertions - the bucket maintains its own + // next_wake atomic, and we need to ensure the driver wakes up for + // bucket timers. Checking next_wake would require locking, defeating + // the purpose of lock-free bucket insertion. + unpark.unpark(); + return; + } + timer_buckets::InsertResult::Elapsed(handle) => { + // Timer already elapsed - but update registered_when so future resets work + unsafe { + entry.as_ref().set_registered_when(new_tick); + handle.fire(Ok(())); + }; + return; + } + timer_buckets::InsertResult::OutOfRange(_handle) => { + // Fall through to wheel path + } + } + } + + // Timer didn't fit in buckets (>120s) - use wheel let waker = unsafe { let mut lock = self.inner.lock(); - // We may have raced with a firing/deregistration, so check before - // deregistering. + // Remove from wheel if it's already there if unsafe { entry.as_ref().might_be_registered() } { lock.wheel.remove(entry); } @@ -344,7 +458,7 @@ impl Handle { if self.is_shutdown() { unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) } } else { - entry.set_expiration(new_tick); + unsafe { entry.set_expiration(new_tick) }; // Note: We don't have to worry about racing with some other resetting // thread, because add_entry and reregister require exclusive control of diff --git a/tokio/src/runtime/time/timer_buckets.rs b/tokio/src/runtime/time/timer_buckets.rs new file mode 100644 index 00000000000..2a8db2da60d --- /dev/null +++ b/tokio/src/runtime/time/timer_buckets.rs @@ -0,0 +1,325 @@ +//! Global timer buckets for fast-path timer registration and firing. +//! +//! This module implements a ring buffer of timer buckets with 1ms granularity, +//! covering timers from 0-120 seconds in the future. Timers beyond this range +//! fall back to the global timer wheel. +//! +//! # Design +//! +//! The ring buffer contains 120,000 buckets (one per millisecond for 2 minutes). +//! Each bucket has its own lock and contains a Vec of TimerHandles. This provides: +//! - Lock-free index calculation for insertion +//! - Fine-grained locking (only contention when timers land in same millisecond) +//! - O(1) advancement as time progresses +//! - Natural wraparound every 120 seconds +//! +//! # Synchronization +//! +//! - `head`: Atomic index pointing to the "current time" bucket +//! - `ref_time`: Atomic tick value representing when head was at position 0 +//! - Per-bucket locks: Only held during push (insertion) or drain (firing) +//! +//! This allows workers to insert timers lock-free (just atomic reads + single bucket lock), +//! while the driver advances the head pointer and fires expired buckets under the driver lock. + +use crate::loom::sync::atomic::Ordering; +use crate::loom::sync::atomic::{AtomicU64, AtomicUsize}; +use crate::loom::sync::Mutex; +use crate::runtime::time::TimerHandle; + +use std::task::Waker; + +/// Number of milliseconds to cover in the ring buffer (2 minutes). +/// Timers beyond this range fall back to the global timer wheel. +const BUCKET_COUNT: usize = 120_000; + +/// Global ring buffer of timer buckets. +/// +/// This structure is shared across all workers and is integrated into the +/// timer driver's Handle. +pub(crate) struct GlobalTimerBuckets { + /// The buckets themselves. Vec of 120,000 buckets. + buckets: Vec, + + /// Index of the "head" bucket (represents current time). + /// Advances forward as time progresses. + head: AtomicUsize, + + /// The tick value (milliseconds since epoch) that the head position represents. + /// Used to calculate bucket offsets for new timers. + ref_time: AtomicU64, + + /// The earliest timer deadline across all buckets. + /// Used to calculate when the driver should wake up. + /// Value of u64::MAX means no timers are registered. + next_wake: AtomicU64, +} + +/// A single bucket in the ring buffer. +struct Bucket { + /// Timers that expire in this millisecond. + /// Protected by a mutex for simplicity - can be optimized to lock-free later if needed. + timers: Mutex>, +} + +impl Default for Bucket { + fn default() -> Self { + Self { + timers: Mutex::new(Vec::new()), + } + } +} + +/// Result of attempting to insert a timer into buckets. +#[derive(Debug)] +pub(crate) enum InsertResult { + /// Timer was successfully inserted into a bucket + Inserted, + /// Timer is too far in the future (>120s), try the wheel instead + OutOfRange(TimerHandle), + /// Timer has already elapsed and should be fired immediately + Elapsed(TimerHandle), +} + +impl GlobalTimerBuckets { + /// Creates a new global timer bucket ring buffer. + /// + /// The `initial_tick` parameter sets the reference time for the head position. + pub(crate) fn new(initial_tick: u64) -> Self { + // Pre-allocate all buckets on the heap + let buckets = (0..BUCKET_COUNT).map(|_| Bucket::default()).collect(); + + Self { + buckets, + head: AtomicUsize::new(0), + ref_time: AtomicU64::new(initial_tick), + next_wake: AtomicU64::new(u64::MAX), + } + } + + /// Attempts to insert a timer into the appropriate bucket. + /// + /// Returns `InsertResult` indicating what happened. + /// + /// # Parameters + /// - `deadline_tick`: The absolute tick (milliseconds since epoch) when timer expires + /// - `timer`: The timer handle to insert + pub(crate) fn try_insert(&self, deadline_tick: u64, timer: TimerHandle) -> InsertResult { + self.try_insert_inner(deadline_tick, timer, true) + } + + fn try_insert_inner(&self, deadline_tick: u64, timer: TimerHandle, mark: bool) -> InsertResult { + // Read current reference time and head position atomically + let ref_tick = self.ref_time.load(Ordering::Acquire); + let head_pos = self.head.load(Ordering::Acquire); + + // Check if timer has already elapsed + if deadline_tick <= ref_tick { + return InsertResult::Elapsed(timer); + } + + // Calculate offset from current reference time + let offset = deadline_tick - ref_tick; + + // If timer is beyond our range, don't insert it here + if offset >= BUCKET_COUNT as u64 { + return InsertResult::OutOfRange(timer); + } + + // Calculate target bucket index with wraparound + let bucket_idx = (head_pos + offset as usize) % BUCKET_COUNT; + + // Lock just this bucket and insert the timer + let mut bucket = self.buckets[bucket_idx].timers.lock(); + + // Re-check after locking - ref_time might have advanced while we were waiting for lock + let ref_tick_locked = self.ref_time.load(Ordering::Acquire); + if deadline_tick <= ref_tick_locked { + // Deadline passed while we were acquiring the lock + return InsertResult::Elapsed(timer); + } + + // SAFETY: We hold the bucket lock which synchronizes with advance(). + // The handle is valid (just passed to us), and this timer is not in + // the wheel (it's going into buckets instead). The bucket lock provides + // the necessary memory fence for the relaxed atomic operations in + // set_expiration() to be visible when advance() later fires this timer. + unsafe { + if mark { + timer.mark_in_buckets(); + } + timer.set_expiration(deadline_tick); + } + + bucket.push(timer); + + // Update next_wake if this timer is earlier + self.next_wake.fetch_min(deadline_tick, Ordering::Release); + + InsertResult::Inserted + } + + /// Returns the tick of the next timer that will fire, if any. + /// + /// This is used to calculate when the driver should wake up. + pub(crate) fn next_expiration_time(&self) -> Option { + let next = self.next_wake.load(Ordering::Acquire); + if next == u64::MAX { + None + } else { + Some(next) + } + } + + /// Removes a timer from the bucket where it was inserted. + /// + /// This is called when a timer is being cancelled/dropped before it fires. + /// We need to remove the handle from the bucket Vec to avoid use-after-free + /// when the underlying TimerShared is freed. + /// + /// # Parameters + /// - `registered_when`: The tick value when this timer was originally inserted (stored in registered_when) + /// - `timer_to_remove`: The handle to remove + pub(crate) fn try_remove(&self, registered_when: u64, _timer_to_remove: TimerHandle) { + // Calculate which bucket this timer should be in based on when it was registered + let ref_tick = self.ref_time.load(Ordering::Acquire); + let head_pos = self.head.load(Ordering::Acquire); + + // Only try to remove if the timer is still in the valid bucket range + if registered_when <= ref_tick { + // Timer's deadline has passed, it's either already been fired or elapsed + // Don't try to remove it + return; + } + + let offset = registered_when - ref_tick; + + // If offset is >= BUCKET_COUNT, the timer is out of range (shouldn't happen for buckets, + // but be safe) + if offset >= BUCKET_COUNT as u64 { + return; + } + + let bucket_idx = (head_pos + offset as usize) % BUCKET_COUNT; + + // Lock the bucket and search for/remove the matching timer + if let Some(mut bucket) = self.buckets[bucket_idx].timers.try_lock() { + // Remove the timer from the bucket Vec by finding the matching registered_when value. + // Multiple timers can be in the same bucket, but only one should have our registered_when. + let target_registered_when = registered_when; + bucket.retain(|handle: &TimerHandle| { + let handle_registered = unsafe { handle.registered_when() }; + // Keep the handle if it has a different registered_when (not our timer) + handle_registered != target_registered_when + }); + } + // If we can't acquire the lock, bail out. The timer will be fired during the + // current advance() call that's holding the lock, since it's still marked as + // being in buckets. + } + + /// Advances the ring buffer to the current time and fires all expired timers. + /// + /// This is called by the driver when it processes timers. Returns all wakers + /// that need to be woken after the driver lock is released. + /// + /// # Parameters + /// - `now_tick`: The current tick (milliseconds since epoch) + /// + /// # Safety + /// Must be called with the driver lock held. + pub(crate) unsafe fn advance(&self, now_tick: u64) -> Vec { + let mut wakers = Vec::new(); + + let ref_tick = self.ref_time.load(Ordering::Acquire); + let ticks_elapsed = now_tick.saturating_sub(ref_tick); + + // Cap the number of ticks we advance to the bucket count + // If time jumped way forward (e.g., during shutdown with u64::MAX), we only need + // to fire all buckets once, not loop billions of times + let ticks_to_advance = std::cmp::min(ticks_elapsed, BUCKET_COUNT as u64); + + // Track if we need to clear next_wake (if we fire the bucket it points to and it becomes empty) + let current_next_wake = self.next_wake.load(Ordering::Acquire); + + // Advance through each elapsed tick, firing timers in each bucket + for _ in 0..ticks_to_advance { + // Atomically advance head and ref_time, get the tick we're firing + // fetch_add returns the OLD value, but we want to fire the NEW bucket at the NEW time + let bucket_idx = (self.head.fetch_add(1, Ordering::AcqRel) + 1) % BUCKET_COUNT; + let current_tick = self.ref_time.fetch_add(1, Ordering::AcqRel) + 1; + + // Fire all timers in this bucket + let mut bucket = self.buckets[bucket_idx].timers.lock(); + + let had_timers = !bucket.is_empty(); + + for timer_handle in bucket.drain(..) { + // Only fire timers that are actually in buckets and match the current tick. + // If a timer was reset to a deadline > 120s, it was moved to the wheel but + // its old handle may still be in this bucket Vec. We skip these by checking + // is_in_buckets and registered_when to ensure we only fire valid entries. + if unsafe { timer_handle.is_in_buckets_unsafe() } { + let registered = unsafe { timer_handle.registered_when() }; + if registered == current_tick { + // SAFETY: We hold the driver lock, which is required for firing + if let Some(waker) = unsafe { timer_handle.fire(Ok(())) } { + wakers.push(waker); + } + } + } + } + + // If we just drained the bucket that next_wake was pointing to, clear it + if had_timers && current_tick == current_next_wake { + // Use compare_exchange to only clear if it's still pointing to this tick + // (another thread might have inserted a new earlier timer) + let _ = self.next_wake.compare_exchange( + current_next_wake, + u64::MAX, + Ordering::Release, + Ordering::Relaxed, + ); + } + } + + wakers + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bucket_offset_calculation() { + let buckets = GlobalTimerBuckets::new(1000); + + // Timer 5000ms in the future should land at index 5000 + assert_eq!(buckets.head.load(Ordering::Relaxed), 0); + assert_eq!(buckets.ref_time.load(Ordering::Relaxed), 1000); + + // Offset = 6000 - 1000 = 5000 + // Index = (0 + 5000) % 120000 = 5000 + let offset = 6000u64.saturating_sub(1000); + assert_eq!(offset, 5000); + assert!((offset as usize) < BUCKET_COUNT); + } + + #[test] + fn test_wraparound() { + let buckets = GlobalTimerBuckets::new(0); + + // Advance head near the end + buckets.head.store(119_500, Ordering::Release); + buckets.ref_time.store(119_500, Ordering::Release); + + // Timer 1000ms in the future should wrap around + let offset = 120_500u64.saturating_sub(119_500); + assert_eq!(offset, 1000); + + let bucket_idx = (119_500 + offset as usize) % BUCKET_COUNT; + assert_eq!(bucket_idx, 120_500 % BUCKET_COUNT); + assert_eq!(bucket_idx, 500); // Wrapped around + } +} diff --git a/tokio/src/runtime/time/wheel/mod.rs b/tokio/src/runtime/time/wheel/mod.rs index 8d94303544c..85fb183b6dc 100644 --- a/tokio/src/runtime/time/wheel/mod.rs +++ b/tokio/src/runtime/time/wheel/mod.rs @@ -234,6 +234,13 @@ impl Wheel { debug_assert_eq!(unsafe { item.registered_when() }, expiration.deadline); } + // Check if timer has already been fired (e.g., by bucket path during reset). + // Also skip if timer was moved to buckets (stale wheel entry). + if !unsafe { item.might_be_registered() } || unsafe { item.is_in_buckets_unsafe() } { + // Timer was already fired or moved to buckets, skip it + continue; + } + // Try to expire the entry; this is cheap (doesn't synchronize) if // the timer is not expired, and updates registered_when. match unsafe { item.mark_pending(expiration.deadline) } {