Skip to content

Commit dc97ff0

Browse files
authored
Reconnectable rsocket implementation (#114)
1 parent e2cfb10 commit dc97ff0

File tree

9 files changed

+571
-18
lines changed

9 files changed

+571
-18
lines changed

build.gradle.kts

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,20 @@ subprojects {
169169
else -> emptyList()
170170
}
171171
}
172-
else -> emptyList()
172+
else -> emptyList()
173173
}.forEach(KotlinNativeTarget::disableCompilation)
174+
175+
//run tests on release + mimalloc to reduce tests execution time
176+
//compilation is slower in that mode, but work with buffers is much faster
177+
targets.all {
178+
if (this is KotlinNativeTargetWithTests<*>) {
179+
binaries.test(listOf(RELEASE))
180+
testRuns.all { setExecutionSourceFrom(binaries.getTest(RELEASE)) }
181+
compilations.all {
182+
kotlinOptions.freeCompilerArgs += "-Xallocator=mimalloc"
183+
}
184+
}
185+
}
174186
} else {
175187
//if not on CI, use only one native target same as host, DON'T PUBLISH IN THAT MODE LOCALLY!!!
176188
when {
@@ -179,18 +191,6 @@ subprojects {
179191
HostManager.hostIsMac -> macosX64("native")
180192
}
181193
}
182-
183-
//run tests on release + mimalloc to reduce tests execution time
184-
//compilation is slower in that mode
185-
targets.all {
186-
if (this is KotlinNativeTargetWithTests<*>) {
187-
binaries.test(listOf(RELEASE))
188-
testRuns.all { setExecutionSourceFrom(binaries.getTest(RELEASE)) }
189-
compilations.all {
190-
kotlinOptions.freeCompilerArgs += "-Xallocator=mimalloc"
191-
}
192-
}
193-
}
194194
}
195195

