From e947416f6c7005dc32f2851bc647922d84a432c6 Mon Sep 17 00:00:00 2001 From: "J.D. LaFayette" Date: Fri, 16 Sep 2022 13:49:39 -0700 Subject: [PATCH 1/3] Set scrape timeout based on Prometheus header --- .../rsocket/PrometheusController.java | 28 ++++++++++++++++++- .../PrometheusControllerProperties.java | 21 ++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java b/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java index ff600f8..dd18266 100644 --- a/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java +++ b/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java @@ -32,6 +32,7 @@ import io.rsocket.util.DefaultPayload; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RestController; import io.rsocket.util.EmptyPayload; @@ -48,6 +49,7 @@ import java.nio.channels.ClosedChannelException; import java.security.*; import java.security.spec.PKCS8EncodedKeySpec; +import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -60,6 +62,7 @@ */ @RestController public class PrometheusController { + private static final Duration NO_TIMEOUT = Duration.ofSeconds(Long.MAX_VALUE); private final PrometheusMeterRegistry meterRegistry; private final Timer scrapeTimerSuccess; private final Timer scrapeTimerClosed; @@ -157,7 +160,9 @@ public Mono proxyMetrics() { } @GetMapping(value = "/metrics/connected", produces = "text/plain") - public Mono prometheus() { + public Mono prometheus(@RequestHeader(value = "X-Prometheus-Scrape-Timeout-Seconds", required = false) String timeoutHeader) { + Duration timeout = determineTimeout(timeoutHeader); + return Flux .fromIterable(scrapableApps.entrySet()) .flatMap(socketAndState -> { @@ -167,6 +172,7 @@ public Mono prometheus() { return rsocket .requestResponse(connectionState.createKeyPayload()) .map(payload -> connectionState.receiveScrapePayload(payload, sample)) + .timeout(timeout) .onErrorResume(throwable -> { scrapableApps.remove(rsocket); @@ -184,6 +190,26 @@ public Mono prometheus() { .collect(Collectors.joining("\n")); } + private Duration determineTimeout(String timeoutHeader) { + if (timeoutHeader == null) { + return NO_TIMEOUT; + } + + try { + Double timeoutMillis = Double.parseDouble(timeoutHeader) * 1E3; + Duration timeout = Duration + .ofMillis(timeoutMillis.longValue()) + .minus(properties.getTimeoutOffset()); + + if (timeout.isNegative() || timeout.isZero()) { + return NO_TIMEOUT; + } + return timeout; + } catch (NumberFormatException e) { + return NO_TIMEOUT; + } + } + class ConnectionState { private final KeyPair keyPair; diff --git a/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusControllerProperties.java b/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusControllerProperties.java index 936cc95..defa3e7 100644 --- a/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusControllerProperties.java +++ b/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusControllerProperties.java @@ -17,12 +17,16 @@ import org.springframework.boot.context.properties.ConfigurationProperties; +import java.time.Duration; + /** * @author Christian Tzolov */ @ConfigurationProperties("micrometer.prometheus-proxy") public class PrometheusControllerProperties { + private static final Duration DEFAULT_TIMEOUT_OFFSET = Duration.ZERO; + /** * Proxy accept TCP port. */ @@ -33,6 +37,11 @@ public class PrometheusControllerProperties { */ private int websocketPort = 8081; + /** + * Scrape timeout offset. + */ + private Duration timeoutOffset = DEFAULT_TIMEOUT_OFFSET; + public int getTcpPort() { return tcpPort; } @@ -48,4 +57,16 @@ public int getWebsocketPort() { public void setWebsocketPort(int websocketPort) { this.websocketPort = websocketPort; } + + public Duration getTimeoutOffset() { + return timeoutOffset; + } + + public void setTimeoutOffset(Duration timeoutOffset) { + if (timeoutOffset == null || timeoutOffset.isNegative()) { + this.timeoutOffset = DEFAULT_TIMEOUT_OFFSET; + } else { + this.timeoutOffset = timeoutOffset; + } + } } From 6f723474ad5cbe8630cabbb4e388f6508271e0e3 Mon Sep 17 00:00:00 2001 From: "J.D. LaFayette" Date: Wed, 12 Oct 2022 17:26:24 -0700 Subject: [PATCH 2/3] simplify construction of timeout duration --- .../io/micrometer/prometheus/rsocket/PrometheusController.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java b/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java index dd18266..4e36b9d 100644 --- a/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java +++ b/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java @@ -196,9 +196,8 @@ private Duration determineTimeout(String timeoutHeader) { } try { - Double timeoutMillis = Double.parseDouble(timeoutHeader) * 1E3; Duration timeout = Duration - .ofMillis(timeoutMillis.longValue()) + .ofMillis((long) (Double.parseDouble(timeoutHeader) * 1_000)) .minus(properties.getTimeoutOffset()); if (timeout.isNegative() || timeout.isZero()) { From 82dd2d5982454350db098a7c0dc83b708766bc06 Mon Sep 17 00:00:00 2001 From: "J.D. LaFayette" Date: Thu, 13 Oct 2022 16:00:41 -0700 Subject: [PATCH 3/3] apply timeout only if it is determined successfully --- .../prometheus/rsocket/PrometheusController.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java b/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java index 4e36b9d..adb3956 100644 --- a/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java +++ b/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java @@ -62,7 +62,6 @@ */ @RestController public class PrometheusController { - private static final Duration NO_TIMEOUT = Duration.ofSeconds(Long.MAX_VALUE); private final PrometheusMeterRegistry meterRegistry; private final Timer scrapeTimerSuccess; private final Timer scrapeTimerClosed; @@ -169,10 +168,12 @@ public Mono prometheus(@RequestHeader(value = "X-Prometheus-Scrape-Timeo ConnectionState connectionState = socketAndState.getValue(); RSocket rsocket = socketAndState.getKey(); Timer.Sample sample = Timer.start(); - return rsocket - .requestResponse(connectionState.createKeyPayload()) + Mono request = rsocket.requestResponse(connectionState.createKeyPayload()); + if (timeout != null) { + request = request.timeout(timeout); + } + return request .map(payload -> connectionState.receiveScrapePayload(payload, sample)) - .timeout(timeout) .onErrorResume(throwable -> { scrapableApps.remove(rsocket); @@ -192,7 +193,7 @@ public Mono prometheus(@RequestHeader(value = "X-Prometheus-Scrape-Timeo private Duration determineTimeout(String timeoutHeader) { if (timeoutHeader == null) { - return NO_TIMEOUT; + return null; } try { @@ -201,11 +202,11 @@ private Duration determineTimeout(String timeoutHeader) { .minus(properties.getTimeoutOffset()); if (timeout.isNegative() || timeout.isZero()) { - return NO_TIMEOUT; + return null; } return timeout; } catch (NumberFormatException e) { - return NO_TIMEOUT; + return null; } }