Skip to content

Commit d41bafa

Browse files
authored
Add stats for packets and sender reports (#751)
1 parent 6d4fab5 commit d41bafa

File tree

1 file changed

+32
-5
lines changed

1 file changed

+32
-5
lines changed

pkg/synchronizer/track.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,15 @@ type TrackSynchronizer struct {
9191
propagationDelayEstimator *OWDEstimator
9292
totalStartTimeAdjustment time.Duration
9393
startTimeAdjustResidual time.Duration
94+
95+
// packet stats
96+
numEmitted uint32
97+
numDroppedOld uint32
98+
numDroppedOutOfOrder uint32
99+
numDroppedEOF uint32
100+
101+
// sender report stats
102+
numSenderReports uint32
94103
}
95104

96105
func newTrackSynchronizer(s *Synchronizer, track TrackRemote) *TrackSynchronizer {
@@ -182,6 +191,7 @@ func (t *TrackSynchronizer) getPTSWithoutRebase(pkt jitter.ExtPacket) (time.Dura
182191
// if first packet of a frame was accepted,
183192
// accept all packets of the frame even if they are old
184193
if ts == t.lastTS {
194+
t.numEmitted++
185195
return t.lastPTSAdjusted, nil
186196
}
187197

@@ -190,6 +200,7 @@ func (t *TrackSynchronizer) getPTSWithoutRebase(pkt jitter.ExtPacket) (time.Dura
190200
// drop all packets of the frame irrespective of whether they are old or not
191201
if ts == t.lastTSOldDropped || t.isPacketTooOld(pkt.ReceivedAt) {
192202
t.lastTSOldDropped = ts
203+
t.numDroppedOld++
193204
t.logger.Infow(
194205
"dropping old packet",
195206
"currentTS", ts,
@@ -259,6 +270,7 @@ func (t *TrackSynchronizer) getPTSWithoutRebase(pkt jitter.ExtPacket) (time.Dura
259270

260271
// if past end time, return EOF
261272
if t.maxPTS > 0 && (adjusted > t.maxPTS) {
273+
t.numDroppedEOF++
262274
return 0, io.EOF
263275
}
264276

@@ -268,6 +280,7 @@ func (t *TrackSynchronizer) getPTSWithoutRebase(pkt jitter.ExtPacket) (time.Dura
268280
t.lastPTS = pts
269281
t.lastPTSAdjusted = adjusted
270282

283+
t.numEmitted++
271284
return adjusted, nil
272285
}
273286

@@ -288,11 +301,13 @@ func (t *TrackSynchronizer) getPTSWithRebase(pkt jitter.ExtPacket) (time.Duratio
288301
// if first packet of a frame was accepted,
289302
// accept all packets of the frame even if they are old
290303
if ts == t.lastTS {
304+
t.numEmitted++
291305
return t.lastPTSAdjusted, nil
292306
}
293307

294308
// packets are expected in order, just a safety net
295309
if (ts - t.lastTS) > (1 << 31) {
310+
t.numDroppedOutOfOrder++
296311
t.logger.Infow(
297312
"dropping out-of-order packet",
298313
"currentTS", ts,
@@ -305,6 +320,7 @@ func (t *TrackSynchronizer) getPTSWithRebase(pkt jitter.ExtPacket) (time.Duratio
305320
// drop all packets of the frame irrespective of whether they are old or not
306321
if ts == t.lastTSOldDropped || t.isPacketTooOld(pkt.ReceivedAt) {
307322
t.lastTSOldDropped = ts
323+
t.numDroppedOld++
308324
t.logger.Infow(
309325
"dropping old packet",
310326
"currentTS", ts,
@@ -385,6 +401,7 @@ func (t *TrackSynchronizer) getPTSWithRebase(pkt jitter.ExtPacket) (time.Duratio
385401

386402
// if past end time, return EOF
387403
if t.maxPTS > 0 && (adjusted > t.maxPTS) {
404+
t.numDroppedEOF++
388405
return 0, io.EOF
389406
}
390407

@@ -394,6 +411,7 @@ func (t *TrackSynchronizer) getPTSWithRebase(pkt jitter.ExtPacket) (time.Duratio
394411
t.lastPTS = pts
395412
t.lastPTSAdjusted = adjusted
396413

414+
t.numEmitted++
397415
return adjusted, nil
398416
}
399417

@@ -419,22 +437,24 @@ func (t *TrackSynchronizer) onSenderReportWithoutRebase(pkt *rtcp.SenderReport)
419437
receivedAt: mono.UnixNano(),
420438
}
421439
if t.lastSR != nil && ((t.lastSR.RTPTime != 0 && (pkt.RTPTime-t.lastSR.RTPTime) > (1<<31)) || pkt.RTPTime == t.lastSR.RTPTime) {
422-
t.logger.Debugw(
440+
t.logger.Infow(
423441
"dropping duplicate or out-of-order sender report",
424442
"receivedSR", wrappedAugmentedSenderReportLogger{augmented},
425443
"state", t,
426444
)
427445
return
428446
}
429447

448+
t.numSenderReports++
449+
430450
var pts time.Duration
431451
if pkt.RTPTime > t.lastTS {
432452
pts = t.lastPTS + t.toDuration(pkt.RTPTime-t.lastTS)
433453
} else {
434454
pts = t.lastPTS - t.toDuration(t.lastTS-pkt.RTPTime)
435455
}
436456
if !t.acceptable(pts - time.Since(t.startTime)) {
437-
t.logger.Debugw(
457+
t.logger.Infow(
438458
"ignoring sender report with unacceptable offset",
439459
"receivedSR", wrappedAugmentedSenderReportLogger{augmented},
440460
"state", t,
@@ -488,22 +508,24 @@ func (t *TrackSynchronizer) onSenderReportWithRebase(pkt *rtcp.SenderReport) {
488508
}
489509

490510
if t.lastSR != nil && ((t.lastSR.RTPTime != 0 && (pkt.RTPTime-t.lastSR.RTPTime) > (1<<31)) || pkt.RTPTime == t.lastSR.RTPTime) {
491-
t.logger.Debugw(
511+
t.logger.Infow(
492512
"dropping duplicate or out-of-order sender report",
493513
"receivedSR", wrappedAugmentedSenderReportLogger{augmented},
494514
"state", t,
495515
)
496516
return
497517
}
498518

519+
t.numSenderReports++
520+
499521
var ptsSR time.Duration
500522
if (pkt.RTPTime - t.lastTS) < (1 << 31) {
501523
ptsSR = t.lastPTS + t.toDuration(pkt.RTPTime-t.lastTS)
502524
} else {
503525
ptsSR = t.lastPTS - t.toDuration(t.lastTS-pkt.RTPTime)
504526
}
505527
if !t.acceptable(ptsSR - time.Since(t.startTime)) {
506-
t.logger.Debugw(
528+
t.logger.Infow(
507529
"ignoring sender report with unacceptable offset",
508530
"receivedSR", wrappedAugmentedSenderReportLogger{augmented},
509531
"state", t,
@@ -627,7 +649,7 @@ func (t *TrackSynchronizer) maybeAdjustStartTime(asr *augmentedSenderReport) int
627649
)
628650
} else {
629651
applied := t.applyQuantizedStartTimeAdvance(time.Duration(startTimeNano - adjustedStartTimeNano))
630-
t.logger.Debugw("adjusting start time", append(getLoggingFields(), "applied", applied)...)
652+
t.logger.Infow("adjusting start time", append(getLoggingFields(), "applied", applied)...)
631653
}
632654
}
633655

@@ -702,6 +724,11 @@ func (t *TrackSynchronizer) MarshalLogObject(e zapcore.ObjectEncoder) error {
702724
e.AddTime("nextPTSAdjustmentAt", t.nextPTSAdjustmentAt)
703725
e.AddObject("propagationDelayEstimator", t.propagationDelayEstimator)
704726
e.AddDuration("totalStartTimeAdjustment", t.totalStartTimeAdjustment)
727+
e.AddUint32("numEmitted", t.numEmitted)
728+
e.AddUint32("numDroppedOld", t.numDroppedOld)
729+
e.AddUint32("numDroppedOutOfOrder", t.numDroppedOutOfOrder)
730+
e.AddUint32("numDroppedEOF", t.numDroppedEOF)
731+
e.AddUint32("numSenderReports", t.numSenderReports)
705732
return nil
706733
}
707734

0 commit comments

Comments
 (0)