Skip to content

intiai #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,18 @@ impl GenevaClient {
}
Ok(())
}

/// Prints a one-line summary of the uploader's request metrics to stdout.
///
/// Intended as a lightweight diagnostic hook (e.g. invoked on shutdown by
/// the stress tests). The averages are `f64`; they are formatted with two
/// decimal places so the default float formatting does not emit an
/// unbounded number of digits.
pub fn dump_upload_metrics(&self) {
    let metrics = self.uploader.metrics();
    println!(
        "Upload Metrics: Requests Count: {}, Average data size: {:.2} bytes, Average response time: {:.2}μs",
        metrics.request_count(),
        metrics.average_data_size(),
        metrics.average_response_time_micros(),
    );
}

/// Returns the number of upload requests currently in flight, as tracked
/// by the underlying uploader's metrics counter.
pub fn ongoing_requests(&self) -> usize {
    self.uploader.ongoing_requests()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,11 +507,9 @@ fn extract_endpoint_from_token(token: &str) -> Result<String> {
};

// Decode the Base64-encoded payload into raw bytes
let decoded = general_purpose::URL_SAFE_NO_PAD
.decode(payload)
.map_err(|e| {
GenevaConfigClientError::JwtTokenError(format!("Failed to decode JWT: {e}"))
})?;
let decoded = general_purpose::STANDARD.decode(payload).map_err(|e| {
GenevaConfigClientError::JwtTokenError(format!("Failed to decode JWT: {e}"))
})?;

// Convert the raw bytes into a UTF-8 string
let decoded_str = String::from_utf8(decoded).map_err(|e| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,78 @@ use thiserror::Error;
use url::form_urlencoded::byte_serialize;
use uuid::Uuid;

use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

/// Thread-safe counters tracking upload request throughput and latency.
///
/// Every counter uses relaxed atomics: the values are purely diagnostic,
/// so atomicity of each individual counter is sufficient and no cross-field
/// ordering is required.
#[derive(Debug, Default)]
pub struct UploadMetrics {
    /// Total number of requests processed
    request_count: AtomicUsize,
    /// Total response time in microseconds
    total_response_time_micros: AtomicU64,
    /// Total data size processed in bytes
    total_data_size: AtomicU64,
    /// Ongoing requests
    ongoing_requests: AtomicUsize,
}

impl UploadMetrics {
    /// Create a fresh metrics instance with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Fold one completed request into the running totals.
    fn record_request(&self, response_time_micros: u64, data_size: u64) {
        self.request_count.fetch_add(1, Ordering::Relaxed);
        self.total_response_time_micros
            .fetch_add(response_time_micros, Ordering::Relaxed);
        self.total_data_size.fetch_add(data_size, Ordering::Relaxed);
    }

    /// Mean response time across all recorded requests, in microseconds.
    /// Defined as 0.0 when nothing has been recorded yet.
    pub fn average_response_time_micros(&self) -> f64 {
        Self::mean(
            self.total_response_time_micros.load(Ordering::Relaxed),
            self.request_count.load(Ordering::Relaxed),
        )
    }

    /// Mean response time across all recorded requests, in milliseconds.
    pub fn average_response_time_millis(&self) -> f64 {
        self.average_response_time_micros() / 1000.0
    }

    /// Number of requests recorded so far.
    pub fn request_count(&self) -> usize {
        self.request_count.load(Ordering::Relaxed)
    }

    /// Mean payload size across all recorded requests, in bytes.
    /// Defined as 0.0 when nothing has been recorded yet.
    pub fn average_data_size(&self) -> f64 {
        Self::mean(
            self.total_data_size.load(Ordering::Relaxed),
            self.request_count.load(Ordering::Relaxed),
        )
    }

    /// Total number of payload bytes recorded.
    pub fn total_data_size(&self) -> u64 {
        self.total_data_size.load(Ordering::Relaxed)
    }

    /// Zero the request, latency, and size counters (the in-flight counter
    /// is deliberately untouched). The reset is not atomic as a group: a
    /// request recorded concurrently may be partially counted.
    pub fn reset(&self) {
        self.request_count.store(0, Ordering::Relaxed);
        self.total_response_time_micros.store(0, Ordering::Relaxed);
        self.total_data_size.store(0, Ordering::Relaxed);
    }

    /// `total / count` as `f64`, defined as 0.0 for an empty sample.
    fn mean(total: u64, count: usize) -> f64 {
        if count == 0 {
            0.0
        } else {
            total as f64 / count as f64
        }
    }
}

/// Error types for the Geneva Uploader
#[derive(Debug, Error)]
pub(crate) enum GenevaUploaderError {
Expand Down Expand Up @@ -115,6 +187,7 @@ pub struct GenevaUploader {
pub config_client: Arc<GenevaConfigClient>,
pub config: GenevaUploaderConfig,
pub http_client: Client,
pub metrics: Arc<UploadMetrics>,
}

impl GenevaUploader {
Expand All @@ -137,17 +210,26 @@ impl GenevaUploader {
header::HeaderValue::from_static("application/json"),
);
let http_client = Client::builder()
.timeout(Duration::from_secs(30))
.timeout(Duration::from_secs(100))
.default_headers(headers)
.build()?;

Ok(Self {
config_client,
config: uploader_config,
http_client,
metrics: Arc::new(UploadMetrics::new()),
})
}

/// Borrow the shared metrics handle; callers can read counters directly or
/// clone the `Arc` to retain access independently of the uploader.
pub(crate) fn metrics(&self) -> &Arc<UploadMetrics> {
    &self.metrics
}

/// Snapshot of how many upload requests are currently in flight
/// (incremented just before the HTTP send, decremented after the response).
/// NOTE(review): the decrement sits after a `?` on the send — if the send
/// errors, the decrement is skipped and this count drifts upward; verify.
pub(crate) fn ongoing_requests(&self) -> usize {
    self.metrics.ongoing_requests.load(Ordering::Relaxed)
}

/// Creates the GIG upload URI with required parameters
#[allow(dead_code)]
fn create_upload_uri(
Expand Down Expand Up @@ -246,6 +328,12 @@ impl GenevaUploader {
upload_uri
);
// Send the upload request
use std::time::Instant;
//println!("Uploading data to: {}", full_url);
let start = Instant::now();
self.metrics
.ongoing_requests
.fetch_add(1, Ordering::Relaxed); // Increment ongoing requests
let response = self
.http_client
.post(&full_url)
Expand All @@ -256,6 +344,15 @@ impl GenevaUploader {
.body(data)
.send()
.await?;
self.metrics
.ongoing_requests
.fetch_sub(1, Ordering::Relaxed); // Decrement ongoing requests
let elapsed = start.elapsed();
self.metrics
.record_request(elapsed.as_micros() as u64, data_size as u64); // ADD THIS LINE

//println!("Request completed in: {:.2}μs data_size: {}", elapsed.as_micros(), data_size);

let status = response.status();
let body = response.text().await?;

Expand Down
86 changes: 66 additions & 20 deletions stress/src/async_throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ pub struct ThroughputConfig {
pub target_ops: Option<usize>,
/// Whether to use task spawning (for multi-thread runtime)
pub use_spawn: bool,
/// Optional shutdown callback - called when continuous test is interrupted
pub shutdown_callback: Option<Box<dyn FnOnce() + Send + 'static>>,
/// Optional callback to get ongoing requests count
pub ongoing_requests_callback: Option<Box<dyn Fn() -> usize + Send + Sync + 'static>>,
}

impl Default for ThroughputConfig {
Expand All @@ -54,6 +58,8 @@ impl Default for ThroughputConfig {
report_interval: Duration::from_secs(5),
target_ops: None,
use_spawn: true,
shutdown_callback: None,
ongoing_requests_callback: None,
}
}
}
Expand All @@ -65,7 +71,7 @@ impl ThroughputTest {
/// Run a continuous throughput test (until interrupted)
pub async fn run_continuous<F, Fut, T, E>(
name: &str,
config: ThroughputConfig,
mut config: ThroughputConfig,
operation_factory: F,
) where
F: Fn() -> Fut + Send + 'static,
Expand All @@ -81,11 +87,33 @@ impl ThroughputTest {
let completed_ops = Arc::new(AtomicU64::new(0));
let successful_ops = Arc::new(AtomicU64::new(0));

// Set up Ctrl+C handler
let shutdown_callback = config.shutdown_callback.take();
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();

tokio::spawn(async move {
match tokio::signal::ctrl_c().await {
Ok(()) => {
println!("\nReceived Ctrl+C signal...");
// Call the shutdown callback if provided
if let Some(callback) = shutdown_callback {
callback();
}
let _ = shutdown_tx.send(());
}
Err(err) => {
eprintln!("Unable to listen for shutdown signal: {}", err);
let _ = shutdown_tx.send(());
}
}
});

// Clone for reporting thread
let completed_clone = Arc::clone(&completed_ops);
let successful_clone = Arc::clone(&successful_ops);
let start_time = Instant::now();
let report_interval = config.report_interval;
let ongoing_requests_fn = config.ongoing_requests_callback;

// Spawn reporting thread
let _reporting_handle = std::thread::spawn(move || {
Expand All @@ -106,14 +134,16 @@ impl ThroughputTest {
} else {
0.0
};
let ongoing_requests = ongoing_requests_fn.as_ref().map(|f| f()).unwrap_or(0);

println!(
"Progress: {} ops completed ({} successful, {:.1}%) in {:.2}s = {:.2} ops/sec",
"Progress: {} ops completed ({} successful, {:.1}%) in {:.2}s = {:.2} ops/sec | Ongoing: {}",
ops,
success,
success_percentage,
elapsed.as_secs_f64(),
throughput
throughput,
ongoing_requests
);
#[cfg(feature = "stats")]
{
Expand All @@ -140,17 +170,25 @@ impl ThroughputTest {
.buffer_unordered(config.concurrency);

// Process stream until interrupted
while let Some(result) = operation_stream.next().await {
completed_ops.fetch_add(1, Ordering::Relaxed);
match result {
Ok(Ok(_)) => {
successful_ops.fetch_add(1, Ordering::Relaxed);
}
Ok(Err(e)) => {
eprintln!("Operation failed: {e}");
loop {
tokio::select! {
Some(result) = operation_stream.next() => {
completed_ops.fetch_add(1, Ordering::Relaxed);
match result {
Ok(Ok(_)) => {
successful_ops.fetch_add(1, Ordering::Relaxed);
}
Ok(Err(e)) => {
eprintln!("Operation failed: {e}");
}
Err(e) => {
eprintln!("Task join error: {e}");
}
}
}
Err(e) => {
eprintln!("Task join error: {e}");
_ = &mut shutdown_rx => {
println!("Shutting down gracefully...");
break;
}
}
}
Expand All @@ -161,14 +199,22 @@ impl ThroughputTest {
.buffer_unordered(config.concurrency);

// Process stream until interrupted
while let Some(result) = operation_stream.next().await {
completed_ops.fetch_add(1, Ordering::Relaxed);
match result {
Ok(_) => {
successful_ops.fetch_add(1, Ordering::Relaxed);
loop {
tokio::select! {
Some(result) = operation_stream.next() => {
completed_ops.fetch_add(1, Ordering::Relaxed);
match result {
Ok(_) => {
successful_ops.fetch_add(1, Ordering::Relaxed);
}
Err(e) => {
eprintln!("Operation failed: {e}");
}
}
}
Err(e) => {
eprintln!("Operation failed: {e}");
_ = &mut shutdown_rx => {
println!("Shutting down gracefully...");
break;
}
}
}
Expand Down
17 changes: 15 additions & 2 deletions stress/src/geneva_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn create_test_logs() -> Vec<ResourceLogs> {
let mut log_records = Vec::new();

// Create a batch of simple log records (count tuned for stress testing)
for i in 0..10 {
for i in 0..200000 {
let log = LogRecord {
observed_time_unix_nano: 1700000000000000000 + i,
event_name: "StressTestEvent".to_string(),
Expand Down Expand Up @@ -276,12 +276,22 @@ async fn async_main(

match mode {
"continuous" => {
let client_for_callback = Arc::clone(&client);
let client_for_ongoing = Arc::clone(&client);

// Run continuous test
let config = ThroughputConfig {
concurrency,
report_interval: std::time::Duration::from_secs(5),
target_ops: None,
use_spawn: runtime_type != "current", // Use task spawning for multi-thread runtime
shutdown_callback: Some(Box::new(move || {
client_for_callback.dump_upload_metrics(); // Print upload metrics on shutdown
println!("\nContinuous test stopped. Metrics printed.");
})),
ongoing_requests_callback: Some(Box::new(move || {
client_for_ongoing.ongoing_requests()
})),
};

ThroughputTest::run_continuous("Geneva Upload", config, move || {
Expand All @@ -294,7 +304,7 @@ async fn async_main(
"fixed" => {
// Run fixed test
let target = args
.get(3)
.get(args_start_idx + 2)
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(10_000);

Expand All @@ -304,6 +314,7 @@ async fn async_main(
use_spawn: runtime_type != "current", // Use task spawning for multi-thread runtime
..Default::default()
};
let client_for_metrics = Arc::clone(&client);

let stats = ThroughputTest::run_fixed("Geneva Upload", config, move || {
let client = client.clone();
Expand All @@ -313,6 +324,8 @@ async fn async_main(
.await;

stats.print("Final Results");
client_for_metrics.dump_upload_metrics(); // Print upload metrics
println!("\nTest completed successfully.");
}
_ => {
// Default: Run comparison test
Expand Down
Loading