Skip to content

Commit 939c67c

Browse files
reduce allocations in okhttp transport (#69)
1 parent 61976a8 commit 939c67c

File tree

6 files changed

+56
-102
lines changed

6 files changed

+56
-102
lines changed

rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/Exceptions.kt

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,6 @@ import io.rsocket.kotlin.internal.frame.ErrorFrameFlyweight
2222

2323
internal object Exceptions {
2424

25-
/**
26-
* Creates [Throwable] with no stack trace
27-
*/
28-
fun <T : Throwable> noStacktrace(ex: T): T {
29-
ex.stackTrace = arrayOf(StackTraceElement(
30-
ex.javaClass.name,
31-
"<init>",
32-
null,
33-
-1))
34-
return ex
35-
}
36-
3725
/**
3826
* Creates [RuntimeException] from given Error [Frame]
3927
*/

rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import io.reactivex.processors.UnicastProcessor
2323
import io.rsocket.kotlin.*
2424
import io.rsocket.kotlin.exceptions.ApplicationException
2525
import io.rsocket.kotlin.exceptions.ChannelRequestException
26-
import io.rsocket.kotlin.internal.Exceptions.noStacktrace
2726
import org.reactivestreams.Publisher
2827
import org.reactivestreams.Subscriber
2928
import org.reactivestreams.Subscription
@@ -358,6 +357,6 @@ internal class RSocketRequester(
358357
}
359358

360359
companion object {
361-
private val closedException = noStacktrace(ClosedChannelException())
360+
private val closedException = ClosedChannelException()
362361
}
363362
}

rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import io.rsocket.kotlin.internal.RSocketResponder.DisposableSubscription.Compan
2626
import io.rsocket.kotlin.exceptions.ApplicationException
2727
import io.rsocket.kotlin.internal.frame.FrameHeaderFlyweight.FLAGS_C
2828
import io.rsocket.kotlin.internal.frame.FrameHeaderFlyweight.FLAGS_M
29-
import io.rsocket.kotlin.internal.Exceptions.noStacktrace
3029
import io.rsocket.kotlin.DefaultPayload
3130
import org.reactivestreams.Publisher
3231
import org.reactivestreams.Subscriber
@@ -298,6 +297,6 @@ internal class RSocketResponder(
298297
}
299298

300299
companion object {
301-
private val closedException = noStacktrace(ClosedChannelException())
300+
private val closedException = ClosedChannelException()
302301
}
303302
}

rsocket-core/src/test/kotlin/io/rsocket/kotlin/util/ExceptionsTest.kt

Lines changed: 0 additions & 41 deletions
This file was deleted.

rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/OkWebsocket.kt

Lines changed: 21 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616

1717
package io.rsocket.transport.okhttp
1818

