@@ -17,12 +17,14 @@ use std::collections::HashMap;
1717use std:: future:: Future ;
1818use std:: pin:: Pin ;
1919use std:: sync:: Arc ;
20+ use std:: time:: Duration ;
2021use tokio:: sync:: Mutex ;
2122
2223/// GeneratorBuilder can be used to configure Responder Interceptor
2324#[ derive( Default ) ]
2425pub struct ResponderBuilder {
2526 log2_size : Option < u8 > ,
27+ max_packet_age : Option < Duration > ,
2628}
2729
2830impl ResponderBuilder {
@@ -32,6 +34,15 @@ impl ResponderBuilder {
3234 self . log2_size = Some ( log2_size) ;
3335 self
3436 }
37+
38+ /// with_max_packet_age sets the max age of packets that will be resent.
39+ ///
40+ /// When a resend is requested, packets that were first sent more than `max_packet_age` ago
41+ /// will not be resent.
42+ pub fn with_max_packet_age ( mut self , max_packet_age : Duration ) -> ResponderBuilder {
43+ self . max_packet_age = Some ( max_packet_age) ;
44+ self
45+ }
3546}
3647
3748impl InterceptorBuilder for ResponderBuilder {
@@ -43,6 +54,7 @@ impl InterceptorBuilder for ResponderBuilder {
4354 } else {
4455 13 // 8192 = 1 << 13
4556 } ,
57+ max_packet_age : self . max_packet_age ,
4658 streams : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
4759 } ) ,
4860 } ) )
@@ -51,13 +63,15 @@ impl InterceptorBuilder for ResponderBuilder {
5163
5264pub struct ResponderInternal {
5365 log2_size : u8 ,
66+ max_packet_age : Option < Duration > ,
5467 streams : Arc < Mutex < HashMap < u32 , Arc < ResponderStream > > > > ,
5568}
5669
5770impl ResponderInternal {
5871 async fn resend_packets (
5972 streams : Arc < Mutex < HashMap < u32 , Arc < ResponderStream > > > > ,
6073 nack : TransportLayerNack ,
74+ max_packet_age : Option < Duration > ,
6175 ) {
6276 let stream = {
6377 let m = streams. lock ( ) . await ;
@@ -73,10 +87,19 @@ impl ResponderInternal {
7387 n. range ( Box :: new (
7488 move |seq : u16 | -> Pin < Box < dyn Future < Output = bool > + Send + ' static > > {
7589 let stream3 = Arc :: clone ( & stream2) ;
90+
7691 Box :: pin ( async move {
7792 if let Some ( p) = stream3. get ( seq) . await {
93+ let should_send = max_packet_age
94+ . map ( |max_age| p. age ( ) < max_age)
95+ . unwrap_or ( true ) ;
96+
97+ if !should_send {
98+ return true ;
99+ }
100+
78101 let a = Attributes :: new ( ) ;
79- if let Err ( err) = stream3. next_rtp_writer . write ( & p, & a) . await {
102+ if let Err ( err) = stream3. next_rtp_writer . write ( & p. packet , & a) . await {
80103 log:: warn!( "failed resending nacked packet: {}" , err) ;
81104 }
82105 }
@@ -92,6 +115,7 @@ impl ResponderInternal {
92115
93116pub struct ResponderRtcpReader {
94117 parent_rtcp_reader : Arc < dyn RTCPReader + Send + Sync > ,
118+ max_packet_age : Option < Duration > ,
95119 internal : Arc < ResponderInternal > ,
96120}
97121
@@ -106,8 +130,9 @@ impl RTCPReader for ResponderRtcpReader {
106130 if let Some ( nack) = p. as_any ( ) . downcast_ref :: < TransportLayerNack > ( ) {
107131 let nack = nack. clone ( ) ;
108132 let streams = Arc :: clone ( & self . internal . streams ) ;
133+ let max_packet_age = self . max_packet_age ;
109134 tokio:: spawn ( async move {
110- ResponderInternal :: resend_packets ( streams, nack) . await ;
135+ ResponderInternal :: resend_packets ( streams, nack, max_packet_age ) . await ;
111136 } ) ;
112137 }
113138 }
@@ -138,6 +163,7 @@ impl Interceptor for Responder {
138163 ) -> Arc < dyn RTCPReader + Send + Sync > {
139164 Arc :: new ( ResponderRtcpReader {
140165 internal : Arc :: clone ( & self . internal ) ,
166+ max_packet_age : self . internal . max_packet_age ,
141167 parent_rtcp_reader : reader,
142168 } ) as Arc < dyn RTCPReader + Send + Sync >
143169 }
0 commit comments