Skip to content

Commit c05dd2f

Browse files
committed
Add TWCC tests
1 parent e26c7f6 commit c05dd2f

File tree

2 files changed

+156
-122
lines changed

2 files changed

+156
-122
lines changed

bwe_test.go

Lines changed: 137 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -67,120 +67,148 @@ func TestMain(m *testing.M) {
6767

6868
func TestBWE(t *testing.T) {
6969
networks := map[string]vnetFactory{
70-
"constant_capacity_1mbps_very_low_latency": createVirtualNetwork(1_000_000, 80_000, 50*time.Millisecond),
71-
"constant_capacity_5mbps_very_low_latency": createVirtualNetwork(5_000_000, 80_000, 50*time.Millisecond),
72-
"constant_capacity_1mbps_low_latency": createVirtualNetwork(1_000_000, 80_000, 50*time.Millisecond),
73-
"constant_capacity_5mbps_low_latency": createVirtualNetwork(5_000_000, 80_000, 50*time.Millisecond),
74-
"constant_capacity_1mbps_medium_latency": createVirtualNetwork(1_000_000, 80_000, 150*time.Millisecond),
75-
"constant_capacity_5mbps_medium_latency": createVirtualNetwork(5_000_000, 80_000, 150*time.Millisecond),
76-
"constant_capacity_1mbps_high_latency": createVirtualNetwork(1_000_000, 80_000, 300*time.Millisecond),
77-
"constant_capacity_5mbps_high_latency": createVirtualNetwork(5_000_000, 80_000, 300*time.Millisecond),
70+
"1mbps_very_low_latency": createVirtualNetwork(1_000_000, 80_000, 50*time.Millisecond),
71+
"5mbps_very_low_latency": createVirtualNetwork(5_000_000, 80_000, 50*time.Millisecond),
72+
"1mbps_low_latency": createVirtualNetwork(1_000_000, 80_000, 50*time.Millisecond),
73+
"5mbps_low_latency": createVirtualNetwork(5_000_000, 80_000, 50*time.Millisecond),
74+
"1mbps_medium_latency": createVirtualNetwork(1_000_000, 80_000, 150*time.Millisecond),
75+
"5mbps_medium_latency": createVirtualNetwork(5_000_000, 80_000, 150*time.Millisecond),
76+
"1mbps_high_latency": createVirtualNetwork(1_000_000, 80_000, 300*time.Millisecond),
77+
"5mbps_high_latency": createVirtualNetwork(5_000_000, 80_000, 300*time.Millisecond),
7878
}
79-
for name, vnf := range networks {
80-
t.Run(name, func(t *testing.T) {
81-
synctest.Test(t, func(t *testing.T) {
82-
t.Helper()
83-
84-
logger, cleanup := testLogger(t)
85-
defer cleanup()
86-
87-
onTrack := make(chan struct{})
88-
connected := make(chan struct{})
89-
done := make(chan struct{})
90-
91-
network := vnf(t)
92-
93-
receiver, err := newPeer(
94-
registerDefaultCodecs(),
95-
setVNet(network.left, []string{"10.0.1.1"}),
96-
onRemoteTrack(func(track *webrtc.TrackRemote) {
97-
close(onTrack)
98-
go func() {
99-
buf := make([]byte, 1500)
100-
for {
101-
select {
102-
case <-done:
103-
return
104-
default:
105-
_, _, err := track.Read(buf)
106-
if errors.Is(err, io.EOF) {
79+
peerOptions := map[string]struct {
80+
receiver []option
81+
sender []option
82+
}{
83+
"gcc-ccfb": {
84+
receiver: []option{
85+
registerCCFB(),
86+
},
87+
sender: []option{
88+
registerPacer(),
89+
initGCC(),
90+
},
91+
},
92+
"gcc-twcc": {
93+
receiver: []option{
94+
registerTWCC(),
95+
},
96+
sender: []option{
97+
registerPacer(),
98+
registerTWCCHeaderExtension(),
99+
initGCC(),
100+
},
101+
},
102+
}
103+
for netName, vnf := range networks {
104+
for peerName, pos := range peerOptions {
105+
t.Run(fmt.Sprintf("%v-%v", netName, peerName), func(t *testing.T) {
106+
synctest.Test(t, func(t *testing.T) {
107+
t.Helper()
108+
109+
logger, cleanup := testLogger(t)
110+
defer cleanup()
111+
112+
onTrack := make(chan struct{})
113+
connected := make(chan struct{})
114+
done := make(chan struct{})
115+
116+
network := vnf(t)
117+
118+
receiverOptions := []option{
119+
registerDefaultCodecs(),
120+
setVNet(network.left, []string{"10.0.1.1"}),
121+
onRemoteTrack(func(track *webrtc.TrackRemote) {
122+
close(onTrack)
123+
go func() {
124+
buf := make([]byte, 1500)
125+
for {
126+
select {
127+
case <-done:
107128
return
129+
default:
130+
_, _, err := track.Read(buf)
131+
if errors.Is(err, io.EOF) {
132+
return
133+
}
134+
assert.NoError(t, err)
108135
}
109-
assert.NoError(t, err)
110136
}
111-
}
112-
}()
113-
}),
114-
registerPacketLogger(logger.With("vantage-point", "receiver")),
115-
registerCCFB(),
116-
)
117-
assert.NoError(t, err)
118-
119-
err = receiver.addRemoteTrack()
120-
assert.NoError(t, err)
121-
122-
var codec *perfectCodec
123-
sender, err := newPeer(
124-
registerDefaultCodecs(),
125-
onConnected(func() { close(connected) }),
126-
setVNet(network.right, []string{"10.0.2.1"}),
127-
registerPacketLogger(logger.With("vantage-point", "sender")),
128-
registerRTPFB(),
129-
registerPacer(),
130-
initGCC(func(rate int) {
131-
logger.Info("setting codec target bitrate", "rate", rate)
132-
codec.setTargetBitrate(rate)
133-
}),
134-
)
135-
assert.NoError(t, err)
136-
137-
track, err := sender.addLocalTrack()
138-
assert.NoError(t, err)
139-
140-
codec = newPerfectCodec(track, 1_000_000)
141-
go func() {
142-
<-connected
143-
codec.start()
144-
}()
145-
146-
offer, err := sender.createOffer()
147-
assert.NoError(t, err)
148-
149-
err = receiver.setRemoteDescription(offer)
150-
assert.NoError(t, err)
151-
152-
answer, err := receiver.createAnswer()
153-
assert.NoError(t, err)
154-
155-
err = sender.setRemoteDescription(answer)
156-
assert.NoError(t, err)
157-
158-
synctest.Wait()
159-
160-
select {
161-
case <-onTrack:
162-
case <-time.After(5 * time.Second):
163-
assert.Fail(t, "on track not called")
164-
}
165-
166-
time.Sleep(100 * time.Second)
167-
close(done)
168-
169-
err = codec.Close()
170-
assert.NoError(t, err)
171-
172-
err = sender.pc.Close()
173-
assert.NoError(t, err)
174-
175-
err = receiver.pc.Close()
176-
assert.NoError(t, err)
177-
178-
err = network.Close()
179-
assert.NoError(t, err)
180-
181-
synctest.Wait()
137+
}()
138+
}),
139+
registerPacketLogger(logger.With("vantage-point", "receiver")),
140+
}
141+
receiverOptions = append(receiverOptions, pos.receiver...)
142+
receiver, err := newPeer(receiverOptions...)
143+
assert.NoError(t, err)
144+
145+
err = receiver.addRemoteTrack()
146+
assert.NoError(t, err)
147+
148+
var codec *perfectCodec
149+
senderOptions := []option{
150+
registerDefaultCodecs(),
151+
onConnected(func() { close(connected) }),
152+
setVNet(network.right, []string{"10.0.2.1"}),
153+
registerPacketLogger(logger.With("vantage-point", "sender")),
154+
registerRTPFB(),
155+
setOnRateCallback(func(rate int) {
156+
logger.Info("setting codec target bitrate", "rate", rate)
157+
codec.setTargetBitrate(rate)
158+
}),
159+
}
160+
senderOptions = append(senderOptions, pos.sender...)
161+
sender, err := newPeer(senderOptions...)
162+
assert.NoError(t, err)
163+
164+
track, err := sender.addLocalTrack()
165+
assert.NoError(t, err)
166+
167+
codec = newPerfectCodec(track, 1_000_000)
168+
go func() {
169+
<-connected
170+
codec.start()
171+
}()
172+
173+
offer, err := sender.createOffer()
174+
assert.NoError(t, err)
175+
176+
err = receiver.setRemoteDescription(offer)
177+
assert.NoError(t, err)
178+
179+
answer, err := receiver.createAnswer()
180+
assert.NoError(t, err)
181+
182+
err = sender.setRemoteDescription(answer)
183+
assert.NoError(t, err)
184+
185+
synctest.Wait()
186+
187+
select {
188+
case <-onTrack:
189+
case <-time.After(5 * time.Second):
190+
assert.Fail(t, "on track not called")
191+
}
192+
193+
time.Sleep(100 * time.Second)
194+
close(done)
195+
196+
err = codec.Close()
197+
assert.NoError(t, err)
198+
199+
err = sender.pc.Close()
200+
assert.NoError(t, err)
201+
202+
err = receiver.pc.Close()
203+
assert.NoError(t, err)
204+
205+
err = network.Close()
206+
assert.NoError(t, err)
207+
208+
synctest.Wait()
209+
})
182210
})
183-
})
211+
}
184212
}
185213
}
186214

peer_test.go

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -96,18 +96,17 @@ func registerTWCC() option {
9696
}
9797
}
9898

99-
//
100-
// func registerTWCCHeaderExtension() option {
101-
// return func(p *peer) error {
102-
// twccHdrExt, err := twcc.NewHeaderExtensionInterceptor()
103-
// if err != nil {
104-
// return err
105-
// }
106-
// p.interceptorRegistry.Add(twccHdrExt)
107-
//
108-
// return nil
109-
// }
110-
// }
99+
func registerTWCCHeaderExtension() option {
100+
return func(p *peer) error {
101+
twccHdrExt, err := twcc.NewHeaderExtensionInterceptor()
102+
if err != nil {
103+
return err
104+
}
105+
p.interceptorRegistry.Add(twccHdrExt)
106+
107+
return nil
108+
}
109+
}
111110

112111
func registerCCFB() option {
113112
return func(p *peer) error {
@@ -121,12 +120,19 @@ func registerCCFB() option {
121120
}
122121
}
123122

124-
func initGCC(onRateUpdate func(int)) option {
123+
func initGCC() option {
125124
return func(p *peer) (err error) {
126125
p.estimator, err = gcc.NewSendSideController(1_000_000, 128_000, 50_000_000)
127126
if err != nil {
128127
return err
129128
}
129+
130+
return nil
131+
}
132+
}
133+
134+
func setOnRateCallback(onRateUpdate func(int)) option {
135+
return func(p *peer) error {
130136
p.onRateUpdate = onRateUpdate
131137

132138
return nil

0 commit comments

Comments
 (0)