diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 742660e31..d77e0973c 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -32,6 +32,7 @@ struct StreamingInner { decompress_buf: BytesMut, encoding: Option, max_message_size: Option, + span: tracing::Span, } impl Unpin for Streaming {} @@ -40,12 +41,36 @@ impl Unpin for Streaming {} enum State { ReadHeader, ReadBody { + span: tracing::Span, compression: Option, len: usize, }, Error(Status), } +impl State { + fn read_body(compression: Option, len: usize) -> Self { + let span = tracing::debug_span!( + "read_body", + body.compression = compression.map(|c| c.as_str()).unwrap_or("none"), + body.bytes.compressed = compression.is_some().then_some(len), + body.bytes.uncompressed = compression.is_none().then_some(len), + ); + Self::ReadBody { + span, + compression, + len, + } + } + + fn span(&self) -> Option<&tracing::Span> { + match self { + Self::ReadBody { span, .. } => Some(span), + Self::ReadHeader | Self::Error(_) => None, + } + } +} + #[derive(Debug, PartialEq, Eq)] enum Direction { Request, @@ -132,6 +157,7 @@ impl Streaming { decompress_buf: BytesMut::new(), encoding, max_message_size, + span: tracing::debug_span!("streaming"), }, } } @@ -142,7 +168,8 @@ impl StreamingInner { &mut self, buffer_settings: BufferSettings, ) -> Result>, Status> { - if let State::ReadHeader = self.state { + let _guard = self.span.enter(); + if let State::ReadHeader = &self.state { if self.buf.remaining() < HEADER_SIZE { return Ok(None); } @@ -192,13 +219,17 @@ impl StreamingInner { self.buf.reserve(len); - self.state = State::ReadBody { - compression: compression_encoding, - len, - } + self.state = State::read_body(compression_encoding, len) } - if let State::ReadBody { len, compression } = self.state { + if let State::ReadBody { + len, + span, + compression, + } = &self.state + { + let (len, compression) = (*len, *compression); + let _guard = span.enter(); // if we haven't read enough of the message then return and keep // reading if self.buf.remaining() < len || self.buf.len() < len { @@ -228,6 +259,7 @@ impl StreamingInner { return Err(Status::new(Code::Internal, message)); } let decompressed_len = self.decompress_buf.len(); + span.record("body.bytes.uncompressed", decompressed_len); DecodeBuf::new(&mut self.decompress_buf, decompressed_len) } else { DecodeBuf::new(&mut self.buf, len) @@ -241,6 +273,7 @@ impl StreamingInner { // Returns Some(()) if data was found or None if the loop in `poll_next` should break fn poll_frame(&mut self, cx: &mut Context<'_>) -> Poll, Status>> { + let _guard = self.state.span().unwrap_or(&self.span).enter(); let chunk = match ready!(Pin::new(&mut self.body).poll_frame(cx)) { Some(Ok(d)) => Some(d), Some(Err(status)) => { @@ -248,6 +281,7 @@ impl StreamingInner { return Poll::Ready(Ok(None)); } + drop(_guard); let _ = std::mem::replace(&mut self.state, State::Error(status.clone())); debug!("decoder inner stream error: {:?}", status); return Poll::Ready(Err(status));