@@ -12,6 +12,7 @@ use std::{
1212 pin:: Pin ,
1313 task:: { ready, Context , Poll } ,
1414} ;
15+ #[ cfg( any( feature = "transport" , feature = "channel" , feature = "server" ) ) ]
1516use tokio:: task:: JoinHandle ;
1617use tokio_stream:: { adapters:: Fuse , Stream , StreamExt } ;
1718
@@ -38,6 +39,7 @@ struct EncodedBytes<T, U> {
3839 buf : BytesMut ,
3940 uncompression_buf : BytesMut ,
4041 error : Option < Status > ,
42+ #[ cfg( any( feature = "transport" , feature = "channel" , feature = "server" ) ) ]
4143 #[ pin]
4244 compression_task : Option < JoinHandle < Result < CompressionResult , Status > > > ,
4345}
@@ -74,6 +76,7 @@ impl<T: Encoder, U: Stream> EncodedBytes<T, U> {
7476 buf,
7577 uncompression_buf,
7678 error : None ,
79+ #[ cfg( any( feature = "transport" , feature = "channel" , feature = "server" ) ) ]
7780 compression_task : None ,
7881 }
7982 }
@@ -113,7 +116,10 @@ where
113116 uncompression_buf : & mut BytesMut ,
114117 compression_encoding : Option < CompressionEncoding > ,
115118 max_message_size : Option < usize > ,
116- compression_task : & mut Pin < & mut Option < JoinHandle < Result < CompressionResult , Status > > > > ,
119+ #[ cfg( any( feature = "transport" , feature = "channel" , feature = "server" ) ) ]
120+ compression_task : & mut Pin <
121+ & mut Option < JoinHandle < Result < CompressionResult , Status > > > ,
122+ > ,
117123 buffer_settings : & BufferSettings ,
118124 ) -> Result < bool , Status > {
119125 let compression_settings = compression_encoding
@@ -127,6 +133,8 @@ where
127133
128134 let uncompressed_len = uncompression_buf. len ( ) ;
129135
136+ // Check if we should use spawn_blocking (only when tokio is available)
137+ #[ cfg( any( feature = "transport" , feature = "channel" , feature = "server" ) ) ]
130138 if let Some ( spawn_threshold) = settings. spawn_blocking_threshold {
131139 if uncompressed_len >= spawn_threshold
132140 && uncompressed_len >= settings. compression_threshold
@@ -156,6 +164,7 @@ where
156164 Ok ( false )
157165 }
158166
167+ #[ cfg( any( feature = "transport" , feature = "channel" , feature = "server" ) ) ]
159168 fn poll_compression_task (
160169 compression_task : & mut Pin < & mut Option < JoinHandle < Result < CompressionResult , Status > > > > ,
161170 buf : & mut BytesMut ,
@@ -226,6 +235,7 @@ where
226235 buf,
227236 uncompression_buf,
228237 error,
238+ #[ cfg( any( feature = "transport" , feature = "channel" , feature = "server" ) ) ]
229239 mut compression_task,
230240 } = self . project ( ) ;
231241 let buffer_settings = encoder. buffer_settings ( ) ;
@@ -235,17 +245,20 @@ where
235245 }
236246
237247 // Check if we have an in-flight compression task
238- match Self :: poll_compression_task (
239- & mut compression_task,
240- buf,
241- * max_message_size,
242- & buffer_settings,
243- cx,
244- ) {
245- Poll :: Ready ( Some ( result) ) => return Poll :: Ready ( Some ( result) ) ,
246- Poll :: Pending => return Poll :: Pending ,
247- Poll :: Ready ( None ) => {
248- // Task completed, continue processing
248+ #[ cfg( any( feature = "transport" , feature = "channel" , feature = "server" ) ) ]
249+ {
250+ match Self :: poll_compression_task (
251+ & mut compression_task,
252+ buf,
253+ * max_message_size,
254+ & buffer_settings,
255+ cx,
256+ ) {
257+ Poll :: Ready ( Some ( result) ) => return Poll :: Ready ( Some ( result) ) ,
258+ Poll :: Pending => return Poll :: Pending ,
259+ Poll :: Ready ( None ) => {
260+ // Task completed, continue processing
261+ }
249262 }
250263 }
251264
@@ -268,32 +281,53 @@ where
268281 uncompression_buf,
269282 * compression_encoding,
270283 * max_message_size,
284+ #[ cfg( any(
285+ feature = "transport" ,
286+ feature = "channel" ,
287+ feature = "server"
288+ ) ) ]
271289 & mut compression_task,
272290 & buffer_settings,
273291 ) {
274292 Ok ( true ) => {
275- // We just spawned/armed the blocking compression task.
276- // Poll it once right away so it can capture our waker.
277- match Self :: poll_compression_task (
278- & mut compression_task,
279- buf,
280- * max_message_size,
281- & buffer_settings,
282- cx,
283- ) {
284- Poll :: Ready ( Some ( result) ) => {
285- return Poll :: Ready ( Some ( result) ) ;
286- }
287- Poll :: Ready ( None ) => {
288- if buf. len ( ) >= buffer_settings. yield_threshold {
289- return Poll :: Ready ( Some ( Ok ( buf
290- . split_to ( buf. len ( ) )
291- . freeze ( ) ) ) ) ;
293+ #[ cfg( any(
294+ feature = "transport" ,
295+ feature = "channel" ,
296+ feature = "server"
297+ ) ) ]
298+ {
299+ // We just spawned/armed the blocking compression task.
300+ // Poll it once right away so it can capture our waker.
301+ match Self :: poll_compression_task (
302+ & mut compression_task,
303+ buf,
304+ * max_message_size,
305+ & buffer_settings,
306+ cx,
307+ ) {
308+ Poll :: Ready ( Some ( result) ) => {
309+ return Poll :: Ready ( Some ( result) ) ;
310+ }
311+ Poll :: Ready ( None ) => {
312+ if buf. len ( ) >= buffer_settings. yield_threshold {
313+ return Poll :: Ready ( Some ( Ok ( buf
314+ . split_to ( buf. len ( ) )
315+ . freeze ( ) ) ) ) ;
316+ }
317+ }
318+ Poll :: Pending => {
319+ return Poll :: Pending ;
292320 }
293321 }
294- Poll :: Pending => {
295- return Poll :: Pending ;
296- }
322+ }
323+ #[ cfg( not( any(
324+ feature = "transport" ,
325+ feature = "channel" ,
326+ feature = "server"
327+ ) ) ) ]
328+ {
329+ // This shouldn't happen when tokio is not available
330+ unreachable ! ( "spawn_blocking returned true without tokio" )
297331 }
298332 }
299333 Ok ( false ) => {
0 commit comments