Skip to content

Commit 9ea2a19

Browse files
committed
add snapstart span
1 parent ebf4d41 commit 9ea2a19

File tree

8 files changed

+170
-1
lines changed

8 files changed

+170
-1
lines changed

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,7 @@ async fn blocking_flush_all(
803803
}
804804

805805
#[allow(clippy::too_many_arguments)]
806+
#[allow(clippy::too_many_lines)]
806807
async fn handle_event_bus_event(
807808
event: Event,
808809
invocation_processor_handle: InvocationProcessorHandle,
@@ -848,6 +849,24 @@ async fn handle_event_bus_event(
848849
error!("Failed to send platform init report to processor: {}", e);
849850
}
850851
}
852+
TelemetryRecord::PlatformRestoreStart { .. } => {
853+
if let Err(e) = invocation_processor_handle
854+
.on_platform_restore_start(event.time)
855+
.await
856+
{
857+
error!("Failed to send platform restore start to processor: {}", e);
858+
}
859+
}
860+
TelemetryRecord::PlatformRestoreReport { metrics, .. } => {
861+
if let Some(m) = metrics {
862+
if let Err(e) = invocation_processor_handle
863+
.on_platform_restore_report(m.duration_ms, event.time.timestamp())
864+
.await
865+
{
866+
error!("Failed to send platform restore report to processor: {}", e);
867+
}
868+
}
869+
}
851870
TelemetryRecord::PlatformStart { request_id, .. } => {
852871
if let Err(e) = invocation_processor_handle
853872
.on_platform_start(request_id, event.time)

bottlecap/src/extension/telemetry/events.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ pub enum TelemetryRecord {
142142
status: Status,
143143
/// When unsuccessful, the `error_type` describes what kind of error occurred
144144
error_type: Option<String>,
145+
/// Metrics about the restore phase
146+
metrics: Option<RestoreReportMetrics>,
145147
},
146148
#[serde(rename = "platform.restoreRuntimeDone", rename_all = "camelCase")]
147149
PlatformRestoreRuntimeDone {
@@ -203,6 +205,14 @@ pub struct InitReportMetrics {
203205
pub duration_ms: f64,
204206
}
205207

208+
/// Restore report metrics.
///
/// Carried in the optional `metrics` field of `TelemetryRecord::PlatformRestoreReport`.
/// Field names are deserialized from camelCase, so `duration_ms` is read from `durationMs`.
209+
#[derive(Clone, Copy, Debug, Deserialize, PartialEq)]
210+
#[serde(rename_all = "camelCase")]
211+
pub struct RestoreReportMetrics {
212+
/// Duration of the restore phase, in milliseconds.
213+
pub duration_ms: f64,
214+
}
215+
206216
/// Runtime done metrics
207217
#[derive(Clone, Copy, Debug, Deserialize, PartialEq)]
208218
#[serde(rename_all = "camelCase")]

bottlecap/src/lifecycle/invocation/context.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ pub struct Context {
3535
///
3636
/// This span is only present if the function is being invoked for the first time.
3737
pub cold_start_span: Option<Span>,
38+
/// The span representing the `SnapStart` restore phase.
39+
///
40+
/// This span is only present if the function is using `SnapStart` and being invoked for the first time.
41+
pub snapstart_restore_span: Option<Span>,
3842
/// The extracted span context from the incoming request, used for distributed
3943
/// tracing.
4044
///
@@ -87,6 +91,7 @@ impl Default for Context {
8791
invocation_span: Span::default(),
8892
runtime_done_received: false,
8993
cold_start_span: None,
94+
snapstart_restore_span: None,
9095
tracer_span: None,
9196
extracted_span_context: None,
9297
}

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,53 @@ impl Processor {
262262
}
263263
}
264264

265+
/// Called when the `SnapStart` restore phase starts.
266+
///
267+
/// Creates an `aws.lambda.snapstart_restore` span and stores it on an invocation context.
268+
/// This telemetry event does not provide a `request_id`, so the context is looked up with
/// `context_buffer.get_closest_mut` using the event time in nanoseconds since the Unix epoch.
/// If no context is returned, the event is dropped after a debug log.
///
/// # Panics
///
/// Panics if `time` is earlier than the Unix epoch, because `duration_since(UNIX_EPOCH)`
/// fails in that case and the result is unwrapped with `expect`.
269+
pub fn on_platform_restore_start(&mut self, time: DateTime<Utc>) {
270+
// Nanoseconds since the Unix epoch. `as_nanos` yields a `u128`; if it does not fit in an
// `i64`, `unwrap_or_default` silently falls back to 0 instead of failing.
let start_time: i64 = SystemTime::from(time)
271+
.duration_since(UNIX_EPOCH)
272+
.expect("time went backwards")
273+
.as_nanos()
274+
.try_into()
275+
.unwrap_or_default();
276+
277+
// Get the closest context
278+
let Some(context) = self.context_buffer.get_closest_mut(start_time) else {
279+
debug!("Cannot process on platform restore start, no invocation context found");
280+
return;
281+
};
282+
283+
// Create a SnapStart restore span; only the span id and start time are set here, the
// duration is left for the restore report handler to fill in.
284+
let mut snapstart_restore_span = create_empty_span(
285+
String::from("aws.lambda.snapstart_restore"),
286+
&self.resource,
287+
&self.service,
288+
);
289+
snapstart_restore_span.span_id = generate_span_id();
290+
snapstart_restore_span.start = start_time;
291+
context.snapstart_restore_span = Some(snapstart_restore_span);
292+
}
293+
294+
/// Given the duration of the platform restore report, set the snapstart restore duration.
295+
///
/// Always forwards `duration_ms` and `timestamp` to the enhanced-metrics
/// `set_snapstart_restore_duration_metric`. Then, if a context is found and it already holds a
/// `snapstart_restore_span`, sets that span's duration to `duration_ms * MS_TO_NS`.
///
/// NOTE(review): the caller in `main.rs` passes `event.time.timestamp()` (seconds), while
/// `on_platform_restore_start` passes nanoseconds to the same `get_closest_mut` lookup.
/// Confirm which unit `get_closest_mut` expects.
296+
#[allow(clippy::cast_possible_truncation)]
297+
pub fn on_platform_restore_report(&mut self, duration_ms: f64, timestamp: i64) {
298+
self.enhanced_metrics
299+
.set_snapstart_restore_duration_metric(duration_ms, timestamp);
300+
301+
let Some(context) = self.context_buffer.get_closest_mut(timestamp) else {
302+
debug!("Cannot process on platform restore report, no invocation context found");
303+
return;
304+
};
305+
306+
if let Some(snapstart_restore_span) = &mut context.snapstart_restore_span {
307+
// The `as i64` cast intentionally truncates any fractional part of the product so the
// span duration is a whole integer (hence the `cast_possible_truncation` allow).
308+
snapstart_restore_span.duration = (duration_ms * MS_TO_NS) as i64;
309+
}
310+
}
311+
265312
/// Given a `request_id` and the time of the platform start, add the start time to the context buffer.
266313
///
267314
pub fn on_platform_start(&mut self, request_id: String, time: DateTime<Utc>) {
@@ -409,6 +456,14 @@ impl Processor {
409456
cold_start_span.parent_id = context.invocation_span.parent_id;
410457
}
411458
}
459+
460+
// Handle snapstart restore span if present
461+
if let Some(snapstart_restore_span) = &mut context.snapstart_restore_span {
462+
if context.invocation_span.trace_id != 0 {
463+
snapstart_restore_span.trace_id = context.invocation_span.trace_id;
464+
snapstart_restore_span.parent_id = context.invocation_span.parent_id;
465+
}
466+
}
412467
Some(context.clone())
413468
}
414469

@@ -431,7 +486,14 @@ impl Processor {
431486
traces.push(ws.clone());
432487
}
433488

434-
if let Some(cold_start_span) = &context.cold_start_span {
489+
// SnapStart includes telemetry events from Init (Cold Start).
490+
// However, these Init events are from when the snapshot was created and
491+
// not when the lambda sandbox is actually created.
492+
// So, if we have a snapstart restore span, use it instead of cold start span.
493+
if let Some(snapstart_restore_span) = &context.snapstart_restore_span {
494+
body_size += std::mem::size_of_val(snapstart_restore_span);
495+
traces.push(snapstart_restore_span.clone());
496+
} else if let Some(cold_start_span) = &context.cold_start_span {
435497
body_size += std::mem::size_of_val(cold_start_span);
436498
traces.push(cold_start_span.clone());
437499
}

bottlecap/src/lifecycle/invocation/processor_service.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,13 @@ pub enum ProcessorCommand {
4444
duration_ms: f64,
4545
timestamp: i64,
4646
},
47+
PlatformRestoreStart {
48+
time: DateTime<Utc>,
49+
},
50+
PlatformRestoreReport {
51+
duration_ms: f64,
52+
timestamp: i64,
53+
},
4754
PlatformStart {
4855
request_id: String,
4956
time: DateTime<Utc>,
@@ -144,6 +151,28 @@ impl InvocationProcessorHandle {
144151
.await
145152
}
146153

154+
/// Sends a `ProcessorCommand::PlatformRestoreStart` carrying `time` through `self.sender`.
///
/// # Errors
///
/// Returns the `SendError` from the channel if the command could not be sent.
pub async fn on_platform_restore_start(
155+
&self,
156+
time: DateTime<Utc>,
157+
) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
158+
self.sender
159+
.send(ProcessorCommand::PlatformRestoreStart { time })
160+
.await
161+
}
162+
163+
/// Sends a `ProcessorCommand::PlatformRestoreReport` with the restore duration (milliseconds)
/// and the event timestamp through `self.sender`.
///
/// The only caller shown passes `event.time.timestamp()` as `timestamp`, i.e. whole seconds.
///
/// # Errors
///
/// Returns the `SendError` from the channel if the command could not be sent.
pub async fn on_platform_restore_report(
164+
&self,
165+
duration_ms: f64,
166+
timestamp: i64,
167+
) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
168+
self.sender
169+
.send(ProcessorCommand::PlatformRestoreReport {
170+
duration_ms,
171+
timestamp,
172+
})
173+
.await
174+
}
175+
147176
pub async fn on_platform_start(
148177
&self,
149178
request_id: String,
@@ -405,6 +434,16 @@ impl InvocationProcessorService {
405434
self.processor
406435
.on_platform_init_report(init_type, duration_ms, timestamp);
407436
}
437+
ProcessorCommand::PlatformRestoreStart { time } => {
438+
self.processor.on_platform_restore_start(time);
439+
}
440+
ProcessorCommand::PlatformRestoreReport {
441+
duration_ms,
442+
timestamp,
443+
} => {
444+
self.processor
445+
.on_platform_restore_report(duration_ms, timestamp);
446+
}
408447
ProcessorCommand::PlatformStart { request_id, time } => {
409448
self.processor.on_platform_start(request_id, time);
410449
}

bottlecap/src/logs/lambda/processor.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,18 @@ impl LambdaProcessor {
222222
None,
223223
))
224224
},
225+
TelemetryRecord::PlatformRestoreStart { .. } => {
226+
if let Err(e) = self.event_bus.send(Event::Telemetry(event)).await {
227+
error!("Failed to send PlatformRestoreStart to the main event bus: {}", e);
228+
}
229+
Err("Unsupported event type".into())
230+
}
231+
TelemetryRecord::PlatformRestoreReport { .. } => {
232+
if let Err(e) = self.event_bus.send(Event::Telemetry(event)).await {
233+
error!("Failed to send PlatformRestoreReport to the main event bus: {}", e);
234+
}
235+
Err("Unsupported event type".into())
236+
}
225237
// TODO: PlatformInitRuntimeDone
226238
// TODO: PlatformExtension
227239
// TODO: PlatformTelemetrySubscription

