diff --git a/crates/bevy_diagnostic/Cargo.toml b/crates/bevy_diagnostic/Cargo.toml index de04e5ec9946b..b803136931b41 100644 --- a/crates/bevy_diagnostic/Cargo.toml +++ b/crates/bevy_diagnostic/Cargo.toml @@ -60,6 +60,7 @@ bevy_platform = { path = "../bevy_platform", version = "0.17.0-dev", default-fea ] } # other +atomic-waker = { version = "1", default-features = false } const-fnv1a-hash = "1.1.0" serde = { version = "1.0", default-features = false, features = [ "alloc", diff --git a/crates/bevy_diagnostic/src/diagnostic.rs b/crates/bevy_diagnostic/src/diagnostic.rs index dd1e3576431d6..937da5b258502 100644 --- a/crates/bevy_diagnostic/src/diagnostic.rs +++ b/crates/bevy_diagnostic/src/diagnostic.rs @@ -39,18 +39,18 @@ impl DiagnosticPath { pub fn new(path: impl Into>) -> DiagnosticPath { let path = path.into(); - debug_assert!(!path.is_empty(), "diagnostic path can't be empty"); + debug_assert!(!path.is_empty(), "diagnostic path should not be empty"); debug_assert!( !path.starts_with('/'), - "diagnostic path can't be start with `/`" + "diagnostic path should not start with `/`" ); debug_assert!( !path.ends_with('/'), - "diagnostic path can't be end with `/`" + "diagnostic path should not end with `/`" ); debug_assert!( !path.contains("//"), - "diagnostic path can't contain empty components" + "diagnostic path should not contain empty components" ); DiagnosticPath { diff --git a/crates/bevy_diagnostic/src/system_information_diagnostics_plugin.rs b/crates/bevy_diagnostic/src/system_information_diagnostics_plugin.rs index 83d3663895ca5..0f61d88720160 100644 --- a/crates/bevy_diagnostic/src/system_information_diagnostics_plugin.rs +++ b/crates/bevy_diagnostic/src/system_information_diagnostics_plugin.rs @@ -9,12 +9,12 @@ use bevy_ecs::resource::Resource; /// Any system diagnostics gathered by this plugin may not be current when you access them. /// /// Supported targets: -/// * linux, -/// * windows, -/// * android, +/// * linux +/// * windows +/// * android /// * macOS /// -/// NOT supported when using the `bevy/dynamic` feature even when using previously mentioned targets +/// NOT supported when using the `bevy/dynamic` feature even when using previously mentioned targets. /// /// # See also /// @@ -69,20 +69,27 @@ pub struct SystemInfo { not(feature = "dynamic_linking"), feature = "std", ))] -pub mod internal { +mod internal { + use core::{ + pin::Pin, + task::{Context, Poll}, + }; + use std::sync::{ + mpsc::{self, Receiver, Sender}, + Arc, + }; + use alloc::{ format, string::{String, ToString}, - sync::Arc, - vec::Vec, }; + use atomic_waker::AtomicWaker; use bevy_app::{App, First, Startup, Update}; use bevy_ecs::resource::Resource; - use bevy_ecs::{prelude::ResMut, system::Local}; - use bevy_platform::time::Instant; - use bevy_tasks::{available_parallelism, block_on, poll_once, AsyncComputeTaskPool, Task}; + use bevy_ecs::{prelude::ResMut, system::Commands}; + use bevy_platform::{cell::SyncCell, time::Instant}; + use bevy_tasks::{AsyncComputeTaskPool, Task}; use log::info; - use std::sync::Mutex; use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System}; use crate::{Diagnostic, Diagnostics, DiagnosticsStore}; @@ -91,14 +98,30 @@ pub mod internal { const BYTES_TO_GIB: f64 = 1.0 / 1024.0 / 1024.0 / 1024.0; + /// Sets up the system information diagnostics plugin. + /// + /// The plugin spawns a single background task in the async task pool that always reschedules. + /// The [`wake_diagnostic_task`] system wakes this task once per frame during the [`First`] + /// schedule. If enough time has passed since the last refresh, it sends [`SysinfoRefreshData`] + /// through a channel. The [`read_diagnostic_task`] system receives this data during the + /// [`Update`] schedule and adds it as diagnostic measurements. pub(super) fn setup_plugin(app: &mut App) { app.add_systems(Startup, setup_system) - .add_systems(First, launch_diagnostic_tasks) - .add_systems(Update, read_diagnostic_tasks) - .init_resource::(); + .add_systems(First, wake_diagnostic_task) + .add_systems(Update, read_diagnostic_task); } - fn setup_system(mut diagnostics: ResMut) { + fn setup_system(mut diagnostics: ResMut, mut commands: Commands) { + let (tx, rx) = mpsc::channel(); + let diagnostic_task = DiagnosticTask::new(tx); + let waker = Arc::clone(&diagnostic_task.waker); + let task = AsyncComputeTaskPool::get().spawn(diagnostic_task); + commands.insert_resource(SysinfoTask { + _task: task, + receiver: SyncCell::new(rx), + waker, + }); + diagnostics.add( Diagnostic::new(SystemInformationDiagnosticsPlugin::SYSTEM_CPU_USAGE).with_suffix("%"), ); @@ -121,78 +144,92 @@ pub mod internal { process_mem_usage: f64, } - #[derive(Resource, Default)] - struct SysinfoTasks { - tasks: Vec>, + impl SysinfoRefreshData { + fn new(system: &mut System) -> Self { + let pid = sysinfo::get_current_pid().expect("Failed to get current process ID"); + system.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true); + + system.refresh_cpu_specifics(CpuRefreshKind::nothing().with_cpu_usage()); + system.refresh_memory(); + + let system_cpu_usage = system.global_cpu_usage().into(); + let total_mem = system.total_memory() as f64; + let used_mem = system.used_memory() as f64; + let system_mem_usage = used_mem / total_mem * 100.0; + + let process_mem_usage = system + .process(pid) + .map(|p| p.memory() as f64 * BYTES_TO_GIB) + .unwrap_or(0.0); + + let process_cpu_usage = system + .process(pid) + .map(|p| p.cpu_usage() as f64 / system.cpus().len() as f64) + .unwrap_or(0.0); + + Self { + system_cpu_usage, + system_mem_usage, + process_cpu_usage, + process_mem_usage, + } + } } - fn launch_diagnostic_tasks( - mut tasks: ResMut, - // TODO: Consider a fair mutex - mut sysinfo: Local>>>, - // TODO: FromWorld for Instant? - mut last_refresh: Local>, - ) { - let sysinfo = sysinfo.get_or_insert_with(|| { - Arc::new(Mutex::new(System::new_with_specifics( - RefreshKind::nothing() - .with_cpu(CpuRefreshKind::nothing().with_cpu_usage()) - .with_memory(MemoryRefreshKind::everything()), - ))) - }); + #[derive(Resource)] + struct SysinfoTask { + _task: Task<()>, + receiver: SyncCell>, + waker: Arc, + } - let last_refresh = last_refresh.get_or_insert_with(Instant::now); - - let thread_pool = AsyncComputeTaskPool::get(); - - // Only queue a new system refresh task when necessary - // Queuing earlier than that will not give new data - if last_refresh.elapsed() > sysinfo::MINIMUM_CPU_UPDATE_INTERVAL - // These tasks don't yield and will take up all of the task pool's - // threads if we don't limit their amount. - && tasks.tasks.len() * 2 < available_parallelism() - { - let sys = Arc::clone(sysinfo); - let task = thread_pool.spawn(async move { - let mut sys = sys.lock().unwrap(); - let pid = sysinfo::get_current_pid().expect("Failed to get current process ID"); - sys.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true); - - sys.refresh_cpu_specifics(CpuRefreshKind::nothing().with_cpu_usage()); - sys.refresh_memory(); - let system_cpu_usage = sys.global_cpu_usage().into(); - let total_mem = sys.total_memory() as f64; - let used_mem = sys.used_memory() as f64; - let system_mem_usage = used_mem / total_mem * 100.0; - - let process_mem_usage = sys - .process(pid) - .map(|p| p.memory() as f64 * BYTES_TO_GIB) - .unwrap_or(0.0); - - let process_cpu_usage = sys - .process(pid) - .map(|p| p.cpu_usage() as f64 / sys.cpus().len() as f64) - .unwrap_or(0.0); - - SysinfoRefreshData { - system_cpu_usage, - system_mem_usage, - process_cpu_usage, - process_mem_usage, - } - }); - tasks.tasks.push(task); - *last_refresh = Instant::now(); + struct DiagnosticTask { + system: System, + last_refresh: Instant, + sender: Sender, + waker: Arc, + } + + impl DiagnosticTask { + fn new(sender: Sender) -> Self { + Self { + system: System::new_with_specifics( + RefreshKind::nothing() + .with_cpu(CpuRefreshKind::nothing().with_cpu_usage()) + .with_memory(MemoryRefreshKind::everything()), + ), + // Avoids initial delay on first refresh + last_refresh: Instant::now() - sysinfo::MINIMUM_CPU_UPDATE_INTERVAL, + sender, + waker: Arc::default(), + } } } - fn read_diagnostic_tasks(mut diagnostics: Diagnostics, mut tasks: ResMut) { - tasks.tasks.retain_mut(|task| { - let Some(data) = block_on(poll_once(task)) else { - return true; - }; + impl Future for DiagnosticTask { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.waker.register(cx.waker()); + + if self.last_refresh.elapsed() > sysinfo::MINIMUM_CPU_UPDATE_INTERVAL { + self.last_refresh = Instant::now(); + + let sysinfo_refresh_data = SysinfoRefreshData::new(&mut self.system); + self.sender.send(sysinfo_refresh_data).unwrap(); + } + + // Always reschedules + Poll::Pending + } + } + fn wake_diagnostic_task(task: ResMut) { + task.waker.wake(); + } + + fn read_diagnostic_task(mut diagnostics: Diagnostics, mut task: ResMut) { + while let Ok(data) = task.receiver.get().try_recv() { diagnostics.add_measurement( &SystemInformationDiagnosticsPlugin::SYSTEM_CPU_USAGE, || data.system_cpu_usage, @@ -209,8 +246,7 @@ pub mod internal { &SystemInformationDiagnosticsPlugin::PROCESS_MEM_USAGE, || data.process_mem_usage, ); - false - }); + } } impl Default for SystemInfo { @@ -252,7 +288,7 @@ pub mod internal { not(feature = "dynamic_linking"), feature = "std", )))] -pub mod internal { +mod internal { use alloc::string::ToString; use bevy_app::{App, Startup};