File tree Expand file tree Collapse file tree 1 file changed +29
-1
lines changed Expand file tree Collapse file tree 1 file changed +29
-1
lines changed Original file line number Diff line number Diff line change 4
4
5
5
//! Implements a packet stats sink.
6
6
7
+ use crate :: rate:: Derivative ;
7
8
use net:: packet:: Packet ;
8
9
use pipeline:: NetworkFunction ;
9
10
@@ -257,7 +258,34 @@ impl StatsCollector {
257
258
} ) ;
258
259
self . submitted . push ( concluded. vpc ) ;
259
260
260
- // TODO: add in rate calculations
261
+ let filters_by_src: hashbrown:: HashMap <
262
+ VpcDiscriminant ,
263
+ TransmitSummary < SavitzkyGolayFilter < u64 > > ,
264
+ > = ( & self . submitted ) . into ( ) ;
265
+ if let Ok ( rates_by_src) =
266
+ <hashbrown:: HashMap < _ , TransmitSummary < SavitzkyGolayFilter < u64 > > > >:: derivative (
267
+ & filters_by_src,
268
+ )
269
+ {
270
+ rates_by_src. iter ( ) . for_each ( |( & src, tx_summary) | {
271
+ let metrics = match self . metrics . get ( & src) {
272
+ None => {
273
+ warn ! ( "lost metrics for src {src}" ) ;
274
+ return ;
275
+ }
276
+ Some ( metrics) => metrics,
277
+ } ;
278
+ tx_summary. dst . iter ( ) . for_each ( |( dst, rate) | {
279
+ if let Some ( action) = metrics. peering . get ( dst) {
280
+ action. tx . packet . rate . metric . set ( rate. packets ) ;
281
+ action. tx . byte . rate . metric . set ( rate. bytes ) ;
282
+ } else {
283
+ warn ! ( "lost metrics for src {src} to dst {dst}" ) ;
284
+ }
285
+ } ) ;
286
+ } ) ;
287
+ }
288
+
261
289
// TODO: add in drop metrics
262
290
}
263
291
}
You can’t perform that action at this time.
0 commit comments