Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import kotlinx.rpc.grpc.descriptor.MethodDescriptor
* // Example: add a header before proceeding
* requestHeaders[MyKeys.Authorization] = token
*
* // Example: modify call options
* callOptions.timeout = 5.seconds
*
* // Example: observe response metadata
* onHeaders { headers -> /* inspect headers */ }
* onClose { status, trailers -> /* log status/trailers */ }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.rpc.RpcCall
import kotlinx.rpc.RpcClient
import kotlinx.rpc.grpc.GrpcMetadata
import kotlinx.rpc.grpc.client.internal.GrpcDefaultCallOptions
import kotlinx.rpc.grpc.client.internal.GrpcCallOptions
import kotlinx.rpc.grpc.client.internal.ManagedChannel
import kotlinx.rpc.grpc.client.internal.ManagedChannelBuilder
import kotlinx.rpc.grpc.client.internal.bidirectionalStreamingRpc
Expand Down Expand Up @@ -58,45 +58,45 @@ public class GrpcClient internal constructor(
}

override suspend fun <T> call(call: RpcCall): T = withGrpcCall(call) { methodDescriptor, request ->
val callOptions = GrpcDefaultCallOptions
val callOptions = GrpcCallOptions()
val trailers = GrpcMetadata()

return when (methodDescriptor.methodType) {
MethodType.UNARY -> unaryRpc(
descriptor = methodDescriptor,
request = request,
callOptions = callOptions,
trailers = trailers,
headers = trailers,
)

MethodType.CLIENT_STREAMING -> @Suppress("UNCHECKED_CAST") clientStreamingRpc(
descriptor = methodDescriptor,
requests = request as Flow<RequestClient>,
callOptions = callOptions,
trailers = trailers,
headers = trailers,
)

else -> error("Wrong method type ${methodDescriptor.methodType}")
}
}

