Skip to content

Commit d02c903

Browse files
committed
feat(stats): wip: smoothing instead of derivative
Signed-off-by: Sergey Matov <[email protected]>
1 parent eb16a28 commit d02c903

File tree

2 files changed

+172
-41
lines changed

2 files changed

+172
-41
lines changed

stats/src/dpstats.rs

Lines changed: 24 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// Copyright Open Network Fabric Authors
3+
//
34

4-
use crate::rate::Derivative;
5+
use crate::rate::{HashMapSmoothing, SavitzkyGolayFilter};
56
use net::packet::Packet;
67
use pipeline::NetworkFunction;
78

@@ -11,7 +12,6 @@ use std::time::{Duration, Instant};
1112
use vpcmap::VpcDiscriminant;
1213
use vpcmap::map::VpcMapReader;
1314

14-
use crate::rate::SavitzkyGolayFilter;
1515
use crate::{RegisteredVpcMetrics, Specification, VpcMetricsSpec};
1616
use net::buffer::PacketBufferMut;
1717
use rand::RngCore;
@@ -54,13 +54,11 @@ pub struct StatsCollector {
5454
/// metrics maps known VpcDiscriminants to their metrics
5555
metrics: hashbrown::HashMap<VpcDiscriminant, RegisteredVpcMetrics>,
5656
/// Outstanding (i.e., not yet submitted) batches. These batches will eventually be collected
57-
/// in to the `submitted` filter in order to calculate rates.
57+
/// in to the `submitted` filter in order to calculate smoothed rates.
5858
outstanding: VecDeque<BatchSummary<u64>>,
59-
/// Filter for batches which have been submitted to the `submitted` filter. This filter is
60-
/// used to calculate rates.
59+
/// Filter for batches which have been submitted; used to calculate smoothed pps/Bps.
60+
/// We push *apportioned per-batch counts* here; with TIME_TICK=1s, smoothing(counts) ≈ smoothing(pps).
6161
submitted: SavitzkyGolayFilter<hashbrown::HashMap<VpcDiscriminant, TransmitSummary<u64>>>,
62-
/// Running cumulative totals used to produce monotonic series into the SG derivative filter.
63-
cumulative_totals: hashbrown::HashMap<VpcDiscriminant, TransmitSummary<u64>>,
6462
/// Reader for the VPC map. This reader is used to determine the VPCs that are currently
6563
/// known to the system.
6664
vpcmap_r: VpcMapReader<VpcMapName>,
@@ -104,7 +102,6 @@ impl StatsCollector {
104102
metrics,
105103
outstanding,
106104
submitted: SavitzkyGolayFilter::new(Self::TIME_TICK),
107-
cumulative_totals: hashbrown::HashMap::new(),
108105
vpcmap_r,
109106
updates,
110107
};
@@ -166,7 +163,7 @@ impl StatsCollector {
166163
}
167164
}
168165

