From 7bf01c2a6e1d8ad72c11d6e313b8eb6452b5422f Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Thu, 17 Jul 2025 22:02:04 +0700 Subject: [PATCH 1/3] feat: generate protos for profiles collector --- opentelemetry-proto/src/proto.rs | 9 +- ....proto.collector.profiles.v1development.rs | 375 ++++++++++++++++++ opentelemetry-proto/tests/grpc_build.rs | 1 + 3 files changed, 384 insertions(+), 1 deletion(-) create mode 100644 opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.profiles.v1development.rs diff --git a/opentelemetry-proto/src/proto.rs b/opentelemetry-proto/src/proto.rs index 3055f02a40..71bbb74420 100644 --- a/opentelemetry-proto/src/proto.rs +++ b/opentelemetry-proto/src/proto.rs @@ -242,6 +242,13 @@ pub mod tonic { #[path = "opentelemetry.proto.collector.trace.v1.rs"] pub mod v1; } + + #[cfg(feature = "profiles")] + #[path = ""] + pub mod profiles { + #[path = "opentelemetry.proto.collector.profiles.v1development.rs"] + pub mod v1development; + } } /// Common types used across all signals @@ -295,7 +302,7 @@ pub mod tonic { #[path = ""] pub mod profiles { #[path = "opentelemetry.proto.profiles.v1development.rs"] - pub mod v1; + pub mod v1development; } pub use crate::transform::common::tonic::Attributes; diff --git a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.profiles.v1development.rs b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.profiles.v1development.rs new file mode 100644 index 0000000000..8a31c51d0f --- /dev/null +++ b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.profiles.v1development.rs @@ -0,0 +1,375 @@ +// This file is @generated by prost-build. +#[cfg_attr(feature = "with-schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "with-serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "with-serde", serde(rename_all = "camelCase"))] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ExportProfilesServiceRequest { + /// An array of ResourceProfiles. + /// For data coming from a single resource this array will typically contain one + /// element. Intermediary nodes (such as OpenTelemetry Collector) that receive + /// data from multiple origins typically batch the data before forwarding further and + /// in that case this array will contain multiple elements. + #[prost(message, repeated, tag = "1")] + pub resource_profiles: ::prost::alloc::vec::Vec< + super::super::super::profiles::v1development::ResourceProfiles, + >, + /// The reference table containing all data shared by profiles across the message being sent. + #[prost(message, optional, tag = "2")] + pub dictionary: ::core::option::Option< + super::super::super::profiles::v1development::ProfilesDictionary, + >, +} +#[cfg_attr(feature = "with-schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "with-serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "with-serde", serde(rename_all = "camelCase"))] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ExportProfilesServiceResponse { + /// The details of a partially successful export request. + /// + /// If the request is only partially accepted + /// (i.e. when the server accepts only parts of the data and rejects the rest) + /// the server MUST initialize the `partial_success` field and MUST + /// set the `rejected_` with the number of items it rejected. + /// + /// Servers MAY also make use of the `partial_success` field to convey + /// warnings/suggestions to senders even when the request was fully accepted. + /// In such cases, the `rejected_` MUST have a value of `0` and + /// the `error_message` MUST be non-empty. + /// + /// A `partial_success` message with an empty value (rejected_ = 0 and + /// `error_message` = "") is equivalent to it not being set/present. Senders + /// SHOULD interpret it the same way as in the full success case. + #[prost(message, optional, tag = "1")] + pub partial_success: ::core::option::Option, +} +#[cfg_attr(feature = "with-schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "with-serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "with-serde", serde(rename_all = "camelCase"))] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ExportProfilesPartialSuccess { + /// The number of rejected profiles. + /// + /// A `rejected_` field holding a `0` value indicates that the + /// request was fully accepted. + #[prost(int64, tag = "1")] + pub rejected_profiles: i64, + /// A developer-facing human-readable message in English. It should be used + /// either to explain why the server rejected parts of the data during a partial + /// success or to convey warnings/suggestions during a full success. The message + /// should offer guidance on how users can address such issues. + /// + /// error_message is an optional field. An error_message with an empty value + /// is equivalent to it not being set. + #[prost(string, tag = "2")] + pub error_message: ::prost::alloc::string::String, +} +/// Generated client implementations. +#[cfg(feature = "gen-tonic")] +pub mod profiles_service_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + /// Service that can be used to push profiles between one Application instrumented with + /// OpenTelemetry and a collector, or between a collector and a central collector. + #[derive(Debug, Clone)] + pub struct ProfilesServiceClient { + inner: tonic::client::Grpc, + } + impl ProfilesServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl ProfilesServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> ProfilesServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + ProfilesServiceClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn export( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/opentelemetry.proto.collector.profiles.v1development.ProfilesService/Export", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "opentelemetry.proto.collector.profiles.v1development.ProfilesService", + "Export", + ), + ); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +#[cfg(feature = "gen-tonic")] +pub mod profiles_service_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with ProfilesServiceServer. + #[async_trait] + pub trait ProfilesService: std::marker::Send + std::marker::Sync + 'static { + async fn export( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + /// Service that can be used to push profiles between one Application instrumented with + /// OpenTelemetry and a collector, or between a collector and a central collector. + #[derive(Debug)] + pub struct ProfilesServiceServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl ProfilesServiceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for ProfilesServiceServer + where + T: ProfilesService, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/opentelemetry.proto.collector.profiles.v1development.ProfilesService/Export" => { + #[allow(non_camel_case_types)] + struct ExportSvc(pub Arc); + impl< + T: ProfilesService, + > tonic::server::UnaryService + for ExportSvc { + type Response = super::ExportProfilesServiceResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::export(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ExportSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new( + tonic::body::Body::default(), + ); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for ProfilesServiceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "opentelemetry.proto.collector.profiles.v1development.ProfilesService"; + impl tonic::server::NamedService for ProfilesServiceServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/opentelemetry-proto/tests/grpc_build.rs b/opentelemetry-proto/tests/grpc_build.rs index 45d7faf4f9..ed72b5a306 100644 --- a/opentelemetry-proto/tests/grpc_build.rs +++ b/opentelemetry-proto/tests/grpc_build.rs @@ -13,6 +13,7 @@ const TONIC_PROTO_FILES: &[&str] = &[ "src/proto/opentelemetry-proto/opentelemetry/proto/logs/v1/logs.proto", "src/proto/opentelemetry-proto/opentelemetry/proto/collector/logs/v1/logs_service.proto", "src/proto/opentelemetry-proto/opentelemetry/proto/profiles/v1development/profiles.proto", + "src/proto/opentelemetry-proto/opentelemetry/proto/collector/profiles/v1development/profiles_service.proto", "src/proto/tracez.proto", ]; const TONIC_INCLUDES: &[&str] = &["src/proto/opentelemetry-proto", "src/proto"]; From 8edf1d012200744e3038a6eb9c7076d3ad7bbc77 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Fri, 18 Jul 2025 16:40:54 +0700 Subject: [PATCH 2/3] changelog --- opentelemetry-proto/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/opentelemetry-proto/CHANGELOG.md b/opentelemetry-proto/CHANGELOG.md index abaabde44c..c4128a3c71 100644 --- a/opentelemetry-proto/CHANGELOG.md +++ b/opentelemetry-proto/CHANGELOG.md @@ -3,6 +3,7 @@ ## vNext - Update proto definitions to v1.7.0. +- Added Rust generated protos for profiles collector. [#3077](https://github.com/open-telemetry/opentelemetry-rust/pull/3077) ## 0.30.0 From 828a5672f824a71918c0a3434bb8f597d577e144 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Tue, 22 Jul 2025 00:11:23 +0700 Subject: [PATCH 3/3] chore: update changelog for breaking package rename in profiles protos --- opentelemetry-proto/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/opentelemetry-proto/CHANGELOG.md b/opentelemetry-proto/CHANGELOG.md index c4128a3c71..c25374e08c 100644 --- a/opentelemetry-proto/CHANGELOG.md +++ b/opentelemetry-proto/CHANGELOG.md @@ -4,6 +4,7 @@ - Update proto definitions to v1.7.0. - Added Rust generated protos for profiles collector. [#3077](https://github.com/open-telemetry/opentelemetry-rust/pull/3077) +- **Breaking change**: package opentelemetry_proto::tonic::profiles::v1 renamed to opentelemetry_proto::tonic::profiles::v1development. [#3077](https://github.com/open-telemetry/opentelemetry-rust/pull/3077) ## 0.30.0