|
19 | 19 | import com.google.common.util.concurrent.Futures;
|
20 | 20 | import com.google.inject.Inject;
|
21 | 21 | import io.airlift.bootstrap.LifeCycleManager;
|
| 22 | +import io.airlift.configuration.validation.FileExists; |
22 | 23 | import io.airlift.http.client.BodyGenerator;
|
23 | 24 | import io.airlift.http.client.HttpClient;
|
24 | 25 | import io.airlift.http.client.Request;
|
| 26 | +import io.airlift.http.client.StatusResponseHandler.StatusResponse; |
25 | 27 | import io.airlift.json.JsonCodec;
|
26 | 28 | import io.airlift.log.Logger;
|
27 | 29 | import io.airlift.units.Duration;
|
|
31 | 33 | import io.trino.spi.eventlistener.SplitCompletedEvent;
|
32 | 34 | import jakarta.annotation.PreDestroy;
|
33 | 35 |
|
| 36 | +import java.io.File; |
| 37 | +import java.io.FileInputStream; |
| 38 | +import java.io.IOException; |
| 39 | +import java.io.UncheckedIOException; |
34 | 40 | import java.net.URI;
|
35 | 41 | import java.net.URISyntaxException;
|
36 | 42 | import java.util.Map;
|
| 43 | +import java.util.Optional; |
| 44 | +import java.util.Properties; |
37 | 45 | import java.util.concurrent.ScheduledExecutorService;
|
38 | 46 | import java.util.concurrent.TimeUnit;
|
39 | 47 |
|
40 | 48 | import static com.google.common.base.Verify.verify;
|
| 49 | +import static com.google.common.collect.ImmutableMap.toImmutableMap; |
41 | 50 | import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
|
42 | 51 | import static com.google.common.net.MediaType.JSON_UTF_8;
|
43 | 52 | import static io.airlift.concurrent.Threads.daemonThreadsNamed;
|
44 | 53 | import static io.airlift.http.client.JsonBodyGenerator.jsonBodyGenerator;
|
45 |
| -import static io.airlift.http.client.StatusResponseHandler.StatusResponse; |
46 | 54 | import static io.airlift.http.client.StatusResponseHandler.createStatusResponseHandler;
|
47 | 55 | import static java.util.Objects.requireNonNull;
|
48 | 56 | import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
|
@@ -73,6 +81,7 @@ public class HttpEventListener
|
73 | 81 | private final Duration maxDelay;
|
74 | 82 | private final double backoffBase;
|
75 | 83 | private final Map<String, String> httpHeaders;
|
| 84 | + private final Optional<@FileExists File> httpHeadersConfigFile; |
76 | 85 | private final URI ingestUri;
|
77 | 86 | private final HttpEventListenerHttpMethod httpMethod;
|
78 | 87 | private final ScheduledExecutorService executor;
|
@@ -102,6 +111,7 @@ public HttpEventListener(
|
102 | 111 | this.backoffBase = config.getBackoffBase();
|
103 | 112 | this.httpMethod = config.getHttpMethod();
|
104 | 113 | this.httpHeaders = ImmutableMap.copyOf(config.getHttpHeaders());
|
| 114 | + this.httpHeadersConfigFile = config.getHttpHeadersConfigFile(); |
105 | 115 |
|
106 | 116 | try {
|
107 | 117 | ingestUri = new URI(config.getIngestUri());
|
@@ -145,13 +155,22 @@ public void splitCompleted(SplitCompletedEvent splitCompletedEvent)
|
145 | 155 |
|
146 | 156 | private void sendLog(BodyGenerator eventBodyGenerator, String queryId)
|
147 | 157 | {
|
148 |
| - Request request = Request.builder() |
| 158 | + Request.Builder requestBuilder = Request.builder() |
149 | 159 | .setMethod(httpMethod.name())
|
150 |
| - .addHeaders(Multimaps.forMap(httpHeaders)) |
151 | 160 | .addHeader(CONTENT_TYPE, JSON_UTF_8.toString())
|
152 | 161 | .setUri(ingestUri)
|
153 |
| - .setBodyGenerator(eventBodyGenerator) |
154 |
| - .build(); |
| 162 | + .setBodyGenerator(eventBodyGenerator); |
| 163 | + |
| 164 | + httpHeadersConfigFile.ifPresentOrElse( |
| 165 | + file -> { |
| 166 | + Map<String, String> fileHeaders = loadHttpHeadersFromFile(file); |
| 167 | + requestBuilder.addHeaders(Multimaps.forMap(fileHeaders)); |
| 168 | + }, |
| 169 | + () -> { |
| 170 | + requestBuilder.addHeaders(Multimaps.forMap(httpHeaders)); |
| 171 | + }); |
| 172 | + |
| 173 | + Request request = requestBuilder.build(); |
155 | 174 |
|
156 | 175 | attemptToSend(request, 0, Duration.valueOf("0s"), queryId);
|
157 | 176 | }
|
@@ -249,6 +268,22 @@ private Duration nextDelay(Duration delay)
|
249 | 268 | return newDuration;
|
250 | 269 | }
|
251 | 270 |
|
| 271 | + public static Map<String, String> loadHttpHeadersFromFile(File file) |
| 272 | + { |
| 273 | + Properties properties = new Properties(); |
| 274 | + try (FileInputStream fis = new FileInputStream(file)) { |
| 275 | + properties.load(fis); |
| 276 | + } |
| 277 | + catch (IOException e) { |
| 278 | + throw new UncheckedIOException("Failed to read HTTP headers config file: " + file, e); |
| 279 | + } |
| 280 | + |
| 281 | + return properties.entrySet().stream() |
| 282 | + .collect(toImmutableMap( |
| 283 | + e -> e.getKey().toString(), |
| 284 | + e -> e.getValue().toString())); |
| 285 | + } |
| 286 | + |
252 | 287 | @Override
|
253 | 288 | public void shutdown()
|
254 | 289 | {
|
|
0 commit comments