From 30ef6cdd79274170e776a3a8be002a139bf5f787 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Thu, 14 Sep 2023 15:58:41 +0100 Subject: [PATCH 1/3] [Yamux] Implement basic backpressure --- .../io/libp2p/etc/util/netty/mux/AbstractMuxHandler.kt | 2 ++ .../kotlin/io/libp2p/etc/util/netty/mux/MuxChannel.kt | 4 ++++ .../src/main/kotlin/io/libp2p/mux/mplex/MplexHandler.kt | 4 ++++ .../src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt | 9 +++++++++ 4 files changed, 19 insertions(+) diff --git a/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/AbstractMuxHandler.kt b/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/AbstractMuxHandler.kt index e072ef8b5..68fd27a3d 100644 --- a/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/AbstractMuxHandler.kt +++ b/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/AbstractMuxHandler.kt @@ -83,6 +83,8 @@ abstract class AbstractMuxHandler() : */ abstract fun releaseMessage(msg: TData) + abstract fun isChildWritable(child: MuxChannel): Boolean + abstract fun onChildWrite(child: MuxChannel, data: TData) protected fun onRemoteOpen(id: MuxId) { diff --git a/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxChannel.kt b/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxChannel.kt index 855046c5a..84680d902 100644 --- a/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxChannel.kt +++ b/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxChannel.kt @@ -32,6 +32,10 @@ class MuxChannel( initializer(this) } + override fun isWritable(): Boolean { + return super.isWritable() && parent.isChildWritable(this) + } + override fun doWrite(buf: ChannelOutboundBuffer) { while (true) { val msg = buf.current() ?: break diff --git a/libp2p/src/main/kotlin/io/libp2p/mux/mplex/MplexHandler.kt b/libp2p/src/main/kotlin/io/libp2p/mux/mplex/MplexHandler.kt index 4e061cbab..4d32cdedf 100644 --- a/libp2p/src/main/kotlin/io/libp2p/mux/mplex/MplexHandler.kt +++ b/libp2p/src/main/kotlin/io/libp2p/mux/mplex/MplexHandler.kt @@ -33,6 +33,10 @@ open class MplexHandler( } } + override fun isChildWritable(child: MuxChannel): Boolean { + return true + } + override fun onChildWrite(child: MuxChannel, data: ByteBuf) { val ctx = getChannelHandlerContext() data.sliceMaxSize(maxFrameDataLength) diff --git a/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt b/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt index 79fc1049f..16794f577 100644 --- a/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt +++ b/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt @@ -165,6 +165,15 @@ open class YamuxHandler( goAwayPromise.complete(msg.length) } + override fun isChildWritable(child: MuxChannel): Boolean { + val windowSize = windowSizes[child.id]?.send + return if (windowSize == null) { + false + } else { + windowSize.get() > 0 + } + } + override fun onChildWrite(child: MuxChannel, data: ByteBuf) { val windowSize = windowSizes[child.id]?.send if (windowSize == null) { From 2da811128bbfabb1d631a4c26665f3a1da6740ea Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Thu, 14 Sep 2023 16:41:30 +0100 Subject: [PATCH 2/3] adding comments --- libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt b/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt index 16794f577..f44da2821 100644 --- a/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt +++ b/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt @@ -201,12 +201,13 @@ open class YamuxHandler( val frame = YamuxFrame(child.id, YamuxType.DATA, 0, length.toLong(), slicedData) getChannelHandlerContext().writeAndFlush(frame) } else { - // wait until the window is increased to send + // add to internal outbound buffer until the window is increased addToSendBuffer(child, data) } } } + // Can't rely only on the Netty outbound buffer to handle window updates, so specifying an internal outbound buffer private fun addToSendBuffer(child: MuxChannel, data: ByteBuf) { val buffer = sendBuffers.getOrPut(child.id) { SendBuffer(child.id) } buffer.add(data) From e43d32ee4641b5a6c89584291a2d09594ba5267a Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Thu, 14 Sep 2023 16:56:05 +0100 Subject: [PATCH 3/3] change javadoc --- .../src/main/kotlin/io/libp2p/core/mux/StreamMuxerProtocol.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/core/mux/StreamMuxerProtocol.kt b/libp2p/src/main/kotlin/io/libp2p/core/mux/StreamMuxerProtocol.kt index 3f7f460a0..ab9f1fbb5 100644 --- a/libp2p/src/main/kotlin/io/libp2p/core/mux/StreamMuxerProtocol.kt +++ b/libp2p/src/main/kotlin/io/libp2p/core/mux/StreamMuxerProtocol.kt @@ -22,7 +22,7 @@ fun interface StreamMuxerProtocol { } /** - * @param maxBufferedConnectionWrites the maximum amount of bytes in the write buffer per connection + * @param maxBufferedConnectionWrites the maximum amount of bytes in the internal write buffer per connection */ @JvmStatic @JvmOverloads