diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
index fddeba987..0c27c5ba8 100644
--- a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
+++ b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
@@ -103,4 +103,18 @@ impl GenevaClient {
         }
         Ok(())
     }
+
+    pub fn dump_upload_metrics(&self) {
+        let metrics = self.uploader.metrics();
+        println!(
+            "Upload Metrics: Request count: {}, Average data size: {:.1} bytes, Average response time: {:.1}μs",
+            metrics.request_count(),
+            metrics.average_data_size(),
+            metrics.average_response_time_micros(),
+        );
+    }
+
+    pub fn ongoing_requests(&self) -> usize {
+        self.uploader.ongoing_requests()
+    }
 }
diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs
index 123b814af..537c76480 100644
--- a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs
+++ b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs
@@ -507,11 +507,9 @@ fn extract_endpoint_from_token(token: &str) -> Result {
     };
 
     // Decode the Base64-encoded payload into raw bytes
-    let decoded = general_purpose::URL_SAFE_NO_PAD
-        .decode(payload)
-        .map_err(|e| {
-            GenevaConfigClientError::JwtTokenError(format!("Failed to decode JWT: {e}"))
-        })?;
+    let decoded = general_purpose::STANDARD.decode(payload).map_err(|e| {
+        GenevaConfigClientError::JwtTokenError(format!("Failed to decode JWT: {e}"))
+    })?;
 
     // Convert the raw bytes into a UTF-8 string
     let decoded_str = String::from_utf8(decoded).map_err(|e| {
diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs
index 83b4f3df5..9a8863148 100644
--- a/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs
+++ b/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs
@@ -14,6 +14,78 @@ use thiserror::Error;
 use url::form_urlencoded::byte_serialize;
 use uuid::Uuid;
 
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+
+/// Metrics for tracking upload performance
+#[derive(Debug, Default)]
+pub struct UploadMetrics {
+    /// Total number of requests processed
+    request_count: AtomicUsize,
+    /// Total response time in microseconds
+    total_response_time_micros: AtomicU64,
+    /// Total data size processed in bytes
+    total_data_size: AtomicU64,
+    /// Number of requests currently in flight
+    ongoing_requests: AtomicUsize,
+}
+
+impl UploadMetrics {
+    /// Create a new metrics instance
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Record a request with its response time and data size
+    fn record_request(&self, response_time_micros: u64, data_size: u64) {
+        self.request_count.fetch_add(1, Ordering::Relaxed);
+        self.total_response_time_micros
+            .fetch_add(response_time_micros, Ordering::Relaxed);
+        self.total_data_size.fetch_add(data_size, Ordering::Relaxed);
+    }
+
+    /// Get the average response time in microseconds
+    pub fn average_response_time_micros(&self) -> f64 {
+        let count = self.request_count.load(Ordering::Relaxed);
+        if count == 0 {
+            0.0
+        } else {
+            self.total_response_time_micros.load(Ordering::Relaxed) as f64 / count as f64
+        }
+    }
+
+    /// Get the average response time in milliseconds
+    pub fn average_response_time_millis(&self) -> f64 {
+        self.average_response_time_micros() / 1000.0
+    }
+
+    /// Get the total number of requests processed
+    pub fn request_count(&self) -> usize {
+        self.request_count.load(Ordering::Relaxed)
+    }
+
+    /// Get the average data size in bytes
+    pub fn average_data_size(&self) -> f64 {
+        let count = self.request_count.load(Ordering::Relaxed);
+        if count == 0 {
+            0.0
+        } else {
+            self.total_data_size.load(Ordering::Relaxed) as f64 / count as f64
+        }
+    }
+
+    /// Get the total data size processed in bytes
+    pub fn total_data_size(&self) -> u64 {
+        self.total_data_size.load(Ordering::Relaxed)
+    }
+
+    /// Reset all metrics
+    pub fn reset(&self) {
+        self.request_count.store(0, Ordering::Relaxed);
+        self.total_response_time_micros.store(0, Ordering::Relaxed);
+        self.total_data_size.store(0, Ordering::Relaxed);
+    }
+}
+
 /// Error types for the Geneva Uploader
 #[derive(Debug, Error)]
 pub(crate) enum GenevaUploaderError {
@@ -115,6 +187,7 @@ pub struct GenevaUploader {
     pub config_client: Arc<GenevaConfigClient>,
     pub config: GenevaUploaderConfig,
     pub http_client: Client,
+    pub metrics: Arc<UploadMetrics>,
 }
 
 impl GenevaUploader {
@@ -137,7 +210,7 @@ impl GenevaUploader {
             header::HeaderValue::from_static("application/json"),
         );
         let http_client = Client::builder()
-            .timeout(Duration::from_secs(30))
+            .timeout(Duration::from_secs(100))
             .default_headers(headers)
             .build()?;
 
@@ -145,9 +218,18 @@
             config_client,
             config: uploader_config,
             http_client,
+            metrics: Arc::new(UploadMetrics::new()),
         })
     }
 
+    pub(crate) fn metrics(&self) -> &Arc<UploadMetrics> {
+        &self.metrics
+    }
+
+    pub(crate) fn ongoing_requests(&self) -> usize {
+        self.metrics.ongoing_requests.load(Ordering::Relaxed)
+    }
+
     /// Creates the GIG upload URI with required parameters
     #[allow(dead_code)]
    fn create_upload_uri(
@@ -246,6 +328,11 @@ impl GenevaUploader {
             upload_uri
         );
         // Send the upload request
+        use std::time::Instant;
+        let start = Instant::now();
+        self.metrics
+            .ongoing_requests
+            .fetch_add(1, Ordering::Relaxed); // Increment ongoing requests
         let response = self
             .http_client
             .post(&full_url)
@@ -256,6 +343,13 @@
             .body(data)
             .send()
             .await?;
+        self.metrics
+            .ongoing_requests
+            .fetch_sub(1, Ordering::Relaxed); // Decrement ongoing requests
+        let elapsed = start.elapsed();
+        // Record the response time and payload size for this request
+        self.metrics
+            .record_request(elapsed.as_micros() as u64, data_size as u64);
 
         let status = response.status();
         let body = response.text().await?;
diff --git a/stress/src/async_throughput.rs b/stress/src/async_throughput.rs
index 57e90a4f3..9f3be9a6e 100644
--- a/stress/src/async_throughput.rs
+++ b/stress/src/async_throughput.rs
@@ -45,6 +45,10 @@ pub struct ThroughputConfig {
     pub target_ops: Option<u64>,
     /// Whether to use task spawning (for multi-thread runtime)
     pub use_spawn: bool,
+    /// Optional shutdown callback - called when the continuous test is interrupted
+    pub shutdown_callback: Option<Box<dyn Fn() + Send + Sync + 'static>>,
+    /// Optional callback to get the current number of ongoing requests
+    pub ongoing_requests_callback: Option<Box<dyn Fn() -> usize + Send + Sync + 'static>>,
 }
 
 impl Default for ThroughputConfig {
@@ -54,6 +58,8 @@
             report_interval: Duration::from_secs(5),
             target_ops: None,
             use_spawn: true,
+            shutdown_callback: None,
+            ongoing_requests_callback: None,
         }
     }
 }
@@ -65,7 +71,7 @@ impl ThroughputTest {
     /// Run a continuous throughput test (until interrupted)
     pub async fn run_continuous(
         name: &str,
-        config: ThroughputConfig,
+        mut config: ThroughputConfig,
         operation_factory: F,
     ) where
         F: Fn() -> Fut + Send + 'static,
@@ -81,11 +87,33 @@ impl
ThroughputTest { let completed_ops = Arc::new(AtomicU64::new(0)); let successful_ops = Arc::new(AtomicU64::new(0)); + // Set up Ctrl+C handler + let shutdown_callback = config.shutdown_callback.take(); + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + + tokio::spawn(async move { + match tokio::signal::ctrl_c().await { + Ok(()) => { + println!("\nReceived Ctrl+C signal..."); + // Call the shutdown callback if provided + if let Some(callback) = shutdown_callback { + callback(); + } + let _ = shutdown_tx.send(()); + } + Err(err) => { + eprintln!("Unable to listen for shutdown signal: {}", err); + let _ = shutdown_tx.send(()); + } + } + }); + // Clone for reporting thread let completed_clone = Arc::clone(&completed_ops); let successful_clone = Arc::clone(&successful_ops); let start_time = Instant::now(); let report_interval = config.report_interval; + let ongoing_requests_fn = config.ongoing_requests_callback; // Spawn reporting thread let _reporting_handle = std::thread::spawn(move || { @@ -106,14 +134,16 @@ impl ThroughputTest { } else { 0.0 }; + let ongoing_requests = ongoing_requests_fn.as_ref().map(|f| f()).unwrap_or(0); println!( - "Progress: {} ops completed ({} successful, {:.1}%) in {:.2}s = {:.2} ops/sec", + "Progress: {} ops completed ({} successful, {:.1}%) in {:.2}s = {:.2} ops/sec | Ongoing: {}", ops, success, success_percentage, elapsed.as_secs_f64(), - throughput + throughput, + ongoing_requests ); #[cfg(feature = "stats")] { @@ -140,17 +170,25 @@ impl ThroughputTest { .buffer_unordered(config.concurrency); // Process stream until interrupted - while let Some(result) = operation_stream.next().await { - completed_ops.fetch_add(1, Ordering::Relaxed); - match result { - Ok(Ok(_)) => { - successful_ops.fetch_add(1, Ordering::Relaxed); - } - Ok(Err(e)) => { - eprintln!("Operation failed: {e}"); + loop { + tokio::select! { + Some(result) = operation_stream.next() => { + completed_ops.fetch_add(1, Ordering::Relaxed); + match result { + Ok(Ok(_)) => { + successful_ops.fetch_add(1, Ordering::Relaxed); + } + Ok(Err(e)) => { + eprintln!("Operation failed: {e}"); + } + Err(e) => { + eprintln!("Task join error: {e}"); + } + } } - Err(e) => { - eprintln!("Task join error: {e}"); + _ = &mut shutdown_rx => { + println!("Shutting down gracefully..."); + break; } } } @@ -161,14 +199,22 @@ impl ThroughputTest { .buffer_unordered(config.concurrency); // Process stream until interrupted - while let Some(result) = operation_stream.next().await { - completed_ops.fetch_add(1, Ordering::Relaxed); - match result { - Ok(_) => { - successful_ops.fetch_add(1, Ordering::Relaxed); + loop { + tokio::select! 
{
+                    Some(result) = operation_stream.next() => {
+                        completed_ops.fetch_add(1, Ordering::Relaxed);
+                        match result {
+                            Ok(_) => {
+                                successful_ops.fetch_add(1, Ordering::Relaxed);
+                            }
+                            Err(e) => {
+                                eprintln!("Operation failed: {e}");
+                            }
+                        }
                     }
-                    Err(e) => {
-                        eprintln!("Operation failed: {e}");
+                    _ = &mut shutdown_rx => {
+                        println!("Shutting down gracefully...");
+                        break;
                     }
                 }
             }
diff --git a/stress/src/geneva_exporter.rs b/stress/src/geneva_exporter.rs
index 4b944887b..c63d2c4d4 100644
--- a/stress/src/geneva_exporter.rs
+++ b/stress/src/geneva_exporter.rs
@@ -52,7 +52,7 @@ fn create_test_logs() -> Vec<LogRecord> {
     let mut log_records = Vec::new();
 
-    // Create 10 simple log records
-    for i in 0..10 {
+    // Create 200_000 simple log records to generate sustained upload load
+    for i in 0..200_000 {
         let log = LogRecord {
             observed_time_unix_nano: 1700000000000000000 + i,
             event_name: "StressTestEvent".to_string(),
@@ -276,12 +276,22 @@ async fn async_main(
 
     match mode {
         "continuous" => {
+            let client_for_callback = Arc::clone(&client);
+            let client_for_ongoing = Arc::clone(&client);
+
             // Run continuous test
             let config = ThroughputConfig {
                 concurrency,
                 report_interval: std::time::Duration::from_secs(5),
                 target_ops: None,
                 use_spawn: runtime_type != "current", // Use task spawning for multi-thread runtime
+                shutdown_callback: Some(Box::new(move || {
+                    client_for_callback.dump_upload_metrics(); // Print upload metrics on shutdown
+                    println!("\nContinuous test stopped. Metrics printed.");
+                })),
+                ongoing_requests_callback: Some(Box::new(move || {
+                    client_for_ongoing.ongoing_requests()
+                })),
             };
 
             ThroughputTest::run_continuous("Geneva Upload", config, move || {
@@ -294,7 +304,7 @@
         "fixed" => {
             // Run fixed test
             let target = args
-                .get(3)
+                .get(args_start_idx + 2)
                 .and_then(|s| s.parse::<u64>().ok())
                 .unwrap_or(10_000);
 
@@ -304,6 +314,7 @@
                 use_spawn: runtime_type != "current", // Use task spawning for multi-thread runtime
                 ..Default::default()
             };
+            let client_for_metrics = Arc::clone(&client);
 
             let stats = ThroughputTest::run_fixed("Geneva Upload", config, move || {
                 let client = client.clone();
@@ -313,6 +324,8 @@
             .await;
 
             stats.print("Final Results");
+            client_for_metrics.dump_upload_metrics(); // Print upload metrics
+            println!("\nTest completed successfully.");
         }
         _ => {
             // Default: Run comparison test
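// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch above: how the UploadMetrics
// averages introduced in uploader.rs behave. `record_request` is private to
// the uploader in the patch; it is called directly here only to demonstrate
// the averaging arithmetic, under the assumption that this snippet lives in
// the same module as UploadMetrics.
fn upload_metrics_example() {
    let metrics = UploadMetrics::new();
    metrics.record_request(1_500, 2_048); // 1.5 ms request carrying 2 KiB
    metrics.record_request(2_500, 4_096); // 2.5 ms request carrying 4 KiB
    assert_eq!(metrics.request_count(), 2);
    assert_eq!(metrics.average_response_time_micros(), 2_000.0); // (1500 + 2500) / 2
    assert_eq!(metrics.average_response_time_millis(), 2.0);
    assert_eq!(metrics.average_data_size(), 3_072.0); // (2048 + 4096) / 2
}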