diff --git a/tests/compression/Cargo.toml b/tests/compression/Cargo.toml index 13616d0d7..169d4f51a 100644 --- a/tests/compression/Cargo.toml +++ b/tests/compression/Cargo.toml @@ -15,7 +15,7 @@ pin-project = "1.0" prost = "0.14" tokio = {version = "1.0", features = ["macros", "rt-multi-thread", "net"]} tokio-stream = "0.1" -tonic = {path = "../../tonic", features = ["gzip", "deflate", "zstd"]} +tonic = {path = "../../tonic", features = ["gzip", "deflate", "zstd", "lz4"]} tower = "0.5" tower-http = {version = "0.6", features = ["map-response-body", "map-request-body"]} diff --git a/tests/compression/src/bidirectional_stream.rs b/tests/compression/src/bidirectional_stream.rs index c30d8796d..815414be2 100644 --- a/tests/compression/src/bidirectional_stream.rs +++ b/tests/compression/src/bidirectional_stream.rs @@ -7,6 +7,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -36,6 +37,7 @@ async fn client_enabled_server_enabled(encoding: CompressionEncoding) { CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", CompressionEncoding::Deflate => "deflate", + CompressionEncoding::Lz4 => "lz4", _ => panic!("unexpected encoding {:?}", self.encoding), }; assert_eq!(req.headers().get("grpc-encoding").unwrap(), expected); @@ -89,6 +91,7 @@ async fn client_enabled_server_enabled(encoding: CompressionEncoding) { CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", CompressionEncoding::Deflate => "deflate", + CompressionEncoding::Lz4 => "lz4", _ => panic!("unexpected encoding {encoding:?}"), }; assert_eq!(res.metadata().get("grpc-encoding").unwrap(), expected); diff --git a/tests/compression/src/client_stream.rs b/tests/compression/src/client_stream.rs index bb24f3fe0..8321ab2ad 100644 --- a/tests/compression/src/client_stream.rs +++ b/tests/compression/src/client_stream.rs @@ -7,6 +7,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -33,6 +34,7 @@ async fn client_enabled_server_enabled(encoding: CompressionEncoding) { CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", CompressionEncoding::Deflate => "deflate", + CompressionEncoding::Lz4 => "lz4", _ => panic!("unexpected encoding {:?}", self.encoding), }; assert_eq!(req.headers().get("grpc-encoding").unwrap(), expected); @@ -80,6 +82,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -131,6 +134,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -161,6 +165,7 @@ async fn client_enabled_server_disabled(encoding: CompressionEncoding) { CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", CompressionEncoding::Deflate => "deflate", + CompressionEncoding::Lz4 => "lz4", _ => panic!("unexpected encoding {encoding:?}"), }; assert_eq!( @@ -174,6 +179,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -215,6 +221,7 @@ async fn compressing_response_from_client_stream(encoding: CompressionEncoding) CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", CompressionEncoding::Deflate => "deflate", + CompressionEncoding::Lz4 => "lz4", _ => panic!("unexpected encoding {encoding:?}"), }; assert_eq!(res.metadata().get("grpc-encoding").unwrap(), expected); diff --git a/tests/compression/src/compressing_request.rs b/tests/compression/src/compressing_request.rs index ff710f038..dbd3f1e4b 100644 --- a/tests/compression/src/compressing_request.rs +++ b/tests/compression/src/compressing_request.rs @@ -7,6 +7,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -33,6 +34,7 @@ async fn client_enabled_server_enabled(encoding: CompressionEncoding) { CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", CompressionEncoding::Deflate => "deflate", + CompressionEncoding::Lz4 => "lz4", _ => panic!("unexpected encoding {:?}", self.encoding), }; assert_eq!(req.headers().get("grpc-encoding").unwrap(), expected); @@ -84,6 +86,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -93,12 +96,13 @@ async fn client_enabled_server_enabled_multi_encoding(encoding: CompressionEncod let svc = test_server::TestServer::new(Svc::default()) .accept_compressed(CompressionEncoding::Gzip) .accept_compressed(CompressionEncoding::Zstd) - .accept_compressed(CompressionEncoding::Deflate); + .accept_compressed(CompressionEncoding::Deflate) + .accept_compressed(CompressionEncoding::Lz4); let request_bytes_counter = Arc::new(AtomicUsize::new(0)); fn assert_right_encoding(req: http::Request) -> http::Request { - let supported_encodings = ["gzip", "zstd", "deflate"]; + let supported_encodings = ["gzip", "zstd", "deflate", "lz4"]; let req_encoding = req.headers().get("grpc-encoding").unwrap(); assert!(supported_encodings.iter().any(|e| e == req_encoding)); @@ -146,6 +150,7 @@ parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -177,6 +182,7 @@ async fn client_enabled_server_disabled(encoding: CompressionEncoding) { CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", CompressionEncoding::Deflate => "deflate", + CompressionEncoding::Lz4 => "lz4", _ => panic!("unexpected encoding {encoding:?}"), }; assert_eq!( @@ -194,6 +200,7 @@ parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] diff --git a/tests/compression/src/compressing_response.rs b/tests/compression/src/compressing_response.rs index 85c5a481e..d53ea0a1b 100644 --- a/tests/compression/src/compressing_response.rs +++ b/tests/compression/src/compressing_response.rs @@ -6,6 +6,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -38,6 +39,7 @@ async fn client_enabled_server_enabled(encoding: CompressionEncoding) { CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", CompressionEncoding::Deflate => "deflate", + CompressionEncoding::Lz4 => "lz4", _ => panic!("unexpected encoding {:?}", self.encoding), }; assert_eq!( @@ -88,6 +90,7 @@ async fn client_enabled_server_enabled(encoding: CompressionEncoding) { CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", CompressionEncoding::Deflate => "deflate", + CompressionEncoding::Lz4 => "lz4", _ => panic!("unexpected encoding {encoding:?}"), }; @@ -104,6 +107,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -180,7 +184,8 @@ async fn client_enabled_server_disabled_multi_encoding() { let mut client = test_client::TestClient::new(mock_io_channel(client).await) .accept_compressed(CompressionEncoding::Gzip) .accept_compressed(CompressionEncoding::Zstd) - .accept_compressed(CompressionEncoding::Deflate); + .accept_compressed(CompressionEncoding::Deflate) + .accept_compressed(CompressionEncoding::Lz4); let res = client.compress_output_unary(()).await.unwrap(); @@ -195,6 +200,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -266,6 +272,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -310,6 +317,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -353,6 +361,7 @@ async fn disabling_compression_on_single_response(encoding: CompressionEncoding) CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", CompressionEncoding::Deflate => "deflate", + CompressionEncoding::Lz4 => "lz4", _ => panic!("unexpected encoding {encoding:?}"), }; assert_eq!(res.metadata().get("grpc-encoding").unwrap(), expected); @@ -366,6 +375,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -411,6 +421,7 @@ async fn disabling_compression_on_response_but_keeping_compression_on_stream( CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", CompressionEncoding::Deflate => "deflate", + CompressionEncoding::Lz4 => "lz4", _ => panic!("unexpected encoding {encoding:?}"), }; assert_eq!(res.metadata().get("grpc-encoding").unwrap(), expected); @@ -437,6 +448,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -482,6 +494,7 @@ async fn disabling_compression_on_response_from_client_stream(encoding: Compress CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", CompressionEncoding::Deflate => "deflate", + CompressionEncoding::Lz4 => "lz4", _ => panic!("unexpected encoding {encoding:?}"), }; assert_eq!(res.metadata().get("grpc-encoding").unwrap(), expected); diff --git a/tests/compression/src/server_stream.rs b/tests/compression/src/server_stream.rs index 7a6e1dffe..f295600f1 100644 --- a/tests/compression/src/server_stream.rs +++ b/tests/compression/src/server_stream.rs @@ -7,6 +7,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -47,6 +48,7 @@ async fn client_enabled_server_enabled(encoding: CompressionEncoding) { CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", CompressionEncoding::Deflate => "deflate", + CompressionEncoding::Lz4 => "lz4", _ => panic!("unexpected encoding {encoding:?}"), }; assert_eq!(res.metadata().get("grpc-encoding").unwrap(), expected); @@ -72,6 +74,7 @@ util::parametrized_tests! { client_disabled_server_enabled, zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] @@ -124,6 +127,7 @@ util::parametrized_tests! { zstd: CompressionEncoding::Zstd, gzip: CompressionEncoding::Gzip, deflate: CompressionEncoding::Deflate, + lz4: CompressionEncoding::Lz4, } #[allow(dead_code)] diff --git a/tests/compression/src/util.rs b/tests/compression/src/util.rs index 095e32290..e998b0717 100644 --- a/tests/compression/src/util.rs +++ b/tests/compression/src/util.rs @@ -162,6 +162,7 @@ impl AssertRightEncoding { CompressionEncoding::Gzip => "gzip", CompressionEncoding::Zstd => "zstd", CompressionEncoding::Deflate => "deflate", + CompressionEncoding::Lz4 => "lz4", _ => panic!("unexpected encoding {:?}", self.encoding), }; assert_eq!(req.headers().get("grpc-encoding").unwrap(), expected); diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 27768924c..a9278dee0 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -24,6 +24,7 @@ codegen = ["dep:async-trait"] gzip = ["dep:flate2"] deflate = ["dep:flate2"] zstd = ["dep:zstd"] +lz4 = ["dep:lz4_flex"] default = ["router", "transport", "codegen", "prost"] prost = ["dep:prost"] _tls-any = ["dep:tokio-rustls", "dep:tokio", "tokio?/rt", "tokio?/macros"] # Internal. Please choose one of `tls-ring` or `tls-aws-lc` @@ -91,6 +92,7 @@ webpki-roots = { version = "1", optional = true } # compression flate2 = {version = "1.0", optional = true} zstd = { version = "0.13.0", optional = true } +lz4_flex = { version = "0.11.3", default-features = false, features = ["std", "frame"], optional = true } # channel hyper-timeout = {version = "0.5", optional = true} diff --git a/tonic/src/codec/compression.rs b/tonic/src/codec/compression.rs index b07e380a8..41c416216 100644 --- a/tonic/src/codec/compression.rs +++ b/tonic/src/codec/compression.rs @@ -4,6 +4,8 @@ use bytes::{Buf, BufMut, BytesMut}; use flate2::read::{GzDecoder, GzEncoder}; #[cfg(feature = "deflate")] use flate2::read::{ZlibDecoder, ZlibEncoder}; +#[cfg(feature = "lz4")] +use lz4_flex::frame::{FrameDecoder, FrameEncoder}; use std::fmt; #[cfg(feature = "zstd")] use zstd::stream::read::{Decoder, Encoder}; @@ -16,7 +18,7 @@ pub(crate) const ACCEPT_ENCODING_HEADER: &str = "grpc-accept-encoding"; /// Represents an ordered list of compression encodings that are enabled. #[derive(Debug, Default, Clone, Copy)] pub struct EnabledCompressionEncodings { - inner: [Option; 3], + inner: [Option; 4], } impl EnabledCompressionEncodings { @@ -92,6 +94,9 @@ pub enum CompressionEncoding { #[allow(missing_docs)] #[cfg(feature = "zstd")] Zstd, + #[allow(missing_docs)] + #[cfg(feature = "lz4")] + Lz4, } impl CompressionEncoding { @@ -102,6 +107,8 @@ impl CompressionEncoding { CompressionEncoding::Deflate, #[cfg(feature = "zstd")] CompressionEncoding::Zstd, + #[cfg(feature = "lz4")] + CompressionEncoding::Lz4, ]; /// Based on the `grpc-accept-encoding` header, pick an encoding to use. @@ -123,6 +130,8 @@ impl CompressionEncoding { "deflate" => Some(CompressionEncoding::Deflate), #[cfg(feature = "zstd")] "zstd" => Some(CompressionEncoding::Zstd), + #[cfg(feature = "lz4")] + "lz4" => Some(CompressionEncoding::Lz4), _ => None, }) } @@ -149,6 +158,10 @@ impl CompressionEncoding { b"zstd" if enabled_encodings.is_enabled(CompressionEncoding::Zstd) => { Ok(Some(CompressionEncoding::Zstd)) } + #[cfg(feature = "lz4")] + b"lz4" if enabled_encodings.is_enabled(CompressionEncoding::Lz4) => { + Ok(Some(CompressionEncoding::Lz4)) + } b"identity" => Ok(None), other => { // NOTE: Workaround for lifetime limitation. Resolved at Rust 1.79. @@ -187,10 +200,18 @@ impl CompressionEncoding { CompressionEncoding::Deflate => "deflate", #[cfg(feature = "zstd")] CompressionEncoding::Zstd => "zstd", + #[cfg(feature = "lz4")] + CompressionEncoding::Lz4 => "lz4", } } - #[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))] + #[allow(dead_code)] + #[cfg(any( + feature = "gzip", + feature = "deflate", + feature = "zstd", + feature = "lz4" + ))] pub(crate) fn into_header_value(self) -> http::HeaderValue { http::HeaderValue::from_static(self.as_str()) } @@ -219,7 +240,13 @@ pub(crate) fn compress( let capacity = ((len / buffer_growth_interval) + 1) * buffer_growth_interval; out_buf.reserve(capacity); - #[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))] + #[allow(unused_mut)] + #[cfg(any( + feature = "gzip", + feature = "deflate", + feature = "zstd", + feature = "lz4" + ))] let mut out_writer = out_buf.writer(); match settings.encoding { @@ -250,6 +277,12 @@ pub(crate) fn compress( )?; std::io::copy(&mut zstd_encoder, &mut out_writer)?; } + #[cfg(feature = "lz4")] + CompressionEncoding::Lz4 => { + let mut lz4_encoder = FrameEncoder::new(out_writer); + std::io::copy(&mut &decompressed_buf[0..len], &mut lz4_encoder)?; + lz4_encoder.finish()?; + } } decompressed_buf.advance(len); @@ -271,7 +304,12 @@ pub(crate) fn decompress( ((estimate_decompressed_len / buffer_growth_interval) + 1) * buffer_growth_interval; out_buf.reserve(capacity); - #[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))] + #[cfg(any( + feature = "gzip", + feature = "deflate", + feature = "zstd", + feature = "lz4" + ))] let mut out_writer = out_buf.writer(); match settings.encoding { @@ -290,6 +328,11 @@ pub(crate) fn decompress( let mut zstd_decoder = Decoder::new(&compressed_buf[0..len])?; std::io::copy(&mut zstd_decoder, &mut out_writer)?; } + #[cfg(feature = "lz4")] + CompressionEncoding::Lz4 => { + let mut lz4_decoder = FrameDecoder::new(&compressed_buf[0..len]); + std::io::copy(&mut lz4_decoder, &mut out_writer)?; + } } compressed_buf.advance(len); @@ -329,13 +372,13 @@ mod tests { const GZIP: HeaderValue = HeaderValue::from_static("gzip,identity"); let encodings = EnabledCompressionEncodings { - inner: [Some(CompressionEncoding::Gzip), None, None], + inner: [Some(CompressionEncoding::Gzip), None, None, None], }; assert_eq!(encodings.into_accept_encoding_header_value().unwrap(), GZIP); let encodings = EnabledCompressionEncodings { - inner: [None, None, Some(CompressionEncoding::Gzip)], + inner: [None, None, None, Some(CompressionEncoding::Gzip)], }; assert_eq!(encodings.into_accept_encoding_header_value().unwrap(), GZIP); @@ -347,36 +390,61 @@ mod tests { const ZSTD: HeaderValue = HeaderValue::from_static("zstd,identity"); let encodings = EnabledCompressionEncodings { - inner: [Some(CompressionEncoding::Zstd), None, None], + inner: [Some(CompressionEncoding::Zstd), None, None, None], }; assert_eq!(encodings.into_accept_encoding_header_value().unwrap(), ZSTD); let encodings = EnabledCompressionEncodings { - inner: [None, None, Some(CompressionEncoding::Zstd)], + inner: [None, None, None, Some(CompressionEncoding::Zstd)], }; assert_eq!(encodings.into_accept_encoding_header_value().unwrap(), ZSTD); } #[test] - #[cfg(all(feature = "gzip", feature = "deflate", feature = "zstd"))] - fn convert_compression_encodings_into_header_value() { + #[cfg(feature = "lz4")] + fn convert_lz4_into_header_value() { + const LZ4: HeaderValue = HeaderValue::from_static("lz4,identity"); + + let encodings = EnabledCompressionEncodings { + inner: [Some(CompressionEncoding::Lz4), None, None, None], + }; + + assert_eq!(encodings.into_accept_encoding_header_value().unwrap(), LZ4); + + let encodings = EnabledCompressionEncodings { + inner: [None, None, None, Some(CompressionEncoding::Lz4)], + }; + + assert_eq!(encodings.into_accept_encoding_header_value().unwrap(), LZ4); + } + + #[test] + #[cfg(all( + feature = "gzip", + feature = "deflate", + feature = "zstd", + feature = "lz4" + ))] + fn convert_all_compression_encodings_into_header_value() { let encodings = EnabledCompressionEncodings { inner: [ Some(CompressionEncoding::Gzip), Some(CompressionEncoding::Deflate), Some(CompressionEncoding::Zstd), + Some(CompressionEncoding::Lz4), ], }; assert_eq!( encodings.into_accept_encoding_header_value().unwrap(), - HeaderValue::from_static("gzip,deflate,zstd,identity"), + HeaderValue::from_static("gzip,deflate,zstd,lz4,identity"), ); let encodings = EnabledCompressionEncodings { inner: [ + Some(CompressionEncoding::Lz4), Some(CompressionEncoding::Zstd), Some(CompressionEncoding::Deflate), Some(CompressionEncoding::Gzip), @@ -385,7 +453,7 @@ mod tests { assert_eq!( encodings.into_accept_encoding_header_value().unwrap(), - HeaderValue::from_static("zstd,deflate,gzip,identity"), + HeaderValue::from_static("lz4,zstd,deflate,gzip,identity"), ); } } diff --git a/tonic/src/lib.rs b/tonic/src/lib.rs index 88ac07a86..644498a95 100644 --- a/tonic/src/lib.rs +++ b/tonic/src/lib.rs @@ -39,6 +39,9 @@ //! Not enabled by default. //! - `zstd`: Enables compressing requests, responses, and streams. Depends on [`zstd`]. //! Not enabled by default. +//! - `lz4`: Enables LZ4 compression for requests, responses, and streams. Uses [`lz4_flex`] +//! with unsafe optimizations for maximum performance. Depends on [`lz4_flex`]. +//! Not enabled by default. //! //! # Structure //! @@ -86,6 +89,7 @@ //! [`webpki-roots`]: https://docs.rs/webpki-roots //! [`flate2`]: https://docs.rs/flate2 //! [`zstd`]: https://docs.rs/zstd +//! [`lz4_flex`]: https://docs.rs/lz4_flex #![recursion_limit = "256"] #![doc(