Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e28860c
add macro `cfg_rt_and_time` and `cfg_rt_or_time`
ADD-SP Jul 19, 2025
f75f1d2
impl `wheel::Entry`
ADD-SP Jul 13, 2025
a1a70cf
remove global timer wheel
ADD-SP Jul 13, 2025
0e6fa26
add local timer wheel to the current_thread scheduler
ADD-SP Jul 13, 2025
3244477
add local timer wheel to the multi_thread scheduler
ADD-SP Jul 13, 2025
f0e41d9
remove `tokio/src/runtime/time/entry.rs`
ADD-SP Jul 13, 2025
6ec2e6a
impl `tokio/src/runtime/time/timer.rs`
ADD-SP Jul 13, 2025
65b89a6
adapt the impl of `tokio::time::sleep`
ADD-SP Jul 13, 2025
8bfa08c
adapt the impl of `tokio::time::interval`
ADD-SP Jul 13, 2025
28ccd34
let current_thread scheduler process timers
ADD-SP Jul 13, 2025
e00e79c
let multi_thread scheduler process timers
ADD-SP Jul 13, 2025
e4626ca
remove the `InsertError`
ADD-SP Jul 17, 2025
5a86bd2
fix `tokio-util/tests/time_delay_queue.rs`
ADD-SP Jul 19, 2025
2ece52e
fix unused import `wake_list::WakeList`
ADD-SP Jul 19, 2025
3071a26
re-enable loom tests for timers
ADD-SP Aug 2, 2025
7b376dd
add comment
mox692 Aug 3, 2025
9ed6f70
ci: remove a typo from `spellcheck.dic` (#7524)
ADD-SP Aug 10, 2025
df7b272
add macro `cfg_rt_and_time` and `cfg_rt_or_time`
ADD-SP Jul 19, 2025
fcdf392
impl `wheel::Entry`
ADD-SP Jul 13, 2025
179cce0
remove global timer wheel
ADD-SP Jul 13, 2025
7959551
add local timer wheel to the current_thread scheduler
ADD-SP Jul 13, 2025
79e6e18
add local timer wheel to the multi_thread scheduler
ADD-SP Jul 13, 2025
4fdc79e
remove `tokio/src/runtime/time/entry.rs`
ADD-SP Jul 13, 2025
4a66ce8
impl `tokio/src/runtime/time/timer.rs`
ADD-SP Jul 13, 2025
062e8fa
adapt the impl of `tokio::time::sleep`
ADD-SP Jul 13, 2025
e9503a4
adapt the impl of `tokio::time::interval`
ADD-SP Jul 13, 2025
3d2c0ac
let current_thread scheduler process timers
ADD-SP Jul 13, 2025
c4bad53
let multi_thread scheduler process timers
ADD-SP Jul 13, 2025
97fb0f9
remove the `InsertError`
ADD-SP Jul 17, 2025
a4615ee
fix `tokio-util/tests/time_delay_queue.rs`
ADD-SP Jul 19, 2025
34c59f9
fix unused import `wake_list::WakeList`
ADD-SP Jul 19, 2025
2b58b6f
re-enable loom tests for timers
ADD-SP Aug 2, 2025
91b9c91
fix race conditions while operating on `EntryHandle`
ADD-SP Aug 4, 2025
b7692de
cross-thread cancellation queue
ADD-SP Aug 10, 2025
1820a95
Merge branch 'add_sp/time-local-wheel' into add_sp/time-local-wheel-r…
mox692 Aug 11, 2025
1c7c477
add bench code
mox692 Aug 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/loom.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: run tests
run: cargo test --lib --release --features full -- --nocapture runtime::time::tests
run: cargo test --lib --release --features full -- --nocapture runtime::time
working-directory: tokio

loom-current-thread:
Expand Down
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,8 @@ unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(tokio_uring)',
'cfg(target_os, values("cygwin"))',
] }

[patch.crates-io]
tokio = { path = "tokio" }
tokio-util = { path = "tokio-util" }
tokio-stream = { path = "tokio-stream" }
7 changes: 7 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ tokio = { version = "1.0.0", path = "../tokio", features = ["full", "tracing"] }
tokio-util = { version = "0.7.0", path = "../tokio-util", features = ["full"] }
tokio-stream = { version = "0.1", path = "../tokio-stream" }

axum = { version = "0.8" }
hyper = { version = "1", features = ["server", "http1", "http2"] }

