Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ require (
github.com/multiformats/go-varint v0.0.7
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/pion/datachannel v1.5.10
github.com/pion/ice/v4 v4.0.6
github.com/pion/ice/v4 v4.0.8
github.com/pion/logging v0.2.3
github.com/pion/sctp v1.8.36
github.com/pion/sctp v1.8.37
github.com/pion/stun v0.6.1
github.com/pion/webrtc/v4 v4.0.10
github.com/pion/webrtc/v4 v4.0.14
github.com/prometheus/client_golang v1.21.0
github.com/prometheus/client_model v0.6.1
github.com/quic-go/quic-go v0.50.0
Expand Down Expand Up @@ -93,8 +93,8 @@ require (
github.com/pion/mdns/v2 v2.0.7 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.15 // indirect
github.com/pion/rtp v1.8.11 // indirect
github.com/pion/sdp/v3 v3.0.10 // indirect
github.com/pion/rtp v1.8.13 // indirect
github.com/pion/sdp/v3 v3.0.11 // indirect
github.com/pion/srtp/v3 v3.0.4 // indirect
github.com/pion/stun/v3 v3.0.0 // indirect
github.com/pion/transport/v2 v2.2.10 // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk=
github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U=
github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg=
github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM=
github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
github.com/pion/ice/v4 v4.0.8 h1:ajNx0idNG+S+v9Phu4LSn2cs8JEfTsA1/tEjkkAVpFY=
github.com/pion/ice/v4 v4.0.8/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI=
github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
Expand All @@ -229,12 +229,12 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo=
github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0=
github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk=
github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
github.com/pion/sctp v1.8.36 h1:owNudmnz1xmhfYje5L/FCav3V9wpPRePHle3Zi+P+M0=
github.com/pion/sctp v1.8.36/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE=
github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA=
github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
github.com/pion/rtp v1.8.13 h1:8uSUPpjSL4OlwZI8Ygqu7+h2p9NPFB+yAZ461Xn5sNg=
github.com/pion/rtp v1.8.13/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
github.com/pion/sctp v1.8.37 h1:ZDmGPtRPX9mKCiVXtMbTWybFw3z/hVKAZgU81wcOrqs=
github.com/pion/sctp v1.8.37/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE=
github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI=
github.com/pion/sdp/v3 v3.0.11/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M=
github.com/pion/srtp/v3 v3.0.4/go.mod h1:1Jx3FwDoxpRaTh1oRV8A/6G1BnFL+QI82eK4ms8EEJQ=
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
Expand All @@ -249,8 +249,8 @@ github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1
github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo=
github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM=
github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA=
github.com/pion/webrtc/v4 v4.0.10 h1:Hq/JLjhqLxi+NmCtE8lnRPDr8H4LcNvwg8OxVcdv56Q=
github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3vVWilOck=
github.com/pion/webrtc/v4 v4.0.14 h1:nyds/sFRR+HvmWoBa6wrL46sSfpArE0qR883MBW96lg=
github.com/pion/webrtc/v4 v4.0.14/go.mod h1:R3+qTnQTS03UzwDarYecgioNf7DYgTsldxnCXB821Kk=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
4 changes: 2 additions & 2 deletions p2p/transport/webrtc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error
dc.Close()
return nil, fmt.Errorf("detach channel failed for stream(%d): %w", streamID, err)
}
str := newStream(dc, rwc, func() { c.removeStream(streamID) })
str := newStream(dc, rwc, maxSendMessageSize, func() { c.removeStream(streamID) })
if err := c.addStream(str); err != nil {
str.Reset()
return nil, fmt.Errorf("failed to add stream(%d) to connection: %w", streamID, err)
Expand All @@ -201,7 +201,7 @@ func (c *connection) AcceptStream() (network.MuxedStream, error) {
case <-c.ctx.Done():
return nil, c.closeErr
case dc := <-c.acceptQueue:
str := newStream(dc.channel, dc.stream, func() { c.removeStream(*dc.channel.ID()) })
str := newStream(dc.channel, dc.stream, maxSendMessageSize, func() { c.removeStream(*dc.channel.ID()) })
if err := c.addStream(str); err != nil {
str.Reset()
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion p2p/transport/webrtc/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (l *listener) setupConnection(
if err != nil {
return nil, err
}
handshakeChannel := newStream(w.HandshakeDataChannel, rwc, func() {})
handshakeChannel := newStream(w.HandshakeDataChannel, rwc, maxSendMessageSize, nil)
// we do not yet know A's peer ID so accept any inbound
remotePubKey, err := l.transport.noiseHandshake(ctx, w.PeerConnection, handshakeChannel, "", crypto.SHA256, true)
if err != nil {
Expand Down
58 changes: 29 additions & 29 deletions p2p/transport/webrtc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,9 @@ import (
)

const (
// maxMessageSize is the maximum message size of the Protobuf message we send / receive.
maxMessageSize = 16384
// maxSendBuffer is the maximum data we enqueue on the underlying data channel for writes.
// The underlying SCTP layer has an unbounded buffer for writes. We limit the amount enqueued
// per stream is limited to avoid a single stream monopolizing the entire connection.
maxSendBuffer = 2 * maxMessageSize
// sendBufferLowThreshold is the threshold below which we write more data on the underlying
// data channel. We want a notification as soon as we can write 1 full sized message.
sendBufferLowThreshold = maxSendBuffer - maxMessageSize
// maxTotalControlMessagesSize is the maximum total size of all control messages we will
// write on this stream.
// 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be
// exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection
// send queue.
maxTotalControlMessagesSize = 50

// maxSendMessageSize is the maximum message size of the Protobuf message we send / receive.
// NOTE: Change `varintOverhead` if you change this.
maxSendMessageSize = 16384
// Proto overhead assumption is 5 bytes
protoOverhead = 5
// Varint overhead is assumed to be 2 bytes. This is safe since
Expand All @@ -40,9 +27,20 @@ const (
// is less than or equal to 2 ^ 14, the varint will not be more than
// 2 bytes in length.
varintOverhead = 2

// maxTotalControlMessagesSize is the maximum total size of all control messages we will
// write on this stream.
// 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be
// exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection
// send queue.
maxTotalControlMessagesSize = 50

// maxFINACKWait is the maximum amount of time a stream will wait to read
// FIN_ACK before closing the data channel
maxFINACKWait = 10 * time.Second

// maxReceiveMessageSize is the maximum message size of the Protobuf message we receive.
maxReceiveMessageSize = 256<<10 + 1<<10 // 1kB buffer
)

type receiveState uint8
Expand Down Expand Up @@ -79,11 +77,12 @@ type stream struct {
nextMessage *pb.Message
receiveState receiveState

writer pbio.Writer // concurrent writes prevented by mx
writeStateChanged chan struct{}
sendState sendState
writeDeadline time.Time
writeError error
writer pbio.Writer // concurrent writes prevented by mx
writeStateChanged chan struct{}
sendState sendState
writeDeadline time.Time
writeError error
maxSendMessageSize int

controlMessageReaderOnce sync.Once
// controlMessageReaderEndTime is the end time for reading FIN_ACK from the control
Expand All @@ -105,20 +104,21 @@ var _ network.MuxedStream = &stream{}
func newStream(
channel *webrtc.DataChannel,
rwc datachannel.ReadWriteCloser,
maxSendMessageSize int,
onDone func(),
) *stream {
s := &stream{
reader: pbio.NewDelimitedReader(rwc, maxMessageSize),
writer: pbio.NewDelimitedWriter(rwc),
writeStateChanged: make(chan struct{}, 1),
id: *channel.ID(),
dataChannel: rwc.(*datachannel.DataChannel),
onDone: onDone,
reader: pbio.NewDelimitedReader(rwc, maxReceiveMessageSize),
writer: pbio.NewDelimitedWriter(rwc),
writeStateChanged: make(chan struct{}, 1),
id: *channel.ID(),
dataChannel: rwc.(*datachannel.DataChannel),
onDone: onDone,
maxSendMessageSize: maxSendMessageSize,
}
s.dataChannel.SetBufferedAmountLowThreshold(sendBufferLowThreshold)
s.dataChannel.SetBufferedAmountLowThreshold(uint64(s.sendBufferLowThreshold()))
s.dataChannel.OnBufferedAmountLow(func() {
s.notifyWriteStateChanged()

})
return s
}
Expand Down
Loading