Skip to content

Pin an Executor's State to minimize atomic operations. #146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "async-executor"
version = "1.13.2"
authors = ["Stjepan Glavina <[email protected]>", "John Nunley <[email protected]>"]
edition = "2021"
rust-version = "1.63"
rust-version = "1.65"
description = "Async executor"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/async-executor"
Expand Down
82 changes: 39 additions & 43 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub use static_executors::*;
/// ```
pub struct Executor<'a> {
/// The executor state.
state: AtomicPtr<State>,
pub(crate) state: AtomicPtr<State>,

/// Makes the `'a` lifetime invariant.
_marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
Expand Down Expand Up @@ -163,10 +163,11 @@ impl<'a> Executor<'a> {
/// });
/// ```
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
let mut active = self.state().active();
let state = self.state();
let mut active = state.active();

// SAFETY: `T` and the future are `Send`.
unsafe { self.spawn_inner(future, &mut active) }
unsafe { Self::spawn_inner(state, future, &mut active) }
}

/// Spawns many tasks onto the executor.
Expand Down Expand Up @@ -214,12 +215,13 @@ impl<'a> Executor<'a> {
futures: impl IntoIterator<Item = F>,
handles: &mut impl Extend<Task<F::Output>>,
) {
let mut active = Some(self.state().active());
let state = self.state();
let mut active = Some(state.as_ref().active());

// Convert the futures into tasks.
let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
// SAFETY: `T` and the future are `Send`.
let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };
let task = unsafe { Self::spawn_inner(state, future, active.as_mut().unwrap()) };

