From 5a24c634b0766f5cef34819b93679520a8ab8faf Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Sun, 6 Apr 2025 20:44:40 +0300 Subject: [PATCH 1/4] refactor: PushMetricExpoter interface --- .../src/exporter/http/metrics.rs | 4 +- opentelemetry-otlp/src/exporter/http/mod.rs | 6 +- .../src/exporter/tonic/metrics.rs | 6 +- opentelemetry-otlp/src/metric.rs | 9 +- opentelemetry-proto/src/transform/metrics.rs | 20 ++-- opentelemetry-sdk/src/metrics/data/mod.rs | 2 +- opentelemetry-sdk/src/metrics/exporter.rs | 91 ++++++++++++++++++- .../src/metrics/in_memory_exporter.rs | 39 ++++++-- opentelemetry-sdk/src/metrics/mod.rs | 5 +- .../src/metrics/periodic_reader.rs | 15 ++- .../periodic_reader_with_async_runtime.rs | 6 +- opentelemetry-stdout/src/metrics/exporter.rs | 16 ++-- 12 files changed, 164 insertions(+), 55 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/metrics.rs b/opentelemetry-otlp/src/exporter/http/metrics.rs index 15033686cc..6e5cf3da4f 100644 --- a/opentelemetry-otlp/src/exporter/http/metrics.rs +++ b/opentelemetry-otlp/src/exporter/http/metrics.rs @@ -4,12 +4,12 @@ use crate::metric::MetricsClient; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; -use opentelemetry_sdk::metrics::data::ResourceMetrics; +use opentelemetry_sdk::metrics::exporter::ResourceMetricsRef; use super::OtlpHttpClient; impl MetricsClient for OtlpHttpClient { - async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult { + async fn export(&self, metrics: ResourceMetricsRef<'_>) -> OTelSdkResult { let client = self .client .lock() diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index ead431acfa..a45787ea7e 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -27,7 +27,7 @@ use std::time::Duration; mod metrics; #[cfg(feature = "metrics")] -use opentelemetry_sdk::metrics::data::ResourceMetrics; +use opentelemetry_sdk::metrics::exporter::ResourceMetricsRef; #[cfg(feature = "logs")] pub(crate) mod logs; @@ -326,11 +326,11 @@ impl OtlpHttpClient { #[cfg(feature = "metrics")] fn build_metrics_export_body( &self, - metrics: &mut ResourceMetrics, + metrics: ResourceMetricsRef<'_>, ) -> Option<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; - let req: ExportMetricsServiceRequest = (&*metrics).into(); + let req: ExportMetricsServiceRequest = metrics.into(); match self.protocol { #[cfg(feature = "http-json")] diff --git a/opentelemetry-otlp/src/exporter/tonic/metrics.rs b/opentelemetry-otlp/src/exporter/tonic/metrics.rs index 2e5eb9df41..d061dc0f20 100644 --- a/opentelemetry-otlp/src/exporter/tonic/metrics.rs +++ b/opentelemetry-otlp/src/exporter/tonic/metrics.rs @@ -6,7 +6,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::{ metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest, }; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; -use opentelemetry_sdk::metrics::data::ResourceMetrics; +use opentelemetry_sdk::metrics::exporter::ResourceMetricsRef; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use super::BoxInterceptor; @@ -52,7 +52,7 @@ impl TonicMetricsClient { } impl MetricsClient for TonicMetricsClient { - async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult { + async fn export(&self, metrics: ResourceMetricsRef<'_>) -> OTelSdkResult { let (mut client, metadata, extensions) = self .inner .lock() @@ -81,7 +81,7 @@ impl MetricsClient for TonicMetricsClient { .export(Request::from_parts( metadata, extensions, - ExportMetricsServiceRequest::from(&*metrics), + ExportMetricsServiceRequest::from(metrics), )) .await .map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?; diff --git a/opentelemetry-otlp/src/metric.rs b/opentelemetry-otlp/src/metric.rs index bdb383b47b..f2b0221244 100644 --- a/opentelemetry-otlp/src/metric.rs +++ b/opentelemetry-otlp/src/metric.rs @@ -17,9 +17,8 @@ use crate::{ExporterBuildError, NoExporterBuilderSet}; use core::fmt; use opentelemetry_sdk::error::OTelSdkResult; -use opentelemetry_sdk::metrics::{ - data::ResourceMetrics, exporter::PushMetricExporter, Temporality, -}; +use opentelemetry_sdk::metrics::exporter::ResourceMetricsRef; +use opentelemetry_sdk::metrics::{exporter::PushMetricExporter, Temporality}; use std::fmt::{Debug, Formatter}; use std::time::Duration; @@ -123,7 +122,7 @@ impl HasHttpConfig for MetricExporterBuilder { pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static { fn export( &self, - metrics: &mut ResourceMetrics, + metrics: ResourceMetricsRef<'_>, ) -> impl std::future::Future + Send; fn shutdown(&self) -> OTelSdkResult; } @@ -149,7 +148,7 @@ impl Debug for MetricExporter { } impl PushMetricExporter for MetricExporter { - async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult { + async fn export(&self, metrics: ResourceMetricsRef<'_>) -> OTelSdkResult { match &self.client { #[cfg(feature = "grpc-tonic")] SupportedTransportClient::Tonic(client) => client.export(metrics).await, diff --git a/opentelemetry-proto/src/transform/metrics.rs b/opentelemetry-proto/src/transform/metrics.rs index c040898ec3..1c4cd3c004 100644 --- a/opentelemetry-proto/src/transform/metrics.rs +++ b/opentelemetry-proto/src/transform/metrics.rs @@ -11,9 +11,9 @@ pub mod tonic { use opentelemetry_sdk::metrics::data::{ AggregatedMetrics, Exemplar as SdkExemplar, ExponentialHistogram as SdkExponentialHistogram, Gauge as SdkGauge, - Histogram as SdkHistogram, Metric as SdkMetric, MetricData, ResourceMetrics, - ScopeMetrics as SdkScopeMetrics, Sum as SdkSum, + Histogram as SdkHistogram, Metric as SdkMetric, MetricData, Sum as SdkSum, }; + use opentelemetry_sdk::metrics::exporter::{ResourceMetricsRef, ScopeMetricsRef}; use opentelemetry_sdk::metrics::Temporality; use opentelemetry_sdk::Resource as SdkResource; @@ -110,12 +110,12 @@ pub mod tonic { } } - impl From<&ResourceMetrics> for ExportMetricsServiceRequest { - fn from(rm: &ResourceMetrics) -> Self { + impl From> for ExportMetricsServiceRequest { + fn from(rm: ResourceMetricsRef<'_>) -> Self { ExportMetricsServiceRequest { resource_metrics: vec![TonicResourceMetrics { - resource: Some((&rm.resource).into()), - scope_metrics: rm.scope_metrics.iter().map(Into::into).collect(), + resource: Some(rm.resource.into()), + scope_metrics: rm.scope_metrics.map(Into::into).collect(), schema_url: rm.resource.schema_url().map(Into::into).unwrap_or_default(), }], } @@ -131,11 +131,11 @@ pub mod tonic { } } - impl From<&SdkScopeMetrics> for TonicScopeMetrics { - fn from(sm: &SdkScopeMetrics) -> Self { + impl From> for TonicScopeMetrics { + fn from(sm: ScopeMetricsRef<'_>) -> Self { TonicScopeMetrics { - scope: Some((&sm.scope, None).into()), - metrics: sm.metrics.iter().map(Into::into).collect(), + scope: Some((sm.scope, None).into()), + metrics: sm.metrics.map(Into::into).collect(), schema_url: sm .scope .schema_url() diff --git a/opentelemetry-sdk/src/metrics/data/mod.rs b/opentelemetry-sdk/src/metrics/data/mod.rs index 819d2ef5fe..dbff5ba256 100644 --- a/opentelemetry-sdk/src/metrics/data/mod.rs +++ b/opentelemetry-sdk/src/metrics/data/mod.rs @@ -18,7 +18,7 @@ pub struct ResourceMetrics { } /// A collection of metrics produced by a meter. -#[derive(Default, Debug)] +#[derive(Debug, Default)] pub struct ScopeMetrics { /// The [InstrumentationScope] that the meter was created with. pub scope: InstrumentationScope, diff --git a/opentelemetry-sdk/src/metrics/exporter.rs b/opentelemetry-sdk/src/metrics/exporter.rs index f8775a3836..e6ce356bf0 100644 --- a/opentelemetry-sdk/src/metrics/exporter.rs +++ b/opentelemetry-sdk/src/metrics/exporter.rs @@ -1,11 +1,92 @@ //! Interfaces for exporting metrics -use crate::error::OTelSdkResult; -use std::time::Duration; +use opentelemetry::InstrumentationScope; -use crate::metrics::data::ResourceMetrics; +use crate::{error::OTelSdkResult, Resource}; +use std::{fmt::Debug, slice::Iter, time::Duration}; -use super::Temporality; +use super::{ + data::{Metric, ResourceMetrics, ScopeMetrics}, + Temporality, +}; + +/// A collection of [`BatchScopeMetrics`] and the associated [Resource] that created them. +#[derive(Debug)] +pub struct ResourceMetricsRef<'a> { + /// The entity that collected the metrics. + pub resource: &'a Resource, + /// The collection of metrics with unique [InstrumentationScope]s. + pub scope_metrics: BatchScopeMetrics<'a>, +} + +/// Iterator over libraries instrumentation scopes ([`InstrumentationScope`]) together with metrics. +pub struct BatchScopeMetrics<'a> { + iter: Iter<'a, ScopeMetrics>, +} + +/// A collection of metrics produced by a [`InstrumentationScope`] meter. +#[derive(Debug)] +pub struct ScopeMetricsRef<'a> { + /// The [InstrumentationScope] that the meter was created with. + pub scope: &'a InstrumentationScope, + /// The list of aggregations created by the meter. + pub metrics: BatchMetrics<'a>, +} + +/// Iterator over aggregations created by the meter. +pub struct BatchMetrics<'a> { + iter: Iter<'a, Metric>, +} + +impl<'a> ResourceMetricsRef<'a> { + pub(crate) fn new(rm: &'a ResourceMetrics) -> Self { + Self { + resource: &rm.resource, + scope_metrics: BatchScopeMetrics { + iter: rm.scope_metrics.iter(), + }, + } + } +} + +impl<'a> ScopeMetricsRef<'a> { + fn new(sm: &'a ScopeMetrics) -> Self { + Self { + scope: &sm.scope, + metrics: BatchMetrics { + iter: sm.metrics.iter(), + }, + } + } +} + +impl Debug for BatchScopeMetrics<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BatchScopeMetrics").finish() + } +} + +impl<'a> Iterator for BatchScopeMetrics<'a> { + type Item = ScopeMetricsRef<'a>; + + fn next(&mut self) -> Option { + self.iter.next().map(ScopeMetricsRef::new) + } +} + +impl<'a> Iterator for BatchMetrics<'a> { + type Item = &'a Metric; + + fn next(&mut self) -> Option { + self.iter.next() + } +} + +impl Debug for BatchMetrics<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BatchMetrics").finish() + } +} /// Exporter handles the delivery of metric data to external receivers. /// @@ -18,7 +99,7 @@ pub trait PushMetricExporter: Send + Sync + 'static { /// considered unrecoverable and will be logged. fn export( &self, - metrics: &mut ResourceMetrics, + metrics: ResourceMetricsRef<'_>, ) -> impl std::future::Future + Send; /// Flushes any metric data held by an exporter. diff --git a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs index 4d65b81d2f..e47c58c952 100644 --- a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs @@ -1,7 +1,5 @@ use crate::error::{OTelSdkError, OTelSdkResult}; -use crate::metrics::data::{ - ExponentialHistogram, Gauge, Histogram, MetricData, ResourceMetrics, Sum, -}; +use crate::metrics::data::{ExponentialHistogram, Gauge, Histogram, MetricData, Sum}; use crate::metrics::exporter::PushMetricExporter; use crate::metrics::Temporality; use crate::InMemoryExporterError; @@ -10,7 +8,8 @@ use std::fmt; use std::sync::{Arc, Mutex}; use std::time::Duration; -use super::data::{AggregatedMetrics, Metric, ScopeMetrics}; +use super::data::{AggregatedMetrics, Metric, ResourceMetrics, ScopeMetrics}; +use super::exporter::ResourceMetricsRef; /// An in-memory metrics exporter that stores metrics data in memory. /// @@ -150,7 +149,31 @@ impl InMemoryMetricExporter { let metrics = self .metrics .lock() - .map(|metrics_guard| metrics_guard.iter().map(Self::clone_metrics).collect()) + .map(|metrics_guard| { + metrics_guard + .iter() + .map(|data| ResourceMetrics { + resource: data.resource.clone(), + scope_metrics: data + .scope_metrics + .iter() + .map(|data| ScopeMetrics { + scope: data.scope.clone(), + metrics: data + .metrics + .iter() + .map(|data| Metric { + name: data.name.clone(), + description: data.description.clone(), + unit: data.unit.clone(), + data: Self::clone_data(&data.data), + }) + .collect(), + }) + .collect(), + }) + .collect() + }) .map_err(InMemoryExporterError::from)?; Ok(metrics) } @@ -172,17 +195,15 @@ impl InMemoryMetricExporter { .map(|mut metrics_guard| metrics_guard.clear()); } - fn clone_metrics(metric: &ResourceMetrics) -> ResourceMetrics { + fn clone_metrics(metric: ResourceMetricsRef<'_>) -> ResourceMetrics { ResourceMetrics { resource: metric.resource.clone(), scope_metrics: metric .scope_metrics - .iter() .map(|scope_metric| ScopeMetrics { scope: scope_metric.scope.clone(), metrics: scope_metric .metrics - .iter() .map(|metric| Metric { name: metric.name.clone(), description: metric.description.clone(), @@ -237,7 +258,7 @@ impl InMemoryMetricExporter { } impl PushMetricExporter for InMemoryMetricExporter { - async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult { + async fn export(&self, metrics: ResourceMetricsRef<'_>) -> OTelSdkResult { self.metrics .lock() .map(|mut metrics_guard| { diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index a2d811ecb2..05597f25e5 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -122,11 +122,12 @@ pub enum Temporality { #[cfg(all(test, feature = "testing"))] mod tests { - use self::data::{HistogramDataPoint, ScopeMetrics, SumDataPoint}; + use self::data::{HistogramDataPoint, SumDataPoint}; use super::data::MetricData; + use super::data::ResourceMetrics; + use super::data::ScopeMetrics; use super::internal::Number; use super::*; - use crate::metrics::data::ResourceMetrics; use crate::metrics::internal::AggregatedMetricsAccess; use crate::metrics::InMemoryMetricExporter; use crate::metrics::InMemoryMetricExporterBuilder; diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 518a0cc3be..dac40fd7d4 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -12,7 +12,10 @@ use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, Context}; use crate::{ error::{OTelSdkError, OTelSdkResult}, - metrics::{exporter::PushMetricExporter, reader::SdkProducer}, + metrics::{ + exporter::{PushMetricExporter, ResourceMetricsRef}, + reader::SdkProducer, + }, Resource, }; @@ -411,7 +414,7 @@ impl PeriodicReaderInner { // Relying on futures executor to execute async call. // TODO: Pass timeout to exporter - futures_executor::block_on(self.exporter.export(&mut rm)) + futures_executor::block_on(self.exporter.export(ResourceMetricsRef::new(&rm))) } fn force_flush(&self) -> OTelSdkResult { @@ -516,7 +519,9 @@ mod tests { use crate::{ error::{OTelSdkError, OTelSdkResult}, metrics::{ - data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader, + data::ResourceMetrics, + exporter::{PushMetricExporter, ResourceMetricsRef}, + reader::MetricReader, InMemoryMetricExporter, SdkMeterProvider, Temporality, }, Resource, @@ -553,7 +558,7 @@ mod tests { } impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst { - async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult { + async fn export(&self, _metrics: ResourceMetricsRef<'_>) -> OTelSdkResult { if self.count.fetch_add(1, Ordering::Relaxed) == 0 { Err(OTelSdkError::InternalFailure("export failed".into())) } else { @@ -584,7 +589,7 @@ mod tests { } impl PushMetricExporter for MockMetricExporter { - async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult { + async fn export(&self, _metrics: ResourceMetricsRef<'_>) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs index 03e0230157..9ca9bfd063 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -13,6 +13,7 @@ use futures_util::{ }; use opentelemetry::{otel_debug, otel_error}; +use crate::metrics::exporter::ResourceMetricsRef; use crate::runtime::{to_interval_stream, Runtime}; use crate::{ error::{OTelSdkError, OTelSdkResult}, @@ -259,7 +260,10 @@ impl PeriodicReaderWorker { message = "Calling exporter's export method with collected metrics.", count = self.rm.scope_metrics.len(), ); - let export = self.reader.exporter.export(&mut self.rm); + let export = self + .reader + .exporter + .export(ResourceMetricsRef::new(&self.rm)); let timeout = self.runtime.delay(self.timeout); pin_mut!(export); pin_mut!(timeout); diff --git a/opentelemetry-stdout/src/metrics/exporter.rs b/opentelemetry-stdout/src/metrics/exporter.rs index 2d94507e38..0ec9a073c9 100644 --- a/opentelemetry-stdout/src/metrics/exporter.rs +++ b/opentelemetry-stdout/src/metrics/exporter.rs @@ -1,14 +1,12 @@ use chrono::{DateTime, Utc}; use core::{f64, fmt}; use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData}; +use opentelemetry_sdk::metrics::exporter::{BatchScopeMetrics, ResourceMetricsRef}; use opentelemetry_sdk::metrics::Temporality; use opentelemetry_sdk::{ error::OTelSdkResult, metrics::{ - data::{ - Gauge, GaugeDataPoint, Histogram, HistogramDataPoint, ResourceMetrics, ScopeMetrics, - Sum, SumDataPoint, - }, + data::{Gauge, GaugeDataPoint, Histogram, HistogramDataPoint, Sum, SumDataPoint}, exporter::PushMetricExporter, }, }; @@ -42,7 +40,7 @@ impl fmt::Debug for MetricExporter { impl PushMetricExporter for MetricExporter { /// Write Metrics to stdout - async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult { + async fn export(&self, metrics: ResourceMetricsRef<'_>) -> OTelSdkResult { if self.is_shutdown.load(atomic::Ordering::SeqCst) { Err(opentelemetry_sdk::error::OTelSdkError::AlreadyShutdown) } else { @@ -55,7 +53,7 @@ impl PushMetricExporter for MetricExporter { metrics.resource.iter().for_each(|(k, v)| { println!("\t -> {}={:?}", k, v); }); - print_metrics(&metrics.scope_metrics); + print_metrics(metrics.scope_metrics); Ok(()) } } @@ -79,8 +77,8 @@ impl PushMetricExporter for MetricExporter { } } -fn print_metrics(metrics: &[ScopeMetrics]) { - for (i, metric) in metrics.iter().enumerate() { +fn print_metrics(metrics: BatchScopeMetrics<'_>) { + for (i, metric) in metrics.enumerate() { println!("\tInstrumentation Scope #{}", i); println!("\t\tName : {}", &metric.scope.name()); if let Some(version) = &metric.scope.version() { @@ -100,7 +98,7 @@ fn print_metrics(metrics: &[ScopeMetrics]) { println!("\t\t\t -> {}: {}", kv.key, kv.value); }); - metric.metrics.iter().enumerate().for_each(|(i, metric)| { + metric.metrics.enumerate().for_each(|(i, metric)| { println!("Metric #{}", i); println!("\t\tName : {}", &metric.name); println!("\t\tDescription : {}", &metric.description); From ffa7d743db89141015ecce7dc6df4c3814c9bf2a Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Fri, 11 Apr 2025 10:22:32 +0300 Subject: [PATCH 2/4] * Renamed ResourceMetric->ResourceMetricData and moved to MetricReader. * Renamed ResourceMetricRef->ResourceMetric and made that iteration would borrow an item, which will allow improving underlying metric collection without any allocations in the future * Updated CHANGELOG.md --- .../src/exporter/http/metrics.rs | 4 +- opentelemetry-otlp/src/exporter/http/mod.rs | 4 +- .../src/exporter/tonic/metrics.rs | 4 +- opentelemetry-otlp/src/metric.rs | 6 +- opentelemetry-proto/src/transform/metrics.rs | 22 +++-- opentelemetry-sdk/CHANGELOG.md | 10 ++ opentelemetry-sdk/benches/metric.rs | 8 +- opentelemetry-sdk/src/metrics/data/mod.rs | 22 +---- opentelemetry-sdk/src/metrics/exporter.rs | 53 +++++------ .../src/metrics/in_memory_exporter.rs | 51 +++++----- .../src/metrics/manual_reader.rs | 4 +- opentelemetry-sdk/src/metrics/mod.rs | 10 +- .../src/metrics/periodic_reader.rs | 25 ++--- .../periodic_reader_with_async_runtime.rs | 27 ++---- opentelemetry-sdk/src/metrics/pipeline.rs | 10 +- opentelemetry-sdk/src/metrics/reader.rs | 30 +++++- .../src/testing/metrics/metric_reader.rs | 7 +- opentelemetry-stdout/src/metrics/exporter.rs | 92 ++++++++++--------- 18 files changed, 208 insertions(+), 181 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/metrics.rs b/opentelemetry-otlp/src/exporter/http/metrics.rs index 6e5cf3da4f..454ea03f3c 100644 --- a/opentelemetry-otlp/src/exporter/http/metrics.rs +++ b/opentelemetry-otlp/src/exporter/http/metrics.rs @@ -4,12 +4,12 @@ use crate::metric::MetricsClient; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; -use opentelemetry_sdk::metrics::exporter::ResourceMetricsRef; +use opentelemetry_sdk::metrics::exporter::ResourceMetrics; use super::OtlpHttpClient; impl MetricsClient for OtlpHttpClient { - async fn export(&self, metrics: ResourceMetricsRef<'_>) -> OTelSdkResult { + async fn export(&self, metrics: ResourceMetrics<'_>) -> OTelSdkResult { let client = self .client .lock() diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index a45787ea7e..90481ed962 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -27,7 +27,7 @@ use std::time::Duration; mod metrics; #[cfg(feature = "metrics")] -use opentelemetry_sdk::metrics::exporter::ResourceMetricsRef; +use opentelemetry_sdk::metrics::exporter::ResourceMetrics; #[cfg(feature = "logs")] pub(crate) mod logs; @@ -326,7 +326,7 @@ impl OtlpHttpClient { #[cfg(feature = "metrics")] fn build_metrics_export_body( &self, - metrics: ResourceMetricsRef<'_>, + metrics: ResourceMetrics<'_>, ) -> Option<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; diff --git a/opentelemetry-otlp/src/exporter/tonic/metrics.rs b/opentelemetry-otlp/src/exporter/tonic/metrics.rs index d061dc0f20..48bfd8f2bd 100644 --- a/opentelemetry-otlp/src/exporter/tonic/metrics.rs +++ b/opentelemetry-otlp/src/exporter/tonic/metrics.rs @@ -6,7 +6,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::{ metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest, }; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; -use opentelemetry_sdk::metrics::exporter::ResourceMetricsRef; +use opentelemetry_sdk::metrics::exporter::ResourceMetrics; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use super::BoxInterceptor; @@ -52,7 +52,7 @@ impl TonicMetricsClient { } impl MetricsClient for TonicMetricsClient { - async fn export(&self, metrics: ResourceMetricsRef<'_>) -> OTelSdkResult { + async fn export(&self, metrics: ResourceMetrics<'_>) -> OTelSdkResult { let (mut client, metadata, extensions) = self .inner .lock() diff --git a/opentelemetry-otlp/src/metric.rs b/opentelemetry-otlp/src/metric.rs index f2b0221244..5a4c7398e0 100644 --- a/opentelemetry-otlp/src/metric.rs +++ b/opentelemetry-otlp/src/metric.rs @@ -17,7 +17,7 @@ use crate::{ExporterBuildError, NoExporterBuilderSet}; use core::fmt; use opentelemetry_sdk::error::OTelSdkResult; -use opentelemetry_sdk::metrics::exporter::ResourceMetricsRef; +use opentelemetry_sdk::metrics::exporter::ResourceMetrics; use opentelemetry_sdk::metrics::{exporter::PushMetricExporter, Temporality}; use std::fmt::{Debug, Formatter}; use std::time::Duration; @@ -122,7 +122,7 @@ impl HasHttpConfig for MetricExporterBuilder { pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static { fn export( &self, - metrics: ResourceMetricsRef<'_>, + metrics: ResourceMetrics<'_>, ) -> impl std::future::Future + Send; fn shutdown(&self) -> OTelSdkResult; } @@ -148,7 +148,7 @@ impl Debug for MetricExporter { } impl PushMetricExporter for MetricExporter { - async fn export(&self, metrics: ResourceMetricsRef<'_>) -> OTelSdkResult { + async fn export(&self, metrics: ResourceMetrics<'_>) -> OTelSdkResult { match &self.client { #[cfg(feature = "grpc-tonic")] SupportedTransportClient::Tonic(client) => client.export(metrics).await, diff --git a/opentelemetry-proto/src/transform/metrics.rs b/opentelemetry-proto/src/transform/metrics.rs index 1c4cd3c004..c6cb94844f 100644 --- a/opentelemetry-proto/src/transform/metrics.rs +++ b/opentelemetry-proto/src/transform/metrics.rs @@ -13,7 +13,7 @@ pub mod tonic { ExponentialHistogram as SdkExponentialHistogram, Gauge as SdkGauge, Histogram as SdkHistogram, Metric as SdkMetric, MetricData, Sum as SdkSum, }; - use opentelemetry_sdk::metrics::exporter::{ResourceMetricsRef, ScopeMetricsRef}; + use opentelemetry_sdk::metrics::exporter::{ResourceMetrics, ScopeMetrics}; use opentelemetry_sdk::metrics::Temporality; use opentelemetry_sdk::Resource as SdkResource; @@ -110,12 +110,16 @@ pub mod tonic { } } - impl From> for ExportMetricsServiceRequest { - fn from(rm: ResourceMetricsRef<'_>) -> Self { + impl From> for ExportMetricsServiceRequest { + fn from(mut rm: ResourceMetrics<'_>) -> Self { + let mut scope_metrics = Vec::new(); + while let Some(scope_metric) = rm.scope_metrics.next() { + scope_metrics.push(scope_metric.into()); + } ExportMetricsServiceRequest { resource_metrics: vec![TonicResourceMetrics { resource: Some(rm.resource.into()), - scope_metrics: rm.scope_metrics.map(Into::into).collect(), + scope_metrics, schema_url: rm.resource.schema_url().map(Into::into).unwrap_or_default(), }], } @@ -131,11 +135,15 @@ pub mod tonic { } } - impl From> for TonicScopeMetrics { - fn from(sm: ScopeMetricsRef<'_>) -> Self { + impl From> for TonicScopeMetrics { + fn from(mut sm: ScopeMetrics<'_>) -> Self { + let mut metrics = Vec::new(); + while let Some(metric) = sm.metrics.next() { + metrics.push(metric.into()); + } TonicScopeMetrics { scope: Some((sm.scope, None).into()), - metrics: sm.metrics.map(Into::into).collect(), + metrics, schema_url: sm .scope .schema_url() diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 84e5887c9d..e21dd72088 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -2,6 +2,15 @@ ## vNext +- *Breaking* change for `PushMetricExporter::export` from accepting + `metrics: &mut ResourceMetrics`, to accepting `metrics: ResourceMetrics<'_>`. + In addition, `ResourceMetrics` was also changed to allow improving underlying + metric collection without any allocations in the future. + [#2921](https://github.com/open-telemetry/opentelemetry-rust/pull/2921) +- *Breaking* change for `Metric::data` field: From dynamic `Box` + to new enum `AggregatedMetrics`. + [#2857](https://github.com/open-telemetry/opentelemetry-rust/pull/2857) + [#2868](https://github.com/open-telemetry/opentelemetry-rust/pull/2868) `SdkLogger`, `SdkTracer` modified to respect telemetry suppression based on `Context`. In other words, if the current context has telemetry suppression @@ -40,6 +49,7 @@ also modified to suppress telemetry before invoking exporters. "spec_unstable_metrics_views". This was only required when using Views. [2928](https://github.com/open-telemetry/opentelemetry-rust/pull/2928) + ## 0.29.0 Released 2025-Mar-21 diff --git a/opentelemetry-sdk/benches/metric.rs b/opentelemetry-sdk/benches/metric.rs index 03208542b1..d3fe3b8c37 100644 --- a/opentelemetry-sdk/benches/metric.rs +++ b/opentelemetry-sdk/benches/metric.rs @@ -6,7 +6,7 @@ use opentelemetry::{ use opentelemetry_sdk::{ error::OTelSdkResult, metrics::{ - data::ResourceMetrics, new_view, reader::MetricReader, Aggregation, Instrument, + data::ResourceMetricsData, new_view, reader::MetricReader, Aggregation, Instrument, InstrumentKind, ManualReader, Pipeline, SdkMeterProvider, Stream, Temporality, View, }, Resource, @@ -23,7 +23,7 @@ impl MetricReader for SharedReader { self.0.register_pipeline(pipeline) } - fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult { + fn collect(&self, rm: &mut ResourceMetricsData) -> OTelSdkResult { self.0.collect(rm) } @@ -240,7 +240,7 @@ fn counters(c: &mut Criterion) { }); let (rdr, cntr) = bench_counter(None, "cumulative"); - let mut rm = ResourceMetrics { + let mut rm = ResourceMetricsData { resource: Resource::builder_empty().build(), scope_metrics: Vec::new(), }; @@ -337,7 +337,7 @@ fn benchmark_collect_histogram(b: &mut Bencher, n: usize) { h.record(1, &[]); } - let mut rm = ResourceMetrics { + let mut rm = ResourceMetricsData { resource: Resource::builder_empty().build(), scope_metrics: Vec::new(), }; diff --git a/opentelemetry-sdk/src/metrics/data/mod.rs b/opentelemetry-sdk/src/metrics/data/mod.rs index dbff5ba256..5d14637173 100644 --- a/opentelemetry-sdk/src/metrics/data/mod.rs +++ b/opentelemetry-sdk/src/metrics/data/mod.rs @@ -2,30 +2,10 @@ use std::{borrow::Cow, time::SystemTime}; -use opentelemetry::{InstrumentationScope, KeyValue}; - -use crate::Resource; +use opentelemetry::KeyValue; use super::Temporality; -/// A collection of [ScopeMetrics] and the associated [Resource] that created them. -#[derive(Debug)] -pub struct ResourceMetrics { - /// The entity that collected the metrics. - pub resource: Resource, - /// The collection of metrics with unique [InstrumentationScope]s. - pub scope_metrics: Vec, -} - -/// A collection of metrics produced by a meter. -#[derive(Debug, Default)] -pub struct ScopeMetrics { - /// The [InstrumentationScope] that the meter was created with. - pub scope: InstrumentationScope, - /// The list of aggregations created by the meter. - pub metrics: Vec, -} - /// A collection of one or more aggregated time series from an [Instrument]. /// /// [Instrument]: crate::metrics::Instrument diff --git a/opentelemetry-sdk/src/metrics/exporter.rs b/opentelemetry-sdk/src/metrics/exporter.rs index e6ce356bf0..4de2b4f5dc 100644 --- a/opentelemetry-sdk/src/metrics/exporter.rs +++ b/opentelemetry-sdk/src/metrics/exporter.rs @@ -6,83 +6,84 @@ use crate::{error::OTelSdkResult, Resource}; use std::{fmt::Debug, slice::Iter, time::Duration}; use super::{ - data::{Metric, ResourceMetrics, ScopeMetrics}, + data::Metric, + reader::{ResourceMetricsData, ScopeMetricsData}, Temporality, }; /// A collection of [`BatchScopeMetrics`] and the associated [Resource] that created them. #[derive(Debug)] -pub struct ResourceMetricsRef<'a> { +pub struct ResourceMetrics<'a> { /// The entity that collected the metrics. pub resource: &'a Resource, /// The collection of metrics with unique [InstrumentationScope]s. - pub scope_metrics: BatchScopeMetrics<'a>, + pub scope_metrics: ScopeMetricsLendingIter<'a>, } /// Iterator over libraries instrumentation scopes ([`InstrumentationScope`]) together with metrics. -pub struct BatchScopeMetrics<'a> { - iter: Iter<'a, ScopeMetrics>, +/// Doesn't implement standard [`Iterator`], because it returns borrowed items. AKA "LendingIterator". +pub struct ScopeMetricsLendingIter<'a> { + iter: Iter<'a, ScopeMetricsData>, } /// A collection of metrics produced by a [`InstrumentationScope`] meter. #[derive(Debug)] -pub struct ScopeMetricsRef<'a> { +pub struct ScopeMetrics<'a> { /// The [InstrumentationScope] that the meter was created with. pub scope: &'a InstrumentationScope, /// The list of aggregations created by the meter. - pub metrics: BatchMetrics<'a>, + pub metrics: MetricsLendingIter<'a>, } /// Iterator over aggregations created by the meter. -pub struct BatchMetrics<'a> { +/// Doesn't implement standard [`Iterator`], because it returns borrowed items. AKA "LendingIterator". +pub struct MetricsLendingIter<'a> { iter: Iter<'a, Metric>, } -impl<'a> ResourceMetricsRef<'a> { - pub(crate) fn new(rm: &'a ResourceMetrics) -> Self { +impl<'a> ResourceMetrics<'a> { + pub(crate) fn new(rm: &'a ResourceMetricsData) -> Self { Self { resource: &rm.resource, - scope_metrics: BatchScopeMetrics { + scope_metrics: ScopeMetricsLendingIter { iter: rm.scope_metrics.iter(), }, } } } -impl<'a> ScopeMetricsRef<'a> { - fn new(sm: &'a ScopeMetrics) -> Self { +impl<'a> ScopeMetrics<'a> { + fn new(sm: &'a ScopeMetricsData) -> Self { Self { scope: &sm.scope, - metrics: BatchMetrics { + metrics: MetricsLendingIter { iter: sm.metrics.iter(), }, } } } -impl Debug for BatchScopeMetrics<'_> { +impl Debug for ScopeMetricsLendingIter<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("BatchScopeMetrics").finish() } } -impl<'a> Iterator for BatchScopeMetrics<'a> { - type Item = ScopeMetricsRef<'a>; - - fn next(&mut self) -> Option { - self.iter.next().map(ScopeMetricsRef::new) +impl ScopeMetricsLendingIter<'_> { + /// Advances the iterator and returns the next value. + pub fn next(&mut self) -> Option> { + self.iter.next().map(ScopeMetrics::new) } } -impl<'a> Iterator for BatchMetrics<'a> { - type Item = &'a Metric; - - fn next(&mut self) -> Option { +impl MetricsLendingIter<'_> { + /// Advances the iterator and returns the next value. + pub fn next(&mut self) -> Option<&Metric> { self.iter.next() } } -impl Debug for BatchMetrics<'_> { +impl Debug for MetricsLendingIter<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("BatchMetrics").finish() } @@ -99,7 +100,7 @@ pub trait PushMetricExporter: Send + Sync + 'static { /// considered unrecoverable and will be logged. fn export( &self, - metrics: ResourceMetricsRef<'_>, + metrics: ResourceMetrics<'_>, ) -> impl std::future::Future + Send; /// Flushes any metric data held by an exporter. diff --git a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs index e47c58c952..b4affadcd8 100644 --- a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs @@ -8,8 +8,9 @@ use std::fmt; use std::sync::{Arc, Mutex}; use std::time::Duration; -use super::data::{AggregatedMetrics, Metric, ResourceMetrics, ScopeMetrics}; -use super::exporter::ResourceMetricsRef; +use super::data::{AggregatedMetrics, Metric}; +use super::exporter::ResourceMetrics; +use super::reader::{ResourceMetricsData, ScopeMetricsData}; /// An in-memory metrics exporter that stores metrics data in memory. /// @@ -59,7 +60,7 @@ use super::exporter::ResourceMetricsRef; ///# } /// ``` pub struct InMemoryMetricExporter { - metrics: Arc>>, + metrics: Arc>>, temporality: Temporality, } @@ -145,19 +146,19 @@ impl InMemoryMetricExporter { /// let exporter = InMemoryMetricExporter::default(); /// let finished_metrics = exporter.get_finished_metrics().unwrap(); /// ``` - pub fn get_finished_metrics(&self) -> Result, InMemoryExporterError> { + pub fn get_finished_metrics(&self) -> Result, InMemoryExporterError> { let metrics = self .metrics .lock() .map(|metrics_guard| { metrics_guard .iter() - .map(|data| ResourceMetrics { + .map(|data| ResourceMetricsData { resource: data.resource.clone(), scope_metrics: data .scope_metrics .iter() - .map(|data| ScopeMetrics { + .map(|data| ScopeMetricsData { scope: data.scope.clone(), metrics: data .metrics @@ -195,24 +196,26 @@ impl InMemoryMetricExporter { .map(|mut metrics_guard| metrics_guard.clear()); } - fn clone_metrics(metric: ResourceMetricsRef<'_>) -> ResourceMetrics { - ResourceMetrics { + fn clone_metrics(mut metric: ResourceMetrics<'_>) -> ResourceMetricsData { + let mut scope_metrics = Vec::new(); + while let Some(mut scope_metric) = metric.scope_metrics.next() { + let mut metrics = Vec::new(); + while let Some(metric) = scope_metric.metrics.next() { + metrics.push(Metric { + name: metric.name.clone(), + description: metric.description.clone(), + unit: metric.unit.clone(), + data: Self::clone_data(&metric.data), + }); + } + scope_metrics.push(ScopeMetricsData { + scope: scope_metric.scope.clone(), + metrics, + }); + } + ResourceMetricsData { resource: metric.resource.clone(), - scope_metrics: metric - .scope_metrics - .map(|scope_metric| ScopeMetrics { - scope: scope_metric.scope.clone(), - metrics: scope_metric - .metrics - .map(|metric| Metric { - name: metric.name.clone(), - description: metric.description.clone(), - unit: metric.unit.clone(), - data: Self::clone_data(&metric.data), - }) - .collect(), - }) - .collect(), + scope_metrics, } } @@ -258,7 +261,7 @@ impl InMemoryMetricExporter { } impl PushMetricExporter for InMemoryMetricExporter { - async fn export(&self, metrics: ResourceMetricsRef<'_>) -> OTelSdkResult { + async fn export(&self, metrics: ResourceMetrics<'_>) -> OTelSdkResult { self.metrics .lock() .map(|mut metrics_guard| { diff --git a/opentelemetry-sdk/src/metrics/manual_reader.rs b/opentelemetry-sdk/src/metrics/manual_reader.rs index 855a6a14f4..99df24a481 100644 --- a/opentelemetry-sdk/src/metrics/manual_reader.rs +++ b/opentelemetry-sdk/src/metrics/manual_reader.rs @@ -10,8 +10,8 @@ use crate::{ metrics::Temporality, }; +use super::reader::ResourceMetricsData; use super::{ - data::ResourceMetrics, pipeline::Pipeline, reader::{MetricReader, SdkProducer}, }; @@ -90,7 +90,7 @@ impl MetricReader for ManualReader { /// callbacks necessary and returning the results. /// /// Returns an error if called after shutdown. - fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult { + fn collect(&self, rm: &mut ResourceMetricsData) -> OTelSdkResult { let inner = self .inner .lock() diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 05597f25e5..493b083686 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -124,9 +124,9 @@ pub enum Temporality { mod tests { use self::data::{HistogramDataPoint, SumDataPoint}; use super::data::MetricData; - use super::data::ResourceMetrics; - use super::data::ScopeMetrics; use super::internal::Number; + use super::reader::ResourceMetricsData; + use super::reader::ScopeMetricsData; use super::*; use crate::metrics::internal::AggregatedMetricsAccess; use crate::metrics::InMemoryMetricExporter; @@ -2903,9 +2903,9 @@ mod tests { } fn find_scope_metric<'a>( - metrics: &'a [ScopeMetrics], + metrics: &'a [ScopeMetricsData], name: &'a str, - ) -> Option<&'a ScopeMetrics> { + ) -> Option<&'a ScopeMetricsData> { metrics .iter() .find(|&scope_metric| scope_metric.scope.name() == name) @@ -2916,7 +2916,7 @@ mod tests { meter_provider: SdkMeterProvider, // Saving this on the test context for lifetime simplicity - resource_metrics: Vec, + resource_metrics: Vec, } impl TestContext { diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index dac40fd7d4..be6a8a6951 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -13,14 +13,16 @@ use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, Context}; use crate::{ error::{OTelSdkError, OTelSdkResult}, metrics::{ - exporter::{PushMetricExporter, ResourceMetricsRef}, + exporter::{PushMetricExporter, ResourceMetrics}, reader::SdkProducer, }, Resource, }; use super::{ - data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, reader::MetricReader, + instrument::InstrumentKind, + pipeline::Pipeline, + reader::{MetricReader, ResourceMetricsData}, Temporality, }; @@ -361,7 +363,7 @@ impl PeriodicReaderInner { self.exporter.temporality() } - fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult { + fn collect(&self, rm: &mut ResourceMetricsData) -> OTelSdkResult { let producer = self.producer.lock().expect("lock poisoned"); if let Some(p) = producer.as_ref() { p.upgrade() @@ -384,7 +386,7 @@ impl PeriodicReaderInner { fn collect_and_export(&self) -> OTelSdkResult { // TODO: Reuse the internal vectors. Or refactor to avoid needing any // owned data structures to be passed to exporters. - let mut rm = ResourceMetrics { + let mut rm = ResourceMetricsData { resource: Resource::empty(), scope_metrics: Vec::new(), }; @@ -414,7 +416,7 @@ impl PeriodicReaderInner { // Relying on futures executor to execute async call. // TODO: Pass timeout to exporter - futures_executor::block_on(self.exporter.export(ResourceMetricsRef::new(&rm))) + futures_executor::block_on(self.exporter.export(ResourceMetrics::new(&rm))) } fn force_flush(&self) -> OTelSdkResult { @@ -485,7 +487,7 @@ impl MetricReader for PeriodicReader { self.inner.register_pipeline(pipeline); } - fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult { + fn collect(&self, rm: &mut ResourceMetricsData) -> OTelSdkResult { self.inner.collect(rm) } @@ -519,9 +521,8 @@ mod tests { use crate::{ error::{OTelSdkError, OTelSdkResult}, metrics::{ - data::ResourceMetrics, - exporter::{PushMetricExporter, ResourceMetricsRef}, - reader::MetricReader, + exporter::{PushMetricExporter, ResourceMetrics}, + reader::{MetricReader, ResourceMetricsData}, InMemoryMetricExporter, SdkMeterProvider, Temporality, }, Resource, @@ -558,7 +559,7 @@ mod tests { } impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst { - async fn export(&self, _metrics: ResourceMetricsRef<'_>) -> OTelSdkResult { + async fn export(&self, _metrics: ResourceMetrics<'_>) -> OTelSdkResult { if self.count.fetch_add(1, Ordering::Relaxed) == 0 { Err(OTelSdkError::InternalFailure("export failed".into())) } else { @@ -589,7 +590,7 @@ mod tests { } impl PushMetricExporter for MockMetricExporter { - async fn export(&self, _metrics: ResourceMetricsRef<'_>) -> OTelSdkResult { + async fn export(&self, _metrics: ResourceMetrics<'_>) -> OTelSdkResult { Ok(()) } @@ -703,7 +704,7 @@ mod tests { let exporter = InMemoryMetricExporter::default(); let reader = PeriodicReader::builder(exporter.clone()).build(); - let rm = &mut ResourceMetrics { + let rm = &mut ResourceMetricsData { resource: Resource::empty(), scope_metrics: Vec::new(), }; diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs index 9ca9bfd063..e8c53f9981 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -13,7 +13,8 @@ use futures_util::{ }; use opentelemetry::{otel_debug, otel_error}; -use crate::metrics::exporter::ResourceMetricsRef; +use crate::metrics::exporter::ResourceMetrics; +use crate::metrics::reader::ResourceMetricsData; use crate::runtime::{to_interval_stream, Runtime}; use crate::{ error::{OTelSdkError, OTelSdkResult}, @@ -21,9 +22,7 @@ use crate::{ Resource, }; -use super::{ - data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, reader::MetricReader, -}; +use super::{instrument::InstrumentKind, pipeline::Pipeline, reader::MetricReader}; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); const DEFAULT_INTERVAL: Duration = Duration::from_secs(60); @@ -121,7 +120,7 @@ where reader, timeout: self.timeout, runtime, - rm: ResourceMetrics { + rm: ResourceMetricsData { resource: Resource::empty(), scope_metrics: Vec::new(), }, @@ -239,7 +238,7 @@ struct PeriodicReaderWorker { reader: PeriodicReader, timeout: Duration, runtime: RT, - rm: ResourceMetrics, + rm: ResourceMetricsData, } impl PeriodicReaderWorker { @@ -260,10 +259,7 @@ impl PeriodicReaderWorker { message = "Calling exporter's export method with collected metrics.", count = self.rm.scope_metrics.len(), ); - let export = self - .reader - .exporter - .export(ResourceMetricsRef::new(&self.rm)); + let export = self.reader.exporter.export(ResourceMetrics::new(&self.rm)); let timeout = self.runtime.delay(self.timeout); pin_mut!(export); pin_mut!(timeout); @@ -357,7 +353,7 @@ impl MetricReader for PeriodicReader { worker(self); } - fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult { + fn collect(&self, rm: &mut ResourceMetricsData) -> OTelSdkResult { let inner = self .inner .lock() @@ -447,11 +443,8 @@ impl MetricReader for PeriodicReader { mod tests { use super::PeriodicReader; use crate::error::OTelSdkError; - use crate::metrics::reader::MetricReader; - use crate::{ - metrics::data::ResourceMetrics, metrics::InMemoryMetricExporter, metrics::SdkMeterProvider, - runtime, Resource, - }; + use crate::metrics::reader::{MetricReader, ResourceMetricsData}; + use crate::{metrics::InMemoryMetricExporter, metrics::SdkMeterProvider, runtime, Resource}; use opentelemetry::metrics::MeterProvider; use std::sync::mpsc; @@ -498,7 +491,7 @@ mod tests { // Arrange let exporter = InMemoryMetricExporter::default(); let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); - let mut rm = ResourceMetrics { + let mut rm = ResourceMetricsData { resource: Resource::empty(), scope_metrics: Vec::new(), }; diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index 05d2861807..e726399d1a 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -11,11 +11,11 @@ use crate::{ error::{OTelSdkError, OTelSdkResult}, metrics::{ aggregation, - data::{Metric, ResourceMetrics, ScopeMetrics}, + data::Metric, error::{MetricError, MetricResult}, instrument::{Instrument, InstrumentId, InstrumentKind, Stream}, internal::{self, AggregateBuilder, Number}, - reader::{MetricReader, SdkProducer}, + reader::{MetricReader, ScopeMetricsData, SdkProducer}, view::View, }, Resource, @@ -23,7 +23,7 @@ use crate::{ use self::internal::AggregateFns; -use super::{aggregation::Aggregation, Temporality}; +use super::{aggregation::Aggregation, reader::ResourceMetricsData, Temporality}; /// Connects all of the instruments created by a meter provider to a [MetricReader]. /// @@ -100,7 +100,7 @@ impl Pipeline { impl SdkProducer for Pipeline { /// Returns aggregated metrics from a single collection. - fn produce(&self, rm: &mut ResourceMetrics) -> OTelSdkResult { + fn produce(&self, rm: &mut ResourceMetricsData) -> OTelSdkResult { let inner = self .inner .lock() @@ -125,7 +125,7 @@ impl SdkProducer for Pipeline { let sm = match rm.scope_metrics.get_mut(i) { Some(sm) => sm, None => { - rm.scope_metrics.push(ScopeMetrics::default()); + rm.scope_metrics.push(ScopeMetricsData::default()); rm.scope_metrics.last_mut().unwrap() } }; diff --git a/opentelemetry-sdk/src/metrics/reader.rs b/opentelemetry-sdk/src/metrics/reader.rs index ae19841155..4d4e0eb0ad 100644 --- a/opentelemetry-sdk/src/metrics/reader.rs +++ b/opentelemetry-sdk/src/metrics/reader.rs @@ -1,9 +1,31 @@ //! Interfaces for reading and producing metrics +use opentelemetry::InstrumentationScope; + use crate::error::OTelSdkResult; +use crate::Resource; use std::time::Duration; use std::{fmt, sync::Weak}; -use super::{data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, Temporality}; +use super::data::Metric; +use super::{pipeline::Pipeline, InstrumentKind, Temporality}; + +/// A collection of [ScopeMetricsData] and the associated [Resource] that created them. +#[derive(Debug)] +pub struct ResourceMetricsData { + /// The entity that collected the metrics. + pub resource: Resource, + /// The collection of metrics with unique [InstrumentationScope]s. + pub scope_metrics: Vec, +} + +/// A collection of metrics produced by a meter. +#[derive(Debug, Default)] +pub struct ScopeMetricsData { + /// The [InstrumentationScope] that the meter was created with. + pub scope: InstrumentationScope, + /// The list of aggregations created by the meter. + pub metrics: Vec, +} /// The interface used between the SDK and an exporter. /// @@ -27,10 +49,10 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static { fn register_pipeline(&self, pipeline: Weak); /// Gathers and returns all metric data related to the [MetricReader] from the - /// SDK and stores it in the provided [ResourceMetrics] reference. + /// SDK and stores it in the provided [ResourceMetricsData] reference. /// /// An error is returned if this is called after shutdown. - fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult; + fn collect(&self, rm: &mut ResourceMetricsData) -> OTelSdkResult; /// Flushes all metric measurements held in an export pipeline. /// @@ -63,5 +85,5 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static { /// Produces metrics for a [MetricReader]. pub(crate) trait SdkProducer: fmt::Debug + Send + Sync { /// Returns aggregated metrics from a single collection. - fn produce(&self, rm: &mut ResourceMetrics) -> OTelSdkResult; + fn produce(&self, rm: &mut ResourceMetricsData) -> OTelSdkResult; } diff --git a/opentelemetry-sdk/src/testing/metrics/metric_reader.rs b/opentelemetry-sdk/src/testing/metrics/metric_reader.rs index 27f72321cc..217c1f5052 100644 --- a/opentelemetry-sdk/src/testing/metrics/metric_reader.rs +++ b/opentelemetry-sdk/src/testing/metrics/metric_reader.rs @@ -1,8 +1,7 @@ use crate::error::{OTelSdkError, OTelSdkResult}; +use crate::metrics::reader::ResourceMetricsData; use crate::metrics::Temporality; -use crate::metrics::{ - data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, reader::MetricReader, -}; +use crate::metrics::{instrument::InstrumentKind, pipeline::Pipeline, reader::MetricReader}; use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; @@ -34,7 +33,7 @@ impl Default for TestMetricReader { impl MetricReader for TestMetricReader { fn register_pipeline(&self, _pipeline: Weak) {} - fn collect(&self, _rm: &mut ResourceMetrics) -> OTelSdkResult { + fn collect(&self, _rm: &mut ResourceMetricsData) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-stdout/src/metrics/exporter.rs b/opentelemetry-stdout/src/metrics/exporter.rs index 0ec9a073c9..414162c1ec 100644 --- a/opentelemetry-stdout/src/metrics/exporter.rs +++ b/opentelemetry-stdout/src/metrics/exporter.rs @@ -1,7 +1,9 @@ use chrono::{DateTime, Utc}; use core::{f64, fmt}; use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData}; -use opentelemetry_sdk::metrics::exporter::{BatchScopeMetrics, ResourceMetricsRef}; +use opentelemetry_sdk::metrics::exporter::{ + MetricsLendingIter, ResourceMetrics, ScopeMetricsLendingIter, +}; use opentelemetry_sdk::metrics::Temporality; use opentelemetry_sdk::{ error::OTelSdkResult, @@ -40,7 +42,7 @@ impl fmt::Debug for MetricExporter { impl PushMetricExporter for MetricExporter { /// Write Metrics to stdout - async fn export(&self, metrics: ResourceMetricsRef<'_>) -> OTelSdkResult { + async fn export(&self, metrics: ResourceMetrics<'_>) -> OTelSdkResult { if self.is_shutdown.load(atomic::Ordering::SeqCst) { Err(opentelemetry_sdk::error::OTelSdkError::AlreadyShutdown) } else { @@ -53,7 +55,7 @@ impl PushMetricExporter for MetricExporter { metrics.resource.iter().for_each(|(k, v)| { println!("\t -> {}={:?}", k, v); }); - print_metrics(metrics.scope_metrics); + print_scope_metrics(metrics.scope_metrics); Ok(()) } } @@ -77,17 +79,19 @@ impl PushMetricExporter for MetricExporter { } } -fn print_metrics(metrics: BatchScopeMetrics<'_>) { - for (i, metric) in metrics.enumerate() { - println!("\tInstrumentation Scope #{}", i); - println!("\t\tName : {}", &metric.scope.name()); - if let Some(version) = &metric.scope.version() { +fn print_scope_metrics(mut metrics: ScopeMetricsLendingIter<'_>) { + let mut iter = 0; + while let Some(scope_metric) = metrics.next() { + iter += 1; + println!("\tInstrumentation Scope #{}", iter); + println!("\t\tName : {}", &scope_metric.scope.name()); + if let Some(version) = &scope_metric.scope.version() { println!("\t\tVersion : {:?}", version); } - if let Some(schema_url) = &metric.scope.schema_url() { + if let Some(schema_url) = &scope_metric.scope.schema_url() { println!("\t\tSchemaUrl: {:?}", schema_url); } - metric + scope_metric .scope .attributes() .enumerate() @@ -97,41 +101,47 @@ fn print_metrics(metrics: BatchScopeMetrics<'_>) { } println!("\t\t\t -> {}: {}", kv.key, kv.value); }); + print_metrics(scope_metric.metrics); + } +} - metric.metrics.enumerate().for_each(|(i, metric)| { - println!("Metric #{}", i); - println!("\t\tName : {}", &metric.name); - println!("\t\tDescription : {}", &metric.description); - println!("\t\tUnit : {}", &metric.unit); +fn print_metrics(mut metrics: MetricsLendingIter<'_>) { + let mut iter = 0; + while let Some(metric) = metrics.next() { + iter += 1; - fn print_info(data: &MetricData) - where - T: Debug, - { - match data { - MetricData::Gauge(gauge) => { - println!("\t\tType : Gauge"); - print_gauge(gauge); - } - MetricData::Sum(sum) => { - println!("\t\tType : Sum"); - print_sum(sum); - } - MetricData::Histogram(hist) => { - println!("\t\tType : Histogram"); - print_histogram(hist); - } - MetricData::ExponentialHistogram(_) => { - println!("\t\tType : Exponential Histogram"); - } + println!("Metric #{}", iter); + println!("\t\tName : {}", &metric.name); + println!("\t\tDescription : {}", &metric.description); + println!("\t\tUnit : {}", &metric.unit); + + fn print_info(data: &MetricData) + where + T: Debug, + { + match data { + MetricData::Gauge(gauge) => { + println!("\t\tType : Gauge"); + print_gauge(gauge); + } + MetricData::Sum(sum) => { + println!("\t\tType : Sum"); + print_sum(sum); + } + MetricData::Histogram(hist) => { + println!("\t\tType : Histogram"); + print_histogram(hist); + } + MetricData::ExponentialHistogram(_) => { + println!("\t\tType : Exponential Histogram"); } } - match &metric.data { - AggregatedMetrics::F64(data) => print_info(data), - AggregatedMetrics::U64(data) => print_info(data), - AggregatedMetrics::I64(data) => print_info(data), - } - }); + } + match &metric.data { + AggregatedMetrics::F64(data) => print_info(data), + AggregatedMetrics::U64(data) => print_info(data), + AggregatedMetrics::I64(data) => print_info(data), + } } } From e2280f7c7da0790b535d7828a91fa13ba1c70892 Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Fri, 11 Apr 2025 23:57:37 +0300 Subject: [PATCH 3/4] Make Metric borrowable --- opentelemetry-proto/src/transform/metrics.rs | 14 +++---- opentelemetry-sdk/src/metrics/data/mod.rs | 17 +-------- opentelemetry-sdk/src/metrics/exporter.rs | 26 ++++++++++--- .../src/metrics/in_memory_exporter.rs | 16 +++----- opentelemetry-sdk/src/metrics/instrument.rs | 13 +++++++ opentelemetry-sdk/src/metrics/mod.rs | 38 +++++++++---------- opentelemetry-sdk/src/metrics/pipeline.rs | 34 ++++++++--------- opentelemetry-sdk/src/metrics/reader.rs | 16 +++++++- opentelemetry-stdout/src/metrics/exporter.rs | 6 +-- 9 files changed, 98 insertions(+), 82 deletions(-) diff --git a/opentelemetry-proto/src/transform/metrics.rs b/opentelemetry-proto/src/transform/metrics.rs index c6cb94844f..4930fdfb8b 100644 --- a/opentelemetry-proto/src/transform/metrics.rs +++ b/opentelemetry-proto/src/transform/metrics.rs @@ -11,9 +11,9 @@ pub mod tonic { use opentelemetry_sdk::metrics::data::{ AggregatedMetrics, Exemplar as SdkExemplar, ExponentialHistogram as SdkExponentialHistogram, Gauge as SdkGauge, - Histogram as SdkHistogram, Metric as SdkMetric, MetricData, Sum as SdkSum, + Histogram as SdkHistogram, MetricData, Sum as SdkSum, }; - use opentelemetry_sdk::metrics::exporter::{ResourceMetrics, ScopeMetrics}; + use opentelemetry_sdk::metrics::exporter::{Metric, ResourceMetrics, ScopeMetrics}; use opentelemetry_sdk::metrics::Temporality; use opentelemetry_sdk::Resource as SdkResource; @@ -153,12 +153,12 @@ pub mod tonic { } } - impl From<&SdkMetric> for TonicMetric { - fn from(metric: &SdkMetric) -> Self { + impl From> for TonicMetric { + fn from(metric: Metric<'_>) -> Self { TonicMetric { - name: metric.name.to_string(), - description: metric.description.to_string(), - unit: metric.unit.to_string(), + name: metric.instrument.name.to_string(), + description: metric.instrument.description.to_string(), + unit: metric.instrument.unit.to_string(), metadata: vec![], // internal and currently unused data: Some(match &metric.data { AggregatedMetrics::F64(data) => data.into(), diff --git a/opentelemetry-sdk/src/metrics/data/mod.rs b/opentelemetry-sdk/src/metrics/data/mod.rs index 5d14637173..f3dc09abc9 100644 --- a/opentelemetry-sdk/src/metrics/data/mod.rs +++ b/opentelemetry-sdk/src/metrics/data/mod.rs @@ -1,26 +1,11 @@ //! Types for delivery of pre-aggregated metric time series data. -use std::{borrow::Cow, time::SystemTime}; +use std::time::SystemTime; use opentelemetry::KeyValue; use super::Temporality; -/// A collection of one or more aggregated time series from an [Instrument]. -/// -/// [Instrument]: crate::metrics::Instrument -#[derive(Debug)] -pub struct Metric { - /// The name of the instrument that created this data. - pub name: Cow<'static, str>, - /// The description of the instrument, which can be used in documentation. - pub description: Cow<'static, str>, - /// The unit in which the instrument reports. - pub unit: Cow<'static, str>, - /// The aggregated data from an instrument. - pub data: AggregatedMetrics, -} - /// Aggregated metrics data from an instrument #[derive(Debug)] pub enum AggregatedMetrics { diff --git a/opentelemetry-sdk/src/metrics/exporter.rs b/opentelemetry-sdk/src/metrics/exporter.rs index 4de2b4f5dc..2a7e4b1f97 100644 --- a/opentelemetry-sdk/src/metrics/exporter.rs +++ b/opentelemetry-sdk/src/metrics/exporter.rs @@ -6,9 +6,9 @@ use crate::{error::OTelSdkResult, Resource}; use std::{fmt::Debug, slice::Iter, time::Duration}; use super::{ - data::Metric, - reader::{ResourceMetricsData, ScopeMetricsData}, - Temporality, + data::AggregatedMetrics, + reader::{MetricsData, ResourceMetricsData, ScopeMetricsData}, + InstrumentInfo, Temporality, }; /// A collection of [`BatchScopeMetrics`] and the associated [Resource] that created them. @@ -38,7 +38,18 @@ pub struct ScopeMetrics<'a> { /// Iterator over aggregations created by the meter. /// Doesn't implement standard [`Iterator`], because it returns borrowed items. AKA "LendingIterator". pub struct MetricsLendingIter<'a> { - iter: Iter<'a, Metric>, + iter: Iter<'a, MetricsData>, +} + +/// A collection of one or more aggregated time series from an [Instrument]. +/// +/// [Instrument]: crate::metrics::Instrument +#[derive(Debug)] +pub struct Metric<'a> { + /// The name of the instrument that created this data. + pub instrument: &'a InstrumentInfo, + /// The aggregated data from an instrument. + pub data: &'a AggregatedMetrics, } impl<'a> ResourceMetrics<'a> { @@ -78,8 +89,11 @@ impl ScopeMetricsLendingIter<'_> { impl MetricsLendingIter<'_> { /// Advances the iterator and returns the next value. - pub fn next(&mut self) -> Option<&Metric> { - self.iter.next() + pub fn next(&mut self) -> Option> { + self.iter.next().map(|metric| Metric { + instrument: &metric.instrument, + data: &metric.data, + }) } } diff --git a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs index b4affadcd8..77b6f7d193 100644 --- a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs @@ -8,9 +8,9 @@ use std::fmt; use std::sync::{Arc, Mutex}; use std::time::Duration; -use super::data::{AggregatedMetrics, Metric}; +use super::data::AggregatedMetrics; use super::exporter::ResourceMetrics; -use super::reader::{ResourceMetricsData, ScopeMetricsData}; +use super::reader::{MetricsData, ResourceMetricsData, ScopeMetricsData}; /// An in-memory metrics exporter that stores metrics data in memory. /// @@ -163,10 +163,8 @@ impl InMemoryMetricExporter { metrics: data .metrics .iter() - .map(|data| Metric { - name: data.name.clone(), - description: data.description.clone(), - unit: data.unit.clone(), + .map(|data| MetricsData { + instrument: data.instrument.clone(), data: Self::clone_data(&data.data), }) .collect(), @@ -201,10 +199,8 @@ impl InMemoryMetricExporter { while let Some(mut scope_metric) = metric.scope_metrics.next() { let mut metrics = Vec::new(); while let Some(metric) = scope_metric.metrics.next() { - metrics.push(Metric { - name: metric.name.clone(), - description: metric.description.clone(), - unit: metric.unit.clone(), + metrics.push(MetricsData { + instrument: metric.instrument.clone(), data: Self::clone_data(&metric.data), }); } diff --git a/opentelemetry-sdk/src/metrics/instrument.rs b/opentelemetry-sdk/src/metrics/instrument.rs index 559e9c5328..6ca88ac072 100644 --- a/opentelemetry-sdk/src/metrics/instrument.rs +++ b/opentelemetry-sdk/src/metrics/instrument.rs @@ -65,6 +65,19 @@ impl InstrumentKind { } } +/// Describes properties an instrument is created with +#[derive(Debug, Clone)] +pub struct InstrumentInfo { + /// The human-readable identifier of the instrument. + pub name: Cow<'static, str>, + /// describes the purpose of the instrument. + pub description: Cow<'static, str>, + /// The functional group of the instrument. + pub kind: InstrumentKind, + /// Unit is the unit of measurement recorded by the instrument. + pub unit: Cow<'static, str>, +} + /// Describes properties an instrument is created with, also used for filtering /// in [View](crate::metrics::View)s. /// diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 493b083686..e415e11e72 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -690,8 +690,8 @@ mod tests { "There should be single metric merging duplicate instruments" ); let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; - assert_eq!(metric.name, "my_counter"); - assert_eq!(metric.unit, "my_unit"); + assert_eq!(metric.instrument.name, "my_counter"); + assert_eq!(metric.instrument.unit, "my_unit"); let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data) .expect("Sum aggregation expected for Counter instruments by default") else { @@ -756,9 +756,9 @@ mod tests { if let Some(scope1) = scope1 { let metric1 = &scope1.metrics[0]; - assert_eq!(metric1.name, "my_counter"); - assert_eq!(metric1.unit, "my_unit"); - assert_eq!(metric1.description, "my_description"); + assert_eq!(metric1.instrument.name, "my_counter"); + assert_eq!(metric1.instrument.unit, "my_unit"); + assert_eq!(metric1.instrument.description, "my_description"); let MetricData::Sum(sum1) = u64::extract_metrics_data_ref(&metric1.data) .expect("Sum aggregation expected for Counter instruments by default") else { @@ -776,9 +776,9 @@ mod tests { if let Some(scope2) = scope2 { let metric2 = &scope2.metrics[0]; - assert_eq!(metric2.name, "my_counter"); - assert_eq!(metric2.unit, "my_unit"); - assert_eq!(metric2.description, "my_description"); + assert_eq!(metric2.instrument.name, "my_counter"); + assert_eq!(metric2.instrument.unit, "my_unit"); + assert_eq!(metric2.instrument.description, "my_description"); let MetricData::Sum(sum2) = u64::extract_metrics_data_ref(&metric2.data) .expect("Sum aggregation expected for Counter instruments by default") @@ -862,9 +862,9 @@ mod tests { assert!(scope.attributes().eq(&[KeyValue::new("key", "value1")])); let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; - assert_eq!(metric.name, "my_counter"); - assert_eq!(metric.unit, "my_unit"); - assert_eq!(metric.description, "my_description"); + assert_eq!(metric.instrument.name, "my_counter"); + assert_eq!(metric.instrument.unit, "my_unit"); + assert_eq!(metric.instrument.description, "my_description"); let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data) .expect("Sum aggregation expected for Counter instruments by default") @@ -919,11 +919,11 @@ mod tests { assert!(!resource_metrics.is_empty()); let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; assert_eq!( - metric.name, "test_histogram", + metric.instrument.name, "test_histogram", "View rename should be ignored and original name retained." ); assert_eq!( - metric.unit, "test_unit", + metric.instrument.unit, "test_unit", "View rename of unit should be ignored and original unit retained." ); } @@ -985,7 +985,7 @@ mod tests { .expect("metrics are expected to be exported."); assert!(!resource_metrics.is_empty()); let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; - assert_eq!(metric.name, "my_observable_counter",); + assert_eq!(metric.instrument.name, "my_observable_counter",); let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data) .expect("Sum aggregation expected for ObservableCounter instruments by default") @@ -1061,7 +1061,7 @@ mod tests { .expect("metrics are expected to be exported."); assert!(!resource_metrics.is_empty()); let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; - assert_eq!(metric.name, "my_counter",); + assert_eq!(metric.instrument.name, "my_counter",); let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data) .expect("Sum aggregation expected for Counter instruments by default") @@ -3029,9 +3029,9 @@ mod tests { assert!(!resource_metric.scope_metrics[0].metrics.is_empty()); let metric = &resource_metric.scope_metrics[0].metrics[0]; - assert_eq!(metric.name, counter_name); + assert_eq!(metric.instrument.name, counter_name); if let Some(expected_unit) = unit_name { - assert_eq!(metric.unit, expected_unit); + assert_eq!(metric.instrument.unit, expected_unit); } T::extract_metrics_data_ref(&metric.data) @@ -3073,10 +3073,10 @@ mod tests { assert!(!resource_metric.scope_metrics[0].metrics.is_empty()); let metric = &resource_metric.scope_metrics[0].metrics[0]; - assert_eq!(metric.name, counter_name); + assert_eq!(metric.instrument.name, counter_name); if let Some(expected_unit) = unit_name { - assert_eq!(metric.unit, expected_unit); + assert_eq!(metric.instrument.unit, expected_unit); } let aggregation = T::extract_metrics_data_ref(&metric.data) diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index e726399d1a..6e522a971a 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -11,12 +11,11 @@ use crate::{ error::{OTelSdkError, OTelSdkResult}, metrics::{ aggregation, - data::Metric, error::{MetricError, MetricResult}, instrument::{Instrument, InstrumentId, InstrumentKind, Stream}, internal::{self, AggregateBuilder, Number}, - reader::{MetricReader, ScopeMetricsData, SdkProducer}, - view::View, + reader::{MetricReader, MetricsData, ScopeMetricsData, SdkProducer}, + view::View, InstrumentInfo, }, Resource, }; @@ -138,10 +137,8 @@ impl SdkProducer for Pipeline { let mut m = sm.metrics.get_mut(j); match (inst.comp_agg.call(m.as_mut().map(|m| &mut m.data)), m) { // No metric to re-use, expect agg to create new metric data - ((len, Some(initial_agg)), None) if len > 0 => sm.metrics.push(Metric { - name: inst.name.clone(), - description: inst.description.clone(), - unit: inst.unit.clone(), + ((len, Some(initial_agg)), None) if len > 0 => sm.metrics.push(MetricsData { + instrument: inst.info.clone(), data: initial_agg, }), // Existing metric can be re-used, update its values @@ -150,9 +147,7 @@ impl SdkProducer for Pipeline { // previous aggregation was of a different type prev_agg.data = data; } - prev_agg.name.clone_from(&inst.name); - prev_agg.description.clone_from(&inst.description); - prev_agg.unit.clone_from(&inst.unit); + prev_agg.instrument.clone_from(&inst.info); } _ => continue, } @@ -175,18 +170,16 @@ impl SdkProducer for Pipeline { /// A synchronization point between a [Pipeline] and an instrument's aggregate function. struct InstrumentSync { - name: Cow<'static, str>, - description: Cow<'static, str>, - unit: Cow<'static, str>, + info: InstrumentInfo, comp_agg: Arc, } impl fmt::Debug for InstrumentSync { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("InstrumentSync") - .field("name", &self.name) - .field("description", &self.description) - .field("unit", &self.unit) + .field("name", &self.info.name) + .field("description", &self.info.description) + .field("unit", &self.info.unit) .finish() } } @@ -410,9 +403,12 @@ where self.pipeline.add_sync( scope.clone(), InstrumentSync { - name: stream.name, - description: stream.description, - unit: stream.unit, + info: InstrumentInfo { + name: stream.name, + description: stream.description, + unit: stream.unit, + kind, + }, comp_agg: collect, }, ); diff --git a/opentelemetry-sdk/src/metrics/reader.rs b/opentelemetry-sdk/src/metrics/reader.rs index 4d4e0eb0ad..196c82429e 100644 --- a/opentelemetry-sdk/src/metrics/reader.rs +++ b/opentelemetry-sdk/src/metrics/reader.rs @@ -6,7 +6,8 @@ use crate::Resource; use std::time::Duration; use std::{fmt, sync::Weak}; -use super::data::Metric; +use super::data::AggregatedMetrics; +use super::InstrumentInfo; use super::{pipeline::Pipeline, InstrumentKind, Temporality}; /// A collection of [ScopeMetricsData] and the associated [Resource] that created them. @@ -24,7 +25,18 @@ pub struct ScopeMetricsData { /// The [InstrumentationScope] that the meter was created with. pub scope: InstrumentationScope, /// The list of aggregations created by the meter. - pub metrics: Vec, + pub metrics: Vec, +} + +/// A collection of one or more aggregated time series from an [Instrument]. +/// +/// [Instrument]: crate::metrics::Instrument +#[derive(Debug)] +pub struct MetricsData { + /// The name of the instrument that created this data. + pub instrument: InstrumentInfo, + /// The aggregated data from an instrument. + pub data: AggregatedMetrics, } /// The interface used between the SDK and an exporter. diff --git a/opentelemetry-stdout/src/metrics/exporter.rs b/opentelemetry-stdout/src/metrics/exporter.rs index 414162c1ec..fdfd782cf7 100644 --- a/opentelemetry-stdout/src/metrics/exporter.rs +++ b/opentelemetry-stdout/src/metrics/exporter.rs @@ -111,9 +111,9 @@ fn print_metrics(mut metrics: MetricsLendingIter<'_>) { iter += 1; println!("Metric #{}", iter); - println!("\t\tName : {}", &metric.name); - println!("\t\tDescription : {}", &metric.description); - println!("\t\tUnit : {}", &metric.unit); + println!("\t\tName : {}", &metric.instrument.name); + println!("\t\tDescription : {}", &metric.instrument.description); + println!("\t\tUnit : {}", &metric.instrument.unit); fn print_info(data: &MetricData) where From fc6205d3b83b007a3fe9aa6810060311ce183c9a Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Sat, 12 Apr 2025 00:39:33 +0300 Subject: [PATCH 4/4] Collect and export with zero allocations and clones --- opentelemetry-proto/src/transform/metrics.rs | 4 +- opentelemetry-sdk/benches/metric.rs | 6 +- opentelemetry-sdk/src/metrics/exporter.rs | 79 ++++++++++++------- .../src/metrics/in_memory_exporter.rs | 30 ++++--- opentelemetry-sdk/src/metrics/mod.rs | 3 +- .../src/metrics/periodic_reader.rs | 66 +++++++++------- .../periodic_reader_with_async_runtime.rs | 78 ++++++++---------- opentelemetry-sdk/src/metrics/pipeline.rs | 17 ++-- opentelemetry-stdout/src/metrics/exporter.rs | 4 +- 9 files changed, 156 insertions(+), 131 deletions(-) diff --git a/opentelemetry-proto/src/transform/metrics.rs b/opentelemetry-proto/src/transform/metrics.rs index 4930fdfb8b..63570aface 100644 --- a/opentelemetry-proto/src/transform/metrics.rs +++ b/opentelemetry-proto/src/transform/metrics.rs @@ -113,7 +113,7 @@ pub mod tonic { impl From> for ExportMetricsServiceRequest { fn from(mut rm: ResourceMetrics<'_>) -> Self { let mut scope_metrics = Vec::new(); - while let Some(scope_metric) = rm.scope_metrics.next() { + while let Some(scope_metric) = rm.scope_metrics.next_scope_metric() { scope_metrics.push(scope_metric.into()); } ExportMetricsServiceRequest { @@ -138,7 +138,7 @@ pub mod tonic { impl From> for TonicScopeMetrics { fn from(mut sm: ScopeMetrics<'_>) -> Self { let mut metrics = Vec::new(); - while let Some(metric) = sm.metrics.next() { + while let Some(metric) = sm.metrics.next_metric() { metrics.push(metric.into()); } TonicScopeMetrics { diff --git a/opentelemetry-sdk/benches/metric.rs b/opentelemetry-sdk/benches/metric.rs index d3fe3b8c37..4d50cb0ab7 100644 --- a/opentelemetry-sdk/benches/metric.rs +++ b/opentelemetry-sdk/benches/metric.rs @@ -6,8 +6,10 @@ use opentelemetry::{ use opentelemetry_sdk::{ error::OTelSdkResult, metrics::{ - data::ResourceMetricsData, new_view, reader::MetricReader, Aggregation, Instrument, - InstrumentKind, ManualReader, Pipeline, SdkMeterProvider, Stream, Temporality, View, + new_view, + reader::{MetricReader, ResourceMetricsData}, + Aggregation, Instrument, InstrumentKind, ManualReader, Pipeline, SdkMeterProvider, Stream, + Temporality, View, }, Resource, }; diff --git a/opentelemetry-sdk/src/metrics/exporter.rs b/opentelemetry-sdk/src/metrics/exporter.rs index 2a7e4b1f97..fcd02dff74 100644 --- a/opentelemetry-sdk/src/metrics/exporter.rs +++ b/opentelemetry-sdk/src/metrics/exporter.rs @@ -3,11 +3,15 @@ use opentelemetry::InstrumentationScope; use crate::{error::OTelSdkResult, Resource}; -use std::{fmt::Debug, slice::Iter, time::Duration}; +use std::{ + fmt::Debug, + slice::Iter, + time::{Duration, SystemTime}, +}; use super::{ - data::AggregatedMetrics, - reader::{MetricsData, ResourceMetricsData, ScopeMetricsData}, + data::{AggregatedMetrics, Sum}, + pipeline::InstrumentSync, InstrumentInfo, Temporality, }; @@ -23,7 +27,7 @@ pub struct ResourceMetrics<'a> { /// Iterator over libraries instrumentation scopes ([`InstrumentationScope`]) together with metrics. /// Doesn't implement standard [`Iterator`], because it returns borrowed items. AKA "LendingIterator". pub struct ScopeMetricsLendingIter<'a> { - iter: Iter<'a, ScopeMetricsData>, + iter: std::collections::hash_map::Iter<'a, InstrumentationScope, Vec>, } /// A collection of metrics produced by a [`InstrumentationScope`] meter. @@ -38,7 +42,9 @@ pub struct ScopeMetrics<'a> { /// Iterator over aggregations created by the meter. /// Doesn't implement standard [`Iterator`], because it returns borrowed items. AKA "LendingIterator". pub struct MetricsLendingIter<'a> { - iter: Iter<'a, MetricsData>, + // for optimization purposes + aggr: AggregatedMetrics, + iter: Iter<'a, InstrumentSync>, } /// A collection of one or more aggregated time series from an [Instrument]. @@ -53,23 +59,13 @@ pub struct Metric<'a> { } impl<'a> ResourceMetrics<'a> { - pub(crate) fn new(rm: &'a ResourceMetricsData) -> Self { - Self { - resource: &rm.resource, - scope_metrics: ScopeMetricsLendingIter { - iter: rm.scope_metrics.iter(), - }, - } - } -} - -impl<'a> ScopeMetrics<'a> { - fn new(sm: &'a ScopeMetricsData) -> Self { + pub(crate) fn new( + resource: &'a Resource, + iter: std::collections::hash_map::Iter<'a, InstrumentationScope, Vec>, + ) -> Self { Self { - scope: &sm.scope, - metrics: MetricsLendingIter { - iter: sm.metrics.iter(), - }, + resource, + scope_metrics: ScopeMetricsLendingIter { iter }, } } } @@ -81,19 +77,42 @@ impl Debug for ScopeMetricsLendingIter<'_> { } impl ScopeMetricsLendingIter<'_> { - /// Advances the iterator and returns the next value. - pub fn next(&mut self) -> Option> { - self.iter.next().map(ScopeMetrics::new) + /// Advances the iterator and returns the next value. + pub fn next_scope_metric(&mut self) -> Option> { + self.iter.next().map(|(scope, instruments)| ScopeMetrics { + scope, + metrics: MetricsLendingIter { + // doesn't matter what we initialize this with, + // it's purpose is to be reused between collections + aggr: AggregatedMetrics::F64(super::data::MetricData::Sum(Sum { + is_monotonic: true, + data_points: Vec::new(), + start_time: SystemTime::now(), + time: SystemTime::now(), + temporality: Temporality::Cumulative, + })), + iter: instruments.iter(), + }, + }) } } impl MetricsLendingIter<'_> { - /// Advances the iterator and returns the next value. - pub fn next(&mut self) -> Option> { - self.iter.next().map(|metric| Metric { - instrument: &metric.instrument, - data: &metric.data, - }) + /// Advances the iterator and returns the next value. + pub fn next_metric(&mut self) -> Option> { + loop { + let inst = self.iter.next()?; + let (len, data) = inst.comp_agg.call(Some(&mut self.aggr)); + if len > 0 { + if let Some(new_aggr) = data { + self.aggr = new_aggr; + } + return Some(Metric { + instrument: &inst.info, + data: &self.aggr, + }); + } + } } } diff --git a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs index 77b6f7d193..517ddaf4fe 100644 --- a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs @@ -194,24 +194,30 @@ impl InMemoryMetricExporter { .map(|mut metrics_guard| metrics_guard.clear()); } - fn clone_metrics(mut metric: ResourceMetrics<'_>) -> ResourceMetricsData { + fn clone_metrics(mut metric: ResourceMetrics<'_>) -> Option { let mut scope_metrics = Vec::new(); - while let Some(mut scope_metric) = metric.scope_metrics.next() { + while let Some(mut scope_metric) = metric.scope_metrics.next_scope_metric() { let mut metrics = Vec::new(); - while let Some(metric) = scope_metric.metrics.next() { + while let Some(metric) = scope_metric.metrics.next_metric() { metrics.push(MetricsData { instrument: metric.instrument.clone(), - data: Self::clone_data(&metric.data), + data: Self::clone_data(metric.data), + }); + } + if !metrics.is_empty() { + scope_metrics.push(ScopeMetricsData { + scope: scope_metric.scope.clone(), + metrics, }); } - scope_metrics.push(ScopeMetricsData { - scope: scope_metric.scope.clone(), - metrics, - }); } - ResourceMetricsData { - resource: metric.resource.clone(), - scope_metrics, + if !scope_metrics.is_empty() { + Some(ResourceMetricsData { + resource: metric.resource.clone(), + scope_metrics, + }) + } else { + None } } @@ -261,7 +267,7 @@ impl PushMetricExporter for InMemoryMetricExporter { self.metrics .lock() .map(|mut metrics_guard| { - metrics_guard.push_back(InMemoryMetricExporter::clone_metrics(metrics)) + metrics_guard.extend(InMemoryMetricExporter::clone_metrics(metrics)) }) .map_err(|_| OTelSdkError::InternalFailure("Failed to lock metrics".to_string())) } diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index e415e11e72..78d16e70e9 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -82,8 +82,7 @@ pub use periodic_reader::*; #[cfg(feature = "experimental_metrics_custom_reader")] pub use pipeline::Pipeline; -#[cfg(feature = "experimental_metrics_custom_reader")] -pub use instrument::InstrumentKind; +pub use instrument::{InstrumentInfo, InstrumentKind}; #[cfg(feature = "spec_unstable_metrics_views")] pub use instrument::*; diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index be6a8a6951..5350d437ec 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -16,7 +16,6 @@ use crate::{ exporter::{PushMetricExporter, ResourceMetrics}, reader::SdkProducer, }, - Resource, }; use super::{ @@ -350,11 +349,11 @@ impl fmt::Debug for PeriodicReader { struct PeriodicReaderInner { exporter: Arc, message_sender: mpsc::Sender, - producer: Mutex>>, + producer: Mutex>>, } impl PeriodicReaderInner { - fn register_pipeline(&self, producer: Weak) { + fn register_pipeline(&self, producer: Weak) { let mut inner = self.producer.lock().expect("lock poisoned"); *inner = Some(producer); } @@ -384,39 +383,48 @@ impl PeriodicReaderInner { } fn collect_and_export(&self) -> OTelSdkResult { - // TODO: Reuse the internal vectors. Or refactor to avoid needing any - // owned data structures to be passed to exporters. - let mut rm = ResourceMetricsData { - resource: Resource::empty(), - scope_metrics: Vec::new(), + let producer = self.producer.lock().expect("lock poisoned"); + let pipeline = if let Some(p) = producer.as_ref() { + p.upgrade().ok_or(OTelSdkError::AlreadyShutdown)? + } else { + otel_warn!( + name: "PeriodReader.MeterProviderNotRegistered", + message = "PeriodicReader is not registered with MeterProvider. Metrics will not be collected. \ + This occurs when a periodic reader is created but not associated with a MeterProvider \ + by calling `.with_reader(reader)` on MeterProviderBuilder." + ); + return Err(OTelSdkError::InternalFailure( + "MeterProvider is not registered".into(), + )); }; - + drop(producer); + let Ok(inner) = pipeline.inner.lock() else { + otel_warn!( + name: "PeriodReader.PipelineLockPoisoned", + message = "Failed to acquire lock for collect and export" + ); + return Err(OTelSdkError::InternalFailure( + "Paniced while holding a pipeline lock".into(), + )); + }; + for cb in &inner.callbacks { + cb(); + } let current_time = Instant::now(); - let collect_result = self.collect(&mut rm); - let time_taken_for_collect = current_time.elapsed(); + // Relying on futures executor to execute async call. + let res = futures_executor::block_on(self.exporter.export(ResourceMetrics::new( + &pipeline.resource, + inner.aggregations.iter(), + ))); + otel_debug!(name: "PeriodicReaderMetricsCollected", time_taken_in_millis = current_time.elapsed().as_millis()); - #[allow(clippy::question_mark)] - if let Err(e) = collect_result { + if let Err(err) = &res { otel_warn!( name: "PeriodReaderCollectError", - error = format!("{:?}", e) + error = format!("{:?}", err) ); - return Err(OTelSdkError::InternalFailure(e.to_string())); } - - if rm.scope_metrics.is_empty() { - otel_debug!(name: "NoMetricsCollected"); - return Ok(()); - } - - let metrics_count = rm.scope_metrics.iter().fold(0, |count, scope_metrics| { - count + scope_metrics.metrics.len() - }); - otel_debug!(name: "PeriodicReaderMetricsCollected", count = metrics_count, time_taken_in_millis = time_taken_for_collect.as_millis()); - - // Relying on futures executor to execute async call. - // TODO: Pass timeout to exporter - futures_executor::block_on(self.exporter.export(ResourceMetrics::new(&rm))) + res } fn force_flush(&self) -> OTelSdkResult { diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs index e8c53f9981..8914ff0aab 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -6,8 +6,6 @@ use std::{ use futures_channel::{mpsc, oneshot}; use futures_util::{ - future::{self, Either}, - pin_mut, stream::{self, FusedStream}, StreamExt, }; @@ -19,7 +17,6 @@ use crate::runtime::{to_interval_stream, Runtime}; use crate::{ error::{OTelSdkError, OTelSdkResult}, metrics::{exporter::PushMetricExporter, reader::SdkProducer}, - Resource, }; use super::{instrument::InstrumentKind, pipeline::Pipeline, reader::MetricReader}; @@ -116,17 +113,7 @@ where .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. .map(|_| Message::Export); let messages = Box::pin(stream::select(message_receiver, ticker)); - PeriodicReaderWorker { - reader, - timeout: self.timeout, - runtime, - rm: ResourceMetricsData { - resource: Resource::empty(), - scope_metrics: Vec::new(), - }, - } - .run(messages) - .await + PeriodicReaderWorker { reader }.run(messages).await }); }; @@ -229,47 +216,50 @@ enum Message { } enum ProducerOrWorker { - Producer(Weak), + Producer(Weak), #[allow(clippy::type_complexity)] Worker(Box) + Send + Sync>), } -struct PeriodicReaderWorker { +struct PeriodicReaderWorker { reader: PeriodicReader, - timeout: Duration, - runtime: RT, - rm: ResourceMetricsData, } -impl PeriodicReaderWorker { +impl PeriodicReaderWorker { async fn collect_and_export(&mut self) -> OTelSdkResult { - self.reader - .collect(&mut self.rm) - .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; - if self.rm.scope_metrics.is_empty() { - otel_debug!( - name: "PeriodicReaderWorker.NoMetricsToExport", - ); - // No metrics to export. - return Ok(()); + let inner = self + .reader + .inner + .lock() + .map_err(|_| OTelSdkError::InternalFailure("Failed to lock pipeline".into()))?; + + if inner.is_shutdown { + return Err(OTelSdkError::AlreadyShutdown); } - otel_debug!( - name: "PeriodicReaderWorker.InvokeExporter", - message = "Calling exporter's export method with collected metrics.", - count = self.rm.scope_metrics.len(), - ); - let export = self.reader.exporter.export(ResourceMetrics::new(&self.rm)); - let timeout = self.runtime.delay(self.timeout); - pin_mut!(export); - pin_mut!(timeout); - - match future::select(export, timeout).await { - Either::Left((res, _)) => { - res // return the status of export. - } - Either::Right(_) => Err(OTelSdkError::Timeout(self.timeout)), + let producer = match &inner.sdk_producer_or_worker { + ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(), + ProducerOrWorker::Worker(_) => None, + } + .ok_or(OTelSdkError::InternalFailure( + "reader is not registered".into(), + ))?; + drop(inner); + + let Ok(producer_inner) = producer.inner.lock() else { + return Err(OTelSdkError::InternalFailure( + "Paniced while holding a pipeline lock".into(), + )); + }; + for cb in &producer_inner.callbacks { + cb(); } + // unfortunatelly we need to block here, because runtime require future to be "Send", + // but we hold a lock for PipelineInner. + futures_executor::block_on(self.reader.exporter.export(ResourceMetrics::new( + &producer.resource, + producer_inner.aggregations.iter(), + ))) } async fn process_message(&mut self, message: Message) -> bool { diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index 6e522a971a..7c9446b692 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -15,7 +15,8 @@ use crate::{ instrument::{Instrument, InstrumentId, InstrumentKind, Stream}, internal::{self, AggregateBuilder, Number}, reader::{MetricReader, MetricsData, ScopeMetricsData, SdkProducer}, - view::View, InstrumentInfo, + view::View, + InstrumentInfo, }, Resource, }; @@ -37,7 +38,7 @@ pub struct Pipeline { pub(crate) resource: Resource, reader: Box, views: Vec>, - inner: Mutex, + pub(crate) inner: Mutex, } impl fmt::Debug for Pipeline { @@ -52,9 +53,9 @@ type GenericCallback = Arc; const DEFAULT_CARDINALITY_LIMIT: usize = 2000; #[derive(Default)] -struct PipelineInner { - aggregations: HashMap>, - callbacks: Vec, +pub(crate) struct PipelineInner { + pub(crate) aggregations: HashMap>, + pub(crate) callbacks: Vec, } impl fmt::Debug for PipelineInner { @@ -169,9 +170,9 @@ impl SdkProducer for Pipeline { } /// A synchronization point between a [Pipeline] and an instrument's aggregate function. -struct InstrumentSync { - info: InstrumentInfo, - comp_agg: Arc, +pub(crate) struct InstrumentSync { + pub(crate) info: InstrumentInfo, + pub(crate) comp_agg: Arc, } impl fmt::Debug for InstrumentSync { diff --git a/opentelemetry-stdout/src/metrics/exporter.rs b/opentelemetry-stdout/src/metrics/exporter.rs index fdfd782cf7..03547d1b97 100644 --- a/opentelemetry-stdout/src/metrics/exporter.rs +++ b/opentelemetry-stdout/src/metrics/exporter.rs @@ -81,7 +81,7 @@ impl PushMetricExporter for MetricExporter { fn print_scope_metrics(mut metrics: ScopeMetricsLendingIter<'_>) { let mut iter = 0; - while let Some(scope_metric) = metrics.next() { + while let Some(scope_metric) = metrics.next_scope_metric() { iter += 1; println!("\tInstrumentation Scope #{}", iter); println!("\t\tName : {}", &scope_metric.scope.name()); @@ -107,7 +107,7 @@ fn print_scope_metrics(mut metrics: ScopeMetricsLendingIter<'_>) { fn print_metrics(mut metrics: MetricsLendingIter<'_>) { let mut iter = 0; - while let Some(metric) = metrics.next() { + while let Some(metric) = metrics.next_metric() { iter += 1; println!("Metric #{}", iter);