Skip to content

Commit 4f8b562

Browse files
committed
rt: add support for non-send closures for thread (un)parking
Add support for non `Send`+`Sync` closures for thread parking and unparking callbacks when using a `LocalRuntime`. Since a `LocalRuntime` will always run its tasks on the same thread, its safe to accept a non `Send`+`Sync` closure. Signed-off-by: Sanskar Jaiswal <[email protected]>
1 parent 3636fd0 commit 4f8b562

File tree

3 files changed

+172
-14
lines changed

3 files changed

+172
-14
lines changed

tokio/src/runtime/builder.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ use std::io;
1313
use std::thread::ThreadId;
1414
use std::time::Duration;
1515

16+
// use super::LocalOptions;
17+
1618
/// Builds Tokio Runtime with custom configuration values.
1719
///
1820
/// Methods can be chained in order to set the configuration values. The
@@ -923,7 +925,7 @@ impl Builder {
923925
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
924926
pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
925927
match &self.kind {
926-
Kind::CurrentThread => self.build_current_thread_local_runtime(),
928+
Kind::CurrentThread => self.build_current_thread_local_runtime(options),
927929
#[cfg(feature = "rt-multi-thread")]
928930
Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
929931
}
@@ -1435,11 +1437,16 @@ impl Builder {
14351437
}
14361438

14371439
#[cfg(tokio_unstable)]
1438-
fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1440+
fn build_current_thread_local_runtime(
1441+
&mut self,
1442+
opts: LocalOptions,
1443+
) -> io::Result<LocalRuntime> {
14391444
use crate::runtime::local_runtime::LocalRuntimeScheduler;
14401445

14411446
let tid = std::thread::current().id();
14421447

1448+
self.before_park = opts.before_park;
1449+
self.after_unpark = opts.after_unpark;
14431450
let (scheduler, handle, blocking_pool) =
14441451
self.build_current_thread_runtime_components(Some(tid))?;
14451452

Lines changed: 130 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,144 @@
11
use std::marker::PhantomData;
22

3+
use crate::runtime::Callback;
4+
35
/// [`LocalRuntime`]-only config options
46
///
5-
/// Currently, there are no such options, but in the future, things like `!Send + !Sync` hooks may
6-
/// be added.
7-
///
87
/// Use `LocalOptions::default()` to create the default set of options. This type is used with
98
/// [`Builder::build_local`].
109
///
10+
/// When using [`Builder::build_local`], this overrides any pre-configured options set on the
11+
/// [`Builder`].
12+
///
1113
/// [`Builder::build_local`]: crate::runtime::Builder::build_local
1214
/// [`LocalRuntime`]: crate::runtime::LocalRuntime
13-
#[derive(Default, Debug)]
15+
/// [`Builder`]: crate::runtime::Builder
16+
#[derive(Default)]
1417
#[non_exhaustive]
18+
#[allow(missing_debug_implementations)]
1519
pub struct LocalOptions {
1620
/// Marker used to make this !Send and !Sync.
1721
_phantom: PhantomData<*mut u8>,
22+
23+
/// Callback for a worker parking itself
24+
pub(crate) before_park: Option<Callback>,
25+
26+
/// Callback for a worker unparking itself
27+
pub(crate) after_unpark: Option<Callback>,
28+
}
29+
30+
impl LocalOptions {
31+
/// Executes function `f` just before a thread is parked (goes idle).
32+
/// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
33+
/// can be called, and may result in this thread being unparked immediately.
34+
///
35+
/// This can be used to start work only when the executor is idle, or for bookkeeping
36+
/// and monitoring purposes.
37+
///
38+
/// This differs from the `Builder::on_thread_park` method in that it accepts a non Send + Sync
39+
/// closure.
40+
///
41+
/// Note: There can only be one park callback for a runtime; calling this function
42+
/// more than once replaces the last callback defined, rather than adding to it.
43+
///
44+
/// # Examples
45+
///
46+
/// ```
47+
/// # use tokio::runtime::{Builder, LocalOptions};
48+
/// # pub fn main() {
49+
/// let (tx, rx) = std::sync::mpsc::channel();
50+
/// let mut opts = LocalOptions::default();
51+
/// opts.on_thread_park(move || match rx.recv() {
52+
/// Ok(x) => println!("Received from channel: {}", x),
53+
/// Err(e) => println!("Error receiving from channel: {}", e),
54+
/// });
55+
///
56+
/// let runtime = Builder::new_current_thread()
57+
/// .enable_time()
58+
/// .build_local(opts)
59+
/// .unwrap();
60+
///
61+
/// runtime.block_on(async {
62+
/// tokio::task::spawn_local(async move {
63+
/// tx.send(42).unwrap();
64+
/// });
65+
/// tokio::time::sleep(std::time::Duration::from_millis(1)).await;
66+
/// })
67+
/// }
68+
/// ```
69+
pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
70+
where
71+
F: Fn() + 'static,
72+
{
73+
self.before_park = Some(std::sync::Arc::new(to_send_sync(f)));
74+
self
75+
}
76+
77+
/// Executes function `f` just after a thread unparks (starts executing tasks).
78+
///
79+
/// This is intended for bookkeeping and monitoring use cases; note that work
80+
/// in this callback will increase latencies when the application has allowed one or
81+
/// more runtime threads to go idle.
82+
///
83+
/// This differs from the `Builder::on_thread_unpark` method in that it accepts a non Send + Sync
84+
/// closure.
85+
///
86+
/// Note: There can only be one unpark callback for a runtime; calling this function
87+
/// more than once replaces the last callback defined, rather than adding to it.
88+
///
89+
/// # Examples
90+
///
91+
/// ```
92+
/// # use tokio::runtime::{Builder, LocalOptions};
93+
/// # pub fn main() {
94+
/// let (tx, rx) = std::sync::mpsc::channel();
95+
/// let mut opts = LocalOptions::default();
96+
/// opts.on_thread_unpark(move || match rx.recv() {
97+
/// Ok(x) => println!("Received from channel: {}", x),
98+
/// Err(e) => println!("Error receiving from channel: {}", e),
99+
/// });
100+
///
101+
/// let runtime = Builder::new_current_thread()
102+
/// .enable_time()
103+
/// .build_local(opts)
104+
/// .unwrap();
105+
///
106+
/// runtime.block_on(async {
107+
/// tokio::task::spawn_local(async move {
108+
/// tx.send(42).unwrap();
109+
/// });
110+
/// tokio::time::sleep(std::time::Duration::from_millis(1)).await;
111+
/// })
112+
/// }
113+
/// ```
114+
pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
115+
where
116+
F: Fn() + 'static,
117+
{
118+
self.after_unpark = Some(std::sync::Arc::new(to_send_sync(f)));
119+
self
120+
}
121+
}
122+
123+
// A wrapper type to allow non-Send + Sync closures to be used in a Send + Sync context.
124+
// This is specifically used for executing callbacks when using a `LocalRuntime`.
125+
struct UnsafeSendSync<T>(T);
126+
127+
// SAFETY: This type is only used in a context where it is guaranteed that the closure will not be
128+
// sent across threads.
129+
unsafe impl<T> Send for UnsafeSendSync<T> {}
130+
unsafe impl<T> Sync for UnsafeSendSync<T> {}
131+
132+
impl<T: Fn()> UnsafeSendSync<T> {
133+
fn call(&self) {
134+
(self.0)()
135+
}
136+
}
137+
138+
fn to_send_sync<F>(f: F) -> impl Fn() + Send + Sync
139+
where
140+
F: Fn(),
141+
{
142+
let f = UnsafeSendSync(f);
143+
move || f.call()
18144
}

tokio/tests/rt_local.rs

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use tokio::task::spawn_local;
66

77
#[test]
88
fn test_spawn_local_in_runtime() {
9-
let rt = rt();
9+
let rt = rt(LocalOptions::default());
1010

1111
let res = rt.block_on(async move {
1212
let (tx, rx) = tokio::sync::oneshot::channel();
@@ -22,9 +22,34 @@ fn test_spawn_local_in_runtime() {
2222
assert_eq!(res, 5);
2323
}
2424

25+
#[test]
26+
fn test_on_thread_park_in_runtime() {
27+
let mut opts = LocalOptions::default();
28+
let called = std::rc::Rc::new(std::cell::RefCell::new(false));
29+
let cc = called.clone();
30+
opts.on_thread_park(move || {
31+
*cc.borrow_mut() = true;
32+
});
33+
let rt = rt(opts);
34+
35+
rt.block_on(async move {
36+
let (tx, rx) = tokio::sync::oneshot::channel();
37+
38+
spawn_local(async {
39+
tokio::task::yield_now().await;
40+
tx.send(5).unwrap();
41+
});
42+
43+
// this is not really required execpt to ensure on_thread_park is called
44+
rx.await.unwrap()
45+
});
46+
47+
assert!(*called.borrow());
48+
}
49+
2550
#[test]
2651
fn test_spawn_from_handle() {
27-
let rt = rt();
52+
let rt = rt(LocalOptions::default());
2853

2954
let (tx, rx) = tokio::sync::oneshot::channel();
3055

@@ -40,7 +65,7 @@ fn test_spawn_from_handle() {
4065

4166
#[test]
4267
fn test_spawn_local_on_runtime_object() {
43-
let rt = rt();
68+
let rt = rt(LocalOptions::default());
4469

4570
let (tx, rx) = tokio::sync::oneshot::channel();
4671

@@ -56,7 +81,7 @@ fn test_spawn_local_on_runtime_object() {
5681

5782
#[test]
5883
fn test_spawn_local_from_guard() {
59-
let rt = rt();
84+
let rt = rt(LocalOptions::default());
6085

6186
let (tx, rx) = tokio::sync::oneshot::channel();
6287

@@ -78,7 +103,7 @@ fn test_spawn_from_guard_other_thread() {
78103
let (tx, rx) = std::sync::mpsc::channel();
79104

80105
std::thread::spawn(move || {
81-
let rt = rt();
106+
let rt = rt(LocalOptions::default());
82107
let handle = rt.handle().clone();
83108

84109
tx.send(handle).unwrap();
@@ -98,7 +123,7 @@ fn test_spawn_local_from_guard_other_thread() {
98123
let (tx, rx) = std::sync::mpsc::channel();
99124

100125
std::thread::spawn(move || {
101-
let rt = rt();
126+
let rt = rt(LocalOptions::default());
102127
let handle = rt.handle().clone();
103128

104129
tx.send(handle).unwrap();
@@ -111,9 +136,9 @@ fn test_spawn_local_from_guard_other_thread() {
111136
spawn_local(async {});
112137
}
113138

114-
fn rt() -> tokio::runtime::LocalRuntime {
139+
fn rt(opts: LocalOptions) -> tokio::runtime::LocalRuntime {
115140
tokio::runtime::Builder::new_current_thread()
116141
.enable_all()
117-
.build_local(LocalOptions::default())
142+
.build_local(opts)
118143
.unwrap()
119144
}

0 commit comments

Comments
 (0)