Skip to content

Commit 0c017be

Browse files
authored
Add collect once check for client streams (#431)
1 parent edeae55 commit 0c017be

File tree

4 files changed

+23
-0
lines changed

4 files changed

+23
-0
lines changed

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/ServerStreamContext.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.rpc.krpc.server.internal
66

7+
import kotlinx.atomicfu.atomic
78
import kotlinx.coroutines.channels.Channel
89
import kotlinx.coroutines.flow.Flow
910
import kotlinx.coroutines.flow.flow
@@ -65,7 +66,13 @@ internal class ServerStreamContext {
6566
map.remove(streamId)
6667
}
6768

69+
val collected = AtomicBoolean(false)
70+
6871
val flow = flow {
72+
check(collected.value.compareAndSet(expect = false, update = true)) {
73+
"Request streams can only be collected once"
74+
}
75+
6976
for (message in channel) {
7077
when (message) {
7178
is StreamCancel -> {
@@ -92,5 +99,9 @@ internal class ServerStreamContext {
9299
private fun streamCanceled(): Throwable = NoSuchElementException("Stream canceled")
93100
}
94101

102+
private class AtomicBoolean(initialValue: Boolean) {
103+
val value = atomic(initialValue)
104+
}
105+
95106
private data class StreamCancel(val cause: Throwable? = null)
96107
private data object StreamEnd

krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestService.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ interface KrpcTestService {
6363
suspend fun nullableParam(arg1: String?): String
6464
suspend fun nullableReturn(returnNull: Boolean): TestClass?
6565
suspend fun variance(arg2: TestList<in TestClass>, arg3: TestList2<TestClass>): TestList<out TestClass>?
66+
suspend fun collectOnce(flow: Flow<String>)
6667

6768
suspend fun nonSerializableClass(localDate: LocalDate): LocalDate
6869
suspend fun nonSerializableClassWithSerializer(

krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestServiceBackend.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import kotlinx.serialization.Serializable
1111
import kotlin.coroutines.resumeWithException
1212
import kotlin.test.assertContentEquals
1313
import kotlin.test.assertEquals
14+
import kotlin.test.assertFailsWith
1415

1516
@OptIn(ExperimentalCoroutinesApi::class)
1617
class KrpcTestServiceBackend : KrpcTestService {
@@ -114,6 +115,11 @@ class KrpcTestServiceBackend : KrpcTestService {
114115
return TestList(3)
115116
}
116117

118+
override suspend fun collectOnce(flow: Flow<String>) {
119+
flow.toList()
120+
assertFailsWith<IllegalStateException> { flow.toList() }
121+
}
122+
117123
override suspend fun nonSerializableClass(localDate: LocalDate): LocalDate {
118124
return LocalDate(localDate.year, localDate.month, localDate.day + 1)
119125
}

krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,11 @@ abstract class KrpcTransportTestBase {
259259
assertEquals(TestList(3), result)
260260
}
261261

262+
@Test
263+
fun collectOnce() = runTest {
264+
client.collectOnce(flowOf("test1", "test2", "test3"))
265+
}
266+
262267
@Test
263268
fun incomingStreamSyncCollect() = runTest {
264269
val result = client.incomingStreamSyncCollect(flowOf("test1", "test2", "test3"))

0 commit comments

Comments
 (0)