Skip to content

Commit 472812a

Browse files
committed
feature flags for anything that uses tokio since it might now be available
1 parent 643fd54 commit 472812a

File tree

3 files changed

+109
-53
lines changed

3 files changed

+109
-53
lines changed

tonic/src/codec/compression.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ pub(crate) fn compress(
293293
}
294294

295295
/// Decompress `len` bytes from `compressed_buf` into `out_buf`.
296+
#[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))]
296297
#[allow(unused_variables, unreachable_code)]
297298
pub(crate) fn decompress(
298299
settings: CompressionSettings,

tonic/src/codec/decode.rs

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use super::compression::{decompress, CompressionEncoding, CompressionSettings};
1+
use super::compression::CompressionEncoding;
2+
#[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))]
3+
use super::compression::{decompress, CompressionSettings};
24
use super::{BufferSettings, DecodeBuf, Decoder, DEFAULT_MAX_RECV_MESSAGE_SIZE, HEADER_SIZE};
35
use crate::{body::Body, metadata::MetadataMap, Code, Status};
46
use bytes::{Buf, BufMut, BytesMut};
@@ -30,6 +32,7 @@ struct StreamingInner {
3032
direction: Direction,
3133
buf: BytesMut,
3234
trailers: Option<HeaderMap>,
35+
#[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))]
3336
decompress_buf: BytesMut,
3437
encoding: Option<CompressionEncoding>,
3538
max_message_size: Option<usize>,
@@ -136,6 +139,7 @@ impl<T> Streaming<T> {
136139
direction,
137140
buf: BytesMut::with_capacity(buffer_size),
138141
trailers: None,
142+
#[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))]
139143
decompress_buf: BytesMut::new(),
140144
encoding,
141145
max_message_size,
@@ -147,6 +151,10 @@ impl<T> Streaming<T> {
147151
impl StreamingInner {
148152
fn decode_chunk(
149153
&mut self,
154+
#[cfg_attr(
155+
not(any(feature = "gzip", feature = "deflate", feature = "zstd")),
156+
allow(unused_variables)
157+
)]
150158
buffer_settings: BufferSettings,
151159
) -> Result<Option<DecodeBuf<'_>>, Status> {
152160
if let State::ReadHeader = self.state {
@@ -209,26 +217,36 @@ impl StreamingInner {
209217
return Ok(None);
210218
}
211219

212-
let decode_buf = if let Some(encoding) = compression {
213-
self.decompress_buf.clear();
214-
215-
if let Err(err) = decompress(
216-
CompressionSettings::new(encoding, buffer_settings.buffer_size),
217-
&mut self.buf,
218-
&mut self.decompress_buf,
219-
len,
220-
) {
221-
let message = if let Direction::Response(status) = self.direction {
222-
format!(
223-
"Error decompressing: {err}, while receiving response with status: {status}"
224-
)
225-
} else {
226-
format!("Error decompressing: {err}, while sending request")
227-
};
228-
return Err(Status::internal(message));
220+
let decode_buf = if let Some(_encoding) = compression {
221+
#[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))]
222+
{
223+
let encoding = _encoding;
224+
self.decompress_buf.clear();
225+
226+
if let Err(err) = decompress(
227+
CompressionSettings::new(encoding, buffer_settings.buffer_size),
228+
&mut self.buf,
229+
&mut self.decompress_buf,
230+
len,
231+
) {
232+
let message = if let Direction::Response(status) = self.direction {
233+
format!(
234+
"Error decompressing: {err}, while receiving response with status: {status}"
235+
)
236+
} else {
237+
format!("Error decompressing: {err}, while sending request")
238+
};
239+
return Err(Status::internal(message));
240+
}
241+
let decompressed_len = self.decompress_buf.len();
242+
DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
243+
}
244+
#[cfg(not(any(feature = "gzip", feature = "deflate", feature = "zstd")))]
245+
{
246+
// This branch is unreachable when no compression features are enabled
247+
// because CompressionEncoding has no variants
248+
unreachable!("Compression encoding without compression features")
229249
}
230-
let decompressed_len = self.decompress_buf.len();
231-
DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
232250
} else {
233251
DecodeBuf::new(&mut self.buf, len)
234252
};

tonic/src/codec/encode.rs

Lines changed: 70 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,17 @@ use bytes::{BufMut, Bytes, BytesMut};
77
use http::HeaderMap;
88
use http_body::{Body, Frame};
99
use pin_project::pin_project;
10+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
1011
use std::future::Future;
1112
use std::{
1213
pin::Pin,
1314
task::{ready, Context, Poll},
1415
};
16+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
1517
use tokio::task::JoinHandle;
1618
use tokio_stream::{adapters::Fuse, Stream, StreamExt};
1719

20+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
1821
#[derive(Debug)]
1922
struct CompressionResult {
2023
compressed_data: BytesMut,
@@ -38,6 +41,7 @@ struct EncodedBytes<T, U> {
3841
buf: BytesMut,
3942
uncompression_buf: BytesMut,
4043
error: Option<Status>,
44+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
4145
#[pin]
4246
compression_task: Option<JoinHandle<Result<CompressionResult, Status>>>,
4347
}
@@ -74,6 +78,7 @@ impl<T: Encoder, U: Stream> EncodedBytes<T, U> {
7478
buf,
7579
uncompression_buf,
7680
error: None,
81+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
7782
compression_task: None,
7883
}
7984
}
@@ -113,7 +118,10 @@ where
113118
uncompression_buf: &mut BytesMut,
114119
compression_encoding: Option<CompressionEncoding>,
115120
max_message_size: Option<usize>,
116-
compression_task: &mut Pin<&mut Option<JoinHandle<Result<CompressionResult, Status>>>>,
121+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
122+
compression_task: &mut Pin<
123+
&mut Option<JoinHandle<Result<CompressionResult, Status>>>,
124+
>,
117125
buffer_settings: &BufferSettings,
118126
) -> Result<bool, Status> {
119127
let compression_settings = compression_encoding
@@ -127,6 +135,8 @@ where
127135

128136
let uncompressed_len = uncompression_buf.len();
129137

138+
// Check if we should use spawn_blocking (only when tokio is available)
139+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
130140
if let Some(spawn_threshold) = settings.spawn_blocking_threshold {
131141
if uncompressed_len >= spawn_threshold
132142
&& uncompressed_len >= settings.compression_threshold
@@ -156,6 +166,7 @@ where
156166
Ok(false)
157167
}
158168

169+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
159170
fn poll_compression_task(
160171
compression_task: &mut Pin<&mut Option<JoinHandle<Result<CompressionResult, Status>>>>,
161172
buf: &mut BytesMut,
@@ -164,7 +175,7 @@ where
164175
cx: &mut Context<'_>,
165176
) -> Poll<Option<Result<Bytes, Status>>> {
166177
if let Some(task) = compression_task.as_mut().as_pin_mut() {
167-
match task.poll(cx) {
178+
match Future::poll(task, cx) {
168179
Poll::Ready(Ok(Ok(result))) => {
169180
compression_task.set(None);
170181

@@ -226,6 +237,7 @@ where
226237
buf,
227238
uncompression_buf,
228239
error,
240+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
229241
mut compression_task,
230242
} = self.project();
231243
let buffer_settings = encoder.buffer_settings();
@@ -235,17 +247,20 @@ where
235247
}
236248

237249
// Check if we have an in-flight compression task
238-
match Self::poll_compression_task(
239-
&mut compression_task,
240-
buf,
241-
*max_message_size,
242-
&buffer_settings,
243-
cx,
244-
) {
245-
Poll::Ready(Some(result)) => return Poll::Ready(Some(result)),
246-
Poll::Pending => return Poll::Pending,
247-
Poll::Ready(None) => {
248-
// Task completed, continue processing
250+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
251+
{
252+
match Self::poll_compression_task(
253+
&mut compression_task,
254+
buf,
255+
*max_message_size,
256+
&buffer_settings,
257+
cx,
258+
) {
259+
Poll::Ready(Some(result)) => return Poll::Ready(Some(result)),
260+
Poll::Pending => return Poll::Pending,
261+
Poll::Ready(None) => {
262+
// Task completed, continue processing
263+
}
249264
}
250265
}
251266

@@ -268,32 +283,53 @@ where
268283
uncompression_buf,
269284
*compression_encoding,
270285
*max_message_size,
286+
#[cfg(any(
287+
feature = "transport",
288+
feature = "channel",
289+
feature = "server"
290+
))]
271291
&mut compression_task,
272292
&buffer_settings,
273293
) {
274294
Ok(true) => {
275-
// We just spawned/armed the blocking compression task.
276-
// Poll it once right away so it can capture our waker.
277-
match Self::poll_compression_task(
278-
&mut compression_task,
279-
buf,
280-
*max_message_size,
281-
&buffer_settings,
282-
cx,
283-
) {
284-
Poll::Ready(Some(result)) => {
285-
return Poll::Ready(Some(result));
286-
}
287-
Poll::Ready(None) => {
288-
if buf.len() >= buffer_settings.yield_threshold {
289-
return Poll::Ready(Some(Ok(buf
290-
.split_to(buf.len())
291-
.freeze())));
295+
#[cfg(any(
296+
feature = "transport",
297+
feature = "channel",
298+
feature = "server"
299+
))]
300+
{
301+
// We just spawned/armed the blocking compression task.
302+
// Poll it once right away so it can capture our waker.
303+
match Self::poll_compression_task(
304+
&mut compression_task,
305+
buf,
306+
*max_message_size,
307+
&buffer_settings,
308+
cx,
309+
) {
310+
Poll::Ready(Some(result)) => {
311+
return Poll::Ready(Some(result));
312+
}
313+
Poll::Ready(None) => {
314+
if buf.len() >= buffer_settings.yield_threshold {
315+
return Poll::Ready(Some(Ok(buf
316+
.split_to(buf.len())
317+
.freeze())));
318+
}
319+
}
320+
Poll::Pending => {
321+
return Poll::Pending;
292322
}
293323
}
294-
Poll::Pending => {
295-
return Poll::Pending;
296-
}
324+
}
325+
#[cfg(not(any(
326+
feature = "transport",
327+
feature = "channel",
328+
feature = "server"
329+
)))]
330+
{
331+
// This shouldn't happen when tokio is not available
332+
unreachable!("spawn_blocking returned true without tokio")
297333
}
298334
}
299335
Ok(false) => {
@@ -319,6 +355,7 @@ where
319355
}
320356

321357
/// Compress data in a blocking task (called via spawn_blocking)
358+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
322359
fn compress_blocking(
323360
data: Bytes,
324361
settings: CompressionSettings,

0 commit comments

Comments
 (0)