From ac0c3bd2cb462458c0b0b7aabd38320b0f60a4a4 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 22 Apr 2025 17:40:00 +0200 Subject: [PATCH 01/26] feat: add a readable span interface Currently there is no way to read the span data without cloning it. This causes performance issues when the span for span processors that need to read the span data. This commit introduces a new trait `ReadableSpan` in the sdk implemented by SDK Spans that allows to read the span data without cloning it. --- .../tracing-http-propagator/src/server.rs | 69 +++- opentelemetry-sdk/src/trace/mod.rs | 4 +- opentelemetry-sdk/src/trace/provider.rs | 8 +- opentelemetry-sdk/src/trace/span.rs | 315 ++++++++++++++++-- opentelemetry-sdk/src/trace/span_processor.rs | 43 +-- .../span_processor_with_async_runtime.rs | 7 +- stress/src/traces.rs | 4 +- 7 files changed, 396 insertions(+), 54 deletions(-) diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index a7aa09aac9..08f0ab6d9f 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -15,11 +15,16 @@ use opentelemetry_sdk::{ error::OTelSdkResult, logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider}, propagation::{BaggagePropagator, TraceContextPropagator}, - trace::{SdkTracerProvider, SpanProcessor}, + trace::{FinishedSpan, ReadableSpan, SdkTracerProvider, SpanProcessor}, }; use opentelemetry_semantic_conventions::trace; use opentelemetry_stdout::{LogExporter, SpanExporter}; -use std::{convert::Infallible, net::SocketAddr, sync::OnceLock}; +use std::{ + collections::HashMap, + convert::Infallible, + net::SocketAddr, + sync::{Mutex, OnceLock}, +}; use tokio::net::TcpListener; use tracing::info; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -83,6 +88,7 @@ async fn router( let span = tracer .span_builder("router") .with_kind(SpanKind::Server) + .with_attributes([KeyValue::new("http.route", req.uri().path().to_string())]) .start_with_context(tracer, &parent_cx); info!(name = "router", message = "Dispatching request"); @@ -104,6 +110,62 @@ async fn router( response } +#[derive(Debug, Default)] +/// A custom span processor that counts concurrent requests for each route (indetified by the http.route +/// attribute) and adds that information to the span attributes. +struct RouteConcurrencyCounterSpanProcessor(Mutex>); + +impl SpanProcessor for RouteConcurrencyCounterSpanProcessor { + fn force_flush(&self) -> OTelSdkResult { + Ok(()) + } + + fn shutdown(&self) -> OTelSdkResult { + Ok(()) + } + + fn on_start(&self, span: &mut opentelemetry_sdk::trace::Span, _cx: &Context) { + if !matches!(span.span_kind(), SpanKind::Server) { + return; + } + let Some(route) = span + .attributes() + .iter() + .find(|kv| kv.key.as_str() == "http.route") + else { + return; + }; + let mut counts = self.0.lock().unwrap(); + let count = counts.entry(route.key.clone()).or_default(); + *count += 1; + span.set_attribute(KeyValue::new( + "http.route.concurrent_requests", + count.to_string(), + )); + } + + fn on_end(&self, span: &mut FinishedSpan) { + if !matches!(span.span_kind(), SpanKind::Server) { + return; + } + let Some(route) = span + .attributes() + .iter() + .find(|kv| kv.key.as_str() == "http.route") + else { + return; + }; + let mut counts = self.0.lock().unwrap(); + let Some(count) = counts.get_mut(&route.key) else { + return; + }; + *count -= 1; + if *count == 0 { + counts.remove(&route.key); + } + } +} + /// A custom log processor that enriches LogRecords with baggage attributes. /// Baggage information is not added automatically without this processor. #[derive(Debug)] @@ -145,7 +207,7 @@ impl SpanProcessor for EnrichWithBaggageSpanProcessor { } } - fn on_end(&self, _span: opentelemetry_sdk::trace::SpanData) {} + fn on_end(&self, _span: &mut opentelemetry_sdk::trace::FinishedSpan) {} } fn init_tracer() -> SdkTracerProvider { @@ -161,6 +223,7 @@ fn init_tracer() -> SdkTracerProvider { // Setup tracerprovider with stdout exporter // that prints the spans to stdout. let provider = SdkTracerProvider::builder() + .with_span_processor(RouteConcurrencyCounterSpanProcessor::default()) .with_span_processor(EnrichWithBaggageSpanProcessor) .with_simple_exporter(SpanExporter::default()) .build(); diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index 6561da7d2a..6a7fe9586d 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -39,7 +39,7 @@ pub use id_generator::{IdGenerator, RandomIdGenerator}; pub use links::SpanLinks; pub use provider::{SdkTracerProvider, TracerProviderBuilder}; pub use sampler::{Sampler, ShouldSample}; -pub use span::Span; +pub use span::{Span, ReadableSpan, FinishedSpan}; pub use span_limit::SpanLimits; pub use span_processor::{ BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder, @@ -136,7 +136,7 @@ mod tests { } } - fn on_end(&self, _span: SpanData) { + fn on_end(&self, _span: &mut FinishedSpan) { // TODO: Accessing Context::current() will panic today and hence commented out. // See https://github.com/open-telemetry/opentelemetry-rust/issues/2871 // let _c = Context::current(); diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 2773b8778b..cb611e2978 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -462,8 +462,8 @@ mod tests { SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION, }; use crate::trace::provider::TracerProviderInner; - use crate::trace::{Config, Span, SpanProcessor}; - use crate::trace::{SdkTracerProvider, SpanData}; + use crate::trace::SdkTracerProvider; + use crate::trace::{Config, FinishedSpan, Span, SpanProcessor}; use crate::Resource; use opentelemetry::trace::{Tracer, TracerProvider}; use opentelemetry::{Context, Key, KeyValue, Value}; @@ -516,7 +516,7 @@ mod tests { .fetch_add(1, Ordering::SeqCst); } - fn on_end(&self, _span: SpanData) { + fn on_end(&self, _span: &mut FinishedSpan) { // ignore } @@ -779,7 +779,7 @@ mod tests { // No operation needed for this processor } - fn on_end(&self, _span: SpanData) { + fn on_end(&self, _span: &mut FinishedSpan) { // No operation needed for this processor } diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 9cb8b88045..976ccd1592 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -198,13 +198,21 @@ impl opentelemetry::trace::Span for Span { impl Span { fn ensure_ended_and_exported(&mut self, timestamp: Option) { + let Span { + data, + tracer, + span_context, + .. + } = self; // skip if data has already been exported - let mut data = match self.data.take() { + let mut data = match data.take() { Some(data) => data, None => return, }; + let span_context: SpanContext = + std::mem::replace(span_context, SpanContext::empty_context()); - let provider = self.tracer.provider(); + let provider = tracer.provider(); // skip if provider has been shut down if provider.is_shutdown() { return; @@ -217,24 +225,18 @@ impl Span { data.end_time = opentelemetry::time::now(); } - match provider.span_processors() { - [] => {} - [processor] => { - processor.on_end(build_export_data( - data, - self.span_context.clone(), - &self.tracer, - )); - } - processors => { - for processor in processors { - processor.on_end(build_export_data( - data.clone(), - self.span_context.clone(), - &self.tracer, - )); - } + let mut finished_span = FinishedSpan { + span: Some(build_export_data(data, span_context, tracer)), + is_last_processor: false, + is_consummed: false, + }; + + for (i, processor) in provider.span_processors().iter().enumerate() { + if i == provider.span_processors().len() - 1 { + finished_span.is_last_processor = true; } + finished_span.is_consummed = false; + processor.on_end(&mut finished_span); } } } @@ -246,6 +248,276 @@ impl Drop for Span { } } +/// Represents a finished span passed to a span processor. +/// +/// The data associated with the span is not writable, but it can be read +/// through the `ReadableSpan` trait. +/// +/// Taking ownership of the span data is done by calling `consume`. +/// If `consume`` is never called, the on_ending method will not perform any copy of +/// the span data. +/// +/// Calling any `ReadableSpan` method on the `FinishedSpan` will panic if the span data +/// has aready been consumed. +/// +/// ``` +/// use opentelemetry_sdk::trace::{FinishedSpan, ReadableSpan}; +/// fn on_end(span: &mut FinishedSpan) { +/// if span.name() != "my_span" { +/// return; +/// } +/// let span = span.consume(); +/// # let _ = span; +/// } +/// ``` +pub struct FinishedSpan { + span: Option, + is_last_processor: bool, + is_consummed: bool, +} + +impl FinishedSpan { + fn span_data_ref(&self) -> &crate::trace::SpanData { + self.span + .as_ref() + .expect("Span data has already been consumed") + } + + #[allow(unused)] // Only exposed for testing purposes, and internal to the crate + /// Creates a new `FinishedSpan` with the given span data. + pub(crate) fn new(span_data: crate::trace::SpanData) -> Self { + FinishedSpan { + span: Some(span_data), + is_last_processor: false, + is_consummed: false, + } + } + + /// Takes ownership of the span data in the `FinishedSpan`. + /// + /// # Panics + /// + /// This function panics + /// * if it called twice in the same SpanProcessor::on_end + pub fn consume(&mut self) -> crate::trace::SpanData { + if self.is_consummed { + panic!("Span data has already been consumed"); + } + self.is_consummed = true; + if self.is_last_processor { + self.span + .take() + .expect("Span data has already been consumed") + } else { + self.span + .as_ref() + .expect("Span data has already been consumed") + .clone() + } + } +} + +impl ReadableSpan for FinishedSpan { + fn context(&self) -> &SpanContext { + &self.span_data_ref().span_context + } + + fn parent_span_id(&self) -> SpanId { + self.span_data_ref().parent_span_id + } + + fn span_kind(&self) -> SpanKind { + self.span_data_ref().span_kind.clone() + } + + fn name(&self) -> &str { + self.span_data_ref().name.as_ref() + } + fn start_time(&self) -> Option { + Some(self.span_data_ref().start_time) + } + fn end_time(&self) -> Option { + Some(self.span_data_ref().end_time) + } + fn attributes(&self) -> &[KeyValue] { + self.span_data_ref().attributes.as_slice() + } + fn dropped_attributes_count(&self) -> u32 { + self.span_data_ref().dropped_attributes_count + } + fn events(&self) -> &[Event] { + self.span_data_ref().events.events.as_slice() + } + fn dropped_events_count(&self) -> u32 { + self.span_data_ref().events.dropped_count + } + fn links(&self) -> &[Link] { + self.span_data_ref().links.links.as_slice() + } + fn dropped_links_count(&self) -> u32 { + self.span_data_ref().links.dropped_count + } + fn status(&self) -> &Status { + &self.span_data_ref().status + } +} + +impl std::fmt::Debug for FinishedSpan { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut fmt = f.debug_struct("FinishedSpan"); + if self.is_consummed { + fmt.field("consummed", &self.is_consummed); + } else { + fmt.field("span", &self.span_data_ref()); + } + fmt.finish() + } +} + +/// A trait for reading span data. +pub trait ReadableSpan { + /// Returns the `SpanContext` of the span. + fn context(&self) -> &SpanContext; + + /// Returns the `SpanId` of the parent span. + fn parent_span_id(&self) -> SpanId; + + /// Returns the `SpanKind` of the span. + /// + /// Returns `SpanKind::Internal` if the span is not recording. + fn span_kind(&self) -> SpanKind; + + /// Returns the name of the span. + /// + /// Returns an empty string if the span is not recording. + fn name(&self) -> &str; + + /// Returns the start time of the span. + /// + /// Returns `None` if the span is not recording. + fn start_time(&self) -> Option; + + /// Returns the end time of the span. + /// + /// Returns `None` if + /// * the span is not recording. + /// * the span has not been ended. + fn end_time(&self) -> Option; + + /// Returns the attributes of the span. + /// + /// Returns an empty slice if the span is not recording. + fn attributes(&self) -> &[KeyValue]; + + /// Returns the number of dropped attributes. + fn dropped_attributes_count(&self) -> u32; + + /// Returns the events associated to the span. + /// + /// Returns an empty slice if the span is not recording. + fn events(&self) -> &[Event]; + + /// Returns the number of dropped events. + fn dropped_events_count(&self) -> u32; + + /// Returns the span links associated to the span. + /// + /// Returns an empty slice if the span is not recording. + fn links(&self) -> &[Link]; + + /// Returns the number of dropped links. + fn dropped_links_count(&self) -> u32; + + /// Returns the status of the span. + /// + /// Returns `Status::Unset` if the span is not recording. + fn status(&self) -> &Status; +} + +impl ReadableSpan for Span { + fn context(&self) -> &SpanContext { + &self.span_context + } + + fn parent_span_id(&self) -> SpanId { + self.data + .as_ref() + .map(|data| data.parent_span_id) + .unwrap_or_else(|| SpanId::INVALID) + } + + fn span_kind(&self) -> SpanKind { + self.data + .as_ref() + .map(|data| data.span_kind.clone()) + .unwrap_or(SpanKind::Internal) + } + + fn name(&self) -> &str { + self.data + .as_ref() + .map(|data| data.name.as_ref()) + .unwrap_or("") + } + + fn start_time(&self) -> Option { + self.data.as_ref().map(|data| data.start_time) + } + + fn end_time(&self) -> Option { + self.data.as_ref().map(|data| data.end_time) + } + + fn attributes(&self) -> &[KeyValue] { + self.data + .as_ref() + .map(|data| data.attributes.as_slice()) + .unwrap_or(&[]) + } + + fn dropped_attributes_count(&self) -> u32 { + self.data + .as_ref() + .map(|data| data.dropped_attributes_count) + .unwrap_or(0) + } + + fn events(&self) -> &[Event] { + self.data + .as_ref() + .map(|data| data.events.events.as_slice()) + .unwrap_or(&[]) + } + + fn dropped_events_count(&self) -> u32 { + self.data + .as_ref() + .map(|data| data.events.dropped_count) + .unwrap_or(0) + } + + fn links(&self) -> &[Link] { + self.data + .as_ref() + .map(|data| data.links.links.as_slice()) + .unwrap_or(&[]) + } + + fn dropped_links_count(&self) -> u32 { + self.data + .as_ref() + .map(|data| data.links.dropped_count) + .unwrap_or(0) + } + + fn status(&self) -> &Status { + self.data + .as_ref() + .map(|data| &data.status) + .unwrap_or(&Status::Unset) + } +} + fn build_export_data( data: SpanData, span_context: SpanContext, @@ -276,8 +548,9 @@ mod tests { DEFAULT_MAX_ATTRIBUTES_PER_SPAN, DEFAULT_MAX_EVENT_PER_SPAN, DEFAULT_MAX_LINKS_PER_SPAN, }; use crate::trace::{SpanEvents, SpanLinks}; - use opentelemetry::trace::{self, SpanBuilder, TraceFlags, TraceId, Tracer}; - use opentelemetry::{trace::Span as _, trace::TracerProvider}; + use opentelemetry::trace::{ + self, Span as _, SpanBuilder, TraceFlags, TraceId, Tracer, TracerProvider, + }; use std::time::Duration; use std::vec; diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 595099ef7f..5bf86721f3 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -83,7 +83,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// already set). This method is called synchronously within the `Span::end` /// API, therefore it should not block or throw an exception. /// TODO - This method should take reference to `SpanData` - fn on_end(&self, span: SpanData); + fn on_end(&self, span: &mut FinishedSpan); /// Force the spans lying in the cache to be exported. fn force_flush(&self) -> OTelSdkResult; /// Shuts down the processor. Called when SDK is shut down. This is an @@ -129,10 +129,11 @@ impl SpanProcessor for SimpleSpanProcessor { // Ignored } - fn on_end(&self, span: SpanData) { - if !span.span_context.is_sampled() { + fn on_end(&self, span: &mut FinishedSpan) { + if !span.context().is_sampled() { return; } + let span = span.consume(); let result = self .exporter @@ -236,6 +237,9 @@ use std::sync::mpsc::Receiver; use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::SyncSender; +use super::span::FinishedSpan; +use super::ReadableSpan; + /// Messages exchanged between the main thread and the background thread. #[allow(clippy::large_enum_variant)] #[derive(Debug)] @@ -513,7 +517,7 @@ impl SpanProcessor for BatchSpanProcessor { } /// Handles span end. - fn on_end(&self, span: SpanData) { + fn on_end(&self, span: &mut FinishedSpan) { if self.is_shutdown.load(Ordering::Relaxed) { // this is a warning, as the user is trying to emit after the processor has been shutdown otel_warn!( @@ -522,6 +526,7 @@ impl SpanProcessor for BatchSpanProcessor { ); return; } + let span = span.consume(); let result = self.span_sender.try_send(span); if result.is_err() { @@ -865,7 +870,7 @@ mod tests { OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS, OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, }; - use crate::trace::InMemorySpanExporterBuilder; + use crate::trace::{FinishedSpan, InMemorySpanExporterBuilder}; use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks}; use crate::trace::{SpanData, SpanExporter}; use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status}; @@ -877,7 +882,7 @@ mod tests { let exporter = InMemorySpanExporterBuilder::new().build(); let processor = SimpleSpanProcessor::new(exporter.clone()); let span_data = new_test_export_span_data(); - processor.on_end(span_data.clone()); + processor.on_end(&mut FinishedSpan::new(span_data.clone())); assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data); let _result = processor.shutdown(); } @@ -900,7 +905,7 @@ mod tests { status: Status::Unset, instrumentation_scope: Default::default(), }; - processor.on_end(unsampled); + processor.on_end(&mut FinishedSpan::new(unsampled)); assert!(exporter.get_finished_spans().unwrap().is_empty()); } @@ -909,7 +914,7 @@ mod tests { let exporter = InMemorySpanExporterBuilder::new().build(); let processor = SimpleSpanProcessor::new(exporter.clone()); let span_data = new_test_export_span_data(); - processor.on_end(span_data.clone()); + processor.on_end(&mut FinishedSpan::new(span_data.clone())); assert!(!exporter.get_finished_spans().unwrap().is_empty()); let _result = processor.shutdown(); // Assume shutdown is called by ensuring spans are empty in the exporter @@ -1111,7 +1116,7 @@ mod tests { let processor = BatchSpanProcessor::new(exporter, config); let test_span = create_test_span("test_span"); - processor.on_end(test_span.clone()); + processor.on_end(&mut &mut FinishedSpan::new(test_span.clone())); // Wait for flush interval to ensure the span is processed std::thread::sleep(Duration::from_secs(6)); @@ -1134,7 +1139,7 @@ mod tests { // Create a test span and send it to the processor let test_span = create_test_span("force_flush_span"); - processor.on_end(test_span.clone()); + processor.on_end(&mut FinishedSpan::new(test_span.clone())); // Call force_flush to immediately export the spans let flush_result = processor.force_flush(); @@ -1163,7 +1168,7 @@ mod tests { // Create a test span and send it to the processor let test_span = create_test_span("shutdown_span"); - processor.on_end(test_span.clone()); + processor.on_end(&mut FinishedSpan::new(test_span.clone())); // Call shutdown to flush and export all pending spans let shutdown_result = processor.shutdown(); @@ -1201,9 +1206,9 @@ mod tests { let span2 = create_test_span("span2"); let span3 = create_test_span("span3"); // This span should be dropped - processor.on_end(span1.clone()); - processor.on_end(span2.clone()); - processor.on_end(span3.clone()); // This span exceeds the queue size + processor.on_end(&mut FinishedSpan::new(span1.clone())); + processor.on_end(&mut FinishedSpan::new(span2.clone())); + processor.on_end(&mut FinishedSpan::new(span3.clone())); // This span exceeds the queue size // Wait for the scheduled delay to expire std::thread::sleep(Duration::from_secs(3)); @@ -1243,7 +1248,7 @@ mod tests { KeyValue::new("key1", "value1"), KeyValue::new("key2", "value2"), ]; - processor.on_end(span_data.clone()); + processor.on_end(&mut FinishedSpan::new(span_data.clone())); // Force flush to export the span let _ = processor.force_flush(); @@ -1274,7 +1279,7 @@ mod tests { // Create a span and send it to the processor let test_span = create_test_span("resource_test"); - processor.on_end(test_span.clone()); + processor.on_end(&mut FinishedSpan::new(test_span.clone())); // Force flush to ensure the span is exported let _ = processor.force_flush(); @@ -1309,7 +1314,7 @@ mod tests { for _ in 0..4 { let span = new_test_export_span_data(); - processor.on_end(span); + processor.on_end(&mut FinishedSpan::new(span)); } processor.force_flush().unwrap(); @@ -1332,7 +1337,7 @@ mod tests { for _ in 0..4 { let span = new_test_export_span_data(); - processor.on_end(span); + processor.on_end(&mut FinishedSpan::new(span)); } processor.force_flush().unwrap(); @@ -1359,7 +1364,7 @@ mod tests { let processor_clone = Arc::clone(&processor); let handle = tokio::spawn(async move { let span = new_test_export_span_data(); - processor_clone.on_end(span); + processor_clone.on_end(&mut FinishedSpan::new(span)); }); handles.push(handle); } diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index cb4d2fc14b..f41f38edd1 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -4,7 +4,7 @@ use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend}; use crate::trace::BatchConfig; use crate::trace::Span; use crate::trace::SpanProcessor; -use crate::trace::{SpanData, SpanExporter}; +use crate::trace::{SpanData, FinishedSpan, SpanExporter, ReadableSpan}; use futures_channel::oneshot; use futures_util::pin_mut; use futures_util::{ @@ -102,10 +102,11 @@ impl SpanProcessor for BatchSpanProcessor { // Ignored } - fn on_end(&self, span: SpanData) { - if !span.span_context.is_sampled() { + fn on_end(&self, span: &mut FinishedSpan) { + if !span.context().is_sampled() { return; } + let span = span.consume(); let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); diff --git a/stress/src/traces.rs b/stress/src/traces.rs index 4cd713e4b2..00fa2ef4b0 100644 --- a/stress/src/traces.rs +++ b/stress/src/traces.rs @@ -16,7 +16,7 @@ use opentelemetry::{ }; use opentelemetry_sdk::{ error::OTelSdkResult, - trace::{self as sdktrace, SpanData, SpanProcessor}, + trace::{self as sdktrace, FinishedSpan, SpanProcessor}, }; mod throughput; @@ -37,7 +37,7 @@ impl SpanProcessor for NoOpSpanProcessor { // No-op } - fn on_end(&self, _span: SpanData) { + fn on_end(&self, _span: &mut FinishedSpan) { // No-op } From a0a8c238da8d8e389766355d53a188695fda1e11 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 13 May 2025 16:58:00 +0200 Subject: [PATCH 02/26] feat: add an experimental on_ending api This API allows to mutations of the span when it is ending. It's marked as on development in the spec, but it is useful for span obfuscation for example, which needs to done after attributes can added to the span anymore. --- examples/tracing-http-propagator/Cargo.toml | 2 +- .../tracing-http-propagator/src/server.rs | 39 +++++++++++++++++ opentelemetry-sdk/Cargo.toml | 1 + opentelemetry-sdk/src/trace/span.rs | 42 ++++++++++++------- opentelemetry-sdk/src/trace/span_processor.rs | 17 +++++++- 5 files changed, 84 insertions(+), 17 deletions(-) diff --git a/examples/tracing-http-propagator/Cargo.toml b/examples/tracing-http-propagator/Cargo.toml index 8ecf20eb43..a59231bf5b 100644 --- a/examples/tracing-http-propagator/Cargo.toml +++ b/examples/tracing-http-propagator/Cargo.toml @@ -24,7 +24,7 @@ hyper = { workspace = true, features = ["full"] } hyper-util = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] } opentelemetry = { path = "../../opentelemetry" } -opentelemetry_sdk = { path = "../../opentelemetry-sdk" } +opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["experimental_span_processor_on_ending"]} opentelemetry-http = { path = "../../opentelemetry-http" } opentelemetry-stdout = { workspace = true, features = ["trace", "logs"] } opentelemetry-semantic-conventions = { path = "../../opentelemetry-semantic-conventions" } diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index 08f0ab6d9f..bd7ecd57ca 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -166,6 +166,44 @@ impl SpanProcessor for RouteConcurrencyCounterSpanProcessor { } } +fn obfuscate_http_auth_url(s: &str) -> Option { + let uri = hyper::http::Uri::from_maybe_shared(s.to_owned()).ok()?; + let authority = uri.authority()?; + let (_, url) = authority.as_str().split_once('@')?; + let new_auth = format!("REDACTED_USERNAME:REDACTED_PASSWORD@{url}"); + let mut parts = uri.into_parts(); + parts.authority = Some(hyper::http::uri::Authority::from_maybe_shared(new_auth).ok()?); + Some(hyper::Uri::from_parts(parts).ok()?.to_string()) +} + +#[derive(Debug)] +/// A custom span processor that uses on_ending to obfuscate sensitive information in span attributes. +/// +/// Currently this only overrides http auth information in the URI. +struct SpanOnbfuscationProcessor; + +impl SpanProcessor for SpanOnbfuscationProcessor { + fn force_flush(&self) -> OTelSdkResult { + Ok(()) + } + + fn shutdown(&self) -> OTelSdkResult { + Ok(()) + } + fn on_start(&self, _span: &mut opentelemetry_sdk::trace::Span, _cx: &Context) {} + + fn on_ending(&self, span: &mut opentelemetry_sdk::trace::Span) { + let mut obfuscated_attributes = Vec::new(); + for KeyValue { key, value, .. } in span.attributes() { + if let Some(redacted_uri) = obfuscate_http_auth_url(value.as_str().as_ref()) { + obfuscated_attributes.push((key.clone(), KeyValue::new(key.clone(), redacted_uri))); + } + } + } + + fn on_end(&self, _span: &mut FinishedSpan) {} +} + /// A custom log processor that enriches LogRecords with baggage attributes. /// Baggage information is not added automatically without this processor. #[derive(Debug)] @@ -225,6 +263,7 @@ fn init_tracer() -> SdkTracerProvider { let provider = SdkTracerProvider::builder() .with_span_processor(RouteConcurrencyCounterSpanProcessor::default()) .with_span_processor(EnrichWithBaggageSpanProcessor) + .with_span_processor(SpanOnbfuscationProcessor) .with_simple_exporter(SpanExporter::default()) .build(); diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 171296e3ef..db65ddc3f8 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -58,6 +58,7 @@ experimental_logs_batch_log_processor_with_async_runtime = ["logs", "experimenta experimental_logs_concurrent_log_processor = ["logs"] experimental_trace_batch_span_processor_with_async_runtime = ["trace", "experimental_async_runtime"] experimental_metrics_disable_name_validation = ["metrics"] +experimental_span_processor_on_ending = ["trace"] [[bench]] name = "context" diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 976ccd1592..09f5c98a4a 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -198,33 +198,47 @@ impl opentelemetry::trace::Span for Span { impl Span { fn ensure_ended_and_exported(&mut self, timestamp: Option) { + match self.data { + Some(ref mut data) => { + // ensure end time is set via explicit end or implicitly on drop + if let Some(timestamp) = timestamp { + data.end_time = timestamp; + } else if data.end_time == data.start_time { + data.end_time = opentelemetry::time::now(); + } + } + None => { + return; + } + }; + + if self.tracer.provider().is_shutdown() { + return; + } + let provider = self.tracer.provider().clone(); + + #[cfg(feature = "experimental_span_processor_on_ending")] + { + for processor in provider.span_processors() { + processor.on_ending( self); + } + } + let Span { data, tracer, span_context, .. } = self; + // skip if data has already been exported - let mut data = match data.take() { + let data = match data.take() { Some(data) => data, None => return, }; let span_context: SpanContext = std::mem::replace(span_context, SpanContext::empty_context()); - let provider = tracer.provider(); - // skip if provider has been shut down - if provider.is_shutdown() { - return; - } - - // ensure end time is set via explicit end or implicitly on drop - if let Some(timestamp) = timestamp { - data.end_time = timestamp; - } else if data.end_time == data.start_time { - data.end_time = opentelemetry::time::now(); - } - let mut finished_span = FinishedSpan { span: Some(build_export_data(data, span_context, tracer)), is_last_processor: false, diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 5bf86721f3..2693a9724c 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -79,10 +79,23 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// synchronously on the thread that started the span, therefore it should /// not block or throw exceptions. fn on_start(&self, span: &mut Span, cx: &Context); + #[cfg(feature = "experimental_span_processor_on_ending")] + /// `on_ending` is called when a `Span` is ending. The en timestampe has already + /// been computed. + /// Mutations done to the span in this method will be reflected in the span passed + /// to other span processors. + /// This method is called synchronously within the `Span::end` API, therefore it + /// should not block or throw an exception. + /// If multiple SpanProcessors are registered, their on_ending methods are invoked + /// in the order they have been registered. + fn on_ending(&self, _span: &mut Span) { + // Default implementation is a no-op so existing processor implementations + // don't break if this feature in enabled transitively. + } + /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is /// already set). This method is called synchronously within the `Span::end` /// API, therefore it should not block or throw an exception. - /// TODO - This method should take reference to `SpanData` fn on_end(&self, span: &mut FinishedSpan); /// Force the spans lying in the cache to be exported. fn force_flush(&self) -> OTelSdkResult; @@ -870,8 +883,8 @@ mod tests { OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS, OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, }; - use crate::trace::{FinishedSpan, InMemorySpanExporterBuilder}; use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks}; + use crate::trace::{FinishedSpan, InMemorySpanExporterBuilder}; use crate::trace::{SpanData, SpanExporter}; use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status}; use std::fmt::Debug; From 54fa6f266cca28bc9840d4befdf8a757e7fd4ee6 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 13 May 2025 18:09:34 +0200 Subject: [PATCH 03/26] fix: formatting and clippy --- examples/tracing-http-propagator/src/server.rs | 3 ++- opentelemetry-sdk/src/trace/mod.rs | 2 +- opentelemetry-sdk/src/trace/span.rs | 11 ++++++----- .../src/trace/span_processor_with_async_runtime.rs | 2 +- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index bd7ecd57ca..f9190b418c 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -167,7 +167,8 @@ impl SpanProcessor for RouteConcurrencyCounterSpanProcessor { } fn obfuscate_http_auth_url(s: &str) -> Option { - let uri = hyper::http::Uri::from_maybe_shared(s.to_owned()).ok()?; + #[allow(clippy::unnecessary_to_owned)] + let uri = hyper::http::Uri::from_maybe_shared(s.to_string()).ok()?; let authority = uri.authority()?; let (_, url) = authority.as_str().split_once('@')?; let new_auth = format!("REDACTED_USERNAME:REDACTED_PASSWORD@{url}"); diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index 6a7fe9586d..7ade8c8ac7 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -39,7 +39,7 @@ pub use id_generator::{IdGenerator, RandomIdGenerator}; pub use links::SpanLinks; pub use provider::{SdkTracerProvider, TracerProviderBuilder}; pub use sampler::{Sampler, ShouldSample}; -pub use span::{Span, ReadableSpan, FinishedSpan}; +pub use span::{FinishedSpan, ReadableSpan, Span}; pub use span_limit::SpanLimits; pub use span_processor::{ BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder, diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 09f5c98a4a..ab86881957 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -215,12 +215,12 @@ impl Span { if self.tracer.provider().is_shutdown() { return; } - let provider = self.tracer.provider().clone(); #[cfg(feature = "experimental_span_processor_on_ending")] { + let provider = self.tracer.provider().clone(); for processor in provider.span_processors() { - processor.on_ending( self); + processor.on_ending(self); } } @@ -230,7 +230,7 @@ impl Span { span_context, .. } = self; - + // skip if data has already been exported let data = match data.take() { Some(data) => data, @@ -245,8 +245,9 @@ impl Span { is_consummed: false, }; - for (i, processor) in provider.span_processors().iter().enumerate() { - if i == provider.span_processors().len() - 1 { + let span_processors = tracer.provider().span_processors(); + for (i, processor) in span_processors.iter().enumerate() { + if i == span_processors.len() - 1 { finished_span.is_last_processor = true; } finished_span.is_consummed = false; diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index f41f38edd1..1a37b45b15 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -4,7 +4,7 @@ use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend}; use crate::trace::BatchConfig; use crate::trace::Span; use crate::trace::SpanProcessor; -use crate::trace::{SpanData, FinishedSpan, SpanExporter, ReadableSpan}; +use crate::trace::{FinishedSpan, ReadableSpan, SpanData, SpanExporter}; use futures_channel::oneshot; use futures_util::pin_mut; use futures_util::{ From b72b03150f3178ea3b721325a466a32be6c936de Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 13 May 2025 20:34:45 +0200 Subject: [PATCH 04/26] fix: some SpanProcessor calls were not finished --- .../benches/batch_span_processor.rs | 5 +++-- opentelemetry-sdk/src/trace/span.rs | 21 +++++++++---------- .../span_processor_with_async_runtime.rs | 6 +++--- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/opentelemetry-sdk/benches/batch_span_processor.rs b/opentelemetry-sdk/benches/batch_span_processor.rs index b50c1fe1c4..4c3d16524c 100644 --- a/opentelemetry-sdk/benches/batch_span_processor.rs +++ b/opentelemetry-sdk/benches/batch_span_processor.rs @@ -4,7 +4,7 @@ use opentelemetry::trace::{ SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, }; use opentelemetry_sdk::testing::trace::NoopSpanExporter; -use opentelemetry_sdk::trace::SpanData; +use opentelemetry_sdk::trace::{FinishedSpan, SpanData}; use opentelemetry_sdk::trace::{ BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor, }; @@ -62,7 +62,8 @@ fn criterion_benchmark(c: &mut Criterion) { let spans = get_span_data(); handles.push(tokio::spawn(async move { for span in spans { - span_processor.on_end(span); + let mut span= FinishedSpan::new(span); + span_processor.on_end(&mut span); tokio::task::yield_now().await; } })); diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index ab86881957..b03ac701ad 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -239,18 +239,11 @@ impl Span { let span_context: SpanContext = std::mem::replace(span_context, SpanContext::empty_context()); - let mut finished_span = FinishedSpan { - span: Some(build_export_data(data, span_context, tracer)), - is_last_processor: false, - is_consummed: false, - }; + let mut finished_span = FinishedSpan::new(build_export_data(data, span_context, tracer)); let span_processors = tracer.provider().span_processors(); for (i, processor) in span_processors.iter().enumerate() { - if i == span_processors.len() - 1 { - finished_span.is_last_processor = true; - } - finished_span.is_consummed = false; + finished_span.reset(i == span_processors.len() - 1); processor.on_end(&mut finished_span); } } @@ -278,9 +271,11 @@ impl Drop for Span { /// ``` /// use opentelemetry_sdk::trace::{FinishedSpan, ReadableSpan}; /// fn on_end(span: &mut FinishedSpan) { +/// // Read the span data without consuming it /// if span.name() != "my_span" { /// return; /// } +/// // Consume the span data, potentially cloning it /// let span = span.consume(); /// # let _ = span; /// } @@ -298,9 +293,8 @@ impl FinishedSpan { .expect("Span data has already been consumed") } - #[allow(unused)] // Only exposed for testing purposes, and internal to the crate /// Creates a new `FinishedSpan` with the given span data. - pub(crate) fn new(span_data: crate::trace::SpanData) -> Self { + pub fn new(span_data: crate::trace::SpanData) -> Self { FinishedSpan { span: Some(span_data), is_last_processor: false, @@ -308,6 +302,11 @@ impl FinishedSpan { } } + fn reset(&mut self, last_processor: bool) { + self.is_last_processor = last_processor; + self.is_consummed = false; + } + /// Takes ownership of the span data in the `FinishedSpan`. /// /// # Panics diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index 1a37b45b15..c5cf3cd84a 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -432,7 +432,7 @@ mod tests { OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT, }; use crate::trace::{BatchConfig, BatchConfigBuilder, InMemorySpanExporterBuilder}; - use crate::trace::{SpanData, SpanExporter}; + use crate::trace::{SpanData, SpanExporter, FinishedSpan}; use futures_util::Future; use std::fmt::Debug; use std::time::Duration; @@ -519,7 +519,7 @@ mod tests { } }); tokio::time::sleep(Duration::from_secs(1)).await; // skip the first - processor.on_end(new_test_export_span_data()); + processor.on_end(&mut FinishedSpan::new(new_test_export_span_data())); let flush_res = processor.force_flush(); assert!(flush_res.is_ok()); let _shutdown_result = processor.shutdown(); @@ -546,7 +546,7 @@ mod tests { }; let processor = BatchSpanProcessor::new(exporter, config, runtime::TokioCurrentThread); tokio::time::sleep(Duration::from_secs(1)).await; // skip the first - processor.on_end(new_test_export_span_data()); + processor.on_end(&mut FinishedSpan::new(new_test_export_span_data())); let flush_res = processor.force_flush(); if time_out { assert!(flush_res.is_err()); From afb4197225d1a8b675a1ae81b067c295c7d3fc41 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Wed, 14 May 2025 11:58:56 +0200 Subject: [PATCH 05/26] fix: lint --- opentelemetry-sdk/benches/batch_span_processor.rs | 4 ++-- .../src/trace/span_processor_with_async_runtime.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/benches/batch_span_processor.rs b/opentelemetry-sdk/benches/batch_span_processor.rs index 4c3d16524c..b75f597f4d 100644 --- a/opentelemetry-sdk/benches/batch_span_processor.rs +++ b/opentelemetry-sdk/benches/batch_span_processor.rs @@ -4,10 +4,10 @@ use opentelemetry::trace::{ SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, }; use opentelemetry_sdk::testing::trace::NoopSpanExporter; -use opentelemetry_sdk::trace::{FinishedSpan, SpanData}; use opentelemetry_sdk::trace::{ BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor, }; +use opentelemetry_sdk::trace::{FinishedSpan, SpanData}; use std::sync::Arc; use tokio::runtime::Runtime; @@ -62,7 +62,7 @@ fn criterion_benchmark(c: &mut Criterion) { let spans = get_span_data(); handles.push(tokio::spawn(async move { for span in spans { - let mut span= FinishedSpan::new(span); + let mut span = FinishedSpan::new(span); span_processor.on_end(&mut span); tokio::task::yield_now().await; } diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index c5cf3cd84a..71b4e58172 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -432,7 +432,7 @@ mod tests { OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT, }; use crate::trace::{BatchConfig, BatchConfigBuilder, InMemorySpanExporterBuilder}; - use crate::trace::{SpanData, SpanExporter, FinishedSpan}; + use crate::trace::{FinishedSpan, SpanData, SpanExporter}; use futures_util::Future; use std::fmt::Debug; use std::time::Duration; From 4954b0d3260c250ed647516907b8c1fc950d8176 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Wed, 14 May 2025 14:40:04 +0200 Subject: [PATCH 06/26] fix: lint clippy --- opentelemetry-sdk/src/trace/span_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 2693a9724c..6a31a3ca5b 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -1129,7 +1129,7 @@ mod tests { let processor = BatchSpanProcessor::new(exporter, config); let test_span = create_test_span("test_span"); - processor.on_end(&mut &mut FinishedSpan::new(test_span.clone())); + processor.on_end(&mut FinishedSpan::new(test_span.clone())); // Wait for flush interval to ensure the span is processed std::thread::sleep(Duration::from_secs(6)); From 224a7edfc72c94fca087929caaec2470395d5064 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 20 May 2025 16:42:49 +0200 Subject: [PATCH 07/26] feat: add span processor benchmarks Adds a benchmark creating a span and dropping it, without an exporter. The goal is to estimate the cost of the SDK running without the exporter --- opentelemetry-sdk/Cargo.toml | 6 ++ .../benches/span_processor_api.rs | 88 +++++++++++++++++++ 2 files changed, 94 insertions(+) create mode 100644 opentelemetry-sdk/benches/span_processor_api.rs diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index db65ddc3f8..babd992226 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -116,6 +116,12 @@ name = "log" harness = false required-features = ["logs"] +[[bench]] +name = "span_processor_api" +harness = false +required-features = ["testing"] + + [lib] bench = false diff --git a/opentelemetry-sdk/benches/span_processor_api.rs b/opentelemetry-sdk/benches/span_processor_api.rs new file mode 100644 index 0000000000..390da9a0d0 --- /dev/null +++ b/opentelemetry-sdk/benches/span_processor_api.rs @@ -0,0 +1,88 @@ +use std::time::Duration; + +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use opentelemetry::{ + trace::{Span, Tracer, TracerProvider}, + Context, KeyValue, +}; +use opentelemetry_sdk::trace as sdktrace; + +#[cfg(not(target_os = "windows"))] +use pprof::criterion::{Output, PProfProfiler}; + +/* +Adding results in comments for a quick reference. + Chip: Apple M1 Max + Total Number of Cores: 10 (8 performance and 2 efficiency) + +SpanProcessorApi/0_processors + time: [385.15 ns 386.14 ns 387.25 ns] +SpanProcessorApi/1_processors + time: [385.73 ns 387.17 ns 388.85 ns] +SpanProcessorApi/2_processors + time: [384.84 ns 385.66 ns 386.50 ns] +SpanProcessorApi/4_processors + time: [386.78 ns 388.17 ns 389.58 ns] +*/ + +#[derive(Debug)] +struct NoopSpanProcessor; + +impl sdktrace::SpanProcessor for NoopSpanProcessor { + fn on_start(&self, _span: &mut sdktrace::Span, _parent_cx: &Context) {} + fn on_end(&self, _span: &mut sdktrace::FinishedSpan) {} + fn force_flush(&self) -> opentelemetry_sdk::error::OTelSdkResult { + Ok(()) + } + fn shutdown(&self) -> opentelemetry_sdk::error::OTelSdkResult { + Ok(()) + } +} + +fn create_tracer(span_processors_count: usize) -> sdktrace::SdkTracer { + let mut builder = sdktrace::SdkTracerProvider::builder(); + for _ in 0..span_processors_count { + builder = builder.with_span_processor(NoopSpanProcessor); + } + builder.build().tracer("tracer") +} + +fn create_span(tracer: &sdktrace::Tracer) { + let mut span = tracer.start("foo"); + span.set_attribute(KeyValue::new("key1", false)); + span.set_attribute(KeyValue::new("key2", "hello")); + span.set_attribute(KeyValue::new("key4", 123.456)); + span.add_event("my_event", vec![KeyValue::new("key1", "value1")]); + span.end(); +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("SpanProcessorApi"); + for i in [0, 1, 2, 4] { + group.bench_function(format!("{}_processors", i), |b| { + let tracer = create_tracer(i); + b.iter(|| { + black_box(create_span(&tracer)); + }); + }); + } +} + +#[cfg(not(target_os = "windows"))] +criterion_group! { + name = benches; + config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))) + .warm_up_time(std::time::Duration::from_secs(1)) + .measurement_time(std::time::Duration::from_secs(2)); + targets = criterion_benchmark +} + +#[cfg(target_os = "windows")] +criterion_group! { + name = benches; + config = Criterion::default().warm_up_time(std::time::Duration::from_secs(1)) + .measurement_time(std::time::Duration::from_secs(2)); + targets = criterion_benchmark +} + +criterion_main!(benches); From a62a9bdcfc060b4566ddecd6122399a33095a0e4 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 20 May 2025 17:18:29 +0200 Subject: [PATCH 08/26] fix: remove unised import --- opentelemetry-sdk/benches/span_processor_api.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/opentelemetry-sdk/benches/span_processor_api.rs b/opentelemetry-sdk/benches/span_processor_api.rs index 390da9a0d0..5e22c2125f 100644 --- a/opentelemetry-sdk/benches/span_processor_api.rs +++ b/opentelemetry-sdk/benches/span_processor_api.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use criterion::{black_box, criterion_group, criterion_main, Criterion}; use opentelemetry::{ trace::{Span, Tracer, TracerProvider}, From 5ce1ba46bda67ffee67466d804ae140acea13708 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 20 May 2025 17:42:39 +0200 Subject: [PATCH 09/26] fix: typos, so many typos... --- examples/tracing-http-propagator/src/server.rs | 8 ++++---- opentelemetry-sdk/src/trace/span.rs | 14 +++++++------- opentelemetry-sdk/src/trace/span_processor.rs | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index 7ce16c65e1..ff5716f82b 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -111,7 +111,7 @@ async fn router( } #[derive(Debug, Default)] -/// A custom span processor that counts concurrent requests for each route (indetified by the http.route +/// A custom span processor that counts concurrent requests for each route (indentified by the http.route /// attribute) and adds that information to the span attributes. struct RouteConcurrencyCounterSpanProcessor(Mutex>); @@ -181,9 +181,9 @@ fn obfuscate_http_auth_url(s: &str) -> Option { /// A custom span processor that uses on_ending to obfuscate sensitive information in span attributes. /// /// Currently this only overrides http auth information in the URI. -struct SpanOnbfuscationProcessor; +struct SpanObfuscationProcessor; -impl SpanProcessor for SpanOnbfuscationProcessor { +impl SpanProcessor for SpanObfuscationProcessor { fn force_flush(&self) -> OTelSdkResult { Ok(()) } @@ -260,7 +260,7 @@ fn init_tracer() -> SdkTracerProvider { let provider = SdkTracerProvider::builder() .with_span_processor(RouteConcurrencyCounterSpanProcessor::default()) .with_span_processor(EnrichWithBaggageSpanProcessor) - .with_span_processor(SpanOnbfuscationProcessor) + .with_span_processor(SpanObfuscationProcessor) .with_simple_exporter(SpanExporter::default()) .build(); diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index b03ac701ad..df314d8df7 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -283,7 +283,7 @@ impl Drop for Span { pub struct FinishedSpan { span: Option, is_last_processor: bool, - is_consummed: bool, + is_consumed: bool, } impl FinishedSpan { @@ -298,13 +298,13 @@ impl FinishedSpan { FinishedSpan { span: Some(span_data), is_last_processor: false, - is_consummed: false, + is_consumed: false, } } fn reset(&mut self, last_processor: bool) { self.is_last_processor = last_processor; - self.is_consummed = false; + self.is_consumed = false; } /// Takes ownership of the span data in the `FinishedSpan`. @@ -314,10 +314,10 @@ impl FinishedSpan { /// This function panics /// * if it called twice in the same SpanProcessor::on_end pub fn consume(&mut self) -> crate::trace::SpanData { - if self.is_consummed { + if self.is_consumed { panic!("Span data has already been consumed"); } - self.is_consummed = true; + self.is_consumed = true; if self.is_last_processor { self.span .take() @@ -379,8 +379,8 @@ impl ReadableSpan for FinishedSpan { impl std::fmt::Debug for FinishedSpan { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut fmt = f.debug_struct("FinishedSpan"); - if self.is_consummed { - fmt.field("consummed", &self.is_consummed); + if self.is_consumed { + fmt.field("consumed", &self.is_consumed); } else { fmt.field("span", &self.span_data_ref()); } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 6a31a3ca5b..7f6760861b 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -80,7 +80,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// not block or throw exceptions. fn on_start(&self, span: &mut Span, cx: &Context); #[cfg(feature = "experimental_span_processor_on_ending")] - /// `on_ending` is called when a `Span` is ending. The en timestampe has already + /// `on_ending` is called when a `Span` is ending. The end timestamp has already /// been computed. /// Mutations done to the span in this method will be reflected in the span passed /// to other span processors. From 5dae8a7cc6fb4fa9b32dfc259174af66c83a8ed4 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 20 May 2025 17:43:05 +0200 Subject: [PATCH 10/26] fix: insert count as number instead of string in example --- examples/tracing-http-propagator/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index ff5716f82b..6a0e9bfd88 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -140,7 +140,7 @@ impl SpanProcessor for RouteConcurrencyCounterSpanProcessor { *count += 1; span.set_attribute(KeyValue::new( "http.route.concurrent_requests", - count.to_string(), + *count as i64, )); } From a28a4616aaf6ee6557ca0b6dc845f5a6809b851d Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 20 May 2025 17:49:32 +0200 Subject: [PATCH 11/26] fix: make on_ending contract more explicit --- opentelemetry-sdk/src/trace/span_processor.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 7f6760861b..7ba0e83388 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -82,12 +82,15 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { #[cfg(feature = "experimental_span_processor_on_ending")] /// `on_ending` is called when a `Span` is ending. The end timestamp has already /// been computed. - /// Mutations done to the span in this method will be reflected in the span passed - /// to other span processors. /// This method is called synchronously within the `Span::end` API, therefore it /// should not block or throw an exception. + /// /// If multiple SpanProcessors are registered, their on_ending methods are invoked - /// in the order they have been registered. + /// in the order they have been registered, and mutations to the span will be + /// visible to the next processor. + /// + /// The tracer will call `on_ending` for all span processors before calling `on_end` + /// for any of them. fn on_ending(&self, _span: &mut Span) { // Default implementation is a no-op so existing processor implementations // don't break if this feature in enabled transitively. From edb290a48cf426f5ae3b0bab1fc4955458b9f3b0 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 20 May 2025 17:55:11 +0200 Subject: [PATCH 12/26] fix: on_ending comment --- opentelemetry-sdk/src/trace/span_processor.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 7ba0e83388..3feaa8affc 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -85,9 +85,9 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// This method is called synchronously within the `Span::end` API, therefore it /// should not block or throw an exception. /// - /// If multiple SpanProcessors are registered, their on_ending methods are invoked - /// in the order they have been registered, and mutations to the span will be - /// visible to the next processor. + /// If multiple span processors are registered, their on_ending methods are invoked + /// in the order the span processors have been registered, and mutations to the span + /// will be visible to the next processor. /// /// The tracer will call `on_ending` for all span processors before calling `on_end` /// for any of them. From 07840d8810f691251f469d7fb03d140a430d9823 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 27 May 2025 11:11:42 +0200 Subject: [PATCH 13/26] nit: move span endtime logic to callers of ensure_ended_and_exported --- opentelemetry-sdk/src/trace/span.rs | 31 +++++++++++++---------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index df314d8df7..cba7aef83d 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -192,26 +192,18 @@ impl opentelemetry::trace::Span for Span { /// Finishes the span with given timestamp. fn end_with_timestamp(&mut self, timestamp: SystemTime) { - self.ensure_ended_and_exported(Some(timestamp)); + if let Some(data) = self.data.as_mut() { + data.end_time = timestamp; + } + self.ensure_ended_and_exported(); } } impl Span { - fn ensure_ended_and_exported(&mut self, timestamp: Option) { - match self.data { - Some(ref mut data) => { - // ensure end time is set via explicit end or implicitly on drop - if let Some(timestamp) = timestamp { - data.end_time = timestamp; - } else if data.end_time == data.start_time { - data.end_time = opentelemetry::time::now(); - } - } - None => { - return; - } - }; - + /// Span ending logic + /// + /// The end timestamp of the span has to be set before calling this function + fn ensure_ended_and_exported(&mut self) { if self.tracer.provider().is_shutdown() { return; } @@ -252,7 +244,12 @@ impl Span { impl Drop for Span { /// Report span on inner drop fn drop(&mut self) { - self.ensure_ended_and_exported(None); + if let Some(ref mut data) = self.data { + if data.end_time == data.start_time { + data.end_time = opentelemetry::time::now(); + } + } + self.ensure_ended_and_exported(); } } From 9a2d119dd83f384e66293235e865d5dfafe53533 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 27 May 2025 11:44:22 +0200 Subject: [PATCH 14/26] fix: remove panic in FinishedSpan --- opentelemetry-sdk/src/trace/span.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index cba7aef83d..2f759093de 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -201,7 +201,7 @@ impl opentelemetry::trace::Span for Span { impl Span { /// Span ending logic - /// + /// /// The end timestamp of the span has to be set before calling this function fn ensure_ended_and_exported(&mut self) { if self.tracer.provider().is_shutdown() { @@ -312,7 +312,7 @@ impl FinishedSpan { /// * if it called twice in the same SpanProcessor::on_end pub fn consume(&mut self) -> crate::trace::SpanData { if self.is_consumed { - panic!("Span data has already been consumed"); + opentelemetry::otel_error!(name: "FinishedSpan.ConsumeTwice", message = "consume called twice on FinishedSpan in the same span processor"); } self.is_consumed = true; if self.is_last_processor { From 48a3edf0b8fd39b35a442120c14abf8d7e2a9cd4 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 27 May 2025 11:44:34 +0200 Subject: [PATCH 15/26] fix: add tests and fix clippy --- .../benches/span_processor_api.rs | 4 +- opentelemetry-sdk/src/trace/span.rs | 76 ++++++++++++++++++- 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/benches/span_processor_api.rs b/opentelemetry-sdk/benches/span_processor_api.rs index 5e22c2125f..4d6f26540b 100644 --- a/opentelemetry-sdk/benches/span_processor_api.rs +++ b/opentelemetry-sdk/benches/span_processor_api.rs @@ -45,13 +45,13 @@ fn create_tracer(span_processors_count: usize) -> sdktrace::SdkTracer { builder.build().tracer("tracer") } -fn create_span(tracer: &sdktrace::Tracer) { +fn create_span(tracer: &sdktrace::Tracer) -> sdktrace::Span { let mut span = tracer.start("foo"); span.set_attribute(KeyValue::new("key1", false)); span.set_attribute(KeyValue::new("key2", "hello")); span.set_attribute(KeyValue::new("key4", 123.456)); span.add_event("my_event", vec![KeyValue::new("key1", "value1")]); - span.end(); + span } fn criterion_benchmark(c: &mut Criterion) { diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 2f759093de..8f64d16d39 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -558,7 +558,7 @@ mod tests { DEFAULT_MAX_ATTRIBUTES_PER_EVENT, DEFAULT_MAX_ATTRIBUTES_PER_LINK, DEFAULT_MAX_ATTRIBUTES_PER_SPAN, DEFAULT_MAX_EVENT_PER_SPAN, DEFAULT_MAX_LINKS_PER_SPAN, }; - use crate::trace::{SpanEvents, SpanLinks}; + use crate::trace::{SpanEvents, SpanLinks, SpanProcessor}; use opentelemetry::trace::{ self, Span as _, SpanBuilder, TraceFlags, TraceId, Tracer, TracerProvider, }; @@ -1009,4 +1009,78 @@ mod tests { // return none if the provider has already been dropped assert!(dropped_span.exported_data().is_none()); } + + #[test] + fn test_finished_span_consume() { + use super::ReadableSpan; + + #[derive(Debug)] + struct TestSpanProcessor; + impl SpanProcessor for TestSpanProcessor { + fn on_end(&self, span: &mut FinishedSpan) { + let parent_id = span.parent_span_id(); + let name = span.name(); + + assert_eq!(name, "test_span"); + assert_eq!(parent_id, SpanId::INVALID); + let _ = span.consume(); + } + + fn on_start(&self, _span: &mut Span, _cx: &opentelemetry::Context) {} + + fn force_flush(&self) -> crate::error::OTelSdkResult { + Ok(()) + } + + fn shutdown(&self) -> crate::error::OTelSdkResult { + Ok(()) + } + } + + let provider = crate::trace::SdkTracerProvider::builder() + .with_span_processor(TestSpanProcessor) + .build(); + let tracer = provider.tracer("test"); + { + tracer.start("test_span"); + } + let res = provider.shutdown(); + println!("{:?}", res); + assert!(res.is_ok()); + } + + #[test] + #[should_panic] + fn test_finished_span_consume_twice() { + #[derive(Debug)] + struct TestSpanProcessor; + impl SpanProcessor for TestSpanProcessor { + fn on_end(&self, span: &mut FinishedSpan) { + let _ = span.consume(); + // consume again to trigger panic + let _ = span.consume(); + } + + fn on_start(&self, _span: &mut Span, _cx: &opentelemetry::Context) {} + + fn force_flush(&self) -> crate::error::OTelSdkResult { + Ok(()) + } + + fn shutdown(&self) -> crate::error::OTelSdkResult { + Ok(()) + } + } + + let provider = crate::trace::SdkTracerProvider::builder() + .with_span_processor(TestSpanProcessor) + .build(); + let tracer = provider.tracer("test"); + { + tracer.start("test_span"); + } + let res = provider.shutdown(); + println!("{:?}", res); + assert!(res.is_ok()); + } } From 4f9b012cc108585d8ed1e1b3d4c9ef59752bc584 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 27 May 2025 14:03:31 +0200 Subject: [PATCH 16/26] nit: use None rather than empty string when no recording --- opentelemetry-sdk/src/trace/span.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index b61a146528..efa6dc8a7f 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -341,8 +341,8 @@ impl ReadableSpan for FinishedSpan { self.span_data_ref().span_kind.clone() } - fn name(&self) -> &str { - self.span_data_ref().name.as_ref() + fn name(&self) -> Option<&str> { + Some(&self.span.as_ref()?.name) } fn start_time(&self) -> Option { Some(self.span_data_ref().start_time) @@ -400,8 +400,8 @@ pub trait ReadableSpan { /// Returns the name of the span. /// - /// Returns an empty string if the span is not recording. - fn name(&self) -> &str; + /// Returns `None` if the span is not recording. + fn name(&self) -> Option<&str>; /// Returns the start time of the span. /// @@ -464,11 +464,11 @@ impl ReadableSpan for Span { .unwrap_or(SpanKind::Internal) } - fn name(&self) -> &str { - self.data - .as_ref() - .map(|data| data.name.as_ref()) - .unwrap_or("") + /// Returns the name of the span. + /// + /// Returns `None` if the span is not recording. + fn name(&self) -> Option<&str> { + Some(&self.data.as_ref()?.name) } fn start_time(&self) -> Option { @@ -1021,7 +1021,7 @@ mod tests { let parent_id = span.parent_span_id(); let name = span.name(); - assert_eq!(name, "test_span"); + assert_eq!(name, Some("test_span")); assert_eq!(parent_id, SpanId::INVALID); let _ = span.consume(); } From ddcf5f92a05c36e1df975f2b28bb34f0b2c80374 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 27 May 2025 14:36:20 +0200 Subject: [PATCH 17/26] nit: add test coverage for ReadableSpan implementation --- opentelemetry-sdk/src/trace/span.rs | 120 ++++++++++++++++++++++++---- 1 file changed, 104 insertions(+), 16 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index efa6dc8a7f..84ea617477 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -269,7 +269,7 @@ impl Drop for Span { /// use opentelemetry_sdk::trace::{FinishedSpan, ReadableSpan}; /// fn on_end(span: &mut FinishedSpan) { /// // Read the span data without consuming it -/// if span.name() != "my_span" { +/// if span.name() != Some("my_span") { /// return; /// } /// // Consume the span data, potentially cloning it @@ -342,7 +342,7 @@ impl ReadableSpan for FinishedSpan { } fn name(&self) -> Option<&str> { - Some(&self.span.as_ref()?.name) + Some(&self.span_data_ref().name) } fn start_time(&self) -> Option { Some(self.span_data_ref().start_time) @@ -558,9 +558,9 @@ mod tests { DEFAULT_MAX_ATTRIBUTES_PER_EVENT, DEFAULT_MAX_ATTRIBUTES_PER_LINK, DEFAULT_MAX_ATTRIBUTES_PER_SPAN, DEFAULT_MAX_EVENT_PER_SPAN, DEFAULT_MAX_LINKS_PER_SPAN, }; - use crate::trace::{SpanEvents, SpanLinks, SpanProcessor}; + use crate::trace::{SdkTracer, SpanEvents, SpanLinks, SpanProcessor}; use opentelemetry::trace::{ - self, Span as _, SpanBuilder, TraceFlags, TraceId, Tracer, TracerProvider, + self, SamplingResult, Span as _, SpanBuilder, TraceFlags, TraceId, Tracer, TracerProvider, }; use std::time::Duration; use std::vec; @@ -989,6 +989,82 @@ mod tests { assert_eq!(event_vec.len(), DEFAULT_MAX_EVENT_PER_SPAN as usize); } + fn make_test_span(tracer: &SdkTracer, sampling_decision: trace::SamplingDecision) -> Span { + tracer + .span_builder("test_span") + .with_sampling_result(SamplingResult { + decision: sampling_decision, + attributes: vec![], + trace_state: Default::default(), + }) + .with_attributes(vec![KeyValue::new("k", "v")]) + .with_kind(SpanKind::Client) + .with_events(vec![Event::with_name("test_event")]) + .with_links(vec![Link::with_context(SpanContext::new( + TraceId::from_bytes((1234_u128).to_ne_bytes()), + SpanId::from_bytes((5678_u64).to_ne_bytes()), + Default::default(), + false, + Default::default(), + ))]) + .with_span_id(SpanId::from_bytes((1337_u64).to_ne_bytes())) + .start(tracer) + } + + #[test] + fn test_readable_span() { + use super::ReadableSpan; + + let provider = crate::trace::SdkTracerProvider::builder() + .with_simple_exporter(NoopSpanExporter::new()) + .build(); + let tracer = provider.tracer("test"); + { + // ReadableSpan trait methods for recording span + let span = make_test_span(&tracer, trace::SamplingDecision::RecordOnly); + + assert_eq!( + span.context().span_id(), + SpanId::from_bytes((1337_u64).to_ne_bytes()) + ); + + assert_eq!(span.name(), Some("test_span")); + assert_eq!(span.span_kind(), SpanKind::Client); + assert!(span.start_time().is_some()); + assert!(span.end_time().is_some()); + assert_eq!(span.attributes(), &[KeyValue::new("k", "v")]); + assert_eq!(span.dropped_attributes_count(), 0); + assert_eq!(span.events().len(), 1); + assert_eq!(span.events()[0].name, "test_event"); + assert_eq!(span.dropped_events_count(), 0); + assert_eq!(span.links().len(), 1); + } + + { + // ReadableSpan trait methods for non-recording span + let span = make_test_span(&tracer, trace::SamplingDecision::Drop); + + assert_eq!( + span.context().span_id(), + SpanId::from_bytes((1337_u64).to_ne_bytes()) + ); + + assert_eq!(span.name(), None); + assert_eq!(span.span_kind(), SpanKind::Internal); + assert!(span.start_time().is_none()); + assert!(span.end_time().is_none()); + assert_eq!(span.attributes(), &[]); + assert_eq!(span.dropped_attributes_count(), 0); + assert_eq!(span.events().len(), 0); + assert_eq!(span.dropped_events_count(), 0); + assert_eq!(span.links().len(), 0); + assert_eq!( + span.context().span_id(), + SpanId::from_bytes((1337_u64).to_ne_bytes()) + ); + } + } + #[test] fn test_span_exported_data() { let provider = crate::trace::SdkTracerProvider::builder() @@ -1018,11 +1094,22 @@ mod tests { struct TestSpanProcessor; impl SpanProcessor for TestSpanProcessor { fn on_end(&self, span: &mut FinishedSpan) { - let parent_id = span.parent_span_id(); - let name = span.name(); + assert_eq!( + span.context().span_id(), + SpanId::from_bytes((1337_u64).to_ne_bytes()) + ); + + assert_eq!(span.name(), Some("test_span")); + assert_eq!(span.span_kind(), SpanKind::Client); + assert!(span.start_time().is_some()); + assert!(span.end_time().is_some()); + assert_eq!(span.attributes(), &[KeyValue::new("k", "v")]); + assert_eq!(span.dropped_attributes_count(), 0); + assert_eq!(span.events().len(), 1); + assert_eq!(span.events()[0].name, "test_event"); + assert_eq!(span.dropped_events_count(), 0); + assert_eq!(span.links().len(), 1); - assert_eq!(name, Some("test_span")); - assert_eq!(parent_id, SpanId::INVALID); let _ = span.consume(); } @@ -1040,10 +1127,10 @@ mod tests { let provider = crate::trace::SdkTracerProvider::builder() .with_span_processor(TestSpanProcessor) .build(); - let tracer = provider.tracer("test"); - { - tracer.start("test_span"); - } + drop(make_test_span( + &provider.tracer("test"), + trace::SamplingDecision::RecordAndSample, + )); let res = provider.shutdown(); println!("{:?}", res); assert!(res.is_ok()); @@ -1075,10 +1162,11 @@ mod tests { let provider = crate::trace::SdkTracerProvider::builder() .with_span_processor(TestSpanProcessor) .build(); - let tracer = provider.tracer("test"); - { - tracer.start("test_span"); - } + drop(make_test_span( + &provider.tracer("test"), + trace::SamplingDecision::RecordAndSample, + )); + let res = provider.shutdown(); println!("{:?}", res); assert!(res.is_ok()); From 28dc62b8fb3c1dd23b034d5c979453a2dc92eb9e Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 27 May 2025 14:44:03 +0200 Subject: [PATCH 18/26] fix: add non panicking version of FinishedSpan::consume --- opentelemetry-sdk/src/trace/span.rs | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 84ea617477..f680ef1b06 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -294,7 +294,7 @@ impl FinishedSpan { pub fn new(span_data: crate::trace::SpanData) -> Self { FinishedSpan { span: Some(span_data), - is_last_processor: false, + is_last_processor: true, is_consumed: false, } } @@ -314,16 +314,22 @@ impl FinishedSpan { if self.is_consumed { opentelemetry::otel_error!(name: "FinishedSpan.ConsumeTwice", message = "consume called twice on FinishedSpan in the same span processor"); } + self.try_consume() + .expect("Span data has already been consumed") + } + + /// Takes ownership of the span data in the `FinishedSpan`. + /// + /// Returns `None` if the span data has already been consumed. + pub fn try_consume(&mut self) -> Option { + if self.is_consumed { + return None; + } self.is_consumed = true; if self.is_last_processor { - self.span - .take() - .expect("Span data has already been consumed") + self.span.take() } else { - self.span - .as_ref() - .expect("Span data has already been consumed") - .clone() + self.span.clone() } } } From b80d612a47d28da18dd7d1676b23bbecd954d790 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 27 May 2025 14:46:05 +0200 Subject: [PATCH 19/26] fix: test non panicking behavior --- opentelemetry-sdk/src/trace/span.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index f680ef1b06..255480b0ec 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -1143,15 +1143,13 @@ mod tests { } #[test] - #[should_panic] fn test_finished_span_consume_twice() { #[derive(Debug)] struct TestSpanProcessor; impl SpanProcessor for TestSpanProcessor { fn on_end(&self, span: &mut FinishedSpan) { let _ = span.consume(); - // consume again to trigger panic - let _ = span.consume(); + assert!(span.try_consume().is_none()); } fn on_start(&self, _span: &mut Span, _cx: &opentelemetry::Context) {} From 8601bd1e3cdbabef58fe54123a94a57fcfc309d1 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 27 May 2025 15:10:28 +0200 Subject: [PATCH 20/26] fix: remove panics in finished span reads --- opentelemetry-sdk/src/trace/span.rs | 72 +++++++++++++++++++---------- 1 file changed, 48 insertions(+), 24 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 255480b0ec..12bf633831 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -284,12 +284,6 @@ pub struct FinishedSpan { } impl FinishedSpan { - fn span_data_ref(&self) -> &crate::trace::SpanData { - self.span - .as_ref() - .expect("Span data has already been consumed") - } - /// Creates a new `FinishedSpan` with the given span data. pub fn new(span_data: crate::trace::SpanData) -> Self { FinishedSpan { @@ -336,57 +330,87 @@ impl FinishedSpan { impl ReadableSpan for FinishedSpan { fn context(&self) -> &SpanContext { - &self.span_data_ref().span_context + match self.span { + Some(ref data) => &data.span_context, + None => &SpanContext::NONE, + } } fn parent_span_id(&self) -> SpanId { - self.span_data_ref().parent_span_id + match self.span { + Some(ref data) => data.parent_span_id, + None => SpanId::INVALID, + } } fn span_kind(&self) -> SpanKind { - self.span_data_ref().span_kind.clone() + match self.span { + Some(ref data) => data.span_kind.clone(), + None => SpanKind::Internal, + } } fn name(&self) -> Option<&str> { - Some(&self.span_data_ref().name) + self.span.as_ref().map(|s| s.name.as_ref()) } fn start_time(&self) -> Option { - Some(self.span_data_ref().start_time) + self.span.as_ref().map(|s| s.start_time) } fn end_time(&self) -> Option { - Some(self.span_data_ref().end_time) + self.span.as_ref().map(|s| s.end_time) } fn attributes(&self) -> &[KeyValue] { - self.span_data_ref().attributes.as_slice() + match self.span { + Some(ref data) => data.attributes.as_slice(), + None => &[], + } } fn dropped_attributes_count(&self) -> u32 { - self.span_data_ref().dropped_attributes_count + match self.span { + Some(ref data) => data.dropped_attributes_count, + None => 0, + } } fn events(&self) -> &[Event] { - self.span_data_ref().events.events.as_slice() + match self.span { + Some(ref data) => data.events.events.as_slice(), + None => &[], + } } fn dropped_events_count(&self) -> u32 { - self.span_data_ref().events.dropped_count + match self.span { + Some(ref data) => data.events.dropped_count, + None => 0, + } } fn links(&self) -> &[Link] { - self.span_data_ref().links.links.as_slice() + match self.span { + Some(ref data) => data.links.links.as_slice(), + None => &[], + } } + fn dropped_links_count(&self) -> u32 { - self.span_data_ref().links.dropped_count + match self.span { + Some(ref data) => data.links.dropped_count, + None => 0, + } } fn status(&self) -> &Status { - &self.span_data_ref().status + match self.span { + Some(ref data) => &data.status, + None => &Status::Unset, + } } } impl std::fmt::Debug for FinishedSpan { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut fmt = f.debug_struct("FinishedSpan"); - if self.is_consumed { - fmt.field("consumed", &self.is_consumed); - } else { - fmt.field("span", &self.span_data_ref()); - } + match &self.span { + Some(s) if !self.is_consumed => fmt.field("span", s), + _ => fmt.field("consumed", &self.is_consumed), + }; fmt.finish() } } From 86fd3da78ec3b7ba0698d0767ad77d5e6615c6bf Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 27 May 2025 15:35:42 +0200 Subject: [PATCH 21/26] fix: remove no longer relevant comment --- opentelemetry-sdk/src/trace/span.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 12bf633831..66668adbee 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -262,9 +262,6 @@ impl Drop for Span { /// If `consume`` is never called, the on_ending method will not perform any copy of /// the span data. /// -/// Calling any `ReadableSpan` method on the `FinishedSpan` will panic if the span data -/// has aready been consumed. -/// /// ``` /// use opentelemetry_sdk::trace::{FinishedSpan, ReadableSpan}; /// fn on_end(span: &mut FinishedSpan) { From 2dd7ccfc00dc11953e5158b28a05108ca269d656 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 27 May 2025 17:24:29 +0200 Subject: [PATCH 22/26] nit: add comment explaining why we grab the data from the span mutably --- opentelemetry-sdk/src/trace/span.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 66668adbee..08cb3d83cb 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -223,11 +223,9 @@ impl Span { .. } = self; - // skip if data has already been exported - let data = match data.take() { - Some(data) => data, - None => return, - }; + // Grab the span data from the span and leave it empty + // This way we don't have to clone the data + let Some(data) = data.take() else { return }; let span_context: SpanContext = std::mem::replace(span_context, SpanContext::empty_context()); @@ -245,6 +243,7 @@ impl Drop for Span { /// Report span on inner drop fn drop(&mut self) { if let Some(ref mut data) = self.data { + // if the span has not been ended, set the end time to now if data.end_time == data.start_time { data.end_time = opentelemetry::time::now(); } @@ -1152,6 +1151,7 @@ mod tests { } let provider = crate::trace::SdkTracerProvider::builder() + .with_span_processor(TestSpanProcessor) .with_span_processor(TestSpanProcessor) .build(); drop(make_test_span( From 79270ce037a504cde915ba0283f1ea6bbab1b650 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 27 May 2025 17:33:58 +0200 Subject: [PATCH 23/26] fix: rename ensure_ended_and_exported to end_and_export --- opentelemetry-sdk/src/trace/span.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 08cb3d83cb..1007a8d4fd 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -195,7 +195,7 @@ impl opentelemetry::trace::Span for Span { if let Some(data) = self.data.as_mut() { data.end_time = timestamp; } - self.ensure_ended_and_exported(); + self.end_and_export(); } } @@ -203,7 +203,7 @@ impl Span { /// Span ending logic /// /// The end timestamp of the span has to be set before calling this function - fn ensure_ended_and_exported(&mut self) { + fn end_and_export(&mut self) { if self.tracer.provider().is_shutdown() { return; } @@ -243,12 +243,12 @@ impl Drop for Span { /// Report span on inner drop fn drop(&mut self) { if let Some(ref mut data) = self.data { - // if the span has not been ended, set the end time to now + // if the span end_time has not been set, set it to now if data.end_time == data.start_time { data.end_time = opentelemetry::time::now(); } } - self.ensure_ended_and_exported(); + self.end_and_export(); } } From d8f2f80c1d5ec73778d7ad27d4217ebceaa6dd5d Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 27 May 2025 17:49:49 +0200 Subject: [PATCH 24/26] fix: example return instead of panic on posined lock --- examples/tracing-http-propagator/src/server.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index 3f4e1f30e6..6e52f9a487 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -136,7 +136,9 @@ impl SpanProcessor for RouteConcurrencyCounterSpanProcessor { else { return; }; - let mut counts = self.0.lock().unwrap(); + let Ok(mut counts) = self.0.lock() else { + return; + }; let count = counts.entry(route.key.clone()).or_default(); *count += 1; span.set_attribute(KeyValue::new( @@ -156,7 +158,9 @@ impl SpanProcessor for RouteConcurrencyCounterSpanProcessor { else { return; }; - let mut counts = self.0.lock().unwrap(); + let Ok(mut counts) = self.0.lock() else { + return; + }; let Some(count) = counts.get_mut(&route.key) else { return; }; From 343aba2b7d4c2b5896f52f731728d21bef9c1654 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 27 May 2025 20:33:57 +0200 Subject: [PATCH 25/26] revert: remove on_ending API --- examples/tracing-http-propagator/Cargo.toml | 2 +- .../tracing-http-propagator/src/server.rs | 41 ------------------- opentelemetry-sdk/Cargo.toml | 1 - opentelemetry-sdk/src/trace/span.rs | 10 +---- opentelemetry-sdk/src/trace/span_processor.rs | 16 -------- 5 files changed, 2 insertions(+), 68 deletions(-) diff --git a/examples/tracing-http-propagator/Cargo.toml b/examples/tracing-http-propagator/Cargo.toml index 6d0c9ef0ac..a4fa13d371 100644 --- a/examples/tracing-http-propagator/Cargo.toml +++ b/examples/tracing-http-propagator/Cargo.toml @@ -25,7 +25,7 @@ hyper = { workspace = true, features = ["full"] } hyper-util = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] } opentelemetry = { path = "../../opentelemetry" } -opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["experimental_span_processor_on_ending"]} +opentelemetry_sdk = { path = "../../opentelemetry-sdk" } opentelemetry-http = { path = "../../opentelemetry-http" } opentelemetry-stdout = { workspace = true, features = ["trace", "logs"] } opentelemetry-semantic-conventions = { path = "../../opentelemetry-semantic-conventions" } diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index 6e52f9a487..d00165842a 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -171,46 +171,6 @@ impl SpanProcessor for RouteConcurrencyCounterSpanProcessor { } } -fn obfuscate_http_auth_url(s: &str) -> Option { - #[allow(clippy::unnecessary_to_owned)] - let uri = hyper::http::Uri::from_maybe_shared(s.to_string()).ok()?; - let authority = uri.authority()?; - let (_, url) = authority.as_str().split_once('@')?; - let new_auth = format!("REDACTED_USERNAME:REDACTED_PASSWORD@{url}"); - let mut parts = uri.into_parts(); - parts.authority = Some(hyper::http::uri::Authority::from_maybe_shared(new_auth).ok()?); - Some(hyper::Uri::from_parts(parts).ok()?.to_string()) -} - -#[derive(Debug)] -/// A custom span processor that uses on_ending to obfuscate sensitive information in span attributes. -/// -/// Currently this only overrides http auth information in the URI. -struct SpanObfuscationProcessor; - -impl SpanProcessor for SpanObfuscationProcessor { - fn force_flush(&self) -> OTelSdkResult { - Ok(()) - } - - fn shutdown_with_timeout(&self, _timeout: Duration) -> crate::OTelSdkResult { - Ok(()) - } - - fn on_start(&self, _span: &mut opentelemetry_sdk::trace::Span, _cx: &Context) {} - - fn on_ending(&self, span: &mut opentelemetry_sdk::trace::Span) { - let mut obfuscated_attributes = Vec::new(); - for KeyValue { key, value, .. } in span.attributes() { - if let Some(redacted_uri) = obfuscate_http_auth_url(value.as_str().as_ref()) { - obfuscated_attributes.push((key.clone(), KeyValue::new(key.clone(), redacted_uri))); - } - } - } - - fn on_end(&self, _span: &mut FinishedSpan) {} -} - /// A custom log processor that enriches LogRecords with baggage attributes. /// Baggage information is not added automatically without this processor. #[derive(Debug)] @@ -266,7 +226,6 @@ fn init_tracer() -> SdkTracerProvider { let provider = SdkTracerProvider::builder() .with_span_processor(RouteConcurrencyCounterSpanProcessor::default()) .with_span_processor(EnrichWithBaggageSpanProcessor) - .with_span_processor(SpanObfuscationProcessor) .with_simple_exporter(SpanExporter::default()) .build(); diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index df74575017..c18a0d6488 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -57,7 +57,6 @@ experimental_logs_batch_log_processor_with_async_runtime = ["logs", "experimenta experimental_logs_concurrent_log_processor = ["logs"] experimental_trace_batch_span_processor_with_async_runtime = ["trace", "experimental_async_runtime"] experimental_metrics_disable_name_validation = ["metrics"] -experimental_span_processor_on_ending = ["trace"] [[bench]] name = "context" diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 1007a8d4fd..f8662fc15a 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -208,14 +208,6 @@ impl Span { return; } - #[cfg(feature = "experimental_span_processor_on_ending")] - { - let provider = self.tracer.provider().clone(); - for processor in provider.span_processors() { - processor.on_ending(self); - } - } - let Span { data, tracer, @@ -258,7 +250,7 @@ impl Drop for Span { /// through the `ReadableSpan` trait. /// /// Taking ownership of the span data is done by calling `consume`. -/// If `consume`` is never called, the on_ending method will not perform any copy of +/// If `consume`` is never called, the on_end method will not perform any copy of /// the span data. /// /// ``` diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 60bce9ff78..47d0e11db5 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -79,22 +79,6 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// synchronously on the thread that started the span, therefore it should /// not block or throw exceptions. fn on_start(&self, span: &mut Span, cx: &Context); - #[cfg(feature = "experimental_span_processor_on_ending")] - /// `on_ending` is called when a `Span` is ending. The end timestamp has already - /// been computed. - /// This method is called synchronously within the `Span::end` API, therefore it - /// should not block or throw an exception. - /// - /// If multiple span processors are registered, their on_ending methods are invoked - /// in the order the span processors have been registered, and mutations to the span - /// will be visible to the next processor. - /// - /// The tracer will call `on_ending` for all span processors before calling `on_end` - /// for any of them. - fn on_ending(&self, _span: &mut Span) { - // Default implementation is a no-op so existing processor implementations - // don't break if this feature in enabled transitively. - } /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is /// already set). This method is called synchronously within the `Span::end` From cead7251a5c1cbfad03ee35946183533c7b9a727 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Tue, 10 Jun 2025 00:41:44 +0200 Subject: [PATCH 26/26] fix: make consume non panicking --- opentelemetry-sdk/src/trace/span.rs | 19 +++---------------- opentelemetry-sdk/src/trace/span_processor.rs | 6 ++++-- .../span_processor_with_async_runtime.rs | 2 +- 3 files changed, 8 insertions(+), 19 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index f8662fc15a..f0e1523799 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -286,25 +286,12 @@ impl FinishedSpan { self.is_consumed = false; } - /// Takes ownership of the span data in the `FinishedSpan`. - /// - /// # Panics - /// - /// This function panics - /// * if it called twice in the same SpanProcessor::on_end - pub fn consume(&mut self) -> crate::trace::SpanData { - if self.is_consumed { - opentelemetry::otel_error!(name: "FinishedSpan.ConsumeTwice", message = "consume called twice on FinishedSpan in the same span processor"); - } - self.try_consume() - .expect("Span data has already been consumed") - } - /// Takes ownership of the span data in the `FinishedSpan`. /// /// Returns `None` if the span data has already been consumed. - pub fn try_consume(&mut self) -> Option { + pub fn consume(&mut self) -> Option { if self.is_consumed { + opentelemetry::otel_error!(name: "FinishedSpan.ConsumeTwice", message = "consume called twice on FinishedSpan in the same span processor"); return None; } self.is_consumed = true; @@ -1162,7 +1149,7 @@ mod tests { impl SpanProcessor for TestSpanProcessor { fn on_end(&self, span: &mut FinishedSpan) { let _ = span.consume(); - assert!(span.try_consume().is_none()); + assert!(span.consume().is_none()); } fn on_start(&self, _span: &mut Span, _cx: &opentelemetry::Context) {} diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 47d0e11db5..bec4203a4a 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -137,7 +137,7 @@ impl SpanProcessor for SimpleSpanProcessor { if !span.context().is_sampled() { return; } - let span = span.consume(); + let Some(span) = span.consume() else { return }; let result = self .exporter @@ -526,7 +526,9 @@ impl SpanProcessor for BatchSpanProcessor { ); return; } - let span = span.consume(); + let Some(span) = span.consume() else { + return; + }; let result = self.span_sender.try_send(span); if result.is_err() { diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index cfa039eefb..c68e10c710 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -107,7 +107,7 @@ impl SpanProcessor for BatchSpanProcessor { if !span.context().is_sampled() { return; } - let span = span.consume(); + let Some(span) = span.consume() else { return }; let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));