Skip to content

Commit 03d688e

Browse files
committed
feat(rpc): support profiling task
1 parent ed417d1 commit 03d688e

File tree

4 files changed

+117
-35
lines changed

4 files changed

+117
-35
lines changed

src/main.rs

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,16 @@ use log::log_init;
3232
use mimalloc::MiMalloc;
3333
use nix::unistd::geteuid;
3434
use opentelemetry_otlp::ExportConfig;
35-
use psh_proto::HeartbeatReq;
36-
use runtime::{Task, TaskRuntime};
35+
use psh_proto::{
36+
HeartbeatReq, PerfDataProto,
37+
export_data_req::{Data, data::DataType},
38+
};
39+
use runtime::{TaskRuntime, WasmTask};
3740
use services::rpc::RpcClient;
3841
use tokio::{runtime::Runtime, try_join};
3942

43+
use self::services::{rpc::WhichTask, sampling::Profiler};
44+
4045
#[global_allocator]
4146
static GLOBAL: MiMalloc = mimalloc::MiMalloc;
4247

@@ -86,7 +91,7 @@ fn main() -> Result<()> {
8691
let task_rt = TaskRuntime::new()?;
8792

8893
if let Some(args) = wasm_with_args {
89-
let task = Task {
94+
let task = WasmTask {
9095
id: None,
9196
wasm_component: fs::read(&args[0])?,
9297
wasm_component_args: args,
@@ -143,14 +148,52 @@ async fn async_tasks(remote_cfg: RemoteConfig, mut task_rt: TaskRuntime) -> Resu
143148
loop {
144149
let idle = task_rt.is_idle();
145150
if idle {
146-
if let Some(mut task) = client.get_task(instance_id.clone()).await? {
147-
let task_id = task
148-
.id
149-
.as_ref()
150-
.map(|it| it.to_string())
151-
.expect("No task id provided");
152-
task.wasm_component_args.insert(0, task_id);
153-
task_rt.schedule(task)?
151+
if let Some(task) = client.get_task(instance_id.clone()).await? {
152+
match task {
153+
WhichTask::Wasm(mut task) => {
154+
let task_id = task
155+
.id
156+
.as_ref()
157+
.map(|it| it.to_string())
158+
.expect("No task id provided");
159+
task.wasm_component_args.insert(0, task_id);
160+
task_rt.schedule(task)?;
161+
}
162+
WhichTask::Profiling(profiling_task) => {
163+
let mut profiler = Profiler::new(
164+
profiling_task.process,
165+
profiling_task.mmap_pages as _,
166+
profiling_task.overflow_by,
167+
profiling_task.stack_depth,
168+
)?;
169+
let task_time_slice = {
170+
let delta = profiling_task.end_time.timestamp_millis()
171+
- Utc::now().timestamp_millis();
172+
delta.max(0) as u64
173+
};
174+
175+
let perf_data = tokio::task::spawn_blocking(
176+
move || -> anyhow::Result<PerfDataProto> {
177+
profiler.enable()?;
178+
std::thread::sleep(Duration::from_millis(task_time_slice));
179+
profiler.disable()?;
180+
Ok(profiler.perf_data_proto())
181+
},
182+
)
183+
.await??;
184+
if let Some(task_id) = profiling_task.id {
185+
let data = Data {
186+
data_type: Some(DataType::PerfData(perf_data)),
187+
};
188+
client
189+
.export_data(psh_proto::ExportDataReq {
190+
task_id,
191+
data: vec![data],
192+
})
193+
.await?;
194+
}
195+
}
196+
};
154197
}
155198
}
156199

src/runtime/data_export.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ use profiling::data_export::{
2626
common::FieldValue as WitFieldValue, measurement::Point, metric::Sample,
2727
};
2828
use prost::Message;
29-
use psh_proto::{Data, DataType, ExportDataReq};
29+
use psh_proto::{
30+
ExportDataReq, LineProtocolData,
31+
export_data_req::{Data, data::DataType},
32+
};
3033
use wasmtime::component::Linker;
3134

3235
use crate::{TOKIO_RUNTIME, services::rpc::RpcClient};
@@ -172,8 +175,7 @@ impl profiling::data_export::file::Host for DataExportCtx {
172175
};
173176

174177
let data = Data {
175-
ty: DataType::File as _,
176-
bytes,
178+
data_type: Some(DataType::File(psh_proto::FileData { bytes })),
177179
};
178180
ctx.exporter.schedule(data);
179181

@@ -201,8 +203,7 @@ impl profiling::data_export::metric::Host for DataExportCtx {
201203
};
202204

203205
let data = Data {
204-
ty: DataType::LineProtocol as _,
205-
bytes,
206+
data_type: Some(DataType::LineProtocol(LineProtocolData { bytes })),
206207
};
207208
ctx.exporter.schedule(data);
208209

@@ -238,8 +239,7 @@ impl profiling::data_export::measurement::Host for DataExportCtx {
238239
};
239240

240241
let data = Data {
241-
ty: DataType::LineProtocol as _,
242-
bytes,
242+
data_type: Some(DataType::LineProtocol(LineProtocolData { bytes })),
243243
};
244244
ctx.exporter.schedule(data);
245245

src/runtime/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,17 @@ pub use state::PshState;
3939

4040
use crate::services::rpc::RpcClient;
4141

42-
pub struct Task {
42+
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
43+
pub struct WasmTask {
4344
pub id: Option<String>,
4445
pub wasm_component: Vec<u8>,
4546
pub wasm_component_args: Vec<String>,
4647
pub end_time: DateTime<Utc>,
4748
}
4849

4950
pub struct TaskRuntime {
50-
tx: Sender<Task>,
51-
rx: Option<Receiver<Task>>,
51+
tx: Sender<WasmTask>,
52+
rx: Option<Receiver<WasmTask>>,
5253
len: Arc<AtomicUsize>,
5354
finished_task_id: Arc<Mutex<Vec<String>>>,
5455
}
@@ -65,7 +66,7 @@ impl TaskRuntime {
6566
})
6667
}
6768

68-
pub fn schedule(&self, task: Task) -> Result<()> {
69+
pub fn schedule(&self, task: WasmTask) -> Result<()> {
6970
self.len.fetch_add(1, Ordering::Release);
7071
self.tx.send(task)?;
7172
Ok(())

src/services/rpc.rs

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
// see <https://www.gnu.org/licenses/>.
1414

1515
use anyhow::{Result, bail};
16+
use chrono::DateTime;
1617
use chrono::{TimeZone, Utc, offset::LocalResult};
18+
use perf_event_rs::sampling::OverflowBy;
19+
use psh_proto::task::TaskType;
1720
use psh_proto::{
1821
ExportDataReq, GetTaskReq, HeartbeatReq, TaskDoneReq, Unit,
1922
psh_service_client::PshServiceClient,
@@ -26,7 +29,7 @@ use tonic::{
2629
transport::{Channel, ClientTlsConfig, Endpoint},
2730
};
2831

29-
use crate::{config::RpcConfig, runtime::Task, services::host_info::new_info_req};
32+
use crate::{config::RpcConfig, runtime::WasmTask, services::host_info::new_info_req};
3033

3134
#[derive(Clone)]
3235
pub struct RpcClient {
@@ -80,6 +83,20 @@ where
8083
}
8184
}
8285

86+
pub enum WhichTask {
87+
Wasm(WasmTask),
88+
Profiling(ProfilingTask),
89+
}
90+
91+
pub struct ProfilingTask {
92+
pub id: Option<String>,
93+
pub process: perf_event_rs::config::Process,
94+
pub mmap_pages: u64,
95+
pub overflow_by: OverflowBy,
96+
pub stack_depth: Option<u16>,
97+
pub end_time: DateTime<Utc>,
98+
}
99+
83100
impl RpcClient {
84101
pub async fn new(config: &RpcConfig, token: String) -> Result<Self> {
85102
let ep = Endpoint::from_shared(config.addr.clone())?
@@ -132,7 +149,7 @@ impl RpcClient {
132149
Ok(())
133150
}
134151

135-
pub async fn get_task(&mut self, instance_id: String) -> Result<Option<Task>> {
152+
pub async fn get_task(&mut self, instance_id: String) -> Result<Option<WhichTask>> {
136153
let get_task_req = GetTaskReq { instance_id };
137154
let token = &self.token;
138155

@@ -142,20 +159,41 @@ impl RpcClient {
142159
self.client.get_task(req).await
143160
})
144161
.await?;
145-
let task = match response.into_inner().task {
146-
Some(task) => task,
147-
None => return Ok(None),
162+
let Some(task): Option<psh_proto::Task> = response.into_inner().task else {
163+
return Ok(None);
148164
};
149165

150-
let end_time = match Utc.timestamp_millis_opt(task.end_time as _) {
151-
LocalResult::Single(t) => t,
152-
_ => bail!("Invalid task end time"),
166+
let LocalResult::Single(end_time) = Utc.timestamp_millis_opt(task.end_time as _) else {
167+
bail!("Invalid task end time")
153168
};
154-
let task = Task {
155-
id: Some(task.id),
156-
wasm_component: task.wasm,
157-
wasm_component_args: task.wasm_args,
158-
end_time,
169+
let Some(task_type) = task.task_type else {
170+
return Ok(None);
171+
};
172+
173+
let task = match task_type {
174+
TaskType::Profiling(profiling_task) => {
175+
let Some(process) = profiling_task.process else {
176+
return Ok(None);
177+
};
178+
let Some(overflow_by) = profiling_task.overflow_by else {
179+
return Ok(None);
180+
};
181+
let process: perf_event_rs::config::Process = process.into();
182+
WhichTask::Profiling(ProfilingTask {
183+
id: task.id.into(),
184+
process,
185+
mmap_pages: profiling_task.mmap_pages,
186+
overflow_by: overflow_by.into(),
187+
stack_depth: profiling_task.stack_depth.map(|v| v as _),
188+
end_time,
189+
})
190+
}
191+
TaskType::Wasm(wasm_task) => WhichTask::Wasm(WasmTask {
192+
id: Some(task.id),
193+
wasm_component: wasm_task.wasm,
194+
wasm_component_args: wasm_task.wasm_args,
195+
end_time,
196+
}),
159197
};
160198

161199
Ok(Some(task))

0 commit comments

Comments
 (0)