Skip to content

Commit a798896

Browse files
authored
Merge pull request #143 from Thaumy/main
Replace std MPSC channel with crossbeam `SegQueue` for `DataExporter`
2 parents be4df6e + 1d8c4f5 commit a798896

File tree

3 files changed

+44
-20
lines changed

3 files changed

+44
-20
lines changed

Cargo.lock

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ tracing-subscriber = { workspace = true, features = ["env-filter"] }
4949
daemonize = { workspace = true }
5050
local-ip-address = { workspace = true }
5151
TinyUFO = { workspace = true }
52+
crossbeam = { workspace = true }
5253

5354
[build-dependencies]
5455
tonic-build = { workspace = true }
@@ -89,6 +90,7 @@ which = "^7"
8990
num_cpus = "^1"
9091
local-ip-address = "^0.6"
9192
TinyUFO = "0.4"
93+
crossbeam = "0.8"
9294

9395
[workspace.lints.rust]
9496

src/runtime/data_export.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
use std::{
1616
sync::{
1717
atomic::{AtomicUsize, Ordering},
18-
mpsc::{channel, SendError, Sender, TryRecvError},
1918
Arc,
2019
},
2120
thread::{self, JoinHandle},
2221
};
2322

2423
use chrono::{TimeZone, Utc};
24+
use crossbeam::queue::SegQueue;
2525
use profiling::data_export::common::FieldValue as WitFieldValue;
2626
use profiling::data_export::measurement::Point;
2727
use profiling::data_export::metric::Sample;
@@ -49,7 +49,7 @@ wasmtime::component::bindgen!({
4949
pub struct DataExporter {
5050
bytes_len: Arc<AtomicUsize>,
5151
bytes_watermark: usize,
52-
data_tx: Option<Sender<Data>>,
52+
data_queue: Arc<SegQueue<Option<Data>>>,
5353
exporter: JoinHandle<()>,
5454
}
5555

@@ -63,22 +63,23 @@ impl DataExporter {
6363
// TODO: `bytes_capacity` is not used because we not have a static allocation
6464
// for `Data`, maybe we will remove this in the future
6565
let _ = bytes_capacity;
66-
let (data_tx, data_rx) = channel::<Data>();
66+
let data_queue = Arc::new(SegQueue::<Option<Data>>::new());
6767
let bytes_len = Arc::new(AtomicUsize::new(0));
6868

6969
let exporter = thread::spawn({
70+
let data_queue = Arc::clone(&data_queue);
7071
let bytes_len = Arc::clone(&bytes_len);
7172
move || {
7273
let rt = Runtime::new().expect("Failed to init exporter runtime");
7374
let mut data = Vec::new();
7475
loop {
75-
match data_rx.try_recv() {
76-
Ok(o) => {
76+
match data_queue.pop() {
77+
Some(Some(o)) => {
7778
// No critical section, relaxed ordering is fine.
7879
bytes_len.fetch_sub(o.encoded_len(), Ordering::Relaxed);
7980
data.push(o);
8081
}
81-
Err(e) => {
82+
poped => {
8283
if !data.is_empty() {
8384
let merged = ExportDataReq {
8485
task_id: task_id.clone(),
@@ -91,9 +92,10 @@ impl DataExporter {
9192
});
9293
data.clear();
9394
}
94-
match e {
95-
TryRecvError::Empty => thread::park(),
96-
TryRecvError::Disconnected => break,
95+
match poped {
96+
None => thread::park(),
97+
Some(None) => break,
98+
_ => unreachable!(),
9799
}
98100
}
99101
}
@@ -104,7 +106,7 @@ impl DataExporter {
104106
Self {
105107
bytes_len,
106108
bytes_watermark,
107-
data_tx: Some(data_tx),
109+
data_queue,
108110
exporter,
109111
}
110112
}
@@ -113,24 +115,21 @@ impl DataExporter {
113115
self.exporter.thread().unpark();
114116
}
115117

116-
pub fn schedule(&self, data: Data) -> Result<(), SendError<Data>> {
118+
pub fn schedule(&self, data: Data) {
117119
let encoded_len = data.encoded_len();
118-
// Never failes since we only take it in `Drop`.
119-
let data_tx = self.data_tx.as_ref().expect("unreachable");
120-
data_tx.send(data)?;
120+
self.data_queue.push(Some(data));
121121
// No critical section, relaxed ordering is fine.
122122
let prev = self.bytes_len.fetch_add(encoded_len, Ordering::Relaxed);
123123
if prev > self.bytes_watermark {
124124
self.exporter.thread().unpark();
125125
}
126-
Ok(())
127126
}
128127
}
129128

130129
impl Drop for DataExporter {
131130
fn drop(&mut self) {
132-
// Drop sender early to prevent the exporter thread from being parked forever.
133-
drop(self.data_tx.take());
131+
// Notify the consumer that there is no more data.
132+
self.data_queue.push(None);
134133
self.exporter.thread().unpark();
135134
}
136135
}
@@ -178,7 +177,7 @@ impl profiling::data_export::file::Host for DataExportCtx {
178177
ty: DataType::File as _,
179178
bytes,
180179
};
181-
ctx.exporter.schedule(data)?;
180+
ctx.exporter.schedule(data);
182181

183182
Ok(Ok(()))
184183
}
@@ -211,7 +210,7 @@ impl profiling::data_export::metric::Host for DataExportCtx {
211210
ty: DataType::LineProtocol as _,
212211
bytes,
213212
};
214-
ctx.exporter.schedule(data)?;
213+
ctx.exporter.schedule(data);
215214

216215
Ok(Ok(()))
217216
}
@@ -247,7 +246,7 @@ impl profiling::data_export::measurement::Host for DataExportCtx {
247246
ty: DataType::LineProtocol as _,
248247
bytes,
249248
};
250-
ctx.exporter.schedule(data)?;
249+
ctx.exporter.schedule(data);
251250

252251
Ok(Ok(()))
253252
}

0 commit comments

Comments
 (0)