tracing = "0.1"
tracing-subscriber = { version = "0.3.1", default-features = false, features = ["fmt", "ansi", "env-filter", "tracing-log"] }
bytes = "1.0.0"
Expand Down Expand Up @@ -99,5 +102,9 @@ path = "named-pipe-multi-client.rs"
name = "dump"
path = "dump.rs"

[[example]]
name = "timer_bench"
path = "timer_bench.rs"

[lints]
workspace = true
52 changes: 52 additions & 0 deletions examples/timer_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use axum::{routing::get, Router};
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio_util::future::FutureExt;

#[tokio::main]
async fn main() {
let jh: JoinHandle<()> = tokio::spawn(async move {
let app: Router = Router::new().route("/", get(root));

let listener = tokio::net::TcpListener::bind("0.0.0.0:1111").await.unwrap();
axum::serve(listener, app).await.unwrap();
});
jh.await.unwrap();
}

async fn root() -> &'static str {
// Here, we would create 10 timers and drop them immediately,
// causing a lot of timer wheel accesses.
for _ in 0..10 {
let _ = ReadyOnSecondPoll::default()
.timeout(Duration::from_secs(1))
.await;
}
"Hello, World!"
}

#[derive(Debug, Default)]
// 3 implementations
struct ReadyOnSecondPoll {
counter: usize,
}

impl Future for ReadyOnSecondPoll {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this: &mut ReadyOnSecondPoll = self.get_mut();
this.counter += 1;

if this.counter == 2 {
Poll::Ready(())
} else {
cx.waker().clone().wake();
Poll::Pending
}
}
}
35 changes: 35 additions & 0 deletions pr7467.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
My interests are:
- [ ] how often does wakeup happen through the core
* every `park` function call.
* also, now they consider the local wheel for wakeup time.
- [ ] how is "cross thread cancellation" implemented
* workerのpark近辺で追加の実装:
* ほかのスレッドとかから送られてきたcancelを処理する
* insert_inject_timersで, injection timer queeにあるやつをlocalに追加
* parkから起きた際に, process_expired_timers でlocalのwheelを探索
* キャンセルに関しては, ほかのthreadでtimerがdropされたら tx で timer生成元のwheelに通知するようになってるc
- [ ] ほかのthreadからのtimerのcancelは, 同期されるのか?
* 登録されたwheelをlockしにかからなくていいのか?
* -> 基本的には, sendするだけ.
- [ ] block_in_placeとコンパチか?
- [ ] main threadでsleepを呼んだ場合は?
* injection queueに追加されると, draftには書いてあるが,
* injectからtaskが別スレッドに移動した時もcancelがちゃんと電波される?
- [ ] deadlockとか怒らない?
* 2つのthreadで生成されたcancelが, swap stealされて, お互いを待つみたいな
- [ ] loadを分散させる必要ってない?
* 1つのworkerだけがやたらtimerをハンドルするみたいなことはないのか?


### bench

```bash
# build
$ cargo build --release --package examples --example timer_bench

# on another shell
$ wrk -c 4096 -t 256 --latency -d 1m http://0.0.0.0:1111

# bench
$ flamegraph -- /path/to/binary
```
5 changes: 3 additions & 2 deletions spellcheck.dic
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
307
308
&
+
<
Expand Down Expand Up @@ -99,6 +99,7 @@ destructors
destructure
Destructures
Dev
Dmitry
dns
DNS
DoS
Expand Down Expand Up @@ -193,7 +194,6 @@ ok
oneshot
ORed
os
overweighing
parker
parsers
peekable
Expand Down Expand Up @@ -300,6 +300,7 @@ versa
versioned
versioning
vtable
Vyukov's
waker
wakers
Wakers
Expand Down
88 changes: 3 additions & 85 deletions tokio-util/tests/time_delay_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#![cfg(feature = "full")]

use futures::StreamExt;
use tokio::time::{self, sleep, sleep_until, Duration, Instant};
use tokio::time::{self, sleep, Duration, Instant};
use tokio_test::{assert_pending, assert_ready, task};
use tokio_util::time::DelayQueue;

