Skip to content

Commit 4935fc7

Browse files
authored
Introduces reworked Payload API and new Metadata API (#117)
Composite Metadata support Extensions Metadata support (Routing, Tracing, Auth, PerStream)
1 parent 47b0368 commit 4935fc7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+2192
-162
lines changed

benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.rsocket.kotlin.benchmarks
1818

19+
import io.ktor.utils.io.core.*
1920
import io.rsocket.kotlin.*
2021
import io.rsocket.kotlin.core.*
2122
import io.rsocket.kotlin.payload.*
@@ -62,9 +63,9 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
6263
}
6364
}
6465

65-
override fun createPayload(size: Int): Payload = if (size == 0) Payload.Empty else Payload.wrap(
66-
data = ByteArray(size / 2).also { Random.nextBytes(it) },
67-
metadata = ByteArray(size / 2).also { Random.nextBytes(it) }
66+
override fun createPayload(size: Int): Payload = if (size == 0) Payload.Empty else Payload(
67+
data = ByteReadPacket(ByteArray(size / 2).also { Random.nextBytes(it) }),
68+
metadata = ByteReadPacket(ByteArray(size / 2).also { Random.nextBytes(it) })
6869
)
6970

7071
override fun releasePayload(payload: Payload) {

build.gradle.kts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,13 +206,18 @@ subprojects {
206206
if (name.contains("test", ignoreCase = true) || project.name == "rsocket-test") {
207207
useExperimentalAnnotation("kotlin.time.ExperimentalTime")
208208
useExperimentalAnnotation("kotlin.ExperimentalStdlibApi")
209+
209210
useExperimentalAnnotation("kotlinx.coroutines.ExperimentalCoroutinesApi")
210211
useExperimentalAnnotation("kotlinx.coroutines.InternalCoroutinesApi")
211212
useExperimentalAnnotation("kotlinx.coroutines.ObsoleteCoroutinesApi")
212213
useExperimentalAnnotation("kotlinx.coroutines.FlowPreview")
214+
213215
useExperimentalAnnotation("io.ktor.util.KtorExperimentalAPI")
214216
useExperimentalAnnotation("io.ktor.util.InternalAPI")
215217
useExperimentalAnnotation("io.ktor.utils.io.core.internal.DangerousInternalIoApi")
218+
219+
useExperimentalAnnotation("io.rsocket.kotlin.TransportApi")
220+
useExperimentalAnnotation("io.rsocket.kotlin.ExperimentalMetadataApi")
216221
}
217222
}
218223
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import io.ktor.utils.io.core.*
18+
import io.rsocket.kotlin.*
19+
import io.rsocket.kotlin.core.*
20+
import io.rsocket.kotlin.metadata.*
21+
import io.rsocket.kotlin.payload.*
22+
23+
@OptIn(ExperimentalMetadataApi::class)
24+
public fun main() {
25+
val p1 = buildPayload {
26+
data("Hello")
27+
28+
metadata("some text")
29+
}
30+
31+
println("Case: 1")
32+
println(p1.data.readText())
33+
println(p1.metadata?.readText())
34+
35+
val p2 = buildPayload {
36+
data("Hello")
37+
38+
metadata(RoutingMetadata("tag1", "tag2"))
39+
}
40+
41+
val tags = p2.metadata?.read(RoutingMetadata)?.tags.orEmpty()
42+
println("Case: 2")
43+
println(tags)
44+
45+
46+
val p3 = buildPayload {
47+
data("hello")
48+
49+
compositeMetadata {
50+
add(RoutingMetadata("tag3", "t4"))
51+
add(RawMetadata(WellKnownMimeType.ApplicationJson, buildPacket { writeText("{s: 2}") }))
52+
}
53+
}
54+
55+
val cm = p3.metadata!!.read(CompositeMetadata)
56+
println("Case: 3")
57+
println(cm[RoutingMetadata].tags)
58+
println(cm[WellKnownMimeType.ApplicationJson].readText())
59+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import io.ktor.utils.io.core.*
18+
import io.rsocket.kotlin.payload.*
19+
20+
/**
21+
* Simple custom [Payload] factory function with string data and metadata.
22+
* Has almost no overhead over call to [Payload] constructor with data and metadata as [ByteReadPacket].
23+
* Similar functions can be created for all needed use cases
24+
*/
25+
fun Payload(data: String, metadata: String? = null): Payload = buildPayload {
26+
data(data)
27+
if (metadata != null) metadata(metadata)
28+
}

examples/multiplatform-chat/src/clientMain/kotlin/Api.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,6 @@ suspend fun connectToApiUsingTCP(name: String): Api {
5151

5252
private fun connector(name: String): RSocketConnector = RSocketConnector {
5353
connectionConfig {
54-
setupPayload { Payload(name) }
54+
setupPayload { buildPayload { data(name) } }
5555
}
5656
}

examples/multiplatform-chat/src/clientMain/kotlin/PayloadWithRoute.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
*/
1616

1717
import io.ktor.utils.io.core.*
18+
import io.rsocket.kotlin.metadata.*
1819
import io.rsocket.kotlin.payload.*
1920

20-
fun Payload(route: String, packet: ByteReadPacket): Payload = Payload {
21+
fun Payload(route: String, packet: ByteReadPacket): Payload = buildPayload {
2122
data(packet)
22-
metadata(route)
23+
metadata(RoutingMetadata(route))
2324
}

examples/multiplatform-chat/src/commonMain/kotlin/Serialization.kt

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
import io.ktor.utils.io.core.*
18+
import io.rsocket.kotlin.metadata.*
1819
import io.rsocket.kotlin.payload.*
1920
import kotlinx.serialization.*
2021
import kotlinx.serialization.protobuf.*
@@ -27,20 +28,15 @@ val ConfiguredProtoBuf = ProtoBuf
2728
inline fun <reified T> ProtoBuf.decodeFromPayload(payload: Payload): T = decodeFromByteArray(payload.data.readBytes())
2829

2930
@ExperimentalSerializationApi
30-
inline fun <reified T> ProtoBuf.encodeToPayload(route: String, value: T): Payload {
31-
return kotlin.runCatching { //TODO some ktor issue...
32-
Payload {
33-
data(encodeToByteArray(value))
34-
metadata(route)
35-
}
36-
}.getOrNull() ?: Payload {
37-
data(encodeToByteArray(value))
38-
metadata(route)
39-
}
31+
inline fun <reified T> ProtoBuf.encodeToPayload(route: String, value: T): Payload = buildPayload {
32+
data(encodeToByteArray(value))
33+
metadata(RoutingMetadata(route))
4034
}
4135

4236
@ExperimentalSerializationApi
43-
inline fun <reified T> ProtoBuf.encodeToPayload(value: T): Payload = Payload(encodeToByteArray(value))
37+
inline fun <reified T> ProtoBuf.encodeToPayload(value: T): Payload = buildPayload {
38+
data(encodeToByteArray(value))
39+
}
4440

4541
@ExperimentalSerializationApi
4642
inline fun <reified I, reified O> ProtoBuf.decoding(payload: Payload, block: (I) -> O): Payload =

examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@ import io.ktor.util.*
2424
import io.ktor.websocket.*
2525
import io.rsocket.kotlin.*
2626
import io.rsocket.kotlin.core.*
27+
import io.rsocket.kotlin.metadata.*
28+
import io.rsocket.kotlin.payload.*
2729
import io.rsocket.kotlin.transport.ktor.*
2830
import io.rsocket.kotlin.transport.ktor.server.*
2931
import kotlinx.coroutines.*
3032
import kotlinx.coroutines.flow.*
3133
import kotlinx.serialization.*
3234

33-
@OptIn(KtorExperimentalAPI::class, ExperimentalSerializationApi::class)
35+
@OptIn(KtorExperimentalAPI::class, ExperimentalSerializationApi::class, ExperimentalMetadataApi::class)
3436
fun main() {
3537
val proto = ConfiguredProtoBuf
3638
val users = Users()
@@ -43,6 +45,8 @@ fun main() {
4345

4446
val rSocketServer = RSocketServer()
4547

48+
fun Payload.route(): String = metadata?.read(RoutingMetadata)?.tags?.first() ?: error("No route provided")
49+
4650
//create acceptor
4751
val acceptor = ConnectionAcceptor {
4852
val userName = config.setupPayload.data.readText()
@@ -52,9 +56,7 @@ fun main() {
5256
RSocketRequestHandler {
5357
fireAndForget {
5458
withContext(session) {
55-
when (val route = it.metadata?.readText()) {
56-
null -> error("No route provided")
57-
59+
when (val route = it.route()) {
5860
"users.deleteMe" -> userApi.deleteMe()
5961

6062
else -> error("Wrong route: $route")
@@ -63,17 +65,15 @@ fun main() {
6365
}
6466
requestResponse {
6567
withContext(session) {
66-
when (val route = it.metadata?.readText()) {
67-
null -> error("No route provided")
68+
when (val route = it.route()) {
69+
"users.getMe" -> proto.encodeToPayload(userApi.getMe())
70+
"users.all" -> proto.encodeToPayload(userApi.all())
6871

69-
"users.getMe" -> proto.encodeToPayload(userApi.getMe())
70-
"users.all" -> proto.encodeToPayload(userApi.all())
72+
"chats.all" -> proto.encodeToPayload(chatsApi.all())
73+
"chats.new" -> proto.decoding<NewChat, Chat>(it) { (name) -> chatsApi.new(name) }
74+
"chats.delete" -> proto.decoding<DeleteChat>(it) { (id) -> chatsApi.delete(id) }
7175

72-
"chats.all" -> proto.encodeToPayload(chatsApi.all())
73-
"chats.new" -> proto.decoding<NewChat, Chat>(it) { (name) -> chatsApi.new(name) }
74-
"chats.delete" -> proto.decoding<DeleteChat>(it) { (id) -> chatsApi.delete(id) }
75-
76-
"messages.send" -> proto.decoding<SendMessage, Message>(it) { (chatId, content) ->
76+
"messages.send" -> proto.decoding<SendMessage, Message>(it) { (chatId, content) ->
7777
messagesApi.send(chatId, content)
7878
}
7979
"messages.history" -> proto.decoding<HistoryMessages, List<Message>>(it) { (chatId, limit) ->
@@ -85,9 +85,7 @@ fun main() {
8585
}
8686
}
8787
requestStream {
88-
when (val route = it.metadata?.readText()) {
89-
null -> error("No route provided")
90-
88+
when (val route = it.route()) {
9189
"messages.stream" -> {
9290
val (chatId, fromMessageId) = proto.decodeFromPayload<StreamMessages>(it)
9391
messagesApi.messages(chatId, fromMessageId).map { m -> proto.encodeToPayload(m) }

examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ fun main() {
3737
RSocketRequestHandler {
3838
requestResponse {
3939
println("Received: ${it.data.readText()}")
40-
Payload("Hello from nodejs")
40+
buildPayload { data("Hello from nodejs") }
4141
}
4242
}
4343
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import io.ktor.utils.io.core.*
18+
import io.rsocket.kotlin.*
19+
import io.rsocket.kotlin.core.*
20+
import io.rsocket.kotlin.metadata.*
21+
import io.rsocket.kotlin.payload.*
22+
23+
@OptIn(ExperimentalMetadataApi::class)
24+
public fun metadata() {
25+
26+
//writing
27+
28+
val routing = RoutingMetadata("tag1", "route1", "something")
29+
val tracing = ZipkinTracingMetadata(
30+
kind = ZipkinTracingMetadata.Kind.Sample,
31+
traceId = 123L,
32+
spanId = 123L
33+
)
34+
val cm1 = buildCompositeMetadata {
35+
add(routing)
36+
add(tracing)
37+
}
38+
// or
39+
val cm2 = CompositeMetadata(routing, tracing)
40+
41+
//all lambdas are inline, and don't create another objects for builders
42+
//so no overhead comparing to using plain Payload(data, metadata) function
43+
val payload = buildPayload {
44+
data("Some data")
45+
metadata(cm2)
46+
// or
47+
metadata(cm1)
48+
// or
49+
compositeMetadata {
50+
add(routing)
51+
add(tracing)
52+
53+
add(WellKnownMimeType.ApplicationAvro, buildPacket { writeText("some custom metadata building") })
54+
//or
55+
val raw = RawMetadata(WellKnownMimeType.ApplicationAvro, ByteReadPacket.Empty)
56+
57+
add(raw)
58+
}
59+
}
60+
61+
//reading
62+
//property types are not needed, just for better online readability
63+
val cm: CompositeMetadata = payload.metadata?.read(CompositeMetadata) ?: return
64+
65+
val rmr = RawMetadata.reader(WellKnownMimeType.ApplicationAvro)
66+
67+
68+
val rm: RoutingMetadata = cm.get(RoutingMetadata)
69+
val tm: ZipkinTracingMetadata = cm.get(ZipkinTracingMetadata)
70+
71+
//or
72+
val rm1: RoutingMetadata = cm[RoutingMetadata]
73+
val tm1: ZipkinTracingMetadata = cm[ZipkinTracingMetadata]
74+
75+
//or
76+
val rmNull: RoutingMetadata? = cm.getOrNull(RoutingMetadata)
77+
val tmNull: ZipkinTracingMetadata? = cm.getOrNull(ZipkinTracingMetadata)
78+
79+
//or
80+
val rmList: List<RoutingMetadata> = cm.list(RoutingMetadata) //spec allow this
81+
82+
//or
83+
val c: Boolean = cm.contains(RoutingMetadata)
84+
val c2: Boolean = RoutingMetadata in cm
85+
86+
//or
87+
88+
cm.entries.forEach {
89+
if (it.hasMimeTypeOf(RoutingMetadata)) {
90+
val rm2: RoutingMetadata = it.read(RoutingMetadata)
91+
}
92+
//or
93+
val tm2: ZipkinTracingMetadata? = it.readOrNull(ZipkinTracingMetadata)
94+
95+
96+
}
97+
98+
//usage
99+
100+
rm.tags.forEach {
101+
println("tag: $it")
102+
}
103+
104+
tm.traceId
105+
tm.traceIdHigh
106+
107+
val span = "${tm.parentSpanId}-${tm.spanId}"
108+
109+
}

0 commit comments

Comments
 (0)