Skip to content

Commit 3bfdcc9

Browse files
committed
[telemetry] Improve invoker observability
- Add time taken to receive the first response (headers) from the user service - Add Time taken to replay the journal
1 parent 1f2b4fa commit 3bfdcc9

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

crates/invoker-impl/src/invocation_task/service_protocol_runner.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::invocation_task::{
1515
ResponseChunk, ResponseStreamState, TerminalLoopState, X_RESTATE_SERVER,
1616
invocation_id_to_header_value, service_protocol_version_to_header_value,
1717
};
18+
use crate::metric_definitions::{INVOKER_JOURNAL_REPLAY_TIME, INVOKER_SERVICE_RESPONSE_TIME};
1819
use bytes::Bytes;
1920
use futures::future::FusedFuture;
2021
use futures::{FutureExt, Stream, StreamExt};
@@ -44,7 +45,7 @@ use restate_types::schema::deployment::{
4445
use restate_types::service_protocol::ServiceProtocolVersion;
4546
use std::collections::HashSet;
4647
use std::future::poll_fn;
47-
use std::time::Duration;
48+
use std::time::{Duration, Instant};
4849
use tokio::sync::mpsc;
4950
use tokio_stream::wrappers::ReceiverStream;
5051
use tracing::{debug, trace, warn};
@@ -70,6 +71,7 @@ pub struct ServiceProtocolRunner<'a, IR, EE, DMR> {
7071

7172
// task state
7273
next_journal_index: EntryIndex,
74+
http_stream_start_time: Instant,
7375
}
7476

7577
impl<'a, IR, EE, DMR> ServiceProtocolRunner<'a, IR, EE, DMR>
@@ -93,6 +95,8 @@ where
9395
encoder,
9496
decoder,
9597
next_journal_index: 0,
98+
// initial value will be overridden to the actual start time of the http stream
99+
http_stream_start_time: Instant::now(),
96100
}
97101
}
98102

@@ -161,6 +165,8 @@ where
161165
.await
162166
);
163167

168+
// update the start time for the service response time metric
169+
self.http_stream_start_time = Instant::now();
164170
// Initialize the response stream state
165171
let mut http_stream_rx =
166172
ResponseStreamState::initialize(&self.invocation_task.client, request);
@@ -171,6 +177,9 @@ where
171177
.await
172178
);
173179

180+
metrics::histogram!(INVOKER_JOURNAL_REPLAY_TIME)
181+
.record(self.http_stream_start_time.elapsed());
182+
174183
// Check all the entries have been replayed
175184
debug_assert_eq!(self.next_journal_index, journal_size);
176185

@@ -460,6 +469,9 @@ where
460469
&mut self,
461470
mut parts: http::response::Parts,
462471
) -> Result<(), InvokerError> {
472+
metrics::histogram!(INVOKER_SERVICE_RESPONSE_TIME)
473+
.record(self.http_stream_start_time.elapsed());
474+
463475
// if service is running behind a gateway, the service can be down
464476
// but we still get a response code from the gateway itself. In that
465477
// case we still need to return the proper error

crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
use std::collections::HashSet;
1212
use std::future::poll_fn;
1313
use std::ops::Deref;
14-
use std::time::Duration;
14+
use std::time::{Duration, Instant};
1515

1616
use bytes::Bytes;
1717
use bytestring::ByteString;
@@ -63,6 +63,7 @@ use crate::invocation_task::{
6363
ResponseChunk, ResponseStreamState, TerminalLoopState, X_RESTATE_SERVER,
6464
invocation_id_to_header_value, service_protocol_version_to_header_value,
6565
};
66+
use crate::metric_definitions::{INVOKER_JOURNAL_REPLAY_TIME, INVOKER_SERVICE_RESPONSE_TIME};
6667

6768
/// Provides the value of the invocation id
6869
const INVOCATION_ID_HEADER_NAME: HeaderName = HeaderName::from_static("x-restate-invocation-id");
@@ -85,6 +86,7 @@ pub struct ServiceProtocolRunner<'a, IR, EE, Schemas> {
8586

8687
// task state
8788
command_index: CommandIndex,
89+
http_stream_start_time: Instant,
8890
}
8991

