diff --git a/Cargo.toml b/Cargo.toml index c8feeb5..0353bdf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "async-executor" version = "1.13.2" authors = ["Stjepan Glavina ", "John Nunley "] edition = "2021" -rust-version = "1.63" +rust-version = "1.65" description = "Async executor" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/async-executor" diff --git a/src/lib.rs b/src/lib.rs index 74e02db..0acf675 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -90,7 +90,7 @@ pub use static_executors::*; /// ``` pub struct Executor<'a> { /// The executor state. - state: AtomicPtr, + pub(crate) state: AtomicPtr, /// Makes the `'a` lifetime invariant. _marker: PhantomData>, @@ -163,10 +163,11 @@ impl<'a> Executor<'a> { /// }); /// ``` pub fn spawn(&self, future: impl Future + Send + 'a) -> Task { - let mut active = self.state().active(); + let state = self.state(); + let mut active = state.active(); // SAFETY: `T` and the future are `Send`. - unsafe { self.spawn_inner(future, &mut active) } + unsafe { Self::spawn_inner(state, future, &mut active) } } /// Spawns many tasks onto the executor. @@ -214,12 +215,13 @@ impl<'a> Executor<'a> { futures: impl IntoIterator, handles: &mut impl Extend>, ) { - let mut active = Some(self.state().active()); + let state = self.state(); + let mut active = Some(state.as_ref().active()); // Convert the futures into tasks. let tasks = futures.into_iter().enumerate().map(move |(i, future)| { // SAFETY: `T` and the future are `Send`. - let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) }; + let task = unsafe { Self::spawn_inner(state, future, active.as_mut().unwrap()) }; // Yield the lock every once in a while to ease contention. if i.wrapping_sub(1) % 500 == 0 { @@ -240,14 +242,13 @@ impl<'a> Executor<'a> { /// /// If this is an `Executor`, `F` and `T` must be `Send`. 
unsafe fn spawn_inner( - &self, + state: Pin<&'a State>, future: impl Future + 'a, active: &mut Slab, ) -> Task { // Remove the task from the set of active tasks when the future finishes. let entry = active.vacant_entry(); let index = entry.key(); - let state = self.state_as_arc(); let future = AsyncCallOnDrop::new(future, move || drop(state.active().try_remove(index))); // Create the task and register it in the set of active tasks. @@ -269,12 +270,16 @@ impl<'a> Executor<'a> { // the `Executor` is drained of all of its runnables. This ensures that // runnables are dropped and this precondition is satisfied. // - // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // `Self::schedule` is `Send`, `Sync` and `'static`, as checked below. // Therefore we do not need to worry about what is done with the // `Waker`. + // + // `Self::schedule` may not be `'static`, but we make sure that the `Waker` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. let (runnable, task) = Builder::new() .propagate_panic(true) - .spawn_unchecked(|()| future, self.schedule()); + .spawn_unchecked(|()| future, Self::schedule(state)); entry.insert(runnable.waker()); runnable.schedule(); @@ -345,9 +350,7 @@ impl<'a> Executor<'a> { } /// Returns a function that schedules a runnable task when it gets woken up. - fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { - let state = self.state_as_arc(); - + fn schedule(state: Pin<&'a State>) -> impl Fn(Runnable) + Send + Sync + 'a { // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { state.queue.push(runnable).unwrap(); } } /// Returns a pointer to the inner state. 
#[inline] - fn state_ptr(&self) -> *const State { + fn state(&self) -> Pin<&'a State> { #[cold] fn alloc_state(atomic_ptr: &AtomicPtr) -> *mut State { let state = Arc::new(State::new()); - // TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65 - let ptr = Arc::into_raw(state) as *mut State; + let ptr = Arc::into_raw(state).cast_mut(); if let Err(actual) = atomic_ptr.compare_exchange( std::ptr::null_mut(), ptr, @@ -381,26 +383,10 @@ impl<'a> Executor<'a> { if ptr.is_null() { ptr = alloc_state(&self.state); } - ptr - } - /// Returns a reference to the inner state. - #[inline] - fn state(&self) -> &State { // SAFETY: So long as an Executor lives, it's state pointer will always be valid - // when accessed through state_ptr. - unsafe { &*self.state_ptr() } - } - - // Clones the inner state Arc - #[inline] - fn state_as_arc(&self) -> Arc { - // SAFETY: So long as an Executor lives, it's state pointer will always be a valid - // Arc when accessed through state_ptr. - let arc = unsafe { Arc::from_raw(self.state_ptr()) }; - let clone = arc.clone(); - std::mem::forget(arc); - clone + // and will never be moved until it's dropped. + Pin::new(unsafe { &*ptr }) } } @@ -415,7 +401,7 @@ impl Drop for Executor<'_> { // via Arc::into_raw in state_ptr. let state = unsafe { Arc::from_raw(ptr) }; - let mut active = state.active(); + let mut active = state.pin().active(); for w in active.drain() { w.wake(); } @@ -517,11 +503,12 @@ impl<'a> LocalExecutor<'a> { /// }); /// ``` pub fn spawn(&self, future: impl Future + 'a) -> Task { - let mut active = self.inner().state().active(); + let state = self.inner().state(); + let mut active = state.active(); // SAFETY: This executor is not thread safe, so the future and its result // cannot be sent to another thread. - unsafe { self.inner().spawn_inner(future, &mut active) } + unsafe { Executor::spawn_inner(state, future, &mut active) } } /// Spawns many tasks onto the executor. 
@@ -569,13 +556,14 @@ impl<'a> LocalExecutor<'a> { futures: impl IntoIterator, handles: &mut impl Extend>, ) { - let mut active = self.inner().state().active(); + let state = self.inner().state(); + let mut active = state.active(); // Convert all of the futures to tasks. let tasks = futures.into_iter().map(|future| { // SAFETY: This executor is not thread safe, so the future and its result // cannot be sent to another thread. - unsafe { self.inner().spawn_inner(future, &mut active) } + unsafe { Executor::spawn_inner(state, future, &mut active) } // As only one thread can spawn or poll tasks at a time, there is no need // to release lock contention here. @@ -694,9 +682,16 @@ impl State { } } + fn pin(&self) -> Pin<&Self> { + Pin::new(self) + } + /// Returns a reference to currently active tasks. - fn active(&self) -> MutexGuard<'_, Slab> { - self.active.lock().unwrap_or_else(|e| e.into_inner()) + fn active(self: Pin<&Self>) -> MutexGuard<'_, Slab> { + self.get_ref() + .active + .lock() + .unwrap_or_else(|e| e.into_inner()) } /// Notifies a sleeping ticker. 
@@ -1192,13 +1187,14 @@ fn _ensure_send_and_sync() { is_sync::>(Executor::new()); let ex = Executor::new(); + let state = ex.state(); is_send(ex.run(pending::<()>())); is_sync(ex.run(pending::<()>())); is_send(ex.tick()); is_sync(ex.tick()); - is_send(ex.schedule()); - is_sync(ex.schedule()); - is_static(ex.schedule()); + is_send(Executor::schedule(state)); + is_sync(Executor::schedule(state)); + is_static(Executor::schedule(state)); /// ```compile_fail /// use async_executor::LocalExecutor; diff --git a/src/static_executors.rs b/src/static_executors.rs index c43679d..1d3fcce 100644 --- a/src/static_executors.rs +++ b/src/static_executors.rs @@ -7,6 +7,7 @@ use std::{ future::Future, marker::PhantomData, panic::{RefUnwindSafe, UnwindSafe}, + sync::atomic::Ordering, }; impl Executor<'static> { @@ -34,11 +35,16 @@ impl Executor<'static> { /// future::block_on(ex.run(task)); /// ``` pub fn leak(self) -> &'static StaticExecutor { - let ptr = self.state_ptr(); - // SAFETY: So long as an Executor lives, it's state pointer will always be valid - // when accessed through state_ptr. This executor will live for the full 'static - // lifetime so this isn't an arbitrary lifetime extension. - let state: &'static State = unsafe { &*ptr }; + let ptr = self.state.load(Ordering::Relaxed); + + let state: &'static State = if ptr.is_null() { + Box::leak(Box::new(State::new())) + } else { + // SAFETY: So long as an Executor lives, it's state pointer will always be valid + // when accessed through state_ptr. This executor will live for the full 'static + // lifetime so this isn't an arbitrary lifetime extension. + unsafe { &*ptr } + }; std::mem::forget(self); @@ -84,11 +90,16 @@ impl LocalExecutor<'static> { /// future::block_on(ex.run(task)); /// ``` pub fn leak(self) -> &'static StaticLocalExecutor { - let ptr = self.inner.state_ptr(); - // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid - // when accessed through state_ptr. 
This executor will live for the full 'static - // lifetime so this isn't an arbitrary lifetime extension. - let state: &'static State = unsafe { &*ptr }; + let ptr = self.inner.state.load(Ordering::Relaxed); + + let state: &'static State = if ptr.is_null() { + Box::leak(Box::new(State::new())) + } else { + // SAFETY: So long as a LocalExecutor lives, its state pointer will always be valid + // when accessed through state_ptr. This executor will live for the full 'static + // lifetime so this isn't an arbitrary lifetime extension. + unsafe { &*ptr } + }; std::mem::forget(self);