diff --git a/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java b/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java index ff600f8..adb3956 100644 --- a/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java +++ b/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusController.java @@ -32,6 +32,7 @@ import io.rsocket.util.DefaultPayload; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RestController; import io.rsocket.util.EmptyPayload; @@ -48,6 +49,7 @@ import java.nio.channels.ClosedChannelException; import java.security.*; import java.security.spec.PKCS8EncodedKeySpec; +import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -157,15 +159,20 @@ public Mono proxyMetrics() { } @GetMapping(value = "/metrics/connected", produces = "text/plain") - public Mono prometheus() { + public Mono prometheus(@RequestHeader(value = "X-Prometheus-Scrape-Timeout-Seconds", required = false) String timeoutHeader) { + Duration timeout = determineTimeout(timeoutHeader); + return Flux .fromIterable(scrapableApps.entrySet()) .flatMap(socketAndState -> { ConnectionState connectionState = socketAndState.getValue(); RSocket rsocket = socketAndState.getKey(); Timer.Sample sample = Timer.start(); - return rsocket - .requestResponse(connectionState.createKeyPayload()) + Mono request = rsocket.requestResponse(connectionState.createKeyPayload()); + if (timeout != null) { + request = request.timeout(timeout); + } + return request .map(payload -> connectionState.receiveScrapePayload(payload, sample)) .onErrorResume(throwable -> { scrapableApps.remove(rsocket); @@ -184,6 +191,25 @@ public Mono prometheus() { .collect(Collectors.joining("\n")); } + private Duration determineTimeout(String timeoutHeader) { + if (timeoutHeader == null) { + return null; + } + + try { + Duration timeout = Duration + .ofMillis((long) (Double.parseDouble(timeoutHeader) * 1_000)) + .minus(properties.getTimeoutOffset()); + + if (timeout.isNegative() || timeout.isZero()) { + return null; + } + return timeout; + } catch (NumberFormatException e) { + return null; + } + } + class ConnectionState { private final KeyPair keyPair; diff --git a/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusControllerProperties.java b/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusControllerProperties.java index 936cc95..defa3e7 100644 --- a/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusControllerProperties.java +++ b/proxy-server/src/main/java/io/micrometer/prometheus/rsocket/PrometheusControllerProperties.java @@ -17,12 +17,16 @@ import org.springframework.boot.context.properties.ConfigurationProperties; +import java.time.Duration; + /** * @author Christian Tzolov */ @ConfigurationProperties("micrometer.prometheus-proxy") public class PrometheusControllerProperties { + private static final Duration DEFAULT_TIMEOUT_OFFSET = Duration.ZERO; + /** * Proxy accept TCP port. */ @@ -33,6 +37,11 @@ public class PrometheusControllerProperties { */ private int websocketPort = 8081; + /** + * Scrape timeout offset. + */ + private Duration timeoutOffset = DEFAULT_TIMEOUT_OFFSET; + public int getTcpPort() { return tcpPort; } @@ -48,4 +57,16 @@ public int getWebsocketPort() { public void setWebsocketPort(int websocketPort) { this.websocketPort = websocketPort; } + + public Duration getTimeoutOffset() { + return timeoutOffset; + } + + public void setTimeoutOffset(Duration timeoutOffset) { + if (timeoutOffset == null || timeoutOffset.isNegative()) { + this.timeoutOffset = DEFAULT_TIMEOUT_OFFSET; + } else { + this.timeoutOffset = timeoutOffset; + } + } }