override fun <T> callServerStreaming(call: RpcCall): Flow<T> = withGrpcCall(call) { methodDescriptor, request ->
val callOptions = GrpcDefaultCallOptions
val trailers = GrpcMetadata()
val callOptions = GrpcCallOptions()
val headers = GrpcMetadata()

when (methodDescriptor.methodType) {
MethodType.SERVER_STREAMING -> serverStreamingRpc(
descriptor = methodDescriptor,
request = request,
callOptions = callOptions,
trailers = trailers,
headers = headers,
)

MethodType.BIDI_STREAMING -> @Suppress("UNCHECKED_CAST") bidirectionalStreamingRpc(
descriptor = methodDescriptor,
requests = request as Flow<RequestClient>,
callOptions = callOptions,
trailers = trailers,
headers = headers,
)

else -> error("Wrong method type ${methodDescriptor.methodType}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,25 @@

package kotlinx.rpc.grpc.client.internal

import kotlinx.rpc.internal.utils.InternalRpcApi
import kotlin.time.Duration

@InternalRpcApi
public expect class GrpcCallOptions

@InternalRpcApi
public expect val GrpcDefaultCallOptions: GrpcCallOptions
/**
* The collection of runtime options for a new gRPC call.
*
* This class allows configuring per-call behavior such as timeouts.
*/
public class GrpcCallOptions {
/**
* The maximum duration to wait for the RPC to complete.
*
* If set, the RPC will be canceled (with `DEADLINE_EXCEEDED`)
* if it does not complete within the specified duration.
* The timeout is measured from the moment the call is initiated.
* If `null`, no timeout is applied, and the call may run indefinitely.
*
* The default value is `null`.
*
* @see Duration
*/
public var timeout: Duration? = null
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import kotlinx.rpc.grpc.descriptor.MethodDescriptor
import kotlinx.rpc.internal.utils.InternalRpcApi

@InternalRpcApi
public expect abstract class GrpcChannel {
public abstract fun <RequestT, ResponseT> newCall(
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
callOptions: GrpcCallOptions,
): ClientCall<RequestT, ResponseT>
public expect abstract class GrpcChannel

public abstract fun authority(): String?
}
@InternalRpcApi
public expect fun <RequestT, ResponseT> GrpcChannel.createCall(
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
callOptions: GrpcCallOptions,
): ClientCall<RequestT, ResponseT>
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import kotlinx.rpc.internal.utils.InternalRpcApi
public suspend fun <Request, Response> GrpcClient.unaryRpc(
descriptor: MethodDescriptor<Request, Response>,
request: Request,
callOptions: GrpcCallOptions = GrpcDefaultCallOptions,
trailers: GrpcMetadata = GrpcMetadata(),
callOptions: GrpcCallOptions = GrpcCallOptions(),
headers: GrpcMetadata = GrpcMetadata(),
): Response {
val type = descriptor.methodType
require(type == MethodType.UNARY) {
Expand All @@ -50,7 +50,7 @@ public suspend fun <Request, Response> GrpcClient.unaryRpc(
return rpcImpl(
descriptor = descriptor,
callOptions = callOptions,
trailers = trailers,
headers = headers,
request = flowOf(request)
).singleOrStatus("request", descriptor)
}
Expand All @@ -59,8 +59,8 @@ public suspend fun <Request, Response> GrpcClient.unaryRpc(
public fun <Request, Response> GrpcClient.serverStreamingRpc(
descriptor: MethodDescriptor<Request, Response>,
request: Request,
callOptions: GrpcCallOptions = GrpcDefaultCallOptions,
trailers: GrpcMetadata = GrpcMetadata(),
callOptions: GrpcCallOptions = GrpcCallOptions(),
headers: GrpcMetadata = GrpcMetadata(),
): Flow<Response> {
val type = descriptor.methodType
require(type == MethodType.SERVER_STREAMING) {
Expand All @@ -70,7 +70,7 @@ public fun <Request, Response> GrpcClient.serverStreamingRpc(
return rpcImpl(
descriptor = descriptor,
callOptions = callOptions,
trailers = trailers,
headers = headers,
request = flowOf(request)
)
}
Expand All @@ -79,8 +79,8 @@ public fun <Request, Response> GrpcClient.serverStreamingRpc(
public suspend fun <Request, Response> GrpcClient.clientStreamingRpc(
descriptor: MethodDescriptor<Request, Response>,
requests: Flow<Request>,
callOptions: GrpcCallOptions = GrpcDefaultCallOptions,
trailers: GrpcMetadata = GrpcMetadata(),
callOptions: GrpcCallOptions = GrpcCallOptions(),
headers: GrpcMetadata = GrpcMetadata(),
): Response {
val type = descriptor.methodType
require(type == MethodType.CLIENT_STREAMING) {
Expand All @@ -90,7 +90,7 @@ public suspend fun <Request, Response> GrpcClient.clientStreamingRpc(
return rpcImpl(
descriptor = descriptor,
callOptions = callOptions,
trailers = trailers,
headers = headers,
request = requests
).singleOrStatus("response", descriptor)
}
Expand All @@ -99,8 +99,8 @@ public suspend fun <Request, Response> GrpcClient.clientStreamingRpc(
public fun <Request, Response> GrpcClient.bidirectionalStreamingRpc(
descriptor: MethodDescriptor<Request, Response>,
requests: Flow<Request>,
callOptions: GrpcCallOptions = GrpcDefaultCallOptions,
trailers: GrpcMetadata = GrpcMetadata(),
callOptions: GrpcCallOptions = GrpcCallOptions(),
headers: GrpcMetadata = GrpcMetadata(),
): Flow<Response> {
val type = descriptor.methodType
check(type == MethodType.BIDI_STREAMING) {
Expand All @@ -110,7 +110,7 @@ public fun <Request, Response> GrpcClient.bidirectionalStreamingRpc(
return rpcImpl(
descriptor = descriptor,
callOptions = callOptions,
trailers = trailers,
headers = headers,
request = requests
)
}
Expand Down Expand Up @@ -147,13 +147,13 @@ private sealed interface ClientRequest<Request> {
private fun <Request, Response> GrpcClient.rpcImpl(
descriptor: MethodDescriptor<Request, Response>,
callOptions: GrpcCallOptions,
trailers: GrpcMetadata,
headers: GrpcMetadata,
request: Flow<Request>,
): Flow<Response> {
val clientCallScope = ClientCallScopeImpl(
client = this,
method = descriptor,
requestHeaders = trailers,
requestHeaders = headers,
callOptions = callOptions,
)
return clientCallScope.proceed(request)
Expand All @@ -165,8 +165,6 @@ private class ClientCallScopeImpl<Request, Response>(
override val requestHeaders: GrpcMetadata,
override val callOptions: GrpcCallOptions,
) : ClientCallScope<Request, Response> {

val call = client.channel.platformApi.newCall(method, callOptions)
val interceptors = client.interceptors
val onHeadersFuture = CallbackFuture<GrpcMetadata>()
val onCloseFuture = CallbackFuture<Pair<Status, GrpcMetadata>>()
Expand Down Expand Up @@ -198,6 +196,7 @@ private class ClientCallScopeImpl<Request, Response>(

private fun doCall(request: Flow<Request>): Flow<Response> = flow {
coroutineScope {
val call = client.channel.platformApi.createCall(method, callOptions)

/*
* We maintain a buffer of size 1 so onMessage never has to block: it only gets called after
Expand All @@ -207,7 +206,7 @@ private class ClientCallScopeImpl<Request, Response>(
val responses = Channel<Response>(1)
val ready = Ready { call.isReady() }

call.start(channelResponseListener(responses, ready), requestHeaders)
call.start(channelResponseListener(call, responses, ready), requestHeaders)

suspend fun Flow<Request>.send() {
if (method.methodType == MethodType.UNARY || method.methodType == MethodType.SERVER_STREAMING) {
Expand Down Expand Up @@ -256,6 +255,7 @@ private class ClientCallScopeImpl<Request, Response>(
}

private fun <Response> channelResponseListener(
call: ClientCall<*, Response>,
responses: Channel<Response>,
ready: Ready,
) = clientCallListener(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
package kotlinx.rpc.grpc.client.internal

import io.grpc.CallOptions
import kotlinx.rpc.grpc.client.internal.GrpcCallOptions
import kotlinx.rpc.internal.utils.InternalRpcApi

@InternalRpcApi
public actual typealias GrpcCallOptions = CallOptions

@InternalRpcApi
public actual val GrpcDefaultCallOptions: GrpcCallOptions
get() = GrpcCallOptions.DEFAULT
public fun GrpcCallOptions.toJvm(): CallOptions {
var default = CallOptions.DEFAULT
if (timeout != null) {
default = default.withDeadlineAfter(timeout!!.inWholeMilliseconds, java.util.concurrent.TimeUnit.MILLISECONDS)
}
return default
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@
package kotlinx.rpc.grpc.client.internal

import io.grpc.Channel
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
import kotlinx.rpc.internal.utils.InternalRpcApi

@InternalRpcApi
public actual typealias GrpcChannel = Channel

@InternalRpcApi
public actual fun <RequestT, ResponseT> GrpcChannel.createCall(
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
callOptions: GrpcCallOptions,
): ClientCall<RequestT, ResponseT> {
return this.newCall(methodDescriptor, callOptions.toJvm())
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,27 @@
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

@file:OptIn(ExperimentalForeignApi::class)

package kotlinx.rpc.grpc.client.internal

import kotlinx.rpc.grpc.client.internal.GrpcCallOptions
import kotlinx.cinterop.CValue
import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.rpc.internal.utils.InternalRpcApi
import libkgrpc.GPR_CLOCK_REALTIME
import libkgrpc.GPR_TIMESPAN
import libkgrpc.gpr_inf_future
import libkgrpc.gpr_now
import libkgrpc.gpr_time_add
import libkgrpc.gpr_time_from_millis
import libkgrpc.gpr_timespec

@InternalRpcApi
public actual class GrpcCallOptions {
// TODO: Do something with it
public fun GrpcCallOptions.rawDeadline(): CValue<gpr_timespec> {
return timeout?.let {
gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(it.inWholeMilliseconds, GPR_TIMESPAN)
)
} ?: gpr_inf_future(GPR_CLOCK_REALTIME)
}

@InternalRpcApi
public actual val GrpcDefaultCallOptions: GrpcCallOptions = GrpcCallOptions()
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@ import kotlinx.rpc.internal.utils.InternalRpcApi

@InternalRpcApi
public actual abstract class GrpcChannel {
public actual abstract fun <RequestT, ResponseT> newCall(
public abstract fun <RequestT, ResponseT> newCall(
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
callOptions: GrpcCallOptions,
): ClientCall<RequestT, ResponseT>

public actual abstract fun authority(): String?
}

@InternalRpcApi
public actual fun <RequestT, ResponseT> GrpcChannel.createCall(
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
callOptions: GrpcCallOptions,
): ClientCall<RequestT, ResponseT> {
return this.newCall(methodDescriptor, callOptions)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.rpc.grpc.client.ClientCredentials
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
import kotlinx.rpc.grpc.internal.CompletionQueue
import kotlinx.rpc.grpc.internal.GrpcRuntime
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
import kotlinx.rpc.grpc.internal.internalError
import kotlinx.rpc.grpc.internal.toGrpcSlice
import libkgrpc.GPR_CLOCK_REALTIME
import libkgrpc.GRPC_PROPAGATE_DEFAULTS
import libkgrpc.gpr_inf_future
import libkgrpc.grpc_arg
import libkgrpc.grpc_arg_type
import libkgrpc.grpc_channel_args
Expand Down Expand Up @@ -158,7 +156,7 @@ internal class NativeManagedChannel(
completion_queue = cq.raw,
method = methodNameSlice,
host = null,
deadline = gpr_inf_future(GPR_CLOCK_REALTIME),
deadline = callOptions.rawDeadline(),
reserved = null
) ?: error("Failed to create call")

Expand All @@ -169,8 +167,4 @@ internal class NativeManagedChannel(
)
}

override fun authority(): String? {
return authority
}

}
Loading
Loading