bottlecap/src/metrics/enhanced/constants.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ pub const DURATION_METRIC: &str = "aws.lambda.enhanced.duration";
1717
pub const POST_RUNTIME_DURATION_METRIC: &str = "aws.lambda.enhanced.post_runtime_duration";
1818
pub const ESTIMATED_COST_METRIC: &str = "aws.lambda.enhanced.estimated_cost";
1919
pub const INIT_DURATION_METRIC: &str = "aws.lambda.enhanced.init_duration";
20+
pub const SNAPSTART_RESTORE_DURATION_METRIC: &str =
21+
"aws.lambda.enhanced.snapstart_restore_duration";
2022
pub const RESPONSE_LATENCY_METRIC: &str = "aws.lambda.enhanced.response_latency";
2123
pub const RESPONSE_DURATION_METRIC: &str = "aws.lambda.enhanced.response_duration";
2224
pub const PRODUCED_BYTES_METRIC: &str = "aws.lambda.enhanced.produced_bytes";

bottlecap/src/metrics/enhanced/lambda.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,26 @@ impl Lambda {
118118
}
119119
}
120120

121+
/// Records the `SnapStart` restore duration as a distribution metric named
/// `constants::SNAPSTART_RESTORE_DURATION_METRIC`.
///
/// Does nothing when `config.enhanced_metrics` is disabled. `restore_duration_ms` is scaled by
/// `constants::MS_TO_SEC` before submission (presumably converting milliseconds to seconds —
/// confirm against the constant's definition). `timestamp` is attached to the metric as-is.
/// A failed insert is logged and otherwise ignored.
pub fn set_snapstart_restore_duration_metric(
122+
&mut self,
123+
restore_duration_ms: f64,
124+
timestamp: i64,
125+
) {
126+
// Enhanced metrics are opt-in; skip all work when they are turned off.
if !self.config.enhanced_metrics {
127+
return;
128+
}
129+
let metric = Metric::new(
130+
constants::SNAPSTART_RESTORE_DURATION_METRIC.into(),
131+
MetricValue::distribution(restore_duration_ms * constants::MS_TO_SEC),
132+
self.get_dynamic_value_tags(),
133+
Some(timestamp),
134+
);
135+
136+
// Best effort: an aggregator failure is only logged, never propagated to the caller.
if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) {
137+
error!("failed to insert metric: {}", e);
138+
}
139+
}
140+
121141
pub fn set_invoked_received(&mut self) {
122142
self.invoked_received = true;
123143
}

0 commit comments

Comments
 (0)