9092
impl<'a, IR, EE, Schemas> ServiceProtocolRunner<'a, IR, EE, Schemas>
@@ -108,6 +110,8 @@ where
108110
encoder,
109111
decoder,
110112
command_index: 0,
113+
// initial value will be overridden to the actual start time of the http stream
114+
http_stream_start_time: Instant::now(),
111115
}
112116
}
113117

@@ -176,6 +180,9 @@ where
176180
.await
177181
);
178182

183+
// update the start time for the service response time metric
184+
self.http_stream_start_time = Instant::now();
185+
179186
// Initialize the response stream state
180187
let mut http_stream_rx =
181188
ResponseStreamState::initialize(&self.invocation_task.client, request);
@@ -186,6 +193,9 @@ where
186193
.await
187194
);
188195

196+
metrics::histogram!(INVOKER_JOURNAL_REPLAY_TIME)
197+
.record(self.http_stream_start_time.elapsed());
198+
189199
// If we have the invoker_rx and the protocol type is bidi stream,
190200
// then we can use the bidi_stream loop reading the invoker_rx and the http_stream_rx
191201
if protocol_type == ProtocolType::BidiStream {
@@ -308,6 +318,7 @@ where
308318
let got_headers_future = poll_fn(|cx| http_stream_rx.poll_only_headers(cx)).fuse();
309319
tokio::pin!(got_headers_future);
310320

321+
// TODO: add metrics for delays for when headers are received (first response from service)
311322
loop {
312323
tokio::select! {
313324
got_headers_res = got_headers_future.as_mut(), if !got_headers_future.is_terminated() => {
@@ -518,6 +529,9 @@ where
518529
&mut self,
519530
mut parts: http::response::Parts,
520531
) -> Result<(), InvokerError> {
532+
metrics::histogram!(INVOKER_SERVICE_RESPONSE_TIME)
533+
.record(self.http_stream_start_time.elapsed());
534+
521535
// if service is running behind a gateway, the service can be down
522536
// but we still get a response code from the gateway itself. In that
523537
// case we still need to return the proper error

crates/invoker-impl/src/metric_definitions.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ pub const INVOKER_INVOCATION_TASKS: &str = "restate.invoker.invocation_tasks.tot
1818
pub const INVOKER_AVAILABLE_SLOTS: &str = "restate.invoker.available_slots";
1919
pub const INVOKER_CONCURRENCY_LIMIT: &str = "restate.invoker.concurrency_limit";
2020
pub const INVOKER_TASK_DURATION: &str = "restate.invoker.task_duration.seconds";
21+
pub const INVOKER_SERVICE_RESPONSE_TIME: &str = "restate.invoker.service_response_time.seconds";
2122
pub const INVOKER_TASKS_IN_FLIGHT: &str = "restate.invoker.inflight_tasks";
23+
pub const INVOKER_JOURNAL_REPLAY_TIME: &str = "restate.invoker.journal_replay_time.seconds";
2224

2325
pub const TASK_OP_STARTED: &str = "started";
2426
pub const TASK_OP_SUSPENDED: &str = "suspended";
@@ -62,6 +64,18 @@ pub(crate) fn describe_metrics() {
6264
"Time taken to complete an invoker task"
6365
);
6466

67+
describe_histogram!(
68+
INVOKER_SERVICE_RESPONSE_TIME,
69+
Unit::Seconds,
70+
"Time taken to receive the first response (headers) from the user service"
71+
);
72+
73+
describe_histogram!(
74+
INVOKER_JOURNAL_REPLAY_TIME,
75+
Unit::Seconds,
76+
"Time taken to replay the journal"
77+
);
78+
6579
describe_gauge!(
6680
INVOKER_TASKS_IN_FLIGHT,
6781
Unit::Count,

0 commit comments

Comments
 (0)