diff --git a/docs/modules/ROOT/pages/http-server.adoc b/docs/modules/ROOT/pages/http-server.adoc index d1d67df61d..63ae735317 100644 --- a/docs/modules/ROOT/pages/http-server.adoc +++ b/docs/modules/ROOT/pages/http-server.adoc @@ -786,6 +786,35 @@ include::{examples-dir}/idle/timeout/Application.java[lines=18..35] ---- <1> Configures the default idle timeout to 1 second. +==== Configuring Ping Frame for HTTP/2 Health Check +When using the HTTP/2 protocol, it is recommended to configure a *PING* frame to maintain the connection and ensure timely health checks. +The HttpServer in Reactor Netty allows setting up a *PING* frame to prevent idle connections from being prematurely closed when *idleTimeout* is configured. + +[NOTE] +==== +To enable HTTP/2 PING frame-based health checks, you must configure `idleTimeout`. +Without `idleTimeout`, the connection may remain open indefinitely, preventing proper detection of inactive or unresponsive connections. +Setting an appropriate `idleTimeout` ensures that PING-based health checks can effectively terminate unresponsive connections. +==== + +*Benefits of Using PING Frames* + +- Actively monitors connection health by checking real-time responses to *PING* frames. +- Ensures that health checks detect unresponsive connections quickly. +- Helps maintain long-lived connections in an efficient manner. + +To enable *PING* frames for HTTP/2 connections, configure the HttpServer as follows: + +{examples-link}/liveness/Application.java +---- +include::{examples-dir}/liveness/Application.java[lines=26..50] +---- +<1> To set up a health check using HTTP2 Ping frame, `idleTimeout` must be set first. +<2> Sets the interval for sending `HTTP/2` `PING` frames and receiving `ACK` responses +<3> Sets the execution interval for the scheduler that sends `HTTP/2` `PING frames and periodically checks for `ACK` responses +<4> Sets the threshold for retrying `HTTP/2` `PING` frame transmissions. + + [[http-server-ssl-tls-timeout]] === SSL/TLS Timeout `HttpServer` supports the SSL/TLS functionality provided by Netty. diff --git a/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/liveness/Application.java b/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/liveness/Application.java new file mode 100644 index 0000000000..ad75804f73 --- /dev/null +++ b/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/liveness/Application.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.examples.documentation.http.server.liveness; + +import reactor.netty.DisposableServer; +import reactor.netty.http.Http2SslContextSpec; +import reactor.netty.http.HttpProtocol; +import reactor.netty.http.server.HttpServer; + +import java.io.File; +import java.time.Duration; + +public class Application { + + public static void main(String[] args) { + File cert = new File("certificate.crt"); + File key = new File("private.key"); + + Http2SslContextSpec http2SslContextSpec = Http2SslContextSpec.forServer(cert, key); + + DisposableServer server = + HttpServer.create() + .port(8080) + .protocol(HttpProtocol.H2) + .secure(spec -> spec.sslContext(http2SslContextSpec)) + .idleTimeout(Duration.ofSeconds(1)) //<1> + .http2Settings( + builder -> builder.pingAckTimeout(Duration.ofMillis(600)) // <2> + .pingScheduleInterval(Duration.ofMillis(300)) // <3> + .pingAckDropThreshold(2) // <4> + ) + .bindNow(); + + server.onDispose() + .block(); + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/Http2ConnectionLiveness.java b/reactor-netty-http/src/main/java/reactor/netty/http/Http2ConnectionLiveness.java new file mode 100644 index 0000000000..f31cf4e089 --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/Http2ConnectionLiveness.java @@ -0,0 +1,292 @@ +/* + * Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2FrameCodec; +import io.netty.handler.codec.http2.Http2FrameWriter; +import io.netty.handler.codec.http2.Http2PingFrame; +import reactor.util.Logger; +import reactor.util.Loggers; +import reactor.util.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadLocalRandom; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * Supports connection health checks using HTTP/2 Ping Frames. + * + *

Http2ConnectionLiveness sends a ping frame at the specified interval when no frame is being read or written, + * ensuring the connection health is monitored. If a ping ACK frame is not received within the configured interval, + * the connection will be closed.

+ * + *

Ping frame checking will not be performed while a read or write operation is in progress.

+ * + *

Be cautious when setting a very short interval, as it may cause the connection to be closed, + * even if the keep-alive setting is enabled.

+ * + *

If no interval is specified, no ping frame checking will be performed.

+ * + * @author raccoonback + * @since 1.2.5 + */ +public final class Http2ConnectionLiveness implements HttpConnectionLiveness { + + static final Logger log = Loggers.getLogger(Http2ConnectionLiveness.class); + + private ScheduledFuture pingScheduler; + + private final ChannelFutureListener pingWriteListener = new PingWriteListener(); + private final Http2FrameWriter http2FrameWriter; + private final long pingAckTimeoutNanos; + private final long pingScheduleIntervalNanos; + private final int pingAckDropThreshold; + + private int pingAckDropCount; + private long lastSentPingData; + private long lastReceivedPingTime; + private long lastSendingPingTime; + private boolean isPingAckPending; + + /** + * Constructs a new {@code Http2ConnectionLiveness} instance. + * + * @param http2FrameCodec the HTTP/2 frame codec + * @param pingAckTimeout the ping ACK timeout duration + * @param pingScheduleInterval the ping schedule interval duration + * @param pingAckDropThreshold the ping ACK drop threshold + */ + public Http2ConnectionLiveness( + Http2FrameCodec http2FrameCodec, + @Nullable Duration pingAckTimeout, + @Nullable Duration pingScheduleInterval, + @Nullable Integer pingAckDropThreshold + ) { + this.http2FrameWriter = http2FrameCodec.encoder() + .frameWriter(); + + if (pingAckTimeout != null) { + this.pingAckTimeoutNanos = pingAckTimeout.toNanos(); + } + else { + this.pingAckTimeoutNanos = 0L; + } + + if (pingScheduleInterval != null) { + this.pingScheduleIntervalNanos = pingScheduleInterval.toNanos(); + } + else { + this.pingScheduleIntervalNanos = 0L; + } + + if (pingAckDropThreshold != null) { + this.pingAckDropThreshold = pingAckDropThreshold; + } + else { + this.pingAckDropThreshold = 0; + } + } + + /** + * Checks the liveness of the connection and schedules a ping if necessary. + * + * @param ctx the {@link ChannelHandlerContext} of the connection + */ + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void check(ChannelHandlerContext ctx) { + if (isPingIntervalConfigured()) { + if (pingScheduler == null) { + isPingAckPending = false; + pingAckDropCount = 0; + pingScheduler = ctx.executor() + .schedule( + new PingTimeoutTask(ctx), + pingAckTimeoutNanos, + NANOSECONDS + ); + } + + return; + } + + ctx.close(); + } + + /** + * Receives a message from the peer and processes it if it is a ping frame. + * + * @param msg the message received from the peer + */ + @Override + public void receive(Object msg) { + if (msg instanceof Http2PingFrame) { + Http2PingFrame frame = (Http2PingFrame) msg; + if (frame.ack() && frame.content() == lastSentPingData) { + lastReceivedPingTime = System.nanoTime(); + } + } + + if (msg instanceof Http2DataFrame) { + cancel(); + } + } + + /** + * Cancels the scheduled ping task. + */ + @Override + public void cancel() { + if (pingScheduler != null) { + pingScheduler.cancel(false); + pingScheduler = null; + } + } + + private boolean isPingIntervalConfigured() { + return pingAckTimeoutNanos > 0 + && pingScheduleIntervalNanos > 0; + } + + /** + * A task that handles ping timeouts. + */ + class PingTimeoutTask implements Runnable { + + private final ChannelHandlerContext ctx; + + PingTimeoutTask(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + + @Override + public void run() { + Channel channel = ctx.channel(); + if (channel == null || !channel.isOpen()) { + return; + } + + if (!isPingAckPending) { + if (log.isDebugEnabled()) { + log.debug("Attempting to send a ping frame to the channel: {}", channel); + } + + writePing(ctx); + pingScheduler = invokeNextSchedule(); + return; + } + + if (isOutOfTimeRange()) { + countPingDrop(); + + if (isExceedAckDropThreshold()) { + if (log.isInfoEnabled()) { + log.info("Closing the channel due to delayed ping frame response (timeout: {} ns). {}", pingAckTimeoutNanos, channel); + } + + close(); + return; + } + + if (log.isInfoEnabled()) { + log.info("Dropping ping ACK frame in channel (ping data: {}). channel: {}", lastSentPingData, channel); + } + + writePing(ctx); + pingScheduler = invokeNextSchedule(); + return; + } + + isPingAckPending = false; + pingAckDropCount = 0; + pingScheduler = invokeNextSchedule(); + } + + private void writePing(ChannelHandlerContext ctx) { + lastSentPingData = ThreadLocalRandom.current().nextLong(); + + http2FrameWriter + .writePing(ctx, false, lastSentPingData, ctx.newPromise()) + .addListener(pingWriteListener); + ctx.flush(); + } + + private boolean isOutOfTimeRange() { + return pingAckTimeoutNanos < Math.abs(lastReceivedPingTime - lastSendingPingTime); + } + + private void countPingDrop() { + pingAckDropCount++; + } + + private boolean isExceedAckDropThreshold() { + return pingAckDropCount > pingAckDropThreshold; + } + + private ScheduledFuture invokeNextSchedule() { + return ctx.executor() + .schedule( + this, + pingScheduleIntervalNanos, + NANOSECONDS + ); + } + + private void close() { + ctx.close() + .addListener(future -> { + if (future.isSuccess()) { + if (log.isDebugEnabled()) { + log.debug("Channel closed after liveness check: {}", ctx.channel()); + } + } + else if (log.isDebugEnabled()) { + log.debug("Failed to close the channel: {}. Cause: {}", ctx.channel(), future.cause()); + } + }); + } + } + + /** + * A listener that handles the completion of ping frame writes. + */ + private class PingWriteListener implements ChannelFutureListener { + + @Override + public void operationComplete(ChannelFuture future) { + if (future.isSuccess()) { + if (log.isDebugEnabled()) { + log.debug("Successfully wrote PING frame to the channel: {}", future.channel()); + } + + isPingAckPending = true; + lastSendingPingTime = System.nanoTime(); + } + else { + if (log.isDebugEnabled()) { + log.debug("Failed to write PING frame to the channel: {}", future.channel()); + } + } + } + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java b/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java index f09ab288da..21d0bbc06b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java @@ -19,6 +19,7 @@ import io.netty.handler.codec.http2.Http2Settings; import reactor.util.annotation.Nullable; +import java.time.Duration; import java.util.Objects; /** @@ -102,6 +103,79 @@ public interface Builder { * @return {@code this} */ //Builder pushEnabled(boolean pushEnabled); + + /** + * Sets the interval for sending HTTP/2 PING frames and receiving ACK responses. + * + *

+ * This method configures the time interval at which PING frames are sent to the peer. + * The interval should be chosen carefully to balance between detecting connection issues + * and minimizing unnecessary network traffic. + *

+ * + *

+ * If the interval is set too short, it may cause excessive network overhead. + * If set too long, connection failures may not be detected in a timely manner. + *

+ * + * @param pingAckTimeout the interval in between consecutive PING frames + * and ACK responses. Must be a positive value. + */ + default Builder pingAckTimeout(Duration pingAckTimeout) { + return this; + } + + /** + * Sets the execution interval for the scheduler that sends HTTP/2 PING frames + * and periodically checks for ACK responses. + * + *

+ * This method configures the time interval at which the scheduler runs + * to send PING frames and verify if ACK responses are received within + * the expected timeframe. + * Proper tuning of this interval helps in detecting connection issues + * while avoiding unnecessary network overhead. + *

+ * + *

+ * If the interval is too short, it may increase network and CPU usage. + * Conversely, setting it too long may delay the detection of connection failures. + *

+ * + * @param pingScheduleInterval the interval in at which the scheduler executes. + * Must be a positive value. + */ + default Builder pingScheduleInterval(Duration pingScheduleInterval) { + return this; + } + + /** + * Sets the threshold for retrying HTTP/2 PING frame transmissions. + * + *

+ * This method defines the maximum number of attempts to send a PING frame + * before considering the connection as unresponsive. + * If the threshold is exceeded without receiving an ACK response, + * the connection may be closed or marked as unhealthy. + *

+ * + *

+ * A lower threshold can detect connection failures more quickly but may lead + * to premature disconnections. Conversely, a higher threshold allows more retries + * but may delay failure detection. + *

+ * + *

+ * If this value is not specified, it defaults to 0, meaning only one attempt to send a PING frame is made without retries. + *

+ * + * @param pingAckDropThreshold the maximum number of PING transmission attempts. + * Must be a positive integer. + * The default value is 0, meaning no retries will occur and only one PING frame will be sent. + */ + default Builder pingAckDropThreshold(Integer pingAckDropThreshold) { + return this; + } } /** @@ -196,6 +270,36 @@ public Boolean pushEnabled() { return pushEnabled; } + /** + * Returns the configured {@code pingAckTimeout} value or null. + * + * @return the configured {@code pingAckTimeout} value or null + */ + @Nullable + public Duration pingAckTimeout() { + return pingAckTimeout; + } + + /** + * Returns the configured {@code pingScheduleInterval} value or null. + * + * @return the configured {@code pingScheduleInterval} value or null + */ + @Nullable + public Duration pingScheduleInterval() { + return pingScheduleInterval; + } + + /** + * Returns the configured {@code pingAckDropThreshold} value or null. + * + * @return the configured {@code pingAckDropThreshold} value or null + */ + @Nullable + public Integer pingAckDropThreshold() { + return pingAckDropThreshold; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -212,7 +316,10 @@ public boolean equals(Object o) { Objects.equals(maxFrameSize, that.maxFrameSize) && maxHeaderListSize.equals(that.maxHeaderListSize) && Objects.equals(maxStreams, that.maxStreams) && - Objects.equals(pushEnabled, that.pushEnabled); + Objects.equals(pushEnabled, that.pushEnabled) && + Objects.equals(pingAckTimeout, that.pingAckTimeout) && + Objects.equals(pingScheduleInterval, that.pingScheduleInterval) && + Objects.equals(pingAckDropThreshold, that.pingAckDropThreshold); } @Override @@ -226,6 +333,9 @@ public int hashCode() { result = 31 * result + (maxHeaderListSize == null ? 0 : Long.hashCode(maxHeaderListSize)); result = 31 * result + (maxStreams == null ? 0 : Long.hashCode(maxStreams)); result = 31 * result + (pushEnabled == null ? 0 : Boolean.hashCode(pushEnabled)); + result = 31 * result + (pingAckTimeout == null ? 0 : Objects.hashCode(pingAckTimeout)); + result = 31 * result + (pingScheduleInterval == null ? 0 : Objects.hashCode(pingScheduleInterval)); + result = 31 * result + (pingAckDropThreshold == null ? 0 : Integer.hashCode(pingAckDropThreshold)); return result; } @@ -237,6 +347,9 @@ public int hashCode() { final Long maxHeaderListSize; final Long maxStreams; final Boolean pushEnabled; + final Duration pingAckTimeout; + final Duration pingScheduleInterval; + final Integer pingAckDropThreshold; Http2SettingsSpec(Build build) { Http2Settings settings = build.http2Settings; @@ -254,11 +367,17 @@ public int hashCode() { maxHeaderListSize = settings.maxHeaderListSize(); maxStreams = build.maxStreams; pushEnabled = settings.pushEnabled(); + pingAckTimeout = build.pingAckTimeout; + pingScheduleInterval = build.pingScheduleInterval; + pingAckDropThreshold = build.pingAckDropThreshold; } static final class Build implements Builder { Boolean connectProtocolEnabled; Long maxStreams; + Duration pingAckTimeout; + Duration pingScheduleInterval; + Integer pingAckDropThreshold; final Http2Settings http2Settings = Http2Settings.defaultSettings(); @Override @@ -311,6 +430,24 @@ public Builder maxStreams(long maxStreams) { return this; } + @Override + public Builder pingAckTimeout(Duration pingAckTimeout) { + this.pingAckTimeout = pingAckTimeout; + return this; + } + + @Override + public Builder pingScheduleInterval(Duration pingScheduleInterval) { + this.pingScheduleInterval = pingScheduleInterval; + return this; + } + + @Override + public Builder pingAckDropThreshold(Integer pingAckDropThreshold) { + this.pingAckDropThreshold = pingAckDropThreshold; + return this; + } + /* @Override public Builder pushEnabled(boolean pushEnabled) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/HttpConnectionImmediateClose.java b/reactor-netty-http/src/main/java/reactor/netty/http/HttpConnectionImmediateClose.java new file mode 100644 index 0000000000..eb226b2ffc --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/HttpConnectionImmediateClose.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http; + +import io.netty.channel.ChannelHandlerContext; + +/** + * This class implements the {@link HttpConnectionLiveness} interface and provides + * a mechanism to immediately close the HTTP connection without checking its liveness. + * + *

+ * The methods in this class are no-ops except for the {@code check} method, which + * closes the connection. + *

+ * + * @author raccoonback + * @since 1.2.5 + */ +public final class HttpConnectionImmediateClose implements HttpConnectionLiveness { + + /** + * Constructs a new {@code HttpConnectionImmediateClose} instance. + */ + public HttpConnectionImmediateClose() { + } + + /** + * Cancels the HTTP connection. + * This method is a no-op. + */ + @Override + public void cancel() { + // no op + } + + /** + * Receives a message from the peer. + * This method is a no-op. + * + * @param msg the message received from the peer + */ + @Override + public void receive(Object msg) { + // no op + } + + /** + * Checks the liveness of the connection and closes it immediately. + * + * @param ctx the {@link ChannelHandlerContext} of the connection + */ + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void check(ChannelHandlerContext ctx) { + ctx.close(); + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/HttpConnectionLiveness.java b/reactor-netty-http/src/main/java/reactor/netty/http/HttpConnectionLiveness.java new file mode 100644 index 0000000000..09c08e9d70 --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/HttpConnectionLiveness.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http; + +import io.netty.channel.ChannelHandlerContext; + +/** + * Interface for checking the liveness of an HTTP connection. + * This interface provides methods to cancel the HTTP connection, + * process messages received from the peer, and start checking the liveness of the connection. + * + * @author raccoonback + * @since 1.2.5 + */ +public interface HttpConnectionLiveness { + + /** + * Cancel Http Connection by protocol. + * + */ + void cancel(); + + /** + * Receive the message received from peer. + * + */ + void receive(Object msg); + + /** + * Start checking liveness. + * + */ + void check(ChannelHandlerContext ctx); +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/IdleTimeoutHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/IdleTimeoutHandler.java similarity index 50% rename from reactor-netty-http/src/main/java/reactor/netty/http/server/IdleTimeoutHandler.java rename to reactor-netty-http/src/main/java/reactor/netty/http/IdleTimeoutHandler.java index d31f6310b8..e2128582b3 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/IdleTimeoutHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/IdleTimeoutHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2025 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package reactor.netty.http.server; +package reactor.netty.http; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -24,6 +24,8 @@ import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import reactor.netty.NettyPipeline; +import reactor.util.Logger; +import reactor.util.Loggers; import reactor.util.annotation.Nullable; import java.time.Duration; @@ -31,28 +33,73 @@ import static reactor.netty.ReactorNetty.format; -final class IdleTimeoutHandler extends IdleStateHandler { +/** + * A handler that manages idle timeout for HTTP connections. + * This handler will close the connection if it remains idle for the specified duration. + * It also checks the liveness of the HTTP connection. + * + * @author raccoonback + * @since 1.2.5 + */ +public final class IdleTimeoutHandler extends IdleStateHandler { + + private static final Logger log = Loggers.getLogger(IdleTimeoutHandler.class); - IdleTimeoutHandler(long idleTimeout) { - super(idleTimeout, 0, 0, TimeUnit.MILLISECONDS); + private final HttpConnectionLiveness httpConnectionLiveness; + + private IdleTimeoutHandler(long idleTimeout, HttpConnectionLiveness httpConnectionLiveness) { + super(0, 0, idleTimeout, TimeUnit.MILLISECONDS); + this.httpConnectionLiveness = httpConnectionLiveness; } @Override @SuppressWarnings("FutureReturnValueIgnored") protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) { - if (evt.state() == IdleState.READER_IDLE) { - if (HttpServerOperations.log.isDebugEnabled()) { - HttpServerOperations.log.debug(format(ctx.channel(), + if (evt.state() == IdleState.ALL_IDLE) { + if (log.isDebugEnabled()) { + log.debug(format(ctx.channel(), "Connection was idle for [{}ms], as per configuration the connection will be closed."), - getReaderIdleTimeInMillis()); + getAllIdleTimeInMillis()); } // FutureReturnValueIgnored is deliberate - ctx.close(); + + httpConnectionLiveness.check(ctx); } + ctx.fireUserEventTriggered(evt); } - static void addIdleTimeoutHandler(ChannelPipeline pipeline, @Nullable Duration idleTimeout) { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + httpConnectionLiveness.receive(msg); + + super.channelRead(ctx, msg); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + httpConnectionLiveness.cancel(); + + super.channelInactive(ctx); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + httpConnectionLiveness.cancel(); + + super.handlerRemoved(ctx); + } + + /** + * Adds an idle timeout handler to the server pipeline. + * + * @param pipeline the channel pipeline + * @param idleTimeout the idle timeout duration + * @param httpConnectionLiveness the HTTP connection liveness checker + */ + public static void addIdleTimeoutHandler(ChannelPipeline pipeline, @Nullable Duration idleTimeout, + HttpConnectionLiveness httpConnectionLiveness) { if (idleTimeout != null && pipeline.get(NettyPipeline.IdleTimeoutHandler) == null) { String baseName = null; if (pipeline.get(NettyPipeline.HttpCodec) != null) { @@ -73,11 +120,20 @@ static void addIdleTimeoutHandler(ChannelPipeline pipeline, @Nullable Duration i pipeline.addAfter(baseName, NettyPipeline.IdleTimeoutHandler, - new IdleTimeoutHandler(idleTimeout.toMillis())); + new IdleTimeoutHandler( + idleTimeout.toMillis(), + httpConnectionLiveness + ) + ); } } - static void removeIdleTimeoutHandler(ChannelPipeline pipeline) { + /** + * Removes the idle timeout handler from the pipeline if it exists. + * + * @param pipeline the channel pipeline from which the handler will be removed + */ + public static void removeIdleTimeoutHandler(ChannelPipeline pipeline) { if (pipeline.get(NettyPipeline.IdleTimeoutHandler) != null) { pipeline.remove(NettyPipeline.IdleTimeoutHandler); } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java index 8949760048..508abdf0ff 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java @@ -65,10 +65,13 @@ import reactor.netty.channel.AbstractChannelMetricsHandler; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.channel.ChannelOperations; +import reactor.netty.http.HttpConnectionImmediateClose; +import reactor.netty.http.Http2ConnectionLiveness; import reactor.netty.http.Http2SettingsSpec; import reactor.netty.http.Http3SettingsSpec; import reactor.netty.http.HttpProtocol; import reactor.netty.http.HttpResources; +import reactor.netty.http.IdleTimeoutHandler; import reactor.netty.http.logging.HttpMessageLogFactory; import reactor.netty.http.logging.ReactorNettyHttpMessageLogFactory; import reactor.netty.http.server.compression.HttpCompressionOptionsSpec; @@ -320,13 +323,13 @@ public Function uriTagValue() { boolean accessLogEnabled; Function accessLog; + boolean errorLogEnabled; + Function errorLog; HttpCompressionOptionsSpec compressionOptions; BiPredicate compressPredicate; ServerCookieDecoder cookieDecoder; ServerCookieEncoder cookieEncoder; HttpRequestDecoderSpec decoder; - boolean errorLogEnabled; - Function errorLog; HttpServerFormDecoderProvider formDecoderProvider; BiFunction forwardedHeaderHandler; Http2SettingsSpec http2Settings; @@ -501,13 +504,13 @@ static Http2Settings http2Settings(@Nullable Http2SettingsSpec http2Settings) { static void addStreamHandlers(Channel ch, boolean accessLogEnabled, @Nullable Function accessLog, + boolean errorLogEnabled, + @Nullable Function errorLog, @Nullable HttpCompressionOptionsSpec compressionOptions, @Nullable BiPredicate compressPredicate, @Nullable Boolean connectProtocolEnabled, ServerCookieDecoder decoder, ServerCookieEncoder encoder, - boolean errorLogEnabled, - @Nullable Function errorLog, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, HttpMessageLogFactory httpMessageLogFactory, @@ -625,12 +628,12 @@ static void configureHttp3Pipeline( ChannelPipeline p, boolean accessLogEnabled, @Nullable Function accessLog, + boolean errorLogEnabled, + @Nullable Function errorLog, @Nullable HttpCompressionOptionsSpec compressionOptions, @Nullable BiPredicate compressPredicate, ServerCookieDecoder cookieDecoder, ServerCookieEncoder cookieEncoder, - boolean errorLogEnabled, - @Nullable Function errorLog, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, HttpMessageLogFactory httpMessageLogFactory, @@ -643,11 +646,12 @@ static void configureHttp3Pipeline( @Nullable Duration readTimeout, @Nullable Duration requestTimeout, @Nullable Function uriTagValue, - boolean validate) { + boolean validate, + @Nullable Duration idleTimeout) { p.remove(NettyPipeline.ReactiveBridge); - p.addLast(NettyPipeline.HttpCodec, newHttp3ServerConnectionHandler(accessLogEnabled, accessLog, compressionOptions, - compressPredicate, cookieDecoder, cookieEncoder, errorLogEnabled, errorLog, formDecoderProvider, + p.addLast(NettyPipeline.HttpCodec, newHttp3ServerConnectionHandler(accessLogEnabled, accessLog, + compressionOptions, compressPredicate, cookieDecoder, cookieEncoder, errorLogEnabled, errorLog, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, methodTagValue, metricsRecorder, minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue, validate)); @@ -655,18 +659,24 @@ static void configureHttp3Pipeline( // Connection metrics are not applicable p.remove(NettyPipeline.ChannelMetricsHandler); } + + IdleTimeoutHandler.addIdleTimeoutHandler( + p, + idleTimeout, + new HttpConnectionImmediateClose() + ); } static void configureH2Pipeline(ChannelPipeline p, boolean accessLogEnabled, @Nullable Function accessLog, + boolean errorLogEnabled, + @Nullable Function errorLog, @Nullable HttpCompressionOptionsSpec compressionOptions, @Nullable BiPredicate compressPredicate, ServerCookieDecoder cookieDecoder, ServerCookieEncoder cookieEncoder, boolean enableGracefulShutdown, - boolean errorLogEnabled, - @Nullable Function errorLog, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, @Nullable Http2SettingsSpec http2SettingsSpec, @@ -711,12 +721,21 @@ static void configureH2Pipeline(ChannelPipeline p, } p.addLast(NettyPipeline.HttpCodec, http2FrameCodec) .addLast(NettyPipeline.H2MultiplexHandler, - new Http2MultiplexHandler(new H2Codec(accessLogEnabled, accessLog, compressionOptions, compressPredicate, - http2SettingsSpec != null ? http2SettingsSpec.connectProtocolEnabled() : null, - cookieDecoder, cookieEncoder, errorLogEnabled, errorLog, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, + new Http2MultiplexHandler(new H2Codec(accessLogEnabled, accessLog, errorLogEnabled, errorLog, + compressionOptions, compressPredicate, http2SettingsSpec != null ? http2SettingsSpec.connectProtocolEnabled() : null, + cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, methodTagValue, metricsRecorder, minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue))); - IdleTimeoutHandler.addIdleTimeoutHandler(p, idleTimeout); + IdleTimeoutHandler.addIdleTimeoutHandler( + p, + idleTimeout, + new Http2ConnectionLiveness( + http2FrameCodec, + http2SettingsSpec != null ? http2SettingsSpec.pingAckTimeout() : null, + http2SettingsSpec != null ? http2SettingsSpec.pingScheduleInterval() : null, + http2SettingsSpec != null ? http2SettingsSpec.pingAckDropThreshold() : null + ) + ); if (metricsRecorder != null) { if (metricsRecorder instanceof MicrometerHttpServerMetricsRecorder) { @@ -732,14 +751,14 @@ static void configureH2Pipeline(ChannelPipeline p, static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, boolean accessLogEnabled, @Nullable Function accessLog, + boolean errorLogEnabled, + @Nullable Function errorLog, @Nullable HttpCompressionOptionsSpec compressionOptions, @Nullable BiPredicate compressPredicate, ServerCookieDecoder cookieDecoder, ServerCookieEncoder cookieEncoder, HttpRequestDecoderSpec decoder, boolean enableGracefulShutdown, - boolean errorLogEnabled, - @Nullable Function errorLog, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, @Nullable Http2SettingsSpec http2SettingsSpec, @@ -766,12 +785,12 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, HttpServerCodec httpServerCodec = new HttpServerCodec(decoderConfig); - Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(accessLogEnabled, accessLog, compressionOptions, - compressPredicate, cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, enableGracefulShutdown, - errorLogEnabled, errorLog, formDecoderProvider, forwardedHeaderHandler, http2SettingsSpec, httpMessageLogFactory, listener, mapHandle, - methodTagValue, metricsRecorder, minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue, decoder.validateHeaders()); + Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(accessLogEnabled, accessLog, errorLogEnabled, errorLog, + compressionOptions, compressPredicate, cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, + enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, idleTimeout, http2SettingsSpec, httpMessageLogFactory, listener, + mapHandle, methodTagValue, metricsRecorder, minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue, decoder.validateHeaders()); - ChannelHandler http2ServerHandler = new H2CleartextCodec(upgrader, http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null); + ChannelHandler http2ServerHandler = new H2CleartextCodec(upgrader, http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null, idleTimeout, http2SettingsSpec); HttpServerUpgradeHandler httpServerUpgradeHandler = readTimeout == null && requestTimeout == null ? new HttpServerUpgradeHandler(httpServerCodec, upgrader, decoder.h2cMaxContentLength()) : @@ -785,8 +804,8 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, new HttpTrafficHandler(compressPredicate, compressionOptions, cookieDecoder, cookieEncoder, formDecoderProvider, - forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, mapHandle, maxKeepAliveRequests, - readTimeout, requestTimeout, decoder.validateHeaders())); + forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, http2SettingsSpec, listener, mapHandle, + maxKeepAliveRequests, readTimeout, requestTimeout, decoder.validateHeaders())); if (accessLogEnabled) { p.addAfter(NettyPipeline.HttpTrafficHandler, NettyPipeline.AccessLogHandler, AccessLogHandlerFactory.H1.create(accessLog)); @@ -822,20 +841,26 @@ else if (metricsRecorder instanceof ContextAwareHttpServerMetricsRecorder) { if (errorLogEnabled) { p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.ErrorLogHandler, new DefaultErrorLogHandler(errorLog)); } + + IdleTimeoutHandler.addIdleTimeoutHandler( + p, + idleTimeout, + new HttpConnectionImmediateClose() + ); } @SuppressWarnings("deprecation") static void configureHttp11Pipeline(ChannelPipeline p, boolean accessLogEnabled, @Nullable Function accessLog, + boolean errorLogEnabled, + @Nullable Function errorLog, @Nullable HttpCompressionOptionsSpec compressionOptions, @Nullable BiPredicate compressPredicate, ServerCookieDecoder cookieDecoder, ServerCookieEncoder cookieEncoder, boolean channelOpened, HttpRequestDecoderSpec decoder, - boolean errorLogEnabled, - @Nullable Function errorLog, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, HttpMessageLogFactory httpMessageLogFactory, @@ -863,7 +888,7 @@ static void configureHttp11Pipeline(ChannelPipeline p, .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, new HttpTrafficHandler(compressPredicate, compressionOptions, cookieDecoder, cookieEncoder, formDecoderProvider, - forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, mapHandle, maxKeepAliveRequests, + forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, null, listener, mapHandle, maxKeepAliveRequests, readTimeout, requestTimeout, decoder.validateHeaders())); if (accessLogEnabled) { @@ -903,6 +928,12 @@ else if (metricsRecorder instanceof ContextAwareHttpServerMetricsRecorder) { if (errorLogEnabled) { p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.ErrorLogHandler, new DefaultErrorLogHandler(errorLog)); } + + IdleTimeoutHandler.addIdleTimeoutHandler( + p, + idleTimeout, + new HttpConnectionImmediateClose() + ); } static final boolean ACCESS_LOG = Boolean.parseBoolean(System.getProperty(ACCESS_LOG_ENABLED, "false")); @@ -992,12 +1023,14 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter { final boolean addHttp2FrameCodec; final boolean removeMetricsHandler; final Long maxStreams; + final Duration idleTimeout; + final Http2SettingsSpec http2SettingsSpec; /** * Used when full H2 preface is received. */ - H2CleartextCodec(Http11OrH2CleartextCodec upgrader, @Nullable Long maxStreams) { - this(upgrader, true, true, maxStreams); + H2CleartextCodec(Http11OrH2CleartextCodec upgrader, @Nullable Long maxStreams, @Nullable Duration idleTimeout, @Nullable Http2SettingsSpec http2SettingsSpec) { + this(upgrader, true, true, maxStreams, idleTimeout, http2SettingsSpec); } /** @@ -1005,11 +1038,13 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter { * is added by {@link Http2ServerUpgradeCodec} */ H2CleartextCodec(Http11OrH2CleartextCodec upgrader, boolean addHttp2FrameCodec, boolean removeMetricsHandler, - @Nullable Long maxStreams) { + @Nullable Long maxStreams, @Nullable Duration idleTimeout, @Nullable Http2SettingsSpec http2SettingsSpec) { this.upgrader = upgrader; this.addHttp2FrameCodec = addHttp2FrameCodec; this.removeMetricsHandler = removeMetricsHandler; this.maxStreams = maxStreams; + this.idleTimeout = idleTimeout; + this.http2SettingsSpec = http2SettingsSpec; } @Override @@ -1046,6 +1081,20 @@ public void handlerAdded(ChannelHandlerContext ctx) { } pipeline.remove(NettyPipeline.HttpTrafficHandler); pipeline.remove(NettyPipeline.ReactiveBridge); + + if (idleTimeout != null) { + IdleTimeoutHandler.removeIdleTimeoutHandler(pipeline); + IdleTimeoutHandler.addIdleTimeoutHandler( + pipeline, + idleTimeout, + new Http2ConnectionLiveness( + upgrader.http2FrameCodec, + http2SettingsSpec != null ? http2SettingsSpec.pingAckTimeout() : null, + http2SettingsSpec != null ? http2SettingsSpec.pingScheduleInterval() : null, + http2SettingsSpec != null ? http2SettingsSpec.pingAckDropThreshold() : null + ) + ); + } } } @@ -1053,13 +1102,13 @@ static final class H2Codec extends ChannelInitializer { final boolean accessLogEnabled; final Function accessLog; + final boolean errorLogEnabled; + final Function errorLog; final HttpCompressionOptionsSpec compressionOptions; final BiPredicate compressPredicate; final Boolean connectProtocolEnabled; final ServerCookieDecoder cookieDecoder; final ServerCookieEncoder cookieEncoder; - final boolean errorLogEnabled; - final Function errorLog; final HttpServerFormDecoderProvider formDecoderProvider; final BiFunction forwardedHeaderHandler; final HttpMessageLogFactory httpMessageLogFactory; @@ -1077,13 +1126,13 @@ static final class H2Codec extends ChannelInitializer { H2Codec( boolean accessLogEnabled, @Nullable Function accessLog, + boolean errorLogEnabled, + @Nullable Function errorLog, @Nullable HttpCompressionOptionsSpec compressionOptions, @Nullable BiPredicate compressPredicate, @Nullable Boolean connectProtocolEnabled, ServerCookieDecoder decoder, ServerCookieEncoder encoder, - boolean errorLogEnabled, - @Nullable Function errorLog, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, HttpMessageLogFactory httpMessageLogFactory, @@ -1098,13 +1147,13 @@ static final class H2Codec extends ChannelInitializer { @Nullable Function uriTagValue) { this.accessLogEnabled = accessLogEnabled; this.accessLog = accessLog; + this.errorLogEnabled = errorLogEnabled; + this.errorLog = errorLog; this.compressionOptions = compressionOptions; this.compressPredicate = compressPredicate; this.connectProtocolEnabled = connectProtocolEnabled; this.cookieDecoder = decoder; this.cookieEncoder = encoder; - this.errorLogEnabled = errorLogEnabled; - this.errorLog = errorLog; this.formDecoderProvider = formDecoderProvider; this.forwardedHeaderHandler = forwardedHeaderHandler; this.httpMessageLogFactory = httpMessageLogFactory; @@ -1122,8 +1171,8 @@ static final class H2Codec extends ChannelInitializer { @Override protected void initChannel(Channel ch) { ch.pipeline().remove(this); - addStreamHandlers(ch, accessLogEnabled, accessLog, compressionOptions, compressPredicate, connectProtocolEnabled, cookieDecoder, cookieEncoder, - errorLogEnabled, errorLog, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, + addStreamHandlers(ch, accessLogEnabled, accessLog, errorLogEnabled, errorLog, compressionOptions, compressPredicate, connectProtocolEnabled, + cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, methodTagValue, metricsRecorder, minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue); } } @@ -1133,13 +1182,13 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer final boolean accessLogEnabled; final Function accessLog; + final boolean errorLogEnabled; + final Function errorLog; final HttpCompressionOptionsSpec compressionOptions; final BiPredicate compressPredicate; final Boolean connectProtocolEnabled; final ServerCookieDecoder cookieDecoder; final ServerCookieEncoder cookieEncoder; - final boolean errorLogEnabled; - final Function errorLog; final HttpServerFormDecoderProvider formDecoderProvider; final BiFunction forwardedHeaderHandler; final Http2FrameCodec http2FrameCodec; @@ -1155,20 +1204,23 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer final Duration readTimeout; final Duration requestTimeout; final Function uriTagValue; + final Duration idleTimeout; + final Http2SettingsSpec http2SettingsSpec; Http11OrH2CleartextCodec( boolean accessLogEnabled, @Nullable Function accessLog, + boolean errorLogEnabled, + @Nullable Function errorLog, @Nullable HttpCompressionOptionsSpec compressionOptions, @Nullable BiPredicate compressPredicate, ServerCookieDecoder cookieDecoder, ServerCookieEncoder cookieEncoder, boolean debug, boolean enableGracefulShutdown, - boolean errorLogEnabled, - @Nullable Function errorLog, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, + @Nullable Duration idleTimeout, @Nullable Http2SettingsSpec http2SettingsSpec, HttpMessageLogFactory httpMessageLogFactory, ConnectionObserver listener, @@ -1183,13 +1235,13 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer boolean validate) { this.accessLogEnabled = accessLogEnabled; this.accessLog = accessLog; + this.errorLogEnabled = errorLogEnabled; + this.errorLog = errorLog; this.compressionOptions = compressionOptions; this.compressPredicate = compressPredicate; this.connectProtocolEnabled = http2SettingsSpec != null ? http2SettingsSpec.connectProtocolEnabled() : null; this.cookieDecoder = cookieDecoder; this.cookieEncoder = cookieEncoder; - this.errorLogEnabled = errorLogEnabled; - this.errorLog = errorLog; this.formDecoderProvider = formDecoderProvider; this.forwardedHeaderHandler = forwardedHeaderHandler; Http2FrameCodecBuilder http2FrameCodecBuilder = @@ -1224,6 +1276,8 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer this.readTimeout = readTimeout; this.requestTimeout = requestTimeout; this.uriTagValue = uriTagValue; + this.idleTimeout = idleTimeout; + this.http2SettingsSpec = http2SettingsSpec; } /** @@ -1232,8 +1286,8 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer @Override protected void initChannel(Channel ch) { ch.pipeline().remove(this); - addStreamHandlers(ch, accessLogEnabled, accessLog, compressionOptions, compressPredicate, connectProtocolEnabled, cookieDecoder, - cookieEncoder, errorLogEnabled, errorLog, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, + addStreamHandlers(ch, accessLogEnabled, accessLog, errorLogEnabled, errorLog, compressionOptions, compressPredicate, + connectProtocolEnabled, cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, methodTagValue, metricsRecorder, minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue); } @@ -1241,7 +1295,7 @@ protected void initChannel(Channel ch) { @Nullable public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protocol) { if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { - return new Http2ServerUpgradeCodec(http2FrameCodec, new H2CleartextCodec(this, false, false, maxStreams)); + return new Http2ServerUpgradeCodec(http2FrameCodec, new H2CleartextCodec(this, false, false, maxStreams, idleTimeout, http2SettingsSpec)); } else { return null; @@ -1276,14 +1330,14 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler final boolean accessLogEnabled; final Function accessLog; + final boolean errorLogEnabled; + final Function errorLog; final HttpCompressionOptionsSpec compressionOptions; final BiPredicate compressPredicate; final ServerCookieDecoder cookieDecoder; final ServerCookieEncoder cookieEncoder; final HttpRequestDecoderSpec decoder; final boolean enableGracefulShutdown; - final boolean errorLogEnabled; - final Function errorLog; final HttpServerFormDecoderProvider formDecoderProvider; final BiFunction forwardedHeaderHandler; final Http2SettingsSpec http2SettingsSpec; @@ -1310,14 +1364,14 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler super(ApplicationProtocolNames.HTTP_1_1); this.accessLogEnabled = initializer.accessLogEnabled; this.accessLog = initializer.accessLog; + this.errorLogEnabled = initializer.errorLogEnabled; + this.errorLog = initializer.errorLog; this.compressionOptions = initializer.compressionOptions; this.compressPredicate = compressPredicate(initializer.compressPredicate, initializer.minCompressionSize); this.cookieDecoder = initializer.cookieDecoder; this.cookieEncoder = initializer.cookieEncoder; this.decoder = initializer.decoder; this.enableGracefulShutdown = initializer.enableGracefulShutdown; - this.errorLogEnabled = initializer.errorLogEnabled; - this.errorLog = initializer.errorLog; this.formDecoderProvider = initializer.formDecoderProvider; this.forwardedHeaderHandler = initializer.forwardedHeaderHandler; this.http2SettingsSpec = initializer.http2SettingsSpec; @@ -1345,23 +1399,18 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { ChannelPipeline p = ctx.pipeline(); if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { - configureH2Pipeline(p, accessLogEnabled, accessLog, compressionOptions, compressPredicate, cookieDecoder, cookieEncoder, - enableGracefulShutdown, errorLogEnabled, errorLog, formDecoderProvider, forwardedHeaderHandler, http2SettingsSpec, + configureH2Pipeline(p, accessLogEnabled, accessLog, errorLogEnabled, errorLog, compressionOptions, compressPredicate, + cookieDecoder, cookieEncoder, enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2SettingsSpec, httpMessageLogFactory, idleTimeout, listener, mapHandle, methodTagValue, metricsRecorder, minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue, decoder.validateHeaders()); return; } if (!supportOnlyHttp2 && ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { - configureHttp11Pipeline(p, accessLogEnabled, accessLog, compressionOptions, compressPredicate, cookieDecoder, cookieEncoder, - true, decoder, errorLogEnabled, errorLog, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, + configureHttp11Pipeline(p, accessLogEnabled, accessLog, errorLogEnabled, errorLog, compressionOptions, compressPredicate, + cookieDecoder, cookieEncoder, true, decoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, mapHandle, maxKeepAliveRequests, methodTagValue, metricsRecorder, minCompressionSize, readTimeout, requestTimeout, uriTagValue); - - // When the server is configured with HTTP/1.1 and H2 and HTTP/1.1 is negotiated, - // when channelActive event happens, this HttpTrafficHandler is still not in the pipeline, - // and will not be able to add IdleTimeoutHandler. So in this use case add IdleTimeoutHandler here. - IdleTimeoutHandler.addIdleTimeoutHandler(ctx.pipeline(), idleTimeout); return; } @@ -1373,14 +1422,14 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig final boolean accessLogEnabled; final Function accessLog; + final boolean errorLogEnabled; + final Function errorLog; final HttpCompressionOptionsSpec compressionOptions; final BiPredicate compressPredicate; final ServerCookieDecoder cookieDecoder; final ServerCookieEncoder cookieEncoder; final HttpRequestDecoderSpec decoder; final boolean enableGracefulShutdown; - final boolean errorLogEnabled; - final Function errorLog; final HttpServerFormDecoderProvider formDecoderProvider; final BiFunction forwardedHeaderHandler; final Http2SettingsSpec http2SettingsSpec; @@ -1404,14 +1453,14 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig HttpServerChannelInitializer(HttpServerConfig config) { this.accessLogEnabled = config.accessLogEnabled; this.accessLog = config.accessLog; + this.errorLogEnabled = config.errorLogEnabled; + this.errorLog = config.errorLog; this.compressionOptions = config.compressionOptions; this.compressPredicate = config.compressPredicate; this.cookieDecoder = config.cookieDecoder; this.cookieEncoder = config.cookieEncoder; this.decoder = config.decoder; this.enableGracefulShutdown = config.channelGroup() != null; - this.errorLogEnabled = config.errorLogEnabled; - this.errorLog = config.errorLog; this.formDecoderProvider = config.formDecoderProvider; this.forwardedHeaderHandler = config.forwardedHeaderHandler; this.http2SettingsSpec = config.http2Settings; @@ -1461,14 +1510,14 @@ else if ((protocols & h11) == h11) { channel.pipeline(), accessLogEnabled, accessLog, + errorLogEnabled, + errorLog, compressionOptions, compressPredicate(compressPredicate, minCompressionSize), cookieDecoder, cookieEncoder, false, decoder, - errorLogEnabled, - errorLog, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, @@ -1496,13 +1545,13 @@ else if ((protocols & h2) == h2) { channel.pipeline(), accessLogEnabled, accessLog, + errorLogEnabled, + errorLog, compressionOptions, compressPredicate(compressPredicate, minCompressionSize), cookieDecoder, cookieEncoder, enableGracefulShutdown, - errorLogEnabled, - errorLog, formDecoderProvider, forwardedHeaderHandler, http2SettingsSpec, @@ -1525,12 +1574,12 @@ else if ((protocols & h3) == h3) { channel.pipeline(), accessLogEnabled, accessLog, + errorLogEnabled, + errorLog, compressionOptions, compressPredicate(compressPredicate, minCompressionSize), cookieDecoder, cookieEncoder, - errorLogEnabled, - errorLog, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, @@ -1543,7 +1592,8 @@ else if ((protocols & h3) == h3) { readTimeout, requestTimeout, uriTagValue, - decoder.validateHeaders()); + decoder.validateHeaders(), + idleTimeout); } } else { @@ -1552,14 +1602,14 @@ else if ((protocols & h3) == h3) { channel.pipeline(), accessLogEnabled, accessLog, + errorLogEnabled, + errorLog, compressionOptions, compressPredicate(compressPredicate, minCompressionSize), cookieDecoder, cookieEncoder, decoder, enableGracefulShutdown, - errorLogEnabled, - errorLog, formDecoderProvider, forwardedHeaderHandler, http2SettingsSpec, @@ -1581,14 +1631,14 @@ else if ((protocols & h11) == h11) { channel.pipeline(), accessLogEnabled, accessLog, + errorLogEnabled, + errorLog, compressionOptions, compressPredicate(compressPredicate, minCompressionSize), cookieDecoder, cookieEncoder, false, decoder, - errorLogEnabled, - errorLog, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, @@ -1608,13 +1658,13 @@ else if ((protocols & h2c) == h2c) { channel.pipeline(), accessLogEnabled, accessLog, + errorLogEnabled, + errorLog, compressionOptions, compressPredicate(compressPredicate, minCompressionSize), cookieDecoder, cookieEncoder, enableGracefulShutdown, - errorLogEnabled, - errorLog, formDecoderProvider, forwardedHeaderHandler, http2SettingsSpec, diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java index 7079ac2ea2..82ed034ac8 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java @@ -15,17 +15,17 @@ */ package reactor.netty.http.server; -import java.net.SocketAddress; -import java.time.Duration; -import java.time.ZonedDateTime; -import java.util.Optional; -import java.util.Queue; -import java.util.function.BiFunction; -import java.util.function.BiPredicate; +import static io.netty.handler.codec.http.HttpUtil.isContentLengthSet; +import static io.netty.handler.codec.http.HttpUtil.isKeepAlive; +import static io.netty.handler.codec.http.HttpUtil.isTransferEncodingChunked; +import static io.netty.handler.codec.http.HttpUtil.setKeepAlive; +import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT; +import static reactor.netty.ReactorNetty.format; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.DecoderResult; import io.netty.handler.codec.DecoderResultProvider; @@ -43,27 +43,32 @@ import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.cookie.ServerCookieDecoder; import io.netty.handler.codec.http.cookie.ServerCookieEncoder; +import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.ssl.SslHandler; import io.netty.util.ReferenceCountUtil; +import java.net.SocketAddress; +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.Optional; +import java.util.Queue; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; import reactor.core.Exceptions; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.ReactorNetty; import reactor.netty.channel.ChannelOperations; +import reactor.netty.http.Http2ConnectionLiveness; +import reactor.netty.http.Http2SettingsSpec; +import reactor.netty.http.HttpConnectionImmediateClose; +import reactor.netty.http.IdleTimeoutHandler; import reactor.netty.http.logging.HttpMessageArgProviderFactory; import reactor.netty.http.logging.HttpMessageLogFactory; import reactor.netty.http.server.compression.HttpCompressionOptionsSpec; import reactor.util.annotation.Nullable; import reactor.util.concurrent.Queues; -import static io.netty.handler.codec.http.HttpUtil.isContentLengthSet; -import static io.netty.handler.codec.http.HttpUtil.isKeepAlive; -import static io.netty.handler.codec.http.HttpUtil.isTransferEncodingChunked; -import static io.netty.handler.codec.http.HttpUtil.setKeepAlive; -import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT; -import static reactor.netty.ReactorNetty.format; - /** * Replace {@link io.netty.handler.codec.http.HttpServerKeepAliveHandler} with extra * handler management. @@ -85,6 +90,7 @@ final class HttpTrafficHandler extends ChannelDuplexHandler implements Runnable final BiFunction forwardedHeaderHandler; final HttpMessageLogFactory httpMessageLogFactory; final Duration idleTimeout; + final Http2SettingsSpec http2SettingsSpec; final ConnectionObserver listener; final BiFunction, ? super Connection, ? extends Mono> mapHandle; @@ -121,6 +127,7 @@ final class HttpTrafficHandler extends ChannelDuplexHandler implements Runnable @Nullable BiFunction forwardedHeaderHandler, HttpMessageLogFactory httpMessageLogFactory, @Nullable Duration idleTimeout, + @Nullable Http2SettingsSpec http2SettingsSpec, ConnectionObserver listener, @Nullable BiFunction, ? super Connection, ? extends Mono> mapHandle, int maxKeepAliveRequests, @@ -136,6 +143,7 @@ final class HttpTrafficHandler extends ChannelDuplexHandler implements Runnable this.cookieDecoder = decoder; this.httpMessageLogFactory = httpMessageLogFactory; this.idleTimeout = idleTimeout; + this.http2SettingsSpec = http2SettingsSpec; this.mapHandle = mapHandle; this.maxKeepAliveRequests = maxKeepAliveRequests; this.readTimeout = readTimeout; @@ -155,8 +163,7 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception { @Override public void channelActive(ChannelHandlerContext ctx) { - IdleTimeoutHandler.addIdleTimeoutHandler(ctx.pipeline(), idleTimeout); - + setupIdleTimeoutHandler(ctx.pipeline()); ctx.fireChannelActive(); } @@ -546,7 +553,7 @@ void handleLastHttpContent(Object msg, ChannelPromise promise) { ctx.executor().execute(this); } else { - IdleTimeoutHandler.addIdleTimeoutHandler(ctx.pipeline(), idleTimeout); + setupIdleTimeoutHandler(ctx.pipeline()); ctx.read(); } } @@ -690,6 +697,29 @@ static boolean isMultipart(HttpResponse response) { MULTIPART_PREFIX.length()); } + private void setupIdleTimeoutHandler(ChannelPipeline pipeline) { + Http2FrameCodec httpCodec = pipeline.get(Http2FrameCodec.class); + if (httpCodec != null) { + IdleTimeoutHandler.addIdleTimeoutHandler( + pipeline, + idleTimeout, + new Http2ConnectionLiveness( + httpCodec, + http2SettingsSpec != null ? http2SettingsSpec.pingAckTimeout() : null, + http2SettingsSpec != null ? http2SettingsSpec.pingScheduleInterval() : null, + http2SettingsSpec != null ? http2SettingsSpec.pingAckDropThreshold() : null + ) + ); + return; + } + + IdleTimeoutHandler.addIdleTimeoutHandler( + pipeline, + idleTimeout, + new HttpConnectionImmediateClose() + ); + } + static final class HttpRequestHolder { final HttpRequest request; final ZonedDateTime timestamp; diff --git a/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json b/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json index 65c4c74cdd..b4a26c7a4e 100644 --- a/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json +++ b/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json @@ -260,9 +260,9 @@ }, { "condition": { - "typeReachable": "reactor.netty.http.server.IdleTimeoutHandler" + "typeReachable": "reactor.netty.http.IdleTimeoutHandler" }, - "name": "reactor.netty.http.server.IdleTimeoutHandler", + "name": "reactor.netty.http.IdleTimeoutHandler", "queryAllPublicMethods": true }, { diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java index 6082f15679..1488fafca3 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java @@ -38,11 +38,13 @@ import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2ConnectionAdapter; import io.netty.handler.codec.http2.Http2DataFrame; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2HeadersFrame; import io.netty.handler.codec.http2.Http2SettingsAckFrame; import io.netty.handler.codec.http2.Http2SettingsFrame; +import io.netty.handler.codec.http2.Http2Stream; import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.ssl.SslHandshakeCompletionEvent; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; @@ -52,7 +54,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.Mockito; import org.reactivestreams.Publisher; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -727,13 +728,22 @@ else if (serverProtocols.length == 2 && serverProtocols[1] == HttpProtocol.H2) { } Http2FrameCodec http2FrameCodec = conn.channel().parent().pipeline().get(Http2FrameCodec.class); - Http2Connection.Listener goAwayFrameListener = Mockito.mock(Http2Connection.Listener.class); - Mockito.doAnswer(invocation -> { - goAwayReceived.countDown(); - return null; - }) - .when(goAwayFrameListener) - .onGoAwayReceived(Mockito.anyInt(), Mockito.anyLong(), Mockito.any()); + Http2Connection.Listener goAwayFrameListener = new Http2ConnectionAdapter() { + @Override + public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) { + goAwayReceived.countDown(); + + super.onGoAwayReceived(lastStreamId, errorCode, debugData); + } + + @Override + public void onStreamClosed(Http2Stream stream) { + goAwayReceived.countDown(); + + super.onStreamClosed(stream); + } + }; + http2FrameCodec.connection().addListener(goAwayFrameListener); }) .port(disposableServer.port()) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpConnectionLivenessTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpConnectionLivenessTest.java new file mode 100644 index 0000000000..95ac1de551 --- /dev/null +++ b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpConnectionLivenessTest.java @@ -0,0 +1,818 @@ +/* + * Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.server; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http2.DefaultHttp2PingFrame; +import io.netty.handler.codec.http2.Http2FrameCodec; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2FrameWriter; +import io.netty.handler.codec.http2.Http2PingFrame; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import reactor.core.publisher.Mono; +import reactor.netty.BaseHttpTest; +import reactor.netty.NettyPipeline; +import reactor.netty.http.HttpConnectionLiveness; +import reactor.netty.resources.ConnectionProvider; +import reactor.util.Logger; +import reactor.util.Loggers; + +import javax.net.ssl.SSLException; +import java.security.cert.CertificateException; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static reactor.netty.http.HttpProtocol.H2; +import static reactor.netty.http.HttpProtocol.H2C; +import static reactor.netty.http.HttpProtocol.HTTP11; + +/** + * This test class verifies {@link HttpConnectionLiveness} with server side. + * + * @author raccoonback + * @since 1.2.5 + */ +class HttpConnectionLivenessTest extends BaseHttpTest { + + static final Logger log = Loggers.getLogger(HttpConnectionLivenessTest.class); + + static SslContext sslServer; + static SslContext sslClient; + + @BeforeAll + static void createSelfSignedCertificate() throws CertificateException, SSLException { + SelfSignedCertificate ssc = new SelfSignedCertificate(); + sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) + .build(); + sslClient = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + } + + @Nested + class Http2Test { + + @Test + void successReceiveResponse() { + disposableServer = createServer() + .protocol(H2) + .secure(spec -> spec.sslContext(sslServer)) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + String result = createClient(disposableServer::address) + .protocol(H2) + .secure(spec -> spec.sslContext(sslClient)) + .get() + .uri("/") + .responseSingle((resp, bytes) -> bytes.asString()) + .block(); + + assertThat(result).isEqualTo("Test"); + } + + @Test + void maintainConnectionWithoutPingCheckWhenNotConfigured() { + AtomicReference connectedServerChannel = new AtomicReference<>(); + disposableServer = createServer() + .protocol(H2) + .maxKeepAliveRequests(1) + .secure(spec -> spec.sslContext(sslServer)) + .doOnConnection(connection -> { + connectedServerChannel.set(connection.channel()); + }) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Http2PingFrameHandler handler = new Http2PingFrameHandler(); + createClient(disposableServer::address) + .protocol(H2) + .secure(spec -> spec.sslContext(sslClient)) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient() + .autoAckPingFrame(false) + .autoAckSettingsFrame(true) + .build(); + + channel.pipeline().replace(NettyPipeline.HttpCodec, NettyPipeline.HttpCodec, http2FrameCodec); + channel.pipeline().addLast(handler); + }) + .get() + .uri("/") + .response() + .block(); + + Mono.delay(Duration.ofSeconds(5)) + .block(); + + assertThat(connectedServerChannel.get().parent().isOpen()).isTrue(); + assertThat(handler.getReceivedPingTimes()).isEmpty(); + } + + @ParameterizedTest + @CsvSource({ + "100,300,3", "300,100,3", + "100,300,3", "300,100,3" + }) + void closeConnectionIfPingFrameDelayed(Integer pingAckTimeout, Integer pingScheduleInterval, Integer pingAckDropThreshold) { + AtomicReference connectedServerChannel = new AtomicReference<>(); + disposableServer = createServer() + .protocol(H2) + .maxKeepAliveRequests(1) + .secure(spec -> spec.sslContext(sslServer)) + .doOnConnection(connection -> { + connectedServerChannel.set(connection.channel()); + }) + .idleTimeout(Duration.ofMillis(300)) + .http2Settings(builder -> { + builder.pingAckTimeout(Duration.ofMillis(pingAckTimeout)) + .pingScheduleInterval(Duration.ofMillis(pingScheduleInterval)) + .pingAckDropThreshold(pingAckDropThreshold); + }) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Http2PingFrameHandler handler = new Http2PingFrameHandler( + (ctx, frame, receivedPingTimes) -> Mono.delay(Duration.ofMillis(600)) + .doOnNext( + unUsed -> ctx.writeAndFlush(new DefaultHttp2PingFrame(frame.content(), true)) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) + ) + .subscribe() + ); + createClient(disposableServer::address) + .protocol(H2) + .secure(spec -> spec.sslContext(sslClient)) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient() + .autoAckPingFrame(false) + .autoAckSettingsFrame(true) + .build(); + + channel.pipeline().replace(NettyPipeline.HttpCodec, NettyPipeline.HttpCodec, http2FrameCodec); + channel.pipeline().addLast(handler); + }) + .get() + .uri("/") + .response() + .block(); + + Mono.delay(Duration.ofSeconds(3)) + .block(); + + assertThat(connectedServerChannel.get().parent().isOpen()).isFalse(); + assertThat(handler.getReceivedPingTimes()).hasSizeGreaterThan(pingAckDropThreshold); + } + + @ParameterizedTest + @CsvSource({ + "100,300,3", "300,100,3", + "100,300,3", "300,100,3" + }) + void closeConnectionInPoolIfPingFrameDelayed(Integer pingAckTimeout, Integer pingScheduleInterval, Integer pingAckDropThreshold) { + AtomicReference connectedServerChannel = new AtomicReference<>(); + disposableServer = createServer() + .protocol(H2) + .maxKeepAliveRequests(1) + .secure(spec -> spec.sslContext(sslServer)) + .doOnConnection(connection -> { + connectedServerChannel.set(connection.channel()); + }) + .idleTimeout(Duration.ofMillis(300)) + .http2Settings(builder -> { + builder.pingAckTimeout(Duration.ofMillis(pingAckTimeout)) + .pingScheduleInterval(Duration.ofMillis(pingScheduleInterval)) + .pingAckDropThreshold(pingAckDropThreshold); + }) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Http2PingFrameHandler handler = new Http2PingFrameHandler( + (ctx, frame, receivedPingTimes) -> Mono.delay(Duration.ofMillis(600)) + .doOnNext( + unUsed -> ctx.writeAndFlush(new DefaultHttp2PingFrame(frame.content(), true)) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) + ) + .subscribe() + ); + + ConnectionProvider pool = ConnectionProvider.create("closeConnectionInPoolIfPingFrameDelayed", 1); + createClient(pool, disposableServer::address) + .protocol(H2) + .secure(spec -> spec.sslContext(sslClient)) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient() + .autoAckPingFrame(false) + .autoAckSettingsFrame(true) + .build(); + + channel.pipeline().replace(NettyPipeline.HttpCodec, NettyPipeline.HttpCodec, http2FrameCodec); + channel.pipeline().addLast(handler); + }) + .get() + .uri("/") + .response() + .block(); + + Mono.delay(Duration.ofSeconds(4)) + .block(); + + assertThat(connectedServerChannel.get().parent().isOpen()).isFalse(); + assertThat(handler.getReceivedPingTimes()).hasSizeGreaterThan(pingAckDropThreshold); + + pool.dispose(); + } + + @ParameterizedTest + @CsvSource({ + "300,600,0", "600,300,0", + "300,600,0", "600,300,0" + }) + void ackPingFrameWithinInterval(Integer pingAckTimeout, Integer pingScheduleInterval, Integer pingAckDropThreshold) { + AtomicReference connectedServerChannel = new AtomicReference<>(); + disposableServer = createServer() + .protocol(H2) + .maxKeepAliveRequests(1) + .secure(spec -> spec.sslContext(sslServer)) + .doOnConnection(connection -> { + connectedServerChannel.set(connection.channel()); + }) + .idleTimeout(Duration.ofMillis(300)) + .http2Settings(builder -> { + builder.pingAckTimeout(Duration.ofMillis(pingAckTimeout)) + .pingScheduleInterval(Duration.ofMillis(pingScheduleInterval)) + .pingAckDropThreshold(pingAckDropThreshold); + }) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Http2PingFrameHandler handler = new Http2PingFrameHandler(); + createClient(disposableServer::address) + .protocol(H2) + .secure(spec -> spec.sslContext(sslClient)) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient() + .autoAckPingFrame(false) + .autoAckSettingsFrame(true) + .build(); + + channel.pipeline().replace(NettyPipeline.HttpCodec, NettyPipeline.HttpCodec, http2FrameCodec); + channel.pipeline().addLast(handler); + }) + .get() + .uri("/") + .response() + .block(); + + Mono.delay(Duration.ofSeconds(5)) + .block(); + + assertThat(connectedServerChannel.get().parent().isOpen()).isTrue(); + assertThat(handler.getReceivedPingTimes()).hasSizeGreaterThanOrEqualTo(2); + } + + @ParameterizedTest + @CsvSource({ + "300,600,0", "600,300,0", + "300,600,0", "600,300,0" + }) + void connectionRetentionInPoolOnPingFrameAck(Integer pingAckTimeout, Integer pingScheduleInterval, Integer pingAckDropThreshold) { + AtomicReference connectedServerChannel = new AtomicReference<>(); + disposableServer = createServer() + .protocol(H2) + .maxKeepAliveRequests(1) + .secure(spec -> spec.sslContext(sslServer)) + .doOnConnection(connection -> { + connectedServerChannel.set(connection.channel()); + }) + .idleTimeout(Duration.ofMillis(300)) + .http2Settings(builder -> { + builder.pingAckTimeout(Duration.ofMillis(pingAckTimeout)) + .pingScheduleInterval(Duration.ofMillis(pingScheduleInterval)) + .pingAckDropThreshold(pingAckDropThreshold); + }) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Http2PingFrameHandler handler = new Http2PingFrameHandler(); + ConnectionProvider pool = ConnectionProvider.create("connectionRetentionInPoolOnPingFrameAck", 1); + createClient(pool, disposableServer::address) + .protocol(H2) + .secure(spec -> spec.sslContext(sslClient)) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient() + .autoAckPingFrame(false) + .autoAckSettingsFrame(true) + .build(); + + channel.pipeline().replace(NettyPipeline.HttpCodec, NettyPipeline.HttpCodec, http2FrameCodec); + channel.pipeline().addLast(handler); + }) + .get() + .uri("/") + .response() + .block(); + + Mono.delay(Duration.ofSeconds(5)) + .block(); + + assertThat(connectedServerChannel.get().parent().isOpen()).isTrue(); + assertThat(handler.getReceivedPingTimes()).hasSizeGreaterThanOrEqualTo(2); + + pool.dispose(); + } + + @ParameterizedTest + @CsvSource({ + "300,600,3", "600,300,3", + "300,600,3", "600,300,3" + }) + void ackPingFrameWithinThreshold(Integer pingAckTimeout, Integer pingScheduleInterval, Integer pingAckDropThreshold) { + AtomicReference connectedServerChannel = new AtomicReference<>(); + disposableServer = createServer() + .protocol(H2) + .maxKeepAliveRequests(1) + .secure(spec -> spec.sslContext(sslServer)) + .doOnConnection(connection -> { + connectedServerChannel.set(connection.channel()); + }) + .idleTimeout(Duration.ofMillis(300)) + .http2Settings(builder -> { + builder.pingAckTimeout(Duration.ofMillis(pingAckTimeout)) + .pingScheduleInterval(Duration.ofMillis(pingScheduleInterval)) + .pingAckDropThreshold(pingAckDropThreshold); + }) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Http2PingFrameHandler handler = new Http2PingFrameHandler( + (ctx, frame, receivedPingTimes) -> { + int delayTime = 0; + if (receivedPingTimes.size() % 3 != 0) { + delayTime = 600; + } + + Mono.delay(Duration.ofMillis(delayTime)) + .doOnNext( + unUsed -> ctx.writeAndFlush(new DefaultHttp2PingFrame(frame.content(), true)) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) + ) + .subscribe(); + } + ); + createClient(disposableServer::address) + .protocol(H2) + .secure(spec -> spec.sslContext(sslClient)) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient() + .autoAckPingFrame(false) + .autoAckSettingsFrame(true) + .build(); + + channel.pipeline().replace(NettyPipeline.HttpCodec, NettyPipeline.HttpCodec, http2FrameCodec); + channel.pipeline().addLast(handler); + }) + .get() + .uri("/") + .response() + .block(); + + Mono.delay(Duration.ofSeconds(5)) + .block(); + + assertThat(connectedServerChannel.get().parent().isOpen()).isTrue(); + assertThat(handler.getReceivedPingTimes()).hasSizeGreaterThanOrEqualTo(2); + } + } + + @Nested + class H2cTest { + + @Test + void successReceiveResponse() { + disposableServer = createServer() + .protocol(H2C) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + String result = createClient(disposableServer::address) + .protocol(H2C) + .get() + .uri("/") + .responseSingle((resp, bytes) -> bytes.asString()) + .block(); + + assertThat(result).isEqualTo("Test"); + } + + @Test + void maintainConnectionWithoutPingCheckWhenNotConfigured() { + AtomicReference connectedServerChannel = new AtomicReference<>(); + disposableServer = createServer() + .protocol(H2C) + .maxKeepAliveRequests(1) + .doOnConnection(connection -> { + connectedServerChannel.set(connection.channel()); + }) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Http2PingFrameHandler handler = new Http2PingFrameHandler(); + createClient(disposableServer::address) + .protocol(H2C) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient() + .autoAckPingFrame(false) + .autoAckSettingsFrame(true) + .build(); + + channel.pipeline().replace(NettyPipeline.HttpCodec, NettyPipeline.HttpCodec, http2FrameCodec); + channel.pipeline().addLast(handler); + }) + .get() + .uri("/") + .response() + .block(); + + Mono.delay(Duration.ofSeconds(5)) + .block(); + + assertThat(connectedServerChannel.get().parent().isOpen()).isTrue(); + assertThat(handler.getReceivedPingTimes()).isEmpty(); + } + + @ParameterizedTest + @CsvSource({ + "100,300,3", "300,100,3", + "100,300,3", "300,100,3" + }) + void closeConnectionIfPingFrameDelayed(Integer pingAckTimeout, Integer pingScheduleInterval, Integer pingAckDropThreshold) { + AtomicReference connectedServerChannel = new AtomicReference<>(); + disposableServer = createServer() + .protocol(H2C) + .maxKeepAliveRequests(1) + .doOnConnection(connection -> { + connectedServerChannel.set(connection.channel()); + }) + .idleTimeout(Duration.ofMillis(300)) + .http2Settings(builder -> { + builder.pingAckTimeout(Duration.ofMillis(pingAckTimeout)) + .pingScheduleInterval(Duration.ofMillis(pingScheduleInterval)) + .pingAckDropThreshold(pingAckDropThreshold); + }) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Http2PingFrameHandler handler = new Http2PingFrameHandler( + (ctx, frame, receivedPingTimes) -> Mono.delay(Duration.ofMillis(600)) + .doOnNext( + unUsed -> ctx.writeAndFlush(new DefaultHttp2PingFrame(frame.content(), true)) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) + ) + .subscribe() + ); + createClient(disposableServer::address) + .protocol(H2C) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient() + .autoAckPingFrame(false) + .autoAckSettingsFrame(true) + .build(); + + channel.pipeline().replace(NettyPipeline.HttpCodec, NettyPipeline.HttpCodec, http2FrameCodec); + channel.pipeline().addLast(handler); + }) + .get() + .uri("/") + .response() + .block(); + + Mono.delay(Duration.ofSeconds(3)) + .block(); + + assertThat(connectedServerChannel.get().parent().isOpen()).isFalse(); + assertThat(handler.getReceivedPingTimes()).hasSizeGreaterThan(pingAckDropThreshold); + } + + @ParameterizedTest + @CsvSource({ + "100,300,3", "300,100,3", + "100,300,3", "300,100,3" + }) + void closeConnectionInPoolIfPingFrameDelayed(Integer pingAckTimeout, Integer pingScheduleInterval, Integer pingAckDropThreshold) { + AtomicReference connectedServerChannel = new AtomicReference<>(); + disposableServer = createServer() + .protocol(H2C) + .maxKeepAliveRequests(1) + .doOnConnection(connection -> { + connectedServerChannel.set(connection.channel()); + }) + .idleTimeout(Duration.ofMillis(300)) + .http2Settings(builder -> { + builder.pingAckTimeout(Duration.ofMillis(pingAckTimeout)) + .pingScheduleInterval(Duration.ofMillis(pingScheduleInterval)) + .pingAckDropThreshold(pingAckDropThreshold); + }) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Http2PingFrameHandler handler = new Http2PingFrameHandler( + (ctx, frame, receivedPingTimes) -> Mono.delay(Duration.ofMillis(600)) + .doOnNext( + unUsed -> ctx.writeAndFlush(new DefaultHttp2PingFrame(frame.content(), true)) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) + ) + .subscribe() + ); + ConnectionProvider pool = ConnectionProvider.create("closeConnectionInPoolIfPingFrameDelayed", 1); + createClient(pool, disposableServer::address) + .protocol(H2C) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient() + .autoAckPingFrame(false) + .autoAckSettingsFrame(true) + .build(); + + channel.pipeline().replace(NettyPipeline.HttpCodec, NettyPipeline.HttpCodec, http2FrameCodec); + channel.pipeline().addLast(handler); + }) + .get() + .response() + .block(); + + Mono.delay(Duration.ofSeconds(4)) + .block(); + + assertThat(connectedServerChannel.get().parent().isOpen()).isFalse(); + assertThat(handler.getReceivedPingTimes()).hasSizeGreaterThan(pingAckDropThreshold); + + pool.dispose(); + } + + @ParameterizedTest + @CsvSource({ + "300,600,0", "600,300,0", + "300,600,0", "600,300,0" + }) + void ackPingFrameWithinInterval(Integer pingAckTimeout, Integer pingScheduleInterval, Integer pingAckDropThreshold) { + AtomicReference connectedServerChannel = new AtomicReference<>(); + disposableServer = createServer() + .protocol(H2C) + .maxKeepAliveRequests(1) + .doOnConnection(connection -> { + connectedServerChannel.set(connection.channel()); + }) + .idleTimeout(Duration.ofMillis(300)) + .http2Settings(builder -> { + builder.pingAckTimeout(Duration.ofMillis(pingAckTimeout)) + .pingScheduleInterval(Duration.ofMillis(pingScheduleInterval)) + .pingAckDropThreshold(pingAckDropThreshold); + }) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Http2PingFrameHandler handler = new Http2PingFrameHandler(); + createClient(disposableServer::address) + .protocol(H2C) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient() + .autoAckPingFrame(false) + .autoAckSettingsFrame(true) + .build(); + + channel.pipeline().replace(NettyPipeline.HttpCodec, NettyPipeline.HttpCodec, http2FrameCodec); + channel.pipeline().addLast(handler); + }) + .get() + .uri("/") + .response() + .block(); + + Mono.delay(Duration.ofSeconds(5)) + .block(); + + assertThat(connectedServerChannel.get().parent().isOpen()).isTrue(); + assertThat(handler.getReceivedPingTimes()).hasSizeGreaterThanOrEqualTo(2); + } + + @ParameterizedTest + @CsvSource({ + "300,600,0", "600,300,0", + "300,600,0", "600,300,0" + }) + void connectionRetentionInPoolOnPingFrameAck(Integer pingAckTimeout, Integer pingScheduleInterval, Integer pingAckDropThreshold) { + AtomicReference connectedServerChannel = new AtomicReference<>(); + disposableServer = createServer() + .protocol(H2C) + .maxKeepAliveRequests(1) + .doOnConnection(connection -> { + connectedServerChannel.set(connection.channel()); + }) + .idleTimeout(Duration.ofMillis(300)) + .http2Settings(builder -> { + builder.pingAckTimeout(Duration.ofMillis(pingAckTimeout)) + .pingScheduleInterval(Duration.ofMillis(pingScheduleInterval)) + .pingAckDropThreshold(pingAckDropThreshold); + }) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Http2PingFrameHandler handler = new Http2PingFrameHandler(); + ConnectionProvider pool = ConnectionProvider.create("connectionRetentionInPoolOnPingFrameAck", 1); + createClient(pool, disposableServer::address) + .protocol(H2C) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient() + .autoAckPingFrame(false) + .autoAckSettingsFrame(true) + .build(); + + channel.pipeline().replace(NettyPipeline.HttpCodec, NettyPipeline.HttpCodec, http2FrameCodec); + channel.pipeline().addLast(handler); + }) + .get() + .uri("/") + .response() + .block(); + + Mono.delay(Duration.ofSeconds(5)) + .block(); + + assertThat(connectedServerChannel.get().parent().isOpen()).isTrue(); + assertThat(handler.getReceivedPingTimes()).hasSizeGreaterThanOrEqualTo(2); + + pool.dispose(); + } + + @ParameterizedTest + @CsvSource({ + "300,600,3", "600,300,3", + "300,600,3", "600,300,3" + }) + void ackPingFrameWithinThreshold(Integer pingAckTimeout, Integer pingScheduleInterval, Integer pingAckDropThreshold) { + AtomicReference connectedServerChannel = new AtomicReference<>(); + disposableServer = createServer() + .protocol(H2C) + .maxKeepAliveRequests(1) + .doOnConnection(connection -> { + connectedServerChannel.set(connection.channel()); + }) + .idleTimeout(Duration.ofMillis(300)) + .http2Settings(builder -> { + builder.pingAckTimeout(Duration.ofMillis(pingAckTimeout)) + .pingScheduleInterval(Duration.ofMillis(pingScheduleInterval)) + .pingAckDropThreshold(pingAckDropThreshold); + }) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Http2PingFrameHandler handler = new Http2PingFrameHandler( + (ctx, frame, receivedPingTimes) -> { + int delayTime = 0; + if (receivedPingTimes.size() % 3 != 0) { + delayTime = 600; + } + + Mono.delay(Duration.ofMillis(delayTime)) + .doOnNext( + unUsed -> ctx.writeAndFlush(new DefaultHttp2PingFrame(frame.content(), true)) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) + ) + .subscribe(); + } + ); + createClient(disposableServer::address) + .protocol(H2C) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient() + .autoAckPingFrame(false) + .autoAckSettingsFrame(true) + .build(); + + channel.pipeline().replace(NettyPipeline.HttpCodec, NettyPipeline.HttpCodec, http2FrameCodec); + channel.pipeline().addLast(handler); + }) + .get() + .uri("/") + .response() + .block(); + + Mono.delay(Duration.ofSeconds(5)) + .block(); + + assertThat(connectedServerChannel.get().parent().isOpen()).isTrue(); + assertThat(handler.getReceivedPingTimes()).hasSizeGreaterThanOrEqualTo(2); + } + } + + @Nested + class Http11Test { + + @Test + void closeWithoutDelay() { + AtomicReference connectedServerChannel = new AtomicReference<>(); + disposableServer = createServer() + .protocol(HTTP11) + .maxKeepAliveRequests(1) + .doOnConnection(connection -> { + connectedServerChannel.set(connection.channel()); + }) + .secure(spec -> spec.sslContext(sslServer)) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + ConnectionProvider provider = ConnectionProvider.builder("closeWithoutDelay") + .maxIdleTime(Duration.ofMillis(100)) + .build(); + createClient(provider, disposableServer::address) + .protocol(HTTP11) + .secure(spec -> spec.sslContext(sslClient)) + .get() + .uri("/") + .response() + .block(); + + Mono.delay(Duration.ofSeconds(1)) + .block(); + + assertThat(connectedServerChannel.get().isOpen()).isFalse(); + } + } + + private static final class Http2PingFrameHandler extends SimpleChannelInboundHandler { + + private final List receivedPingTimes = new ArrayList<>(); + + private final TriConsumer> consumer; + + private Http2PingFrameHandler() { + this.consumer = (ctx, frame, receivedPings) -> { + Http2FrameCodec channelHandler = ctx.pipeline().get(Http2FrameCodec.class); + Http2FrameWriter http2FrameWriter = channelHandler.encoder() + .frameWriter(); + + http2FrameWriter.writePing(ctx, true, frame.content(), ctx.newPromise()) + .addListener((ChannelFuture future) -> { + if (future.isSuccess()) { + log.debug("[Http2PingFrameHandler] Wrote PING frame to {} channel.", future.channel()); + } + else { + log.debug("[Http2PingFrameHandler] Failed to wrote PING frame to {} channel.", future.channel()); + } + }); + }; + } + + private Http2PingFrameHandler(TriConsumer> consumer) { + this.consumer = consumer; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Http2PingFrame frame) throws InterruptedException { + receivedPingTimes.add(LocalDateTime.now(ZoneId.systemDefault())); + consumer.accept(ctx, frame, receivedPingTimes); + } + + public List getReceivedPingTimes() { + return receivedPingTimes.stream() + .sorted() + .collect(Collectors.toList()); + } + } + + @FunctionalInterface + public interface TriConsumer { + void accept(T t, U u, V v); + } +} diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java new file mode 100644 index 0000000000..afa0cfe723 --- /dev/null +++ b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpIdleTimeoutTest.java @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.server; + +import io.netty.channel.Channel; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.netty.BaseHttpTest; + +import javax.net.ssl.SSLException; +import java.security.cert.CertificateException; +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; +import static reactor.netty.http.HttpProtocol.H2; +import static reactor.netty.http.HttpProtocol.H2C; +import static reactor.netty.http.HttpProtocol.HTTP11; + +class HttpIdleTimeoutTest extends BaseHttpTest { + + static SslContext sslServer; + static SslContext sslClient; + + @BeforeAll + static void createSelfSignedCertificate() throws CertificateException, SSLException { + SelfSignedCertificate ssc = new SelfSignedCertificate(); + sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) + .build(); + sslClient = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + } + + @Test + void idleTimeoutInHttp11() { + disposableServer = createServer() + .protocol(HTTP11) + .idleTimeout(Duration.ofSeconds(3)) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Channel channel = createClient(disposableServer::address) + .protocol(HTTP11) + .get() + .uri("/") + .responseConnection((conn, receiver) -> Mono.just(receiver.channel())) + .single() + .block(); + + Mono.delay(Duration.ofSeconds(2)) + .block(); + + assertThat(channel.isOpen()).isTrue(); + } + + @Test + void idleTimeoutInH2C() { + disposableServer = createServer() + .protocol(HTTP11, H2C) + .idleTimeout(Duration.ofSeconds(3)) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Channel channel = createClient(disposableServer::address) + .protocol(HTTP11, H2C) + .get() + .uri("/") + .responseConnection((conn, receiver) -> Mono.just(receiver.channel())) + .single() + .block(); + + Mono.delay(Duration.ofSeconds(2)) + .block(); + + assertThat(channel.parent().isOpen()).isTrue(); + } + + @Test + void idleTimeoutInHttp2() { + disposableServer = createServer() + .protocol(H2) + .idleTimeout(Duration.ofSeconds(3)) + .secure(spec -> spec.sslContext(sslServer)) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Channel channel = createClient(disposableServer::address) + .protocol(H2) + .secure(spec -> spec.sslContext(sslClient)) + .get() + .uri("/") + .responseConnection((conn, receiver) -> Mono.just(receiver.channel())) + .single() + .block(); + + Mono.delay(Duration.ofSeconds(2)) + .block(); + + assertThat(channel.parent().isOpen()).isTrue(); + } + + @Test + void closeAfterIdleTimeoutInHttp11() { + disposableServer = createServer() + .protocol(HTTP11) + .idleTimeout(Duration.ofSeconds(2)) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Channel channel = createClient(disposableServer::address) + .protocol(HTTP11) + .get() + .uri("/") + .responseConnection((conn, receiver) -> Mono.just(receiver.channel())) + .single() + .block(); + + Mono.delay(Duration.ofSeconds(3)) + .block(); + + assertThat(channel.isOpen()).isFalse(); + } + + @Test + void closeAfterIdleTimeoutInH2C() { + disposableServer = createServer() + .protocol(HTTP11, H2C) + .idleTimeout(Duration.ofSeconds(2)) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Channel channel = createClient(disposableServer::address) + .protocol(HTTP11, H2C) + .get() + .uri("/") + .responseConnection((conn, receiver) -> Mono.just(receiver.channel())) + .single() + .block(); + + Mono.delay(Duration.ofSeconds(3)) + .block(); + + assertThat(channel.parent().isOpen()).isFalse(); + } + + @Test + void closeAfterIdleTimeoutInHttp2() { + disposableServer = createServer() + .protocol(H2) + .idleTimeout(Duration.ofSeconds(2)) + .secure(spec -> spec.sslContext(sslServer)) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + Channel channel = createClient(disposableServer::address) + .protocol(H2) + .secure(spec -> spec.sslContext(sslClient)) + .get() + .uri("/") + .responseConnection((conn, receiver) -> Mono.just(receiver.channel())) + .single() + .block(); + + Mono.delay(Duration.ofSeconds(3)) + .block(); + + assertThat(channel.parent().isOpen()).isFalse(); + } +} diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java index b63dff62a7..74ebb039f6 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2025 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -388,6 +388,8 @@ private void testIssue3060(ConnectionProvider provider) throws Exception { /* https://github.com/reactor/reactor-netty/issues/3519 */ @Test public void testConnectionProviderDisableAllBuiltInMetrics() throws Exception { + REGISTRY.clear(); + disposableServer = createServer() .handle((req, res) -> res.sendString(Mono.just("testConnectionProviderDisableAllBuiltInMetrics")))