Skip to content

Commit e2cfb10

Browse files
authored
Reworks configuration API using builders instead of data classes (#113)
* Introduce transports API (so reconnect and resume will be easier to support from client side) * rename plugin to interceptor * remove for now server TCP transport support
1 parent f9259fa commit e2cfb10

File tree

68 files changed

+1086
-912
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+1086
-912
lines changed

README.md

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ Transports are implemented based on [ktor](https://github.com/ktorio/ktor) to en
1717
So it depends on `ktor` engines for available transports and platforms (JVM, JS, Native):
1818
* JVM - TCP and WebSocket for both client and server
1919
* JS - WebSocket client only
20-
* Native - TCP for both client and server (linux x64, macos, ios, watchos, tvos)
20+
* Native - TCP client only (linux x64, macos, ios, watchos, tvos)
2121

2222
## Interactions
2323

@@ -113,27 +113,30 @@ dependencies {
113113
//create ktor client
114114
val client = HttpClient(CIO) {
115115
install(WebSockets)
116-
install(RSocketClientSupport) {
117-
//configure rSocket client (all values have defaults)
118-
119-
keepAlive = KeepAlive(
120-
interval = 30.seconds,
121-
maxLifetime = 2.minutes
122-
)
123-
124-
//payload for setup frame
125-
setupPayload = Payload(...)
116+
install(RSocketSupport) {
117+
connector = RSocketConnector {
118+
//configure rSocket connector (all values have defaults)
119+
connectionConfig {
120+
keepAlive = KeepAlive(
121+
interval = 30.seconds,
122+
maxLifetime = 2.minutes
123+
)
124+
125+
//payload for setup frame
126+
setupPayload { Payload("hello world") }
127+
128+
//mime types
129+
payloadMimeType = PayloadMimeType(
130+
data = "application/json",
131+
metadata = "application/json"
132+
)
133+
}
126134

127-
//mime types
128-
payloadMimeType = PayloadMimeType(
129-
data = "application/json",
130-
metadata = "application/json"
131-
)
132-
133-
//optional acceptor for server requests
134-
acceptor = {
135-
RSocketRequestHandler {
136-
requestResponse = { it } //echo request payload
135+
//optional acceptor for server requests
136+
acceptor {
137+
RSocketRequestHandler {
138+
requestResponse { it } //echo request payload
139+
}
137140
}
138141
}
139142
}
@@ -156,27 +159,28 @@ stream.take(5).collect { payload: Payload ->
156159
```kotlin
157160
//create ktor server
158161
embeddedServer(CIO) {
159-
install(RSocketServerSupport) {
162+
install(RSocketSupport) {
160163
//configure rSocket server (all values have defaults)
161-
162-
//install interceptors
163-
plugin = Plugin(
164-
connection = listOf(::SomeConnectionInterceptor)
165-
)
164+
server = RSocketServer {
165+
//install interceptors
166+
interceptors {
167+
forConnection(::SomeConnectionInterceptor)
168+
}
169+
}
166170
}
167171
//configure routing
168172
routing {
169173
//configure route `url:port/rsocket`
170174
rSocket("rsocket") {
171175
RSocketRequestHandler {
172176
//handler for request/response
173-
requestResponse = { request: Payload ->
177+
requestResponse { request: Payload ->
174178
//... some work here
175179
delay(500) // work emulation
176180
Payload("data", "metadata")
177181
}
178182
//handler for request/stream
179-
requestStream = { request: Payload ->
183+
requestStream { request: Payload ->
180184
flow {
181185
repeat(1000) { i ->
182186
emit(Payload("data: $i"))

benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@
1616

1717
package io.rsocket.kotlin.benchmarks
1818

19-
import io.ktor.utils.io.core.*
2019
import io.rsocket.kotlin.*
21-
import io.rsocket.kotlin.connection.*
2220
import io.rsocket.kotlin.core.*
2321
import io.rsocket.kotlin.payload.*
22+
import io.rsocket.kotlin.transport.local.*
2423
import kotlinx.coroutines.*
25-
import kotlinx.coroutines.channels.*
2624
import kotlinx.coroutines.flow.*
2725
import kotlin.random.*
2826

@@ -38,28 +36,22 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
3836
payload = createPayload(payloadSize)
3937
payloadsFlow = flow { repeat(5000) { emit(payload.copy()) } }
4038

41-
val clientChannel = Channel<ByteReadPacket>(Channel.UNLIMITED)
42-
val serverChannel = Channel<ByteReadPacket>(Channel.UNLIMITED)
43-
val serverConnection = LocalConnection("server", clientChannel, serverChannel)
44-
val clientConnection = LocalConnection("client", serverChannel, clientChannel)
45-
46-
client = runBlocking {
47-
launch {
48-
server = RSocketServer(ConnectionProvider(serverConnection)).start {
49-
RSocketRequestHandler {
50-
requestResponse = {
51-
it.release()
52-
payload
53-
}
54-
requestStream = {
55-
it.release()
56-
payloadsFlow
57-
}
58-
requestChannel = { it }
59-
}
39+
val localServer = LocalServer()
40+
server = RSocketServer().bind(localServer) {
41+
RSocketRequestHandler {
42+
requestResponse {
43+
it.release()
44+
payload
45+
}
46+
requestStream {
47+
it.release()
48+
payloadsFlow
6049
}
50+
requestChannel { it }
6151
}
62-
RSocketConnector(ConnectionProvider(clientConnection)).connect()
52+
}
53+
return runBlocking {
54+
RSocketConnector().connect(localServer)
6355
}
6456
}
6557

examples/interactions/src/jvmMain/kotlin/RequestChannelExample.kt

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,30 @@
1515
*/
1616

1717
import io.rsocket.kotlin.*
18-
import io.rsocket.kotlin.connection.*
18+
import io.rsocket.kotlin.core.*
1919
import io.rsocket.kotlin.payload.*
20+
import io.rsocket.kotlin.transport.local.*
2021
import kotlinx.coroutines.*
2122
import kotlinx.coroutines.flow.*
2223

2324

2425
fun main(): Unit = runBlocking {
25-
val (clientConnection, serverConnection) = SimpleLocalConnection()
26-
27-
launch {
28-
serverConnection.startServer {
29-
RSocketRequestHandler {
30-
requestChannel = { request ->
31-
request.buffer(3).take(3).flatMapConcat { payload ->
32-
val data = payload.data.readText()
33-
flow {
34-
repeat(3) {
35-
emit(Payload("$data(copy $it)"))
36-
}
26+
val server = LocalServer()
27+
RSocketServer().bind(server) {
28+
RSocketRequestHandler {
29+
requestChannel { request ->
30+
request.buffer(3).take(3).flatMapConcat { payload ->
31+
val data = payload.data.readText()
32+
flow {
33+
repeat(3) {
34+
emit(Payload("$data(copy $it)"))
3735
}
3836
}
3937
}
4038
}
4139
}
4240
}
43-
44-
val rSocket = clientConnection.connectClient()
41+
val rSocket = RSocketConnector().connect(server)
4542

4643
val request = flow {
4744
emit(Payload("Hello"))

examples/interactions/src/jvmMain/kotlin/RequestResponseErrorExample.kt

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,23 @@
1515
*/
1616

1717
import io.rsocket.kotlin.*
18-
import io.rsocket.kotlin.connection.*
18+
import io.rsocket.kotlin.core.*
1919
import io.rsocket.kotlin.payload.*
20+
import io.rsocket.kotlin.transport.local.*
2021
import kotlinx.coroutines.*
2122

2223
fun main(): Unit = runBlocking {
23-
val (clientConnection, serverConnection) = SimpleLocalConnection()
24-
25-
launch {
26-
serverConnection.startServer {
27-
RSocketRequestHandler {
28-
requestResponse = {
29-
val data = it.data.readText()
30-
if ("hello" in data) Payload("hello client")
31-
else error("I don't understand you")
32-
}
24+
val server = LocalServer()
25+
RSocketServer().bind(server) {
26+
RSocketRequestHandler {
27+
requestResponse {
28+
val data = it.data.readText()
29+
if ("hello" in data) Payload("hello client")
30+
else error("I don't understand you")
3331
}
3432
}
3533
}
36-
37-
val rSocket = clientConnection.connectClient()
34+
val rSocket = RSocketConnector().connect(server)
3835

3936
val response = rSocket.requestResponse(Payload("hello server"))
4037

examples/interactions/src/jvmMain/kotlin/RequestResponseExample.kt

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,25 @@
1515
*/
1616

1717
import io.rsocket.kotlin.*
18-
import io.rsocket.kotlin.connection.*
18+
import io.rsocket.kotlin.core.*
1919
import io.rsocket.kotlin.payload.*
20+
import io.rsocket.kotlin.transport.local.*
2021
import kotlinx.coroutines.*
2122

2223
fun main(): Unit = runBlocking {
23-
val (clientConnection, serverConnection) = SimpleLocalConnection()
24-
25-
launch {
26-
serverConnection.startServer {
27-
RSocketRequestHandler {
28-
requestResponse = {
29-
val data = it.data.readText()
30-
val metadata = it.metadata?.readText()
31-
println("Server received payload: data=$data, metadata=$metadata")
32-
33-
Payload(data.repeat(3), "SERVER_METADATA") //response
34-
}
24+
val server = LocalServer()
25+
RSocketServer().bind(server) {
26+
RSocketRequestHandler {
27+
requestResponse {
28+
val data = it.data.readText()
29+
val metadata = it.metadata?.readText()
30+
println("Server received payload: data=$data, metadata=$metadata")
31+
32+
Payload(data.repeat(3), "SERVER_METADATA") //response
3533
}
3634
}
3735
}
38-
39-
val rSocket = clientConnection.connectClient()
36+
val rSocket = RSocketConnector().connect(server)
4037

4138
val response = rSocket.requestResponse(Payload("Hello", "World"))
4239

examples/interactions/src/jvmMain/kotlin/RequestStreamExample.kt

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,39 +15,36 @@
1515
*/
1616

1717
import io.rsocket.kotlin.*
18-
import io.rsocket.kotlin.connection.*
18+
import io.rsocket.kotlin.core.*
1919
import io.rsocket.kotlin.payload.*
20+
import io.rsocket.kotlin.transport.local.*
2021
import kotlinx.coroutines.*
2122
import kotlinx.coroutines.flow.*
2223

2324
fun main(): Unit = runBlocking {
24-
val (clientConnection, serverConnection) = SimpleLocalConnection()
25+
val server = LocalServer()
26+
RSocketServer().bind(server) {
27+
RSocketRequestHandler {
28+
requestStream {
29+
val data = it.data.readText()
30+
val metadata = it.metadata?.readText()
31+
println("Server received payload: data=$data, metadata=$metadata")
2532

26-
launch {
27-
serverConnection.startServer {
28-
RSocketRequestHandler {
29-
requestStream = {
30-
val data = it.data.readText()
31-
val metadata = it.metadata?.readText()
32-
println("Server received payload: data=$data, metadata=$metadata")
33-
34-
flow {
35-
repeat(10) { i ->
36-
emit(Payload("Payload: $i", metadata))
37-
println("Server sent: $i")
38-
}
33+
flow {
34+
repeat(10) { i ->
35+
emit(Payload("Payload: $i", metadata))
36+
println("Server sent: $i")
3937
}
4038
}
4139
}
4240
}
4341
}
44-
45-
val rSocket = clientConnection.connectClient()
42+
val rSocket = RSocketConnector().connect(server)
4643

4744
val response = rSocket.requestStream(Payload("Hello", "World"))
4845

4946
response
50-
.buffer(2) //use buffer as first operator to use RequestN semantic
47+
.buffer(2) //use buffer as first operator to use RequestN semantic, so request by 2 elements
5148
.map { it.data.readText().substringAfter("Payload: ").toInt() }
5249
.take(2)
5350
.collect {

examples/interactions/src/jvmMain/kotlin/ServerRequestExample.kt

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,42 +15,36 @@
1515
*/
1616

1717
import io.rsocket.kotlin.*
18-
import io.rsocket.kotlin.connection.*
1918
import io.rsocket.kotlin.core.*
2019
import io.rsocket.kotlin.payload.*
20+
import io.rsocket.kotlin.transport.local.*
2121
import kotlinx.coroutines.*
2222

2323
fun main(): Unit = runBlocking {
24-
val (clientConnection, serverConnection) = SimpleLocalConnection()
25-
26-
launch {
27-
serverConnection.startServer { clientRSocket ->
28-
RSocketRequestHandler {
29-
requestResponse = {
30-
val clientRequest = it.data.readText()
31-
println("Server got from client request: $clientRequest")
32-
val response = clientRSocket.requestResponse(Payload("What happens?"))
33-
val clientResponse = response.data.readText()
34-
println("Server got from client response: $clientResponse")
35-
Payload("I'm frustrated because of `$clientResponse`")
36-
}
24+
val server = LocalServer()
25+
RSocketServer().bind(server) {
26+
RSocketRequestHandler {
27+
requestResponse {
28+
val clientRequest = it.data.readText()
29+
println("Server got from client request: $clientRequest")
30+
val response = requester.requestResponse(Payload("What happens?"))
31+
val clientResponse = response.data.readText()
32+
println("Server got from client response: $clientResponse")
33+
Payload("I'm frustrated because of `$clientResponse`")
3734
}
3835
}
3936
}
40-
41-
val rSocket = clientConnection.connectClient(
42-
RSocketConnectorConfiguration(
43-
acceptor = {
44-
RSocketRequestHandler {
45-
requestResponse = {
46-
val serverRequest = it.data.readText()
47-
println("Client got from server request: $serverRequest")
48-
Payload("I'm client!")
49-
}
37+
val rSocket = RSocketConnector {
38+
acceptor {
39+
RSocketRequestHandler {
40+
requestResponse {
41+
val serverRequest = it.data.readText()
42+
println("Client got from server request: $serverRequest")
43+
Payload("I'm client!")
5044
}
5145
}
52-
)
53-
)
46+
}
47+
}.connect(server)
5448

5549
val response = rSocket.requestResponse(Payload("How are you server?"))
5650
val data = response.data.readText()

0 commit comments

Comments
 (0)