From aad780e55da2564168ba4ee4837eb5d45233f26a Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 27 Mar 2025 02:00:02 +0530 Subject: [PATCH] webrtc: support receiving 256kB messages In experiments with js we've found that increasing the message size increases throughput. See: https://github.com/libp2p/specs/pull/628#issuecomment-2451491070 for details. This changes the protobuf reader for the stream to read 256kB messages. This also forces a change to the connection SCTP read buffer to be increased to about 2.5 MB, to support 1 message being buffered for 10 streams. This isn't enough to support larger messages. We most likely need to change the inferred SDP of the server to use 256kB maxMessageSize, and need some backwards compatible mechanism in the handshake to opt in to large messages. See: https://github.com/libp2p/specs/pull/628 for details --- go.mod | 10 +-- go.sum | 20 ++--- p2p/transport/webrtc/connection.go | 4 +- p2p/transport/webrtc/listener.go | 2 +- p2p/transport/webrtc/stream.go | 58 +++++++------- p2p/transport/webrtc/stream_test.go | 102 ++++++++++++++----------- p2p/transport/webrtc/stream_write.go | 19 ++++- p2p/transport/webrtc/transport.go | 10 ++- p2p/transport/webrtc/transport_test.go | 2 +- test-plans/go.mod | 10 +-- test-plans/go.sum | 20 ++--- 11 files changed, 144 insertions(+), 113 deletions(-) diff --git a/go.mod b/go.mod index 200a51437a..85bd16314a 100644 --- a/go.mod +++ b/go.mod @@ -47,11 +47,11 @@ require ( github.com/multiformats/go-varint v0.0.7 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pion/datachannel v1.5.10 - github.com/pion/ice/v4 v4.0.6 + github.com/pion/ice/v4 v4.0.8 github.com/pion/logging v0.2.3 - github.com/pion/sctp v1.8.36 + github.com/pion/sctp v1.8.37 github.com/pion/stun v0.6.1 - github.com/pion/webrtc/v4 v4.0.10 + github.com/pion/webrtc/v4 v4.0.14 github.com/prometheus/client_golang v1.21.0 github.com/prometheus/client_model v0.6.1 github.com/quic-go/quic-go v0.50.0 @@ -93,8 +93,8 @@ require ( github.com/pion/mdns/v2 v2.0.7 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/rtcp v1.2.15 // indirect - github.com/pion/rtp v1.8.11 // indirect - github.com/pion/sdp/v3 v3.0.10 // indirect + github.com/pion/rtp v1.8.13 // indirect + github.com/pion/sdp/v3 v3.0.11 // indirect github.com/pion/srtp/v3 v3.0.4 // indirect github.com/pion/stun/v3 v3.0.0 // indirect github.com/pion/transport/v2 v2.2.10 // indirect diff --git a/go.sum b/go.sum index 23e1a7c551..83ee8350e2 100644 --- a/go.sum +++ b/go.sum @@ -216,8 +216,8 @@ github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk= github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U= github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg= -github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM= -github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= +github.com/pion/ice/v4 v4.0.8 h1:ajNx0idNG+S+v9Phu4LSn2cs8JEfTsA1/tEjkkAVpFY= +github.com/pion/ice/v4 v4.0.8/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI= github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= @@ -229,12 +229,12 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0= -github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk= -github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4= -github.com/pion/sctp v1.8.36 h1:owNudmnz1xmhfYje5L/FCav3V9wpPRePHle3Zi+P+M0= -github.com/pion/sctp v1.8.36/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= -github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA= -github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= +github.com/pion/rtp v1.8.13 h1:8uSUPpjSL4OlwZI8Ygqu7+h2p9NPFB+yAZ461Xn5sNg= +github.com/pion/rtp v1.8.13/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4= +github.com/pion/sctp v1.8.37 h1:ZDmGPtRPX9mKCiVXtMbTWybFw3z/hVKAZgU81wcOrqs= +github.com/pion/sctp v1.8.37/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= +github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI= +github.com/pion/sdp/v3 v3.0.11/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M= github.com/pion/srtp/v3 v3.0.4/go.mod h1:1Jx3FwDoxpRaTh1oRV8A/6G1BnFL+QI82eK4ms8EEJQ= github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4= @@ -249,8 +249,8 @@ github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1 github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo= github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM= github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA= -github.com/pion/webrtc/v4 v4.0.10 h1:Hq/JLjhqLxi+NmCtE8lnRPDr8H4LcNvwg8OxVcdv56Q= -github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3vVWilOck= +github.com/pion/webrtc/v4 v4.0.14 h1:nyds/sFRR+HvmWoBa6wrL46sSfpArE0qR883MBW96lg= +github.com/pion/webrtc/v4 v4.0.14/go.mod h1:R3+qTnQTS03UzwDarYecgioNf7DYgTsldxnCXB821Kk= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/p2p/transport/webrtc/connection.go b/p2p/transport/webrtc/connection.go index 77b293fadb..d75c309c51 100644 --- a/p2p/transport/webrtc/connection.go +++ b/p2p/transport/webrtc/connection.go @@ -188,7 +188,7 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error dc.Close() return nil, fmt.Errorf("detach channel failed for stream(%d): %w", streamID, err) } - str := newStream(dc, rwc, func() { c.removeStream(streamID) }) + str := newStream(dc, rwc, maxSendMessageSize, func() { c.removeStream(streamID) }) if err := c.addStream(str); err != nil { str.Reset() return nil, fmt.Errorf("failed to add stream(%d) to connection: %w", streamID, err) @@ -201,7 +201,7 @@ func (c *connection) AcceptStream() (network.MuxedStream, error) { case <-c.ctx.Done(): return nil, c.closeErr case dc := <-c.acceptQueue: - str := newStream(dc.channel, dc.stream, func() { c.removeStream(*dc.channel.ID()) }) + str := newStream(dc.channel, dc.stream, maxSendMessageSize, func() { c.removeStream(*dc.channel.ID()) }) if err := c.addStream(str); err != nil { str.Reset() return nil, err diff --git a/p2p/transport/webrtc/listener.go b/p2p/transport/webrtc/listener.go index 3534fd2c76..0ec05ec0e9 100644 --- a/p2p/transport/webrtc/listener.go +++ b/p2p/transport/webrtc/listener.go @@ -253,7 +253,7 @@ func (l *listener) setupConnection( if err != nil { return nil, err } - handshakeChannel := newStream(w.HandshakeDataChannel, rwc, func() {}) + handshakeChannel := newStream(w.HandshakeDataChannel, rwc, maxSendMessageSize, nil) // we do not yet know A's peer ID so accept any inbound remotePubKey, err := l.transport.noiseHandshake(ctx, w.PeerConnection, handshakeChannel, "", crypto.SHA256, true) if err != nil { diff --git a/p2p/transport/webrtc/stream.go b/p2p/transport/webrtc/stream.go index 39873c9d66..c92457ccb0 100644 --- a/p2p/transport/webrtc/stream.go +++ b/p2p/transport/webrtc/stream.go @@ -15,22 +15,9 @@ import ( ) const ( - // maxMessageSize is the maximum message size of the Protobuf message we send / receive. - maxMessageSize = 16384 - // maxSendBuffer is the maximum data we enqueue on the underlying data channel for writes. - // The underlying SCTP layer has an unbounded buffer for writes. We limit the amount enqueued - // per stream is limited to avoid a single stream monopolizing the entire connection. - maxSendBuffer = 2 * maxMessageSize - // sendBufferLowThreshold is the threshold below which we write more data on the underlying - // data channel. We want a notification as soon as we can write 1 full sized message. - sendBufferLowThreshold = maxSendBuffer - maxMessageSize - // maxTotalControlMessagesSize is the maximum total size of all control messages we will - // write on this stream. - // 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be - // exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection - // send queue. - maxTotalControlMessagesSize = 50 - + // maxSendMessageSize is the maximum message size of the Protobuf message we send / receive. + // NOTE: Change `varintOverhead` if you change this. + maxSendMessageSize = 16384 // Proto overhead assumption is 5 bytes protoOverhead = 5 // Varint overhead is assumed to be 2 bytes. This is safe since @@ -40,9 +27,20 @@ const ( // is less than or equal to 2 ^ 14, the varint will not be more than // 2 bytes in length. varintOverhead = 2 + + // maxTotalControlMessagesSize is the maximum total size of all control messages we will + // write on this stream. + // 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be + // exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection + // send queue. + maxTotalControlMessagesSize = 50 + // maxFINACKWait is the maximum amount of time a stream will wait to read // FIN_ACK before closing the data channel maxFINACKWait = 10 * time.Second + + // maxReceiveMessageSize is the maximum message size of the Protobuf message we receive. + maxReceiveMessageSize = 256<<10 + 1<<10 // 1kB buffer ) type receiveState uint8 @@ -79,11 +77,12 @@ type stream struct { nextMessage *pb.Message receiveState receiveState - writer pbio.Writer // concurrent writes prevented by mx - writeStateChanged chan struct{} - sendState sendState - writeDeadline time.Time - writeError error + writer pbio.Writer // concurrent writes prevented by mx + writeStateChanged chan struct{} + sendState sendState + writeDeadline time.Time + writeError error + maxSendMessageSize int controlMessageReaderOnce sync.Once // controlMessageReaderEndTime is the end time for reading FIN_ACK from the control @@ -105,20 +104,21 @@ var _ network.MuxedStream = &stream{} func newStream( channel *webrtc.DataChannel, rwc datachannel.ReadWriteCloser, + maxSendMessageSize int, onDone func(), ) *stream { s := &stream{ - reader: pbio.NewDelimitedReader(rwc, maxMessageSize), - writer: pbio.NewDelimitedWriter(rwc), - writeStateChanged: make(chan struct{}, 1), - id: *channel.ID(), - dataChannel: rwc.(*datachannel.DataChannel), - onDone: onDone, + reader: pbio.NewDelimitedReader(rwc, maxReceiveMessageSize), + writer: pbio.NewDelimitedWriter(rwc), + writeStateChanged: make(chan struct{}, 1), + id: *channel.ID(), + dataChannel: rwc.(*datachannel.DataChannel), + onDone: onDone, + maxSendMessageSize: maxSendMessageSize, } - s.dataChannel.SetBufferedAmountLowThreshold(sendBufferLowThreshold) + s.dataChannel.SetBufferedAmountLowThreshold(uint64(s.sendBufferLowThreshold())) s.dataChannel.OnBufferedAmountLow(func() { s.notifyWriteStateChanged() - }) return s } diff --git a/p2p/transport/webrtc/stream_test.go b/p2p/transport/webrtc/stream_test.go index 4153ff830a..461ed27ff8 100644 --- a/p2p/transport/webrtc/stream_test.go +++ b/p2p/transport/webrtc/stream_test.go @@ -3,6 +3,7 @@ package libp2pwebrtc import ( "crypto/rand" "errors" + "fmt" "io" "os" "sync/atomic" @@ -148,8 +149,8 @@ func TestStreamSimpleReadWriteClose(t *testing.T) { client, server := getDetachedDataChannels(t) var clientDone, serverDone atomic.Bool - clientStr := newStream(client.dc, client.rwc, func() { clientDone.Store(true) }) - serverStr := newStream(server.dc, server.rwc, func() { serverDone.Store(true) }) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { clientDone.Store(true) }) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() { serverDone.Store(true) }) // send a foobar from the client n, err := clientStr.Write([]byte("foobar")) @@ -194,8 +195,8 @@ func TestStreamSimpleReadWriteClose(t *testing.T) { func TestStreamPartialReads(t *testing.T) { client, server := getDetachedDataChannels(t) - clientStr := newStream(client.dc, client.rwc, func() {}) - serverStr := newStream(server.dc, server.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {}) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) _, err := serverStr.Write([]byte("foobar")) require.NoError(t, err) @@ -217,8 +218,8 @@ func TestStreamPartialReads(t *testing.T) { func TestStreamSkipEmptyFrames(t *testing.T) { client, server := getDetachedDataChannels(t) - clientStr := newStream(client.dc, client.rwc, func() {}) - serverStr := newStream(server.dc, server.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {}) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) for i := 0; i < 10; i++ { require.NoError(t, serverStr.writer.WriteMsg(&pb.Message{})) @@ -252,7 +253,7 @@ func TestStreamSkipEmptyFrames(t *testing.T) { func TestStreamReadReturnsOnClose(t *testing.T) { client, _ := getDetachedDataChannels(t) - clientStr := newStream(client.dc, client.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {}) errChan := make(chan error, 1) go func() { _, err := clientStr.Read([]byte{0}) @@ -275,8 +276,8 @@ func TestStreamResets(t *testing.T) { client, server := getDetachedDataChannels(t) var clientDone, serverDone atomic.Bool - clientStr := newStream(client.dc, client.rwc, func() { clientDone.Store(true) }) - serverStr := newStream(server.dc, server.rwc, func() { serverDone.Store(true) }) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { clientDone.Store(true) }) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() { serverDone.Store(true) }) // send a foobar from the client _, err := clientStr.Write([]byte("foobar")) @@ -311,8 +312,8 @@ func TestStreamResets(t *testing.T) { func TestStreamReadDeadlineAsync(t *testing.T) { client, server := getDetachedDataChannels(t) - clientStr := newStream(client.dc, client.rwc, func() {}) - serverStr := newStream(server.dc, server.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {}) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) timeout := 100 * time.Millisecond if os.Getenv("CI") != "" { @@ -342,8 +343,8 @@ func TestStreamReadDeadlineAsync(t *testing.T) { func TestStreamWriteDeadlineAsync(t *testing.T) { client, server := getDetachedDataChannels(t) - clientStr := newStream(client.dc, client.rwc, func() {}) - serverStr := newStream(server.dc, server.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {}) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) _ = serverStr b := make([]byte, 1024) @@ -372,8 +373,8 @@ func TestStreamWriteDeadlineAsync(t *testing.T) { func TestStreamReadAfterClose(t *testing.T) { client, server := getDetachedDataChannels(t) - clientStr := newStream(client.dc, client.rwc, func() {}) - serverStr := newStream(server.dc, server.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {}) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) serverStr.Close() b := make([]byte, 1) @@ -384,8 +385,8 @@ func TestStreamReadAfterClose(t *testing.T) { client, server = getDetachedDataChannels(t) - clientStr = newStream(client.dc, client.rwc, func() {}) - serverStr = newStream(server.dc, server.rwc, func() {}) + clientStr = newStream(client.dc, client.rwc, maxSendMessageSize, func() {}) + serverStr = newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) serverStr.Reset() b = make([]byte, 1) @@ -399,8 +400,8 @@ func TestStreamCloseAfterFINACK(t *testing.T) { client, server := getDetachedDataChannels(t) done := make(chan bool, 1) - clientStr := newStream(client.dc, client.rwc, func() { done <- true }) - serverStr := newStream(server.dc, server.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true }) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) go func() { err := clientStr.Close() @@ -427,8 +428,8 @@ func TestStreamFinAckAfterStopSending(t *testing.T) { client, server := getDetachedDataChannels(t) done := make(chan bool, 1) - clientStr := newStream(client.dc, client.rwc, func() { done <- true }) - serverStr := newStream(server.dc, server.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true }) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) go func() { clientStr.CloseRead() @@ -460,8 +461,8 @@ func TestStreamConcurrentClose(t *testing.T) { start := make(chan bool, 2) done := make(chan bool, 2) - clientStr := newStream(client.dc, client.rwc, func() { done <- true }) - serverStr := newStream(server.dc, server.rwc, func() { done <- true }) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true }) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() { done <- true }) go func() { start <- true @@ -495,7 +496,7 @@ func TestStreamResetAfterClose(t *testing.T) { client, server := getDetachedDataChannels(t) done := make(chan bool, 2) - clientStr := newStream(client.dc, client.rwc, func() { done <- true }) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true }) clientStr.Close() select { @@ -520,7 +521,7 @@ func TestStreamDataChannelCloseOnFINACK(t *testing.T) { client, server := getDetachedDataChannels(t) done := make(chan bool, 1) - clientStr := newStream(client.dc, client.rwc, func() { done <- true }) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true }) clientStr.Close() @@ -540,24 +541,35 @@ func TestStreamDataChannelCloseOnFINACK(t *testing.T) { } func TestStreamChunking(t *testing.T) { - client, server := getDetachedDataChannels(t) - - clientStr := newStream(client.dc, client.rwc, func() {}) - serverStr := newStream(server.dc, server.rwc, func() {}) - - const N = (16 << 10) + 1000 - go func() { - data := make([]byte, N) - _, err := clientStr.Write(data) - require.NoError(t, err) - }() - - data := make([]byte, N) - n, err := serverStr.Read(data) - require.NoError(t, err) - require.LessOrEqual(t, n, 16<<10) - - nn, err := serverStr.Read(data) - require.NoError(t, err) - require.Equal(t, nn+n, N) + for _, msgSize := range []int{16 << 10, 32 << 10, 64 << 10, 128 << 10, 256 << 10} { + t.Run(fmt.Sprintf("msgSize=%d", msgSize), func(t *testing.T) { + client, server := getDetachedDataChannels(t) + defer client.dc.Close() + defer server.dc.Close() + + clientStr := newStream(client.dc, client.rwc, msgSize, nil) + // server should read large messages even if it can only send 16 kB messages. + serverStr := newStream(server.dc, server.rwc, 16<<10, nil) + + N := msgSize + 1000 + input := make([]byte, N) + _, err := rand.Read(input) + require.NoError(t, err) + go func() { + n, err := clientStr.Write(input) + require.NoError(t, err) + require.Equal(t, n, len(input)) + }() + + data := make([]byte, N) + n, err := serverStr.Read(data) + require.NoError(t, err) + require.LessOrEqual(t, n, msgSize) + // shouldn't be much less than msgSize + require.GreaterOrEqual(t, n, msgSize-100) + _, err = serverStr.Read(data[n:]) + require.NoError(t, err) + require.Equal(t, input, data) + }) + } } diff --git a/p2p/transport/webrtc/stream_write.go b/p2p/transport/webrtc/stream_write.go index 01fddac331..8629955191 100644 --- a/p2p/transport/webrtc/stream_write.go +++ b/p2p/transport/webrtc/stream_write.go @@ -84,7 +84,7 @@ func (s *stream) Write(b []byte) (int, error) { s.mx.Lock() continue } - end := maxMessageSize + end := s.maxSendMessageSize if end > availableSpace { end = availableSpace } @@ -110,11 +110,24 @@ func (s *stream) SetWriteDeadline(t time.Time) error { return nil } +// sendBufferSize() is the maximum data we enqueue on the underlying data channel for writes. +// The underlying SCTP layer has an unbounded buffer for writes. We limit the amount enqueued +// per stream is limited to avoid a single stream monopolizing the entire connection. +func (s *stream) sendBufferSize() int { + return 2 * s.maxSendMessageSize +} + +// sendBufferLowThreshold() is the threshold below which we write more data on the underlying +// data channel. We want a notification as soon as we can write 1 full sized message. +func (s *stream) sendBufferLowThreshold() int { + return s.sendBufferSize() - s.maxSendMessageSize +} + func (s *stream) availableSendSpace() int { buffered := int(s.dataChannel.BufferedAmount()) - availableSpace := maxSendBuffer - buffered + availableSpace := s.sendBufferSize() - buffered if availableSpace+maxTotalControlMessagesSize < 0 { // this should never happen, but better check - log.Errorw("data channel buffered more data than the maximum amount", "max", maxSendBuffer, "buffered", buffered) + log.Errorw("data channel buffered more data than the maximum amount", "max", s.sendBufferSize(), "buffered", buffered) } return availableSpace } diff --git a/p2p/transport/webrtc/transport.go b/p2p/transport/webrtc/transport.go index 6f16fb729d..f3322c8117 100644 --- a/p2p/transport/webrtc/transport.go +++ b/p2p/transport/webrtc/transport.go @@ -68,7 +68,13 @@ const ( DefaultFailedTimeout = 30 * time.Second DefaultKeepaliveTimeout = 15 * time.Second - sctpReceiveBufferSize = 100_000 + // sctpReceiveBufferSize is the size of the buffer for incoming messages. + // + // This is enough space for enqueuing 10 full sized messages. + // Besides throughput, this only matters if an application is using multiple dependent + // streams, say streams 1 & 2. It reads from stream 1 only after receiving message from + // stream 2. A buffer of 10 messages should serve all such situations. + sctpReceiveBufferSize = 10 * maxReceiveMessageSize ) type WebRTCTransport struct { @@ -367,7 +373,7 @@ func (t *WebRTCTransport) dial(ctx context.Context, scope network.ConnManagement if err != nil { return nil, err } - channel := newStream(w.HandshakeDataChannel, detached, func() {}) + channel := newStream(w.HandshakeDataChannel, detached, maxSendMessageSize, nil) remotePubKey, err := t.noiseHandshake(ctx, w.PeerConnection, channel, p, remoteHashFunction, false) if err != nil { diff --git a/p2p/transport/webrtc/transport_test.go b/p2p/transport/webrtc/transport_test.go index 13b50412db..83f65c8c3f 100644 --- a/p2p/transport/webrtc/transport_test.go +++ b/p2p/transport/webrtc/transport_test.go @@ -546,7 +546,7 @@ func TestTransportWebRTC_Deadline(t *testing.T) { require.NoError(t, err) stream.SetWriteDeadline(time.Now().Add(100 * time.Millisecond)) - largeBuffer := make([]byte, 2*1024*1024) + largeBuffer := make([]byte, 20*1024*1024) _, err = stream.Write(largeBuffer) require.ErrorIs(t, err, os.ErrDeadlineExceeded) diff --git a/test-plans/go.mod b/test-plans/go.mod index d363d183b5..ef231457fe 100644 --- a/test-plans/go.mod +++ b/test-plans/go.mod @@ -59,22 +59,22 @@ require ( github.com/pion/datachannel v1.5.10 // indirect github.com/pion/dtls/v2 v2.2.12 // indirect github.com/pion/dtls/v3 v3.0.4 // indirect - github.com/pion/ice/v4 v4.0.6 // indirect + github.com/pion/ice/v4 v4.0.8 // indirect github.com/pion/interceptor v0.1.37 // indirect github.com/pion/logging v0.2.3 // indirect github.com/pion/mdns/v2 v2.0.7 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/rtcp v1.2.15 // indirect - github.com/pion/rtp v1.8.11 // indirect - github.com/pion/sctp v1.8.36 // indirect - github.com/pion/sdp/v3 v3.0.10 // indirect + github.com/pion/rtp v1.8.13 // indirect + github.com/pion/sctp v1.8.37 // indirect + github.com/pion/sdp/v3 v3.0.11 // indirect github.com/pion/srtp/v3 v3.0.4 // indirect github.com/pion/stun v0.6.1 // indirect github.com/pion/stun/v3 v3.0.0 // indirect github.com/pion/transport/v2 v2.2.10 // indirect github.com/pion/transport/v3 v3.0.7 // indirect github.com/pion/turn/v4 v4.0.0 // indirect - github.com/pion/webrtc/v4 v4.0.10 // indirect + github.com/pion/webrtc/v4 v4.0.14 // indirect github.com/prometheus/client_golang v1.21.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.62.0 // indirect diff --git a/test-plans/go.sum b/test-plans/go.sum index c5599a7949..365f617fff 100644 --- a/test-plans/go.sum +++ b/test-plans/go.sum @@ -193,8 +193,8 @@ github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk= github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U= github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg= -github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM= -github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= +github.com/pion/ice/v4 v4.0.8 h1:ajNx0idNG+S+v9Phu4LSn2cs8JEfTsA1/tEjkkAVpFY= +github.com/pion/ice/v4 v4.0.8/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI= github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= @@ -206,12 +206,12 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0= -github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk= -github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4= -github.com/pion/sctp v1.8.36 h1:owNudmnz1xmhfYje5L/FCav3V9wpPRePHle3Zi+P+M0= -github.com/pion/sctp v1.8.36/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= -github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA= -github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= +github.com/pion/rtp v1.8.13 h1:8uSUPpjSL4OlwZI8Ygqu7+h2p9NPFB+yAZ461Xn5sNg= +github.com/pion/rtp v1.8.13/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4= +github.com/pion/sctp v1.8.37 h1:ZDmGPtRPX9mKCiVXtMbTWybFw3z/hVKAZgU81wcOrqs= +github.com/pion/sctp v1.8.37/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= +github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI= +github.com/pion/sdp/v3 v3.0.11/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M= github.com/pion/srtp/v3 v3.0.4/go.mod h1:1Jx3FwDoxpRaTh1oRV8A/6G1BnFL+QI82eK4ms8EEJQ= github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4= @@ -226,8 +226,8 @@ github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1 github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo= github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM= github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA= -github.com/pion/webrtc/v4 v4.0.10 h1:Hq/JLjhqLxi+NmCtE8lnRPDr8H4LcNvwg8OxVcdv56Q= -github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3vVWilOck= +github.com/pion/webrtc/v4 v4.0.14 h1:nyds/sFRR+HvmWoBa6wrL46sSfpArE0qR883MBW96lg= +github.com/pion/webrtc/v4 v4.0.14/go.mod h1:R3+qTnQTS03UzwDarYecgioNf7DYgTsldxnCXB821Kk= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=