Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ chrono = { workspace = true }
clap = { workspace = true, features = ["derive", "wrap_help"] }
tonic = { workspace = true }
prost = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "fs"] }
nix = { workspace = true, features = ["user", "hostname"] }
wasmtime = { workspace = true }
wasmtime-wasi = { workspace = true }
Expand Down Expand Up @@ -50,6 +50,8 @@ influxdb-line-protocol = { workspace = true }
psh-proto = { workspace = true }
mimalloc = { workspace = true }
nvml-wrapper = { workspace = true }
perf-event-rs = { workspace = true }
num_cpus = { workspace = true }

[lints]
workspace = true
Expand All @@ -58,7 +60,7 @@ workspace = true
host-op-perf = { path = "crates/op/host-op-perf" }
host-op-system = { path = "crates/op/host-op-system" }
psh-system = { path = "crates/psh-system" }
perf-event-rs = { git = "https://github.com/OptimatistOpenSource/perf-event-rs.git", rev = "423ca26f53b27193d2321028dae5fd362a9673e9" }
perf-event-rs = { git = "https://github.com/OptimatistOpenSource/perf-event-rs.git", rev = "d6881f34b8a9cde1d70dab5fb1415271e6b0bb25" }
tokio = "^1"
libc = "^0.2"
chrono = "^0.4"
Expand Down Expand Up @@ -87,7 +89,7 @@ local-ip-address = "^0.6"
TinyUFO = "0.4"
crossbeam = "0.8"
influxdb-line-protocol = "2"
psh-proto = { git = "https://github.com/OptimatistOpenSource/psh-proto.git", rev = "ca2919053029cb584b478611f8bf8496bf3cf7f7" }
psh-proto = { git = "https://github.com/OptimatistOpenSource/psh-proto.git", rev = "c82356c52925ac0abf9cd93c7245c3518d78e096" }
mimalloc = "0.1"
nvml-wrapper = "0.10.0"

Expand Down
83 changes: 72 additions & 11 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ use log::log_init;
use mimalloc::MiMalloc;
use nix::unistd::geteuid;
use opentelemetry_otlp::ExportConfig;
use psh_proto::HeartbeatReq;
use runtime::{Task, TaskRuntime};
use psh_proto::{
HeartbeatReq, PerfDataProto,
export_data_req::{Data, data::DataType},
};
use runtime::{TaskRuntime, WasmTask};
use services::rpc::RpcClient;
use tokio::{runtime::Runtime, try_join};

use self::services::{rpc::WhichTask, sampling::Profiler};

#[global_allocator]
static GLOBAL: MiMalloc = mimalloc::MiMalloc;

