Skip to content

Commit c51744d

Browse files
committed
Refactor system diagnostics to use a single task
System information diagnostics now use a single task that wakes up every time the `First` schedule runs. The task checks if enough time has passed since the last refresh. If enough time has passed, it refreshes the system information and sends it to a channel. The `read_diagonstic_task` system then reads the system information from the channel to add diagnostic data.
1 parent 0ffd9d6 commit c51744d

File tree

2 files changed

+115
-80
lines changed

2 files changed

+115
-80
lines changed

crates/bevy_diagnostic/src/diagnostic.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,18 @@ impl DiagnosticPath {
3939
pub fn new(path: impl Into<Cow<'static, str>>) -> DiagnosticPath {
4040
let path = path.into();
4141

42-
debug_assert!(!path.is_empty(), "diagnostic path can't be empty");
42+
debug_assert!(!path.is_empty(), "diagnostic path should not be empty");
4343
debug_assert!(
4444
!path.starts_with('/'),
45-
"diagnostic path can't be start with `/`"
45+
"diagnostic path should not start with `/`"
4646
);
4747
debug_assert!(
4848
!path.ends_with('/'),
49-
"diagnostic path can't be end with `/`"
49+
"diagnostic path should not end with `/`"
5050
);
5151
debug_assert!(
5252
!path.contains("//"),
53-
"diagnostic path can't contain empty components"
53+
"diagnostic path should not contain empty components"
5454
);
5555

5656
DiagnosticPath {

crates/bevy_diagnostic/src/system_information_diagnostics_plugin.rs

Lines changed: 111 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -70,19 +70,25 @@ pub struct SystemInfo {
7070
feature = "std",
7171
))]
7272
pub mod internal {
73+
use core::{
74+
pin::Pin,
75+
task::{Context, Poll, Waker},
76+
};
77+
use std::sync::{
78+
mpsc::{self, Receiver, Sender},
79+
Arc, Mutex,
80+
};
81+
7382
use alloc::{
7483
format,
7584
string::{String, ToString},
76-
sync::Arc,
77-
vec::Vec,
7885
};
7986
use bevy_app::{App, First, Startup, Update};
8087
use bevy_ecs::resource::Resource;
81-
use bevy_ecs::{prelude::ResMut, system::Local};
88+
use bevy_ecs::{prelude::ResMut, system::Commands};
8289
use bevy_platform::time::Instant;
83-
use bevy_tasks::{available_parallelism, block_on, poll_once, AsyncComputeTaskPool, Task};
90+
use bevy_tasks::AsyncComputeTaskPool;
8491
use log::info;
85-
use std::sync::Mutex;
8692
use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
8793

8894
use crate::{Diagnostic, Diagnostics, DiagnosticsStore};
@@ -93,12 +99,20 @@ pub mod internal {
9399

94100
pub(super) fn setup_plugin(app: &mut App) {
95101
app.add_systems(Startup, setup_system)
96-
.add_systems(First, launch_diagnostic_tasks)
97-
.add_systems(Update, read_diagnostic_tasks)
98-
.init_resource::<SysinfoTasks>();
102+
.add_systems(First, wake_diagnostic_task)
103+
.add_systems(Update, read_diagnostic_task);
99104
}
100105

101-
fn setup_system(mut diagnostics: ResMut<DiagnosticsStore>) {
106+
fn setup_system(mut diagnostics: ResMut<DiagnosticsStore>, mut commands: Commands) {
107+
let (tx, rx) = mpsc::channel();
108+
let diagnostic_task = DiagnosticTask::new(tx);
109+
let waker = Arc::clone(&diagnostic_task.waker);
110+
AsyncComputeTaskPool::get().spawn(diagnostic_task).detach();
111+
commands.insert_resource(SysinfoTask {
112+
receiver: Mutex::new(rx),
113+
waker,
114+
});
115+
102116
diagnostics.add(
103117
Diagnostic::new(SystemInformationDiagnosticsPlugin::SYSTEM_CPU_USAGE).with_suffix("%"),
104118
);
@@ -121,78 +135,100 @@ pub mod internal {
121135
process_mem_usage: f64,
122136
}
123137

124-
#[derive(Resource, Default)]
125-
struct SysinfoTasks {
126-
tasks: Vec<Task<SysinfoRefreshData>>,
138+
impl SysinfoRefreshData {
139+
fn new(system: &mut System) -> Self {
140+
let pid = sysinfo::get_current_pid().expect("Failed to get current process ID");
141+
system.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true);
142+
143+
system.refresh_cpu_specifics(CpuRefreshKind::nothing().with_cpu_usage());
144+
system.refresh_memory();
145+
146+
let system_cpu_usage = system.global_cpu_usage().into();
147+
let total_mem = system.total_memory() as f64;
148+
let used_mem = system.used_memory() as f64;
149+
let system_mem_usage = used_mem / total_mem * 100.0;
150+
151+
let process_mem_usage = system
152+
.process(pid)
153+
.map(|p| p.memory() as f64 * BYTES_TO_GIB)
154+
.unwrap_or(0.0);
155+
156+
let process_cpu_usage = system
157+
.process(pid)
158+
.map(|p| p.cpu_usage() as f64 / system.cpus().len() as f64)
159+
.unwrap_or(0.0);
160+
161+
Self {
162+
system_cpu_usage,
163+
system_mem_usage,
164+
process_cpu_usage,
165+
process_mem_usage,
166+
}
167+
}
168+
}
169+
170+
#[derive(Resource)]
171+
struct SysinfoTask {
172+
receiver: Mutex<Receiver<SysinfoRefreshData>>,
173+
waker: Arc<Mutex<Option<Waker>>>,
127174
}
128175

129-
fn launch_diagnostic_tasks(
130-
mut tasks: ResMut<SysinfoTasks>,
131-
// TODO: Consider a fair mutex
132-
mut sysinfo: Local<Option<Arc<Mutex<System>>>>,
133-
// TODO: FromWorld for Instant?
134-
mut last_refresh: Local<Option<Instant>>,
135-
) {
136-
let sysinfo = sysinfo.get_or_insert_with(|| {
137-
Arc::new(Mutex::new(System::new_with_specifics(
138-
RefreshKind::nothing()
139-
.with_cpu(CpuRefreshKind::nothing().with_cpu_usage())
140-
.with_memory(MemoryRefreshKind::everything()),
141-
)))
142-
});
176+
struct DiagnosticTask {
177+
system: System,
178+
last_refresh: Instant,
179+
sender: Sender<SysinfoRefreshData>,
180+
waker: Arc<Mutex<Option<Waker>>>,
181+
}
143182

144-
let last_refresh = last_refresh.get_or_insert_with(Instant::now);
145-
146-
let thread_pool = AsyncComputeTaskPool::get();
147-
148-
// Only queue a new system refresh task when necessary
149-
// Queuing earlier than that will not give new data
150-
if last_refresh.elapsed() > sysinfo::MINIMUM_CPU_UPDATE_INTERVAL
151-
// These tasks don't yield and will take up all of the task pool's
152-
// threads if we don't limit their amount.
153-
&& tasks.tasks.len() * 2 < available_parallelism()
154-
{
155-
let sys = Arc::clone(sysinfo);
156-
let task = thread_pool.spawn(async move {
157-
let mut sys = sys.lock().unwrap();
158-
let pid = sysinfo::get_current_pid().expect("Failed to get current process ID");
159-
sys.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true);
160-
161-
sys.refresh_cpu_specifics(CpuRefreshKind::nothing().with_cpu_usage());
162-
sys.refresh_memory();
163-
let system_cpu_usage = sys.global_cpu_usage().into();
164-
let total_mem = sys.total_memory() as f64;
165-
let used_mem = sys.used_memory() as f64;
166-
let system_mem_usage = used_mem / total_mem * 100.0;
167-
168-
let process_mem_usage = sys
169-
.process(pid)
170-
.map(|p| p.memory() as f64 * BYTES_TO_GIB)
171-
.unwrap_or(0.0);
172-
173-
let process_cpu_usage = sys
174-
.process(pid)
175-
.map(|p| p.cpu_usage() as f64 / sys.cpus().len() as f64)
176-
.unwrap_or(0.0);
177-
178-
SysinfoRefreshData {
179-
system_cpu_usage,
180-
system_mem_usage,
181-
process_cpu_usage,
182-
process_mem_usage,
183-
}
184-
});
185-
tasks.tasks.push(task);
186-
*last_refresh = Instant::now();
183+
impl DiagnosticTask {
184+
fn new(sender: Sender<SysinfoRefreshData>) -> Self {
185+
Self {
186+
system: System::new_with_specifics(
187+
RefreshKind::nothing()
188+
.with_cpu(CpuRefreshKind::nothing().with_cpu_usage())
189+
.with_memory(MemoryRefreshKind::everything()),
190+
),
191+
last_refresh: Instant::now(),
192+
sender,
193+
waker: Arc::default(),
194+
}
187195
}
188196
}
189197

190-
fn read_diagnostic_tasks(mut diagnostics: Diagnostics, mut tasks: ResMut<SysinfoTasks>) {
191-
tasks.tasks.retain_mut(|task| {
192-
let Some(data) = block_on(poll_once(task)) else {
193-
return true;
194-
};
198+
impl DiagnosticTask {
199+
fn update_waker(&mut self, cx: &mut Context<'_>) {
200+
let mut waker = self.waker.lock().unwrap();
201+
*waker = Some(cx.waker().clone());
202+
}
203+
}
204+
205+
impl Future for DiagnosticTask {
206+
type Output = ();
207+
208+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
209+
self.update_waker(cx);
210+
211+
if self.last_refresh.elapsed() > sysinfo::MINIMUM_CPU_UPDATE_INTERVAL {
212+
self.last_refresh = Instant::now();
213+
214+
let sysinfo_refresh_data = SysinfoRefreshData::new(&mut self.system);
215+
self.sender.send(sysinfo_refresh_data).unwrap();
216+
}
195217

218+
Poll::Pending
219+
}
220+
}
221+
222+
fn wake_diagnostic_task(task: ResMut<SysinfoTask>) {
223+
let mut waker = task.waker.lock().unwrap();
224+
if let Some(waker) = waker.take() {
225+
waker.wake_by_ref();
226+
}
227+
}
228+
229+
fn read_diagnostic_task(mut diagnostics: Diagnostics, task: ResMut<SysinfoTask>) {
230+
let receiver = task.receiver.lock().unwrap();
231+
while let Ok(data) = receiver.try_recv() {
196232
diagnostics.add_measurement(
197233
&SystemInformationDiagnosticsPlugin::SYSTEM_CPU_USAGE,
198234
|| data.system_cpu_usage,
@@ -209,8 +245,7 @@ pub mod internal {
209245
&SystemInformationDiagnosticsPlugin::PROCESS_MEM_USAGE,
210246
|| data.process_mem_usage,
211247
);
212-
false
213-
});
248+
}
214249
}
215250

216251
impl Default for SystemInfo {

0 commit comments

Comments
 (0)