From 1220253963180950d861b9bb6d1d44de90d8742f Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Fri, 27 Jun 2025 14:15:43 -0700
Subject: [PATCH 01/15] runtime: Make multithreaded rt LIFO slot atomic

This commit changes the LIFO slot on multi-threaded runtime workers from
being a mutable `Option<Notified<Arc<Handle>>>` to a new `AtomicNotified`
type. This type implements a cell containing a nullable task pointer
which can be swapped atomically. It's analogous to `AtomicCell` but with
an extra `PhantomData` to remember the task's scheduler type parameter,
which would otherwise be erased by the conversion into a `*mut Header`
pointer.

This change is in preparation for a subsequent change to allow
work-stealing from the LIFO slot (see: #4941).
---
 .../runtime/scheduler/multi_thread/worker.rs  | 14 ++---
 tokio/src/runtime/task/atomic_notified.rs     | 58 +++++++++++++++++++
 tokio/src/runtime/task/mod.rs                 |  5 ++
 3 files changed, 69 insertions(+), 8 deletions(-)
 create mode 100644 tokio/src/runtime/task/atomic_notified.rs

diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index 838c15840a5..b90ec2646fb 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -106,7 +106,7 @@ struct Core {
     /// queue. This effectively results in the **last** scheduled task to be run
     /// next (LIFO). This is an optimization for improving locality which
     /// benefits message passing patterns and helps to reduce latency.
-    lifo_slot: Option<Notified>,
+    lifo_slot: task::AtomicNotified<Arc<Handle>>,
 
     /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
     /// they go to the back of the `run_queue`.
@@ -257,7 +257,7 @@ pub(super) fn create(
 
         cores.push(Box::new(Core {
             tick: 0,
-            lifo_slot: None,
+            lifo_slot: task::AtomicNotified::empty(),
             lifo_enabled: !config.disable_lifo_slot,
             run_queue,
             is_searching: false,
@@ -1084,17 +1084,15 @@ impl Handle {
             true
         } else {
             // Push to the LIFO slot
-            let prev = core.lifo_slot.take();
-            let ret = prev.is_some();
+            let prev = core.lifo_slot.swap(Some(task));
 
             if let Some(prev) = prev {
                 core.run_queue
                     .push_back_or_overflow(prev, self, &mut core.stats);
+                true
+            } else {
+                false
             }
-
-            core.lifo_slot = Some(task);
-
-            ret
         };
 
         // Only notify if not currently parked. If `park` is `None`, then the
diff --git a/tokio/src/runtime/task/atomic_notified.rs b/tokio/src/runtime/task/atomic_notified.rs
new file mode 100644
index 00000000000..72aa99a73ea
--- /dev/null
+++ b/tokio/src/runtime/task/atomic_notified.rs
@@ -0,0 +1,58 @@
+use crate::loom::sync::atomic::AtomicPtr;
+use crate::runtime::task::{Header, Notified, RawTask};
+
+use std::marker::PhantomData;
+use std::ptr;
+use std::ptr::NonNull;
+use std::sync::atomic::Ordering::{AcqRel, Acquire};
+
+/// An atomic cell which can contain a pointer to a [`Notified`] task.
+///
+/// This is similar to the [`crate::util::AtomicCell`] type, but specialized to
+/// hold a task pointer --- this type "remembers" the task's scheduler generic
+/// when a task is stored in the cell, so that the pointer can be turned back
+/// into a [`Notified`] task with the correct generic type when it is retrieved.
+pub(crate) struct AtomicNotified<S: 'static> {
+    task: AtomicPtr<Header>,
+    _scheduler: PhantomData<S>,
+}
+
+impl<S: 'static> AtomicNotified<S> {
+    pub(crate) fn empty() -> Self {
+        Self {
+            task: AtomicPtr::new(ptr::null_mut()),
+            _scheduler: PhantomData,
+        }
+    }
+
+    pub(crate) fn swap(&self, task: Option<Notified<S>>) -> Option<Notified<S>> {
+        let new = task
+            .map(|t| t.into_raw().header_ptr().as_ptr())
+            .unwrap_or_else(ptr::null_mut);
+        let old = self.task.swap(new, AcqRel);
+        NonNull::new(old).map(|ptr| unsafe {
+            // Safety: since we only allow tasks with the same scheduler type to
+            // be placed in this cell, we know that the pointed-to task's
+            // scheduler type matches the type parameter S.
+            Notified::from_raw(RawTask::from_raw(ptr))
+        })
+    }
+
+    pub(crate) fn take(&self) -> Option<Notified<S>> {
+        self.swap(None)
+    }
+
+    pub(crate) fn is_some(&self) -> bool {
+        !self.task.load(Acquire).is_null()
+    }
+}
+
+unsafe impl<S> Send for AtomicNotified<S> {}
+unsafe impl<S> Sync for AtomicNotified<S> {}
+
+impl<S: 'static> Drop for AtomicNotified<S> {
+    fn drop(&mut self) {
+        // Ensure the task reference is dropped if this cell is dropped.
+        let _ = self.take();
+    }
+}
diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs
index 7d314c3b176..87998ab4574 100644
--- a/tokio/src/runtime/task/mod.rs
+++ b/tokio/src/runtime/task/mod.rs
@@ -214,6 +214,11 @@ pub(crate) use self::raw::RawTask;
 mod state;
 use self::state::State;
 
+#[cfg(feature = "rt-multi-thread")]
+mod atomic_notified;
+#[cfg(feature = "rt-multi-thread")]
+pub(crate) use self::atomic_notified::AtomicNotified;
+
 mod waker;
 
 cfg_taskdump! {
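To make the new cell's ownership story easier to follow, here is a minimal
stand-alone sketch of the same pattern using a plain `Box<T>` payload in place
of a task header pointer. Everything below is illustrative (`AtomicSlot` is
not a Tokio type); it only demonstrates the raw-pointer swap plus
`PhantomData` trick that `AtomicNotified` applies to `*mut Header`:

    use std::marker::PhantomData;
    use std::ptr;
    use std::sync::atomic::{AtomicPtr, Ordering::AcqRel};

    /// An atomic cell owning an optional boxed value, swapped as a raw
    /// pointer. `PhantomData` records the payload type that the raw pointer
    /// alone would erase.
    struct AtomicSlot<T> {
        ptr: AtomicPtr<T>,
        _marker: PhantomData<Box<T>>,
    }

    impl<T> AtomicSlot<T> {
        fn empty() -> Self {
            Self {
                ptr: AtomicPtr::new(ptr::null_mut()),
                _marker: PhantomData,
            }
        }

        /// Atomically replaces the contents of the cell, returning the old value.
        fn swap(&self, value: Option<Box<T>>) -> Option<Box<T>> {
            let new = value.map(Box::into_raw).unwrap_or(ptr::null_mut());
            let old = self.ptr.swap(new, AcqRel);
            // Safety: every non-null pointer stored in the cell came from
            // `Box::into_raw` above, and `swap` transfers it out exactly once.
            (!old.is_null()).then(|| unsafe { Box::from_raw(old) })
        }
    }

    impl<T> Drop for AtomicSlot<T> {
        fn drop(&mut self) {
            // Free whatever is still in the slot, mirroring `AtomicNotified`'s Drop.
            let _ = self.swap(None);
        }
    }

    fn main() {
        let slot = AtomicSlot::empty();
        assert!(slot.swap(Some(Box::new(1))).is_none());
        assert_eq!(*slot.swap(Some(Box::new(2))).unwrap(), 1);
    }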
From 75d8116de8b0280340312da97aa0447c88b27b3b Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Fri, 27 Jun 2025 15:09:29 -0700
Subject: [PATCH 02/15] runtime: start moving LIFO into the run queue

This way, it's accessible by the stealer. Leave all the LIFO
*accounting* (i.e. deciding whether we hit the LIFO slot or not) up to
the worker.

Gotta figure out whether the load of lifo presence will race...ugh.
---
 .../runtime/scheduler/multi_thread/queue.rs   | 24 ++++++++++++++++++-
 .../runtime/scheduler/multi_thread/worker.rs  | 20 +++++-----------
 2 files changed, 29 insertions(+), 15 deletions(-)

diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs
index 68670e63ca1..d4d56376744 100644
--- a/tokio/src/runtime/scheduler/multi_thread/queue.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs
@@ -52,6 +52,13 @@ pub(crate) struct Inner<T: 'static> {
     /// Only updated by producer thread but read by many threads.
     tail: AtomicUnsignedShort,
 
+    /// When a task is scheduled from a worker, it is stored in this slot. The
+    /// worker will check this slot for a task **before** checking the run
+    /// queue. This effectively results in the **last** scheduled task to be run
+    /// next (LIFO). This is an optimization for improving locality which
+    /// benefits message passing patterns and helps to reduce latency.
+    lifo: task::AtomicNotified<T>,
+
     /// Elements
     buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
 }
@@ -92,6 +99,7 @@ pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
     let inner = Arc::new(Inner {
         head: AtomicUnsignedLong::new(0),
         tail: AtomicUnsignedShort::new(0),
+        lifo: task::AtomicNotified::empty(),
         buffer: make_fixed_size(buffer.into_boxed_slice()),
     });
 
@@ -110,7 +118,9 @@ impl<T> Local<T> {
         let (_, head) = unpack(self.inner.head.load(Acquire));
         // safety: this is the **only** thread that updates this cell.
         let tail = unsafe { self.inner.tail.unsync_load() };
-        len(head, tail)
+        // XXX(eliza): the `is_some` here might be racy? do we need to pack a
+        // LIFO-presence bit into one of the atomics?
+        len(head, tail) + (self.inner.lifo.is_some() as usize)
     }
 
     /// How many tasks can be pushed into the queue
@@ -388,6 +398,18 @@ impl<T> Local<T> {
 
         Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
     }
+
+    /// Pushes a task to the LIFO slot, returning the task previously in the
+    /// LIFO slot (if there was one).
+    pub(crate) fn push_lifo(&self, task: task::Notified<T>) -> Option<task::Notified<T>> {
+        self.inner.lifo.swap(Some(task))
+    }
+
+    /// Pops the task currently held in the LIFO slot, if there is one;
+    /// otherwise, returns `None`.
+    pub(crate) fn pop_lifo(&self) -> Option<task::Notified<T>> {
+        self.inner.lifo.take()
+    }
 }
 
 impl<T> Steal<T> {
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index b90ec2646fb..d47bcf5ee13 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -101,13 +101,6 @@ struct Core {
     /// Used to schedule bookkeeping tasks every so often.
     tick: u32,
 
-    /// When a task is scheduled from a worker, it is stored in this slot. The
-    /// worker will check this slot for a task **before** checking the run
-    /// queue. This effectively results in the **last** scheduled task to be run
-    /// next (LIFO). This is an optimization for improving locality which
-    /// benefits message passing patterns and helps to reduce latency.
-    lifo_slot: task::AtomicNotified<Arc<Handle>>,
-
    /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
    /// they go to the back of the `run_queue`.
    lifo_enabled: bool,
@@ -257,7 +250,6 @@ pub(super) fn create(
 
         cores.push(Box::new(Core {
             tick: 0,
-            lifo_slot: task::AtomicNotified::empty(),
             lifo_enabled: !config.disable_lifo_slot,
             run_queue,
             is_searching: false,
@@ -405,7 +397,7 @@ where
     // If we heavily call `spawn_blocking`, there might be no available thread to
     // run this core. Except for the task in the lifo_slot, all tasks can be
     // stolen, so we move the task out of the lifo_slot to the run_queue.
-    if let Some(task) = core.lifo_slot.take() {
+    if let Some(task) = core.run_queue.pop_lifo() {
         core.run_queue
             .push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats);
     }
@@ -617,7 +609,7 @@ impl Context {
         };
 
         // Check for a task in the LIFO slot
-        let task = match core.lifo_slot.take() {
+        let task = match core.run_queue.pop_lifo() {
             Some(task) => task,
             None => {
                 self.reset_lifo_enabled(&mut core);
@@ -852,7 +844,7 @@ impl Core {
     }
 
     fn next_local_task(&mut self) -> Option<Notified> {
-        self.lifo_slot.take().or_else(|| self.run_queue.pop())
+        self.run_queue.pop_lifo().or_else(|| self.run_queue.pop())
     }
 
     /// Function responsible for stealing tasks from another worker
@@ -908,7 +900,7 @@ impl Core {
     }
 
     fn has_tasks(&self) -> bool {
-        self.lifo_slot.is_some() || self.run_queue.has_tasks()
+        self.run_queue.has_tasks()
     }
 
     fn should_notify_others(&self) -> bool {
@@ -917,7 +909,7 @@ impl Core {
         if self.is_searching {
             return false;
         }
-        self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
+        self.run_queue.len() > 1
     }
 
     /// Prepares the worker state for parking.
@@ -1084,7 +1076,7 @@ impl Handle {
             true
         } else {
             // Push to the LIFO slot
-            let prev = core.lifo_slot.swap(Some(task));
+            let prev = core.run_queue.push_lifo(task);
 
             if let Some(prev) = prev {
                 core.run_queue
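The "will it race" question in the XXX comment above can be made concrete with
a small stand-alone model (std types only, names illustrative, not the real
queue): the queue length and the LIFO-occupancy bit live in separate atomics,
so a reader composing them gets an estimate, not a snapshot.

    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::{Acquire, Release}};
    use std::sync::Arc;
    use std::thread;

    fn main() {
        // One task exists, initially in the LIFO slot.
        let queue_len = Arc::new(AtomicUsize::new(0));
        let lifo_occupied = Arc::new(AtomicBool::new(true));

        let stealer = {
            let (queue_len, lifo_occupied) = (queue_len.clone(), lifo_occupied.clone());
            thread::spawn(move || {
                // "Steal": empty the slot, then make the task visible elsewhere.
                if lifo_occupied.swap(false, Acquire) {
                    queue_len.fetch_add(1, Release);
                }
            })
        };

        // The two loads are not a single atomic snapshot: interleaved with the
        // stealer they can observe 0, 1, or 2, even though exactly one task
        // exists throughout.
        let estimate = lifo_occupied.load(Acquire) as usize + queue_len.load(Acquire);
        assert!(estimate <= 2);

        stealer.join().unwrap();
    }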
From 730a58114fc28c782fbdd99fe52b3b4ecd12187b Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Fri, 27 Jun 2025 15:40:52 -0700
Subject: [PATCH 03/15] runtime: actually steal LIFO task (stupid version)

---
 .../runtime/scheduler/multi_thread/queue.rs   | 31 ++++++++++++++-----
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs
index d4d56376744..f1e2d1e22b7 100644
--- a/tokio/src/runtime/scheduler/multi_thread/queue.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs
@@ -451,6 +451,12 @@ impl<T> Steal<T> {
         // tasks in `dst`.
         let mut n = self.steal_into2(dst, dst_tail);
 
+        // Ooh, there's another task just sitting there? Grab that one, too!
+        let lifo = self.0.lifo.take();
+        if lifo.is_some() {
+            n += 1;
+        }
+
         if n == 0 {
             // No tasks were stolen
             return None;
@@ -462,22 +468,31 @@ impl<T> Steal<T> {
         // We are returning a task here
         n -= 1;
 
-        let ret_pos = dst_tail.wrapping_add(n);
-        let ret_idx = ret_pos as usize & MASK;
-
-        // safety: the value was written as part of `steal_into2` and not
-        // exposed to stealers, so no other thread can access it.
-        let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
+        let ret = if lifo.is_some() {
+            // If we took the task from the LIFO slot, just return it as the
+            // next task to run, rather than messing around with the buffer.
+            lifo
+        } else {
+            // The LIFO slot was empty, so take the last task we squirted
+            // into `dst` instead.
+            let ret_pos = dst_tail.wrapping_add(n);
+            let ret_idx = ret_pos as usize & MASK;
+
+            // safety: the value was written as part of `steal_into2` and not
+            // exposed to stealers, so no other thread can access it.
+            let task = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
+            Some(task)
+        };
 
         if n == 0 {
             // The `dst` queue is empty, but a single task was stolen
-            return Some(ret);
+            return ret;
         }
 
         // Make the stolen items available to consumers
         dst.inner.tail.store(dst_tail.wrapping_add(n), Release);
 
-        Some(ret)
+        ret
     }
 
     // Steal tasks from `self`, placing them into `dst`. Returns the number of
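Abstracting away the ring-buffer details, the stealing policy this patch is
driving at looks like the following toy model (plain `VecDeque` and `u32` task
IDs, not the real implementation; note that the accounting around `n` in the
"stupid version" above is still off, and is corrected later in this series):

    use std::collections::VecDeque;

    // Steals roughly half of the victim's queue into `dst`, grabs the LIFO
    // task if present, and returns one task for the caller to run
    // immediately: the LIFO task when there is one, otherwise the last task
    // pulled into `dst`.
    fn steal_into(
        victim_queue: &mut VecDeque<u32>,
        victim_lifo: &mut Option<u32>,
        dst: &mut VecDeque<u32>,
    ) -> Option<u32> {
        let n = victim_queue.len() - victim_queue.len() / 2;
        dst.extend(victim_queue.drain(..n));
        match victim_lifo.take() {
            Some(lifo) => Some(lifo),
            None => dst.pop_back(),
        }
    }

    fn main() {
        let mut victim = VecDeque::from([1, 2, 3, 4]);
        let mut lifo = Some(99);
        let mut dst = VecDeque::new();

        // The LIFO task is handed straight back; the stolen half stays in `dst`.
        assert_eq!(steal_into(&mut victim, &mut lifo, &mut dst), Some(99));
        assert_eq!(dst, VecDeque::from([1, 2]));
        assert_eq!(victim, VecDeque::from([3, 4]));
    }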
From 4c680aeaf55387fc723d6d17c845b14cebf8b46e Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Fri, 27 Jun 2025 16:02:35 -0700
Subject: [PATCH 04/15] runtime: docs fixes

---
 tokio/src/runtime/scheduler/multi_thread/queue.rs | 1 +
 tokio/src/runtime/task/atomic_notified.rs         | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs
index f1e2d1e22b7..5bc989c3cfb 100644
--- a/tokio/src/runtime/scheduler/multi_thread/queue.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs
@@ -408,6 +408,7 @@ impl<T> Local<T> {
     /// Pops the task currently held in the LIFO slot, if there is one;
     /// otherwise, returns `None`.
     pub(crate) fn pop_lifo(&self) -> Option<task::Notified<T>> {
+        // LIFO-suction!
         self.inner.lifo.take()
     }
 }
diff --git a/tokio/src/runtime/task/atomic_notified.rs b/tokio/src/runtime/task/atomic_notified.rs
index 72aa99a73ea..05a408755e8 100644
--- a/tokio/src/runtime/task/atomic_notified.rs
+++ b/tokio/src/runtime/task/atomic_notified.rs
@@ -8,7 +8,7 @@
 use std::sync::atomic::Ordering::{AcqRel, Acquire};
 
 /// An atomic cell which can contain a pointer to a [`Notified`] task.
 ///
-/// This is similar to the [`crate::util::AtomicCell`] type, but specialized to
+/// This is similar to the `crate::util::AtomicCell` type, but specialized to
 /// hold a task pointer --- this type "remembers" the task's scheduler generic
 /// when a task is stored in the cell, so that the pointer can be turned back
 /// into a [`Notified`] task with the correct generic type when it is retrieved.

From 9ad2404a1345d497b07b647faedbd879d453fda2 Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Mon, 30 Jun 2025 11:09:24 -0700
Subject: [PATCH 05/15] runtime: add loom tests with lifo slot

---
 .../runtime/tests/loom_multi_thread/queue.rs  | 121 ++++++++++++++++--
 1 file changed, 112 insertions(+), 9 deletions(-)

diff --git a/tokio/src/runtime/tests/loom_multi_thread/queue.rs b/tokio/src/runtime/tests/loom_multi_thread/queue.rs
index 0d818283653..88b65903b5b 100644
--- a/tokio/src/runtime/tests/loom_multi_thread/queue.rs
+++ b/tokio/src/runtime/tests/loom_multi_thread/queue.rs
@@ -62,6 +62,65 @@ fn basic() {
     });
 }
 
+// Like `basic`, but with tasks in the LIFO slot.
+#[test]
+fn basic_lifo() {
+    loom::model(|| {
+        let (steal, mut local) = queue::local();
+        let inject = RefCell::new(vec![]);
+        let mut stats = new_stats();
+
+        let th = thread::spawn(move || {
+            let mut stats = new_stats();
+            let (_, mut local) = queue::local();
+            let mut n = 0;
+
+            for _ in 0..3 {
+                if steal.steal_into(&mut local, &mut stats).is_some() {
+                    n += 1;
+                }
+
+                while local.pop().is_some() {
+                    n += 1;
+                }
+            }
+
+            n
+        });
+
+        let mut n = 0;
+
+        for _ in 0..2 {
+            for _ in 0..2 {
+                let (task, _) = unowned(async {});
+                if let Some(prev) = local.push_lifo(task) {
+                    local.push_back_or_overflow(prev, &inject, &mut stats);
+                }
+            }
+
+            if local.pop_lifo().or_else(|| local.pop()).is_some() {
+                n += 1;
+            }
+
+            // Push another task
+            let (task, _) = unowned(async {});
+            if let Some(prev) = local.push_lifo(task) {
+                local.push_back_or_overflow(prev, &inject, &mut stats);
+            }
+
+            while local.pop().or_else(|| local.pop()).is_some() {
+                n += 1;
+            }
+        }
+
+        n += inject.borrow_mut().drain(..).count();
+
+        n += th.join().unwrap();
+
+        assert_eq!(6, n);
+    });
+}
+
 #[test]
 fn steal_overflow() {
     loom::model(|| {
@@ -116,32 +175,59 @@ fn multi_stealer() {
     const NUM_TASKS: usize = 5;
 
-    fn steal_tasks(steal: queue::Steal<NoopSchedule>) -> usize {
+    loom::model(|| {
+        let (steal, mut local) = queue::local();
+        let inject = RefCell::new(vec![]);
         let mut stats = new_stats();
-        let (_, mut local) = queue::local();
 
-        if steal.steal_into(&mut local, &mut stats).is_none() {
-            return 0;
+        // Push work
+        for _ in 0..NUM_TASKS {
+            let (task, _) = unowned(async {});
+            local.push_back_or_overflow(task, &inject, &mut stats);
         }
 
-        let mut n = 1;
+        let th1 = {
+            let steal = steal.clone();
+            thread::spawn(move || steal_tasks(steal))
+        };
+
+        let th2 = thread::spawn(move || steal_tasks(steal));
+
+        let mut n = 0;
 
         while local.pop().is_some() {
             n += 1;
         }
 
-        n
-    }
+        n += inject.borrow_mut().drain(..).count();
+
+        n += th1.join().unwrap();
+        n += th2.join().unwrap();
+
+        assert_eq!(n, NUM_TASKS);
+    });
+}
+
+// Like `multi_stealer`, but with tasks in the LIFO slot.
+#[test]
+fn multi_stealer_lifo() {
+    const NUM_TASKS: usize = 5;
 
     loom::model(|| {
         let (steal, mut local) = queue::local();
         let inject = RefCell::new(vec![]);
         let mut stats = new_stats();
 
-        // Push work
+        // Push work into the LIFO slot.
         for _ in 0..NUM_TASKS {
             let (task, _) = unowned(async {});
-            local.push_back_or_overflow(task, &inject, &mut stats);
+            // Push the new task into the LIFO slot, as though it's being
+            // notified locally.
+            if let Some(prev) = local.push_lifo(task) {
+                // If a task was already in the LIFO slot, stick the previous
+                // LIFO task into the queue.
+                local.push_back_or_overflow(prev, &inject, &mut stats);
+            }
         }
 
         let th1 = {
@@ -166,6 +252,23 @@ fn multi_stealer() {
     });
 }
 
+fn steal_tasks(steal: queue::Steal<NoopSchedule>) -> usize {
+    let mut stats = new_stats();
+    let (_, mut local) = queue::local();
+
+    if steal.steal_into(&mut local, &mut stats).is_none() {
+        return 0;
+    }
+
+    let mut n = 1;
+
+    while local.pop().is_some() {
+        n += 1;
+    }
+
+    n
+}
+
 #[test]
 fn chained_steal() {
     loom::model(|| {
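For readers new to loom: `loom::model` reruns a test body under every possible
interleaving of its threads, which is why it can catch LIFO push/pop/steal
orderings that a normal test would hit only rarely. A minimal, self-contained
example of the mechanism (assuming the `loom` crate as a dev-dependency;
nothing below is part of the tests above):

    use loom::sync::atomic::{AtomicUsize, Ordering::SeqCst};
    use loom::sync::Arc;
    use loom::thread;

    fn main() {
        // Explores every interleaving of the two increments; the assertion at
        // the end must hold for all of them, or loom reports the failing
        // schedule.
        loom::model(|| {
            let n = Arc::new(AtomicUsize::new(0));
            let n2 = n.clone();

            let th = thread::spawn(move || {
                n2.fetch_add(1, SeqCst);
            });
            n.fetch_add(1, SeqCst);

            th.join().unwrap();
            assert_eq!(n.load(SeqCst), 2);
        });
    }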
From fe53128bdd075f580b07e0f9265dc46009406975 Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Mon, 30 Jun 2025 11:53:42 -0700
Subject: [PATCH 06/15] runtime: add test for #4941

This commit adds a test ensuring that if a task is notified to the LIFO
slot by another task which then blocks the worker thread forever, the
LIFO task is eventually stolen by another worker.

I've confirmed that this test fails on the `master` branch, and passes
after these changes.
---
 tokio/tests/rt_threaded.rs | 71 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 71 insertions(+)

diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs
index ca2a952fb73..a76583e26f7 100644
--- a/tokio/tests/rt_threaded.rs
+++ b/tokio/tests/rt_threaded.rs
@@ -692,6 +692,77 @@ fn mutex_in_block_in_place() {
     })
 }
 
+// Tests that when a task is notified by another task and is placed in the LIFO
+// slot, and then the notifying task blocks the runtime, the notified task will
+// be stolen by another worker thread.
+//
+// Integration test for: https://github.com/tokio-rs/tokio/issues/4941
+#[test]
+fn lifo_stealable() {
+    use std::time::Duration;
+
+    let (unblock_tx, unblock_rx) = tokio::sync::oneshot::channel();
+    let (task_started_tx, task_started_rx) = tokio::sync::oneshot::channel();
+    let (block_thread_tx, block_thread_rx) = mpsc::channel::<()>();
+    let rt = runtime::Builder::new_multi_thread()
+        // Make sure there are enough workers that one can be parked running the
+        // I/O driver and another can be parked running the timer wheel and
+        // there's still at least one worker free to steal the blocked task.
+        .worker_threads(4)
+        .enable_time()
+        .build()
+        .unwrap();
+
+    rt.block_on(async {
+        // Keep the runtime busy so that the workers that might steal the
+        // blocked task don't all park themselves forever.
+        let churn = tokio::spawn(async move {
+            loop {
+                tokio::time::sleep(Duration::from_millis(64)).await;
+            }
+        });
+
+        let blocked_task_joined = tokio::spawn(async move {
+            println!("[LIFO] task started");
+            task_started_tx.send(()).unwrap();
+            println!("[LIFO] task waiting for wakeup...");
+            unblock_rx.await.unwrap();
+            println!("[LIFO] task running after wakeup");
+        });
+
+        // Wait for the blocked task to have been polled once and have yielded
+        // before we spawn the task that will notify it.
+        task_started_rx.await.unwrap();
+        println!("[main] LIFO slot task start acked!");
+
+        // Now, spawn a task that will notify the blocked task before going
+        // on to block forever.
+        tokio::spawn(async move {
+            println!("[blocker] sending wakeup");
+            unblock_tx.send(()).unwrap();
+
+            println!("[blocker] blocking the worker thread...");
+            // Block the worker thread indefinitely by waiting for a message on
+            // a blocking channel. Using a channel rather than e.g. `loop {}`
+            // allows us to terminate the task cleanly when the test finishes.
+            let _ = block_thread_rx.recv();
+            println!("[blocker] done");
+        });
+
+        println!("[main] blocker task spawned");
+
+        let result = tokio::time::timeout(Duration::from_secs(30), blocked_task_joined).await;
+        println!("[main] result: {result:?}");
+        // Before possibly panicking, make sure that we wake up the blocked task
+        // so that it doesn't stop the runtime from shutting down.
+        block_thread_tx.send(()).unwrap();
+        churn.abort();
+        result
+            .expect("task in LIFO slot should complete within 30 seconds")
+            .expect("task in LIFO slot should not panic");
+    })
+}
+
 // Testing the tuning logic is tricky as it is inherently timing based, and more
 // of a heuristic than an exact behavior. This test checks that the interval
 // changes over time based on load factors. There are no assertions, completion
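The synchronization skeleton of this test generalizes. Here is a compact
sketch of the same pattern (assuming a `tokio` dependency with the
`rt-multi-thread`, `time`, `sync`, and `macros` features; names mirror the
test, but the code below is illustrative, not the test itself): a oneshot
handshake proves a task has been polled once, and a std (blocking) channel
pins a worker thread until the test deliberately releases it.

    use std::sync::mpsc;
    use std::time::Duration;

    #[tokio::main(flavor = "multi_thread", worker_threads = 4)]
    async fn main() {
        // Handshake: lets the test know the victim task has been polled once.
        let (started_tx, started_rx) = tokio::sync::oneshot::channel();
        // Blocking (std) channel: pins a worker thread until released.
        let (release_tx, release_rx) = mpsc::channel::<()>();

        tokio::spawn(async move {
            started_tx.send(()).unwrap();
            // ...the rest of the victim task would go here...
        });
        started_rx.await.unwrap();

        let blocker = tokio::spawn(async move {
            // Deliberately blocks this worker thread; `recv` only returns
            // once the release message is sent.
            let _ = release_rx.recv();
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        release_tx.send(()).unwrap(); // unpin the worker so shutdown can proceed
        blocker.await.unwrap();
    }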
From 86b16c467a8137d86144f6b29906d5250a51ca6d Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Mon, 30 Jun 2025 12:53:12 -0700
Subject: [PATCH 07/15] runtime: fix remote queue depth for metrics

---
 tokio/src/runtime/scheduler/multi_thread/queue.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs
index 5bc989c3cfb..0db7bc62b49 100644
--- a/tokio/src/runtime/scheduler/multi_thread/queue.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs
@@ -116,11 +116,10 @@ impl<T> Local<T> {
     /// Returns the number of entries in the queue
     pub(crate) fn len(&self) -> usize {
         let (_, head) = unpack(self.inner.head.load(Acquire));
+        let lifo = self.inner.lifo.is_some() as usize;
         // safety: this is the **only** thread that updates this cell.
         let tail = unsafe { self.inner.tail.unsync_load() };
-        // XXX(eliza): the `is_some` here might be racy? do we need to pack a
-        // LIFO-presence bit into one of the atomics?
-        len(head, tail) + (self.inner.lifo.is_some() as usize)
+        len(head, tail) + lifo
     }
 
     /// How many tasks can be pushed into the queue
@@ -418,7 +417,8 @@ impl<T> Steal<T> {
     pub(crate) fn len(&self) -> usize {
         let (_, head) = unpack(self.0.head.load(Acquire));
         let tail = self.0.tail.load(Acquire);
-        len(head, tail)
+        let lifo = self.0.lifo.is_some() as usize;
+        len(head, tail) + lifo
     }
 
     /// Return true if the queue is empty,
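The resulting depth calculation can be restated as a tiny pure function (a toy
version with plain integers, not the real atomics-based code): the reported
depth is the ring-buffer occupancy plus one if the LIFO slot holds a task.

    fn local_queue_depth(head: u16, tail: u16, lifo_occupied: bool) -> usize {
        // Ring-buffer occupancy plus one if the LIFO slot holds a task.
        tail.wrapping_sub(head) as usize + lifo_occupied as usize
    }

    fn main() {
        assert_eq!(local_queue_depth(10, 13, true), 4); // 3 buffered + LIFO task
        assert_eq!(local_queue_depth(10, 13, false), 3);
    }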
From 0497b31032e18bb29a5f709f653e3323be1bcfb2 Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Mon, 30 Jun 2025 13:26:57 -0700
Subject: [PATCH 08/15] runtime: fix lifo loom tests not popping from their own
 lifo slots

---
 tokio/src/runtime/tests/loom_multi_thread/queue.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tokio/src/runtime/tests/loom_multi_thread/queue.rs b/tokio/src/runtime/tests/loom_multi_thread/queue.rs
index 88b65903b5b..2098b548a2b 100644
--- a/tokio/src/runtime/tests/loom_multi_thread/queue.rs
+++ b/tokio/src/runtime/tests/loom_multi_thread/queue.rs
@@ -108,7 +108,7 @@ fn basic_lifo() {
                 local.push_back_or_overflow(prev, &inject, &mut stats);
             }
 
-            while local.pop().or_else(|| local.pop()).is_some() {
+            while local.pop_lifo().or_else(|| local.pop()).is_some() {
                 n += 1;
             }
         }
@@ -239,7 +239,7 @@ fn multi_stealer_lifo() {
 
         let mut n = 0;
 
-        while local.pop().is_some() {
+        while local.pop_lifo().or_else(|| local.pop()).is_some() {
            n += 1;
        }

From 3022aaece22654b0cd979682ebcdaee3e48e2586 Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Mon, 30 Jun 2025 15:05:26 -0700
Subject: [PATCH 09/15] runtime: fix wrong accounting for LIFO slot in new
 queue len

---
 .../runtime/scheduler/multi_thread/queue.rs   | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs
index 0db7bc62b49..4c3ba93fb78 100644
--- a/tokio/src/runtime/scheduler/multi_thread/queue.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs
@@ -454,26 +454,26 @@ impl<T> Steal<T> {
 
         // Ooh, there's another task just sitting there? Grab that one, too!
         let lifo = self.0.lifo.take();
-        if lifo.is_some() {
-            n += 1;
-        }
 
-        if n == 0 {
+        if n == 0 && lifo.is_none() {
             // No tasks were stolen
             return None;
         }
 
-        dst_stats.incr_steal_count(n as u16);
+        // If we also grabbed the task from the LIFO slot, include that in the
+        // steal count as well.
+        dst_stats.incr_steal_count(n as u16 + lifo.is_some() as u16);
         dst_stats.incr_steal_operations();
 
-        // We are returning a task here
-        n -= 1;
-
         let ret = if lifo.is_some() {
             // If we took the task from the LIFO slot, just return it as the
-            // next task to run, rather than messing around with the buffer.
+            // next task to run, rather than messing around with tasks from the
+            // queue.
             lifo
         } else {
+            // We are returning a task from the queue here.
+            n -= 1;
+
             // The LIFO slot was empty, so take the last task we squirted
             // into `dst` instead.

From eba7e1d51ecf6158ba1206199523fee5df1333e0 Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Tue, 1 Jul 2025 11:22:40 -0700
Subject: [PATCH 10/15] runtime: fix racy `worker_local_queue_depth` test

This test spawns a task that sometimes ends up in the LIFO slot and
sometimes doesn't. This was previously fine as the LIFO slot didn't
count toward `worker_local_queue_depth`, but now it does. Thus, we have
to make sure that task no longer exists before asserting about queue
depth.
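As a sketch of that reasoning (illustrative, not the actual test): awaiting
the `JoinHandle` is what guarantees the no-op task has terminated, and
therefore cannot still be sitting in a worker's LIFO slot when the depth is
read.

    use tokio::runtime;

    fn main() {
        let rt = runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .build()
            .unwrap();

        rt.block_on(async {
            let nop = tokio::spawn(async {});
            // Until this await returns, `nop` may still be parked in the
            // spawning worker's LIFO slot -- and would now be counted in that
            // worker's reported local queue depth.
            let _ = nop.await;
            // From here on, `nop` is guaranteed gone from every queue and slot.
        });
    }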
--- tokio/tests/rt_unstable_metrics.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index a8246282126..b6b4c140e11 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -642,9 +642,13 @@ fn worker_local_queue_depth() { }); // Bump the next-run spawn - tokio::spawn(async {}); + let nop = tokio::spawn(async {}); + // Wait until we're sure the other worker is blocked. rx1.recv().unwrap(); + // Make sure the no-op task has terminated so that it doesn't end up + // in the LIFO slot and throw off our counts. + let _ = nop.await; // Spawn some tasks for _ in 0..100 { From eba7e1d51ecf6158ba1206199523fee5df1333e0 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 2 Jul 2025 08:56:30 -0700 Subject: [PATCH 11/15] Update queue.rs Co-authored-by: Matt Keeter --- tokio/src/runtime/scheduler/multi_thread/queue.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs index 4c3ba93fb78..4bfe25fc174 100644 --- a/tokio/src/runtime/scheduler/multi_thread/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs @@ -465,7 +465,7 @@ impl Steal { dst_stats.incr_steal_count(n as u16 + lifo.is_some() as u16); dst_stats.incr_steal_operations(); - let ret = if lifo.is_some() { + let ret = if let Some(lifo) = lifo { // If we took the task from the LIFO slot, just return it as the // next task to run, rather than messing around with tasks from the // queue. @@ -481,8 +481,7 @@ impl Steal { // safety: the value was written as part of `steal_into2` and not // exposed to stealers, so no other thread can access it. - let task = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }); - Some(task) + dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }) }; if n == 0 { From f5d5766eda58ab3d800592ab4314474df7d60e7f Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 2 Jul 2025 09:19:43 -0700 Subject: [PATCH 12/15] unbreak return types after applying @mkeeter's suggestion --- tokio/src/runtime/scheduler/multi_thread/queue.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs index 4bfe25fc174..86781eb26a6 100644 --- a/tokio/src/runtime/scheduler/multi_thread/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs @@ -486,13 +486,13 @@ impl Steal { if n == 0 { // The `dst` queue is empty, but a single task was stolen - return ret; + return Some(ret); } // Make the stolen items available to consumers dst.inner.tail.store(dst_tail.wrapping_add(n), Release); - ret + Some(ret) } // Steal tasks from `self`, placing them into `dst`. 
Returns the number of From adf146f2fec6dff680cd8ed03d8a0cf06da80960 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 2 Jul 2025 09:22:45 -0700 Subject: [PATCH 13/15] match on LIFO push result as suggested by @ADD-SP in https://github.com/tokio-rs/tokio/pull/7431#discussion_r2178842825 --- .../runtime/scheduler/multi_thread/worker.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 605a2e1e176..a5b544f09e7 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -1082,14 +1082,15 @@ impl Handle { true } else { // Push to the LIFO slot - let prev = core.run_queue.push_lifo(task); - - if let Some(prev) = prev { - core.run_queue - .push_back_or_overflow(prev, self, &mut core.stats); - true - } else { - false + match core.run_queue.push_lifo(task) { + Some(prev) => { + // There was a previous task in the LIFO slot which needs + // to be pushed to the back of the run queue. + core.run_queue + .push_back_or_overflow(prev, self, &mut core.stats); + true + } + None => false, } }; From 0889f08f27ee6f92582d6b4ad4cd10b1d4ae51a1 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 3 Jul 2025 11:01:27 -0700 Subject: [PATCH 14/15] more comments explaining the `lifo_stealable` test Thanks @ADD-SP for the suggestions! --- tokio/tests/rt_threaded.rs | 67 +++++++++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 16 deletions(-) diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index a76583e26f7..4513cc5f93f 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -701,9 +701,25 @@ fn mutex_in_block_in_place() { fn lifo_stealable() { use std::time::Duration; - let (unblock_tx, unblock_rx) = tokio::sync::oneshot::channel(); - let (task_started_tx, task_started_rx) = tokio::sync::oneshot::channel(); + // This test constructs a scenario where a task (the "blocker task") + // notifies another task (the "victim task") and then blocks that worker + // thread indefinitely. The victim task is placed in the worker's LIFO + // slot, and will only run to completion if another worker steals it from + // the LIFO slot, as the current worker remains blocked running the blocker + // task. + // + // To make the blocker task block its worker thread without yielding, we use + // a `std::sync` blocking channel, so that we can eventually unblock it when + // the test completes. let (block_thread_tx, block_thread_rx) = mpsc::channel::<()>(); + // We use this channel to wait until the victim task has started running. If + // we just spawned the victim task and then immediately blocked the worker + // thread, it would be in the global inject queue, rather than in the + // worker's LIFO slot. + let (task_started_tx, task_started_rx) = tokio::sync::oneshot::channel(); + // Finally, this channel is used by the blocker task to wake up the victim + // task, so that it is placed in the worker's LIFO slot. + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); let rt = runtime::Builder::new_multi_thread() // Make sure there are enough workers that one can be parked running the // I/O driver and another can be parked running the timer wheel and @@ -716,44 +732,63 @@ fn lifo_stealable() { rt.block_on(async { // Keep the runtime busy so that the workers that might steal the // blocked task don't all park themselves forever. 
+
+        // Since this task will always be woken by whichever worker is holding
+        // the time driver, rather than a worker that's executing tasks, it
+        // shouldn't ever kick the victim task out of its worker's LIFO slot.
         let churn = tokio::spawn(async move {
             loop {
                 tokio::time::sleep(Duration::from_millis(64)).await;
             }
         });
 
-        let blocked_task_joined = tokio::spawn(async move {
-            println!("[LIFO] task started");
+        let victim_task_joined = tokio::spawn(async move {
+            println!("[victim] task started");
             task_started_tx.send(()).unwrap();
-            println!("[LIFO] task waiting for wakeup...");
-            unblock_rx.await.unwrap();
-            println!("[LIFO] task running after wakeup");
+            println!("[victim] task waiting for wakeup...");
+            notify_rx.await.unwrap();
+            println!("[victim] task running after wakeup");
         });
 
-        // Wait for the blocked task to have been polled once and have yielded
-        // before we spawn the task that will notify it.
+        // Wait for the victim task to have been polled once and have yielded
+        // before we spawn the task that will notify it. This ensures that it
+        // will be placed in the LIFO slot of the same worker thread as the
+        // blocker task, rather than on the global injector queue.
         task_started_rx.await.unwrap();
-        println!("[main] LIFO slot task start acked!");
+        println!("[main] victim slot task start acked!");
 
-        // Now, spawn a task that will notify the blocked task before going
+        // Now, spawn a task that will notify the victim task before going
         // on to block forever.
         tokio::spawn(async move {
             println!("[blocker] sending wakeup");
-            unblock_tx.send(()).unwrap();
+            notify_tx.send(()).unwrap();
 
             println!("[blocker] blocking the worker thread...");
             // Block the worker thread indefinitely by waiting for a message on
-            // a blocking channel. Using a channel rather than e.g. `loop {}`
-            // allows us to terminate the task cleanly when the test finishes.
+            // a blocking channel. Since we just notified the victim task, it
+            // went into the current worker thread's LIFO slot, and will only
+            // be able to complete if another worker thread successfully steals
+            // it from the LIFO slot.
+            //
+            // Using a channel rather than e.g. `loop {}` allows us to terminate
+            // the task cleanly when the test finishes.
             let _ = block_thread_rx.recv();
             println!("[blocker] done");
         });
 
         println!("[main] blocker task spawned");
 
-        let result = tokio::time::timeout(Duration::from_secs(30), blocked_task_joined).await;
+        // Wait for the victim task to join. If it does, then it has been stolen
+        // by another worker thread successfully.
+        //
+        // The 30-second timeout is chosen arbitrarily: its purpose is to ensure
+        // that the failure mode for this test is a panic, rather than hanging
+        // indefinitely. 30 seconds should be plenty of time for the task to be
+        // stolen, if it's going to work.
+        let result = tokio::time::timeout(Duration::from_secs(30), victim_task_joined).await;
         println!("[main] result: {result:?}");
-        // Before possibly panicking, make sure that we wake up the blocked task
+
+        // Before possibly panicking, make sure that we wake up the blocker task
         // so that it doesn't stop the runtime from shutting down.
         block_thread_tx.send(()).unwrap();
         churn.abort();

From aff05a99f6b5587a500679ac3a08c609aa4f96a6 Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Fri, 4 Jul 2025 10:57:43 -0700
Subject: [PATCH 15/15] get rid of `should_notify`

as per [this comment][1] from @Darksonn, we should notify other workers
any time we push to the LIFO slot, as it is stealable now. 
this simplifies the logic in `schedule_local` a bit, as we can get rid of the `should_notify` bool. [1]: https://github.com/tokio-rs/tokio/pull/7431#discussion_r2184724657 --- .../runtime/scheduler/multi_thread/worker.rs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index a5b544f09e7..ebdea59aadc 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -1076,28 +1076,23 @@ impl Handle { // task must always be pushed to the back of the queue, enabling other // tasks to be executed. If **not** a yield, then there is more // flexibility and the task may go to the front of the queue. - let should_notify = if is_yield || !core.lifo_enabled { + if is_yield || !core.lifo_enabled { core.run_queue .push_back_or_overflow(task, self, &mut core.stats); - true } else { // Push to the LIFO slot - match core.run_queue.push_lifo(task) { - Some(prev) => { - // There was a previous task in the LIFO slot which needs - // to be pushed to the back of the run queue. - core.run_queue - .push_back_or_overflow(prev, self, &mut core.stats); - true - } - None => false, + if let Some(prev) = core.run_queue.push_lifo(task) { + // There was a previous task in the LIFO slot which needs + // to be pushed to the back of the run queue. + core.run_queue + .push_back_or_overflow(prev, self, &mut core.stats); } }; // Only notify if not currently parked. If `park` is `None`, then the // scheduling is from a resource driver. As notifications often come in // batches, the notification is delayed until the park is complete. - if should_notify && core.park.is_some() { + if core.park.is_some() { self.notify_parked_local(); } }
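For reference, the decision table this final patch deletes can be restated as
a small function (illustrative names, not Tokio API); the removed `false` case
is exactly the one that could strand a stealable task.

    // Old behavior: a push that landed in an empty LIFO slot skipped the
    // wakeup, which was sound only while the slot was private to its worker.
    fn should_notify_old(pushed_to_run_queue: bool, displaced_lifo_task: bool) -> bool {
        pushed_to_run_queue || displaced_lifo_task
    }

    fn main() {
        // The one case that used to skip notification. Now that the LIFO slot
        // is stealable, skipping here could leave a runnable task invisible to
        // parked workers if the pushing worker then blocks (issue #4941), so
        // the new code notifies unconditionally whenever `core.park` is present.
        assert!(!should_notify_old(false, false));
    }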