Expand Down Expand Up @@ -86,7 +91,7 @@ fn main() -> Result<()> {
let task_rt = TaskRuntime::new()?;

if let Some(args) = wasm_with_args {
let task = Task {
let task = WasmTask {
id: None,
wasm_component: fs::read(&args[0])?,
wasm_component_args: args,
Expand Down Expand Up @@ -143,14 +148,70 @@ async fn async_tasks(remote_cfg: RemoteConfig, mut task_rt: TaskRuntime) -> Resu
loop {
let idle = task_rt.is_idle();
if idle {
if let Some(mut task) = client.get_task(instance_id.clone()).await? {
let task_id = task
.id
.as_ref()
.map(|it| it.to_string())
.expect("No task id provided");
task.wasm_component_args.insert(0, task_id);
task_rt.schedule(task)?
if let Some(task) = client.get_task(instance_id.clone()).await? {
match task {
WhichTask::Wasm(mut task) => {
let task_id = task
.id
.as_ref()
.map(|it| it.to_string())
.expect("No task id provided");
task.wasm_component_args.insert(0, task_id);
task_rt.schedule(task)?;
}
WhichTask::Profiling(profiling_task) => {
let mut profiler = Profiler::new(
profiling_task.process,
profiling_task.mmap_pages as _,
profiling_task.overflow_by,
profiling_task.stack_depth,
)?;
let task_time_slice = {
let delta = profiling_task.end_time.timestamp_millis()
- Utc::now().timestamp_millis();
delta.max(0) as u64
};

let perf_data = tokio::task::spawn_blocking(
move || -> anyhow::Result<PerfDataProto> {
profiler.enable()?;
std::thread::sleep(Duration::from_millis(task_time_slice));
profiler.disable()?;
Ok(profiler.perf_data_proto())
},
)
.await??;
if let Some(task_id) = profiling_task.id {
let mut data = vec![];
for ele in &perf_data.events {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ele -> event?

let Some(event_type) = &ele.event_type else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

读取ELF文件为什么需要event_type? 这个不是强关联吧?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

文件名是在MmapEvent里面的,所以需要从里面获取文件名。

continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些地方需要打印错误日志,帮助后续的排查。默认静默掉这些错误,排查起来比较麻烦,可能是网络问题,也可能是参数错误。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

event_type 在proto里面是用 oneof定义, oneof event_type {,使用oneof就会生成一个option的字段,这里没有就跳过是合适的。

};

if let psh_proto::perf_data_proto::perf_event::EventType::MmapEvent(event)= event_type{
let Some(filename) = &event.filename else {
continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上

};

let data_type = DataType::ElfFile(psh_proto::ElfFile {
filename: filename.to_owned(),
build_id: event.build_id.clone(),
arch: std::env::consts::ARCH.to_string(),
bytes: tokio::fs::read(filename).await?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

应该是先做一些校验,然后在判断是不是要读取文件内容,而不是直接读取。比如文件不存在怎么处理, build_id不一致怎么处理,用户是否允许读取某个文件/目录等等。

});
data.push(Data { data_type: Some(data_type) });
}
}
let dat = Data {
data_type: Some(DataType::PerfData(perf_data)),
};
data.push(dat);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

采样和上传ELF文件两个可以分开,放在一起反而不方便。不是所有的ELF文件都需要上传。

client
.export_data(psh_proto::ExportDataReq { task_id, data })
.await?;
}
}
};
}
}

Expand Down
14 changes: 7 additions & 7 deletions src/runtime/data_export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ use profiling::data_export::{
common::FieldValue as WitFieldValue, measurement::Point, metric::Sample,
};
use prost::Message;
use psh_proto::{Data, DataType, ExportDataReq};
use psh_proto::{
ExportDataReq, LineProtocolData,
export_data_req::{Data, data::DataType},
};
use wasmtime::component::Linker;

use crate::{TOKIO_RUNTIME, services::rpc::RpcClient};
Expand Down Expand Up @@ -172,8 +175,7 @@ impl profiling::data_export::file::Host for DataExportCtx {
};

let data = Data {
ty: DataType::File as _,
bytes,
data_type: Some(DataType::File(psh_proto::FileData { bytes })),
};
ctx.exporter.schedule(data);

Expand Down Expand Up @@ -201,8 +203,7 @@ impl profiling::data_export::metric::Host for DataExportCtx {
};

let data = Data {
ty: DataType::LineProtocol as _,
bytes,
data_type: Some(DataType::LineProtocol(LineProtocolData { bytes })),
};
ctx.exporter.schedule(data);

Expand Down Expand Up @@ -238,8 +239,7 @@ impl profiling::data_export::measurement::Host for DataExportCtx {
};

let data = Data {
ty: DataType::LineProtocol as _,
bytes,
data_type: Some(DataType::LineProtocol(LineProtocolData { bytes })),
};
ctx.exporter.schedule(data);

Expand Down
9 changes: 5 additions & 4 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@ pub use state::PshState;

use crate::services::rpc::RpcClient;

pub struct Task {
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct WasmTask {
pub id: Option<String>,
pub wasm_component: Vec<u8>,
pub wasm_component_args: Vec<String>,
pub end_time: DateTime<Utc>,
}

pub struct TaskRuntime {
tx: Sender<Task>,
rx: Option<Receiver<Task>>,
tx: Sender<WasmTask>,
rx: Option<Receiver<WasmTask>>,
len: Arc<AtomicUsize>,
finished_task_id: Arc<Mutex<Vec<String>>>,
}
Expand All @@ -65,7 +66,7 @@ impl TaskRuntime {
})
}

pub fn schedule(&self, task: Task) -> Result<()> {
pub fn schedule(&self, task: WasmTask) -> Result<()> {
self.len.fetch_add(1, Ordering::Release);
self.tx.send(task)?;
Ok(())
Expand Down
1 change: 1 addition & 0 deletions src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@

pub mod host_info;
pub mod rpc;
pub mod sampling;
64 changes: 51 additions & 13 deletions src/services/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
// see <https://www.gnu.org/licenses/>.

use anyhow::{Result, bail};
use chrono::DateTime;
use chrono::{TimeZone, Utc, offset::LocalResult};
use perf_event_rs::sampling::OverflowBy;
use psh_proto::task::TaskType;
use psh_proto::{
ExportDataReq, GetTaskReq, HeartbeatReq, TaskDoneReq, Unit,
psh_service_client::PshServiceClient,
Expand All @@ -26,7 +29,7 @@ use tonic::{
transport::{Channel, ClientTlsConfig, Endpoint},
};

use crate::{config::RpcConfig, runtime::Task, services::host_info::new_info_req};
use crate::{config::RpcConfig, runtime::WasmTask, services::host_info::new_info_req};

#[derive(Clone)]
pub struct RpcClient {
Expand Down Expand Up @@ -80,6 +83,20 @@ where
}
}

pub enum WhichTask {
Wasm(WasmTask),
Profiling(ProfilingTask),
}

pub struct ProfilingTask {
pub id: Option<String>,
pub process: perf_event_rs::config::Process,
pub mmap_pages: u64,
pub overflow_by: OverflowBy,
pub stack_depth: Option<u16>,
pub end_time: DateTime<Utc>,
}

impl RpcClient {
pub async fn new(config: &RpcConfig, token: String) -> Result<Self> {
let ep = Endpoint::from_shared(config.addr.clone())?
Expand Down Expand Up @@ -132,7 +149,7 @@ impl RpcClient {
Ok(())
}

pub async fn get_task(&mut self, instance_id: String) -> Result<Option<Task>> {
pub async fn get_task(&mut self, instance_id: String) -> Result<Option<WhichTask>> {
let get_task_req = GetTaskReq { instance_id };
let token = &self.token;

Expand All @@ -142,20 +159,41 @@ impl RpcClient {
self.client.get_task(req).await
})
.await?;
let task = match response.into_inner().task {
Some(task) => task,
None => return Ok(None),
let Some(task): Option<psh_proto::Task> = response.into_inner().task else {
return Ok(None);
};

let end_time = match Utc.timestamp_millis_opt(task.end_time as _) {
LocalResult::Single(t) => t,
_ => bail!("Invalid task end time"),
let LocalResult::Single(end_time) = Utc.timestamp_millis_opt(task.end_time as _) else {
bail!("Invalid task end time")
};
let task = Task {
id: Some(task.id),
wasm_component: task.wasm,
wasm_component_args: task.wasm_args,
end_time,
let Some(task_type) = task.task_type else {
return Ok(None);
};

let task = match task_type {
TaskType::Profiling(profiling_task) => {
let Some(process) = profiling_task.process else {
return Ok(None);
};
let Some(overflow_by) = profiling_task.overflow_by else {
return Ok(None);
};
let process: perf_event_rs::config::Process = process.into();
WhichTask::Profiling(ProfilingTask {
id: task.id.into(),
process,
mmap_pages: profiling_task.mmap_pages,
overflow_by: overflow_by.into(),
stack_depth: profiling_task.stack_depth.map(|v| v as _),
end_time,
})
}
TaskType::Wasm(wasm_task) => WhichTask::Wasm(WasmTask {
id: Some(task.id),
wasm_component: wasm_task.wasm,
wasm_component_args: wasm_task.wasm_args,
end_time,
}),
};

Ok(Some(task))
Expand Down
Loading
Loading