diff --git a/Cargo.lock b/Cargo.lock index 0891299..1bcb8c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -695,6 +695,28 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -2391,6 +2413,7 @@ dependencies = [ "anyhow", "chrono", "clap", + "crossbeam", "daemonize", "host-op-perf", "host-op-system", diff --git a/Cargo.toml b/Cargo.toml index 03d6f2a..b399910 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ tracing-subscriber = { workspace = true, features = ["env-filter"] } daemonize = { workspace = true } local-ip-address = { workspace = true } TinyUFO = { workspace = true } +crossbeam = { workspace = true } [build-dependencies] tonic-build = { workspace = true } @@ -90,6 +91,7 @@ which = "^6" num_cpus = "^1" local-ip-address = "^0.6" TinyUFO = "0.4" +crossbeam = "0.8" [workspace.lints.rust] diff --git a/src/runtime/data_export.rs b/src/runtime/data_export.rs index f8732d1..289e1b7 100644 --- a/src/runtime/data_export.rs +++ b/src/runtime/data_export.rs @@ -15,13 +15,13 @@ use std::{ sync::{ atomic::{AtomicUsize, Ordering}, - mpsc::{channel, SendError, Sender, TryRecvError}, Arc, }, thread::{self, JoinHandle}, }; use chrono::{TimeZone, Utc}; +use crossbeam::queue::SegQueue; use profiling::data_export::common::FieldValue as WitFieldValue; use profiling::data_export::measurement::Point; use profiling::data_export::metric::Sample; @@ -49,7 +49,7 @@ wasmtime::component::bindgen!({ pub struct DataExporter { bytes_len: Arc, bytes_watermark: usize, - data_tx: Option>, + data_queue: Arc>>, exporter: JoinHandle<()>, } @@ -63,22 +63,23 @@ impl DataExporter { // TODO: `bytes_capacity` is not used because we not have a static allocation // for `Data`, maybe we will remove this in the future let _ = bytes_capacity; - let (data_tx, data_rx) = channel::(); + let data_queue = Arc::new(SegQueue::>::new()); let bytes_len = Arc::new(AtomicUsize::new(0)); let exporter = thread::spawn({ + let data_queue = Arc::clone(&data_queue); let bytes_len = Arc::clone(&bytes_len); move || { let rt = Runtime::new().expect("Failed to init exporter runtime"); let mut data = Vec::new(); loop { - match data_rx.try_recv() { - Ok(o) => { + match data_queue.pop() { + Some(Some(o)) => { // No critical section, relaxed ordering is fine. bytes_len.fetch_sub(o.encoded_len(), Ordering::Relaxed); data.push(o); } - Err(e) => { + poped => { if !data.is_empty() { let merged = ExportDataReq { task_id: task_id.clone(), @@ -91,9 +92,10 @@ impl DataExporter { }); data.clear(); } - match e { - TryRecvError::Empty => thread::park(), - TryRecvError::Disconnected => break, + match poped { + None => thread::park(), + Some(None) => break, + _ => unreachable!(), } } } @@ -104,7 +106,7 @@ impl DataExporter { Self { bytes_len, bytes_watermark, - data_tx: Some(data_tx), + data_queue, exporter, } } @@ -113,24 +115,21 @@ impl DataExporter { self.exporter.thread().unpark(); } - pub fn schedule(&self, data: Data) -> Result<(), SendError> { + pub fn schedule(&self, data: Data) { let encoded_len = data.encoded_len(); - // Never failes since we only take it in `Drop`. - let data_tx = self.data_tx.as_ref().expect("unreachable"); - data_tx.send(data)?; + self.data_queue.push(Some(data)); // No critical section, relaxed ordering is fine. let prev = self.bytes_len.fetch_add(encoded_len, Ordering::Relaxed); if prev > self.bytes_watermark { self.exporter.thread().unpark(); } - Ok(()) } } impl Drop for DataExporter { fn drop(&mut self) { - // Drop sender early to prevent the exporter thread from being parked forever. - drop(self.data_tx.take()); + // Notify the consumer that there is no more data. + self.data_queue.push(None); self.exporter.thread().unpark(); } } @@ -178,7 +177,7 @@ impl profiling::data_export::file::Host for DataExportCtx { ty: DataType::File as _, bytes, }; - ctx.exporter.schedule(data)?; + ctx.exporter.schedule(data); Ok(Ok(())) } @@ -211,7 +210,7 @@ impl profiling::data_export::metric::Host for DataExportCtx { ty: DataType::LineProtocol as _, bytes, }; - ctx.exporter.schedule(data)?; + ctx.exporter.schedule(data); Ok(Ok(())) } @@ -247,7 +246,7 @@ impl profiling::data_export::measurement::Host for DataExportCtx { ty: DataType::LineProtocol as _, bytes, }; - ctx.exporter.schedule(data)?; + ctx.exporter.schedule(data); Ok(Ok(())) }