diff --git a/bottlecap/src/config/flush_strategy.rs b/bottlecap/src/config/flush_strategy.rs index a72a24161..51e9710eb 100644 --- a/bottlecap/src/config/flush_strategy.rs +++ b/bottlecap/src/config/flush_strategy.rs @@ -8,10 +8,27 @@ pub struct PeriodicStrategy { #[derive(Clone, Copy, Debug, PartialEq)] pub enum FlushStrategy { + // Flush every 1s and at the end of the invocation Default, + // User specifies the interval in milliseconds, will not block on the runtimeDone event + Periodically(PeriodicStrategy), + // Always flush at the end of the invocation End, + // Flush both (1) at the end of the invocation and (2) periodically with the specified interval EndPeriodically(PeriodicStrategy), + // Flush in a non-blocking, asynchronous manner, so the next invocation can start without waiting + // for the flush to complete + Continuously(PeriodicStrategy), +} + +// A restricted subset of `FlushStrategy`. The Default strategy is now allowed, which is required to be +// translated into a concrete strategy. +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum ConcreteFlushStrategy { Periodically(PeriodicStrategy), + End, + EndPeriodically(PeriodicStrategy), Continuously(PeriodicStrategy), } diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index e1e2f7f1e..8cbc32842 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -240,6 +240,7 @@ pub struct Config { pub api_key: String, pub log_level: LogLevel, + // Timeout for the request to flush data to Datadog endpoint pub flush_timeout: u64, // Proxy diff --git a/bottlecap/src/lifecycle/flush_control.rs b/bottlecap/src/lifecycle/flush_control.rs index 1d60594b7..6af1f88ad 100644 --- a/bottlecap/src/lifecycle/flush_control.rs +++ b/bottlecap/src/lifecycle/flush_control.rs @@ -1,4 +1,4 @@ -use crate::config::flush_strategy::FlushStrategy; +use crate::config::flush_strategy::{ConcreteFlushStrategy, FlushStrategy}; use std::time; use tokio::time::{Interval, MissedTickBehavior::Skip}; @@ -15,6 +15,7 @@ pub struct FlushControl { flush_timeout: u64, } +// The flush behavior for the current moment #[derive(Copy, Clone, Debug, PartialEq)] pub enum FlushDecision { Continuous, @@ -23,12 +24,6 @@ pub enum FlushDecision { Dont, } -// 1. Default Strategy -// - Flush every 1s and at the end of the invocation -// 2. Periodic Strategy -// - User specifies the interval in milliseconds, will not block on the runtimeDone event -// 3. End strategy -// - Always flush at the end of the invocation impl FlushControl { #[must_use] pub fn new(flush_strategy: FlushStrategy, flush_timeout: u64) -> FlushControl { @@ -58,11 +53,15 @@ impl FlushControl { i } FlushStrategy::End => { + // Set the race flush interval to the maximum value of Lambda timeout, so flush will + // only happen at the end of the invocation, and race flush will never happen. tokio::time::interval(tokio::time::Duration::from_millis(FIFTEEN_MINUTES)) } } } + // Evaluate the flush decision for the current moment, based on the flush strategy, current time, + // and the past invocation times. #[must_use] pub fn evaluate_flush_decision(&mut self) -> FlushDecision { let now = time::SystemTime::now() @@ -70,17 +69,13 @@ impl FlushControl { .expect("unable to poll clock, unrecoverable") .as_secs(); self.invocation_times.add(now); - let evaluated_flush_strategy = if self.flush_strategy == FlushStrategy::Default { - &self.invocation_times.should_adapt(now, self.flush_timeout) - } else { - // User specified one - &self.flush_strategy - }; - match evaluated_flush_strategy { - FlushStrategy::Default => { - unreachable!("should_adapt must translate default strategy to concrete strategy") - } - FlushStrategy::Periodically(strategy) => { + let concrete_flush_strategy = self.invocation_times.evaluate_concrete_strategy( + now, + self.flush_timeout, + self.flush_strategy, + ); + match concrete_flush_strategy { + ConcreteFlushStrategy::Periodically(strategy) => { if self.interval_passed(now, strategy.interval) { self.last_flush = now; // TODO calculate periodic rate. if it's more frequent than the flush_timeout @@ -90,7 +85,7 @@ impl FlushControl { FlushDecision::Dont } } - FlushStrategy::Continuously(strategy) => { + ConcreteFlushStrategy::Continuously(strategy) => { if self.interval_passed(now, strategy.interval) { self.last_flush = now; // TODO calculate periodic rate. if it's more frequent than the flush_timeout @@ -100,8 +95,8 @@ impl FlushControl { FlushDecision::Dont } } - FlushStrategy::End => FlushDecision::End, - FlushStrategy::EndPeriodically(strategy) => { + ConcreteFlushStrategy::End => FlushDecision::End, + ConcreteFlushStrategy::EndPeriodically(strategy) => { if self.interval_passed(now, strategy.interval) { self.last_flush = now; FlushDecision::End diff --git a/bottlecap/src/lifecycle/invocation_times.rs b/bottlecap/src/lifecycle/invocation_times.rs index 4ddea1de1..dca4777e1 100644 --- a/bottlecap/src/lifecycle/invocation_times.rs +++ b/bottlecap/src/lifecycle/invocation_times.rs @@ -1,4 +1,4 @@ -use crate::config::flush_strategy::{FlushStrategy, PeriodicStrategy}; +use crate::config::flush_strategy::{ConcreteFlushStrategy, FlushStrategy, PeriodicStrategy}; const TWENTY_SECONDS: u64 = 20 * 1000; const LOOKBACK_COUNT: usize = 20; @@ -23,39 +23,62 @@ impl InvocationTimes { self.head = (self.head + 1) % LOOKBACK_COUNT; } - pub(crate) fn should_adapt(&self, now: u64, flush_timeout: u64) -> FlushStrategy { - // If the buffer isn't full, then we haven't seen enough invocations, so we should flush. - for idx in self.head..LOOKBACK_COUNT { - if self.times[idx] == 0 { - return FlushStrategy::End; + // Translate FlushStrategy to a ConcreteFlushStrategy + // For FlushStrategy::Default, evaluate based on past invocation times. Otherwise, return the + // strategy as is. + pub(crate) fn evaluate_concrete_strategy( + &self, + now: u64, + flush_timeout: u64, + flush_strategy: FlushStrategy, + ) -> ConcreteFlushStrategy { + match flush_strategy { + FlushStrategy::Periodically(p) => ConcreteFlushStrategy::Periodically(p), + FlushStrategy::End => ConcreteFlushStrategy::End, + FlushStrategy::Continuously(p) => ConcreteFlushStrategy::Continuously(p), + FlushStrategy::EndPeriodically(p) => ConcreteFlushStrategy::EndPeriodically(p), + FlushStrategy::Default => { + // If the buffer isn't full, then we haven't seen enough invocations, so we should flush + // at the end of the invocation. + for idx in self.head..LOOKBACK_COUNT { + if self.times[idx] == 0 { + return ConcreteFlushStrategy::End; + } + } + + // Now we've seen at least 20 invocations. Possible cases: + // 1. If the average time between invocations is longer than 2 minutes, stick to End strategy. + // 2. If average interval is shorter than 2 minutes: + // 2.1 If it's very short, use the continuous strategy to minimize delaying the next invocation. + // 2.2 If it's not too short, use the periodic strategy to minimize the risk that + // flushing is delayed due to the Lambda environment being frozen between invocations. + // We get the average time between each invocation by taking the difference between newest (`now`) and the + // oldest invocation in the buffer, then dividing by `LOOKBACK_COUNT - 1`. + let oldest = self.times[self.head]; + + let elapsed = now - oldest; + let should_adapt = + (elapsed as f64 / (LOOKBACK_COUNT - 1) as f64) < ONE_TWENTY_SECONDS; + if should_adapt { + // Both units here are in seconds + if elapsed < flush_timeout { + return ConcreteFlushStrategy::Continuously(PeriodicStrategy { + interval: TWENTY_SECONDS, + }); + } + return ConcreteFlushStrategy::Periodically(PeriodicStrategy { + interval: TWENTY_SECONDS, + }); + } + ConcreteFlushStrategy::End } } - - // Now we've seen at least 20 invocations. Switch to periodic if we're invoked at least once every 2 minutes. - // We get the average time between each invocation by taking the difference between newest (`now`) and the - // oldest invocation in the buffer, then dividing by `LOOKBACK_COUNT - 1`. - let oldest = self.times[self.head]; - - let elapsed = now - oldest; - let should_adapt = (elapsed as f64 / (LOOKBACK_COUNT - 1) as f64) < ONE_TWENTY_SECONDS; - if should_adapt { - // Both units here are in seconds - if elapsed < flush_timeout { - return FlushStrategy::Continuously(PeriodicStrategy { - interval: TWENTY_SECONDS, - }); - } - return FlushStrategy::Periodically(PeriodicStrategy { - interval: TWENTY_SECONDS, - }); - } - FlushStrategy::End } } #[cfg(test)] mod tests { - use crate::config::flush_strategy::{FlushStrategy, PeriodicStrategy}; + use crate::config::flush_strategy::{ConcreteFlushStrategy, FlushStrategy, PeriodicStrategy}; use crate::lifecycle::invocation_times::{self, TWENTY_SECONDS}; #[test] @@ -75,7 +98,10 @@ mod tests { invocation_times.add(timestamp); assert_eq!(invocation_times.times[0], timestamp); assert_eq!(invocation_times.head, 1); - assert_eq!(invocation_times.should_adapt(1, 60), FlushStrategy::End); + assert_eq!( + invocation_times.evaluate_concrete_strategy(1, 60, FlushStrategy::Default), + ConcreteFlushStrategy::End + ); } #[test] @@ -88,8 +114,8 @@ mod tests { assert_eq!(invocation_times.times[0], 20); assert_eq!(invocation_times.head, 1); assert_eq!( - invocation_times.should_adapt(21, 60), - FlushStrategy::Continuously(PeriodicStrategy { + invocation_times.evaluate_concrete_strategy(21, 60, FlushStrategy::Default), + ConcreteFlushStrategy::Continuously(PeriodicStrategy { interval: TWENTY_SECONDS }) ); @@ -105,8 +131,8 @@ mod tests { assert_eq!(invocation_times.times[0], 20); assert_eq!(invocation_times.head, 1); assert_eq!( - invocation_times.should_adapt(21, 1), - FlushStrategy::Periodically(PeriodicStrategy { + invocation_times.evaluate_concrete_strategy(21, 1, FlushStrategy::Default), + ConcreteFlushStrategy::Periodically(PeriodicStrategy { interval: TWENTY_SECONDS }) ); @@ -122,7 +148,10 @@ mod tests { // should wrap around assert_eq!(invocation_times.times[0], 5019); assert_eq!(invocation_times.head, 1); - assert_eq!(invocation_times.should_adapt(10000, 60), FlushStrategy::End); + assert_eq!( + invocation_times.evaluate_concrete_strategy(10000, 60, FlushStrategy::Default), + ConcreteFlushStrategy::End + ); } #[test] @@ -140,8 +169,8 @@ mod tests { 1901 ); assert_eq!( - invocation_times.should_adapt(2501, 60), - FlushStrategy::Periodically(PeriodicStrategy { + invocation_times.evaluate_concrete_strategy(2501, 60, FlushStrategy::Default), + ConcreteFlushStrategy::Periodically(PeriodicStrategy { interval: TWENTY_SECONDS }) ); @@ -161,6 +190,9 @@ mod tests { invocation_times.times[invocation_times::LOOKBACK_COUNT - 1], 2471 ); - assert_eq!(invocation_times.should_adapt(3251, 60), FlushStrategy::End); + assert_eq!( + invocation_times.evaluate_concrete_strategy(3251, 60, FlushStrategy::Default), + ConcreteFlushStrategy::End + ); } }