Skip to content

Commit 7d2544c

Browse files
authored
Apply start time correction in discrete steps (#748)
1 parent 9dc7c66 commit 7d2544c

File tree

2 files changed

+190
-4
lines changed

2 files changed

+190
-4
lines changed

pkg/synchronizer/track.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
const (
3535
cStartTimeAdjustWindow = 2 * time.Minute
3636
cStartTimeAdjustThreshold = 5 * time.Second
37+
cStartTimeAdjustStep = 5 * time.Millisecond
3738

3839
cHighDriftLoggingThreshold = 20 * time.Millisecond
3940
)
@@ -90,6 +91,7 @@ type TrackSynchronizer struct {
9091

9192
propagationDelayEstimator *OWDEstimator
9293
totalStartTimeAdjustment time.Duration
94+
startTimeAdjustResidual time.Duration
9395
}
9496

9597
func newTrackSynchronizer(s *Synchronizer, track TrackRemote) *TrackSynchronizer {
@@ -585,7 +587,7 @@ func (t *TrackSynchronizer) maybeAdjustStartTime(asr *augmentedSenderReport) int
585587
if int32(samplesDiff) < 0 {
586588
// out-of-order, pre-start, skip
587589
t.logger.Debugw(
588-
"no adjustment due to pre-staart report",
590+
"no adjustment due to pre-start report",
589591
"receivedSR", wrappedAugmentedSenderReportLogger{asr},
590592
"state", t,
591593
"nowTS", nowTS,
@@ -623,9 +625,8 @@ func (t *TrackSynchronizer) maybeAdjustStartTime(asr *augmentedSenderReport) int
623625
getLoggingFields()...,
624626
)
625627
} else {
626-
t.logger.Debugw("adjusting start time", getLoggingFields()...)
627-
t.totalStartTimeAdjustment += time.Duration(startTimeNano - adjustedStartTimeNano)
628-
t.startTime = time.Unix(0, adjustedStartTimeNano)
628+
applied := t.applyQuantizedStartTimeAdvance(time.Duration(startTimeNano - adjustedStartTimeNano))
629+
t.logger.Debugw("adjusting start time", append(getLoggingFields(), "applied", applied)...)
629630
}
630631
}
631632

@@ -652,6 +653,25 @@ func (t *TrackSynchronizer) isPacketTooOld(packetTime time.Time) bool {
652653
return t.oldPacketThreshold != 0 && mono.Now().Sub(packetTime) > t.oldPacketThreshold
653654
}
654655

656+
func (t *TrackSynchronizer) applyQuantizedStartTimeAdvance(deltaTotal time.Duration) time.Duration {
657+
// include any prior residual
658+
deltaTotal += t.startTimeAdjustResidual
659+
660+
quanta := deltaTotal / cStartTimeAdjustStep
661+
residual := deltaTotal % cStartTimeAdjustStep
662+
663+
if quanta > 0 {
664+
applied := quanta * cStartTimeAdjustStep
665+
t.startTime = t.startTime.Add(-applied)
666+
t.totalStartTimeAdjustment += applied
667+
t.startTimeAdjustResidual = residual
668+
return applied
669+
}
670+
671+
t.startTimeAdjustResidual = deltaTotal
672+
return 0
673+
}
674+
655675
func (t *TrackSynchronizer) MarshalLogObject(e zapcore.ObjectEncoder) error {
656676
if t == nil {
657677
return nil

pkg/synchronizer/track_test.go

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Copyright 2025 LiveKit, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package synchronizer
16+
17+
import (
18+
"testing"
19+
"time"
20+
21+
"github.com/pion/rtp"
22+
"github.com/pion/webrtc/v4"
23+
"github.com/stretchr/testify/require"
24+
25+
"github.com/livekit/media-sdk/jitter"
26+
"github.com/livekit/protocol/logger"
27+
"github.com/livekit/protocol/utils/mono"
28+
)
29+
30+
// ---- test fakes & helpers ----
31+
32+
type fakeTrack struct {
33+
id string
34+
rate uint32
35+
kind webrtc.RTPCodecType
36+
}
37+
38+
func (f fakeTrack) ID() string { return f.id }
39+
func (f fakeTrack) Codec() webrtc.RTPCodecParameters {
40+
return webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{ClockRate: f.rate}}
41+
}
42+
func (f fakeTrack) Kind() webrtc.RTPCodecType { return f.kind }
43+
func (f fakeTrack) SSRC() webrtc.SSRC { return 1234 }
44+
45+
func newTSForTests(tc *testing.T, clockRate uint32, kind webrtc.RTPCodecType) *TrackSynchronizer {
46+
t := &TrackSynchronizer{
47+
sync: nil, // construct directly to avoid depending on Synchronizer
48+
track: fakeTrack{id: "t", rate: clockRate, kind: kind},
49+
logger: logger.NewTestLogger(tc),
50+
rtpConverter: newRTPConverter(int64(clockRate)),
51+
maxTsDiff: 200 * time.Millisecond,
52+
maxDriftAdjustment: 5 * time.Millisecond,
53+
}
54+
// set a stable startTime well in the past to make time.Since(startTime) > 0
55+
t.startTime = time.Now().Add(-150 * time.Millisecond)
56+
t.initTime = t.startTime
57+
// pick an arbitrary RTP base
58+
t.startRTP = 1000
59+
return t
60+
}
61+
62+
// ---- tests ----
63+
64+
func TestApplyQuantizedStartTimeAdvance_ExactQuanta(t *testing.T) {
65+
ts := newTSForTests(t, 48000, webrtc.RTPCodecTypeAudio)
66+
base := ts.startTime
67+
68+
// 25ms delta with 5ms step -> apply 25ms, residual 0
69+
applied := ts.applyQuantizedStartTimeAdvance(25 * time.Millisecond)
70+
require.Equal(t, 25*time.Millisecond, applied)
71+
require.Equal(t, 25*time.Millisecond, base.Sub(ts.startTime))
72+
require.Equal(t, time.Duration(0), ts.startTimeAdjustResidual)
73+
require.Equal(t, 25*time.Millisecond, ts.totalStartTimeAdjustment)
74+
}
75+
76+
func TestApplyQuantizedStartTimeAdvance_ResidualCarryAcrossCalls(t *testing.T) {
77+
ts := newTSForTests(t, 48000, webrtc.RTPCodecTypeAudio)
78+
base := ts.startTime
79+
80+
// First call: 3ms (<5ms step) -> apply 0, residual=3ms
81+
applied1 := ts.applyQuantizedStartTimeAdvance(3 * time.Millisecond)
82+
require.Equal(t, time.Duration(0), applied1)
83+
require.Equal(t, 3*time.Millisecond, ts.startTimeAdjustResidual)
84+
require.Equal(t, ts.startTime, base)
85+
require.Equal(t, 3*time.Millisecond, ts.startTimeAdjustResidual)
86+
}
87+
88+
func TestApplyQuantizedStartTimeAdvance_NoOpForZero(t *testing.T) {
89+
ts := newTSForTests(t, 48000, webrtc.RTPCodecTypeAudio)
90+
base := ts.startTime
91+
92+
applied := ts.applyQuantizedStartTimeAdvance(0)
93+
require.Equal(t, time.Duration(0), applied)
94+
require.Equal(t, ts.startTime, base)
95+
require.Equal(t, time.Duration(0), ts.startTimeAdjustResidual)
96+
}
97+
98+
func TestGetPTSWithoutRebase_Increasing(t *testing.T) {
99+
clock := uint32(48000)
100+
ts := newTSForTests(t, clock, webrtc.RTPCodecTypeAudio)
101+
102+
// Simulate accepting two frames in order: 20ms and then 20ms later
103+
// Convert 20ms -> RTP ticks
104+
rtp20ms := ts.rtpConverter.toRTP(20 * time.Millisecond)
105+
106+
now := time.Now()
107+
// First packet initializes lastTS path
108+
ts.lastTS = 0
109+
ts.lastPTS = 0
110+
111+
adj1, err := ts.getPTSWithoutRebase(jitter.ExtPacket{
112+
Packet: &rtp.Packet{Header: rtp.Header{Timestamp: ts.startRTP + rtp20ms}},
113+
ReceivedAt: now,
114+
})
115+
require.NoError(t, err)
116+
117+
adj2, err := ts.getPTSWithoutRebase(jitter.ExtPacket{
118+
Packet: &rtp.Packet{Header: rtp.Header{Timestamp: ts.startRTP + 2*rtp20ms}},
119+
ReceivedAt: now,
120+
})
121+
require.NoError(t, err)
122+
123+
require.Greater(t, adj2, adj1)
124+
}
125+
126+
func TestGetPTSWithRebase_PropelsForward(t *testing.T) {
127+
clock := uint32(48000)
128+
ts := newTSForTests(t, clock, webrtc.RTPCodecTypeAudio)
129+
ts.rtcpSenderReportRebaseEnabled = true
130+
131+
ts.maxTsDiff = 30 * time.Millisecond
132+
133+
// 1) Seed: make adjusted ~500ms on the first packet.
134+
ts.startTime = mono.Now().Add(-500 * time.Millisecond)
135+
ts.currentPTSOffset = 0
136+
ts.lastPTS = 0
137+
ts.startRTP = 100000
138+
ts.lastTS = ts.startRTP
139+
140+
rtp500ms := ts.rtpConverter.toRTP(500 * time.Millisecond)
141+
rtp10ms := ts.rtpConverter.toRTP(10 * time.Millisecond)
142+
143+
// First packet (~500ms)
144+
ts1 := ts.startRTP + rtp500ms
145+
adj1, err := ts.getPTSWithRebase(jitter.ExtPacket{
146+
Packet: &rtp.Packet{Header: rtp.Header{Timestamp: ts1}},
147+
ReceivedAt: time.Time{}, // not used for estimatedPTS here
148+
})
149+
require.NoError(t, err)
150+
require.InDelta(t, 500*time.Millisecond, adj1, float64(20*time.Millisecond))
151+
152+
// 2) Simulate startTime shift LATER (closer to now) so next estimatedPTS is tiny (~5–10ms)
153+
ts.startTime = mono.Now().Add(-5 * time.Millisecond)
154+
155+
// Second packet: +10ms RTP so ts != lastTS. After correction, adjusted will be tiny and < lastPTSAdjusted.
156+
ts2 := ts1 + rtp10ms
157+
prev := ts.lastPTSAdjusted // ~500ms from first call
158+
want := prev + time.Millisecond // propel to ~501ms
159+
160+
adj2, err := ts.getPTSWithRebase(jitter.ExtPacket{
161+
Packet: &rtp.Packet{Header: rtp.Header{Timestamp: ts2}},
162+
ReceivedAt: time.Time{},
163+
})
164+
require.NoError(t, err)
165+
require.Equal(t, want, adj2)
166+
}

0 commit comments

Comments
 (0)