Skip to content

Commit df9a0c1

Browse files
authored
[GOBBLIN-2053] Add header and fix prefix configs opentelemetry (#3933)
Add header and fix prefix configs opentelemetry
1 parent 77f9d38 commit df9a0c1

File tree

8 files changed

+114
-22
lines changed

8 files changed

+114
-22
lines changed

gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -905,18 +905,18 @@ public class ConfigurationKeys {
905905
public static final String METRICS_CUSTOM_BUILDERS = METRICS_CONFIGURATIONS_PREFIX + "reporting.custom.builders";
906906

907907
// Opentelemetry based metrics reporting
908-
public static final String METRICS_REPORTING_OPENTELEMETRY_ENABLED =
909-
METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemtry.metrics.enabled";
908+
public static final String METRICS_REPORTING_OPENTELEMETRY_PREFIX = "metrics.reporting.opentelemetry.";
909+
public static final String METRICS_REPORTING_OPENTELEMETRY_ENABLED = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "enabled";
910910

911-
public static final String METRICS_REPORTING_OPENTELEMETRY_CONFIGS =
912-
METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.configs";
911+
public static final String METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "configs.";
913912
public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED = false;
914913

915-
public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT =
916-
METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.endpoint";
914+
public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "endpoint";
917915

918-
public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS =
919-
METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.interval.millis";
916+
// Headers to add to the OpenTelemetry HTTP Exporter, formatted as a JSON String with string keys and values
917+
public static final String METRICS_REPORTING_OPENTELEMETRY_HEADERS = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "headers";
918+
919+
public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS = METRICS_CONFIGURATIONS_PREFIX + "interval.millis";
920920

921921
/**
922922
* Rest server configuration properties.

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public static PathFilter instantiatePathFilter(Properties props) {
123123
try {
124124
Class<?> pathFilterClass = Class.forName(props.getProperty(PATH_FILTER_KEY));
125125
return (PathFilter) GobblinConstructorUtils.invokeLongestConstructor(pathFilterClass,
126-
PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(props, CONFIGURATION_KEY_PREFIX));
126+
PropertiesUtils.extractChildProperties(props, CONFIGURATION_KEY_PREFIX));
127127
} catch (ReflectiveOperationException exception) {
128128
throw new RuntimeException(exception);
129129
}

gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
package org.apache.gobblin.metrics;
1919

2020
import java.time.Duration;
21+
import java.util.HashMap;
22+
import java.util.Map;
2123
import java.util.Properties;
2224

23-
import com.google.common.base.Optional;
25+
import com.fasterxml.jackson.databind.ObjectMapper;
26+
import com.google.common.base.Preconditions;
2427

2528
import io.opentelemetry.api.common.AttributeKey;
2629
import io.opentelemetry.api.common.Attributes;
@@ -32,6 +35,7 @@
3235
import io.opentelemetry.sdk.metrics.export.MetricExporter;
3336
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
3437
import io.opentelemetry.sdk.resources.Resource;
38+
import lombok.extern.slf4j.Slf4j;
3539

3640
import org.apache.gobblin.configuration.ConfigurationKeys;
3741
import org.apache.gobblin.configuration.State;
@@ -40,20 +44,32 @@
4044
* A metrics reporter wrapper that uses the OpenTelemetry standard to emit metrics
4145
* Currently separated from the legacy codehale metrics as we need to maintain backwards compatibility, but eventually
4246
* can replace the old metrics system with tighter integrations once it's stable
47+
* Defaults to using the HTTP exporter where it expects an endpoint and optional headers in JSON string format
4348
*/
4449

50+
@Slf4j
4551
public class OpenTelemetryMetrics extends OpenTelemetryMetricsBase {
4652

4753
private static OpenTelemetryMetrics GLOBAL_INSTANCE;
4854
private static final Long DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS = 10000L;
55+
4956
private OpenTelemetryMetrics(State state) {
5057
super(state);
5158
}
5259

5360
@Override
5461
protected MetricExporter initializeMetricExporter(State state) {
62+
Preconditions.checkArgument(state.contains(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT),
63+
"OpenTelemetry endpoint must be provided");
5564
OtlpHttpMetricExporterBuilder httpExporterBuilder = OtlpHttpMetricExporter.builder();
5665
httpExporterBuilder.setEndpoint(state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT));
66+
67+
if (state.contains(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HEADERS)) {
68+
Map<String, String> headers = parseHttpHeaders(state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HEADERS));
69+
for (Map.Entry<String, String> header : headers.entrySet()) {
70+
httpExporterBuilder.addHeader(header.getKey(), header.getValue());
71+
}
72+
}
5773
return httpExporterBuilder.build();
5874
}
5975

@@ -67,8 +83,9 @@ public static OpenTelemetryMetrics getInstance(State state) {
6783

6884
@Override
6985
protected void initialize(State state) {
70-
Properties metricProps = PropertiesUtils.extractPropertiesWithPrefix(state.getProperties(), Optional.of(
71-
ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CONFIGS));
86+
log.info("Initializing OpenTelemetry metrics");
87+
Properties metricProps = PropertiesUtils.extractChildProperties(state.getProperties(),
88+
ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX);
7289
AttributesBuilder attributesBuilder = Attributes.builder();
7390
for (String key : metricProps.stringPropertyNames()) {
7491
attributesBuilder.put(AttributeKey.stringKey(key), metricProps.getProperty(key));
@@ -87,4 +104,15 @@ protected void initialize(State state) {
87104

88105
this.openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(meterProvider).buildAndRegisterGlobal();
89106
}
107+
108+
protected static Map<String, String> parseHttpHeaders(String headersString) {
109+
try {
110+
ObjectMapper mapper = new ObjectMapper();
111+
return mapper.readValue(headersString, HashMap.class);
112+
} catch (Exception e) {
113+
String errMsg = "Failed to parse headers: " + headersString;
114+
log.error(errMsg, e);
115+
throw new RuntimeException(errMsg);
116+
}
117+
}
90118
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.metrics;
19+
20+
import java.util.Map;
21+
22+
import org.testng.Assert;
23+
import org.testng.annotations.Test;
24+
25+
import org.apache.gobblin.configuration.ConfigurationKeys;
26+
import org.apache.gobblin.configuration.State;
27+
28+
29+
public class OpenTelemetryMetricsTest {
30+
31+
@Test
32+
public void testInitializeOpenTelemetryFailsWithoutEndpoint() {
33+
State opentelemetryState = new State();
34+
opentelemetryState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED, "true");
35+
Assert.assertThrows(IllegalArgumentException.class, () -> {
36+
OpenTelemetryMetrics.getInstance(opentelemetryState);
37+
});
38+
}
39+
40+
@Test
41+
public void testInitializeOpenTelemetrySucceedsWithEndpoint() {
42+
State opentelemetryState = new State();
43+
opentelemetryState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED, "true");
44+
opentelemetryState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT, "http://localhost:4317");
45+
// Should not throw an exception
46+
OpenTelemetryMetrics.getInstance(opentelemetryState);
47+
Assert.assertTrue(true);
48+
}
49+
50+
@Test
51+
public void testHeadersParseCorrectly() {
52+
Map<String, String> headers = OpenTelemetryMetrics.parseHttpHeaders(
53+
"{\"Content-Type\":\"application/x-protobuf\",\"headerTag\":\"tag1:value1,tag2:value2\"}");
54+
Assert.assertEquals(headers.size(), 2);
55+
Assert.assertEquals(headers.get("Content-Type"), "application/x-protobuf");
56+
Assert.assertEquals(headers.get("headerTag"), "tag1:value1,tag2:value2");
57+
}
58+
59+
@Test
60+
void testHeadersParseNull() {
61+
Map<String, String> headers = OpenTelemetryMetrics.parseHttpHeaders("{}");
62+
Assert.assertEquals(headers.size(), 0);
63+
}
64+
}

gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,8 @@ public abstract class GaaSObservabilityEventProducer implements Closeable {
6565
public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
6666
public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = NoopGaaSObservabilityEventProducer.class.getName();
6767
public static final String ISSUES_READ_FAILED_METRIC_NAME = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
68-
public static final String GAAS_OBSERVABILITY_METRICS_PREFIX = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics.";
69-
public static final String GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME = "gaas.observability.jobStatus";
70-
public static final String GAAS_OBSERVABILITY_GROUP_NAME = GAAS_OBSERVABILITY_METRICS_PREFIX + "groupName";
68+
public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
69+
public static final String GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME = "jobStatus";
7170

7271
protected MetricContext metricContext;
7372
protected State state;
@@ -99,17 +98,18 @@ protected OpenTelemetryMetricsBase getOpentelemetryMetrics(State state) {
9998
private void setupMetrics(State state) {
10099
this.opentelemetryMetrics = getOpentelemetryMetrics(state);
101100
if (this.opentelemetryMetrics != null) {
102-
this.jobStatusMetric = this.opentelemetryMetrics.getMeter(state.getProp(GAAS_OBSERVABILITY_GROUP_NAME))
101+
this.jobStatusMetric = this.opentelemetryMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
103102
.gaugeBuilder(GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME)
104103
.ofLongs()
105104
.buildObserver();
106-
this.opentelemetryMetrics.getMeter(state.getProp(GAAS_OBSERVABILITY_GROUP_NAME))
105+
this.opentelemetryMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
107106
.batchCallback(() -> {
108107
for (GaaSObservabilityEventExperimental event : this.eventCollector) {
109108
Attributes tags = getEventAttributes(event);
110109
int status = event.getJobStatus() != JobStatus.SUCCEEDED ? 1 : 0;
111110
this.jobStatusMetric.record(status, tags);
112111
}
112+
log.info("Submitted {} job status events", this.eventCollector.size());
113113
// Empty the list of events as they are all emitted at this point.
114114
this.eventCollector.clear();
115115
}, this.jobStatusMetric);

gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ public void testMockProduceMetrics() throws Exception {
262262
// Check number of meters
263263
Assert.assertEquals(metrics.size(), 1);
264264
Map<String, MetricData > metricsByName = metrics.stream().collect(Collectors.toMap(metric -> metric.getName(), metricData -> metricData));
265-
MetricData jobStatusMetric = metricsByName.get("gaas.observability.jobStatus");
265+
MetricData jobStatusMetric = metricsByName.get("jobStatus");
266266
// Check the attributes of the metrics
267267
List<LongPointData> datapoints = jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
268268
Assert.assertEquals(datapoints.size(), 2);

gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public static Properties extractPropertiesWithPrefix(Properties properties, Opti
157157
* @param prefix of keys to be extracted
158158
* @return a {@link Properties} instance
159159
*/
160-
public static Properties extractPropertiesWithPrefixAfterRemovingPrefix(Properties properties, String prefix) {
160+
public static Properties extractChildProperties(Properties properties, String prefix) {
161161
Preconditions.checkNotNull(properties);
162162
Preconditions.checkNotNull(prefix);
163163

gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,19 +63,19 @@ public void testExtractPropertiesWithPrefixAfterRemovingPrefix() {
6363
properties.setProperty("k2.kk", "v3");
6464

6565
// First prefix
66-
Properties extractedPropertiesK1 = PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(properties, "k1.");
66+
Properties extractedPropertiesK1 = PropertiesUtils.extractChildProperties(properties, "k1.");
6767
Assert.assertEquals(extractedPropertiesK1.getProperty("kk1"), "v1");
6868
Assert.assertEquals(extractedPropertiesK1.getProperty("kk2"), "v2");
6969
Assert.assertTrue(!extractedPropertiesK1.containsKey("k2.kk"));
7070

7171
// Second prefix
72-
Properties extractedPropertiesK2 = PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(properties, "k2");
72+
Properties extractedPropertiesK2 = PropertiesUtils.extractChildProperties(properties, "k2");
7373
Assert.assertTrue(!extractedPropertiesK2.containsKey("k1.kk1"));
7474
Assert.assertTrue(!extractedPropertiesK2.containsKey("k1.kk2"));
7575
Assert.assertEquals(extractedPropertiesK2.getProperty(".kk"), "v3");
7676

7777
// Missing prefix
78-
Properties extractedPropertiesK3 = PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(properties, "k3");
78+
Properties extractedPropertiesK3 = PropertiesUtils.extractChildProperties(properties, "k3");
7979
Assert.assertTrue(!extractedPropertiesK3.containsKey("k1.kk1"));
8080
Assert.assertTrue(!extractedPropertiesK3.containsKey("k1.kk1"));
8181
Assert.assertTrue(!extractedPropertiesK3.containsKey("k2.kk"));

0 commit comments

Comments
 (0)