diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index 49d4a084e7..d00165842a 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -15,12 +15,17 @@ use opentelemetry_sdk::{ error::OTelSdkResult, logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider}, propagation::{BaggagePropagator, TraceContextPropagator}, - trace::{SdkTracerProvider, SpanProcessor}, + trace::{FinishedSpan, ReadableSpan, SdkTracerProvider, SpanProcessor}, }; use opentelemetry_semantic_conventions::trace; use opentelemetry_stdout::{LogExporter, SpanExporter}; use std::time::Duration; -use std::{convert::Infallible, net::SocketAddr, sync::OnceLock}; +use std::{ + collections::HashMap, + convert::Infallible, + net::SocketAddr, + sync::{Mutex, OnceLock}, +}; use tokio::net::TcpListener; use tracing::info; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -84,6 +89,7 @@ async fn router( let span = tracer .span_builder("router") .with_kind(SpanKind::Server) + .with_attributes([KeyValue::new("http.route", req.uri().path().to_string())]) .start_with_context(tracer, &parent_cx); info!(name = "router", message = "Dispatching request"); @@ -105,6 +111,66 @@ async fn router( response } +#[derive(Debug, Default)] +/// A custom span processor that counts concurrent requests for each route (indentified by the http.route +/// attribute) and adds that information to the span attributes. +struct RouteConcurrencyCounterSpanProcessor(Mutex>); + +impl SpanProcessor for RouteConcurrencyCounterSpanProcessor { + fn force_flush(&self) -> OTelSdkResult { + Ok(()) + } + + fn shutdown_with_timeout(&self, _timeout: Duration) -> crate::OTelSdkResult { + Ok(()) + } + + fn on_start(&self, span: &mut opentelemetry_sdk::trace::Span, _cx: &Context) { + if !matches!(span.span_kind(), SpanKind::Server) { + return; + } + let Some(route) = span + .attributes() + .iter() + .find(|kv| kv.key.as_str() == "http.route") + else { + return; + }; + let Ok(mut counts) = self.0.lock() else { + return; + }; + let count = counts.entry(route.key.clone()).or_default(); + *count += 1; + span.set_attribute(KeyValue::new( + "http.route.concurrent_requests", + *count as i64, + )); + } + + fn on_end(&self, span: &mut FinishedSpan) { + if !matches!(span.span_kind(), SpanKind::Server) { + return; + } + let Some(route) = span + .attributes() + .iter() + .find(|kv| kv.key.as_str() == "http.route") + else { + return; + }; + let Ok(mut counts) = self.0.lock() else { + return; + }; + let Some(count) = counts.get_mut(&route.key) else { + return; + }; + *count -= 1; + if *count == 0 { + counts.remove(&route.key); + } + } +} + /// A custom log processor that enriches LogRecords with baggage attributes. /// Baggage information is not added automatically without this processor. #[derive(Debug)] @@ -142,7 +208,7 @@ impl SpanProcessor for EnrichWithBaggageSpanProcessor { } } - fn on_end(&self, _span: opentelemetry_sdk::trace::SpanData) {} + fn on_end(&self, _span: &mut opentelemetry_sdk::trace::FinishedSpan) {} } fn init_tracer() -> SdkTracerProvider { @@ -158,6 +224,7 @@ fn init_tracer() -> SdkTracerProvider { // Setup tracerprovider with stdout exporter // that prints the spans to stdout. let provider = SdkTracerProvider::builder() + .with_span_processor(RouteConcurrencyCounterSpanProcessor::default()) .with_span_processor(EnrichWithBaggageSpanProcessor) .with_simple_exporter(SpanExporter::default()) .build(); diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index aa92787ea7..c18a0d6488 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -117,6 +117,12 @@ name = "log" harness = false required-features = ["logs"] +[[bench]] +name = "span_processor_api" +harness = false +required-features = ["testing"] + + [lib] bench = false diff --git a/opentelemetry-sdk/benches/batch_span_processor.rs b/opentelemetry-sdk/benches/batch_span_processor.rs index d67077d729..306714a414 100644 --- a/opentelemetry-sdk/benches/batch_span_processor.rs +++ b/opentelemetry-sdk/benches/batch_span_processor.rs @@ -4,10 +4,10 @@ use opentelemetry::trace::{ SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, }; use opentelemetry_sdk::testing::trace::NoopSpanExporter; -use opentelemetry_sdk::trace::SpanData; use opentelemetry_sdk::trace::{ BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor, }; +use opentelemetry_sdk::trace::{FinishedSpan, SpanData}; use std::sync::Arc; use tokio::runtime::Runtime; @@ -62,7 +62,8 @@ fn criterion_benchmark(c: &mut Criterion) { let spans = get_span_data(); handles.push(tokio::spawn(async move { for span in spans { - span_processor.on_end(span); + let mut span = FinishedSpan::new(span); + span_processor.on_end(&mut span); tokio::task::yield_now().await; } })); diff --git a/opentelemetry-sdk/benches/span_processor_api.rs b/opentelemetry-sdk/benches/span_processor_api.rs new file mode 100644 index 0000000000..eb0561bb04 --- /dev/null +++ b/opentelemetry-sdk/benches/span_processor_api.rs @@ -0,0 +1,88 @@ +use std::time::Duration; + +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use opentelemetry::{ + trace::{Span, Tracer, TracerProvider}, + Context, KeyValue, +}; +use opentelemetry_sdk::trace as sdktrace; + +#[cfg(not(target_os = "windows"))] +use pprof::criterion::{Output, PProfProfiler}; + +/* +Adding results in comments for a quick reference. + Chip: Apple M1 Max + Total Number of Cores: 10 (8 performance and 2 efficiency) + +SpanProcessorApi/0_processors + time: [385.15 ns 386.14 ns 387.25 ns] +SpanProcessorApi/1_processors + time: [385.73 ns 387.17 ns 388.85 ns] +SpanProcessorApi/2_processors + time: [384.84 ns 385.66 ns 386.50 ns] +SpanProcessorApi/4_processors + time: [386.78 ns 388.17 ns 389.58 ns] +*/ + +#[derive(Debug)] +struct NoopSpanProcessor; + +impl sdktrace::SpanProcessor for NoopSpanProcessor { + fn on_start(&self, _span: &mut sdktrace::Span, _parent_cx: &Context) {} + fn on_end(&self, _span: &mut sdktrace::FinishedSpan) {} + fn force_flush(&self) -> opentelemetry_sdk::error::OTelSdkResult { + Ok(()) + } + fn shutdown_with_timeout(&self, _timeout: Duration) -> opentelemetry_sdk::error::OTelSdkResult { + Ok(()) + } +} + +fn create_tracer(span_processors_count: usize) -> sdktrace::SdkTracer { + let mut builder = sdktrace::SdkTracerProvider::builder(); + for _ in 0..span_processors_count { + builder = builder.with_span_processor(NoopSpanProcessor); + } + builder.build().tracer("tracer") +} + +fn create_span(tracer: &sdktrace::Tracer) -> sdktrace::Span { + let mut span = tracer.start("foo"); + span.set_attribute(KeyValue::new("key1", false)); + span.set_attribute(KeyValue::new("key2", "hello")); + span.set_attribute(KeyValue::new("key4", 123.456)); + span.add_event("my_event", vec![KeyValue::new("key1", "value1")]); + span +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("SpanProcessorApi"); + for i in [0, 1, 2, 4] { + group.bench_function(format!("{}_processors", i), |b| { + let tracer = create_tracer(i); + b.iter(|| { + black_box(create_span(&tracer)); + }); + }); + } +} + +#[cfg(not(target_os = "windows"))] +criterion_group! { + name = benches; + config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))) + .warm_up_time(std::time::Duration::from_secs(1)) + .measurement_time(std::time::Duration::from_secs(2)); + targets = criterion_benchmark +} + +#[cfg(target_os = "windows")] +criterion_group! { + name = benches; + config = Criterion::default().warm_up_time(std::time::Duration::from_secs(1)) + .measurement_time(std::time::Duration::from_secs(2)); + targets = criterion_benchmark +} + +criterion_main!(benches); diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index 42b299a606..4b54fd0e9a 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -39,7 +39,7 @@ pub use id_generator::{IdGenerator, RandomIdGenerator}; pub use links::SpanLinks; pub use provider::{SdkTracerProvider, TracerProviderBuilder}; pub use sampler::{Sampler, ShouldSample}; -pub use span::Span; +pub use span::{FinishedSpan, ReadableSpan, Span}; pub use span_limit::SpanLimits; pub use span_processor::{ BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder, @@ -137,7 +137,7 @@ mod tests { } } - fn on_end(&self, _span: SpanData) { + fn on_end(&self, _span: &mut FinishedSpan) { // TODO: Accessing Context::current() will panic today and hence commented out. // See https://github.com/open-telemetry/opentelemetry-rust/issues/2871 // let _c = Context::current(); diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 2b05f89aea..d554abdb02 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -471,8 +471,8 @@ mod tests { SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION, }; use crate::trace::provider::TracerProviderInner; - use crate::trace::{Config, Span, SpanProcessor}; - use crate::trace::{SdkTracerProvider, SpanData}; + use crate::trace::SdkTracerProvider; + use crate::trace::{Config, FinishedSpan, Span, SpanProcessor}; use crate::Resource; use opentelemetry::trace::{Tracer, TracerProvider}; use opentelemetry::{Context, Key, KeyValue, Value}; @@ -526,7 +526,7 @@ mod tests { .fetch_add(1, Ordering::SeqCst); } - fn on_end(&self, _span: SpanData) { + fn on_end(&self, _span: &mut FinishedSpan) { // ignore } @@ -789,7 +789,7 @@ mod tests { // No operation needed for this processor } - fn on_end(&self, _span: SpanData) { + fn on_end(&self, _span: &mut FinishedSpan) { // No operation needed for this processor } diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 9b0ae253ce..58c873733d 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -192,49 +192,41 @@ impl opentelemetry::trace::Span for Span { /// Finishes the span with given timestamp. fn end_with_timestamp(&mut self, timestamp: SystemTime) { - self.ensure_ended_and_exported(Some(timestamp)); + if let Some(data) = self.data.as_mut() { + data.end_time = timestamp; + } + self.end_and_export(); } } impl Span { - fn ensure_ended_and_exported(&mut self, timestamp: Option) { - // skip if data has already been exported - let mut data = match self.data.take() { - Some(data) => data, - None => return, - }; - - let provider = self.tracer.provider(); - // skip if provider has been shut down - if provider.is_shutdown() { + /// Span ending logic + /// + /// The end timestamp of the span has to be set before calling this function + fn end_and_export(&mut self) { + if self.tracer.provider().is_shutdown() { return; } - // ensure end time is set via explicit end or implicitly on drop - if let Some(timestamp) = timestamp { - data.end_time = timestamp; - } else if data.end_time == data.start_time { - data.end_time = opentelemetry::time::now(); - } + let Span { + data, + tracer, + span_context, + .. + } = self; - match provider.span_processors() { - [] => {} - [processor] => { - processor.on_end(build_export_data( - data, - self.span_context.clone(), - &self.tracer, - )); - } - processors => { - for processor in processors { - processor.on_end(build_export_data( - data.clone(), - self.span_context.clone(), - &self.tracer, - )); - } - } + // Grab the span data from the span and leave it empty + // This way we don't have to clone the data + let Some(data) = data.take() else { return }; + let span_context: SpanContext = + std::mem::replace(span_context, SpanContext::empty_context()); + + let mut finished_span = FinishedSpan::new(build_export_data(data, span_context, tracer)); + + let span_processors = tracer.provider().span_processors(); + for (i, processor) in span_processors.iter().enumerate() { + finished_span.reset(i == span_processors.len() - 1); + processor.on_end(&mut finished_span); } } } @@ -242,7 +234,303 @@ impl Span { impl Drop for Span { /// Report span on inner drop fn drop(&mut self) { - self.ensure_ended_and_exported(None); + if let Some(ref mut data) = self.data { + // if the span end_time has not been set, set it to now + if data.end_time == data.start_time { + data.end_time = opentelemetry::time::now(); + } + } + self.end_and_export(); + } +} + +/// Represents a finished span passed to a span processor. +/// +/// The data associated with the span is not writable, but it can be read +/// through the `ReadableSpan` trait. +/// +/// Taking ownership of the span data is done by calling `consume`. +/// If `consume`` is never called, the on_end method will not perform any copy of +/// the span data. +/// +/// ``` +/// use opentelemetry_sdk::trace::{FinishedSpan, ReadableSpan}; +/// fn on_end(span: &mut FinishedSpan) { +/// // Read the span data without consuming it +/// if span.name() != Some("my_span") { +/// return; +/// } +/// // Consume the span data, potentially cloning it +/// let span = span.consume(); +/// # let _ = span; +/// } +/// ``` +pub struct FinishedSpan { + span: Option, + is_last_processor: bool, + is_consumed: bool, +} + +impl FinishedSpan { + /// Creates a new `FinishedSpan` with the given span data. + pub fn new(span_data: crate::trace::SpanData) -> Self { + FinishedSpan { + span: Some(span_data), + is_last_processor: true, + is_consumed: false, + } + } + + fn reset(&mut self, last_processor: bool) { + self.is_last_processor = last_processor; + self.is_consumed = false; + } + + /// Takes ownership of the span data in the `FinishedSpan`. + /// + /// Returns `None` if the span data has already been consumed. + pub fn consume(&mut self) -> Option { + if self.is_consumed { + opentelemetry::otel_error!(name: "FinishedSpan.ConsumeTwice", message = "consume called twice on FinishedSpan in the same span processor"); + return None; + } + self.is_consumed = true; + if self.is_last_processor { + self.span.take() + } else { + self.span.clone() + } + } +} + +impl ReadableSpan for FinishedSpan { + fn context(&self) -> &SpanContext { + match self.span { + Some(ref data) => &data.span_context, + None => &SpanContext::NONE, + } + } + + fn parent_span_id(&self) -> SpanId { + match self.span { + Some(ref data) => data.parent_span_id, + None => SpanId::INVALID, + } + } + + fn span_kind(&self) -> SpanKind { + match self.span { + Some(ref data) => data.span_kind.clone(), + None => SpanKind::Internal, + } + } + + fn name(&self) -> Option<&str> { + self.span.as_ref().map(|s| s.name.as_ref()) + } + fn start_time(&self) -> Option { + self.span.as_ref().map(|s| s.start_time) + } + fn end_time(&self) -> Option { + self.span.as_ref().map(|s| s.end_time) + } + fn attributes(&self) -> &[KeyValue] { + match self.span { + Some(ref data) => data.attributes.as_slice(), + None => &[], + } + } + fn dropped_attributes_count(&self) -> u32 { + match self.span { + Some(ref data) => data.dropped_attributes_count, + None => 0, + } + } + fn events(&self) -> &[Event] { + match self.span { + Some(ref data) => data.events.events.as_slice(), + None => &[], + } + } + fn dropped_events_count(&self) -> u32 { + match self.span { + Some(ref data) => data.events.dropped_count, + None => 0, + } + } + fn links(&self) -> &[Link] { + match self.span { + Some(ref data) => data.links.links.as_slice(), + None => &[], + } + } + + fn dropped_links_count(&self) -> u32 { + match self.span { + Some(ref data) => data.links.dropped_count, + None => 0, + } + } + fn status(&self) -> &Status { + match self.span { + Some(ref data) => &data.status, + None => &Status::Unset, + } + } +} + +impl std::fmt::Debug for FinishedSpan { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut fmt = f.debug_struct("FinishedSpan"); + match &self.span { + Some(s) if !self.is_consumed => fmt.field("span", s), + _ => fmt.field("consumed", &self.is_consumed), + }; + fmt.finish() + } +} + +/// A trait for reading span data. +pub trait ReadableSpan { + /// Returns the `SpanContext` of the span. + fn context(&self) -> &SpanContext; + + /// Returns the `SpanId` of the parent span. + fn parent_span_id(&self) -> SpanId; + + /// Returns the `SpanKind` of the span. + /// + /// Returns `SpanKind::Internal` if the span is not recording. + fn span_kind(&self) -> SpanKind; + + /// Returns the name of the span. + /// + /// Returns `None` if the span is not recording. + fn name(&self) -> Option<&str>; + + /// Returns the start time of the span. + /// + /// Returns `None` if the span is not recording. + fn start_time(&self) -> Option; + + /// Returns the end time of the span. + /// + /// Returns `None` if + /// * the span is not recording. + /// * the span has not been ended. + fn end_time(&self) -> Option; + + /// Returns the attributes of the span. + /// + /// Returns an empty slice if the span is not recording. + fn attributes(&self) -> &[KeyValue]; + + /// Returns the number of dropped attributes. + fn dropped_attributes_count(&self) -> u32; + + /// Returns the events associated to the span. + /// + /// Returns an empty slice if the span is not recording. + fn events(&self) -> &[Event]; + + /// Returns the number of dropped events. + fn dropped_events_count(&self) -> u32; + + /// Returns the span links associated to the span. + /// + /// Returns an empty slice if the span is not recording. + fn links(&self) -> &[Link]; + + /// Returns the number of dropped links. + fn dropped_links_count(&self) -> u32; + + /// Returns the status of the span. + /// + /// Returns `Status::Unset` if the span is not recording. + fn status(&self) -> &Status; +} + +impl ReadableSpan for Span { + fn context(&self) -> &SpanContext { + &self.span_context + } + + fn parent_span_id(&self) -> SpanId { + self.data + .as_ref() + .map(|data| data.parent_span_id) + .unwrap_or_else(|| SpanId::INVALID) + } + + fn span_kind(&self) -> SpanKind { + self.data + .as_ref() + .map(|data| data.span_kind.clone()) + .unwrap_or(SpanKind::Internal) + } + + /// Returns the name of the span. + /// + /// Returns `None` if the span is not recording. + fn name(&self) -> Option<&str> { + Some(&self.data.as_ref()?.name) + } + + fn start_time(&self) -> Option { + self.data.as_ref().map(|data| data.start_time) + } + + fn end_time(&self) -> Option { + self.data.as_ref().map(|data| data.end_time) + } + + fn attributes(&self) -> &[KeyValue] { + self.data + .as_ref() + .map(|data| data.attributes.as_slice()) + .unwrap_or(&[]) + } + + fn dropped_attributes_count(&self) -> u32 { + self.data + .as_ref() + .map(|data| data.dropped_attributes_count) + .unwrap_or(0) + } + + fn events(&self) -> &[Event] { + self.data + .as_ref() + .map(|data| data.events.events.as_slice()) + .unwrap_or(&[]) + } + + fn dropped_events_count(&self) -> u32 { + self.data + .as_ref() + .map(|data| data.events.dropped_count) + .unwrap_or(0) + } + + fn links(&self) -> &[Link] { + self.data + .as_ref() + .map(|data| data.links.links.as_slice()) + .unwrap_or(&[]) + } + + fn dropped_links_count(&self) -> u32 { + self.data + .as_ref() + .map(|data| data.links.dropped_count) + .unwrap_or(0) + } + + fn status(&self) -> &Status { + self.data + .as_ref() + .map(|data| &data.status) + .unwrap_or(&Status::Unset) } } @@ -275,9 +563,10 @@ mod tests { DEFAULT_MAX_ATTRIBUTES_PER_EVENT, DEFAULT_MAX_ATTRIBUTES_PER_LINK, DEFAULT_MAX_ATTRIBUTES_PER_SPAN, DEFAULT_MAX_EVENT_PER_SPAN, DEFAULT_MAX_LINKS_PER_SPAN, }; - use crate::trace::{SpanEvents, SpanLinks}; - use opentelemetry::trace::{self, SpanBuilder, TraceFlags, TraceId, Tracer}; - use opentelemetry::{trace::Span as _, trace::TracerProvider}; + use crate::trace::{SdkTracer, SpanEvents, SpanLinks, SpanProcessor}; + use opentelemetry::trace::{ + self, SamplingResult, Span as _, SpanBuilder, TraceFlags, TraceId, Tracer, TracerProvider, + }; use std::time::Duration; use std::vec; @@ -705,6 +994,82 @@ mod tests { assert_eq!(event_vec.len(), DEFAULT_MAX_EVENT_PER_SPAN as usize); } + fn make_test_span(tracer: &SdkTracer, sampling_decision: trace::SamplingDecision) -> Span { + tracer + .span_builder("test_span") + .with_sampling_result(SamplingResult { + decision: sampling_decision, + attributes: vec![], + trace_state: Default::default(), + }) + .with_attributes(vec![KeyValue::new("k", "v")]) + .with_kind(SpanKind::Client) + .with_events(vec![Event::with_name("test_event")]) + .with_links(vec![Link::with_context(SpanContext::new( + TraceId::from_bytes((1234_u128).to_ne_bytes()), + SpanId::from_bytes((5678_u64).to_ne_bytes()), + Default::default(), + false, + Default::default(), + ))]) + .with_span_id(SpanId::from_bytes((1337_u64).to_ne_bytes())) + .start(tracer) + } + + #[test] + fn test_readable_span() { + use super::ReadableSpan; + + let provider = crate::trace::SdkTracerProvider::builder() + .with_simple_exporter(NoopSpanExporter::new()) + .build(); + let tracer = provider.tracer("test"); + { + // ReadableSpan trait methods for recording span + let span = make_test_span(&tracer, trace::SamplingDecision::RecordOnly); + + assert_eq!( + span.context().span_id(), + SpanId::from_bytes((1337_u64).to_ne_bytes()) + ); + + assert_eq!(span.name(), Some("test_span")); + assert_eq!(span.span_kind(), SpanKind::Client); + assert!(span.start_time().is_some()); + assert!(span.end_time().is_some()); + assert_eq!(span.attributes(), &[KeyValue::new("k", "v")]); + assert_eq!(span.dropped_attributes_count(), 0); + assert_eq!(span.events().len(), 1); + assert_eq!(span.events()[0].name, "test_event"); + assert_eq!(span.dropped_events_count(), 0); + assert_eq!(span.links().len(), 1); + } + + { + // ReadableSpan trait methods for non-recording span + let span = make_test_span(&tracer, trace::SamplingDecision::Drop); + + assert_eq!( + span.context().span_id(), + SpanId::from_bytes((1337_u64).to_ne_bytes()) + ); + + assert_eq!(span.name(), None); + assert_eq!(span.span_kind(), SpanKind::Internal); + assert!(span.start_time().is_none()); + assert!(span.end_time().is_none()); + assert_eq!(span.attributes(), &[]); + assert_eq!(span.dropped_attributes_count(), 0); + assert_eq!(span.events().len(), 0); + assert_eq!(span.dropped_events_count(), 0); + assert_eq!(span.links().len(), 0); + assert_eq!( + span.context().span_id(), + SpanId::from_bytes((1337_u64).to_ne_bytes()) + ); + } + } + #[test] fn test_span_exported_data() { let provider = crate::trace::SdkTracerProvider::builder() @@ -725,4 +1090,89 @@ mod tests { // return none if the provider has already been dropped assert!(dropped_span.exported_data().is_none()); } + + #[test] + fn test_finished_span_consume() { + use super::ReadableSpan; + + #[derive(Debug)] + struct TestSpanProcessor; + impl SpanProcessor for TestSpanProcessor { + fn on_end(&self, span: &mut FinishedSpan) { + assert_eq!( + span.context().span_id(), + SpanId::from_bytes((1337_u64).to_ne_bytes()) + ); + + assert_eq!(span.name(), Some("test_span")); + assert_eq!(span.span_kind(), SpanKind::Client); + assert!(span.start_time().is_some()); + assert!(span.end_time().is_some()); + assert_eq!(span.attributes(), &[KeyValue::new("k", "v")]); + assert_eq!(span.dropped_attributes_count(), 0); + assert_eq!(span.events().len(), 1); + assert_eq!(span.events()[0].name, "test_event"); + assert_eq!(span.dropped_events_count(), 0); + assert_eq!(span.links().len(), 1); + + let _ = span.consume(); + } + + fn on_start(&self, _span: &mut Span, _cx: &opentelemetry::Context) {} + + fn force_flush(&self) -> crate::error::OTelSdkResult { + Ok(()) + } + + fn shutdown_with_timeout(&self, _timeout: Duration) -> crate::error::OTelSdkResult { + Ok(()) + } + } + + let provider = crate::trace::SdkTracerProvider::builder() + .with_span_processor(TestSpanProcessor) + .with_span_processor(TestSpanProcessor) + .build(); + drop(make_test_span( + &provider.tracer("test"), + trace::SamplingDecision::RecordAndSample, + )); + let res = provider.shutdown(); + println!("{:?}", res); + assert!(res.is_ok()); + } + + #[test] + fn test_finished_span_consume_twice() { + #[derive(Debug)] + struct TestSpanProcessor; + impl SpanProcessor for TestSpanProcessor { + fn on_end(&self, span: &mut FinishedSpan) { + let _ = span.consume(); + assert!(span.consume().is_none()); + } + + fn on_start(&self, _span: &mut Span, _cx: &opentelemetry::Context) {} + + fn force_flush(&self) -> crate::error::OTelSdkResult { + Ok(()) + } + + fn shutdown_with_timeout(&self, _timeout: Duration) -> crate::error::OTelSdkResult { + Ok(()) + } + } + + let provider = crate::trace::SdkTracerProvider::builder() + .with_span_processor(TestSpanProcessor) + .build(); + drop(make_test_span( + &provider.tracer("test"), + trace::SamplingDecision::RecordAndSample, + )); + + let res = provider.shutdown(); + println!("{:?}", res); + assert!(res.is_ok()); + } } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 67862ecb8a..8d08c6291b 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -79,11 +79,11 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// synchronously on the thread that started the span, therefore it should /// not block or throw exceptions. fn on_start(&self, span: &mut Span, cx: &Context); + /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is /// already set). This method is called synchronously within the `Span::end` /// API, therefore it should not block or throw an exception. - /// TODO - This method should take reference to `SpanData` - fn on_end(&self, span: SpanData); + fn on_end(&self, span: &mut FinishedSpan); /// Force the spans lying in the cache to be exported. fn force_flush(&self) -> OTelSdkResult; /// Shuts down the processor. Called when SDK is shut down. This is an @@ -133,10 +133,11 @@ impl SpanProcessor for SimpleSpanProcessor { // Ignored } - fn on_end(&self, span: SpanData) { - if !span.span_context.is_sampled() { + fn on_end(&self, span: &mut FinishedSpan) { + if !span.context().is_sampled() { return; } + let Some(span) = span.consume() else { return }; let result = self .exporter @@ -240,6 +241,9 @@ use std::sync::mpsc::Receiver; use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::SyncSender; +use super::span::FinishedSpan; +use super::ReadableSpan; + /// Messages exchanged between the main thread and the background thread. #[allow(clippy::large_enum_variant)] #[derive(Debug)] @@ -513,7 +517,7 @@ impl SpanProcessor for BatchSpanProcessor { } /// Handles span end. - fn on_end(&self, span: SpanData) { + fn on_end(&self, span: &mut FinishedSpan) { if self.is_shutdown.load(Ordering::Relaxed) { // this is a warning, as the user is trying to emit after the processor has been shutdown otel_warn!( @@ -522,6 +526,9 @@ impl SpanProcessor for BatchSpanProcessor { ); return; } + let Some(span) = span.consume() else { + return; + }; let result = self.span_sender.try_send(span); if result.is_err() { @@ -864,8 +871,8 @@ mod tests { OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS, OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, }; - use crate::trace::InMemorySpanExporterBuilder; use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks}; + use crate::trace::{FinishedSpan, InMemorySpanExporterBuilder}; use crate::trace::{SpanData, SpanExporter}; use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status}; use std::fmt::Debug; @@ -876,7 +883,7 @@ mod tests { let exporter = InMemorySpanExporterBuilder::new().build(); let processor = SimpleSpanProcessor::new(exporter.clone()); let span_data = new_test_export_span_data(); - processor.on_end(span_data.clone()); + processor.on_end(&mut FinishedSpan::new(span_data.clone())); assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data); let _result = processor.shutdown(); } @@ -899,7 +906,7 @@ mod tests { status: Status::Unset, instrumentation_scope: Default::default(), }; - processor.on_end(unsampled); + processor.on_end(&mut FinishedSpan::new(unsampled)); assert!(exporter.get_finished_spans().unwrap().is_empty()); } @@ -908,7 +915,7 @@ mod tests { let exporter = InMemorySpanExporterBuilder::new().build(); let processor = SimpleSpanProcessor::new(exporter.clone()); let span_data = new_test_export_span_data(); - processor.on_end(span_data.clone()); + processor.on_end(&mut FinishedSpan::new(span_data.clone())); assert!(!exporter.get_finished_spans().unwrap().is_empty()); let _result = processor.shutdown(); // Assume shutdown is called by ensuring spans are empty in the exporter @@ -1110,7 +1117,7 @@ mod tests { let processor = BatchSpanProcessor::new(exporter, config); let test_span = create_test_span("test_span"); - processor.on_end(test_span.clone()); + processor.on_end(&mut FinishedSpan::new(test_span.clone())); // Wait for flush interval to ensure the span is processed std::thread::sleep(Duration::from_secs(6)); @@ -1133,7 +1140,7 @@ mod tests { // Create a test span and send it to the processor let test_span = create_test_span("force_flush_span"); - processor.on_end(test_span.clone()); + processor.on_end(&mut FinishedSpan::new(test_span.clone())); // Call force_flush to immediately export the spans let flush_result = processor.force_flush(); @@ -1162,7 +1169,7 @@ mod tests { // Create a test span and send it to the processor let test_span = create_test_span("shutdown_span"); - processor.on_end(test_span.clone()); + processor.on_end(&mut FinishedSpan::new(test_span.clone())); // Call shutdown to flush and export all pending spans let shutdown_result = processor.shutdown(); @@ -1200,9 +1207,9 @@ mod tests { let span2 = create_test_span("span2"); let span3 = create_test_span("span3"); // This span should be dropped - processor.on_end(span1.clone()); - processor.on_end(span2.clone()); - processor.on_end(span3.clone()); // This span exceeds the queue size + processor.on_end(&mut FinishedSpan::new(span1.clone())); + processor.on_end(&mut FinishedSpan::new(span2.clone())); + processor.on_end(&mut FinishedSpan::new(span3.clone())); // This span exceeds the queue size // Wait for the scheduled delay to expire std::thread::sleep(Duration::from_secs(3)); @@ -1242,7 +1249,7 @@ mod tests { KeyValue::new("key1", "value1"), KeyValue::new("key2", "value2"), ]; - processor.on_end(span_data.clone()); + processor.on_end(&mut FinishedSpan::new(span_data.clone())); // Force flush to export the span let _ = processor.force_flush(); @@ -1273,7 +1280,7 @@ mod tests { // Create a span and send it to the processor let test_span = create_test_span("resource_test"); - processor.on_end(test_span.clone()); + processor.on_end(&mut FinishedSpan::new(test_span.clone())); // Force flush to ensure the span is exported let _ = processor.force_flush(); @@ -1308,7 +1315,7 @@ mod tests { for _ in 0..4 { let span = new_test_export_span_data(); - processor.on_end(span); + processor.on_end(&mut FinishedSpan::new(span)); } processor.force_flush().unwrap(); @@ -1331,7 +1338,7 @@ mod tests { for _ in 0..4 { let span = new_test_export_span_data(); - processor.on_end(span); + processor.on_end(&mut FinishedSpan::new(span)); } processor.force_flush().unwrap(); @@ -1358,7 +1365,7 @@ mod tests { let processor_clone = Arc::clone(&processor); let handle = tokio::spawn(async move { let span = new_test_export_span_data(); - processor_clone.on_end(span); + processor_clone.on_end(&mut FinishedSpan::new(span)); }); handles.push(handle); } diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index 95e5d2397a..8bf858ac3e 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -4,7 +4,7 @@ use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend}; use crate::trace::BatchConfig; use crate::trace::Span; use crate::trace::SpanProcessor; -use crate::trace::{SpanData, SpanExporter}; +use crate::trace::{FinishedSpan, ReadableSpan, SpanData, SpanExporter}; use futures_channel::oneshot; use futures_util::pin_mut; use futures_util::{ @@ -103,10 +103,11 @@ impl SpanProcessor for BatchSpanProcessor { // Ignored } - fn on_end(&self, span: SpanData) { - if !span.span_context.is_sampled() { + fn on_end(&self, span: &mut FinishedSpan) { + if !span.context().is_sampled() { return; } + let Some(span) = span.consume() else { return }; let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); @@ -432,7 +433,7 @@ mod tests { OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT, }; use crate::trace::{BatchConfig, BatchConfigBuilder, InMemorySpanExporterBuilder}; - use crate::trace::{SpanData, SpanExporter}; + use crate::trace::{FinishedSpan, SpanData, SpanExporter}; use futures_util::Future; use std::fmt::Debug; use std::time::Duration; @@ -519,7 +520,7 @@ mod tests { } }); tokio::time::sleep(Duration::from_secs(1)).await; // skip the first - processor.on_end(new_test_export_span_data()); + processor.on_end(&mut FinishedSpan::new(new_test_export_span_data())); let flush_res = processor.force_flush(); assert!(flush_res.is_ok()); let _shutdown_result = processor.shutdown(); @@ -546,7 +547,7 @@ mod tests { }; let processor = BatchSpanProcessor::new(exporter, config, runtime::TokioCurrentThread); tokio::time::sleep(Duration::from_secs(1)).await; // skip the first - processor.on_end(new_test_export_span_data()); + processor.on_end(&mut FinishedSpan::new(new_test_export_span_data())); let flush_res = processor.force_flush(); if time_out { assert!(flush_res.is_err()); diff --git a/stress/src/traces.rs b/stress/src/traces.rs index e0f15099e5..1e73d1bebf 100644 --- a/stress/src/traces.rs +++ b/stress/src/traces.rs @@ -15,7 +15,7 @@ use opentelemetry::{ }; use opentelemetry_sdk::{ error::OTelSdkResult, - trace::{self as sdktrace, SpanData, SpanProcessor}, + trace::{self as sdktrace, FinishedSpan, SpanProcessor}, }; use std::time::Duration; @@ -37,7 +37,7 @@ impl SpanProcessor for NoOpSpanProcessor { // No-op } - fn on_end(&self, _span: SpanData) { + fn on_end(&self, _span: &mut FinishedSpan) { // No-op }