
runtime: steal tasks from the LIFO slot #7431


Open

wants to merge 17 commits into master

Changes from 16 commits
59 changes: 48 additions & 11 deletions tokio/src/runtime/scheduler/multi_thread/queue.rs
@@ -52,6 +52,13 @@ pub(crate) struct Inner<T: 'static> {
/// Only updated by producer thread but read by many threads.
tail: AtomicUnsignedShort,

/// When a task is scheduled from a worker, it is stored in this slot. The
/// worker will check this slot for a task **before** checking the run
/// queue. This effectively results in the **last** scheduled task being run
/// next (LIFO). This is an optimization that improves locality, which
/// benefits message passing patterns and helps to reduce latency.
lifo: task::AtomicNotified<T>,

/// Elements
buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
}
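
For context, the pop order this field enables looks roughly like the sketch below (a minimal model with a placeholder `Task` type, not tokio code; it mirrors `next_local_task` in worker.rs further down in this diff): the LIFO slot is drained before the FIFO run queue.

use std::collections::VecDeque;

struct Task;

// Minimal sketch: the slot holds at most one task and is checked before the
// queue, so the most recently scheduled task is the next one to run.
fn next_task(lifo: &mut Option<Task>, run_queue: &mut VecDeque<Task>) -> Option<Task> {
    lifo.take().or_else(|| run_queue.pop_front())
}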
@@ -92,6 +99,7 @@ pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
let inner = Arc::new(Inner {
head: AtomicUnsignedLong::new(0),
tail: AtomicUnsignedShort::new(0),
lifo: task::AtomicNotified::empty(),
buffer: make_fixed_size(buffer.into_boxed_slice()),
});

@@ -108,9 +116,10 @@ impl<T> Local<T> {
/// Returns the number of entries in the queue
pub(crate) fn len(&self) -> usize {
let (_, head) = unpack(self.inner.head.load(Acquire));
let lifo = self.inner.lifo.is_some() as usize;
// safety: this is the **only** thread that updates this cell.
let tail = unsafe { self.inner.tail.unsync_load() };
len(head, tail)
len(head, tail) + lifo
}

/// How many tasks can be pushed into the queue
@@ -388,14 +397,28 @@ impl<T> Local<T> {

Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
}

/// Pushes a task to the LIFO slot, returning the task previously in the
/// LIFO slot (if there was one).
pub(crate) fn push_lifo(&self, task: task::Notified<T>) -> Option<task::Notified<T>> {
self.inner.lifo.swap(Some(task))
}

/// Pops the task currently held in the LIFO slot, if there is one;
/// otherwise, returns `None`.
pub(crate) fn pop_lifo(&self) -> Option<task::Notified<T>> {
// LIFO-suction!
self.inner.lifo.take()
}
}
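
A self-contained sketch of the eviction contract these two methods create, using `u32` stand-ins for tasks and a plain `Option` in place of the atomic cell (this is the same pattern `Handle::schedule` in worker.rs below uses with the real types):

use std::collections::VecDeque;

struct Queue {
    lifo: Option<u32>,
    fifo: VecDeque<u32>,
}

impl Queue {
    // Mirrors `push_lifo`: store `task`, returning the previous occupant.
    fn push_lifo(&mut self, task: u32) -> Option<u32> {
        self.lifo.replace(task)
    }

    // The eviction contract: a displaced task must be requeued, not dropped.
    fn schedule_local(&mut self, task: u32) {
        if let Some(prev) = self.push_lifo(task) {
            self.fifo.push_back(prev);
        }
    }
}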

impl<T> Steal<T> {
/// Returns the number of entries in the queue
pub(crate) fn len(&self) -> usize {
let (_, head) = unpack(self.0.head.load(Acquire));
let tail = self.0.tail.load(Acquire);
len(head, tail)
let lifo = self.0.lifo.is_some() as usize;
len(head, tail) + lifo
}

/// Return true if the queue is empty,
@@ -429,23 +452,37 @@ impl<T> Steal<T> {
// tasks in `dst`.
let mut n = self.steal_into2(dst, dst_tail);

if n == 0 {
// Ooh, there's another task just sitting there? Grab that one, too!
let lifo = self.0.lifo.take();

if n == 0 && lifo.is_none() {
// No tasks were stolen
return None;
}

dst_stats.incr_steal_count(n as u16);
// If we also grabbed the task from the LIFO slot, include that in the
// steal count as well.
dst_stats.incr_steal_count(n as u16 + lifo.is_some() as u16);
Comment on lines +463 to +465

Member

For reviewers: Rust defines the cast from bool to u16.

It's also possible to use u16::from(bool), if you care about cast_lossless cleanliness.
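
Both spellings behave identically; a quick check:

fn main() {
    // `bool as u16` is a defined, lossless cast: false -> 0, true -> 1.
    assert_eq!(true as u16, 1);
    // `u16::from(bool)` is the clippy::cast_lossless-friendly equivalent.
    assert_eq!(u16::from(false), 0);
}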

dst_stats.incr_steal_operations();

// We are returning a task here
n -= 1;
let ret = if let Some(lifo) = lifo {
// If we took the task from the LIFO slot, just return it as the
// next task to run, rather than messing around with tasks from the
// queue.
lifo
} else {
// We are returning a task from the queue here.
n -= 1;

let ret_pos = dst_tail.wrapping_add(n);
let ret_idx = ret_pos as usize & MASK;
// The LIFO slot was empty, so take the last task we squirted
// into `dst` instead.
let ret_pos = dst_tail.wrapping_add(n);
let ret_idx = ret_pos as usize & MASK;

// safety: the value was written as part of `steal_into2` and not
// exposed to stealers, so no other thread can access it.
let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
// safety: the value was written as part of `steal_into2` and not
// exposed to stealers, so no other thread can access it.
dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) })
};

if n == 0 {
// The `dst` queue is empty, but a single task was stolen
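A condensed, self-contained model of the return logic above, with `u32` stand-ins for tasks (a sketch, not tokio code): `n` is the number of tasks `steal_into2` copied into the thief's buffer, and `lifo` is whatever was swapped out of the victim's LIFO slot.

fn steal_result(mut n: usize, lifo: Option<u32>, copied: &[u32]) -> Option<(usize, u32)> {
    if n == 0 && lifo.is_none() {
        return None; // nothing was stolen at all
    }
    let next = match lifo {
        // A LIFO task is returned directly; all `n` copied tasks stay queued.
        Some(task) => task,
        // Otherwise, hand back the last task copied into the thief's buffer
        // (the caller guarantees `copied` holds at least `n` tasks).
        None => {
            n -= 1;
            copied[n]
        }
    };
    Some((n, next)) // `n` tasks remain in the thief's queue; `next` runs now
}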
37 changes: 14 additions & 23 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -101,13 +101,6 @@ struct Core {
/// Used to schedule bookkeeping tasks every so often.
tick: u32,

/// When a task is scheduled from a worker, it is stored in this slot. The
/// worker will check this slot for a task **before** checking the run
/// queue. This effectively results in the **last** scheduled task being run
/// next (LIFO). This is an optimization that improves locality, which
/// benefits message passing patterns and helps to reduce latency.
lifo_slot: Option<Notified>,

/// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
/// they go to the back of the `run_queue`.
lifo_enabled: bool,
@@ -257,7 +250,6 @@ pub(super) fn create(

cores.push(Box::new(Core {
tick: 0,
lifo_slot: None,
lifo_enabled: !config.disable_lifo_slot,
run_queue,
is_searching: false,
@@ -405,7 +397,7 @@ where
// If we heavily call `spawn_blocking`, there might be no available thread to
// run this core. Except for the task in the lifo_slot, all tasks can be
// stolen, so we move the task out of the lifo_slot to the run_queue.
if let Some(task) = core.lifo_slot.take() {
if let Some(task) = core.run_queue.pop_lifo() {
core.run_queue
.push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats);
}
@@ -620,7 +612,7 @@ impl Context {
};

// Check for a task in the LIFO slot
let task = match core.lifo_slot.take() {
let task = match core.run_queue.pop_lifo() {
Some(task) => task,
None => {
self.reset_lifo_enabled(&mut core);
@@ -858,7 +850,7 @@ impl Core {
}

fn next_local_task(&mut self) -> Option<Notified> {
self.lifo_slot.take().or_else(|| self.run_queue.pop())
self.run_queue.pop_lifo().or_else(|| self.run_queue.pop())
}

/// Function responsible for stealing tasks from another worker
@@ -914,7 +906,7 @@ impl Core {
}

fn has_tasks(&self) -> bool {
self.lifo_slot.is_some() || self.run_queue.has_tasks()
self.run_queue.has_tasks()
}

fn should_notify_others(&self) -> bool {
@@ -923,7 +915,7 @@
if self.is_searching {
return false;
}
self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
self.run_queue.len() > 1
}

/// Prepares the worker state for parking.
@@ -1090,17 +1082,16 @@ impl Handle {
true
} else {
// Push to the LIFO slot
let prev = core.lifo_slot.take();
let ret = prev.is_some();

if let Some(prev) = prev {
core.run_queue
.push_back_or_overflow(prev, self, &mut core.stats);
match core.run_queue.push_lifo(task) {
Some(prev) => {
// There was a previous task in the LIFO slot which needs
// to be pushed to the back of the run queue.
core.run_queue
.push_back_or_overflow(prev, self, &mut core.stats);
true
}
None => false,
Contributor @Darksonn Jul 4, 2025

This boolean indicates whether other worker threads are notified that there is work available to steal. Since the lifo slot is now stealable, it should no longer be false in this scenario.

Contributor

Is this something we could have caught with a loom test? Maybe put something in the lifo slot and then block the thread, concurrently with another thread doing nothing?

}

core.lifo_slot = Some(task);

ret
};

// Only notify if not currently parked. If `park` is `None`, then the
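On the loom test suggested in the review thread above: a rough sketch of that kind of interleaving check might look like the following, modeled on a bare AtomicPtr slot rather than the real task types (a faithful test would need the runtime's loom harness, and catching the missed-notification bug specifically would also need the park/unpark machinery):

use loom::sync::atomic::AtomicPtr;
use loom::sync::Arc;
use loom::thread;
use std::ptr;
use std::sync::atomic::Ordering::AcqRel;

#[test]
fn lifo_slot_is_stealable() {
    loom::model(|| {
        // A "task" parked in the LIFO slot.
        let slot = Arc::new(AtomicPtr::new(Box::into_raw(Box::new(1u32))));

        let thief = {
            let slot = slot.clone();
            thread::spawn(move || {
                // Stealing is a swap-to-null; at most one thread can win.
                let p = slot.swap(ptr::null_mut(), AcqRel);
                (!p.is_null()).then(|| unsafe { *Box::from_raw(p) })
            })
        };

        // The owner concurrently tries to pop its own slot.
        let p = slot.swap(ptr::null_mut(), AcqRel);
        let popped = (!p.is_null()).then(|| unsafe { *Box::from_raw(p) });

        let stolen = thief.join().unwrap();
        // Exactly one side gets the task, in every interleaving.
        assert_eq!(popped.is_some() as u8 + stolen.is_some() as u8, 1);
    });
}

loom runs the closure under every legal interleaving, so a protocol in which both sides could claim the task, or neither could, would fail the assertion.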
58 changes: 58 additions & 0 deletions tokio/src/runtime/task/atomic_notified.rs
@@ -0,0 +1,58 @@
use crate::loom::sync::atomic::AtomicPtr;
use crate::runtime::task::{Header, Notified, RawTask};

use std::marker::PhantomData;
use std::ptr;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire};

/// An atomic cell which can contain a pointer to a [`Notified`] task.
///
/// This is similar to the `crate::util::AtomicCell` type, but specialized to
/// hold a task pointer --- this type "remembers" the task's scheduler generic
/// when a task is stored in the cell, so that the pointer can be turned back
/// into a [`Notified`] task with the correct generic type when it is retrieved.
pub(crate) struct AtomicNotified<S: 'static> {
task: AtomicPtr<Header>,
_scheduler: PhantomData<S>,
}

impl<S: 'static> AtomicNotified<S> {
pub(crate) fn empty() -> Self {
Self {
task: AtomicPtr::new(ptr::null_mut()),
_scheduler: PhantomData,
}
}

pub(crate) fn swap(&self, task: Option<Notified<S>>) -> Option<Notified<S>> {
let new = task
.map(|t| t.into_raw().header_ptr().as_ptr())
.unwrap_or_else(ptr::null_mut);
let old = self.task.swap(new, AcqRel);
NonNull::new(old).map(|ptr| unsafe {
// Safety: since we only allow tasks with the same scheduler type to
// be placed in this cell, we know that the pointed task's scheduler
// type matches the type parameter S.
Notified::from_raw(RawTask::from_raw(ptr))
})
}

pub(crate) fn take(&self) -> Option<Notified<S>> {
self.swap(None)
}

pub(crate) fn is_some(&self) -> bool {
!self.task.load(Acquire).is_null()
}
}

unsafe impl<S: Send> Send for AtomicNotified<S> {}
unsafe impl<S: Send> Sync for AtomicNotified<S> {}

impl<S> Drop for AtomicNotified<S> {
fn drop(&mut self) {
// Ensure the task reference is dropped if this cell is dropped.
let _ = self.take();
}
}
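
To see the ownership-transfer pattern of this new cell in isolation, here is a self-contained analogue storing Box<T> instead of a raw task pointer (a sketch, not tokio code; the real cell additionally carries the scheduler type in PhantomData so the pointer can be rebuilt as a Notified<S>):

use std::ptr::{self, NonNull};
use std::sync::atomic::{AtomicPtr, Ordering::{AcqRel, Acquire}};

struct AtomicSlot<T> {
    ptr: AtomicPtr<T>,
}

impl<T> AtomicSlot<T> {
    fn empty() -> Self {
        Self { ptr: AtomicPtr::new(ptr::null_mut()) }
    }

    // Store `value` (or empty the slot), returning the previous occupant.
    fn swap(&self, value: Option<Box<T>>) -> Option<Box<T>> {
        let new = value.map_or(ptr::null_mut(), Box::into_raw);
        let old = self.ptr.swap(new, AcqRel);
        // Safety: every non-null pointer in the slot came from Box::into_raw.
        NonNull::new(old).map(|p| unsafe { Box::from_raw(p.as_ptr()) })
    }

    fn take(&self) -> Option<Box<T>> {
        self.swap(None)
    }

    fn is_some(&self) -> bool {
        !self.ptr.load(Acquire).is_null()
    }
}

impl<T> Drop for AtomicSlot<T> {
    // Reclaim the occupant, if any, so it is not leaked.
    fn drop(&mut self) {
        let _ = self.take();
    }
}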
5 changes: 5 additions & 0 deletions tokio/src/runtime/task/mod.rs
@@ -214,6 +214,11 @@ pub(crate) use self::raw::RawTask;
mod state;
use self::state::State;

#[cfg(feature = "rt-multi-thread")]
mod atomic_notified;
#[cfg(feature = "rt-multi-thread")]
pub(crate) use self::atomic_notified::AtomicNotified;

mod waker;

pub(crate) use self::spawn_location::SpawnLocation;