19-
import io.netty.buffer.ByteBuf
20-
import io.netty.buffer.Unpooled
19+
import io.netty.buffer.ByteBufAllocator
2120
import io.reactivex.Completable
2221
import io.reactivex.Flowable
2322
import io.reactivex.Single
@@ -35,9 +34,7 @@ internal class OkWebsocket(client: OkHttpClient,
3534
private val isOpen = AtomicBoolean()
3635
@Volatile
3736
private var failErr: ClosedChannelException? = null
38-
private val defFailErr by lazy {
39-
noStacktrace(ClosedChannelException())
40-
}
37+
private val defFailErr by lazy { ClosedChannelException() }
4138
private val connection = BehaviorProcessor.create<OkHttpWebSocketConnection>()
4239
private val frames = UnicastProcessor.create<Frame>()
4340

@@ -48,23 +45,15 @@ internal class OkWebsocket(client: OkHttpClient,
4845
}
4946

5047
override fun onMessage(webSocket: WebSocket?, bytes: ByteString) {
51-
val msgBuffer = Unpooled.wrappedBuffer(bytes.asByteBuffer())
52-
val frameBuffer = writeFrame(msgBuffer)
53-
54-
frames.onNext(Frame.from(frameBuffer))
55-
}
48+
val messageSize = bytes.size()
49+
val buffer = allocator.buffer(messageSize + frameLengthSize)
5650

57-
private fun writeFrame(msgBuffer: ByteBuf): ByteBuf {
58-
val msgSize = msgBuffer.readableBytes()
59-
val frameSize = msgSize + frameLengthSize
60-
val frameBuffer = Unpooled.buffer(frameSize, frameSize)
51+
buffer.writeByte(messageSize shr 16)
52+
buffer.writeByte(messageSize shr 8)
53+
buffer.writeByte(messageSize)
54+
buffer.writeBytes(bytes.toByteArray())
6155

62-
frameBuffer.writeByte(msgSize shr 16)
63-
frameBuffer.writeByte(msgSize shr 8)
64-
frameBuffer.writeByte(msgSize)
65-
frameBuffer.writeBytes(msgBuffer)
66-
67-
return frameBuffer
56+
frames.onNext(Frame.from(buffer))
6857
}
6958

7059
override fun onClosed(webSocket: WebSocket?,
@@ -98,14 +87,12 @@ internal class OkWebsocket(client: OkHttpClient,
9887

9988
fun send(frames: Publisher<Frame>): Completable =
10089
Flowable.fromPublisher(frames)
101-
.map { it.content() }
102-
.map { byteBuf ->
103-
val byteString = ByteString.of(
104-
byteBuf.skipBytes(frameLengthSize).nioBuffer())
105-
byteBuf.release()
106-
byteString
107-
}
108-
.flatMapCompletable { ws.sendAsync(it) }
90+
.map { frame ->
91+
val content = frame.content()
92+
val contentByteString = ByteString.of(content.skipBytes(frameLengthSize).nioBuffer())
93+
frame.release()
94+
ws.sendOrThrowOnFailure(contentByteString)
95+
}.ignoreElements()
10996

11097
fun close(): Completable = Completable.create { e ->
11198
ws.close(normalClose, "close")
@@ -118,27 +105,16 @@ internal class OkWebsocket(client: OkHttpClient,
118105

119106
internal fun isOpen(): Boolean = isOpen.get()
120107

121-
private fun WebSocket.sendAsync(bytes: ByteString): Completable =
122-
Completable.create { e ->
123-
if (send(bytes))
124-
e.onComplete()
125-
else
126-
e.onError(failErr ?: defFailErr)
127-
}
108+
private fun WebSocket.sendOrThrowOnFailure(bytes: ByteString) {
109+
if (!send(bytes)) {
110+
throw (failErr ?: defFailErr)
111+
}
112+
}
128113

129114
companion object {
130115
private const val normalClose = 1000
131116
private const val frameLengthSize = 3
132-
133-
private fun <T : Throwable> noStacktrace(ex: T): T {
134-
ex.stackTrace = arrayOf(StackTraceElement(
135-
ex.javaClass.name,
136-
"<init>",
137-
null,
138-
-1))
139-
return ex
140-
}
141-
117+
private val allocator = ByteBufAllocator.DEFAULT
142118
}
143119
}
144120

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#
2+
# Copyright 2015-2018 the original author or authors.
3+
# <p>
4+
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
# the License. You may obtain a copy of the License at
6+
# <p>
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
# <p>
9+
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
# specific language governing permissions and limitations under the License.
12+
#
13+
14+
15+
#
16+
#
17+
# Licensed under the Apache License, Version 2.0 (the "License");
18+
# you may not use this file except in compliance with the License.
19+
# You may obtain a copy of the License at
20+
#
21+
# http://www.apache.org/licenses/LICENSE-2.0
22+
#
23+
# Unless required by applicable law or agreed to in writing, software
24+
# distributed under the License is distributed on an "AS IS" BASIS,
25+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26+
# See the License for the specific language governing permissions and
27+
# limitations under the License.
28+
#
29+
log4j.rootLogger=INFO, stdout
30+
31+
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
32+
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
33+
log4j.appender.stdout.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %5p [%t] (%F:%L) - %m%n

0 commit comments

Comments
 (0)