From 971d51f886ca6e32116759d240f1b4d7827a02da Mon Sep 17 00:00:00 2001 From: Marcus Griep Date: Wed, 26 Jun 2024 12:29:03 -0600 Subject: [PATCH 1/3] feat: add debug spans for decoding requests Closes: #1759 --- tonic/src/codec/decode.rs | 79 ++++++++++++++++++++++++++++++++------- 1 file changed, 66 insertions(+), 13 deletions(-) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 742660e31..829b40e34 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -38,12 +38,43 @@ impl Unpin for Streaming {} #[derive(Debug, Clone)] enum State { - ReadHeader, + ReadHeader { + span: Option, + }, ReadBody { + span: tracing::Span, compression: Option, len: usize, }, - Error(Status), + Error(Box), +} + +impl State { + fn read_header() -> Self { + Self::ReadHeader { span: None } + } + + fn read_body(compression: Option, len: usize) -> Self { + let span = tracing::debug_span!( + "read_body", + body.compression = compression.map(|c| c.as_str()).unwrap_or("none"), + body.bytes.compressed = compression.is_some().then_some(len), + body.bytes.uncompressed = compression.is_none().then_some(len), + ); + Self::ReadBody { + span, + compression, + len, + } + } + + fn span(&self) -> Option<&tracing::Span> { + match self { + Self::ReadHeader { span } => span.as_ref(), + Self::ReadBody { span, .. } => Some(span), + Self::Error(_) => None, + } + } } #[derive(Debug, PartialEq, Eq)] @@ -125,7 +156,7 @@ impl Streaming { .map_frame(|frame| frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))) .map_err(|err| Status::map_error(err.into())) .boxed_unsync(), - state: State::ReadHeader, + state: State::read_header(), direction, buf: BytesMut::with_capacity(buffer_size), trailers: None, @@ -142,7 +173,19 @@ impl StreamingInner { &mut self, buffer_settings: BufferSettings, ) -> Result>, Status> { - if let State::ReadHeader = self.state { + if let State::ReadHeader { span } = &mut self.state { + if !self.buf.has_remaining() { + return Ok(None); + } + + let span = span.get_or_insert_with(|| { + tracing::debug_span!( + "read_header", + body.compression = "none", + body.bytes = tracing::field::Empty, + ) + }); + let _guard = span.enter(); if self.buf.remaining() < HEADER_SIZE { return Ok(None); } @@ -151,7 +194,8 @@ impl StreamingInner { 0 => None, 1 => { { - if self.encoding.is_some() { + if let Some(ce) = self.encoding { + span.record("body.compression", ce.as_str()); self.encoding } else { // https://grpc.github.io/grpc/core/md_doc_compression.html @@ -177,6 +221,7 @@ impl StreamingInner { }; let len = self.buf.get_u32() as usize; + span.record("body.bytes", len); let limit = self .max_message_size .unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE); @@ -191,14 +236,19 @@ impl StreamingInner { } self.buf.reserve(len); + drop(_guard); - self.state = State::ReadBody { - compression: compression_encoding, - len, - } + self.state = State::read_body(compression_encoding, len) } - if let State::ReadBody { len, compression } = self.state { + if let State::ReadBody { + len, + span, + compression, + } = &self.state + { + let (len, compression) = (*len, *compression); + let _guard = span.enter(); // if we haven't read enough of the message then return and keep // reading if self.buf.remaining() < len || self.buf.len() < len { @@ -228,6 +278,7 @@ impl StreamingInner { return Err(Status::new(Code::Internal, message)); } let decompressed_len = self.decompress_buf.len(); + span.record("body.bytes.uncompressed", decompressed_len); DecodeBuf::new(&mut self.decompress_buf, decompressed_len) } else { DecodeBuf::new(&mut self.buf, len) @@ -241,6 +292,7 @@ impl StreamingInner { // Returns Some(()) if data was found or None if the loop in `poll_next` should break fn poll_frame(&mut self, cx: &mut Context<'_>) -> Poll, Status>> { + let _guard = self.state.span().map(|s| s.enter()); let chunk = match ready!(Pin::new(&mut self.body).poll_frame(cx)) { Some(Ok(d)) => Some(d), Some(Err(status)) => { @@ -248,7 +300,8 @@ impl StreamingInner { return Poll::Ready(Ok(None)); } - let _ = std::mem::replace(&mut self.state, State::Error(status.clone())); + drop(_guard); + let _ = std::mem::replace(&mut self.state, State::Error(Box::new(status.clone()))); debug!("decoder inner stream error: {:?}", status); return Poll::Ready(Err(status)); } @@ -378,7 +431,7 @@ impl Streaming { match self.inner.decode_chunk(self.decoder.buffer_settings())? { Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? { Some(msg) => { - self.inner.state = State::ReadHeader; + self.inner.state = State::read_header(); Ok(Some(msg)) } None => Ok(None), @@ -394,7 +447,7 @@ impl Stream for Streaming { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if let State::Error(status) = &self.inner.state { - return Poll::Ready(Some(Err(status.clone()))); + return Poll::Ready(Some(Err(*status.clone()))); } if let Some(item) = self.decode_chunk()? { From 6495650638f9d5a68e97b1d79e3ff3116cab4f9c Mon Sep 17 00:00:00 2001 From: Marcus Griep Date: Wed, 3 Jul 2024 09:49:36 -0600 Subject: [PATCH 2/3] feat: remove span for header, have span for whole stream --- tonic/src/codec/decode.rs | 39 ++++++++++----------------------------- 1 file changed, 10 insertions(+), 29 deletions(-) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 829b40e34..dd18231bc 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -32,15 +32,14 @@ struct StreamingInner { decompress_buf: BytesMut, encoding: Option, max_message_size: Option, + span: tracing::Span, } impl Unpin for Streaming {} #[derive(Debug, Clone)] enum State { - ReadHeader { - span: Option, - }, + ReadHeader, ReadBody { span: tracing::Span, compression: Option, @@ -50,10 +49,6 @@ enum State { } impl State { - fn read_header() -> Self { - Self::ReadHeader { span: None } - } - fn read_body(compression: Option, len: usize) -> Self { let span = tracing::debug_span!( "read_body", @@ -70,9 +65,8 @@ impl State { fn span(&self) -> Option<&tracing::Span> { match self { - Self::ReadHeader { span } => span.as_ref(), Self::ReadBody { span, .. } => Some(span), - Self::Error(_) => None, + Self::ReadHeader | Self::Error(_) => None, } } } @@ -156,13 +150,14 @@ impl Streaming { .map_frame(|frame| frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))) .map_err(|err| Status::map_error(err.into())) .boxed_unsync(), - state: State::read_header(), + state: State::ReadHeader, direction, buf: BytesMut::with_capacity(buffer_size), trailers: None, decompress_buf: BytesMut::new(), encoding, max_message_size, + span: tracing::debug_span!("streaming"), }, } } @@ -173,19 +168,8 @@ impl StreamingInner { &mut self, buffer_settings: BufferSettings, ) -> Result>, Status> { - if let State::ReadHeader { span } = &mut self.state { - if !self.buf.has_remaining() { - return Ok(None); - } - - let span = span.get_or_insert_with(|| { - tracing::debug_span!( - "read_header", - body.compression = "none", - body.bytes = tracing::field::Empty, - ) - }); - let _guard = span.enter(); + let _guard = self.span.enter(); + if let State::ReadHeader = &self.state { if self.buf.remaining() < HEADER_SIZE { return Ok(None); } @@ -194,8 +178,7 @@ impl StreamingInner { 0 => None, 1 => { { - if let Some(ce) = self.encoding { - span.record("body.compression", ce.as_str()); + if self.encoding.is_some() { self.encoding } else { // https://grpc.github.io/grpc/core/md_doc_compression.html @@ -221,7 +204,6 @@ impl StreamingInner { }; let len = self.buf.get_u32() as usize; - span.record("body.bytes", len); let limit = self .max_message_size .unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE); @@ -236,7 +218,6 @@ impl StreamingInner { } self.buf.reserve(len); - drop(_guard); self.state = State::read_body(compression_encoding, len) } @@ -292,7 +273,7 @@ impl StreamingInner { // Returns Some(()) if data was found or None if the loop in `poll_next` should break fn poll_frame(&mut self, cx: &mut Context<'_>) -> Poll, Status>> { - let _guard = self.state.span().map(|s| s.enter()); + let _guard = self.state.span().unwrap_or(&self.span).enter(); let chunk = match ready!(Pin::new(&mut self.body).poll_frame(cx)) { Some(Ok(d)) => Some(d), Some(Err(status)) => { @@ -431,7 +412,7 @@ impl Streaming { match self.inner.decode_chunk(self.decoder.buffer_settings())? { Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? { Some(msg) => { - self.inner.state = State::read_header(); + self.inner.state = State::ReadHeader; Ok(Some(msg)) } None => Ok(None), From 198de46ea7ad6af64e25cd8b125ac0c80b0e6483 Mon Sep 17 00:00:00 2001 From: Marcus Griep Date: Wed, 3 Jul 2024 09:51:37 -0600 Subject: [PATCH 3/3] chore: revert boxing `Status` --- tonic/src/codec/decode.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index dd18231bc..d77e0973c 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -45,7 +45,7 @@ enum State { compression: Option, len: usize, }, - Error(Box), + Error(Status), } impl State { @@ -282,7 +282,7 @@ impl StreamingInner { } drop(_guard); - let _ = std::mem::replace(&mut self.state, State::Error(Box::new(status.clone()))); + let _ = std::mem::replace(&mut self.state, State::Error(status.clone())); debug!("decoder inner stream error: {:?}", status); return Poll::Ready(Err(status)); } @@ -428,7 +428,7 @@ impl Stream for Streaming { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if let State::Error(status) = &self.inner.state { - return Poll::Ready(Some(Err(*status.clone()))); + return Poll::Ready(Some(Err(status.clone()))); } if let Some(item) = self.decode_chunk()? {