@@ -38,6 +38,18 @@ impl VpcMapName {
38
38
}
39
39
}
40
40
41
+ /// Compute overlap in nanoseconds between [a_start, a_end] and [b_start, b_end].
42
+ #[ inline]
43
+ fn overlap_nanos ( a_start : Instant , a_end : Instant , b_start : Instant , b_end : Instant ) -> u128 {
44
+ let start = if a_start > b_start { a_start } else { b_start } ;
45
+ let end = if a_end < b_end { a_end } else { b_end } ;
46
+ if end > start {
47
+ end. duration_since ( start) . as_nanos ( )
48
+ } else {
49
+ 0
50
+ }
51
+ }
52
+
41
53
/// A `StatsCollector` is responsible for collecting and aggregating packet statistics for a
42
54
/// collection of workers running packet processing pipelines on various threads.
43
55
#[ derive( Debug ) ]
@@ -50,6 +62,7 @@ pub struct StatsCollector {
50
62
/// Filter for batches which have been submitted to the `submitted` filter. This filter is
51
63
/// used to calculate rates.
52
64
submitted : SavitzkyGolayFilter < hashbrown:: HashMap < VpcDiscriminant , TransmitSummary < u64 > > > ,
65
+ /// Running cumulative totals used to produce monotonic series into the SG derivative filter.
53
66
cumulative_totals : hashbrown:: HashMap < VpcDiscriminant , TransmitSummary < u64 > > ,
54
67
/// Reader for the VPC map. This reader is used to determine the VPCs that are currently
55
68
/// known to the system.
@@ -173,38 +186,91 @@ impl StatsCollector {
173
186
}
174
187
} )
175
188
. collect ( ) ;
189
+
190
+ // Proportionally distribute each (src,dst) update across overlapping batches.
176
191
update. summary . vpc . iter ( ) . for_each ( |( src, summary) | {
177
192
summary. dst . iter ( ) . for_each ( |( dst, stats) | {
178
- slices. iter_mut ( ) . for_each ( |batch| {
179
- // TODO: this can be much more efficient
180
- let SplitCount {
181
- inside : packets, ..
182
- } = batch. split_count ( & update, stats. packets ) ;
183
- let SplitCount { inside : bytes, .. } =
184
- batch. split_count ( & update, stats. bytes ) ;
185
- let stats = PacketAndByte { packets, bytes } ;
186
- if packets == 0 && bytes == 0 {
187
- return ;
193
+ if stats. packets == 0 && stats. bytes == 0 {
194
+ return ;
195
+ }
196
+
197
+ let upd_start = update. summary . start ;
198
+ let upd_end = update. start ( ) + update. duration ;
199
+
200
+ // Pre-compute overlaps with all candidate batch slices
201
+ let overlaps: Vec < u128 > = slices
202
+ . iter ( )
203
+ . map ( |b| overlap_nanos ( b. start , b. planned_end , upd_start, upd_end) )
204
+ . collect ( ) ;
205
+ let total_ov: u128 = overlaps. iter ( ) . copied ( ) . sum ( ) ;
206
+ if total_ov == 0 {
207
+ return ;
208
+ }
209
+
210
+ // Integer-safe split: give the remainder to the last overlapping bucket
211
+ let mut rem_pkts = stats. packets ;
212
+ let mut rem_bytes = stats. bytes ;
213
+
214
+ let last_idx = overlaps
215
+ . iter ( )
216
+ . enumerate ( )
217
+ . rfind ( |& ( _, & ov) | ov > 0 )
218
+ . map ( |( i, _) | i) ;
219
+
220
+ for ( i, batch) in slices. iter_mut ( ) . enumerate ( ) {
221
+ let ov = overlaps[ i] ;
222
+ if ov == 0 {
223
+ continue ;
224
+ }
225
+
226
+ let is_last = Some ( i) == last_idx;
227
+
228
+ let pkts_in = if is_last {
229
+ rem_pkts
230
+ } else {
231
+ let v = ( ( stats. packets as u128 ) * ov / total_ov) as u64 ;
232
+ // track remainder
233
+ rem_pkts = rem_pkts. saturating_sub ( v) ;
234
+ v
235
+ } ;
236
+
237
+ let bytes_in = if is_last {
238
+ rem_bytes
239
+ } else {
240
+ let v = ( ( stats. bytes as u128 ) * ov / total_ov) as u64 ;
241
+ rem_bytes = rem_bytes. saturating_sub ( v) ;
242
+ v
243
+ } ;
244
+
245
+ if pkts_in == 0 && bytes_in == 0 {
246
+ continue ;
188
247
}
248
+
249
+ let apportioned = PacketAndByte {
250
+ packets : pkts_in,
251
+ bytes : bytes_in,
252
+ } ;
253
+
189
254
match batch. vpc . get_mut ( src) {
190
255
None => {
191
256
let mut tx_summary = TransmitSummary :: new ( ) ;
192
- tx_summary. dst . insert ( * dst, stats ) ;
257
+ tx_summary. dst . insert ( * dst, apportioned ) ;
193
258
batch. vpc . insert ( * src, tx_summary) ;
194
259
}
195
260
Some ( tx_summary) => match tx_summary. dst . get_mut ( dst) {
196
261
None => {
197
- tx_summary. dst . insert ( * dst, stats ) ;
262
+ tx_summary. dst . insert ( * dst, apportioned ) ;
198
263
}
199
264
Some ( s) => {
200
- * s += stats ;
265
+ * s += apportioned ;
201
266
}
202
267
} ,
203
268
}
204
- } )
269
+ }
205
270
} ) ;
206
271
} ) ;
207
272
}
273
+
208
274
let current_time = Instant :: now ( ) ;
209
275
let mut expired = self
210
276
. outstanding
@@ -258,26 +324,27 @@ impl StatsCollector {
258
324
}
259
325
} ) ;
260
326
} ) ;
261
- //self.submitted.push(concluded.vpc);
327
+
328
+ // Update cumulative totals from the *concluded* batch (apportioned already)
262
329
for ( & src, tx_summary) in concluded. vpc . iter ( ) {
263
330
let totals = self
264
331
. cumulative_totals
265
332
. entry ( src)
266
333
. or_insert_with ( TransmitSummary :: new) ;
267
-
334
+
268
335
for ( & dst, & stats) in tx_summary. dst . iter ( ) {
269
336
match totals. dst . get_mut ( & dst) {
270
337
Some ( entry) => {
271
338
entry. packets = entry. packets . saturating_add ( stats. packets ) ;
272
- entry. bytes = entry. bytes . saturating_add ( stats. bytes ) ;
339
+ entry. bytes = entry. bytes . saturating_add ( stats. bytes ) ;
273
340
}
274
341
None => {
275
342
totals. dst . insert ( dst, stats) ;
276
343
}
277
344
}
278
345
}
279
346
}
280
-
347
+
281
348
// Push the cumulative snapshot into the SG derivative filter
282
349
debug ! ( "sg snapshot: {:?}" , self . cumulative_totals) ;
283
350
self . submitted . push ( self . cumulative_totals . clone ( ) ) ;
0 commit comments