Skip to content

Commit be42a6b

Browse files
committed
Server Netty. Ensure connection reset detection works with HTTP2.
1 parent 12d380f commit be42a6b

File tree

9 files changed

+136
-87
lines changed

9 files changed

+136
-87
lines changed

ktor-server/ktor-server-cio/jvm/test/io/ktor/tests/server/cio/CIOEngineTestJvm.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,5 +90,6 @@ class CIOHttpRequestLifecycleTest :
9090
HttpRequestLifecycleTest<CIOApplicationEngine, CIOApplicationEngine.Configuration>(CIO) {
9191
init {
9292
enableSsl = false
93+
enableHttp2 = false
9394
}
9495
}

ktor-server/ktor-server-core/common/src/io/ktor/server/http/HttpRequestLifecycle.kt

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ public class HttpRequestLifecycleConfig internal constructor() {
1919
* When `true`, cancels the call coroutine context if the other peer resets the client connection.
2020
* When `false` (default), request processing continues even if the connection is closed.
2121
*
22+
* **When to use this property: **
23+
* - Set to `true` for long-running or resource-intensive requests where you want to stop processing
24+
* immediately when the client disconnects (e.g., streaming, batch processing, heavy computations)
25+
* - Keep as `false` (default) for short requests, or when you need to complete processing regardless
26+
* of client connection status (e.g., important side effects, database transactions)
27+
*
2228
* Example:
2329
* ```kotlin
2430
* install(HttpRequestLifecycle) {
@@ -43,7 +49,8 @@ public val HttpRequestCloseHandlerKey: AttributeKey<() -> Unit> = AttributeKey<(
4349
* the plugin will automatically cancel the request handling coroutine if the client disconnects,
4450
* preventing unnecessary processing and freeing up resources.
4551
*
46-
* Remember, coroutine cancellation doesn't stop blocking operations, so check [call.coroutineContext.isActive] if needed.
52+
* Remember, when the coroutine context is canceled, the next suspension point will throw [CancellationException], but until
53+
* that moment it doesn't stop any blocking operations, so call `call.coroutineContext.ensureActive` if needed.
4754
* Plugin only works for CIO and Netty engines. Other implementations fail on closed connection only when trying to write some response.
4855
*
4956
* This is particularly useful for:
@@ -63,17 +70,14 @@ public val HttpRequestCloseHandlerKey: AttributeKey<() -> Unit> = AttributeKey<(
6370
* try {
6471
* // Long-running operation
6572
* repeat(100) {
66-
* delay(100)
67-
* // Process data...
73+
* // throws an exception if the client disconnects during processing
74+
* call.coroutineContext.ensureActive()
75+
* // Process more data...
76+
* logger.info("Very important work.")
6877
* }
6978
* call.respond("Completed")
7079
* } catch (e: CancellationException) {
71-
* if (e.rootCause is ConnectionClosedException) {
72-
* // Client disconnected, clean up resources
73-
* logger.info("Request cancelled due to client disconnect")
74-
* } else {
75-
* throw e
76-
* }
80+
* // Handle client disconnected, clean up resources
7781
* }
7882
* }
7983
* }

ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationCallHandler.kt

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ internal class NettyApplicationCallHandler(
2626
private val enginePipeline: EnginePipeline
2727
) : ChannelInboundHandlerAdapter(), CoroutineScope {
2828
private var currentJob: Job? = null
29+
private var currentCall: PipelineCall? = null
2930

3031
override val coroutineContext: CoroutineContext = userCoroutineContext
3132

@@ -36,22 +37,21 @@ internal class NettyApplicationCallHandler(
3637
}
3738
}
3839

39-
private var onConnectionClose: (() -> Unit)? = null
40-
41-
override fun handlerRemoved(ctx: ChannelHandlerContext?) {
42-
if (ctx?.channel()?.isActive == false) {
43-
onConnectionClose?.invoke()
44-
onConnectionClose = null
40+
internal fun onConnectionClose(context: ChannelHandlerContext) {
41+
if (context.channel().isActive) {
42+
return
43+
}
44+
currentCall?.let {
45+
currentCall = null
46+
@OptIn(InternalAPI::class)
47+
it.attributes.getOrNull(HttpRequestCloseHandlerKey)?.invoke()
4548
}
4649
}
4750

48-
@OptIn(InternalAPI::class)
4951
private fun handleRequest(context: ChannelHandlerContext, call: PipelineCall) {
5052
val callContext = CallHandlerCoroutineName + NettyDispatcher.CurrentContext(context)
51-
onConnectionClose = {
52-
call.attributes.getOrNull(HttpRequestCloseHandlerKey)?.invoke()
53-
}
5453

54+
currentCall = call
5555
currentJob = launch(callContext, start = CoroutineStart.UNDISPATCHED) {
5656
when {
5757
call is NettyHttp1ApplicationCall && !call.request.isValid() -> {
@@ -82,8 +82,7 @@ internal class NettyApplicationCallHandler(
8282
}
8383

8484
override fun channelInactive(ctx: ChannelHandlerContext) {
85-
onConnectionClose?.invoke()
86-
onConnectionClose = null
85+
onConnectionClose(ctx)
8786
ctx.fireChannelInactive()
8887
}
8988

ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1Handler.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ internal class NettyHttp1Handler(
8888
}
8989

9090
override fun channelInactive(context: ChannelHandlerContext) {
91-
context.pipeline().remove(NettyApplicationCallHandler::class.java)
91+
val handler = context.pipeline().remove(NettyApplicationCallHandler::class.java)
92+
handler.onConnectionClose(context)
9293
context.fireChannelInactive()
9394
}
9495

ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http2/NettyHttp2Handler.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ internal class NettyHttp2Handler(
160160
try {
161161
Http2FrameCodec::class.javaObjectType.getDeclaredField("streamKey")
162162
.also { it.isAccessible = true }
163-
} catch (cause: Throwable) {
163+
} catch (_: Throwable) {
164164
null
165165
}
166166
}
@@ -174,7 +174,7 @@ internal class NettyHttp2Handler(
174174

175175
try {
176176
function.invoke(this, streamKey, childStream)
177-
} catch (cause: Throwable) {
177+
} catch (_: Throwable) {
178178
return false
179179
}
180180

@@ -187,7 +187,7 @@ internal class NettyHttp2Handler(
187187
private tailrec fun Class<*>.findIdField(): Field {
188188
val idField = try {
189189
getDeclaredField("id")
190-
} catch (t: NoSuchFieldException) {
190+
} catch (_: NoSuchFieldException) {
191191
null
192192
}
193193
if (idField != null) {

ktor-server/ktor-server-netty/jvm/test/io/ktor/tests/server/netty/NettyEngineTest.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,4 +388,9 @@ class NettyH2cEnabledTest :
388388
}
389389

390390
class NettyHttpRequestLifecycleTest :
391-
HttpRequestLifecycleTest<NettyApplicationEngine, NettyApplicationEngine.Configuration>(Netty)
391+
HttpRequestLifecycleTest<NettyApplicationEngine, NettyApplicationEngine.Configuration>(Netty) {
392+
init {
393+
enableSsl = true
394+
enableHttp2 = true
395+
}
396+
}

ktor-server/ktor-server-test-base/jvm/src/io/ktor/server/test/base/EngineTestBaseJvm.kt

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package io.ktor.server.test.base
77
import io.ktor.client.*
88
import io.ktor.client.engine.apache.*
99
import io.ktor.client.engine.cio.*
10-
import io.ktor.client.plugins.*
1110
import io.ktor.client.request.*
1211
import io.ktor.client.statement.*
1312
import io.ktor.http.*
@@ -249,10 +248,10 @@ actual abstract class EngineTestBase<
249248
builder: suspend HttpRequestBuilder.() -> Unit,
250249
block: suspend HttpResponse.(Int) -> Unit
251250
) {
252-
withUrl("http://127.0.0.1:$port$path", port, builder, block)
251+
withHttp1("http://127.0.0.1:$port$path", port, builder, block)
253252

254253
if (enableSsl) {
255-
withUrl("https://127.0.0.1:$sslPort$path", sslPort, builder, block)
254+
withHttp1("https://127.0.0.1:$sslPort$path", sslPort, builder, block)
256255
}
257256

258257
if (enableHttp2 && enableSsl) {
@@ -270,7 +269,7 @@ actual abstract class EngineTestBase<
270269
}
271270
}
272271

273-
private suspend fun withUrl(
272+
protected suspend fun withHttp1(
274273
urlString: String,
275274
port: Int,
276275
builder: suspend HttpRequestBuilder.() -> Unit,
@@ -284,22 +283,13 @@ actual abstract class EngineTestBase<
284283
}
285284
}
286285

287-
private suspend fun withHttp2(
286+
protected suspend fun withHttp2(
288287
url: String,
289288
port: Int,
290289
builder: suspend HttpRequestBuilder.() -> Unit,
291290
block: suspend HttpResponse.(Int) -> Unit
292291
) {
293-
HttpClient(Apache) {
294-
followRedirects = false
295-
expectSuccess = false
296-
engine {
297-
pipelining = true
298-
sslContext = SSLContext.getInstance("SSL").apply {
299-
init(null, trustAllCertificates, SecureRandom())
300-
}
301-
}
302-
}.use { client ->
292+
createApacheClient().use { client ->
303293
client.prepareRequest(url) {
304294
builder()
305295
}.execute { response ->
@@ -310,33 +300,48 @@ actual abstract class EngineTestBase<
310300

311301
companion object {
312302
val keyStoreFile: File = File("build/temp.jks")
313-
lateinit var keyStore: KeyStore
314-
lateinit var sslContext: SSLContext
315-
lateinit var trustManager: X509TrustManager
303+
val keyStore: KeyStore by lazy { generateCertificate(keyStoreFile) }
316304
lateinit var client: HttpClient
317305

318-
@BeforeAll
319-
@JvmStatic
320-
fun setupAll() {
321-
keyStore = generateCertificate(keyStoreFile, algorithm = "SHA256withECDSA", keySizeInBits = 256)
306+
fun createTrustManager(): X509TrustManager {
307+
val sslContext = SSLContext.getInstance("TLS")
322308
val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
323309
tmf.init(keyStore)
324-
sslContext = SSLContext.getInstance("TLS")
325310
sslContext.init(null, tmf.trustManagers, null)
326-
trustManager = tmf.trustManagers.first { it is X509TrustManager } as X509TrustManager
311+
return tmf.trustManagers.first { it is X509TrustManager } as X509TrustManager
312+
}
327313

328-
client = HttpClient(CIO) {
314+
fun createCIOClient(): HttpClient {
315+
return HttpClient(CIO) {
329316
engine {
330-
https.trustManager = trustManager
317+
https.trustManager = createTrustManager()
331318
https.serverName = "localhost"
332319
requestTimeout = 0
333320
}
321+
followRedirects = false
322+
expectSuccess = false
323+
}
324+
}
334325

326+
fun createApacheClient(): HttpClient {
327+
return HttpClient(Apache) {
335328
followRedirects = false
336329
expectSuccess = false
330+
engine {
331+
pipelining = true
332+
sslContext = SSLContext.getInstance("SSL").apply {
333+
init(null, trustAllCertificates, SecureRandom())
334+
}
335+
}
337336
}
338337
}
339338

339+
@BeforeAll
340+
@JvmStatic
341+
fun setupAll() {
342+
client = createCIOClient()
343+
}
344+
340345
@AfterAll
341346
@JvmStatic
342347
fun cleanup() {
@@ -354,13 +359,14 @@ actual abstract class EngineTestBase<
354359
}
355360
} while (true)
356361
}
357-
}
358362

359-
private val trustAllCertificates = arrayOf<X509TrustManager>(
360-
object : X509TrustManager {
361-
override fun getAcceptedIssuers(): Array<X509Certificate> = emptyArray()
362-
override fun checkClientTrusted(certs: Array<X509Certificate>, authType: String) {}
363-
override fun checkServerTrusted(certs: Array<X509Certificate>, authType: String) {}
364-
}
365-
)
363+
private val trustAllCertificates = arrayOf<X509TrustManager>(
364+
@Suppress("CustomX509TrustManager")
365+
object : X509TrustManager {
366+
override fun getAcceptedIssuers(): Array<X509Certificate> = emptyArray()
367+
override fun checkClientTrusted(certs: Array<X509Certificate>, authType: String) {}
368+
override fun checkServerTrusted(certs: Array<X509Certificate>, authType: String) {}
369+
}
370+
)
371+
}
366372
}

ktor-server/ktor-server-test-suites/jvm/src/io/ktor/server/testing/suites/ConfigTestSuite.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import java.util.concurrent.*
1010
import kotlin.system.*
1111
import kotlin.test.*
1212

13-
var count = 0
14-
1513
abstract class ConfigTestSuite(val engine: ApplicationEngineFactory<*, *>) {
1614

1715
@Test
@@ -58,4 +56,8 @@ abstract class ConfigTestSuite(val engine: ApplicationEngineFactory<*, *>) {
5856

5957
assertTrue(time < 100, "Stop time is $time")
6058
}
59+
60+
private companion object {
61+
var count = 0
62+
}
6163
}

0 commit comments

Comments
 (0)