Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class SharedCommunicationObjects {
public OkHttpClient okHttpClient;
public HttpUrl agentUrl;
public Monitoring monitoring;
private DDAgentFeaturesDiscovery featuresDiscovery;
private volatile DDAgentFeaturesDiscovery featuresDiscovery;
private ConfigurationPoller configurationPoller;

public SharedCommunicationObjects() {
Expand Down Expand Up @@ -139,28 +139,34 @@ public void setFeaturesDiscovery(DDAgentFeaturesDiscovery featuresDiscovery) {
}

public DDAgentFeaturesDiscovery featuresDiscovery(Config config) {
if (featuresDiscovery == null) {
createRemaining(config);
featuresDiscovery =
new DDAgentFeaturesDiscovery(
okHttpClient,
monitoring,
agentUrl,
config.isTraceAgentV05Enabled(),
config.isTracerMetricsEnabled());

if (paused) {
// defer remote discovery until remote I/O is allowed
} else {
if (AGENT_THREAD_GROUP.equals(Thread.currentThread().getThreadGroup())) {
featuresDiscovery.discover(); // safe to run on same thread
} else {
// avoid performing blocking I/O operation on application thread
AgentTaskScheduler.INSTANCE.execute(featuresDiscovery::discover);
DDAgentFeaturesDiscovery ret = featuresDiscovery;
if (ret == null) {
synchronized (this) {
if (featuresDiscovery == null) {
createRemaining(config);
ret =
new DDAgentFeaturesDiscovery(
okHttpClient,
monitoring,
agentUrl,
config.isTraceAgentV05Enabled(),
config.isTracerMetricsEnabled());

if (paused) {
// defer remote discovery until remote I/O is allowed
} else {
if (AGENT_THREAD_GROUP.equals(Thread.currentThread().getThreadGroup())) {
ret.discover(); // safe to run on same thread
} else {
// avoid performing blocking I/O operation on application thread
AgentTaskScheduler.INSTANCE.execute(ret::discoverIfOutdated);
}
}
featuresDiscovery = ret;
}
}
}
return featuresDiscovery;
return ret;
}

private static final class FixedConfigUrlSupplier implements Supplier<String> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ public class AppSecConfigServiceImpl implements AppSecConfigService {
private final ConfigurationPoller configurationPoller;
private WafBuilder wafBuilder;

private MergedAsmFeatures mergedAsmFeatures;
private volatile boolean initialized;
private final MergedAsmFeatures mergedAsmFeatures = new MergedAsmFeatures();

private final ConcurrentHashMap<String, SubconfigListener> subconfigListeners =
new ConcurrentHashMap<>();
Expand Down Expand Up @@ -173,9 +172,7 @@ private class AppSecConfigChangesListener implements ProductListener {
@Override
public void accept(ConfigKey configKey, byte[] content, PollingRateHinter pollingRateHinter)
throws IOException {
if (!initialized) {
throw new IllegalStateException();
}
maybeInitializeDefaultConfig();

if (content == null) {
try {
Expand Down Expand Up @@ -219,8 +216,8 @@ public void accept(ConfigKey configKey, byte[] content, PollingRateHinter pollin
}
defaultConfigActivated = false;
}
super.accept(configKey, content, pollingRateHinter);
usedDDWafConfigKeys.add(configKey.toString());
super.accept(configKey, content, pollingRateHinter);
}

@Override
Expand Down Expand Up @@ -282,13 +279,7 @@ private void subscribeAsmFeatures() {
Product.ASM_FEATURES,
AppSecFeaturesDeserializer.INSTANCE,
(configKey, newConfig, hinter) -> {
if (!hasUserWafConfig && !defaultConfigActivated) {
// features activated in runtime
init();
}
if (!initialized) {
throw new IllegalStateException();
}
maybeInitializeDefaultConfig();
if (newConfig == null) {
mergedAsmFeatures.removeConfig(configKey);
} else {
Expand All @@ -305,10 +296,7 @@ private void subscribeAsmFeatures() {

private void distributeSubConfigurations(
String key, AppSecModuleConfigurer.Reconfiguration reconfiguration) {
if (usedDDWafConfigKeys.isEmpty() && !defaultConfigActivated && !hasUserWafConfig) {
// no config left in the WAF builder, add the default config
init();
}
maybeInitializeDefaultConfig();
for (Map.Entry<String, SubconfigListener> entry : subconfigListeners.entrySet()) {
SubconfigListener listener = entry.getValue();
try {
Expand All @@ -320,6 +308,13 @@ private void distributeSubConfigurations(
}
}

private void maybeInitializeDefaultConfig() {
if (usedDDWafConfigKeys.isEmpty() && !hasUserWafConfig && !defaultConfigActivated) {
// no config left in the WAF builder, add the default config
init();
}
}

@Override
public void init() {
Map<String, Object> wafConfig;
Expand All @@ -341,8 +336,8 @@ public void init() {
} else {
hasUserWafConfig = true;
}
this.mergedAsmFeatures = new MergedAsmFeatures();
this.initialized = true;
this.mergedAsmFeatures.clear();
this.usedDDWafConfigKeys.clear();

if (wafConfig.isEmpty()) {
throw new IllegalStateException("Expected default waf config to be available");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ private void mergeAutoUserInstrum(
}
target.autoUserInstrum = newValue;
}

public void clear() {
mergedData = null;
configs.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,67 @@ class AppSecConfigServiceImplSpecification extends DDSpecification {
p.toFile().delete()
}

// https://github.com/DataDog/dd-trace-java/issues/9159
void 'test initialization issues while applying remote config'() {
setup:
final key = new ParsedConfigKey('Test', '1234', 1, 'ASM_DD', 'ID')
final service = new AppSecConfigServiceImpl(config, poller, reconf)
config.getAppSecActivation() >> ProductActivation.ENABLED_INACTIVE

when:
service.maybeSubscribeConfigPolling()

then:
1 * poller.addListener(Product.ASM_DD, _) >> {
listeners.savedWafDataChangesListener = it[1]
}

when:
listeners.savedWafDataChangesListener.accept(key, '''{"rules_override": [{"rules_target": [{"rule_id": "foo"}], "enabled": false}]}'''.getBytes(), NOOP)

then:
noExceptionThrown()
}

void 'config keys are added and removed to the set when receiving ASM_DD payloads'() {
setup:
final key = new ParsedConfigKey('Test', '1234', 1, 'ASM_DD', 'ID')
final service = new AppSecConfigServiceImpl(config, poller, reconf)
config.getAppSecActivation() >> ProductActivation.ENABLED_INACTIVE

when:
service.maybeSubscribeConfigPolling()

then:
1 * poller.addListener(Product.ASM_DD, _) >> {
listeners.savedWafDataChangesListener = it[1]
}
1 * poller.addListener(Product.ASM_FEATURES, _, _) >> {
listeners.savedFeaturesDeserializer = it[1]
listeners.savedFeaturesListener = it[2]
}

when:
listeners.savedFeaturesListener.accept('asm_features conf',
listeners.savedFeaturesDeserializer.deserialize('{"asm":{"enabled": true}}'.bytes),
NOOP)

then:
service.usedDDWafConfigKeys.empty

when:
listeners.savedWafDataChangesListener.accept(key, '''{"rules_override": [{"rules_target": [{"rule_id": "foo"}], "enabled": false}]}'''.getBytes(), NOOP)

then:
service.usedDDWafConfigKeys.toList() == [key.toString()]

when:
listeners.savedWafDataChangesListener.remove(key, NOOP)

then:
service.usedDDWafConfigKeys.empty
}

private static AppSecFeatures autoUserInstrum(String mode) {
return new AppSecFeatures().tap { features ->
features.autoUserInstrum = new AppSecFeatures.AutoUserInstrum().tap { instrum ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;

import datadog.context.Context;
import datadog.context.propagation.CarrierSetter;
Expand All @@ -14,6 +11,7 @@
import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.api.naming.SpanNaming;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
Expand All @@ -23,7 +21,6 @@
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.function.Function;

Expand All @@ -35,10 +32,8 @@ public class GrpcClientDecorator extends ClientDecorator {
public static final CharSequence GRPC_MESSAGE = UTF8BytesString.create("grpc.message");

private static DataStreamsContext createDsmContext() {
LinkedHashMap<String, String> result = new LinkedHashMap<>();
result.put(DIRECTION_TAG, DIRECTION_OUT);
result.put(TYPE_TAG, "grpc");
return DataStreamsContext.fromTags(result);
return DataStreamsContext.fromTags(
DataStreamsTags.create("grpc", DataStreamsTags.Direction.Outbound));
}

public static final GrpcClientDecorator DECORATE = new GrpcClientDecorator();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package datadog.trace.instrumentation.armeria.grpc.server;

import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;

import datadog.trace.api.Config;
import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.api.naming.SpanNaming;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.ErrorPriorities;
Expand All @@ -18,7 +15,6 @@
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.function.Function;

public class GrpcServerDecorator extends ServerDecorator {
Expand All @@ -33,15 +29,11 @@ public class GrpcServerDecorator extends ServerDecorator {
public static final CharSequence COMPONENT_NAME = UTF8BytesString.create("armeria-grpc-server");
public static final CharSequence GRPC_MESSAGE = UTF8BytesString.create("grpc.message");

private static final LinkedHashMap<String, String> createServerPathwaySortedTags() {
LinkedHashMap<String, String> result = new LinkedHashMap<>();
result.put(DIRECTION_TAG, DIRECTION_IN);
result.put(TYPE_TAG, "grpc");
return result;
private static DataStreamsTags createServerPathwaySortedTags() {
return DataStreamsTags.create("grpc", DataStreamsTags.Direction.Inbound);
}

public static final LinkedHashMap<String, String> SERVER_PATHWAY_EDGE_TAGS =
createServerPathwaySortedTags();
public static final DataStreamsTags SERVER_PATHWAY_EDGE_TAGS = createServerPathwaySortedTags();
public static final GrpcServerDecorator DECORATE = new GrpcServerDecorator();

private static final Function<String, String> NORMALIZE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,12 @@ abstract class ArmeriaGrpcTest extends VersionedNamingTestBase {
if (isDataStreamsEnabled()) {
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
verifyAll(first) {
edgeTags.containsAll(["direction:out", "type:grpc"])
edgeTags.size() == 2
tags.hasAllTags("direction:out", "type:grpc")
}

StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
verifyAll(second) {
edgeTags.containsAll(["direction:in", "type:grpc"])
edgeTags.size() == 2
tags.hasAllTags("direction:in", "type:grpc")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,14 @@

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.BUS_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
import static datadog.trace.instrumentation.aws.v2.eventbridge.TextMapInjectAdapter.SETTER;

import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.api.datastreams.PathwayContext;
import datadog.trace.bootstrap.InstanceStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -89,7 +85,9 @@ private String getTraceContextToInject(
// Inject context
datadog.context.Context context = span;
if (traceConfig().isDataStreamsEnabled()) {
DataStreamsContext dsmContext = DataStreamsContext.fromTags(getTags(eventBusName));
DataStreamsTags tags =
DataStreamsTags.createWithBus(DataStreamsTags.Direction.Outbound, eventBusName);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags);
context = context.with(dsmContext);
}
defaultPropagator().inject(context, jsonBuilder, SETTER);
Expand All @@ -111,13 +109,4 @@ private String getTraceContextToInject(
jsonBuilder.append('}');
return jsonBuilder.toString();
}

private LinkedHashMap<String, String> getTags(String eventBusName) {
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_OUT);
sortedTags.put(BUS_TAG, eventBusName);
sortedTags.put(TYPE_TAG, "bus");

return sortedTags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ abstract class AWS1KinesisClientTest extends VersionedNamingTestBase {
pathwayLatencyCount += group.pathwayLatency.count
edgeLatencyCount += group.edgeLatency.count
verifyAll(group) {
edgeTags.containsAll(["direction:" + dsmDirection, "topic:" + streamArn, "type:kinesis"])
edgeTags.size() == 3
tags.hasAllTags("direction:" + dsmDirection, "topic:" + streamArn, "type:kinesis")
}
}
verifyAll {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ abstract class AWS1SnsClientTest extends VersionedNamingTestBase {
pathwayLatencyCount += group.pathwayLatency.count
edgeLatencyCount += group.edgeLatency.count
verifyAll(group) {
edgeTags.containsAll(["direction:" + dsmDirection, "topic:" + topicName, "type:sns"])
edgeTags.size() == 3
tags.hasAllTags("direction:" + dsmDirection, "topic:" + topicName, "type:sns")
}
}
verifyAll {
Expand Down
Loading