Skip to content

Commit 15ab910

Browse files
committed
webrtc: support receiving 256kB messages
In experiments with js we've found that increasing the message size increases throughput. See: libp2p/specs#628 (comment) for details. This changes the protobuf reader for the stream to read 256kB messages. This also forces a change to the connection SCTP read buffer to be increased to about 2.5 MB, to support 1 message being buffered for 10 streams. This isn't enough to support larger messages. We most likely need to change the inferred SDP of the server to use 256kB maxMessageSize, and need some backwards compatible mechanism in the handshake to opt in to large messages. See: libp2p/specs#628 for details
1 parent 61f03f4 commit 15ab910

File tree

11 files changed

+133
-113
lines changed

11 files changed

+133
-113
lines changed

go.mod

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ require (
4747
github.com/multiformats/go-varint v0.0.7
4848
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
4949
github.com/pion/datachannel v1.5.10
50-
github.com/pion/ice/v4 v4.0.6
50+
github.com/pion/ice/v4 v4.0.8
5151
github.com/pion/logging v0.2.3
52-
github.com/pion/sctp v1.8.36
52+
github.com/pion/sctp v1.8.37
5353
github.com/pion/stun v0.6.1
54-
github.com/pion/webrtc/v4 v4.0.10
54+
github.com/pion/webrtc/v4 v4.0.14
5555
github.com/prometheus/client_golang v1.21.0
5656
github.com/prometheus/client_model v0.6.1
5757
github.com/quic-go/quic-go v0.50.0
@@ -93,8 +93,8 @@ require (
9393
github.com/pion/mdns/v2 v2.0.7 // indirect
9494
github.com/pion/randutil v0.1.0 // indirect
9595
github.com/pion/rtcp v1.2.15 // indirect
96-
github.com/pion/rtp v1.8.11 // indirect
97-
github.com/pion/sdp/v3 v3.0.10 // indirect
96+
github.com/pion/rtp v1.8.13 // indirect
97+
github.com/pion/sdp/v3 v3.0.11 // indirect
9898
github.com/pion/srtp/v3 v3.0.4 // indirect
9999
github.com/pion/stun/v3 v3.0.0 // indirect
100100
github.com/pion/transport/v2 v2.2.10 // indirect

go.sum

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,8 @@ github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk=
216216
github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
217217
github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U=
218218
github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg=
219-
github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM=
220-
github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
219+
github.com/pion/ice/v4 v4.0.8 h1:ajNx0idNG+S+v9Phu4LSn2cs8JEfTsA1/tEjkkAVpFY=
220+
github.com/pion/ice/v4 v4.0.8/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
221221
github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI=
222222
github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
223223
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
@@ -229,12 +229,12 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
229229
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
230230
github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo=
231231
github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0=
232-
github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk=
233-
github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
234-
github.com/pion/sctp v1.8.36 h1:owNudmnz1xmhfYje5L/FCav3V9wpPRePHle3Zi+P+M0=
235-
github.com/pion/sctp v1.8.36/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE=
236-
github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA=
237-
github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
232+
github.com/pion/rtp v1.8.13 h1:8uSUPpjSL4OlwZI8Ygqu7+h2p9NPFB+yAZ461Xn5sNg=
233+
github.com/pion/rtp v1.8.13/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
234+
github.com/pion/sctp v1.8.37 h1:ZDmGPtRPX9mKCiVXtMbTWybFw3z/hVKAZgU81wcOrqs=
235+
github.com/pion/sctp v1.8.37/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE=
236+
github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI=
237+
github.com/pion/sdp/v3 v3.0.11/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
238238
github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M=
239239
github.com/pion/srtp/v3 v3.0.4/go.mod h1:1Jx3FwDoxpRaTh1oRV8A/6G1BnFL+QI82eK4ms8EEJQ=
240240
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
@@ -249,8 +249,8 @@ github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1
249249
github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo=
250250
github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM=
251251
github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA=
252-
github.com/pion/webrtc/v4 v4.0.10 h1:Hq/JLjhqLxi+NmCtE8lnRPDr8H4LcNvwg8OxVcdv56Q=
253-
github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3vVWilOck=
252+
github.com/pion/webrtc/v4 v4.0.14 h1:nyds/sFRR+HvmWoBa6wrL46sSfpArE0qR883MBW96lg=
253+
github.com/pion/webrtc/v4 v4.0.14/go.mod h1:R3+qTnQTS03UzwDarYecgioNf7DYgTsldxnCXB821Kk=
254254
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
255255
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
256256
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=

p2p/transport/webrtc/connection.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error
188188
dc.Close()
189189
return nil, fmt.Errorf("detach channel failed for stream(%d): %w", streamID, err)
190190
}
191-
str := newStream(dc, rwc, func() { c.removeStream(streamID) })
191+
str := newStream(dc, rwc, maxSendMessageSize, func() { c.removeStream(streamID) })
192192
if err := c.addStream(str); err != nil {
193193
str.Reset()
194194
return nil, fmt.Errorf("failed to add stream(%d) to connection: %w", streamID, err)
@@ -201,7 +201,7 @@ func (c *connection) AcceptStream() (network.MuxedStream, error) {
201201
case <-c.ctx.Done():
202202
return nil, c.closeErr
203203
case dc := <-c.acceptQueue:
204-
str := newStream(dc.channel, dc.stream, func() { c.removeStream(*dc.channel.ID()) })
204+
str := newStream(dc.channel, dc.stream, maxSendMessageSize, func() { c.removeStream(*dc.channel.ID()) })
205205
if err := c.addStream(str); err != nil {
206206
str.Reset()
207207
return nil, err

p2p/transport/webrtc/listener.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ func (l *listener) setupConnection(
253253
if err != nil {
254254
return nil, err
255255
}
256-
handshakeChannel := newStream(w.HandshakeDataChannel, rwc, func() {})
256+
handshakeChannel := newStream(w.HandshakeDataChannel, rwc, maxSendMessageSize, nil)
257257
// we do not yet know A's peer ID so accept any inbound
258258
remotePubKey, err := l.transport.noiseHandshake(ctx, w.PeerConnection, handshakeChannel, "", crypto.SHA256, true)
259259
if err != nil {

p2p/transport/webrtc/stream.go

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,9 @@ import (
1515
)
1616

1717
const (
18-
// maxMessageSize is the maximum message size of the Protobuf message we send / receive.
19-
maxMessageSize = 16384
20-
// maxSendBuffer is the maximum data we enqueue on the underlying data channel for writes.
21-
// The underlying SCTP layer has an unbounded buffer for writes. We limit the amount enqueued
22-
// per stream is limited to avoid a single stream monopolizing the entire connection.
23-
maxSendBuffer = 2 * maxMessageSize
24-
// sendBufferLowThreshold is the threshold below which we write more data on the underlying
25-
// data channel. We want a notification as soon as we can write 1 full sized message.
26-
sendBufferLowThreshold = maxSendBuffer - maxMessageSize
27-
// maxTotalControlMessagesSize is the maximum total size of all control messages we will
28-
// write on this stream.
29-
// 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be
30-
// exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection
31-
// send queue.
32-
maxTotalControlMessagesSize = 50
33-
18+
// maxSendMessageSize is the maximum message size of the Protobuf message we send / receive.
19+
// NOTE: Change `varintOverhead` if you change this.
20+
maxSendMessageSize = 16384
3421
// Proto overhead assumption is 5 bytes
3522
protoOverhead = 5
3623
// Varint overhead is assumed to be 2 bytes. This is safe since
@@ -40,9 +27,20 @@ const (
4027
// is less than or equal to 2 ^ 14, the varint will not be more than
4128
// 2 bytes in length.
4229
varintOverhead = 2
30+
31+
// maxTotalControlMessagesSize is the maximum total size of all control messages we will
32+
// write on this stream.
33+
// 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be
34+
// exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection
35+
// send queue.
36+
maxTotalControlMessagesSize = 50
37+
4338
// maxFINACKWait is the maximum amount of time a stream will wait to read
4439
// FIN_ACK before closing the data channel
4540
maxFINACKWait = 10 * time.Second
41+
42+
// maxReceiveMessageSize is the maximum message size of the Protobuf message we receive.
43+
maxReceiveMessageSize = 256<<10 + 1<<10 // 1kB buffer
4644
)
4745

4846
type receiveState uint8
@@ -79,11 +77,12 @@ type stream struct {
7977
nextMessage *pb.Message
8078
receiveState receiveState
8179

82-
writer pbio.Writer // concurrent writes prevented by mx
83-
writeStateChanged chan struct{}
84-
sendState sendState
85-
writeDeadline time.Time
86-
writeError error
80+
writer pbio.Writer // concurrent writes prevented by mx
81+
writeStateChanged chan struct{}
82+
sendState sendState
83+
writeDeadline time.Time
84+
writeError error
85+
maxSendMessageSize int
8786

8887
controlMessageReaderOnce sync.Once
8988
// controlMessageReaderEndTime is the end time for reading FIN_ACK from the control
@@ -105,20 +104,21 @@ var _ network.MuxedStream = &stream{}
105104
func newStream(
106105
channel *webrtc.DataChannel,
107106
rwc datachannel.ReadWriteCloser,
107+
maxSendMessageSize int,
108108
onDone func(),
109109
) *stream {
110110
s := &stream{
111-
reader: pbio.NewDelimitedReader(rwc, maxMessageSize),
112-
writer: pbio.NewDelimitedWriter(rwc),
113-
writeStateChanged: make(chan struct{}, 1),
114-
id: *channel.ID(),
115-
dataChannel: rwc.(*datachannel.DataChannel),
116-
onDone: onDone,
111+
reader: pbio.NewDelimitedReader(rwc, maxReceiveMessageSize),
112+
writer: pbio.NewDelimitedWriter(rwc),
113+
writeStateChanged: make(chan struct{}, 1),
114+
id: *channel.ID(),
115+
dataChannel: rwc.(*datachannel.DataChannel),
116+
onDone: onDone,
117+
maxSendMessageSize: maxSendMessageSize,
117118
}
118-
s.dataChannel.SetBufferedAmountLowThreshold(sendBufferLowThreshold)
119+
s.dataChannel.SetBufferedAmountLowThreshold(uint64(s.sendBufferLowThreshold()))
119120
s.dataChannel.OnBufferedAmountLow(func() {
120121
s.notifyWriteStateChanged()
121-
122122
})
123123
return s
124124
}

0 commit comments

Comments
 (0)