diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs
index 68670e63ca1..86781eb26a6 100644
--- a/tokio/src/runtime/scheduler/multi_thread/queue.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs
@@ -52,6 +52,13 @@ pub(crate) struct Inner<T: 'static> {
     /// Only updated by producer thread but read by many threads.
     tail: AtomicUnsignedShort,
 
+    /// When a task is scheduled from a worker, it is stored in this slot. The
+    /// worker will check this slot for a task **before** checking the run
+    /// queue. This effectively results in the **last** scheduled task to be run
+    /// next (LIFO). This is an optimization for improving locality which
+    /// benefits message passing patterns and helps to reduce latency.
+    lifo: task::AtomicNotified<T>,
+
     /// Elements
     buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
 }
@@ -92,6 +99,7 @@ pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
     let inner = Arc::new(Inner {
         head: AtomicUnsignedLong::new(0),
         tail: AtomicUnsignedShort::new(0),
+        lifo: task::AtomicNotified::empty(),
         buffer: make_fixed_size(buffer.into_boxed_slice()),
     });
 
@@ -108,9 +116,10 @@ impl<T> Local<T> {
     /// Returns the number of entries in the queue
    pub(crate) fn len(&self) -> usize {
         let (_, head) = unpack(self.inner.head.load(Acquire));
+        let lifo = self.inner.lifo.is_some() as usize;
 
         // safety: this is the **only** thread that updates this cell.
         let tail = unsafe { self.inner.tail.unsync_load() };
 
-        len(head, tail)
+        len(head, tail) + lifo
     }
 
     /// How many tasks can be pushed into the queue
@@ -388,6 +397,19 @@ impl<T> Local<T> {
 
         Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
     }
+
+    /// Pushes a task to the LIFO slot, returning the task previously in the
+    /// LIFO slot (if there was one).
+    pub(crate) fn push_lifo(&self, task: task::Notified<T>) -> Option<task::Notified<T>> {
+        self.inner.lifo.swap(Some(task))
+    }
+
+    /// Pops the task currently held in the LIFO slot, if there is one;
+    /// otherwise, returns `None`.
+    pub(crate) fn pop_lifo(&self) -> Option<task::Notified<T>> {
+        // LIFO-suction!
+        self.inner.lifo.take()
+    }
 }
 
 impl<T> Steal<T> {
@@ -395,7 +417,8 @@ impl<T> Steal<T> {
     pub(crate) fn len(&self) -> usize {
         let (_, head) = unpack(self.0.head.load(Acquire));
         let tail = self.0.tail.load(Acquire);
-        len(head, tail)
+        let lifo = self.0.lifo.is_some() as usize;
+        len(head, tail) + lifo
     }
 
     /// Return true if the queue is empty,
@@ -429,23 +452,37 @@ impl<T> Steal<T> {
         // tasks in `dst`.
         let mut n = self.steal_into2(dst, dst_tail);
 
-        if n == 0 {
+        // Ooh, there's another task just sitting there? Grab that one, too!
+        let lifo = self.0.lifo.take();
+
+        if n == 0 && lifo.is_none() {
             // No tasks were stolen
             return None;
         }
 
-        dst_stats.incr_steal_count(n as u16);
+        // If we also grabbed the task from the LIFO slot, include that in the
+        // steal count as well.
+        dst_stats.incr_steal_count(n as u16 + lifo.is_some() as u16);
         dst_stats.incr_steal_operations();
 
-        // We are returning a task here
-        n -= 1;
+        let ret = if let Some(lifo) = lifo {
+            // If we took the task from the LIFO slot, just return it as the
+            // next task to run, rather than messing around with tasks from the
+            // queue.
+            lifo
+        } else {
+            // We are returning a task from the queue here.
+            n -= 1;
 
-        let ret_pos = dst_tail.wrapping_add(n);
-        let ret_idx = ret_pos as usize & MASK;
+            // The LIFO slot was empty, so take the last task we squirted
+            // into `dst` instead.
+            let ret_pos = dst_tail.wrapping_add(n);
+            let ret_idx = ret_pos as usize & MASK;
 
-        // safety: the value was written as part of `steal_into2` and not
-        // exposed to stealers, so no other thread can access it.
-        let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
+            // safety: the value was written as part of `steal_into2` and not
+            // exposed to stealers, so no other thread can access it.
+            dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) })
+        };
 
         if n == 0 {
             // The `dst` queue is empty, but a single task was stolen
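The doc comment added to `Inner` above motivates the LIFO slot with locality for "message passing patterns". As a concrete, user-level picture of that pattern, here is a hedged sketch in ordinary Tokio code (illustrative only, not part of this change):

    // Illustrative only (not part of this diff): two spawned tasks communicating
    // over a oneshot channel. When the sender runs on a worker and wakes the
    // receiver, the receiver is typically placed in that worker's LIFO slot and
    // polled next, while the sent value is still cache-hot.
    use tokio::sync::oneshot;

    #[tokio::main]
    async fn main() {
        let (tx, rx) = oneshot::channel();

        let receiver = tokio::spawn(async move {
            let value: u32 = rx.await.unwrap();
            assert_eq!(value, 42);
        });

        let sender = tokio::spawn(async move {
            // Waking the receiver from a worker thread schedules it locally
            // (the LIFO slot, when enabled) rather than through the global queue.
            tx.send(42u32).unwrap();
        });

        sender.await.unwrap();
        receiver.await.unwrap();
    }

With this change, the slot keeps that fast path while no longer being invisible to stealers: `steal_into` above now also grabs the LIFO task, and `len` counts it.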
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index 8a9719b9bd0..ebdea59aadc 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -101,13 +101,6 @@ struct Core {
     /// Used to schedule bookkeeping tasks every so often.
     tick: u32,
 
-    /// When a task is scheduled from a worker, it is stored in this slot. The
-    /// worker will check this slot for a task **before** checking the run
-    /// queue. This effectively results in the **last** scheduled task to be run
-    /// next (LIFO). This is an optimization for improving locality which
-    /// benefits message passing patterns and helps to reduce latency.
-    lifo_slot: Option<Notified>,
-
     /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
     /// they go to the back of the `run_queue`.
     lifo_enabled: bool,
@@ -257,7 +250,6 @@ pub(super) fn create(
         cores.push(Box::new(Core {
             tick: 0,
-            lifo_slot: None,
             lifo_enabled: !config.disable_lifo_slot,
             run_queue,
             is_searching: false,
@@ -405,7 +397,7 @@ where
         // If we heavily call `spawn_blocking`, there might be no available thread to
        // run this core. Except for the task in the lifo_slot, all tasks can be
        // stolen, so we move the task out of the lifo_slot to the run_queue.
-        if let Some(task) = core.lifo_slot.take() {
+        if let Some(task) = core.run_queue.pop_lifo() {
            core.run_queue
                .push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats);
        }
@@ -620,7 +612,7 @@ impl Context {
        };

        // Check for a task in the LIFO slot
-        let task = match core.lifo_slot.take() {
+        let task = match core.run_queue.pop_lifo() {
            Some(task) => task,
            None => {
                self.reset_lifo_enabled(&mut core);
@@ -858,7 +850,7 @@ impl Core {
    }

    fn next_local_task(&mut self) -> Option<Notified> {
-        self.lifo_slot.take().or_else(|| self.run_queue.pop())
+        self.run_queue.pop_lifo().or_else(|| self.run_queue.pop())
    }

    /// Function responsible for stealing tasks from another worker
@@ -914,7 +906,7 @@ impl Core {
    }

    fn has_tasks(&self) -> bool {
-        self.lifo_slot.is_some() || self.run_queue.has_tasks()
+        self.run_queue.has_tasks()
    }

    fn should_notify_others(&self) -> bool {
@@ -923,7 +915,7 @@ impl Core {
        if self.is_searching {
            return false;
        }
-        self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
+        self.run_queue.len() > 1
    }

    /// Prepares the worker state for parking.
@@ -1084,29 +1076,23 @@ impl Handle {
        // task must always be pushed to the back of the queue, enabling other
        // tasks to be executed. If **not** a yield, then there is more
        // flexibility and the task may go to the front of the queue.
-        let should_notify = if is_yield || !core.lifo_enabled {
+        if is_yield || !core.lifo_enabled {
            core.run_queue
                .push_back_or_overflow(task, self, &mut core.stats);
-            true
        } else {
            // Push to the LIFO slot
-            let prev = core.lifo_slot.take();
-            let ret = prev.is_some();
-
-            if let Some(prev) = prev {
+            if let Some(prev) = core.run_queue.push_lifo(task) {
+                // There was a previous task in the LIFO slot which needs
+                // to be pushed to the back of the run queue.
                core.run_queue
                    .push_back_or_overflow(prev, self, &mut core.stats);
            }
-
-            core.lifo_slot = Some(task);
-
-            ret
        };

        // Only notify if not currently parked. If `park` is `None`, then the
        // scheduling is from a resource driver. As notifications often come in
        // batches, the notification is delayed until the park is complete.
-        if should_notify && core.park.is_some() {
+        if core.park.is_some() {
            self.notify_parked_local();
        }
    }
diff --git a/tokio/src/runtime/task/atomic_notified.rs b/tokio/src/runtime/task/atomic_notified.rs
new file mode 100644
index 00000000000..05a408755e8
--- /dev/null
+++ b/tokio/src/runtime/task/atomic_notified.rs
@@ -0,0 +1,58 @@
+use crate::loom::sync::atomic::AtomicPtr;
+use crate::runtime::task::{Header, Notified, RawTask};
+
+use std::marker::PhantomData;
+use std::ptr;
+use std::ptr::NonNull;
+use std::sync::atomic::Ordering::{AcqRel, Acquire};
+
+/// An atomic cell which can contain a pointer to a [`Notified`] task.
+///
+/// This is similar to the `crate::util::AtomicCell` type, but specialized to
+/// hold a task pointer --- this type "remembers" the task's scheduler generic
+/// when a task is stored in the cell, so that the pointer can be turned back
+/// into a [`Notified`] task with the correct generic type when it is retrieved.
+pub(crate) struct AtomicNotified<S: 'static> {
+    task: AtomicPtr<Header>,
+    _scheduler: PhantomData<S>,
+}
+
+impl<S: 'static> AtomicNotified<S> {
+    pub(crate) fn empty() -> Self {
+        Self {
+            task: AtomicPtr::new(ptr::null_mut()),
+            _scheduler: PhantomData,
+        }
+    }
+
+    pub(crate) fn swap(&self, task: Option<Notified<S>>) -> Option<Notified<S>> {
+        let new = task
+            .map(|t| t.into_raw().header_ptr().as_ptr())
+            .unwrap_or_else(ptr::null_mut);
+        let old = self.task.swap(new, AcqRel);
+        NonNull::new(old).map(|ptr| unsafe {
+            // Safety: since we only allow tasks with the same scheduler type to
+            // be placed in this cell, we know that the pointed task's scheduler
+            // type matches the type parameter S.
+            Notified::from_raw(RawTask::from_raw(ptr))
+        })
+    }
+
+    pub(crate) fn take(&self) -> Option<Notified<S>> {
+        self.swap(None)
+    }
+
+    pub(crate) fn is_some(&self) -> bool {
+        !self.task.load(Acquire).is_null()
+    }
+}
+
+unsafe impl<S> Send for AtomicNotified<S> {}
+unsafe impl<S> Sync for AtomicNotified<S> {}
+
+impl<S: 'static> Drop for AtomicNotified<S> {
+    fn drop(&mut self) {
+        // Ensure the task reference is dropped if this cell is dropped.
+        let _ = self.take();
+    }
+}
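`AtomicNotified` is, in effect, a one-element lock-free mailbox: `swap` atomically installs a new task pointer and hands back ownership of whatever was there, and `Drop` releases anything left behind. Here is a self-contained sketch of the same ownership pattern with a plain `Box<T>` payload and standard-library atomics (illustrative only; `AtomicSlot` is an invented name and this is not Tokio code):

    use std::ptr;
    use std::sync::atomic::{AtomicPtr, Ordering::{AcqRel, Acquire}};

    // Hypothetical stand-in for `AtomicNotified`: owns at most one boxed value.
    struct AtomicSlot<T> {
        ptr: AtomicPtr<T>,
    }

    impl<T> AtomicSlot<T> {
        fn empty() -> Self {
            Self { ptr: AtomicPtr::new(ptr::null_mut()) }
        }

        // Store `value` (or clear the slot) and hand back whatever was there before.
        fn swap(&self, value: Option<Box<T>>) -> Option<Box<T>> {
            let new = value.map(Box::into_raw).unwrap_or(ptr::null_mut());
            let old = self.ptr.swap(new, AcqRel);
            // Safety: every non-null pointer in the slot came from `Box::into_raw`
            // above, and the atomic swap transferred exclusive ownership to us.
            (!old.is_null()).then(|| unsafe { Box::from_raw(old) })
        }

        fn take(&self) -> Option<Box<T>> {
            self.swap(None)
        }

        fn is_some(&self) -> bool {
            !self.ptr.load(Acquire).is_null()
        }
    }

    impl<T> Drop for AtomicSlot<T> {
        fn drop(&mut self) {
            // Release any value still parked in the slot.
            let _ = self.take();
        }
    }

The real type stores a raw `Header` pointer instead of a `Box`, and spells out `unsafe impl Send`/`Sync` because the `PhantomData<S>` marker would otherwise tie the auto traits to the scheduler type.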
diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs
index e6b1a0804ec..191fd611ceb 100644
--- a/tokio/src/runtime/task/mod.rs
+++ b/tokio/src/runtime/task/mod.rs
@@ -214,6 +214,11 @@ pub(crate) use self::raw::RawTask;
 mod state;
 use self::state::State;
 
+#[cfg(feature = "rt-multi-thread")]
+mod atomic_notified;
+#[cfg(feature = "rt-multi-thread")]
+pub(crate) use self::atomic_notified::AtomicNotified;
+
 mod waker;
 
 pub(crate) use self::spawn_location::SpawnLocation;
diff --git a/tokio/src/runtime/tests/loom_multi_thread/queue.rs b/tokio/src/runtime/tests/loom_multi_thread/queue.rs
index 0d818283653..2098b548a2b 100644
--- a/tokio/src/runtime/tests/loom_multi_thread/queue.rs
+++ b/tokio/src/runtime/tests/loom_multi_thread/queue.rs
@@ -62,6 +62,65 @@ fn basic() {
     });
 }
 
+// Like `basic`, but with tasks in the LIFO slot.
+#[test]
+fn basic_lifo() {
+    loom::model(|| {
+        let (steal, mut local) = queue::local();
+        let inject = RefCell::new(vec![]);
+        let mut stats = new_stats();
+
+        let th = thread::spawn(move || {
+            let mut stats = new_stats();
+            let (_, mut local) = queue::local();
+            let mut n = 0;
+
+            for _ in 0..3 {
+                if steal.steal_into(&mut local, &mut stats).is_some() {
+                    n += 1;
+                }
+
+                while local.pop().is_some() {
+                    n += 1;
+                }
+            }
+
+            n
+        });
+
+        let mut n = 0;
+
+        for _ in 0..2 {
+            for _ in 0..2 {
+                let (task, _) = unowned(async {});
+                if let Some(prev) = local.push_lifo(task) {
+                    local.push_back_or_overflow(prev, &inject, &mut stats);
+                }
+            }
+
+            if local.pop_lifo().or_else(|| local.pop()).is_some() {
+                n += 1;
+            }
+
+            // Push another task
+            let (task, _) = unowned(async {});
+            if let Some(prev) = local.push_lifo(task) {
+                local.push_back_or_overflow(prev, &inject, &mut stats);
+            }
+
+            while local.pop_lifo().or_else(|| local.pop()).is_some() {
+                n += 1;
+            }
+        }
+
+        n += inject.borrow_mut().drain(..).count();
+
+        n += th.join().unwrap();
+
+        assert_eq!(6, n);
+    });
+}
+
 #[test]
 fn steal_overflow() {
     loom::model(|| {
@@ -116,32 +175,59 @@ fn steal_overflow() {
 fn multi_stealer() {
     const NUM_TASKS: usize = 5;
 
-    fn steal_tasks(steal: queue::Steal) -> usize {
+    loom::model(|| {
+        let (steal, mut local) = queue::local();
+        let inject = RefCell::new(vec![]);
         let mut stats = new_stats();
-        let (_, mut local) = queue::local();
 
-        if steal.steal_into(&mut local, &mut stats).is_none() {
-            return 0;
+        // Push work
+        for _ in 0..NUM_TASKS {
+            let (task, _) = unowned(async {});
+            local.push_back_or_overflow(task, &inject, &mut stats);
         }
 
-        let mut n = 1;
+        let th1 = {
+            let steal = steal.clone();
+            thread::spawn(move || steal_tasks(steal))
+        };
+
+        let th2 = thread::spawn(move || steal_tasks(steal));
+
+        let mut n = 0;
 
         while local.pop().is_some() {
             n += 1;
         }
 
-        n
-    }
+        n += inject.borrow_mut().drain(..).count();
+
+        n += th1.join().unwrap();
+        n += th2.join().unwrap();
+
+        assert_eq!(n, NUM_TASKS);
+    });
+}
+
+// Like `multi_stealer`, but with tasks in the LIFO slot.
+#[test]
+fn multi_stealer_lifo() {
+    const NUM_TASKS: usize = 5;
 
     loom::model(|| {
         let (steal, mut local) = queue::local();
         let inject = RefCell::new(vec![]);
         let mut stats = new_stats();
 
-        // Push work
+        // Push work into the LIFO slot.
         for _ in 0..NUM_TASKS {
             let (task, _) = unowned(async {});
-            local.push_back_or_overflow(task, &inject, &mut stats);
+            // Push the new task into the LIFO slot, as though it's being
+            // notified locally.
+            if let Some(prev) = local.push_lifo(task) {
+                // If a task was already in the LIFO slot, stick the previous
+                // LIFO task into the queue.
+                local.push_back_or_overflow(prev, &inject, &mut stats);
+            }
         }
 
         let th1 = {
@@ -153,7 +239,7 @@ fn multi_stealer() {
 
         let mut n = 0;
 
-        while local.pop().is_some() {
+        while local.pop_lifo().or_else(|| local.pop()).is_some() {
             n += 1;
         }
 
@@ -166,6 +252,23 @@ fn multi_stealer() {
     });
 }
 
+fn steal_tasks(steal: queue::Steal) -> usize {
+    let mut stats = new_stats();
+    let (_, mut local) = queue::local();
+
+    if steal.steal_into(&mut local, &mut stats).is_none() {
+        return 0;
+    }
+
+    let mut n = 1;
+
+    while local.pop().is_some() {
+        n += 1;
+    }
+
+    n
+}
+
 #[test]
 fn chained_steal() {
     loom::model(|| {
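For readers who have not used loom: `loom::model` runs the closure under every interleaving of the loom-provided threads and atomics that the model permits, so `basic_lifo` and `multi_stealer_lifo` above exercise the LIFO push/pop/steal protocol exhaustively rather than relying on lucky timing. A minimal, self-contained illustration of the tool, patterned after loom's own documentation (not part of this diff; it needs a dev-dependency on `loom`):

    // Not part of the diff: a tiny loom model that catches a lost update.
    use loom::sync::atomic::AtomicUsize;
    use loom::sync::atomic::Ordering::{Acquire, Release};
    use loom::sync::Arc;
    use loom::thread;

    #[test]
    #[should_panic] // loom finds an interleaving where an increment is lost
    fn buggy_concurrent_inc() {
        loom::model(|| {
            let num = Arc::new(AtomicUsize::new(0));

            let threads: Vec<_> = (0..2)
                .map(|_| {
                    let num = num.clone();
                    thread::spawn(move || {
                        // Non-atomic read-modify-write: both threads can read 0.
                        let curr = num.load(Acquire);
                        num.store(curr + 1, Release);
                    })
                })
                .collect();

            for th in threads {
                th.join().unwrap();
            }

            assert_eq!(2, num.load(Acquire));
        });
    }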
diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs
index ca2a952fb73..4513cc5f93f 100644
--- a/tokio/tests/rt_threaded.rs
+++ b/tokio/tests/rt_threaded.rs
@@ -692,6 +692,112 @@ fn mutex_in_block_in_place() {
     })
 }
 
+// Tests that when a task is notified by another task and is placed in the LIFO
+// slot, and then the notifying task blocks the runtime, the notified task will
+// be stolen by another worker thread.
+//
+// Integration test for: https://github.com/tokio-rs/tokio/issues/4941
+#[test]
+fn lifo_stealable() {
+    use std::time::Duration;
+
+    // This test constructs a scenario where a task (the "blocker task")
+    // notifies another task (the "victim task") and then blocks that worker
+    // thread indefinitely. The victim task is placed in the worker's LIFO
+    // slot, and will only run to completion if another worker steals it from
+    // the LIFO slot, as the current worker remains blocked running the blocker
+    // task.
+    //
+    // To make the blocker task block its worker thread without yielding, we use
+    // a `std::sync` blocking channel, so that we can eventually unblock it when
+    // the test completes.
+    let (block_thread_tx, block_thread_rx) = mpsc::channel::<()>();
+    // We use this channel to wait until the victim task has started running. If
+    // we just spawned the victim task and then immediately blocked the worker
+    // thread, it would be in the global inject queue, rather than in the
+    // worker's LIFO slot.
+    let (task_started_tx, task_started_rx) = tokio::sync::oneshot::channel();
+    // Finally, this channel is used by the blocker task to wake up the victim
+    // task, so that it is placed in the worker's LIFO slot.
+    let (notify_tx, notify_rx) = tokio::sync::oneshot::channel();
+    let rt = runtime::Builder::new_multi_thread()
+        // Make sure there are enough workers that one can be parked running the
+        // I/O driver and another can be parked running the timer wheel, and
+        // there's still at least one worker free to steal the blocked task.
+        .worker_threads(4)
+        .enable_time()
+        .build()
+        .unwrap();
+
+    rt.block_on(async {
+        // Keep the runtime busy so that the workers that might steal the
+        // blocked task don't all park themselves forever.
+        //
+        // Since this task will always be woken by whichever worker is holding
+        // the time driver, rather than a worker that's executing tasks, it
+        // shouldn't ever kick the victim task out of its worker's LIFO slot.
+        let churn = tokio::spawn(async move {
+            loop {
+                tokio::time::sleep(Duration::from_millis(64)).await;
+            }
+        });
+
+        let victim_task_joined = tokio::spawn(async move {
+            println!("[victim] task started");
+            task_started_tx.send(()).unwrap();
+            println!("[victim] task waiting for wakeup...");
+            notify_rx.await.unwrap();
+            println!("[victim] task running after wakeup");
+        });
+
+        // Wait for the victim task to have been polled once and have yielded
+        // before we spawn the task that will notify it. This ensures that it
+        // will be placed in the LIFO slot of the same worker thread as the
+        // blocker task, rather than on the global injector queue.
+        task_started_rx.await.unwrap();
+        println!("[main] victim slot task start acked!");
+
+        // Now, spawn a task that will notify the victim task before blocking
+        // forever.
+        tokio::spawn(async move {
+            println!("[blocker] sending wakeup");
+            notify_tx.send(()).unwrap();
+
+            println!("[blocker] blocking the worker thread...");
+            // Block the worker thread indefinitely by waiting for a message on
+            // a blocking channel. Since we just notified the victim task, it
+            // went into the current worker thread's LIFO slot, and will only
+            // be able to complete if another worker thread successfully steals
+            // it from the LIFO slot.
+            //
+            // Using a channel rather than e.g. `loop {}` allows us to terminate
+            // the task cleanly when the test finishes.
+            let _ = block_thread_rx.recv();
+            println!("[blocker] done");
+        });
+
+        println!("[main] blocker task spawned");
+
+        // Wait for the victim task to join. If it does, then it has been stolen
+        // by another worker thread successfully.
+        //
+        // The 30-second timeout is chosen arbitrarily: its purpose is to ensure
+        // that the failure mode for this test is a panic, rather than hanging
+        // indefinitely. 30 seconds should be plenty of time for the task to be
+        // stolen, if it's going to work.
+        let result = tokio::time::timeout(Duration::from_secs(30), victim_task_joined).await;
+        println!("[main] result: {result:?}");
+
+        // Before possibly panicking, make sure that we wake up the blocker task
+        // so that it doesn't stop the runtime from shutting down.
+        block_thread_tx.send(()).unwrap();
+        churn.abort();
+        result
+            .expect("task in LIFO slot should complete within 30 seconds")
+            .expect("task in LIFO slot should not panic");
+    })
+}
+
 // Testing the tuning logic is tricky as it is inherently timing based, and more
 // of a heuristic than an exact behavior. This test checks that the interval
 // changes over time based on load factors. There are no assertions, completion
diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs
index a8246282126..b6b4c140e11 100644
--- a/tokio/tests/rt_unstable_metrics.rs
+++ b/tokio/tests/rt_unstable_metrics.rs
@@ -642,9 +642,13 @@ fn worker_local_queue_depth() {
         });
 
         // Bump the next-run spawn
-        tokio::spawn(async {});
+        let nop = tokio::spawn(async {});
 
+        // Wait until we're sure the other worker is blocked.
         rx1.recv().unwrap();
+        // Make sure the no-op task has terminated so that it doesn't end up
+        // in the LIFO slot and throw off our counts.
+        let _ = nop.await;
 
         // Spawn some tasks
         for _ in 0..100 {
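Because `Local::len` and `Steal::len` now include the LIFO slot, a task parked in the slot shows up in the local queue depth reported by the unstable runtime metrics, which is why the `worker_local_queue_depth` test above now awaits the no-op task. A rough sketch of reading that metric (illustrative only; assumes a build with `RUSTFLAGS="--cfg tokio_unstable"` so the unstable metric is available):

    // Illustrative only: sampling per-worker local queue depth via the
    // unstable RuntimeMetrics API.
    use tokio::runtime;

    fn main() {
        let rt = runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .unwrap();

        rt.block_on(async {
            // Spawn a few tasks so some may still be sitting in local queues
            // (and possibly in a worker's LIFO slot) when we sample.
            for _ in 0..8 {
                tokio::spawn(async {
                    tokio::task::yield_now().await;
                });
            }

            let metrics = tokio::runtime::Handle::current().metrics();
            for worker in 0..metrics.num_workers() {
                println!(
                    "worker {worker}: local queue depth = {}",
                    metrics.worker_local_queue_depth(worker)
                );
            }
        });
    }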