Skip to content

rt: add support for non-send closures for thread (un)parking #7420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

aryan9600
Copy link

Add support for non Send+Sync closures for thread parking and unparking callbacks when using a LocalRuntime. Since a LocalRuntime will always run its tasks on the same thread, its safe to accept a non Send+Sync closure.

Motivation

Since LocalRuntime can run non Send+Sync functions, we should accept thread parking/unparking callbacks that do not implement Send and Sync.

Solution

Add two methods on LocalOptions, on_thread_park and on_thread_unpark that accept a Fn() + 'static. These callbacks are then converted into Fn() + Send + Sync + 'static. This requires unsafe code, but unsafe is acceptable here because of the behaviour of LocalRuntime

fixes #7370

@aryan9600 aryan9600 force-pushed the non-send-thread-park branch from be90205 to eeb45d3 Compare June 25, 2025 11:16
// This is specifically used for executing callbacks when using a `LocalRuntime`, since the
// callbacks will never be sent across threads and thus do not need to be Send or Sync.
struct UnsafeSendSync<T>(T);
unsafe impl<T> Send for UnsafeSendSync<T> {}
Copy link
Contributor

@Daksh14 Daksh14 Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be unsafe impl<T: Send> Send for UnsafeSendSync<T> {}
and unsafe impl<T: Send + Sync> Sync for UnsafeSendSync<T> {}

Any unsafe usage should be documented with // SAFETY: at least

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the whole point is that T is not Send or Sync.

sorry, my bad, will update the comments to begin with // SAFETY:

@aryan9600 aryan9600 force-pushed the non-send-thread-park branch from eeb45d3 to c124e08 Compare June 25, 2025 17:21
Add support for non `Send`+`Sync` closures for thread parking and unparking
callbacks when using a `LocalRuntime`. Since a `LocalRuntime` will always
run its tasks on the same thread, its safe to accept a non `Send`+`Sync`
closure.

Signed-off-by: Sanskar Jaiswal <[email protected]>
@aryan9600 aryan9600 force-pushed the non-send-thread-park branch from c124e08 to 4f8b562 Compare June 26, 2025 05:34
@mox692 mox692 added A-tokio Area: The main tokio crate M-runtime Module: tokio/runtime labels Jun 27, 2025
@ADD-SP ADD-SP added the C-enhancement Category: A PR with an enhancement or bugfix. label Jun 29, 2025
Comment on lines +16 to +17
// use super::LocalOptions;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// use super::LocalOptions;

Outdated lines?

/// This can be used to start work only when the executor is idle, or for bookkeeping
/// and monitoring purposes.
///
/// This differs from the `Builder::on_thread_park` method in that it accepts a non Send + Sync
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// This differs from the `Builder::on_thread_park` method in that it accepts a non Send + Sync
/// This differs from the [`Builder::on_thread_park`] method in that it accepts a non Send + Sync

#[non_exhaustive]
#[allow(missing_debug_implementations)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is because we cannot #[derive(Debug)] for Fn. We could try to impl Debug manually, so that we don't need this clippy exception.

Comment on lines +23 to +24
/// Callback for a worker parking itself
pub(crate) before_park: Option<Callback>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Callback for a worker parking itself
pub(crate) before_park: Option<Callback>,
/// To run before the local runtime is parked.
pub(crate) before_park: Option<Callback>,

This is to align with

/// To run before each worker thread is parked.
pub(super) before_park: Option<Callback>,

Comment on lines +26 to +27
/// Callback for a worker unparking itself
pub(crate) after_unpark: Option<Callback>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Callback for a worker unparking itself
pub(crate) after_unpark: Option<Callback>,
/// To run before the local runtime is spawned.
pub(crate) after_unpark: Option<Callback>,

Comment on lines +46 to +68
/// ```
/// # use tokio::runtime::{Builder, LocalOptions};
/// # pub fn main() {
/// let (tx, rx) = std::sync::mpsc::channel();
/// let mut opts = LocalOptions::default();
/// opts.on_thread_park(move || match rx.recv() {
/// Ok(x) => println!("Received from channel: {}", x),
/// Err(e) => println!("Error receiving from channel: {}", e),
/// });
///
/// let runtime = Builder::new_current_thread()
/// .enable_time()
/// .build_local(opts)
/// .unwrap();
///
/// runtime.block_on(async {
/// tokio::task::spawn_local(async move {
/// tx.send(42).unwrap();
/// });
/// tokio::time::sleep(std::time::Duration::from_millis(1)).await;
/// })
/// }
/// ```
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might not be rendered correctly.

Comment on lines +91 to +113
/// ```
/// # use tokio::runtime::{Builder, LocalOptions};
/// # pub fn main() {
/// let (tx, rx) = std::sync::mpsc::channel();
/// let mut opts = LocalOptions::default();
/// opts.on_thread_unpark(move || match rx.recv() {
/// Ok(x) => println!("Received from channel: {}", x),
/// Err(e) => println!("Error receiving from channel: {}", e),
/// });
///
/// let runtime = Builder::new_current_thread()
/// .enable_time()
/// .build_local(opts)
/// .unwrap();
///
/// runtime.block_on(async {
/// tokio::task::spawn_local(async move {
/// tx.send(42).unwrap();
/// });
/// tokio::time::sleep(std::time::Duration::from_millis(1)).await;
/// })
/// }
/// ```
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might not be rendered correctly.

self
}

/// Executes function `f` just after a thread unparks (starts executing tasks).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Executes function `f` just after a thread unparks (starts executing tasks).
/// Executes function `f` just after the local runtime unparks (starts executing tasks).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a test to make sure callbacks will not be executed when using the Handle::block_on.

#[test]
fn test_on_thread_park_in_runtime() {
let mut opts = LocalOptions::default();
let called = std::rc::Rc::new(std::cell::RefCell::new(false));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let called = std::rc::Rc::new(std::cell::RefCell::new(false));
// this makes the `on_thread_park` callback `!Send + !Sync`
let called = std::rc::Rc::new(std::cell::RefCell::new(false));

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also want to cover the on_thread_unpark callback.

@ADD-SP ADD-SP added the R-loom-current-thread Run loom current-thread tests on this PR label Jun 29, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate C-enhancement Category: A PR with an enhancement or bugfix. M-runtime Module: tokio/runtime R-loom-current-thread Run loom current-thread tests on this PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Non-Send version of callbacks such as on_thread_park
4 participants