Skip to content

Commit 803505d

Browse files
authored
webrtc: support receiving 256kB messages (#3255)
In experiments with js we've found that increasing the message size increases throughput. See: libp2p/specs#628 (comment) for details. This changes the protobuf reader for the stream to read 256kB messages. This also forces a change to the connection SCTP read buffer to be increased to about 2.5 MB, to support 1 message being buffered for 10 streams. This isn't enough to support larger messages. We most likely need to change the inferred SDP of the server to use 256kB maxMessageSize, and need some backwards compatible mechanism in the handshake to opt in to large messages. See: libp2p/specs#628 for details
1 parent 74e6e2c commit 803505d

File tree

11 files changed

+144
-113
lines changed

11 files changed

+144
-113
lines changed

go.mod

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ require (
4646
github.com/multiformats/go-varint v0.0.7
4747
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
4848
github.com/pion/datachannel v1.5.10
49-
github.com/pion/ice/v4 v4.0.6
49+
github.com/pion/ice/v4 v4.0.8
5050
github.com/pion/logging v0.2.3
51-
github.com/pion/sctp v1.8.36
51+
github.com/pion/sctp v1.8.37
5252
github.com/pion/stun v0.6.1
53-
github.com/pion/webrtc/v4 v4.0.10
53+
github.com/pion/webrtc/v4 v4.0.14
5454
github.com/prometheus/client_golang v1.21.0
5555
github.com/prometheus/client_model v0.6.1
5656
github.com/quic-go/quic-go v0.50.0
@@ -89,8 +89,8 @@ require (
8989
github.com/pion/mdns/v2 v2.0.7 // indirect
9090
github.com/pion/randutil v0.1.0 // indirect
9191
github.com/pion/rtcp v1.2.15 // indirect
92-
github.com/pion/rtp v1.8.11 // indirect
93-
github.com/pion/sdp/v3 v3.0.10 // indirect
92+
github.com/pion/rtp v1.8.13 // indirect
93+
github.com/pion/sdp/v3 v3.0.11 // indirect
9494
github.com/pion/srtp/v3 v3.0.4 // indirect
9595
github.com/pion/stun/v3 v3.0.0 // indirect
9696
github.com/pion/transport/v2 v2.2.10 // indirect

go.sum

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,8 @@ github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk=
194194
github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
195195
github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U=
196196
github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg=
197-
github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM=
198-
github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
197+
github.com/pion/ice/v4 v4.0.8 h1:ajNx0idNG+S+v9Phu4LSn2cs8JEfTsA1/tEjkkAVpFY=
198+
github.com/pion/ice/v4 v4.0.8/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
199199
github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI=
200200
github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
201201
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
@@ -207,12 +207,12 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
207207
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
208208
github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo=
209209
github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0=
210-
github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk=
211-
github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
212-
github.com/pion/sctp v1.8.36 h1:owNudmnz1xmhfYje5L/FCav3V9wpPRePHle3Zi+P+M0=
213-
github.com/pion/sctp v1.8.36/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE=
214-
github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA=
215-
github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
210+
github.com/pion/rtp v1.8.13 h1:8uSUPpjSL4OlwZI8Ygqu7+h2p9NPFB+yAZ461Xn5sNg=
211+
github.com/pion/rtp v1.8.13/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
212+
github.com/pion/sctp v1.8.37 h1:ZDmGPtRPX9mKCiVXtMbTWybFw3z/hVKAZgU81wcOrqs=
213+
github.com/pion/sctp v1.8.37/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE=
214+
github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI=
215+
github.com/pion/sdp/v3 v3.0.11/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
216216
github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M=
217217
github.com/pion/srtp/v3 v3.0.4/go.mod h1:1Jx3FwDoxpRaTh1oRV8A/6G1BnFL+QI82eK4ms8EEJQ=
218218
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
@@ -227,8 +227,8 @@ github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1
227227
github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo=
228228
github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM=
229229
github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA=
230-
github.com/pion/webrtc/v4 v4.0.10 h1:Hq/JLjhqLxi+NmCtE8lnRPDr8H4LcNvwg8OxVcdv56Q=
231-
github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3vVWilOck=
230+
github.com/pion/webrtc/v4 v4.0.14 h1:nyds/sFRR+HvmWoBa6wrL46sSfpArE0qR883MBW96lg=
231+
github.com/pion/webrtc/v4 v4.0.14/go.mod h1:R3+qTnQTS03UzwDarYecgioNf7DYgTsldxnCXB821Kk=
232232
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
233233
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
234234
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=

p2p/transport/webrtc/connection.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error
188188
dc.Close()
189189
return nil, fmt.Errorf("detach channel failed for stream(%d): %w", streamID, err)
190190
}
191-
str := newStream(dc, rwc, func() { c.removeStream(streamID) })
191+
str := newStream(dc, rwc, maxSendMessageSize, func() { c.removeStream(streamID) })
192192
if err := c.addStream(str); err != nil {
193193
str.Reset()
194194
return nil, fmt.Errorf("failed to add stream(%d) to connection: %w", streamID, err)
@@ -201,7 +201,7 @@ func (c *connection) AcceptStream() (network.MuxedStream, error) {
201201
case <-c.ctx.Done():
202202
return nil, c.closeErr
203203
case dc := <-c.acceptQueue:
204-
str := newStream(dc.channel, dc.stream, func() { c.removeStream(*dc.channel.ID()) })
204+
str := newStream(dc.channel, dc.stream, maxSendMessageSize, func() { c.removeStream(*dc.channel.ID()) })
205205
if err := c.addStream(str); err != nil {
206206
str.Reset()
207207
return nil, err

p2p/transport/webrtc/listener.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ func (l *listener) setupConnection(
253253
if err != nil {
254254
return nil, err
255255
}
256-
handshakeChannel := newStream(w.HandshakeDataChannel, rwc, func() {})
256+
handshakeChannel := newStream(w.HandshakeDataChannel, rwc, maxSendMessageSize, nil)
257257
// we do not yet know A's peer ID so accept any inbound
258258
remotePubKey, err := l.transport.noiseHandshake(ctx, w.PeerConnection, handshakeChannel, "", crypto.SHA256, true)
259259
if err != nil {

p2p/transport/webrtc/stream.go

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,9 @@ import (
1515
)
1616

1717
const (
18-
// maxMessageSize is the maximum message size of the Protobuf message we send / receive.
19-
maxMessageSize = 16384
20-
// maxSendBuffer is the maximum data we enqueue on the underlying data channel for writes.
21-
// The underlying SCTP layer has an unbounded buffer for writes. We limit the amount enqueued
22-
// per stream is limited to avoid a single stream monopolizing the entire connection.
23-
maxSendBuffer = 2 * maxMessageSize
24-
// sendBufferLowThreshold is the threshold below which we write more data on the underlying
25-
// data channel. We want a notification as soon as we can write 1 full sized message.
26-
sendBufferLowThreshold = maxSendBuffer - maxMessageSize
27-
// maxTotalControlMessagesSize is the maximum total size of all control messages we will
28-
// write on this stream.
29-
// 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be
30-
// exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection
31-
// send queue.
32-
maxTotalControlMessagesSize = 50
33-
18+
// maxSendMessageSize is the maximum message size of the Protobuf message we send / receive.
19+
// NOTE: Change `varintOverhead` if you change this.
20+
maxSendMessageSize = 16384
3421
// Proto overhead assumption is 5 bytes
3522
protoOverhead = 5
3623
// Varint overhead is assumed to be 2 bytes. This is safe since
@@ -40,9 +27,20 @@ const (
4027
// is less than or equal to 2 ^ 14, the varint will not be more than
4128
// 2 bytes in length.
4229
varintOverhead = 2
30+
31+
// maxTotalControlMessagesSize is the maximum total size of all control messages we will
32+
// write on this stream.
33+
// 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be
34+
// exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection
35+
// send queue.
36+
maxTotalControlMessagesSize = 50
37+
4338
// maxFINACKWait is the maximum amount of time a stream will wait to read
4439
// FIN_ACK before closing the data channel
4540
maxFINACKWait = 10 * time.Second
41+
42+
// maxReceiveMessageSize is the maximum message size of the Protobuf message we receive.
43+
maxReceiveMessageSize = 256<<10 + 1<<10 // 1kB buffer
4644
)
4745

4846
type receiveState uint8
@@ -79,11 +77,12 @@ type stream struct {
7977
nextMessage *pb.Message
8078
receiveState receiveState
8179

82-
writer pbio.Writer // concurrent writes prevented by mx
83-
writeStateChanged chan struct{}
84-
sendState sendState
85-
writeDeadline time.Time
86-
writeError error
80+
writer pbio.Writer // concurrent writes prevented by mx
81+
writeStateChanged chan struct{}
82+
sendState sendState
83+
writeDeadline time.Time
84+
writeError error
85+
maxSendMessageSize int
8786

8887
controlMessageReaderOnce sync.Once
8988
// controlMessageReaderEndTime is the end time for reading FIN_ACK from the control
@@ -105,20 +104,21 @@ var _ network.MuxedStream = &stream{}
105104
func newStream(
106105
channel *webrtc.DataChannel,
107106
rwc datachannel.ReadWriteCloser,
107+
maxSendMessageSize int,
108108
onDone func(),
109109
) *stream {
110110
s := &stream{
111-
reader: pbio.NewDelimitedReader(rwc, maxMessageSize),
112-
writer: pbio.NewDelimitedWriter(rwc),
113-
writeStateChanged: make(chan struct{}, 1),
114-
id: *channel.ID(),
115-
dataChannel: rwc.(*datachannel.DataChannel),
116-
onDone: onDone,
111+
reader: pbio.NewDelimitedReader(rwc, maxReceiveMessageSize),
112+
writer: pbio.NewDelimitedWriter(rwc),
113+
writeStateChanged: make(chan struct{}, 1),
114+
id: *channel.ID(),
115+
dataChannel: rwc.(*datachannel.DataChannel),
116+
onDone: onDone,
117+
maxSendMessageSize: maxSendMessageSize,
117118
}
118-
s.dataChannel.SetBufferedAmountLowThreshold(sendBufferLowThreshold)
119+
s.dataChannel.SetBufferedAmountLowThreshold(uint64(s.sendBufferLowThreshold()))
119120
s.dataChannel.OnBufferedAmountLow(func() {
120121
s.notifyWriteStateChanged()
121-
122122
})
123123
return s
124124
}

0 commit comments

Comments
 (0)