diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index d2b0c04ba..1ff2d4d81 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -492,14 +492,14 @@ dependencies = [ "base64 0.22.1", "bytes 1.10.1", "chrono", - "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=c3d8ed4f90591c6958921145d485463860307f78)", + "datadog-fips", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "datadog-trace-agent", - "datadog-trace-normalization 19.1.0", - "datadog-trace-obfuscation 19.1.0", - "datadog-trace-protobuf 19.1.0", - "datadog-trace-utils 19.1.0", - "ddcommon 19.1.0", + "datadog-trace-normalization", + "datadog-trace-obfuscation", + "datadog-trace-protobuf", + "datadog-trace-utils", + "ddcommon", "ddsketch-agent 0.1.0 (git+https://github.com/DataDog/saluki/)", "dogstatsd", "figment", @@ -724,18 +724,7 @@ dependencies = [ [[package]] name = "datadog-fips" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=985120329d0ba96c1ec8d719cc38e1f7ce11a092#985120329d0ba96c1ec8d719cc38e1f7ce11a092" -dependencies = [ - "reqwest", - "rustls", - "rustls-native-certs", - "tracing", -] - -[[package]] -name = "datadog-fips" -version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=c3d8ed4f90591c6958921145d485463860307f78#c3d8ed4f90591c6958921145d485463860307f78" +source = "git+https://github.com/DataDog/serverless-components?rev=d131de8419c191ce21c91bb30b5915c4d8a2cc5a#d131de8419c191ce21c91bb30b5915c4d8a2cc5a" dependencies = [ "reqwest", "rustls", @@ -772,15 +761,15 @@ dependencies = [ [[package]] name = "datadog-trace-agent" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=c3d8ed4f90591c6958921145d485463860307f78#c3d8ed4f90591c6958921145d485463860307f78" +source = "git+https://github.com/DataDog/serverless-components?rev=d131de8419c191ce21c91bb30b5915c4d8a2cc5a#d131de8419c191ce21c91bb30b5915c4d8a2cc5a" dependencies = [ "anyhow", "async-trait", - "datadog-trace-normalization 17.0.0", - "datadog-trace-obfuscation 17.0.0", - "datadog-trace-protobuf 17.0.0", - "datadog-trace-utils 17.0.0", - "ddcommon 17.0.0", + "datadog-trace-normalization", + "datadog-trace-obfuscation", + "datadog-trace-protobuf", + "datadog-trace-utils", + "ddcommon", "http-body-util", "hyper 1.6.0", "hyper-util", @@ -791,39 +780,13 @@ dependencies = [ "tracing", ] -[[package]] -name = "datadog-trace-normalization" -version = "17.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae#d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" -dependencies = [ - "anyhow", - "datadog-trace-protobuf 17.0.0", -] - [[package]] name = "datadog-trace-normalization" version = "19.1.0" source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b85772676f71b34f2#8a49c7df2d9cbf05118bfd5b85772676f71b34f2" dependencies = [ "anyhow", - "datadog-trace-protobuf 19.1.0", -] - -[[package]] -name = "datadog-trace-obfuscation" -version = "17.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae#d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" -dependencies = [ - "anyhow", - "datadog-trace-protobuf 17.0.0", - "datadog-trace-utils 17.0.0", - "ddcommon 17.0.0", - "log", - "percent-encoding", - "regex", - "serde", - "serde_json", - "url", + "datadog-trace-protobuf", ] [[package]] @@ -832,9 +795,9 @@ version = "19.1.0" source = 
"git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b85772676f71b34f2#8a49c7df2d9cbf05118bfd5b85772676f71b34f2" dependencies = [ "anyhow", - "datadog-trace-protobuf 19.1.0", - "datadog-trace-utils 19.1.0", - "ddcommon 19.1.0", + "datadog-trace-protobuf", + "datadog-trace-utils", + "ddcommon", "log", "percent-encoding", "regex", @@ -843,16 +806,6 @@ dependencies = [ "url", ] -[[package]] -name = "datadog-trace-protobuf" -version = "17.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae#d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" -dependencies = [ - "prost", - "serde", - "serde_bytes", -] - [[package]] name = "datadog-trace-protobuf" version = "19.1.0" @@ -863,34 +816,6 @@ dependencies = [ "serde_bytes", ] -[[package]] -name = "datadog-trace-utils" -version = "17.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae#d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" -dependencies = [ - "anyhow", - "bytes 1.10.1", - "datadog-trace-normalization 17.0.0", - "datadog-trace-protobuf 17.0.0", - "ddcommon 17.0.0", - "flate2", - "futures 0.3.31", - "http-body-util", - "hyper 1.6.0", - "hyper-http-proxy", - "log", - "prost", - "rand 0.8.5", - "rmp", - "rmp-serde", - "rmpv", - "serde", - "serde_json", - "tinybytes 17.0.0", - "tokio", - "zstd", -] - [[package]] name = "datadog-trace-utils" version = "19.1.0" @@ -898,9 +823,9 @@ source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b dependencies = [ "anyhow", "bytes 1.10.1", - "datadog-trace-normalization 19.1.0", - "datadog-trace-protobuf 19.1.0", - "ddcommon 19.1.0", + "datadog-trace-normalization", + "datadog-trace-protobuf", + "ddcommon", "flate2", "futures 0.3.31", "http-body-util", @@ -913,49 +838,12 @@ dependencies = [ "rmpv", "serde", "serde_json", - "tinybytes 19.1.0", + "tinybytes", "tokio", "tracing", "zstd", ] -[[package]] -name = "ddcommon" -version = "17.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae#d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" -dependencies = [ - "anyhow", - "cc", - "const_format", - "futures 0.3.31", - "futures-core", - "futures-util", - "hex", - "http 1.3.1", - "http-body 1.0.1", - "http-body-util", - "hyper 1.6.0", - "hyper-rustls", - "hyper-util", - "libc", - "log", - "memfd", - "nix 0.29.0", - "pin-project", - "rand 0.8.5", - "regex", - "rmp", - "rmp-serde", - "rustls", - "rustls-native-certs", - "serde", - "static_assertions", - "tokio", - "tokio-rustls", - "tower-service", - "windows-sys 0.52.0", -] - [[package]] name = "ddcommon" version = "19.1.0" @@ -1078,9 +966,9 @@ dependencies = [ [[package]] name = "dogstatsd" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=985120329d0ba96c1ec8d719cc38e1f7ce11a092#985120329d0ba96c1ec8d719cc38e1f7ce11a092" +source = "git+https://github.com/DataDog/serverless-components?rev=d131de8419c191ce21c91bb30b5915c4d8a2cc5a#d131de8419c191ce21c91bb30b5915c4d8a2cc5a" dependencies = [ - "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=985120329d0ba96c1ec8d719cc38e1f7ce11a092)", + "datadog-fips", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "ddsketch-agent 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "derive_more", @@ -2089,15 +1977,6 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" -[[package]] -name = "memfd" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2cffa4ad52c6f791f4f8b15f0c05f9824b2ced1160e88cc393d64fff9a8ac64" -dependencies = [ - "rustix 0.38.44", -] - [[package]] name = "mime" version = "0.3.17" @@ -3475,14 +3354,6 @@ dependencies = [ "crunchy", ] -[[package]] -name = "tinybytes" -version = "17.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae#d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" -dependencies = [ - "serde", -] - [[package]] name = "tinybytes" version = "19.1.0" diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index 262057313..0281b64d8 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -57,9 +57,9 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" , features = ["mini_agent"] } datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" } datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" } -dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "985120329d0ba96c1ec8d719cc38e1f7ce11a092", default-features = false } -datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78" } -datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false } +dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "d131de8419c191ce21c91bb30b5915c4d8a2cc5a", default-features = false } +datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "d131de8419c191ce21c91bb30b5915c4d8a2cc5a" } +datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "d131de8419c191ce21c91bb30b5915c4d8a2cc5a", default-features = false } axum = { version = "0.8.4", default-features = false, features = ["default"] } [dev-dependencies] diff --git a/bottlecap/LICENSE-3rdparty.csv b/bottlecap/LICENSE-3rdparty.csv index b42a0f3b2..b8205f160 100644 --- a/bottlecap/LICENSE-3rdparty.csv +++ b/bottlecap/LICENSE-3rdparty.csv @@ -108,7 +108,6 @@ log,https://github.com/rust-lang/log,MIT OR Apache-2.0,The Rust Project Develope matchers,https://github.com/hawkw/matchers,MIT,Eliza Weisman matchit,https://github.com/ibraheemdev/matchit,MIT AND BSD-3-Clause,Ibraheem Ahmed memchr,https://github.com/BurntSushi/memchr,Unlicense OR MIT,"Andrew Gallant , bluss" -memfd,https://github.com/lucab/memfd-rs,MIT OR Apache-2.0,"Luca Bruno , Simonas Kazlauskas " mime,https://github.com/hyperium/mime,MIT OR Apache-2.0,Sean McArthur miniz_oxide,https://github.com/Frommi/miniz_oxide/tree/master/miniz_oxide,MIT OR Zlib OR Apache-2.0,"Frommi , oyvindln , Rich Geldreich richgel99@gmail.com" mio,https://github.com/tokio-rs/mio,MIT,"Carl Lerche , Thomas de Zeeuw , Tokio Contributors " diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 2aa46a66a..0fdc255d5 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -1057,11 +1057,11 @@ fn start_trace_agent( let stats_processor = 
Arc::new(stats_processor::ServerlessStatsProcessor {}); // Traces - let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher { - aggregator: trace_aggregator.clone(), - config: Arc::clone(config), - api_key_factory: Arc::clone(api_key_factory), - }); + let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new( + trace_aggregator.clone(), + config.clone(), + api_key_factory.clone(), + )); let obfuscation_config = obfuscation_config::ObfuscationConfig { tag_replace_rules: config.apm_replace_tags.clone(), diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs index adbb5c745..7509a1da8 100644 --- a/bottlecap/src/config/env.rs +++ b/bottlecap/src/config/env.rs @@ -163,9 +163,21 @@ pub struct EnvConfig { /// @env `DD_APM_CONFIG_OBFUSCATION_HTTP_REMOVE_PATHS_WITH_DIGITS` #[serde(deserialize_with = "deserialize_optional_bool_from_anything")] pub apm_config_obfuscation_http_remove_paths_with_digits: Option, + /// @env `DD_APM_CONFIG_COMPRESSION_LEVEL` + /// + /// The Agent compresses traces before sending them. The `compression_level` parameter + /// accepts values from 0 (no compression) to 9 (maximum compression but + /// higher resource usage). + pub apm_config_compression_level: Option, /// @env `DD_APM_FEATURES` #[serde(deserialize_with = "deserialize_array_from_comma_separated_string")] pub apm_features: Vec, + /// @env `DD_APM_ADDITIONAL_ENDPOINTS` + /// + /// Additional endpoints to send traces to. + /// + #[serde(deserialize_with = "deserialize_additional_endpoints")] + pub apm_additional_endpoints: HashMap>, // // Trace Propagation /// @env `DD_TRACE_PROPAGATION_STYLE` @@ -347,7 +359,9 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) { env_config, apm_config_obfuscation_http_remove_paths_with_digits ); + merge_option_to_value!(config, env_config, apm_config_compression_level); merge_vec!(config, env_config, apm_features); + merge_hashmap!(config, env_config, apm_additional_endpoints); // Trace Propagation merge_vec!(config, env_config, trace_propagation_style); @@ -533,10 +547,12 @@ mod tests { "DD_APM_CONFIG_OBFUSCATION_HTTP_REMOVE_PATHS_WITH_DIGITS", "true", ); + jail.set_env("DD_APM_CONFIG_COMPRESSION_LEVEL", "3"); jail.set_env( "DD_APM_FEATURES", "enable_otlp_compute_top_level_by_span_kind,enable_stats_by_span_kind", ); + jail.set_env("DD_APM_ADDITIONAL_ENDPOINTS", "{\"https://trace.agent.datadoghq.com\": [\"apikey2\", \"apikey3\"], \"https://trace.agent.datadoghq.eu\": [\"apikey4\"]}"); // Trace Propagation jail.set_env("DD_TRACE_PROPAGATION_STYLE", "datadog"); @@ -673,10 +689,21 @@ mod tests { ), apm_config_obfuscation_http_remove_query_string: true, apm_config_obfuscation_http_remove_paths_with_digits: true, + apm_config_compression_level: 3, apm_features: vec![ "enable_otlp_compute_top_level_by_span_kind".to_string(), "enable_stats_by_span_kind".to_string(), ], + apm_additional_endpoints: HashMap::from([ + ( + "https://trace.agent.datadoghq.com".to_string(), + vec!["apikey2".to_string(), "apikey3".to_string()], + ), + ( + "https://trace.agent.datadoghq.eu".to_string(), + vec!["apikey4".to_string()], + ), + ]), trace_propagation_style: vec![TracePropagationStyle::Datadog], trace_propagation_style_extract: vec![TracePropagationStyle::B3], trace_propagation_extract_first: true, diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 8b7909030..e1e2f7f1e 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -273,7 +273,9 @@ pub struct Config { pub apm_replace_tags: Option>, pub 
apm_config_obfuscation_http_remove_query_string: bool, pub apm_config_obfuscation_http_remove_paths_with_digits: bool, + pub apm_config_compression_level: i32, pub apm_features: Vec, + pub apm_additional_endpoints: HashMap>, // // Trace Propagation pub trace_propagation_style: Vec, @@ -365,7 +367,9 @@ impl Default for Config { apm_replace_tags: None, apm_config_obfuscation_http_remove_query_string: false, apm_config_obfuscation_http_remove_paths_with_digits: false, + apm_config_compression_level: 6, apm_features: vec![], + apm_additional_endpoints: HashMap::new(), trace_propagation_style: vec![ TracePropagationStyle::Datadog, TracePropagationStyle::TraceContext, diff --git a/bottlecap/src/config/yaml.rs b/bottlecap/src/config/yaml.rs index f35a54bfe..7b944ea42 100644 --- a/bottlecap/src/config/yaml.rs +++ b/bottlecap/src/config/yaml.rs @@ -132,7 +132,10 @@ pub struct ApmConfig { #[serde(deserialize_with = "deserialize_apm_replace_rules")] pub replace_tags: Option>, pub obfuscation: Option, + pub compression_level: Option, pub features: Vec, + #[serde(deserialize_with = "deserialize_additional_endpoints")] + pub additional_endpoints: HashMap>, } impl ApmConfig { @@ -410,6 +413,18 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) { yaml_config.apm_config, replace_tags ); + merge_option_to_value!( + config, + apm_config_compression_level, + yaml_config.apm_config, + compression_level + ); + merge_hashmap!( + config, + apm_additional_endpoints, + yaml_config.apm_config, + additional_endpoints + ); // Not using the macro here because we need to call a method on the struct if let Some(remove_query_string) = yaml_config @@ -641,8 +656,8 @@ http_protocol: "http1" # Endpoints additional_endpoints: "https://app.datadoghq.com": - - "apikey2" - - "apikey3" + - apikey2 + - apikey3 "https://app.datadoghq.eu": - apikey4 @@ -677,9 +692,16 @@ apm_config: http: remove_query_string: true remove_paths_with_digits: true + compression_level: 3 features: - "enable_otlp_compute_top_level_by_span_kind" - "enable_stats_by_span_kind" + additional_endpoints: + "https://trace.agent.datadoghq.com": + - apikey2 + - apikey3 + "https://trace.agent.datadoghq.eu": + - apikey4 service_mapping: old-service:new-service @@ -797,10 +819,21 @@ extension_version: "compatibility" apm_replace_tags: Some(vec![]), apm_config_obfuscation_http_remove_query_string: true, apm_config_obfuscation_http_remove_paths_with_digits: true, + apm_config_compression_level: 3, apm_features: vec![ "enable_otlp_compute_top_level_by_span_kind".to_string(), "enable_stats_by_span_kind".to_string(), ], + apm_additional_endpoints: HashMap::from([ + ( + "https://trace.agent.datadoghq.com".to_string(), + vec!["apikey2".to_string(), "apikey3".to_string()], + ), + ( + "https://trace.agent.datadoghq.eu".to_string(), + vec!["apikey4".to_string()], + ), + ]), trace_propagation_style: vec![TracePropagationStyle::Datadog], trace_propagation_style_extract: vec![TracePropagationStyle::B3], trace_propagation_extract_first: true, diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 1f8c246b0..822d1f9cb 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -41,6 +41,7 @@ use crate::{ }; pub const MS_TO_NS: f64 = 1_000_000.0; +pub const S_TO_MS: u64 = 1_000; pub const S_TO_NS: f64 = 1_000_000_000.0; pub const PROACTIVE_INITIALIZATION_THRESHOLD_MS: u64 = 10_000; diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs 
index d86da5600..349557ed3 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -49,7 +49,7 @@ impl Agent { trace_tx: Sender, ) -> Self { let port = Self::parse_port( - &config.otlp_config_receiver_protocols_http_endpoint, + config.otlp_config_receiver_protocols_http_endpoint.as_ref(), OTLP_AGENT_HTTP_PORT, ); let cancel_token = CancellationToken::new(); @@ -70,9 +70,7 @@ impl Agent { self.cancel_token.clone() } - // TODO (Yiming): Fix this lint - #[allow(clippy::ref_option)] - fn parse_port(endpoint: &Option, default_port: u16) -> u16 { + fn parse_port(endpoint: Option<&String>, default_port: u16) -> u16 { if let Some(endpoint) = endpoint { let port = endpoint.split(':').nth(1); if let Some(port) = port { @@ -204,7 +202,10 @@ mod tests { fn test_parse_port_with_valid_endpoint() { // Test with a valid endpoint containing a port let endpoint = Some("localhost:8080".to_string()); - assert_eq!(Agent::parse_port(&endpoint, OTLP_AGENT_HTTP_PORT), 8080); + assert_eq!( + Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), + 8080 + ); } #[test] @@ -212,7 +213,7 @@ mod tests { // Test with an endpoint containing an invalid port format let endpoint = Some("localhost:invalid".to_string()); assert_eq!( - Agent::parse_port(&endpoint, OTLP_AGENT_HTTP_PORT), + Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), OTLP_AGENT_HTTP_PORT ); } @@ -222,7 +223,7 @@ mod tests { // Test with an endpoint missing a port let endpoint = Some("localhost".to_string()); assert_eq!( - Agent::parse_port(&endpoint, OTLP_AGENT_HTTP_PORT), + Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), OTLP_AGENT_HTTP_PORT ); } @@ -232,7 +233,7 @@ mod tests { // Test with None endpoint let endpoint: Option = None; assert_eq!( - Agent::parse_port(&endpoint, OTLP_AGENT_HTTP_PORT), + Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), OTLP_AGENT_HTTP_PORT ); } @@ -242,7 +243,7 @@ mod tests { // Test with an empty endpoint let endpoint = Some("".to_string()); assert_eq!( - Agent::parse_port(&endpoint, OTLP_AGENT_HTTP_PORT), + Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), OTLP_AGENT_HTTP_PORT ); } diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs index d7efeadf3..2b0d0a68e 100644 --- a/bottlecap/src/proxy/interceptor.rs +++ b/bottlecap/src/proxy/interceptor.rs @@ -37,7 +37,7 @@ pub fn start( aws_config: Arc, invocation_processor: Arc>, ) -> Result> { - let socket = get_proxy_socket_address(&aws_config.aws_lwa_proxy_lambda_runtime_api); + let socket = get_proxy_socket_address(aws_config.aws_lwa_proxy_lambda_runtime_api.as_ref()); let shutdown_token = CancellationToken::new(); let mut connector = HttpConnector::new(); @@ -110,11 +110,8 @@ async fn graceful_shutdown(tasks: Arc>>, shutdown_token: Cance /// If the LWA proxy lambda runtime API is not provided, the default Extension /// host and port will be used. 
/// -// TODO (Yiming): Fix this lint -#[allow(clippy::ref_option)] -fn get_proxy_socket_address(aws_lwa_proxy_lambda_runtime_api: &Option) -> SocketAddr { +fn get_proxy_socket_address(aws_lwa_proxy_lambda_runtime_api: Option<&String>) -> SocketAddr { if let Some(socket_addr) = aws_lwa_proxy_lambda_runtime_api - .as_ref() .and_then(|uri_str| lwa::get_lwa_proxy_socket_address(uri_str).ok()) { debug!("PROXY | get_proxy_socket_address | LWA proxy detected"); diff --git a/bottlecap/src/traces/stats_aggregator.rs b/bottlecap/src/traces/stats_aggregator.rs index 9693f722d..3ae98ff23 100644 --- a/bottlecap/src/traces/stats_aggregator.rs +++ b/bottlecap/src/traces/stats_aggregator.rs @@ -7,9 +7,9 @@ use std::collections::VecDeque; /// // const MAX_BATCH_ENTRIES_SIZE: usize = 4000; -/// Aproximate size an entry in a stat payload occupies -/// -/// +// Aproximate size an entry in a stat payload occupies +// +// // const MAX_ENTRY_SIZE_BYTES: usize = 375; /// Maximum content size per payload in compressed bytes, diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index 266d9d465..71a86fe9d 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -8,6 +8,7 @@ use tokio::sync::Mutex; use tokio::sync::OnceCell; use crate::config; +use crate::lifecycle::invocation::processor::S_TO_MS; use crate::traces::stats_aggregator::StatsAggregator; use datadog_trace_protobuf::pb; use datadog_trace_utils::{config_utils::trace_stats_url, stats_utils}; @@ -75,7 +76,7 @@ impl StatsFlusher for ServerlessStatsFlusher { url: hyper::Uri::from_str(&stats_url) .expect("can't make URI from stats url, exiting"), api_key: Some(api_key_clone.into()), - timeout_ms: self.config.flush_timeout * 1_000, + timeout_ms: self.config.flush_timeout * S_TO_MS, test_token: None, } } diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 8fb6a2b47..74a4f487d 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -2,17 +2,22 @@ // SPDX-License-Identifier: Apache-2.0 use async_trait::async_trait; +use ddcommon::Endpoint; +use futures::future::join_all; +use std::str::FromStr; use std::sync::Arc; use tokio::sync::Mutex; use tracing::{debug, error}; use datadog_trace_utils::{ + config_utils::trace_intake_url_prefixed, send_data::SendDataBuilder, trace_utils::{self, SendData}, }; use dogstatsd::api_key::ApiKeyFactory; use crate::config::Config; +use crate::lifecycle::invocation::processor::S_TO_MS; use crate::traces::trace_aggregator::TraceAggregator; #[async_trait] @@ -26,7 +31,11 @@ pub trait TraceFlusher { Self: Sized; /// Given a `Vec`, a tracer payload, send it to the Datadog intake endpoint. /// Returns the traces back if there was an error sending them. - async fn send(&self, traces: Vec) -> Option>; + async fn send( + &self, + traces: Vec, + endpoint: Option<&Endpoint>, + ) -> Option>; /// Flushes traces by getting every available batch on the aggregator. /// If `failed_traces` is provided, it will attempt to send those instead of fetching new traces. 
@@ -40,6 +49,7 @@ pub struct ServerlessTraceFlusher { pub aggregator: Arc>, pub config: Arc, pub api_key_factory: Arc, + pub additional_endpoints: Vec, } #[async_trait] @@ -49,10 +59,28 @@ impl TraceFlusher for ServerlessTraceFlusher { config: Arc, api_key_factory: Arc, ) -> Self { + let mut additional_endpoints: Vec = Vec::new(); + + for (endpoint_url, api_keys) in config.apm_additional_endpoints.clone() { + for api_key in api_keys { + let trace_intake_url = trace_intake_url_prefixed(&endpoint_url); + let endpoint = Endpoint { + url: hyper::Uri::from_str(&trace_intake_url) + .expect("can't parse additional trace intake URL, exiting"), + api_key: Some(api_key.clone().into()), + timeout_ms: config.flush_timeout * S_TO_MS, + test_token: None, + }; + + additional_endpoints.push(endpoint); + } + } + ServerlessTraceFlusher { aggregator, config, api_key_factory, + additional_endpoints, } } @@ -62,13 +90,13 @@ impl TraceFlusher for ServerlessTraceFlusher { return None; }; - let mut failed_batch: Option> = None; + let mut failed_batch: Vec = Vec::new(); if let Some(traces) = failed_traces { // If we have traces from a previous failed attempt, try to send those first if !traces.is_empty() { debug!("Retrying to send {} previously failed traces", traces.len()); - let retry_result = self.send(traces).await; + let retry_result = self.send(traces, None).await; if retry_result.is_some() { // Still failed, return to retry later return retry_result; @@ -87,36 +115,59 @@ impl TraceFlusher for ServerlessTraceFlusher { .map(|builder| builder.with_api_key(api_key)) .map(SendDataBuilder::build) .collect(); - if let Some(failed) = self.send(traces).await { - // Keep track of the failed batch - failed_batch = Some(failed); - // Stop processing more batches if we have a failure + if let Some(mut failed) = self.send(traces.clone(), None).await { + failed_batch.append(&mut failed); + } + + // Send to additional endpoints + let tasks = self.additional_endpoints.iter().map(|endpoint| { + let traces_clone = traces.clone(); + async move { self.send(traces_clone, Some(endpoint)).await } + }); + for mut failed in join_all(tasks).await.into_iter().flatten() { + failed_batch.append(&mut failed); + } + + // Stop processing more batches if we have a failure + if !failed_batch.is_empty() { break; } trace_builders = guard.get_batch(); } - failed_batch + if !failed_batch.is_empty() { + return Some(failed_batch); + } + + None } - async fn send(&self, traces: Vec) -> Option> { + async fn send( + &self, + traces: Vec, + endpoint: Option<&Endpoint>, + ) -> Option> { if traces.is_empty() { return None; } let start = std::time::Instant::now(); - debug!("Flushing {} traces", traces.len()); - - // Since we return the original traces on error, we need to clone them before coalescing - let traces_clone = traces.clone(); let coalesced_traces = trace_utils::coalesce_send_data(traces); let mut tasks = Vec::with_capacity(coalesced_traces.len()); - - for traces in coalesced_traces { - let proxy_https = self.config.proxy_https.clone(); + debug!("Flushing {} traces", coalesced_traces.len()); + + for trace in &coalesced_traces { + let trace_with_endpoint = match endpoint { + Some(additional_endpoint) => trace.with_endpoint(additional_endpoint.clone()), + None => trace.clone(), + }; + let proxy = self.config.proxy_https.clone(); tasks.push(tokio::spawn(async move { - traces.send_proxy(proxy_https.as_deref()).await.last_result + trace_with_endpoint + .send_proxy(proxy.as_deref()) + .await + .last_result })); } @@ -126,13 +177,13 @@ impl 
TraceFlusher for ServerlessTraceFlusher { if let Err(e) = result { error!("Error sending trace: {e:?}"); // Return the original traces for retry - return Some(traces_clone); + return Some(coalesced_traces.clone()); } } Err(e) => { error!("Task join error: {e:?}"); // Return the original traces for retry if a task panics - return Some(traces_clone); + return Some(coalesced_traces.clone()); } } } diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index ca65bb405..fba5d4781 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::config; +use crate::lifecycle::invocation::processor::S_TO_MS; use crate::tags::provider; use crate::traces::span_pointers::{attach_span_pointers_to_meta, SpanPointer}; use crate::traces::{ @@ -167,7 +168,7 @@ impl TraceProcessor for ServerlessTraceProcessor { .expect("can't parse trace intake URL, exiting"), // Will be set at flush time api_key: None, - timeout_ms: config.flush_timeout * 1_000, + timeout_ms: config.flush_timeout * S_TO_MS, test_token: None, };
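
Note for reviewers: the core of this change is that `DD_APM_ADDITIONAL_ENDPOINTS` (a JSON map of intake URL to a list of API keys) is flattened into one trace intake target per (URL, API key) pair inside `ServerlessTraceFlusher::new`. The sketch below is not part of the patch; it only mirrors that flattening with plain std types (the real code builds a `ddcommon::Endpoint` from `trace_intake_url_prefixed(endpoint_url)` with `timeout_ms = flush_timeout * S_TO_MS`). The helper name is illustrative.

    // Illustrative only: flatten the additional-endpoints config into
    // (intake URL, API key) pairs, one target per key, as the flusher does.
    use std::collections::HashMap;

    fn flatten_additional_endpoints(
        additional_endpoints: &HashMap<String, Vec<String>>,
    ) -> Vec<(String, String)> {
        let mut targets = Vec::new();
        for (endpoint_url, api_keys) in additional_endpoints {
            for api_key in api_keys {
                targets.push((endpoint_url.clone(), api_key.clone()));
            }
        }
        targets
    }

    fn main() {
        let config = HashMap::from([
            (
                "https://trace.agent.datadoghq.com".to_string(),
                vec!["apikey2".to_string(), "apikey3".to_string()],
            ),
            (
                "https://trace.agent.datadoghq.eu".to_string(),
                vec!["apikey4".to_string()],
            ),
        ]);
        // Two keys for the .com endpoint plus one for .eu => three targets.
        assert_eq!(flatten_additional_endpoints(&config).len(), 3);
    }

At flush time each batch is sent to the primary intake first and then fanned out to every additional endpoint via `join_all`; any batch that fails on any endpoint is accumulated in `failed_batch` for retry.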
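The `otlp/agent.rs` and `proxy/interceptor.rs` hunks also switch helper signatures from `&Option<String>` to `Option<&String>`, which removes the `#[allow(clippy::ref_option)]` escape hatches; callers adapt with `.as_ref()`. A minimal, self-contained sketch of that pattern (the default port value here is an assumption, not the extension's constant):

    // Sketch of the Option<&String> parameter style adopted in this patch.
    fn parse_port(endpoint: Option<&String>, default_port: u16) -> u16 {
        if let Some(endpoint) = endpoint {
            // Take whatever follows the first ':' and try to parse it as a port.
            if let Some(port) = endpoint.split(':').nth(1) {
                if let Ok(port) = port.parse::<u16>() {
                    return port;
                }
            }
        }
        default_port
    }

    fn main() {
        let endpoint = Some("localhost:8080".to_string());
        assert_eq!(parse_port(endpoint.as_ref(), 4318), 8080);
        assert_eq!(parse_port(None, 4318), 4318);
        assert_eq!(parse_port(Some(&"localhost".to_string()), 4318), 4318);
    }
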