|
1 | 1 | #![cfg_attr(loom, allow(unused_imports))] |
2 | 2 |
|
| 3 | +use crate::runtime::blocking::BlockingPool; |
3 | 4 | use crate::runtime::handle::Handle; |
4 | | -use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback}; |
| 5 | +use crate::runtime::scheduler::CurrentThread; |
| 6 | +use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime}; |
5 | 7 | #[cfg(tokio_unstable)] |
6 | | -use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta}; |
| 8 | +use crate::runtime::{ |
| 9 | + metrics::HistogramConfiguration, LocalOptions, LocalRuntime, OptionalTaskHooksFactory, |
| 10 | + TaskHookHarnessFactory, |
| 11 | +}; |
7 | 12 | use crate::util::rand::{RngSeed, RngSeedGenerator}; |
8 | | - |
9 | | -use crate::runtime::blocking::BlockingPool; |
10 | | -use crate::runtime::scheduler::CurrentThread; |
11 | 13 | use std::fmt; |
12 | 14 | use std::io; |
| 15 | +#[cfg(tokio_unstable)] |
| 16 | +use std::sync::Arc; |
13 | 17 | use std::thread::ThreadId; |
14 | 18 | use std::time::Duration; |
15 | 19 |
|
@@ -85,19 +89,8 @@ pub struct Builder { |
85 | 89 | /// To run after each thread is unparked. |
86 | 90 | pub(super) after_unpark: Option<Callback>, |
87 | 91 |
|
88 | | - /// To run before each task is spawned. |
89 | | - pub(super) before_spawn: Option<TaskCallback>, |
90 | | - |
91 | | - /// To run before each poll |
92 | 92 | #[cfg(tokio_unstable)] |
93 | | - pub(super) before_poll: Option<TaskCallback>, |
94 | | - |
95 | | - /// To run after each poll |
96 | | - #[cfg(tokio_unstable)] |
97 | | - pub(super) after_poll: Option<TaskCallback>, |
98 | | - |
99 | | - /// To run after each task is terminated. |
100 | | - pub(super) after_termination: Option<TaskCallback>, |
| 93 | + pub(super) task_hook_harness_factory: OptionalTaskHooksFactory, |
101 | 94 |
|
102 | 95 | /// Customizable keep alive timeout for `BlockingPool` |
103 | 96 | pub(super) keep_alive: Option<Duration>, |
@@ -311,13 +304,8 @@ impl Builder { |
311 | 304 | before_park: None, |
312 | 305 | after_unpark: None, |
313 | 306 |
|
314 | | - before_spawn: None, |
315 | | - after_termination: None, |
316 | | - |
317 | 307 | #[cfg(tokio_unstable)] |
318 | | - before_poll: None, |
319 | | - #[cfg(tokio_unstable)] |
320 | | - after_poll: None, |
| 308 | + task_hook_harness_factory: None, |
321 | 309 |
|
322 | 310 | keep_alive: None, |
323 | 311 |
|
@@ -706,188 +694,19 @@ impl Builder { |
706 | 694 | self |
707 | 695 | } |
708 | 696 |
|
709 | | - /// Executes function `f` just before a task is spawned. |
710 | | - /// |
711 | | - /// `f` is called within the Tokio context, so functions like |
712 | | - /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being |
713 | | - /// invoked immediately. |
714 | | - /// |
715 | | - /// This can be used for bookkeeping or monitoring purposes. |
716 | | - /// |
717 | | - /// Note: There can only be one spawn callback for a runtime; calling this function more |
718 | | - /// than once replaces the last callback defined, rather than adding to it. |
719 | | - /// |
720 | | - /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time. |
721 | | - /// |
722 | | - /// **Note**: This is an [unstable API][unstable]. The public API of this type |
723 | | - /// may break in 1.x releases. See [the documentation on unstable |
724 | | - /// features][unstable] for details. |
725 | | - /// |
726 | | - /// [unstable]: crate#unstable-features |
727 | | - /// |
728 | | - /// # Examples |
729 | | - /// |
730 | | - /// ``` |
731 | | - /// # use tokio::runtime; |
732 | | - /// # pub fn main() { |
733 | | - /// let runtime = runtime::Builder::new_current_thread() |
734 | | - /// .on_task_spawn(|_| { |
735 | | - /// println!("spawning task"); |
736 | | - /// }) |
737 | | - /// .build() |
738 | | - /// .unwrap(); |
| 697 | + /// Factory method for producing "fallback" task hook harnesses. |
739 | 698 | /// |
740 | | - /// runtime.block_on(async { |
741 | | - /// tokio::task::spawn(std::future::ready(())); |
742 | | - /// |
743 | | - /// for _ in 0..64 { |
744 | | - /// tokio::task::yield_now().await; |
745 | | - /// } |
746 | | - /// }) |
747 | | - /// # } |
748 | | - /// ``` |
| 699 | + /// The order of operations for assigning the hook harness for a task are as follows: |
| 700 | + /// 1. [`crate::task::spawn_with_hooks`], if used. |
| 701 | + /// 2. [`crate::runtime::task_hooks::TaskHookHarnessFactory`], if it returns something other than [Option::None]. |
| 702 | + /// 3. This function. |
749 | 703 | #[cfg(all(not(loom), tokio_unstable))] |
750 | 704 | #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] |
751 | | - pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self |
752 | | - where |
753 | | - F: Fn(&TaskMeta<'_>) + Send + Sync + 'static, |
754 | | - { |
755 | | - self.before_spawn = Some(std::sync::Arc::new(f)); |
756 | | - self |
757 | | - } |
758 | | - |
759 | | - /// Executes function `f` just before a task is polled |
760 | | - /// |
761 | | - /// `f` is called within the Tokio context, so functions like |
762 | | - /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being |
763 | | - /// invoked immediately. |
764 | | - /// |
765 | | - /// **Note**: This is an [unstable API][unstable]. The public API of this type |
766 | | - /// may break in 1.x releases. See [the documentation on unstable |
767 | | - /// features][unstable] for details. |
768 | | - /// |
769 | | - /// [unstable]: crate#unstable-features |
770 | | - /// |
771 | | - /// # Examples |
772 | | - /// |
773 | | - /// ``` |
774 | | - /// # use std::sync::{atomic::AtomicUsize, Arc}; |
775 | | - /// # use tokio::task::yield_now; |
776 | | - /// # pub fn main() { |
777 | | - /// let poll_start_counter = Arc::new(AtomicUsize::new(0)); |
778 | | - /// let poll_start = poll_start_counter.clone(); |
779 | | - /// let rt = tokio::runtime::Builder::new_multi_thread() |
780 | | - /// .enable_all() |
781 | | - /// .on_before_task_poll(move |meta| { |
782 | | - /// println!("task {} is about to be polled", meta.id()) |
783 | | - /// }) |
784 | | - /// .build() |
785 | | - /// .unwrap(); |
786 | | - /// let task = rt.spawn(async { |
787 | | - /// yield_now().await; |
788 | | - /// }); |
789 | | - /// let _ = rt.block_on(task); |
790 | | - /// |
791 | | - /// # } |
792 | | - /// ``` |
793 | | - #[cfg(tokio_unstable)] |
794 | | - pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self |
| 705 | + pub fn hook_harness_factory<T>(&mut self, hooks: T) -> &mut Self |
795 | 706 | where |
796 | | - F: Fn(&TaskMeta<'_>) + Send + Sync + 'static, |
| 707 | + T: TaskHookHarnessFactory + Send + Sync + 'static, |
797 | 708 | { |
798 | | - self.before_poll = Some(std::sync::Arc::new(f)); |
799 | | - self |
800 | | - } |
801 | | - |
802 | | - /// Executes function `f` just after a task is polled |
803 | | - /// |
804 | | - /// `f` is called within the Tokio context, so functions like |
805 | | - /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being |
806 | | - /// invoked immediately. |
807 | | - /// |
808 | | - /// **Note**: This is an [unstable API][unstable]. The public API of this type |
809 | | - /// may break in 1.x releases. See [the documentation on unstable |
810 | | - /// features][unstable] for details. |
811 | | - /// |
812 | | - /// [unstable]: crate#unstable-features |
813 | | - /// |
814 | | - /// # Examples |
815 | | - /// |
816 | | - /// ``` |
817 | | - /// # use std::sync::{atomic::AtomicUsize, Arc}; |
818 | | - /// # use tokio::task::yield_now; |
819 | | - /// # pub fn main() { |
820 | | - /// let poll_stop_counter = Arc::new(AtomicUsize::new(0)); |
821 | | - /// let poll_stop = poll_stop_counter.clone(); |
822 | | - /// let rt = tokio::runtime::Builder::new_multi_thread() |
823 | | - /// .enable_all() |
824 | | - /// .on_after_task_poll(move |meta| { |
825 | | - /// println!("task {} completed polling", meta.id()); |
826 | | - /// }) |
827 | | - /// .build() |
828 | | - /// .unwrap(); |
829 | | - /// let task = rt.spawn(async { |
830 | | - /// yield_now().await; |
831 | | - /// }); |
832 | | - /// let _ = rt.block_on(task); |
833 | | - /// |
834 | | - /// # } |
835 | | - /// ``` |
836 | | - #[cfg(tokio_unstable)] |
837 | | - pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self |
838 | | - where |
839 | | - F: Fn(&TaskMeta<'_>) + Send + Sync + 'static, |
840 | | - { |
841 | | - self.after_poll = Some(std::sync::Arc::new(f)); |
842 | | - self |
843 | | - } |
844 | | - |
845 | | - /// Executes function `f` just after a task is terminated. |
846 | | - /// |
847 | | - /// `f` is called within the Tokio context, so functions like |
848 | | - /// [`tokio::spawn`](crate::spawn) can be called. |
849 | | - /// |
850 | | - /// This can be used for bookkeeping or monitoring purposes. |
851 | | - /// |
852 | | - /// Note: There can only be one task termination callback for a runtime; calling this |
853 | | - /// function more than once replaces the last callback defined, rather than adding to it. |
854 | | - /// |
855 | | - /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time. |
856 | | - /// |
857 | | - /// **Note**: This is an [unstable API][unstable]. The public API of this type |
858 | | - /// may break in 1.x releases. See [the documentation on unstable |
859 | | - /// features][unstable] for details. |
860 | | - /// |
861 | | - /// [unstable]: crate#unstable-features |
862 | | - /// |
863 | | - /// # Examples |
864 | | - /// |
865 | | - /// ``` |
866 | | - /// # use tokio::runtime; |
867 | | - /// # pub fn main() { |
868 | | - /// let runtime = runtime::Builder::new_current_thread() |
869 | | - /// .on_task_terminate(|_| { |
870 | | - /// println!("killing task"); |
871 | | - /// }) |
872 | | - /// .build() |
873 | | - /// .unwrap(); |
874 | | - /// |
875 | | - /// runtime.block_on(async { |
876 | | - /// tokio::task::spawn(std::future::ready(())); |
877 | | - /// |
878 | | - /// for _ in 0..64 { |
879 | | - /// tokio::task::yield_now().await; |
880 | | - /// } |
881 | | - /// }) |
882 | | - /// # } |
883 | | - /// ``` |
884 | | - #[cfg(all(not(loom), tokio_unstable))] |
885 | | - #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] |
886 | | - pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self |
887 | | - where |
888 | | - F: Fn(&TaskMeta<'_>) + Send + Sync + 'static, |
889 | | - { |
890 | | - self.after_termination = Some(std::sync::Arc::new(f)); |
| 709 | + self.task_hook_harness_factory = Some(Arc::new(hooks)); |
891 | 710 | self |
892 | 711 | } |
893 | 712 |
|
@@ -1508,12 +1327,8 @@ impl Builder { |
1508 | 1327 | Config { |
1509 | 1328 | before_park: self.before_park.clone(), |
1510 | 1329 | after_unpark: self.after_unpark.clone(), |
1511 | | - before_spawn: self.before_spawn.clone(), |
1512 | 1330 | #[cfg(tokio_unstable)] |
1513 | | - before_poll: self.before_poll.clone(), |
1514 | | - #[cfg(tokio_unstable)] |
1515 | | - after_poll: self.after_poll.clone(), |
1516 | | - after_termination: self.after_termination.clone(), |
| 1331 | + task_hook_factory: self.task_hook_harness_factory.clone(), |
1517 | 1332 | global_queue_interval: self.global_queue_interval, |
1518 | 1333 | event_interval: self.event_interval, |
1519 | 1334 | local_queue_capacity: self.local_queue_capacity, |
@@ -1662,12 +1477,8 @@ cfg_rt_multi_thread! { |
1662 | 1477 | Config { |
1663 | 1478 | before_park: self.before_park.clone(), |
1664 | 1479 | after_unpark: self.after_unpark.clone(), |
1665 | | - before_spawn: self.before_spawn.clone(), |
1666 | | - #[cfg(tokio_unstable)] |
1667 | | - before_poll: self.before_poll.clone(), |
1668 | 1480 | #[cfg(tokio_unstable)] |
1669 | | - after_poll: self.after_poll.clone(), |
1670 | | - after_termination: self.after_termination.clone(), |
| 1481 | + task_hook_factory: self.task_hook_harness_factory.clone(), |
1671 | 1482 | global_queue_interval: self.global_queue_interval, |
1672 | 1483 | event_interval: self.event_interval, |
1673 | 1484 | local_queue_capacity: self.local_queue_capacity, |
@@ -1715,12 +1526,8 @@ cfg_rt_multi_thread! { |
1715 | 1526 | Config { |
1716 | 1527 | before_park: self.before_park.clone(), |
1717 | 1528 | after_unpark: self.after_unpark.clone(), |
1718 | | - before_spawn: self.before_spawn.clone(), |
1719 | | - after_termination: self.after_termination.clone(), |
1720 | | - #[cfg(tokio_unstable)] |
1721 | | - before_poll: self.before_poll.clone(), |
1722 | 1529 | #[cfg(tokio_unstable)] |
1723 | | - after_poll: self.after_poll.clone(), |
| 1530 | + task_hook_factory: self.task_hook_harness_factory.clone(), |
1724 | 1531 | global_queue_interval: self.global_queue_interval, |
1725 | 1532 | event_interval: self.event_interval, |
1726 | 1533 | local_queue_capacity: self.local_queue_capacity, |
|
0 commit comments