169-
/// Calculate updated stats and submit any expired entries to the rate filter.
166+
/// Calculate updated stats and submit any expired entries to the SG filter.
170167
#[tracing::instrument(level = "trace")]
171168
fn update(&mut self, update: Option<MetricsUpdate>) {
172169
if let Some(update) = update {
@@ -283,7 +280,7 @@ impl StatsCollector {
283280
}
284281
}
285282

286-
/// Submit a concluded set of stats for inclusion in rate calculations
283+
/// Submit a concluded set of stats for inclusion in smoothing calculations
287284
#[tracing::instrument(level = "trace")]
288285
fn submit_expired(&mut self, concluded: BatchSummary<u64>) {
289286
const CAPACITY_PADDING: usize = 16;
@@ -294,11 +291,13 @@ impl StatsCollector {
294291
.last()
295292
.unwrap_or_else(|| unreachable!())
296293
.planned_end;
297-
let duration = Duration::from_secs(1);
294+
let duration = Self::TIME_TICK;
298295
self.outstanding
299296
.push_back(BatchSummary::with_start_and_capacity(
300297
start, duration, capacity,
301298
));
299+
300+
// Update raw packet/byte COUNTS for “total” metrics (monotonic counters)
302301
concluded.vpc.iter().for_each(|(&src, tx_summary)| {
303302
let metrics = match self.metrics.get(&src) {
304303
None => {
@@ -321,40 +320,18 @@ impl StatsCollector {
321320
});
322321
});
323322

324-
// Update cumulative totals from the *concluded* batch (apportioned already)
325-
for (&src, tx_summary) in concluded.vpc.iter() {
326-
let totals = self
327-
.cumulative_totals
328-
.entry(src)
329-
.or_insert_with(TransmitSummary::new);
330-
331-
for (&dst, &stats) in tx_summary.dst.iter() {
332-
match totals.dst.get_mut(&dst) {
333-
Some(entry) => {
334-
entry.packets = entry.packets.saturating_add(stats.packets);
335-
entry.bytes = entry.bytes.saturating_add(stats.bytes);
336-
}
337-
None => {
338-
totals.dst.insert(dst, stats);
339-
}
340-
}
341-
}
342-
}
343-
344-
// Push the cumulative snapshot into the SG derivative filter
345-
debug!("sg snapshot: {:?}", self.cumulative_totals);
346-
self.submitted.push(self.cumulative_totals.clone());
323+
// Push this *apportioned per-batch* snapshot into the SG window.
324+
// With TIME_TICK=1s, smoothing these counts ≈ smoothing pps/Bps directly.
325+
self.submitted.push(concluded.vpc);
347326

327+
// Build per-source filters and smooth.
348328
let filters_by_src: hashbrown::HashMap<
349329
VpcDiscriminant,
350330
TransmitSummary<SavitzkyGolayFilter<u64>>,
351331
> = (&self.submitted).into();
352-
if let Ok(rates_by_src) =
353-
<hashbrown::HashMap<_, TransmitSummary<SavitzkyGolayFilter<u64>>>>::derivative(
354-
&filters_by_src,
355-
)
356-
{
357-
rates_by_src.iter().for_each(|(&src, tx_summary)| {
332+
333+
if let Ok(smoothed_by_src) = filters_by_src.smooth() {
334+
smoothed_by_src.iter().for_each(|(&src, tx_summary)| {
358335
let metrics = match self.metrics.get(&src) {
359336
None => {
360337
warn!("lost metrics for src {src}");
@@ -364,14 +341,20 @@ impl StatsCollector {
364341
};
365342
tx_summary.dst.iter().for_each(|(dst, rate)| {
366343
if let Some(action) = metrics.peering.get(dst) {
344+
// Smoothed packets-per-second / bytes-per-second (since tick=1s)
367345
action.tx.packet.rate.metric.set(rate.packets);
368346
action.tx.byte.rate.metric.set(rate.bytes);
369-
debug!("set rate for src {src} to dst {dst}: {:?}", rate);
347+
trace!(
348+
"smoothed rate src={:?} dst={:?}: pps={:.3} Bps={:.3}",
349+
src, dst, rate.packets, rate.bytes
350+
);
370351
} else {
371352
warn!("lost metrics for src {src} to dst {dst}");
372353
}
373354
});
374355
});
356+
} else {
357+
trace!("Not enough samples yet for smoothing");
375358
}
376359

377360
// TODO: add in drop metrics

stats/src/rate.rs

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,21 @@ pub trait Derivative {
2020
fn derivative(&self) -> Result<Self::Output, Self::Error>;
2121
}
2222

23+
/// Computes a smoothed (denoised) value from a window of time-series samples.
24+
/// The Savitzky–Golay implementations use the 5-point (window=5) smoothing polynomial (order=2) coefficients.
25+
pub trait Smooth {
26+
/// Error returned when no smoothed value can be produced.
type Error;
27+
/// Type of the smoothed value.
type Output;
28+
/// Returns the smoothed value for the current window, or an error if it cannot be computed.
fn smooth(&self) -> Result<Self::Output, Self::Error>;
29+
}
30+
31+
/// Allows smoothing a map of smoothable values (mirrors the HashMap <-> Derivative pattern).
32+
pub trait HashMapSmoothing {
33+
/// Error returned when the map as a whole cannot be smoothed.
type Error;
34+
/// Type of the smoothed map.
type Output;
35+
/// Smooths the values of the map and returns the results keyed like the input.
fn smooth(&self) -> Result<Self::Output, Self::Error>;
36+
}
37+
2338
/// A filter for computing the derivative of a series of data points.
2439
///
2540
/// This method uses the so-called 5-point stencil or [Savitzky-Golay filter](https://en.wikipedia.org/wiki/Savitzky%E2%80%93Golay_filter) formula for
@@ -461,6 +476,139 @@ impl<T> ExponentiallyWeightedMovingAverage<T> {
461476
}
462477
}
463478

479+
/* ------------- Smoothing implementations (Savitzky–Golay, 0th derivative) ------------- */
480+
481+
impl Smooth for SavitzkyGolayFilter<u64> {
482+
type Error = DerivativeError;
483+
type Output = f64;
484+
485+
fn smooth(&self) -> Result<f64, DerivativeError> {
486+
const SAMPLES: usize = 5;
487+
const COEFFS: [i64; SAMPLES] = [-3, 12, 17, 12, -3]; // / 35
488+
const DEN: f64 = 35.0;
489+
490+
let len = self.data.len();
491+
if len < SAMPLES {
492+
return Err(DerivativeError::NotEnoughSamples(len));
493+
}
494+
debug_assert!(len == SAMPLES);
495+
496+
let mut itr = self.data.iter().cycle().skip(self.idx).copied();
497+
let data: [u64; SAMPLES] = [
498+
itr.next().unwrap_or_else(|| unreachable!()),
499+
itr.next().unwrap_or_else(|| unreachable!()),
500+
itr.next().unwrap_or_else(|| unreachable!()),
501+
itr.next().unwrap_or_else(|| unreachable!()),
502+
itr.next().unwrap_or_else(|| unreachable!()),
503+
];
504+
505+
// Use signed accumulator to handle negative edge coefficients safely.
506+
let acc: i128 = COEFFS
507+
.iter()
508+
.zip(data.iter())
509+
.fold(0i128, |s, (&c, &v)| s + (c as i128) * (v as i128));
510+
511+
Ok((acc as f64) / DEN)
512+
}
513+
}
514+
515+
impl Smooth for SavitzkyGolayFilter<PacketAndByte<u64>> {
516+
type Error = DerivativeError;
517+
type Output = PacketAndByte<f64>;
518+
519+
fn smooth(&self) -> Result<Self::Output, DerivativeError> {
520+
const SAMPLES: usize = 5;
521+
const COEFFS: [i64; SAMPLES] = [-3, 12, 17, 12, -3]; // / 35
522+
const DEN: f64 = 35.0;
523+
524+
let len = self.data.len();
525+
if len < SAMPLES {
526+
return Err(DerivativeError::NotEnoughSamples(len));
527+
}
528+
529+
let mut itr = self.data.iter().cycle().skip(self.idx).copied();
530+
let data: [PacketAndByte<u64>; SAMPLES] = [
531+
itr.next().unwrap_or_else(|| unreachable!()),
532+
itr.next().unwrap_or_else(|| unreachable!()),
533+
itr.next().unwrap_or_else(|| unreachable!()),
534+
itr.next().unwrap_or_else(|| unreachable!()),
535+
itr.next().unwrap_or_else(|| unreachable!()),
536+
];
537+
538+
let acc_packets: i128 = COEFFS
539+
.iter()
540+
.zip(data.iter())
541+
.fold(0i128, |s, (&c, v)| s + (c as i128) * (v.packets as i128));
542+
let acc_bytes: i128 = COEFFS
543+
.iter()
544+
.zip(data.iter())
545+
.fold(0i128, |s, (&c, v)| s + (c as i128) * (v.bytes as i128));
546+
547+
Ok(PacketAndByte {
548+
packets: (acc_packets as f64) / DEN,
549+
bytes: (acc_bytes as f64) / DEN,
550+
})
551+
}
552+
}
553+
554+
impl Smooth for SavitzkyGolayFilter<TransmitSummary<u64>> {
555+
type Error = DerivativeError;
556+
type Output = TransmitSummary<f64>;
557+
558+
fn smooth(&self) -> Result<Self::Output, DerivativeError> {
559+
if self.data.len() != 5 {
560+
return Err(DerivativeError::NotEnoughSamples(self.data.len()));
561+
}
562+
// Convert to per-destination SG filters first, then smooth those.
563+
let x = TransmitSummary::<SavitzkyGolayFilter<u64>>::try_from(self)?;
564+
x.smooth()
565+
}
566+
}
567+
568+
impl<T> Smooth for TransmitSummary<SavitzkyGolayFilter<T>>
569+
where
570+
SavitzkyGolayFilter<T>: Smooth<Output: Default, Error = DerivativeError>,
571+
{
572+
type Error = DerivativeError;
573+
type Output = TransmitSummary<<SavitzkyGolayFilter<T> as Smooth>::Output>;
574+
575+
fn smooth(&self) -> Result<Self::Output, Self::Error> {
576+
let mut out = TransmitSummary::new();
577+
let items = self
578+
.dst
579+
.iter()
580+
.map(|(&k, v)| {
581+
let packets = v.packets.smooth()?;
582+
let bytes = v.bytes.smooth()?;
583+
Ok((k, PacketAndByte { packets, bytes }))
584+
})
585+
.collect::<Result<Vec<_>, DerivativeError>>()?
586+
.into_iter();
587+
588+
for (k, v) in items {
589+
out.dst.insert(k, v);
590+
}
591+
Ok(out)
592+
}
593+
}
594+
595+
impl<K, V, S> HashMapSmoothing for hashbrown::HashMap<K, V, S>
596+
where
597+
K: Hash + Eq + Clone,
598+
V: Smooth,
599+
S: BuildHasher,
600+
{
601+
type Error = ();
602+
type Output = hashbrown::HashMap<K, V::Output>;
603+
604+
fn smooth(&self) -> Result<Self::Output, Self::Error> {
605+
Ok(self
606+
.iter()
607+
.filter_map(|(k, v)| Some((k.clone(), v.smooth().ok()?)))
608+
.collect())
609+
}
610+
}
611+
464612
#[cfg(any(test, feature = "bolero"))]
465613
mod contract {
466614
use crate::rate::{Derivative, SavitzkyGolayFilter};

0 commit comments

Comments
 (0)