|
15 | 15 |
|
16 | 16 | package software.amazon.opentelemetry.javaagent.providers;
|
17 | 17 |
|
| 18 | +import com.fasterxml.jackson.core.JsonProcessingException; |
| 19 | +import com.fasterxml.jackson.core.type.TypeReference; |
| 20 | +import com.fasterxml.jackson.databind.ObjectMapper; |
| 21 | +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; |
18 | 22 | import io.opentelemetry.api.common.Attributes;
|
19 | 23 | import io.opentelemetry.api.common.AttributesBuilder;
|
20 |
| -import io.opentelemetry.contrib.awsxray.AlwaysRecordSampler; |
| 24 | +import io.opentelemetry.contrib.awsxray.AwsXrayAdaptiveSamplingConfig; |
| 25 | +import io.opentelemetry.contrib.awsxray.AwsXrayRemoteSampler; |
21 | 26 | import io.opentelemetry.contrib.awsxray.ResourceHolder;
|
22 | 27 | import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter;
|
23 | 28 | import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
|
|
42 | 47 | import io.opentelemetry.sdk.trace.SpanProcessor;
|
43 | 48 | import io.opentelemetry.sdk.trace.export.SpanExporter;
|
44 | 49 | import io.opentelemetry.sdk.trace.samplers.Sampler;
|
| 50 | +import java.io.IOException; |
| 51 | +import java.nio.charset.StandardCharsets; |
| 52 | +import java.nio.file.Files; |
| 53 | +import java.nio.file.Path; |
| 54 | +import java.nio.file.Paths; |
45 | 55 | import java.time.Duration;
|
46 | 56 | import java.util.ArrayList;
|
47 | 57 | import java.util.Arrays;
|
@@ -142,11 +152,16 @@ public final class AwsApplicationSignalsCustomizerProvider
|
142 | 152 | private static final String OTEL_EXPORTER_OTLP_LOGS_COMPRESSION_CONFIG =
|
143 | 153 | "otel.exporter.otlp.logs.compression";
|
144 | 154 |
|
| 155 | + private static final String AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG = |
| 156 | + "aws.xray.adaptive.sampling.config"; |
| 157 | + |
145 | 158 | // UDP packet can be upto 64KB. To limit the packet size, we limit the exported batch size.
|
146 | 159 | // This is a bit of a magic number, as there is no simple way to tell how many spans can make a
|
147 | 160 | // 64KB batch since spans can vary in size.
|
148 | 161 | private static final int LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10;
|
149 | 162 |
|
| 163 | + private Sampler sampler; |
| 164 | + |
150 | 165 | public void customize(AutoConfigurationCustomizer autoConfiguration) {
|
151 | 166 | autoConfiguration.addPropertiesCustomizer(this::customizeProperties);
|
152 | 167 | autoConfiguration.addPropertiesCustomizer(this::customizeLambdaEnvProperties);
|
@@ -281,6 +296,27 @@ private Resource customizeResource(Resource resource, ConfigProperties configPro
|
281 | 296 | }
|
282 | 297 |
|
283 | 298 | private Sampler customizeSampler(Sampler sampler, ConfigProperties configProps) {
|
| 299 | + if (sampler instanceof AwsXrayRemoteSampler) { |
| 300 | + String config = configProps.getString(AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG); |
| 301 | + AwsXrayAdaptiveSamplingConfig parsedConfig = null; |
| 302 | + |
| 303 | + try { |
| 304 | + parsedConfig = parseConfigString(config); |
| 305 | + } catch (Exception e) { |
| 306 | + logger.log( |
| 307 | + Level.WARNING, "Failed to parse adaptive sampling configuration: {0}", e.getMessage()); |
| 308 | + } |
| 309 | + |
| 310 | + if (parsedConfig != null) { |
| 311 | + try { |
| 312 | + ((AwsXrayRemoteSampler) sampler).setAdaptiveSamplingConfig(parsedConfig); |
| 313 | + } catch (Exception e) { |
| 314 | + logger.log( |
| 315 | + Level.WARNING, "Error processing adaptive sampling config: {0}", e.getMessage()); |
| 316 | + } |
| 317 | + } |
| 318 | + this.sampler = sampler; |
| 319 | + } |
284 | 320 | if (isApplicationSignalsEnabled(configProps)) {
|
285 | 321 | return AlwaysRecordSampler.create(sampler);
|
286 | 322 | }
|
@@ -344,10 +380,13 @@ private SdkTracerProviderBuilder customizeTracerProviderBuilder(
|
344 | 380 | .build();
|
345 | 381 |
|
346 | 382 | // Construct and set application signals metrics processor
|
347 |
| - SpanProcessor spanMetricsProcessor = |
| 383 | + AwsSpanMetricsProcessorBuilder awsSpanMetricsProcessorBuilder = |
348 | 384 | AwsSpanMetricsProcessorBuilder.create(
|
349 |
| - meterProvider, ResourceHolder.getResource(), meterProvider::forceFlush) |
350 |
| - .build(); |
| 385 | + meterProvider, ResourceHolder.getResource(), meterProvider::forceFlush); |
| 386 | + if (this.sampler != null) { |
| 387 | + awsSpanMetricsProcessorBuilder.setSampler(this.sampler); |
| 388 | + } |
| 389 | + SpanProcessor spanMetricsProcessor = awsSpanMetricsProcessorBuilder.build(); |
351 | 390 | tracerProviderBuilder.addSpanProcessor(spanMetricsProcessor);
|
352 | 391 | }
|
353 | 392 | return tracerProviderBuilder;
|
@@ -423,11 +462,14 @@ SpanExporter customizeSpanExporter(SpanExporter spanExporter, ConfigProperties c
|
423 | 462 | }
|
424 | 463 |
|
425 | 464 | if (isApplicationSignalsEnabled(configProps)) {
|
426 |
| - return AwsMetricAttributesSpanExporterBuilder.create( |
427 |
| - spanExporter, ResourceHolder.getResource()) |
428 |
| - .build(); |
| 465 | + spanExporter = |
| 466 | + AwsMetricAttributesSpanExporterBuilder.create(spanExporter, ResourceHolder.getResource()) |
| 467 | + .build(); |
429 | 468 | }
|
430 | 469 |
|
| 470 | + if (this.sampler instanceof AwsXrayRemoteSampler) { |
| 471 | + ((AwsXrayRemoteSampler) this.sampler).setSpanExporter(spanExporter); |
| 472 | + } |
431 | 473 | return spanExporter;
|
432 | 474 | }
|
433 | 475 |
|
@@ -467,6 +509,44 @@ LogRecordExporter customizeLogsExporter(
|
467 | 509 | return logsExporter;
|
468 | 510 | }
|
469 | 511 |
|
| 512 | + static AwsXrayAdaptiveSamplingConfig parseConfigString(String config) |
| 513 | + throws JsonProcessingException { |
| 514 | + if (config == null) { |
| 515 | + return null; |
| 516 | + } |
| 517 | + |
| 518 | + // Check if the config is a file path and the file exists |
| 519 | + Path path = Paths.get(config); |
| 520 | + if (Files.exists(path)) { |
| 521 | + try { |
| 522 | + config = String.join("\n", Files.readAllLines(path, StandardCharsets.UTF_8)); |
| 523 | + } catch (IOException e) { |
| 524 | + throw new IllegalArgumentException( |
| 525 | + "Failed to read adaptive sampling configuration file: " + e.getMessage(), e); |
| 526 | + } |
| 527 | + } |
| 528 | + |
| 529 | + ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()); |
| 530 | + Map<String, Object> configMap = |
| 531 | + yamlMapper.readValue(config, new TypeReference<Map<String, Object>>() {}); |
| 532 | + |
| 533 | + Object versionObj = configMap.get("version"); |
| 534 | + if (versionObj == null) { |
| 535 | + throw new IllegalArgumentException( |
| 536 | + "Missing required 'version' field in adaptive sampling configuration"); |
| 537 | + } |
| 538 | + |
| 539 | + double version = ((Number) versionObj).doubleValue(); |
| 540 | + if (version >= 2L) { |
| 541 | + throw new IllegalArgumentException( |
| 542 | + "Incompatible adaptive sampling config version: " |
| 543 | + + version |
| 544 | + + ". This version of the AWS X-Ray remote sampler only supports versions strictly below 2.0."); |
| 545 | + } |
| 546 | + |
| 547 | + return yamlMapper.readValue(config, AwsXrayAdaptiveSamplingConfig.class); |
| 548 | + } |
| 549 | + |
470 | 550 | private enum ApplicationSignalsExporterProvider {
|
471 | 551 | INSTANCE;
|
472 | 552 |
|
|
0 commit comments