196196
//common configuration
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import io.ktor.utils.io.core.*
18+
import io.rsocket.kotlin.*
19+
import io.rsocket.kotlin.core.*
20+
import io.rsocket.kotlin.payload.*
21+
import io.rsocket.kotlin.transport.*
22+
import io.rsocket.kotlin.transport.local.*
23+
import kotlinx.atomicfu.*
24+
import kotlinx.coroutines.*
25+
import kotlinx.coroutines.flow.*
26+
27+
@TransportApi
28+
fun main(): Unit = runBlocking {
29+
val server = LocalServer()
30+
RSocketServer().bind(server) {
31+
RSocketRequestHandler {
32+
requestStream { requestPayload ->
33+
val data = requestPayload.data.readText()
34+
val metadata = requestPayload.metadata?.readText()
35+
println("Server received payload: data=$data, metadata=$metadata")
36+
37+
flow {
38+
repeat(50) { i ->
39+
delay(100)
40+
emit(Payload("Payload: $i", metadata))
41+
println("Server sent: $i")
42+
}
43+
}
44+
}
45+
}
46+
}
47+
48+
val rSocket = RSocketConnector {
49+
//reconnect 10 times with 1 second delay if connection establishment failed
50+
reconnectable(10) {
51+
delay(1000)
52+
true
53+
}
54+
}.connect(ClientTransport {
55+
val connection = DisconnectableConnection(server.connect())
56+
launch {
57+
delay(500)
58+
connection.disconnect() //emulate connection fail
59+
}
60+
connection
61+
})
62+
63+
//do request
64+
try {
65+
rSocket.requestStream(Payload("Hello", "World")).buffer(3).collect {
66+
val index = it.data.readText().substringAfter("Payload: ").toInt()
67+
println("Client receives index: $index")
68+
}
69+
} catch (e: Throwable) {
70+
println("Request failed with error: $e")
71+
}
72+
73+
//do request just after it
74+
75+
rSocket.requestStream(Payload("Hello", "World")).buffer(3).take(3).collect {
76+
val index = it.data.readText().substringAfter("Payload: ").toInt()
77+
println("Client receives index: $index after reconnection")
78+
}
79+
80+
}
81+
82+
@TransportApi
83+
private class DisconnectableConnection(
84+
private val connection: Connection,
85+
) : Connection by connection {
86+
private val disconnected = atomic(false)
87+
88+
fun disconnect() {
89+
disconnected.value = true
90+
}
91+
92+
override suspend fun send(packet: ByteReadPacket) {
93+
if (disconnected.value) error("Disconnected")
94+
connection.send(packet)
95+
}
96+
97+
override suspend fun receive(): ByteReadPacket {
98+
if (disconnected.value) error("Disconnected")
99+
return connection.receive()
100+
}
101+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import io.rsocket.kotlin.*
18+
import io.rsocket.kotlin.core.*
19+
import io.rsocket.kotlin.payload.*
20+
import io.rsocket.kotlin.transport.*
21+
import io.rsocket.kotlin.transport.local.*
22+
import kotlinx.coroutines.*
23+
import kotlinx.coroutines.flow.*
24+
25+
@TransportApi
26+
fun main(): Unit = runBlocking {
27+
val server = LocalServer()
28+
RSocketServer().bind(server) {
29+
RSocketRequestHandler {
30+
requestStream { requestPayload ->
31+
val data = requestPayload.data.readText()
32+
val metadata = requestPayload.metadata?.readText()
33+
println("Server received payload: data=$data, metadata=$metadata")
34+
35+
flow {
36+
repeat(50) { i ->
37+
delay(100)
38+
emit(Payload("Payload: $i", metadata))
39+
println("Server sent: $i")
40+
}
41+
}
42+
}
43+
}
44+
}
45+
46+
//emulate connection establishment error
47+
var first = true
48+
49+
val rSocket = RSocketConnector {
50+
//reconnect 10 times with 1 second delay if connection establishment failed
51+
reconnectable(10) {
52+
delay(1000)
53+
println("Retry after error: $it")
54+
true
55+
}
56+
}.connect(ClientTransport {
57+
if (first) {
58+
first = false
59+
error("Connection establishment failed") //emulate connection establishment error
60+
}
61+
server.connect()
62+
})
63+
64+
//do request
65+
rSocket.requestStream(Payload("Hello", "World")).buffer(3).take(3).collect {
66+
val index = it.data.readText().substringAfter("Payload: ").toInt()
67+
println("Client receives index: $index")
68+
}
69+
70+
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,19 @@ class RSocketConnector internal constructor(
2828
private val interceptors: Interceptors,
2929
private val connectionConfigProvider: () -> ConnectionConfig,
3030
private val acceptor: ConnectionAcceptor,
31+
private val reconnectPredicate: ReconnectPredicate?,
3132
) {
3233

33-
suspend fun connect(transport: ClientTransport): RSocket {
34+
suspend fun connect(transport: ClientTransport): RSocket = when (reconnectPredicate) {
35+
null -> connectOnce(transport)
36+
else -> ReconnectableRSocket(
37+
logger = loggerFactory.logger("io.rsocket.kotlin.connection"),
38+
connect = { connectOnce(transport) },
39+
predicate = reconnectPredicate
40+
)
41+
}
42+
43+
private suspend fun connectOnce(transport: ClientTransport): RSocket {
3444
val connection = transport.connect().wrapConnection()
3545
val connectionConfig = connectionConfigProvider()
3646

@@ -49,5 +59,5 @@ class RSocketConnector internal constructor(
4959

5060
private fun Connection.wrapConnection(): Connection =
5161
interceptors.wrapConnection(this)
52-
.logging(loggerFactory.logger("io.rsocket.kotlin.frame.Frame"))
62+
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"))
5363
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class RSocketConnectorBuilder internal constructor() {
2828
private val connectionConfig: ConnectionConfigBuilder = ConnectionConfigBuilder()
2929
private val interceptors: InterceptorsBuilder = InterceptorsBuilder()
3030
private var acceptor: ConnectionAcceptor? = null
31+
private var reconnectPredicate: ReconnectPredicate? = null
3132

3233
public fun connectionConfig(configure: ConnectionConfigBuilder.() -> Unit) {
3334
connectionConfig.configure()
@@ -41,6 +42,30 @@ public class RSocketConnectorBuilder internal constructor() {
4142
acceptor = block
4243
}
4344

45+
/**
46+
* When configured, [RSocketConnector.connect] will return custom [RSocket] implementation,
47+
* which will try to reconnect if connection lost and [retries] are not exhausted with [predicate] returning `true`.
48+
*
49+
* **This is not Resumption**: by using [reconnectable] only connection will be re-established, streams will fail
50+
*
51+
* @param retries number of retries to do, if connection establishment failed
52+
*/
53+
public fun reconnectable(retries: Long, predicate: suspend (cause: Throwable) -> Boolean = { true }) {
54+
reconnectPredicate = { cause, attempt -> predicate(cause) && attempt < retries }
55+
}
56+
57+
/**
58+
* When configured, [RSocketConnector.connect] will return custom [RSocket] implementation,
59+
* which will try to reconnect if connection lost and [predicate] returns `true`.
60+
*
61+
* **This is not Resumption**: by using [reconnectable] only connection will be re-established, streams will fail
62+
*
63+
* @param predicate predicate for retry logic
64+
*/
65+
public fun reconnectable(predicate: suspend (cause: Throwable, attempt: Long) -> Boolean) {
66+
reconnectPredicate = predicate
67+
}
68+
4469
public class ConnectionConfigBuilder internal constructor() {
4570
public var keepAlive: KeepAlive = KeepAlive()
4671
public var payloadMimeType: PayloadMimeType = PayloadMimeType()
@@ -71,6 +96,7 @@ public class RSocketConnectorBuilder internal constructor() {
7196
interceptors.build(),
7297
connectionConfig.producer(),
7398
acceptor ?: defaultAcceptor,
99+
reconnectPredicate
74100
)
75101

76102
private companion object {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class RSocketServer internal constructor(
6464

6565
private fun Connection.wrapConnection(): Connection =
6666
interceptors.wrapConnection(this)
67-
.logging(loggerFactory.logger("io.rsocket.kotlin.frame.Frame"))
67+
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"))
6868

6969
private suspend fun Connection.failSetup(error: RSocketError.Setup): Nothing {
7070
sendFrame(ErrorFrame(0, error))

0 commit comments

Comments
 (0)