diff --git a/go.mod b/go.mod index 200a51437a..85bd16314a 100644 --- a/go.mod +++ b/go.mod @@ -47,11 +47,11 @@ require ( github.com/multiformats/go-varint v0.0.7 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pion/datachannel v1.5.10 - github.com/pion/ice/v4 v4.0.6 + github.com/pion/ice/v4 v4.0.8 github.com/pion/logging v0.2.3 - github.com/pion/sctp v1.8.36 + github.com/pion/sctp v1.8.37 github.com/pion/stun v0.6.1 - github.com/pion/webrtc/v4 v4.0.10 + github.com/pion/webrtc/v4 v4.0.14 github.com/prometheus/client_golang v1.21.0 github.com/prometheus/client_model v0.6.1 github.com/quic-go/quic-go v0.50.0 @@ -93,8 +93,8 @@ require ( github.com/pion/mdns/v2 v2.0.7 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/rtcp v1.2.15 // indirect - github.com/pion/rtp v1.8.11 // indirect - github.com/pion/sdp/v3 v3.0.10 // indirect + github.com/pion/rtp v1.8.13 // indirect + github.com/pion/sdp/v3 v3.0.11 // indirect github.com/pion/srtp/v3 v3.0.4 // indirect github.com/pion/stun/v3 v3.0.0 // indirect github.com/pion/transport/v2 v2.2.10 // indirect diff --git a/go.sum b/go.sum index 23e1a7c551..83ee8350e2 100644 --- a/go.sum +++ b/go.sum @@ -216,8 +216,8 @@ github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk= github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U= github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg= -github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM= -github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= +github.com/pion/ice/v4 v4.0.8 h1:ajNx0idNG+S+v9Phu4LSn2cs8JEfTsA1/tEjkkAVpFY= +github.com/pion/ice/v4 v4.0.8/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI= github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= @@ -229,12 +229,12 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0= -github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk= -github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4= -github.com/pion/sctp v1.8.36 h1:owNudmnz1xmhfYje5L/FCav3V9wpPRePHle3Zi+P+M0= -github.com/pion/sctp v1.8.36/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= -github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA= -github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= +github.com/pion/rtp v1.8.13 h1:8uSUPpjSL4OlwZI8Ygqu7+h2p9NPFB+yAZ461Xn5sNg= +github.com/pion/rtp v1.8.13/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4= +github.com/pion/sctp v1.8.37 h1:ZDmGPtRPX9mKCiVXtMbTWybFw3z/hVKAZgU81wcOrqs= +github.com/pion/sctp v1.8.37/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= +github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI= +github.com/pion/sdp/v3 v3.0.11/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M= github.com/pion/srtp/v3 v3.0.4/go.mod h1:1Jx3FwDoxpRaTh1oRV8A/6G1BnFL+QI82eK4ms8EEJQ= github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4= @@ -249,8 +249,8 @@ github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1 github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo= github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM= github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA= -github.com/pion/webrtc/v4 v4.0.10 h1:Hq/JLjhqLxi+NmCtE8lnRPDr8H4LcNvwg8OxVcdv56Q= -github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3vVWilOck= +github.com/pion/webrtc/v4 v4.0.14 h1:nyds/sFRR+HvmWoBa6wrL46sSfpArE0qR883MBW96lg= +github.com/pion/webrtc/v4 v4.0.14/go.mod h1:R3+qTnQTS03UzwDarYecgioNf7DYgTsldxnCXB821Kk= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/p2p/transport/webrtc/connection.go b/p2p/transport/webrtc/connection.go index 77b293fadb..d75c309c51 100644 --- a/p2p/transport/webrtc/connection.go +++ b/p2p/transport/webrtc/connection.go @@ -188,7 +188,7 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error dc.Close() return nil, fmt.Errorf("detach channel failed for stream(%d): %w", streamID, err) } - str := newStream(dc, rwc, func() { c.removeStream(streamID) }) + str := newStream(dc, rwc, maxSendMessageSize, func() { c.removeStream(streamID) }) if err := c.addStream(str); err != nil { str.Reset() return nil, fmt.Errorf("failed to add stream(%d) to connection: %w", streamID, err) @@ -201,7 +201,7 @@ func (c *connection) AcceptStream() (network.MuxedStream, error) { case <-c.ctx.Done(): return nil, c.closeErr case dc := <-c.acceptQueue: - str := newStream(dc.channel, dc.stream, func() { c.removeStream(*dc.channel.ID()) }) + str := newStream(dc.channel, dc.stream, maxSendMessageSize, func() { c.removeStream(*dc.channel.ID()) }) if err := c.addStream(str); err != nil { str.Reset() return nil, err diff --git a/p2p/transport/webrtc/listener.go b/p2p/transport/webrtc/listener.go index 3534fd2c76..0ec05ec0e9 100644 --- a/p2p/transport/webrtc/listener.go +++ b/p2p/transport/webrtc/listener.go @@ -253,7 +253,7 @@ func (l *listener) setupConnection( if err != nil { return nil, err } - handshakeChannel := newStream(w.HandshakeDataChannel, rwc, func() {}) + handshakeChannel := newStream(w.HandshakeDataChannel, rwc, maxSendMessageSize, nil) // we do not yet know A's peer ID so accept any inbound remotePubKey, err := l.transport.noiseHandshake(ctx, w.PeerConnection, handshakeChannel, "", crypto.SHA256, true) if err != nil { diff --git a/p2p/transport/webrtc/stream.go b/p2p/transport/webrtc/stream.go index 39873c9d66..c92457ccb0 100644 --- a/p2p/transport/webrtc/stream.go +++ b/p2p/transport/webrtc/stream.go @@ -15,22 +15,9 @@ import ( ) const ( - // maxMessageSize is the maximum message size of the Protobuf message we send / receive. - maxMessageSize = 16384 - // maxSendBuffer is the maximum data we enqueue on the underlying data channel for writes. - // The underlying SCTP layer has an unbounded buffer for writes. We limit the amount enqueued - // per stream is limited to avoid a single stream monopolizing the entire connection. - maxSendBuffer = 2 * maxMessageSize - // sendBufferLowThreshold is the threshold below which we write more data on the underlying - // data channel. We want a notification as soon as we can write 1 full sized message. - sendBufferLowThreshold = maxSendBuffer - maxMessageSize - // maxTotalControlMessagesSize is the maximum total size of all control messages we will - // write on this stream. - // 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be - // exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection - // send queue. - maxTotalControlMessagesSize = 50 - + // maxSendMessageSize is the maximum message size of the Protobuf message we send / receive. + // NOTE: Change `varintOverhead` if you change this. + maxSendMessageSize = 16384 // Proto overhead assumption is 5 bytes protoOverhead = 5 // Varint overhead is assumed to be 2 bytes. This is safe since @@ -40,9 +27,20 @@ const ( // is less than or equal to 2 ^ 14, the varint will not be more than // 2 bytes in length. varintOverhead = 2 + + // maxTotalControlMessagesSize is the maximum total size of all control messages we will + // write on this stream. + // 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be + // exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection + // send queue. + maxTotalControlMessagesSize = 50 + // maxFINACKWait is the maximum amount of time a stream will wait to read // FIN_ACK before closing the data channel maxFINACKWait = 10 * time.Second + + // maxReceiveMessageSize is the maximum message size of the Protobuf message we receive. + maxReceiveMessageSize = 256<<10 + 1<<10 // 1kB buffer ) type receiveState uint8 @@ -79,11 +77,12 @@ type stream struct { nextMessage *pb.Message receiveState receiveState - writer pbio.Writer // concurrent writes prevented by mx - writeStateChanged chan struct{} - sendState sendState - writeDeadline time.Time - writeError error + writer pbio.Writer // concurrent writes prevented by mx + writeStateChanged chan struct{} + sendState sendState + writeDeadline time.Time + writeError error + maxSendMessageSize int controlMessageReaderOnce sync.Once // controlMessageReaderEndTime is the end time for reading FIN_ACK from the control @@ -105,20 +104,21 @@ var _ network.MuxedStream = &stream{} func newStream( channel *webrtc.DataChannel, rwc datachannel.ReadWriteCloser, + maxSendMessageSize int, onDone func(), ) *stream { s := &stream{ - reader: pbio.NewDelimitedReader(rwc, maxMessageSize), - writer: pbio.NewDelimitedWriter(rwc), - writeStateChanged: make(chan struct{}, 1), - id: *channel.ID(), - dataChannel: rwc.(*datachannel.DataChannel), - onDone: onDone, + reader: pbio.NewDelimitedReader(rwc, maxReceiveMessageSize), + writer: pbio.NewDelimitedWriter(rwc), + writeStateChanged: make(chan struct{}, 1), + id: *channel.ID(), + dataChannel: rwc.(*datachannel.DataChannel), + onDone: onDone, + maxSendMessageSize: maxSendMessageSize, } - s.dataChannel.SetBufferedAmountLowThreshold(sendBufferLowThreshold) + s.dataChannel.SetBufferedAmountLowThreshold(uint64(s.sendBufferLowThreshold())) s.dataChannel.OnBufferedAmountLow(func() { s.notifyWriteStateChanged() - }) return s } diff --git a/p2p/transport/webrtc/stream_test.go b/p2p/transport/webrtc/stream_test.go index 4153ff830a..461ed27ff8 100644 --- a/p2p/transport/webrtc/stream_test.go +++ b/p2p/transport/webrtc/stream_test.go @@ -3,6 +3,7 @@ package libp2pwebrtc import ( "crypto/rand" "errors" + "fmt" "io" "os" "sync/atomic" @@ -148,8 +149,8 @@ func TestStreamSimpleReadWriteClose(t *testing.T) { client, server := getDetachedDataChannels(t) var clientDone, serverDone atomic.Bool - clientStr := newStream(client.dc, client.rwc, func() { clientDone.Store(true) }) - serverStr := newStream(server.dc, server.rwc, func() { serverDone.Store(true) }) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { clientDone.Store(true) }) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() { serverDone.Store(true) }) // send a foobar from the client n, err := clientStr.Write([]byte("foobar")) @@ -194,8 +195,8 @@ func TestStreamSimpleReadWriteClose(t *testing.T) { func TestStreamPartialReads(t *testing.T) { client, server := getDetachedDataChannels(t) - clientStr := newStream(client.dc, client.rwc, func() {}) - serverStr := newStream(server.dc, server.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {}) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) _, err := serverStr.Write([]byte("foobar")) require.NoError(t, err) @@ -217,8 +218,8 @@ func TestStreamPartialReads(t *testing.T) { func TestStreamSkipEmptyFrames(t *testing.T) { client, server := getDetachedDataChannels(t) - clientStr := newStream(client.dc, client.rwc, func() {}) - serverStr := newStream(server.dc, server.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {}) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) for i := 0; i < 10; i++ { require.NoError(t, serverStr.writer.WriteMsg(&pb.Message{})) @@ -252,7 +253,7 @@ func TestStreamSkipEmptyFrames(t *testing.T) { func TestStreamReadReturnsOnClose(t *testing.T) { client, _ := getDetachedDataChannels(t) - clientStr := newStream(client.dc, client.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {}) errChan := make(chan error, 1) go func() { _, err := clientStr.Read([]byte{0}) @@ -275,8 +276,8 @@ func TestStreamResets(t *testing.T) { client, server := getDetachedDataChannels(t) var clientDone, serverDone atomic.Bool - clientStr := newStream(client.dc, client.rwc, func() { clientDone.Store(true) }) - serverStr := newStream(server.dc, server.rwc, func() { serverDone.Store(true) }) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { clientDone.Store(true) }) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() { serverDone.Store(true) }) // send a foobar from the client _, err := clientStr.Write([]byte("foobar")) @@ -311,8 +312,8 @@ func TestStreamResets(t *testing.T) { func TestStreamReadDeadlineAsync(t *testing.T) { client, server := getDetachedDataChannels(t) - clientStr := newStream(client.dc, client.rwc, func() {}) - serverStr := newStream(server.dc, server.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {}) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) timeout := 100 * time.Millisecond if os.Getenv("CI") != "" { @@ -342,8 +343,8 @@ func TestStreamReadDeadlineAsync(t *testing.T) { func TestStreamWriteDeadlineAsync(t *testing.T) { client, server := getDetachedDataChannels(t) - clientStr := newStream(client.dc, client.rwc, func() {}) - serverStr := newStream(server.dc, server.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {}) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) _ = serverStr b := make([]byte, 1024) @@ -372,8 +373,8 @@ func TestStreamWriteDeadlineAsync(t *testing.T) { func TestStreamReadAfterClose(t *testing.T) { client, server := getDetachedDataChannels(t) - clientStr := newStream(client.dc, client.rwc, func() {}) - serverStr := newStream(server.dc, server.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {}) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) serverStr.Close() b := make([]byte, 1) @@ -384,8 +385,8 @@ func TestStreamReadAfterClose(t *testing.T) { client, server = getDetachedDataChannels(t) - clientStr = newStream(client.dc, client.rwc, func() {}) - serverStr = newStream(server.dc, server.rwc, func() {}) + clientStr = newStream(client.dc, client.rwc, maxSendMessageSize, func() {}) + serverStr = newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) serverStr.Reset() b = make([]byte, 1) @@ -399,8 +400,8 @@ func TestStreamCloseAfterFINACK(t *testing.T) { client, server := getDetachedDataChannels(t) done := make(chan bool, 1) - clientStr := newStream(client.dc, client.rwc, func() { done <- true }) - serverStr := newStream(server.dc, server.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true }) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) go func() { err := clientStr.Close() @@ -427,8 +428,8 @@ func TestStreamFinAckAfterStopSending(t *testing.T) { client, server := getDetachedDataChannels(t) done := make(chan bool, 1) - clientStr := newStream(client.dc, client.rwc, func() { done <- true }) - serverStr := newStream(server.dc, server.rwc, func() {}) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true }) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {}) go func() { clientStr.CloseRead() @@ -460,8 +461,8 @@ func TestStreamConcurrentClose(t *testing.T) { start := make(chan bool, 2) done := make(chan bool, 2) - clientStr := newStream(client.dc, client.rwc, func() { done <- true }) - serverStr := newStream(server.dc, server.rwc, func() { done <- true }) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true }) + serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() { done <- true }) go func() { start <- true @@ -495,7 +496,7 @@ func TestStreamResetAfterClose(t *testing.T) { client, server := getDetachedDataChannels(t) done := make(chan bool, 2) - clientStr := newStream(client.dc, client.rwc, func() { done <- true }) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true }) clientStr.Close() select { @@ -520,7 +521,7 @@ func TestStreamDataChannelCloseOnFINACK(t *testing.T) { client, server := getDetachedDataChannels(t) done := make(chan bool, 1) - clientStr := newStream(client.dc, client.rwc, func() { done <- true }) + clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true }) clientStr.Close() @@ -540,24 +541,35 @@ func TestStreamDataChannelCloseOnFINACK(t *testing.T) { } func TestStreamChunking(t *testing.T) { - client, server := getDetachedDataChannels(t) - - clientStr := newStream(client.dc, client.rwc, func() {}) - serverStr := newStream(server.dc, server.rwc, func() {}) - - const N = (16 << 10) + 1000 - go func() { - data := make([]byte, N) - _, err := clientStr.Write(data) - require.NoError(t, err) - }() - - data := make([]byte, N) - n, err := serverStr.Read(data) - require.NoError(t, err) - require.LessOrEqual(t, n, 16<<10) - - nn, err := serverStr.Read(data) - require.NoError(t, err) - require.Equal(t, nn+n, N) + for _, msgSize := range []int{16 << 10, 32 << 10, 64 << 10, 128 << 10, 256 << 10} { + t.Run(fmt.Sprintf("msgSize=%d", msgSize), func(t *testing.T) { + client, server := getDetachedDataChannels(t) + defer client.dc.Close() + defer server.dc.Close() + + clientStr := newStream(client.dc, client.rwc, msgSize, nil) + // server should read large messages even if it can only send 16 kB messages. + serverStr := newStream(server.dc, server.rwc, 16<<10, nil) + + N := msgSize + 1000 + input := make([]byte, N) + _, err := rand.Read(input) + require.NoError(t, err) + go func() { + n, err := clientStr.Write(input) + require.NoError(t, err) + require.Equal(t, n, len(input)) + }() + + data := make([]byte, N) + n, err := serverStr.Read(data) + require.NoError(t, err) + require.LessOrEqual(t, n, msgSize) + // shouldn't be much less than msgSize + require.GreaterOrEqual(t, n, msgSize-100) + _, err = serverStr.Read(data[n:]) + require.NoError(t, err) + require.Equal(t, input, data) + }) + } } diff --git a/p2p/transport/webrtc/stream_write.go b/p2p/transport/webrtc/stream_write.go index 01fddac331..8629955191 100644 --- a/p2p/transport/webrtc/stream_write.go +++ b/p2p/transport/webrtc/stream_write.go @@ -84,7 +84,7 @@ func (s *stream) Write(b []byte) (int, error) { s.mx.Lock() continue } - end := maxMessageSize + end := s.maxSendMessageSize if end > availableSpace { end = availableSpace } @@ -110,11 +110,24 @@ func (s *stream) SetWriteDeadline(t time.Time) error { return nil } +// sendBufferSize() is the maximum data we enqueue on the underlying data channel for writes. +// The underlying SCTP layer has an unbounded buffer for writes. We limit the amount enqueued +// per stream is limited to avoid a single stream monopolizing the entire connection. +func (s *stream) sendBufferSize() int { + return 2 * s.maxSendMessageSize +} + +// sendBufferLowThreshold() is the threshold below which we write more data on the underlying +// data channel. We want a notification as soon as we can write 1 full sized message. +func (s *stream) sendBufferLowThreshold() int { + return s.sendBufferSize() - s.maxSendMessageSize +} + func (s *stream) availableSendSpace() int { buffered := int(s.dataChannel.BufferedAmount()) - availableSpace := maxSendBuffer - buffered + availableSpace := s.sendBufferSize() - buffered if availableSpace+maxTotalControlMessagesSize < 0 { // this should never happen, but better check - log.Errorw("data channel buffered more data than the maximum amount", "max", maxSendBuffer, "buffered", buffered) + log.Errorw("data channel buffered more data than the maximum amount", "max", s.sendBufferSize(), "buffered", buffered) } return availableSpace } diff --git a/p2p/transport/webrtc/transport.go b/p2p/transport/webrtc/transport.go index 6f16fb729d..f3322c8117 100644 --- a/p2p/transport/webrtc/transport.go +++ b/p2p/transport/webrtc/transport.go @@ -68,7 +68,13 @@ const ( DefaultFailedTimeout = 30 * time.Second DefaultKeepaliveTimeout = 15 * time.Second - sctpReceiveBufferSize = 100_000 + // sctpReceiveBufferSize is the size of the buffer for incoming messages. + // + // This is enough space for enqueuing 10 full sized messages. + // Besides throughput, this only matters if an application is using multiple dependent + // streams, say streams 1 & 2. It reads from stream 1 only after receiving message from + // stream 2. A buffer of 10 messages should serve all such situations. + sctpReceiveBufferSize = 10 * maxReceiveMessageSize ) type WebRTCTransport struct { @@ -367,7 +373,7 @@ func (t *WebRTCTransport) dial(ctx context.Context, scope network.ConnManagement if err != nil { return nil, err } - channel := newStream(w.HandshakeDataChannel, detached, func() {}) + channel := newStream(w.HandshakeDataChannel, detached, maxSendMessageSize, nil) remotePubKey, err := t.noiseHandshake(ctx, w.PeerConnection, channel, p, remoteHashFunction, false) if err != nil { diff --git a/p2p/transport/webrtc/transport_test.go b/p2p/transport/webrtc/transport_test.go index 13b50412db..83f65c8c3f 100644 --- a/p2p/transport/webrtc/transport_test.go +++ b/p2p/transport/webrtc/transport_test.go @@ -546,7 +546,7 @@ func TestTransportWebRTC_Deadline(t *testing.T) { require.NoError(t, err) stream.SetWriteDeadline(time.Now().Add(100 * time.Millisecond)) - largeBuffer := make([]byte, 2*1024*1024) + largeBuffer := make([]byte, 20*1024*1024) _, err = stream.Write(largeBuffer) require.ErrorIs(t, err, os.ErrDeadlineExceeded) diff --git a/test-plans/go.mod b/test-plans/go.mod index d363d183b5..ef231457fe 100644 --- a/test-plans/go.mod +++ b/test-plans/go.mod @@ -59,22 +59,22 @@ require ( github.com/pion/datachannel v1.5.10 // indirect github.com/pion/dtls/v2 v2.2.12 // indirect github.com/pion/dtls/v3 v3.0.4 // indirect - github.com/pion/ice/v4 v4.0.6 // indirect + github.com/pion/ice/v4 v4.0.8 // indirect github.com/pion/interceptor v0.1.37 // indirect github.com/pion/logging v0.2.3 // indirect github.com/pion/mdns/v2 v2.0.7 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/rtcp v1.2.15 // indirect - github.com/pion/rtp v1.8.11 // indirect - github.com/pion/sctp v1.8.36 // indirect - github.com/pion/sdp/v3 v3.0.10 // indirect + github.com/pion/rtp v1.8.13 // indirect + github.com/pion/sctp v1.8.37 // indirect + github.com/pion/sdp/v3 v3.0.11 // indirect github.com/pion/srtp/v3 v3.0.4 // indirect github.com/pion/stun v0.6.1 // indirect github.com/pion/stun/v3 v3.0.0 // indirect github.com/pion/transport/v2 v2.2.10 // indirect github.com/pion/transport/v3 v3.0.7 // indirect github.com/pion/turn/v4 v4.0.0 // indirect - github.com/pion/webrtc/v4 v4.0.10 // indirect + github.com/pion/webrtc/v4 v4.0.14 // indirect github.com/prometheus/client_golang v1.21.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.62.0 // indirect diff --git a/test-plans/go.sum b/test-plans/go.sum index c5599a7949..365f617fff 100644 --- a/test-plans/go.sum +++ b/test-plans/go.sum @@ -193,8 +193,8 @@ github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk= github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U= github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg= -github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM= -github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= +github.com/pion/ice/v4 v4.0.8 h1:ajNx0idNG+S+v9Phu4LSn2cs8JEfTsA1/tEjkkAVpFY= +github.com/pion/ice/v4 v4.0.8/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI= github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= @@ -206,12 +206,12 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0= -github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk= -github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4= -github.com/pion/sctp v1.8.36 h1:owNudmnz1xmhfYje5L/FCav3V9wpPRePHle3Zi+P+M0= -github.com/pion/sctp v1.8.36/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= -github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA= -github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= +github.com/pion/rtp v1.8.13 h1:8uSUPpjSL4OlwZI8Ygqu7+h2p9NPFB+yAZ461Xn5sNg= +github.com/pion/rtp v1.8.13/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4= +github.com/pion/sctp v1.8.37 h1:ZDmGPtRPX9mKCiVXtMbTWybFw3z/hVKAZgU81wcOrqs= +github.com/pion/sctp v1.8.37/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= +github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI= +github.com/pion/sdp/v3 v3.0.11/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M= github.com/pion/srtp/v3 v3.0.4/go.mod h1:1Jx3FwDoxpRaTh1oRV8A/6G1BnFL+QI82eK4ms8EEJQ= github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4= @@ -226,8 +226,8 @@ github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1 github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo= github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM= github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA= -github.com/pion/webrtc/v4 v4.0.10 h1:Hq/JLjhqLxi+NmCtE8lnRPDr8H4LcNvwg8OxVcdv56Q= -github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3vVWilOck= +github.com/pion/webrtc/v4 v4.0.14 h1:nyds/sFRR+HvmWoBa6wrL46sSfpArE0qR883MBW96lg= +github.com/pion/webrtc/v4 v4.0.14/go.mod h1:R3+qTnQTS03UzwDarYecgioNf7DYgTsldxnCXB821Kk= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=