Expand Down Expand Up @@ -82,8 +82,6 @@ async fn single_short_delay() {

sleep(ms(5)).await;

assert!(queue.is_woken());

let entry = assert_ready_some!(poll!(queue));
assert_eq!(*entry.get_ref(), "foo");

Expand Down Expand Up @@ -221,7 +219,7 @@ async fn reset_much_later() {

sleep(ms(20)).await;

assert!(queue.is_woken());
assert_ready_some!(poll!(queue));
}

// Reproduces tokio-rs/tokio#849.
Expand All @@ -248,7 +246,7 @@ async fn reset_twice() {

sleep(ms(20)).await;

assert!(queue.is_woken());
assert_ready_some!(poll!(queue));
}

/// Regression test: Given an entry inserted with a deadline in the past, so
Expand Down Expand Up @@ -412,8 +410,6 @@ async fn expire_first_key_when_reset_to_expire_earlier() {

sleep(ms(100)).await;

assert!(queue.is_woken());

let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "one");
}
Expand All @@ -435,8 +431,6 @@ async fn expire_second_key_when_reset_to_expire_earlier() {

sleep(ms(100)).await;

assert!(queue.is_woken());

let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "two");
}
Expand All @@ -457,8 +451,6 @@ async fn reset_first_expiring_item_to_expire_later() {
queue.reset_at(&one, now + ms(300));
sleep(ms(250)).await;

assert!(queue.is_woken());

let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "two");
}
Expand Down Expand Up @@ -522,43 +514,6 @@ async fn insert_after_ready_poll() {
assert_eq!("3", res[2]);
}

#[tokio::test]
async fn reset_later_after_slot_starts() {
time::pause();

let mut queue = task::spawn(DelayQueue::new());

let now = Instant::now();

let foo = queue.insert_at("foo", now + ms(100));

assert_pending!(poll!(queue));

sleep_until(now + Duration::from_millis(80)).await;

assert!(!queue.is_woken());

// At this point the queue hasn't been polled, so `elapsed` on the wheel
// for the queue is still at 0 and hence the 1ms resolution slots cover
// [0-64). Resetting the time on the entry to 120 causes it to get put in
// the [64-128) slot. As the queue knows that the first entry is within
// that slot, but doesn't know when, it must wake immediately to advance
// the wheel.
queue.reset_at(&foo, now + ms(120));
assert!(queue.is_woken());

assert_pending!(poll!(queue));

sleep_until(now + Duration::from_millis(119)).await;
assert!(!queue.is_woken());

sleep(ms(1)).await;
assert!(queue.is_woken());

let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "foo");
}

#[tokio::test]
async fn reset_inserted_expired() {
time::pause();
Expand All @@ -584,43 +539,6 @@ async fn reset_inserted_expired() {
assert_eq!(queue.len(), 0);
}

#[tokio::test]
async fn reset_earlier_after_slot_starts() {
time::pause();

let mut queue = task::spawn(DelayQueue::new());

let now = Instant::now();

let foo = queue.insert_at("foo", now + ms(200));

assert_pending!(poll!(queue));

sleep_until(now + Duration::from_millis(80)).await;

assert!(!queue.is_woken());

// At this point the queue hasn't been polled, so `elapsed` on the wheel
// for the queue is still at 0 and hence the 1ms resolution slots cover
// [0-64). Resetting the time on the entry to 120 causes it to get put in
// the [64-128) slot. As the queue knows that the first entry is within
// that slot, but doesn't know when, it must wake immediately to advance
// the wheel.
queue.reset_at(&foo, now + ms(120));
assert!(queue.is_woken());

assert_pending!(poll!(queue));

sleep_until(now + Duration::from_millis(119)).await;
assert!(!queue.is_woken());

sleep(ms(1)).await;
assert!(queue.is_woken());

let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "foo");
}

#[tokio::test]
async fn insert_in_past_after_poll_fires_immediately() {
time::pause();
Expand Down
24 changes: 24 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,3 +704,27 @@ macro_rules! cfg_tokio_uring {
)*
};
}

macro_rules! cfg_rt_and_time{
($($item:item)*) => {
$(
#[cfg(all(
feature = "rt",
feature = "time",
))]
$item
)*
};
}

macro_rules! cfg_rt_or_time{
($($item:item)*) => {
$(
#[cfg(any(
feature = "rt",
feature = "time",
))]
$item
)*
};
}
7 changes: 7 additions & 0 deletions tokio/src/runtime/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ impl Handle {
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
}

pub(crate) fn with_time<F, R>(&self, f: F) -> R
where
F: FnOnce(Option<&crate::runtime::time::Handle>) -> R,
{
f(self.time.as_ref())
}

pub(crate) fn clock(&self) -> &Clock {
&self.clock
}
Expand Down
Loading
Loading