Skip to content

Commit a1f9a1b

Browse files
authored
grpc: Add Timeout Support (#525)
1 parent 76b381d commit a1f9a1b

File tree

12 files changed

+235
-68
lines changed

12 files changed

+235
-68
lines changed

grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/ClientInterceptor.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ import kotlinx.rpc.grpc.descriptor.MethodDescriptor
3030
* // Example: add a header before proceeding
3131
* requestHeaders[MyKeys.Authorization] = token
3232
*
33+
* // Example: modify call options
34+
* callOptions.timeout = 5.seconds
35+
*
3336
* // Example: observe response metadata
3437
* onHeaders { headers -> /* inspect headers */ }
3538
* onClose { status, trailers -> /* log status/trailers */ }

grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import kotlinx.coroutines.flow.Flow
88
import kotlinx.rpc.RpcCall
99
import kotlinx.rpc.RpcClient
1010
import kotlinx.rpc.grpc.GrpcMetadata
11-
import kotlinx.rpc.grpc.client.internal.GrpcDefaultCallOptions
11+
import kotlinx.rpc.grpc.client.internal.GrpcCallOptions
1212
import kotlinx.rpc.grpc.client.internal.ManagedChannel
1313
import kotlinx.rpc.grpc.client.internal.ManagedChannelBuilder
1414
import kotlinx.rpc.grpc.client.internal.bidirectionalStreamingRpc
@@ -58,45 +58,45 @@ public class GrpcClient internal constructor(
5858
}
5959

6060
override suspend fun <T> call(call: RpcCall): T = withGrpcCall(call) { methodDescriptor, request ->
61-
val callOptions = GrpcDefaultCallOptions
61+
val callOptions = GrpcCallOptions()
6262
val trailers = GrpcMetadata()
6363

6464
return when (methodDescriptor.methodType) {
6565
MethodType.UNARY -> unaryRpc(
6666
descriptor = methodDescriptor,
6767
request = request,
6868
callOptions = callOptions,
69-
trailers = trailers,
69+
headers = trailers,
7070
)
7171

7272
MethodType.CLIENT_STREAMING -> @Suppress("UNCHECKED_CAST") clientStreamingRpc(
7373
descriptor = methodDescriptor,
7474
requests = request as Flow<RequestClient>,
7575
callOptions = callOptions,
76-
trailers = trailers,
76+
headers = trailers,
7777
)
7878

7979
else -> error("Wrong method type ${methodDescriptor.methodType}")
8080
}
8181
}
8282

8383
override fun <T> callServerStreaming(call: RpcCall): Flow<T> = withGrpcCall(call) { methodDescriptor, request ->
84-
val callOptions = GrpcDefaultCallOptions
85-
val trailers = GrpcMetadata()
84+
val callOptions = GrpcCallOptions()
85+
val headers = GrpcMetadata()
8686

8787
when (methodDescriptor.methodType) {
8888
MethodType.SERVER_STREAMING -> serverStreamingRpc(
8989
descriptor = methodDescriptor,
9090
request = request,
9191
callOptions = callOptions,
92-
trailers = trailers,
92+
headers = headers,
9393
)
9494

9595
MethodType.BIDI_STREAMING -> @Suppress("UNCHECKED_CAST") bidirectionalStreamingRpc(
9696
descriptor = methodDescriptor,
9797
requests = request as Flow<RequestClient>,
9898
callOptions = callOptions,
99-
trailers = trailers,
99+
headers = headers,
100100
)
101101

102102
else -> error("Wrong method type ${methodDescriptor.methodType}")

grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.kt

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,25 @@
44

55
package kotlinx.rpc.grpc.client.internal
66

7-
import kotlinx.rpc.internal.utils.InternalRpcApi
7+
import kotlin.time.Duration
88

9-
@InternalRpcApi
10-
public expect class GrpcCallOptions
11-
12-
@InternalRpcApi
13-
public expect val GrpcDefaultCallOptions: GrpcCallOptions
9+
/**
10+
* The collection of runtime options for a new gRPC call.
11+
*
12+
* This class allows configuring per-call behavior such as timeouts.
13+
*/
14+
public class GrpcCallOptions {
15+
/**
16+
* The maximum duration to wait for the RPC to complete.
17+
*
18+
* If set, the RPC will be canceled (with `DEADLINE_EXCEEDED`)
19+
* if it does not complete within the specified duration.
20+
* The timeout is measured from the moment the call is initiated.
21+
* If `null`, no timeout is applied, and the call may run indefinitely.
22+
*
23+
* The default value is `null`.
24+
*
25+
* @see Duration
26+
*/
27+
public var timeout: Duration? = null
28+
}

grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.kt

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ import kotlinx.rpc.grpc.descriptor.MethodDescriptor
88
import kotlinx.rpc.internal.utils.InternalRpcApi
99

1010
@InternalRpcApi
11-
public expect abstract class GrpcChannel {
12-
public abstract fun <RequestT, ResponseT> newCall(
13-
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
14-
callOptions: GrpcCallOptions,
15-
): ClientCall<RequestT, ResponseT>
11+
public expect abstract class GrpcChannel
1612

17-
public abstract fun authority(): String?
18-
}
13+
@InternalRpcApi
14+
public expect fun <RequestT, ResponseT> GrpcChannel.createCall(
15+
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
16+
callOptions: GrpcCallOptions,
17+
): ClientCall<RequestT, ResponseT>

grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/suspendClientCalls.kt

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ import kotlinx.rpc.internal.utils.InternalRpcApi
3939
public suspend fun <Request, Response> GrpcClient.unaryRpc(
4040
descriptor: MethodDescriptor<Request, Response>,
4141
request: Request,
42-
callOptions: GrpcCallOptions = GrpcDefaultCallOptions,
43-
trailers: GrpcMetadata = GrpcMetadata(),
42+
callOptions: GrpcCallOptions = GrpcCallOptions(),
43+
headers: GrpcMetadata = GrpcMetadata(),
4444
): Response {
4545
val type = descriptor.methodType
4646
require(type == MethodType.UNARY) {
@@ -50,7 +50,7 @@ public suspend fun <Request, Response> GrpcClient.unaryRpc(
5050
return rpcImpl(
5151
descriptor = descriptor,
5252
callOptions = callOptions,
53-
trailers = trailers,
53+
headers = headers,
5454
request = flowOf(request)
5555
).singleOrStatus("request", descriptor)
5656
}
@@ -59,8 +59,8 @@ public suspend fun <Request, Response> GrpcClient.unaryRpc(
5959
public fun <Request, Response> GrpcClient.serverStreamingRpc(
6060
descriptor: MethodDescriptor<Request, Response>,
6161
request: Request,
62-
callOptions: GrpcCallOptions = GrpcDefaultCallOptions,
63-
trailers: GrpcMetadata = GrpcMetadata(),
62+
callOptions: GrpcCallOptions = GrpcCallOptions(),
63+
headers: GrpcMetadata = GrpcMetadata(),
6464
): Flow<Response> {
6565
val type = descriptor.methodType
6666
require(type == MethodType.SERVER_STREAMING) {
@@ -70,7 +70,7 @@ public fun <Request, Response> GrpcClient.serverStreamingRpc(
7070
return rpcImpl(
7171
descriptor = descriptor,
7272
callOptions = callOptions,
73-
trailers = trailers,
73+
headers = headers,
7474
request = flowOf(request)
7575
)
7676
}
@@ -79,8 +79,8 @@ public fun <Request, Response> GrpcClient.serverStreamingRpc(
7979
public suspend fun <Request, Response> GrpcClient.clientStreamingRpc(
8080
descriptor: MethodDescriptor<Request, Response>,
8181
requests: Flow<Request>,
82-
callOptions: GrpcCallOptions = GrpcDefaultCallOptions,
83-
trailers: GrpcMetadata = GrpcMetadata(),
82+
callOptions: GrpcCallOptions = GrpcCallOptions(),
83+
headers: GrpcMetadata = GrpcMetadata(),
8484
): Response {
8585
val type = descriptor.methodType
8686
require(type == MethodType.CLIENT_STREAMING) {
@@ -90,7 +90,7 @@ public suspend fun <Request, Response> GrpcClient.clientStreamingRpc(
9090
return rpcImpl(
9191
descriptor = descriptor,
9292
callOptions = callOptions,
93-
trailers = trailers,
93+
headers = headers,
9494
request = requests
9595
).singleOrStatus("response", descriptor)
9696
}
@@ -99,8 +99,8 @@ public suspend fun <Request, Response> GrpcClient.clientStreamingRpc(
9999
public fun <Request, Response> GrpcClient.bidirectionalStreamingRpc(
100100
descriptor: MethodDescriptor<Request, Response>,
101101
requests: Flow<Request>,
102-
callOptions: GrpcCallOptions = GrpcDefaultCallOptions,
103-
trailers: GrpcMetadata = GrpcMetadata(),
102+
callOptions: GrpcCallOptions = GrpcCallOptions(),
103+
headers: GrpcMetadata = GrpcMetadata(),
104104
): Flow<Response> {
105105
val type = descriptor.methodType
106106
check(type == MethodType.BIDI_STREAMING) {
@@ -110,7 +110,7 @@ public fun <Request, Response> GrpcClient.bidirectionalStreamingRpc(
110110
return rpcImpl(
111111
descriptor = descriptor,
112112
callOptions = callOptions,
113-
trailers = trailers,
113+
headers = headers,
114114
request = requests
115115
)
116116
}
@@ -147,13 +147,13 @@ private sealed interface ClientRequest<Request> {
147147
private fun <Request, Response> GrpcClient.rpcImpl(
148148
descriptor: MethodDescriptor<Request, Response>,
149149
callOptions: GrpcCallOptions,
150-
trailers: GrpcMetadata,
150+
headers: GrpcMetadata,
151151
request: Flow<Request>,
152152
): Flow<Response> {
153153
val clientCallScope = ClientCallScopeImpl(
154154
client = this,
155155
method = descriptor,
156-
requestHeaders = trailers,
156+
requestHeaders = headers,
157157
callOptions = callOptions,
158158
)
159159
return clientCallScope.proceed(request)
@@ -165,8 +165,6 @@ private class ClientCallScopeImpl<Request, Response>(
165165
override val requestHeaders: GrpcMetadata,
166166
override val callOptions: GrpcCallOptions,
167167
) : ClientCallScope<Request, Response> {
168-
169-
val call = client.channel.platformApi.newCall(method, callOptions)
170168
val interceptors = client.interceptors
171169
val onHeadersFuture = CallbackFuture<GrpcMetadata>()
172170
val onCloseFuture = CallbackFuture<Pair<Status, GrpcMetadata>>()
@@ -198,6 +196,7 @@ private class ClientCallScopeImpl<Request, Response>(
198196

199197
private fun doCall(request: Flow<Request>): Flow<Response> = flow {
200198
coroutineScope {
199+
val call = client.channel.platformApi.createCall(method, callOptions)
201200

202201
/*
203202
* We maintain a buffer of size 1 so onMessage never has to block: it only gets called after
@@ -207,7 +206,7 @@ private class ClientCallScopeImpl<Request, Response>(
207206
val responses = Channel<Response>(1)
208207
val ready = Ready { call.isReady() }
209208

210-
call.start(channelResponseListener(responses, ready), requestHeaders)
209+
call.start(channelResponseListener(call, responses, ready), requestHeaders)
211210

212211
suspend fun Flow<Request>.send() {
213212
if (method.methodType == MethodType.UNARY || method.methodType == MethodType.SERVER_STREAMING) {
@@ -256,6 +255,7 @@ private class ClientCallScopeImpl<Request, Response>(
256255
}
257256

258257
private fun <Response> channelResponseListener(
258+
call: ClientCall<*, Response>,
259259
responses: Channel<Response>,
260260
ready: Ready,
261261
) = clientCallListener(

grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.jvm.kt

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@
55
package kotlinx.rpc.grpc.client.internal
66

77
import io.grpc.CallOptions
8-
import kotlinx.rpc.grpc.client.internal.GrpcCallOptions
98
import kotlinx.rpc.internal.utils.InternalRpcApi
109

1110
@InternalRpcApi
12-
public actual typealias GrpcCallOptions = CallOptions
13-
14-
@InternalRpcApi
15-
public actual val GrpcDefaultCallOptions: GrpcCallOptions
16-
get() = GrpcCallOptions.DEFAULT
11+
public fun GrpcCallOptions.toJvm(): CallOptions {
12+
var default = CallOptions.DEFAULT
13+
if (timeout != null) {
14+
default = default.withDeadlineAfter(timeout!!.inWholeMilliseconds, java.util.concurrent.TimeUnit.MILLISECONDS)
15+
}
16+
return default
17+
}

grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.jvm.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,16 @@
55
package kotlinx.rpc.grpc.client.internal
66

77
import io.grpc.Channel
8+
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
89
import kotlinx.rpc.internal.utils.InternalRpcApi
910

1011
@InternalRpcApi
1112
public actual typealias GrpcChannel = Channel
13+
14+
@InternalRpcApi
15+
public actual fun <RequestT, ResponseT> GrpcChannel.createCall(
16+
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
17+
callOptions: GrpcCallOptions,
18+
): ClientCall<RequestT, ResponseT> {
19+
return this.newCall(methodDescriptor, callOptions.toJvm())
20+
}

grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.native.kt

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,27 @@
22
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5+
@file:OptIn(ExperimentalForeignApi::class)
6+
57
package kotlinx.rpc.grpc.client.internal
68

7-
import kotlinx.rpc.grpc.client.internal.GrpcCallOptions
9+
import kotlinx.cinterop.CValue
10+
import kotlinx.cinterop.ExperimentalForeignApi
811
import kotlinx.rpc.internal.utils.InternalRpcApi
12+
import libkgrpc.GPR_CLOCK_REALTIME
13+
import libkgrpc.GPR_TIMESPAN
14+
import libkgrpc.gpr_inf_future
15+
import libkgrpc.gpr_now
16+
import libkgrpc.gpr_time_add
17+
import libkgrpc.gpr_time_from_millis
18+
import libkgrpc.gpr_timespec
919

1020
@InternalRpcApi
11-
public actual class GrpcCallOptions {
12-
// TODO: Do something with it
21+
public fun GrpcCallOptions.rawDeadline(): CValue<gpr_timespec> {
22+
return timeout?.let {
23+
gpr_time_add(
24+
gpr_now(GPR_CLOCK_REALTIME),
25+
gpr_time_from_millis(it.inWholeMilliseconds, GPR_TIMESPAN)
26+
)
27+
} ?: gpr_inf_future(GPR_CLOCK_REALTIME)
1328
}
14-
15-
@InternalRpcApi
16-
public actual val GrpcDefaultCallOptions: GrpcCallOptions = GrpcCallOptions()

grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.native.kt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,16 @@ import kotlinx.rpc.internal.utils.InternalRpcApi
99

1010
@InternalRpcApi
1111
public actual abstract class GrpcChannel {
12-
public actual abstract fun <RequestT, ResponseT> newCall(
12+
public abstract fun <RequestT, ResponseT> newCall(
1313
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
1414
callOptions: GrpcCallOptions,
1515
): ClientCall<RequestT, ResponseT>
16-
17-
public actual abstract fun authority(): String?
1816
}
17+
18+
@InternalRpcApi
19+
public actual fun <RequestT, ResponseT> GrpcChannel.createCall(
20+
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
21+
callOptions: GrpcCallOptions,
22+
): ClientCall<RequestT, ResponseT> {
23+
return this.newCall(methodDescriptor, callOptions)
24+
}

grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,12 @@ import kotlinx.coroutines.SupervisorJob
2020
import kotlinx.coroutines.cancelChildren
2121
import kotlinx.coroutines.withTimeoutOrNull
2222
import kotlinx.rpc.grpc.client.ClientCredentials
23+
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
2324
import kotlinx.rpc.grpc.internal.CompletionQueue
2425
import kotlinx.rpc.grpc.internal.GrpcRuntime
25-
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
2626
import kotlinx.rpc.grpc.internal.internalError
2727
import kotlinx.rpc.grpc.internal.toGrpcSlice
28-
import libkgrpc.GPR_CLOCK_REALTIME
2928
import libkgrpc.GRPC_PROPAGATE_DEFAULTS
30-
import libkgrpc.gpr_inf_future
3129
import libkgrpc.grpc_arg
3230
import libkgrpc.grpc_arg_type
3331
import libkgrpc.grpc_channel_args
@@ -158,7 +156,7 @@ internal class NativeManagedChannel(
158156
completion_queue = cq.raw,
159157
method = methodNameSlice,
160158
host = null,
161-
deadline = gpr_inf_future(GPR_CLOCK_REALTIME),
159+
deadline = callOptions.rawDeadline(),
162160
reserved = null
163161
) ?: error("Failed to create call")
164162

@@ -169,8 +167,4 @@ internal class NativeManagedChannel(
169167
)
170168
}
171169

172-
override fun authority(): String? {
173-
return authority
174-
}
175-
176170
}

0 commit comments

Comments
 (0)