Skip to content

Commit 3e890cc

Browse files
authored
rt(unstable): add spawn Location to TaskMeta (#7417)
As described in issue #7411, task spawning APIs are currently annotated with `#[track_caller]`, allowing us to capture the location in the user source code where the task was spawned. This is used for `tracing` events used by `tokio-console` and friends. However, this information is *not* exposed to the runtime `on_task_spawn`, `on_before_task_poll`, `on_after_task_poll`, and `on_task_terminate` hooks, which is a shame, as it would be useful there as well. This branch adds the task's spawn location to the `TaskMeta` struct provided to the runtime's task hooks. This is implemented by storing a `&'static Location<'static>` in the task's `Core` alongside the `task::Id`. In [this comment][1], @ADD-SP suggested storing the `Location` in the task's `Trailer`. I opted to store it in the `Core` instead, as the `Trailer` is intended to store "cold" data that is only accessed when the task _completes_, and not on every poll. Since the task meta is passed to the `on_before_task_poll` and `on_after_task_poll` hooks, we would be accessing the `Trailer` on polls if we stored the `Location` there. Therefore, I put it in the `Core`, instead, which contains data that we access every time the task is polled. Closes #7411 [1]: #7411 (comment)
1 parent 69290a6 commit 3e890cc

File tree

15 files changed

+437
-48
lines changed

15 files changed

+437
-48
lines changed

tokio/src/runtime/blocking/pool.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,12 @@ impl Spawner {
379379
let fut =
380380
blocking_task::<F, BlockingTask<F>>(BlockingTask::new(func), spawn_meta, id.as_u64());
381381

382-
let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);
382+
let (task, handle) = task::unowned(
383+
fut,
384+
BlockingSchedule::new(rt),
385+
id,
386+
task::SpawnLocation::capture(),
387+
);
383388

384389
let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
385390
(handle, spawned)

tokio/src/runtime/scheduler/current_thread/mod.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
1515
use std::cell::RefCell;
1616
use std::collections::VecDeque;
1717
use std::future::{poll_fn, Future};
18+
use std::panic::Location;
1819
use std::sync::atomic::Ordering::{AcqRel, Release};
1920
use std::task::Poll::{Pending, Ready};
2021
use std::task::Waker;
@@ -445,6 +446,7 @@ impl Context {
445446

446447
impl Handle {
447448
/// Spawns a future onto the `CurrentThread` scheduler
449+
#[track_caller]
448450
pub(crate) fn spawn<F>(
449451
me: &Arc<Self>,
450452
future: F,
@@ -454,10 +456,15 @@ impl Handle {
454456
F: crate::future::Future + Send + 'static,
455457
F::Output: Send + 'static,
456458
{
457-
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
459+
let spawned_at = Location::caller();
460+
let (handle, notified) = me
461+
.shared
462+
.owned
463+
.bind(future, me.clone(), id, spawned_at.into());
458464

459465
me.task_hooks.spawn(&TaskMeta {
460466
id,
467+
spawned_at,
461468
_phantom: Default::default(),
462469
});
463470

@@ -474,6 +481,7 @@ impl Handle {
474481
/// This should only be used when this is a `LocalRuntime` or in another case where the runtime
475482
/// provably cannot be driven from or moved to different threads from the one on which the task
476483
/// is spawned.
484+
#[track_caller]
477485
pub(crate) unsafe fn spawn_local<F>(
478486
me: &Arc<Self>,
479487
future: F,
@@ -483,10 +491,15 @@ impl Handle {
483491
F: crate::future::Future + 'static,
484492
F::Output: 'static,
485493
{
486-
let (handle, notified) = me.shared.owned.bind_local(future, me.clone(), id);
494+
let spawned_at = Location::caller();
495+
let (handle, notified) =
496+
me.shared
497+
.owned
498+
.bind_local(future, me.clone(), id, spawned_at.into());
487499

488500
me.task_hooks.spawn(&TaskMeta {
489501
id,
502+
spawned_at,
490503
_phantom: Default::default(),
491504
});
492505

@@ -771,16 +784,16 @@ impl CoreGuard<'_> {
771784
let task = context.handle.shared.owned.assert_owner(task);
772785

773786
#[cfg(tokio_unstable)]
774-
let task_id = task.task_id();
787+
let task_meta = task.task_meta();
775788

776789
let (c, ()) = context.run_task(core, || {
777790
#[cfg(tokio_unstable)]
778-
context.handle.task_hooks.poll_start_callback(task_id);
791+
context.handle.task_hooks.poll_start_callback(&task_meta);
779792

780793
task.run();
781794

782795
#[cfg(tokio_unstable)]
783-
context.handle.task_hooks.poll_stop_callback(task_id);
796+
context.handle.task_hooks.poll_stop_callback(&task_meta);
784797
});
785798

786799
core = c;

tokio/src/runtime/scheduler/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ cfg_rt! {
117117
}
118118
}
119119

120+
#[track_caller]
120121
pub(crate) fn spawn<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
121122
where
122123
F: Future + Send + 'static,
@@ -136,6 +137,7 @@ cfg_rt! {
136137
/// This should only be called in `LocalRuntime` if the runtime has been verified to be owned
137138
/// by the current thread.
138139
#[allow(irrefutable_let_patterns)]
140+
#[track_caller]
139141
pub(crate) unsafe fn spawn_local<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
140142
where
141143
F: Future + 'static,

tokio/src/runtime/scheduler/multi_thread/handle.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::runtime::{
1010
use crate::util::RngSeedGenerator;
1111

1212
use std::fmt;
13+
use std::panic::Location;
1314

1415
mod metrics;
1516

@@ -37,6 +38,7 @@ pub(crate) struct Handle {
3738

3839
impl Handle {
3940
/// Spawns a future onto the thread pool
41+
#[track_caller]
4042
pub(crate) fn spawn<F>(me: &Arc<Self>, future: F, id: task::Id) -> JoinHandle<F::Output>
4143
where
4244
F: crate::future::Future + Send + 'static,
@@ -49,15 +51,21 @@ impl Handle {
4951
self.close();
5052
}
5153

54+
#[track_caller]
5255
pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T, id: task::Id) -> JoinHandle<T::Output>
5356
where
5457
T: Future + Send + 'static,
5558
T::Output: Send + 'static,
5659
{
57-
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
60+
let spawned_at = Location::caller();
61+
let (handle, notified) = me
62+
.shared
63+
.owned
64+
.bind(future, me.clone(), id, spawned_at.into());
5865

5966
me.task_hooks.spawn(&TaskMeta {
6067
id,
68+
spawned_at,
6169
_phantom: Default::default(),
6270
});
6371

tokio/src/runtime/scheduler/multi_thread/worker.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,7 @@ impl Context {
568568

569569
fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
570570
#[cfg(tokio_unstable)]
571-
let task_id = task.task_id();
571+
let task_meta = task.task_meta();
572572

573573
let task = self.worker.handle.shared.owned.assert_owner(task);
574574

@@ -592,12 +592,15 @@ impl Context {
592592
// Unlike the poll time above, poll start callback is attached to the task id,
593593
// so it is tightly associated with the actual poll invocation.
594594
#[cfg(tokio_unstable)]
595-
self.worker.handle.task_hooks.poll_start_callback(task_id);
595+
self.worker
596+
.handle
597+
.task_hooks
598+
.poll_start_callback(&task_meta);
596599

597600
task.run();
598601

599602
#[cfg(tokio_unstable)]
600-
self.worker.handle.task_hooks.poll_stop_callback(task_id);
603+
self.worker.handle.task_hooks.poll_stop_callback(&task_meta);
601604

602605
let mut lifo_polls = 0;
603606

@@ -663,15 +666,18 @@ impl Context {
663666
let task = self.worker.handle.shared.owned.assert_owner(task);
664667

665668
#[cfg(tokio_unstable)]
666-
let task_id = task.task_id();
669+
let task_meta = task.task_meta();
667670

668671
#[cfg(tokio_unstable)]
669-
self.worker.handle.task_hooks.poll_start_callback(task_id);
672+
self.worker
673+
.handle
674+
.task_hooks
675+
.poll_start_callback(&task_meta);
670676

671677
task.run();
672678

673679
#[cfg(tokio_unstable)]
674-
self.worker.handle.task_hooks.poll_stop_callback(task_id);
680+
self.worker.handle.task_hooks.poll_stop_callback(&task_meta);
675681
}
676682
})
677683
}

tokio/src/runtime/task/core.rs

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use crate::runtime::task::{Id, Schedule, TaskHarnessScheduleHooks};
1818
use crate::util::linked_list;
1919

2020
use std::num::NonZeroU64;
21+
#[cfg(tokio_unstable)]
22+
use std::panic::Location;
2123
use std::pin::Pin;
2224
use std::ptr::NonNull;
2325
use std::task::{Context, Poll, Waker};
@@ -141,6 +143,13 @@ pub(super) struct Core<T: Future, S> {
141143
/// The task's ID, used for populating `JoinError`s.
142144
pub(super) task_id: Id,
143145

146+
/// The source code location where the task was spawned.
147+
///
148+
/// This is used for populating the `TaskMeta` passed to the task runtime
149+
/// hooks.
150+
#[cfg(tokio_unstable)]
151+
pub(super) spawned_at: &'static Location<'static>,
152+
144153
/// Either the future or the output.
145154
pub(super) stage: CoreStage<T>,
146155
}
@@ -208,7 +217,13 @@ pub(super) enum Stage<T: Future> {
208217
impl<T: Future, S: Schedule> Cell<T, S> {
209218
/// Allocates a new task cell, containing the header, trailer, and core
210219
/// structures.
211-
pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
220+
pub(super) fn new(
221+
future: T,
222+
scheduler: S,
223+
state: State,
224+
task_id: Id,
225+
#[cfg(tokio_unstable)] spawned_at: &'static Location<'static>,
226+
) -> Box<Cell<T, S>> {
212227
// Separated into a non-generic function to reduce LLVM codegen
213228
fn new_header(
214229
state: State,
@@ -242,13 +257,21 @@ impl<T: Future, S: Schedule> Cell<T, S> {
242257
stage: UnsafeCell::new(Stage::Running(future)),
243258
},
244259
task_id,
260+
#[cfg(tokio_unstable)]
261+
spawned_at,
245262
},
246263
});
247264

248265
#[cfg(debug_assertions)]
249266
{
250267
// Using a separate function for this code avoids instantiating it separately for every `T`.
251-
unsafe fn check<S>(header: &Header, trailer: &Trailer, scheduler: &S, task_id: &Id) {
268+
unsafe fn check<S>(
269+
header: &Header,
270+
trailer: &Trailer,
271+
scheduler: &S,
272+
task_id: &Id,
273+
#[cfg(tokio_unstable)] spawn_location: &&'static Location<'static>,
274+
) {
252275
let trailer_addr = trailer as *const Trailer as usize;
253276
let trailer_ptr = unsafe { Header::get_trailer(NonNull::from(header)) };
254277
assert_eq!(trailer_addr, trailer_ptr.as_ptr() as usize);
@@ -260,13 +283,24 @@ impl<T: Future, S: Schedule> Cell<T, S> {
260283
let id_addr = task_id as *const Id as usize;
261284
let id_ptr = unsafe { Header::get_id_ptr(NonNull::from(header)) };
262285
assert_eq!(id_addr, id_ptr.as_ptr() as usize);
286+
287+
#[cfg(tokio_unstable)]
288+
{
289+
let spawn_location_addr =
290+
spawn_location as *const &'static Location<'static> as usize;
291+
let spawn_location_ptr =
292+
unsafe { Header::get_spawn_location_ptr(NonNull::from(header)) };
293+
assert_eq!(spawn_location_addr, spawn_location_ptr.as_ptr() as usize);
294+
}
263295
}
264296
unsafe {
265297
check(
266298
&result.header,
267299
&result.trailer,
268300
&result.core.scheduler,
269301
&result.core.task_id,
302+
#[cfg(tokio_unstable)]
303+
&result.core.spawned_at,
270304
);
271305
}
272306
}
@@ -450,6 +484,37 @@ impl Header {
450484
*ptr
451485
}
452486

487+
/// Gets a pointer to the source code location where the task containing
488+
/// this `Header` was spawned.
489+
///
490+
/// # Safety
491+
///
492+
/// The provided raw pointer must point at the header of a task.
493+
#[cfg(tokio_unstable)]
494+
pub(super) unsafe fn get_spawn_location_ptr(
495+
me: NonNull<Header>,
496+
) -> NonNull<&'static Location<'static>> {
497+
let offset = me.as_ref().vtable.spawn_location_offset;
498+
let spawned_at = me
499+
.as_ptr()
500+
.cast::<u8>()
501+
.add(offset)
502+
.cast::<&'static Location<'static>>();
503+
NonNull::new_unchecked(spawned_at)
504+
}
505+
506+
/// Gets the source code location where the task containing
507+
/// this `Header` was spawned
508+
///
509+
/// # Safety
510+
///
511+
/// The provided raw pointer must point at the header of a task.
512+
#[cfg(tokio_unstable)]
513+
pub(super) unsafe fn get_spawn_location(me: NonNull<Header>) -> &'static Location<'static> {
514+
let ptr = Header::get_spawn_location_ptr(me).as_ptr();
515+
*ptr
516+
}
517+
453518
/// Gets the tracing id of the task containing this `Header`.
454519
///
455520
/// # Safety

tokio/src/runtime/task/harness.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::runtime::task::state::{Snapshot, State};
44
use crate::runtime::task::waker::waker_ref;
55
use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task};
66

7+
#[cfg(tokio_unstable)]
78
use crate::runtime::TaskMeta;
89
use std::any::Any;
910
use std::mem;
@@ -367,10 +368,12 @@ where
367368
//
368369
// We call this in a separate block so that it runs after the task appears to have
369370
// completed and will still run if the destructor panics.
371+
#[cfg(tokio_unstable)]
370372
if let Some(f) = self.trailer().hooks.task_terminate_callback.as_ref() {
371373
let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
372374
f(&TaskMeta {
373375
id: self.core().task_id,
376+
spawned_at: self.core().spawned_at,
374377
_phantom: Default::default(),
375378
})
376379
}));

tokio/src/runtime/task/list.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
99
use crate::future::Future;
1010
use crate::loom::cell::UnsafeCell;
11-
use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task};
11+
use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, SpawnLocation, Task};
1212
use crate::util::linked_list::{Link, LinkedList};
1313
use crate::util::sharded_list;
1414

@@ -91,13 +91,14 @@ impl<S: 'static> OwnedTasks<S> {
9191
task: T,
9292
scheduler: S,
9393
id: super::Id,
94+
spawned_at: SpawnLocation,
9495
) -> (JoinHandle<T::Output>, Option<Notified<S>>)
9596
where
9697
S: Schedule,
9798
T: Future + Send + 'static,
9899
T::Output: Send + 'static,
99100
{
100-
let (task, notified, join) = super::new_task(task, scheduler, id);
101+
let (task, notified, join) = super::new_task(task, scheduler, id, spawned_at);
101102
let notified = unsafe { self.bind_inner(task, notified) };
102103
(join, notified)
103104
}
@@ -111,13 +112,14 @@ impl<S: 'static> OwnedTasks<S> {
111112
task: T,
112113
scheduler: S,
113114
id: super::Id,
115+
spawned_at: SpawnLocation,
114116
) -> (JoinHandle<T::Output>, Option<Notified<S>>)
115117
where
116118
S: Schedule,
117119
T: Future + 'static,
118120
T::Output: 'static,
119121
{
120-
let (task, notified, join) = super::new_task(task, scheduler, id);
122+
let (task, notified, join) = super::new_task(task, scheduler, id, spawned_at);
121123
let notified = unsafe { self.bind_inner(task, notified) };
122124
(join, notified)
123125
}
@@ -258,13 +260,14 @@ impl<S: 'static> LocalOwnedTasks<S> {
258260
task: T,
259261
scheduler: S,
260262
id: super::Id,
263+
spawned_at: SpawnLocation,
261264
) -> (JoinHandle<T::Output>, Option<Notified<S>>)
262265
where
263266
S: Schedule,
264267
T: Future + 'static,
265268
T::Output: 'static,
266269
{
267-
let (task, notified, join) = super::new_task(task, scheduler, id);
270+
let (task, notified, join) = super::new_task(task, scheduler, id, spawned_at);
268271

269272
unsafe {
270273
// safety: We just created the task, so we have exclusive access

0 commit comments

Comments
 (0)