diff --git a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/ServerStreamContext.kt b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/ServerStreamContext.kt index 8e2f01b73..0311dd4fa 100644 --- a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/ServerStreamContext.kt +++ b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/ServerStreamContext.kt @@ -4,6 +4,7 @@ package kotlinx.rpc.krpc.server.internal +import kotlinx.atomicfu.atomic import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow @@ -65,7 +66,13 @@ internal class ServerStreamContext { map.remove(streamId) } + val collected = AtomicBoolean(false) + val flow = flow { + check(collected.value.compareAndSet(expect = false, update = true)) { + "Request streams can only be collected once" + } + for (message in channel) { when (message) { is StreamCancel -> { @@ -92,5 +99,9 @@ internal class ServerStreamContext { private fun streamCanceled(): Throwable = NoSuchElementException("Stream canceled") } +private class AtomicBoolean(initialValue: Boolean) { + val value = atomic(initialValue) +} + private data class StreamCancel(val cause: Throwable? = null) private data object StreamEnd diff --git a/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestService.kt b/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestService.kt index 54f191724..ddf1d9cbe 100644 --- a/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestService.kt +++ b/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestService.kt @@ -63,6 +63,7 @@ interface KrpcTestService { suspend fun nullableParam(arg1: String?): String suspend fun nullableReturn(returnNull: Boolean): TestClass? suspend fun variance(arg2: TestList, arg3: TestList2): TestList? + suspend fun collectOnce(flow: Flow) suspend fun nonSerializableClass(localDate: LocalDate): LocalDate suspend fun nonSerializableClassWithSerializer( diff --git a/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestServiceBackend.kt b/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestServiceBackend.kt index d626714bb..1caabb763 100644 --- a/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestServiceBackend.kt +++ b/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestServiceBackend.kt @@ -11,6 +11,7 @@ import kotlinx.serialization.Serializable import kotlin.coroutines.resumeWithException import kotlin.test.assertContentEquals import kotlin.test.assertEquals +import kotlin.test.assertFailsWith @OptIn(ExperimentalCoroutinesApi::class) class KrpcTestServiceBackend : KrpcTestService { @@ -114,6 +115,11 @@ class KrpcTestServiceBackend : KrpcTestService { return TestList(3) } + override suspend fun collectOnce(flow: Flow) { + flow.toList() + assertFailsWith { flow.toList() } + } + override suspend fun nonSerializableClass(localDate: LocalDate): LocalDate { return LocalDate(localDate.year, localDate.month, localDate.day + 1) } diff --git a/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.kt b/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.kt index 3ea42e6a0..11136c826 100644 --- a/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.kt +++ b/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.kt @@ -259,6 +259,11 @@ abstract class KrpcTransportTestBase { assertEquals(TestList(3), result) } + @Test + fun collectOnce() = runTest { + client.collectOnce(flowOf("test1", "test2", "test3")) + } + @Test fun incomingStreamSyncCollect() = runTest { val result = client.incomingStreamSyncCollect(flowOf("test1", "test2", "test3"))