// Yield the lock every once in a while to ease contention.
if i.wrapping_sub(1) % 500 == 0 {
Expand All @@ -240,14 +242,13 @@ impl<'a> Executor<'a> {
///
/// If this is an `Executor`, `F` and `T` must be `Send`.
unsafe fn spawn_inner<T: 'a>(
&self,
state: Pin<&'a State>,
future: impl Future<Output = T> + 'a,
active: &mut Slab<Waker>,
) -> Task<T> {
// Remove the task from the set of active tasks when the future finishes.
let entry = active.vacant_entry();
let index = entry.key();
let state = self.state_as_arc();
let future = AsyncCallOnDrop::new(future, move || drop(state.active().try_remove(index)));

// Create the task and register it in the set of active tasks.
Expand All @@ -269,12 +270,16 @@ impl<'a> Executor<'a> {
// the `Executor` is drained of all of its runnables. This ensures that
// runnables are dropped and this precondition is satisfied.
//
// `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
// `Self::schedule` is `Send`, `Sync` and `'static`, as checked below.
// Therefore we do not need to worry about what is done with the
// `Waker`.
//
// `Self::schedule` may not be `'static`, but we make sure that the `Waker` does
// not outlive `'a`. When the executor is dropped, the `active` field is
// drained and all of the `Waker`s are woken.
let (runnable, task) = Builder::new()
.propagate_panic(true)
.spawn_unchecked(|()| future, self.schedule());
.spawn_unchecked(|()| future, Self::schedule(state));
entry.insert(runnable.waker());

runnable.schedule();
Expand Down Expand Up @@ -345,9 +350,7 @@ impl<'a> Executor<'a> {
}

/// Returns a function that schedules a runnable task when it gets woken up.
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state = self.state_as_arc();

fn schedule(state: Pin<&'a State>) -> impl Fn(Runnable) + Send + Sync + 'a {
// TODO: If possible, push into the current local queue and notify the ticker.
move |runnable| {
state.queue.push(runnable).unwrap();
Expand All @@ -357,12 +360,11 @@ impl<'a> Executor<'a> {

/// Returns a pointer to the inner state.
#[inline]
fn state_ptr(&self) -> *const State {
fn state(&self) -> Pin<&'a State> {
#[cold]
fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State {
let state = Arc::new(State::new());
// TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65
let ptr = Arc::into_raw(state) as *mut State;
let ptr = Arc::into_raw(state).cast_mut();
if let Err(actual) = atomic_ptr.compare_exchange(
std::ptr::null_mut(),
ptr,
Expand All @@ -381,26 +383,10 @@ impl<'a> Executor<'a> {
if ptr.is_null() {
ptr = alloc_state(&self.state);
}
ptr
}

/// Returns a reference to the inner state.
#[inline]
fn state(&self) -> &State {
// SAFETY: So long as an Executor lives, its state pointer will always be valid
// when accessed through state_ptr.
unsafe { &*self.state_ptr() }
}

// Clones the inner state Arc
#[inline]
fn state_as_arc(&self) -> Arc<State> {
// SAFETY: So long as an Executor lives, its state pointer will always be a valid
// Arc when accessed through state_ptr.
let arc = unsafe { Arc::from_raw(self.state_ptr()) };
let clone = arc.clone();
std::mem::forget(arc);
clone
// and will never be moved until it's dropped.
Pin::new(unsafe { &*ptr })
}
}

Expand All @@ -415,7 +401,7 @@ impl Drop for Executor<'_> {
// via Arc::into_raw in state_ptr.
let state = unsafe { Arc::from_raw(ptr) };

let mut active = state.active();
let mut active = state.pin().active();
for w in active.drain() {
w.wake();
}
Expand Down Expand Up @@ -517,11 +503,12 @@ impl<'a> LocalExecutor<'a> {
/// });
/// ```
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
let mut active = self.inner().state().active();
let state = self.inner().state();
let mut active = state.active();

// SAFETY: This executor is not thread safe, so the future and its result
// cannot be sent to another thread.
unsafe { self.inner().spawn_inner(future, &mut active) }
unsafe { Executor::spawn_inner(state, future, &mut active) }
}

/// Spawns many tasks onto the executor.
Expand Down Expand Up @@ -569,13 +556,14 @@ impl<'a> LocalExecutor<'a> {
futures: impl IntoIterator<Item = F>,
handles: &mut impl Extend<Task<F::Output>>,
) {
let mut active = self.inner().state().active();
let state = self.inner().state();
let mut active = state.active();

// Convert all of the futures to tasks.
let tasks = futures.into_iter().map(|future| {
// SAFETY: This executor is not thread safe, so the future and its result
// cannot be sent to another thread.
unsafe { self.inner().spawn_inner(future, &mut active) }
unsafe { Executor::spawn_inner(state, future, &mut active) }

// As only one thread can spawn or poll tasks at a time, there is no need
// to release lock contention here.
Expand Down Expand Up @@ -694,9 +682,16 @@ impl State {
}
}

/// Wraps a shared reference to this `State` in `Pin`.
///
/// NOTE(review): `Pin::new` is only callable because `Pin::new` requires the
/// pointee to be `Unpin`, so this `Pin` adds no structural pinning guarantee
/// by itself — it appears to document the intent that the heap-allocated
/// `State` is never moved until drop; confirm against the allocation site.
fn pin(&self) -> Pin<&Self> {
Pin::new(self)
}

/// Returns a reference to currently active tasks.
fn active(&self) -> MutexGuard<'_, Slab<Waker>> {
self.active.lock().unwrap_or_else(|e| e.into_inner())
fn active(self: Pin<&Self>) -> MutexGuard<'_, Slab<Waker>> {
self.get_ref()
.active
.lock()
.unwrap_or_else(|e| e.into_inner())
}

/// Notifies a sleeping ticker.
Expand Down Expand Up @@ -1192,13 +1187,14 @@ fn _ensure_send_and_sync() {
is_sync::<Executor<'_>>(Executor::new());

let ex = Executor::new();
let state = ex.state();
is_send(ex.run(pending::<()>()));
is_sync(ex.run(pending::<()>()));
is_send(ex.tick());
is_sync(ex.tick());
is_send(ex.schedule());
is_sync(ex.schedule());
is_static(ex.schedule());
is_send(Executor::schedule(state));
is_sync(Executor::schedule(state));
is_static(Executor::schedule(state));

/// ```compile_fail
/// use async_executor::LocalExecutor;
Expand Down
31 changes: 21 additions & 10 deletions src/static_executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
future::Future,
marker::PhantomData,
panic::{RefUnwindSafe, UnwindSafe},
sync::atomic::Ordering,
};

impl Executor<'static> {
Expand Down Expand Up @@ -34,11 +35,16 @@ impl Executor<'static> {
/// future::block_on(ex.run(task));
/// ```
pub fn leak(self) -> &'static StaticExecutor {
let ptr = self.state_ptr();
// SAFETY: So long as an Executor lives, its state pointer will always be valid
// when accessed through state_ptr. This executor will live for the full 'static
// lifetime so this isn't an arbitrary lifetime extension.
let state: &'static State = unsafe { &*ptr };
let ptr = self.state.load(Ordering::Relaxed);

let state: &'static State = if ptr.is_null() {
Box::leak(Box::new(State::new()))
} else {
// SAFETY: So long as an Executor lives, its state pointer will always be valid
// when accessed through state_ptr. This executor will live for the full 'static
// lifetime so this isn't an arbitrary lifetime extension.
unsafe { &*ptr }
};

std::mem::forget(self);

Expand Down Expand Up @@ -84,11 +90,16 @@ impl LocalExecutor<'static> {
/// future::block_on(ex.run(task));
/// ```
pub fn leak(self) -> &'static StaticLocalExecutor {
let ptr = self.inner.state_ptr();
// SAFETY: So long as a LocalExecutor lives, its state pointer will always be valid
// when accessed through state_ptr. This executor will live for the full 'static
// lifetime so this isn't an arbitrary lifetime extension.
let state: &'static State = unsafe { &*ptr };
let ptr = self.inner.state.load(Ordering::Relaxed);

let state: &'static State = if ptr.is_null() {
Box::leak(Box::new(State::new()))
} else {
// SAFETY: So long as a LocalExecutor lives, its state pointer will always be valid
// when accessed through state_ptr. This executor will live for the full 'static
// lifetime so this isn't an arbitrary lifetime extension.
unsafe { &*ptr }
};

std::mem::forget(self);

Expand Down