diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 989b8f8c0..6b9c90127 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -27,9 +27,11 @@ default = ["transport", "codegen", "prost"] codegen = ["async-trait"] transport = [ "h2", - "hyper", + "hyper/full", "tokio", + "tokio/net", "tower", + "tower/balance", "tracing-futures", "tokio/macros", "tokio/time", @@ -39,6 +41,13 @@ tls-roots-common = ["tls"] tls-roots = ["tls-roots-common", "rustls-native-certs"] tls-webpki-roots = ["tls-roots-common", "webpki-roots"] prost = ["prost1", "prost-derive"] +client = [ + "h2", + "hyper/client", + "hyper/http2", + "tokio", + "tower", +] # [[bench]] # name = "bench_main" @@ -69,8 +78,8 @@ async-trait = { version = "0.1.13", optional = true } # transport h2 = { version = "0.3", optional = true } -hyper = { version = "0.14.2", features = ["full"], optional = true } -tokio = { version = "1.0.1", features = ["net"], optional = true } +hyper = { version = "0.14.2", default-features = false, optional = true } +tokio = { version = "1.0.1", default-features = false, optional = true } tokio-stream = "0.1" tower = { version = "0.4.7", features = ["balance", "buffer", "discover", "limit", "load", "make", "timeout", "util"], optional = true } tracing-futures = { version = "0.2", optional = true } @@ -80,6 +89,9 @@ tokio-rustls = { version = "0.22", optional = true } rustls-native-certs = { version = "0.5", optional = true } webpki-roots = { version = "0.21.1", optional = true } +[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm-bindgen-futures = "0.4.18" + [dev-dependencies] tokio = { version = "1.0", features = ["rt", "macros"] } static_assertions = "1.0" diff --git a/tonic/src/lib.rs b/tonic/src/lib.rs index 4d91643c1..52a8fed5d 100644 --- a/tonic/src/lib.rs +++ b/tonic/src/lib.rs @@ -87,7 +87,7 @@ pub mod metadata; pub mod server; pub mod service; -#[cfg(feature = "transport")] +#[cfg(any(feature = "transport", feature = "client"))] #[cfg_attr(docsrs, doc(cfg(feature = "transport")))] pub mod transport; @@ -112,6 +112,15 @@ pub use status::{Code, Status}; pub(crate) type Error = Box; +#[cfg(all( + any(feature = "transport", feature = "client"), + not(target_arch = "wasm32") +))] +pub(crate) use tokio::spawn; +#[cfg(all(any(feature = "transport", feature = "client"), target_arch = "wasm32"))] +#[cfg(target_arch = "wasm32")] +pub(crate) use wasm_bindgen_futures::spawn_local as spawn; + #[doc(hidden)] #[cfg(feature = "codegen")] #[cfg_attr(docsrs, doc(cfg(feature = "codegen")))] diff --git a/tonic/src/transport/channel/endpoint.rs b/tonic/src/transport/channel/endpoint.rs index 204ab7bc9..51d9c9ff7 100644 --- a/tonic/src/transport/channel/endpoint.rs +++ b/tonic/src/transport/channel/endpoint.rs @@ -241,6 +241,7 @@ impl Endpoint { } /// Create a channel from this config. + #[cfg(feature = "transport")] pub async fn connect(&self) -> Result { let mut http = hyper::client::connect::HttpConnector::new(); http.enforce_http(false); @@ -260,6 +261,7 @@ impl Endpoint { /// /// The channel returned by this method does not attempt to connect to the endpoint until first /// use. + #[cfg(feature = "transport")] pub fn connect_lazy(&self) -> Result { let mut http = hyper::client::connect::HttpConnector::new(); http.enforce_http(false); diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index c2ecd8e39..12bfbc187 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -9,7 +9,7 @@ pub use endpoint::Endpoint; #[cfg(feature = "tls")] pub use tls::ClientTlsConfig; -use super::service::{Connection, DynamicServiceStream}; +use super::service::Connection; use crate::body::BoxBody; use bytes::Bytes; use http::{ @@ -20,19 +20,20 @@ use hyper::client::connect::Connection as HyperConnection; use std::{ fmt, future::Future, - hash::Hash, pin::Pin, task::{Context, Poll}, }; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - sync::mpsc::{channel, Sender}, -}; +use tokio::io::{AsyncRead, AsyncWrite}; +#[cfg(feature = "transport")] +use tokio::sync::mpsc::{channel, Sender}; -use tower::balance::p2c::Balance; +#[cfg(feature = "transport")] use tower::{ - buffer::{self, Buffer}, + balance::p2c::Balance, discover::{Change, Discover}, +}; +use tower::{ + buffer::{self, Buffer}, util::{BoxService, Either}, Service, }; @@ -108,6 +109,7 @@ impl Channel { /// /// This creates a [`Channel`] that will load balance accross all the /// provided endpoints. + #[cfg(feature = "transport")] pub fn balance_list(list: impl Iterator) -> Self { let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE); list.for_each(|endpoint| { @@ -121,15 +123,17 @@ impl Channel { /// Balance a list of [`Endpoint`]'s. /// /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. + #[cfg(feature = "transport")] pub fn balance_channel(capacity: usize) -> (Self, Sender>) where - K: Hash + Eq + Send + Clone + 'static, + K: std::hash::Hash + Eq + Send + Clone + 'static, { let (tx, rx) = channel(capacity); - let list = DynamicServiceStream::new(rx); + let list = super::service::DynamicServiceStream::new(rx); (Self::balance(list, DEFAULT_BUFFER_SIZE), tx) } + #[cfg(feature = "transport")] pub(crate) fn new(connector: C, endpoint: Endpoint) -> Self where C: Service + Send + 'static, @@ -140,7 +144,8 @@ impl Channel { let buffer_size = endpoint.buffer_size.clone().unwrap_or(DEFAULT_BUFFER_SIZE); let svc = Connection::lazy(connector, endpoint); - let svc = Buffer::new(Either::A(svc), buffer_size); + let (svc, worker) = Buffer::pair(Either::A(svc), buffer_size); + crate::spawn(worker); Channel { svc } } @@ -157,21 +162,24 @@ impl Channel { let svc = Connection::connect(connector, endpoint) .await .map_err(super::Error::from_source)?; - let svc = Buffer::new(Either::A(svc), buffer_size); + let (svc, worker) = Buffer::pair(Either::A(svc), buffer_size); + crate::spawn(worker); Ok(Channel { svc }) } + #[cfg(feature = "transport")] pub(crate) fn balance(discover: D, buffer_size: usize) -> Self where D: Discover + Unpin + Send + 'static, D::Error: Into, - D::Key: Hash + Send + Clone, + D::Key: std::hash::Hash + Send + Clone, { let svc = Balance::new(discover); let svc = BoxService::new(svc); - let svc = Buffer::new(Either::B(svc), buffer_size); + let (svc, worker) = Buffer::pair(Either::B(svc), buffer_size); + crate::spawn(worker); Channel { svc } } diff --git a/tonic/src/transport/mod.rs b/tonic/src/transport/mod.rs index 6cc4c2890..ac09192a2 100644 --- a/tonic/src/transport/mod.rs +++ b/tonic/src/transport/mod.rs @@ -87,6 +87,7 @@ //! [rustls]: https://docs.rs/rustls/0.16.0/rustls/ pub mod channel; +#[cfg(feature = "transport")] pub mod server; mod error; @@ -96,6 +97,7 @@ mod tls; #[doc(inline)] pub use self::channel::{Channel, Endpoint}; pub use self::error::Error; +#[cfg(feature = "transport")] #[doc(inline)] pub use self::server::{NamedService, Server}; #[doc(inline)] diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index f321f3402..4b412f296 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -34,23 +34,34 @@ impl Connection { C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, { - let mut settings = Builder::new() + let mut settings = Builder::new(); + settings .http2_initial_stream_window_size(endpoint.init_stream_window_size) .http2_initial_connection_window_size(endpoint.init_connection_window_size) - .http2_only(true) - .http2_keep_alive_interval(endpoint.http2_keep_alive_interval) - .clone(); + .http2_only(true); - if let Some(val) = endpoint.http2_keep_alive_timeout { - settings.http2_keep_alive_timeout(val); + if let Some(val) = endpoint.http2_adaptive_window { + settings.http2_adaptive_window(val); } - if let Some(val) = endpoint.http2_keep_alive_while_idle { - settings.http2_keep_alive_while_idle(val); + #[cfg(target_arch = "wasm32")] + { + settings + .executor(wasm::Executor) + // reset streams require `Instant::now` which is not available on wasm + .http2_max_concurrent_reset_streams(0); } - if let Some(val) = endpoint.http2_adaptive_window { - settings.http2_adaptive_window(val); + #[cfg(feature = "transport")] + { + settings.http2_keep_alive_interval(endpoint.http2_keep_alive_interval); + if let Some(val) = endpoint.http2_keep_alive_timeout { + settings.http2_keep_alive_timeout(val); + } + + if let Some(val) = endpoint.http2_keep_alive_while_idle { + settings.http2_keep_alive_while_idle(val); + } } let stack = ServiceBuilder::new() @@ -81,6 +92,7 @@ impl Connection { Self::new(connector, endpoint, false).ready_oneshot().await } + #[cfg(feature = "transport")] pub(crate) fn lazy(connector: C, endpoint: Endpoint) -> Self where C: Service + Send + 'static, @@ -119,3 +131,19 @@ impl fmt::Debug for Connection { f.debug_struct("Connection").finish() } } + +#[cfg(target_arch = "wasm32")] +mod wasm { + use std::future::Future; + use std::pin::Pin; + + type BoxSendFuture = Pin + Send>>; + + pub(crate) struct Executor; + + impl hyper::rt::Executor for Executor { + fn execute(&self, fut: BoxSendFuture) { + wasm_bindgen_futures::spawn_local(fut) + } + } +} diff --git a/tonic/src/transport/service/io.rs b/tonic/src/transport/service/io.rs index 0419336a7..34fd88ddc 100644 --- a/tonic/src/transport/service/io.rs +++ b/tonic/src/transport/service/io.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "transport")] use crate::transport::server::Connected; use hyper::client::connect::{Connected as HyperConnected, Connection}; use std::io; @@ -28,6 +29,7 @@ impl Connection for BoxedIo { } } +#[cfg(feature = "transport")] impl Connected for BoxedIo { type ConnectInfo = NoneConnectInfo; @@ -67,21 +69,24 @@ impl AsyncWrite for BoxedIo { } } +#[cfg(feature = "transport")] pub(crate) enum ServerIo { Io(IO), #[cfg(feature = "tls")] TlsIo(TlsStream), } +#[cfg(feature = "transport")] use tower::util::Either; -#[cfg(feature = "tls")] +#[cfg(all(feature = "transport", feature = "tls"))] type ServerIoConnectInfo = Either<::ConnectInfo, as Connected>::ConnectInfo>; -#[cfg(not(feature = "tls"))] +#[cfg(all(feature = "transport", not(feature = "tls")))] type ServerIoConnectInfo = Either<::ConnectInfo, ()>; +#[cfg(feature = "transport")] impl ServerIo { pub(in crate::transport) fn new_io(io: IO) -> Self { Self::Io(io) @@ -92,7 +97,7 @@ impl ServerIo { Self::TlsIo(io) } - #[cfg(feature = "tls")] + #[cfg(all(feature = "transport", feature = "tls"))] pub(in crate::transport) fn connect_info(&self) -> ServerIoConnectInfo where IO: Connected, @@ -104,7 +109,7 @@ impl ServerIo { } } - #[cfg(not(feature = "tls"))] + #[cfg(all(feature = "transport", not(feature = "tls")))] pub(in crate::transport) fn connect_info(&self) -> ServerIoConnectInfo where IO: Connected, @@ -115,6 +120,7 @@ impl ServerIo { } } +#[cfg(feature = "transport")] impl AsyncRead for ServerIo where IO: AsyncWrite + AsyncRead + Unpin, @@ -132,6 +138,7 @@ where } } +#[cfg(feature = "transport")] impl AsyncWrite for ServerIo where IO: AsyncWrite + AsyncRead + Unpin, diff --git a/tonic/src/transport/service/mod.rs b/tonic/src/transport/service/mod.rs index 4e1d89c0c..3edef0147 100644 --- a/tonic/src/transport/service/mod.rs +++ b/tonic/src/transport/service/mod.rs @@ -1,10 +1,12 @@ mod add_origin; mod connection; mod connector; +#[cfg(feature = "transport")] mod discover; mod grpc_timeout; mod io; mod reconnect; +#[cfg(feature = "transport")] mod router; #[cfg(feature = "tls")] mod tls; @@ -13,9 +15,13 @@ mod user_agent; pub(crate) use self::add_origin::AddOrigin; pub(crate) use self::connection::Connection; pub(crate) use self::connector::connector; +#[cfg(feature = "transport")] pub(crate) use self::discover::DynamicServiceStream; +#[cfg(feature = "transport")] pub(crate) use self::grpc_timeout::GrpcTimeout; +#[cfg(feature = "transport")] pub(crate) use self::io::ServerIo; +#[cfg(feature = "transport")] pub(crate) use self::router::{Or, Routes}; #[cfg(feature = "tls")] pub(crate) use self::tls::{TlsAcceptor, TlsConnector};