From c8a4676a55173c067675ede32624854b22f48d35 Mon Sep 17 00:00:00 2001 From: Bouke van der Bijl Date: Tue, 7 Oct 2025 14:30:32 +0200 Subject: [PATCH] tonic-web: proxy any kind of service This allows applying the GrpcWebLayer to any kind of Service, not just ones that tonic generates. This makes it possible to use tonic-web as a grpc-web proxy to a gRPC server implemented in another language for example. --- tests/web/Cargo.toml | 2 + tests/web/tests/grpc_web.rs | 94 +++++++++++++++++++++++++++++++++++++ tonic-web/src/call.rs | 28 ++++++----- tonic-web/src/layer.rs | 45 ++++++++++++++---- tonic-web/src/lib.rs | 3 +- tonic-web/src/service.rs | 87 ++++++++++++++++++++++------------ 6 files changed, 207 insertions(+), 52 deletions(-) diff --git a/tests/web/Cargo.toml b/tests/web/Cargo.toml index edef9fa2b..e04317c33 100644 --- a/tests/web/Cargo.toml +++ b/tests/web/Cargo.toml @@ -18,6 +18,8 @@ tonic-prost = { path = "../../tonic-prost" } [dev-dependencies] tonic-web = { path = "../../tonic-web" } +tower-layer = "0.3" +tower-service = "0.3" [build-dependencies] tonic-prost-build = { path = "../../tonic-prost-build" } diff --git a/tests/web/tests/grpc_web.rs b/tests/web/tests/grpc_web.rs index 32e0b4737..6ada7ee7b 100644 --- a/tests/web/tests/grpc_web.rs +++ b/tests/web/tests/grpc_web.rs @@ -42,6 +42,29 @@ async fn binary_request() { assert_eq!(&trailers[..], b"grpc-status:0\r\n"); } +#[tokio::test] +async fn binary_request_reverse_proxy() { + let server_url = spawn_reverse_proxy().await; + let client = Client::builder(TokioExecutor::new()).build_http(); + + let req = build_request(server_url, "grpc-web", "grpc-web"); + let res = client.request(req).await.unwrap(); + let content_type = res.headers().get(header::CONTENT_TYPE).unwrap().clone(); + let content_type = content_type.to_str().unwrap(); + + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(content_type, "application/grpc-web+proto"); + + let (message, trailers) = decode_body(res.into_body(), content_type).await; + let expected = Output { + id: 1, + desc: "one".to_owned(), + }; + + assert_eq!(message, expected); + assert_eq!(&trailers[..], b"grpc-status:0\r\n"); +} + #[tokio::test] async fn text_request() { let server_url = spawn().await; @@ -84,6 +107,77 @@ async fn spawn() -> String { url } +/// Spawn two servers, one serving the gRPC API and another acting as a grpc-web proxy +async fn spawn_reverse_proxy() -> String { + use hyper_util::client::legacy::Client; + use hyper_util::rt::TokioIo; + use tower_layer::Layer; + use tower_service::Service; + + // Set up gRPC service + let addr = SocketAddr::from(([127, 0, 0, 1], 0)); + let listener = TcpListener::bind(addr).await.expect("listener"); + let url = format!("http://{}", listener.local_addr().unwrap()); + let listener_stream = TcpListenerStream::new(listener); + + drop(tokio::spawn(async move { + Server::builder() + .add_service(TestServer::new(Svc)) + .serve_with_incoming(listener_stream) + .await + .unwrap() + })); + + // Set up proxy to the above service that applies tonic-web + let addr2 = SocketAddr::from(([127, 0, 0, 1], 0)); + let http_client = Client::builder(TokioExecutor::new()) + .http2_only(true) + .build_http(); + let listener2 = TcpListener::bind(addr2).await.expect("listener"); + let url2 = format!("http://{}", listener2.local_addr().unwrap()); + + let backend_url = url.clone(); + + drop(tokio::spawn(async move { + loop { + let (stream, _) = listener2.accept().await.unwrap(); + let io = TokioIo::new(stream); + let client = http_client.clone(); + let backend = backend_url.clone(); + + tokio::spawn(async move { + let svc = GrpcWebLayer::new().layer(client.clone()); + let hyper_svc = hyper::service::service_fn(move |mut req: Request| { + let mut svc = svc.clone(); + let backend = backend.clone(); + async move { + // Rewrite URI to point to backend + let path = req + .uri() + .path_and_query() + .map(|pq| pq.as_str()) + .unwrap_or("/"); + let new_uri = format!("{}{}", backend, path).parse().unwrap(); + *req.uri_mut() = new_uri; + + let req = req.map(Body::new); + svc.call(req).await + } + }); + + if let Err(err) = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()) + .serve_connection(io, hyper_svc) + .await + { + eprintln!("Error serving connection: {:?}", err); + } + }); + } + })); + + url2 +} + fn encode_body() -> Bytes { let input = Input { id: 1, diff --git a/tonic-web/src/call.rs b/tonic-web/src/call.rs index a5e0ad859..74e2c3571 100644 --- a/tonic-web/src/call.rs +++ b/tonic-web/src/call.rs @@ -1,4 +1,3 @@ -use std::fmt; use std::pin::Pin; use std::task::{ready, Context, Poll}; @@ -10,6 +9,8 @@ use pin_project::pin_project; use tokio_stream::Stream; use tonic::Status; +use crate::BoxError; + use self::content_types::*; // A grpc header is u8 (flag) + u32 (msg len) @@ -158,11 +159,11 @@ impl GrpcWebCall { } } -impl GrpcWebCall +impl GrpcWebCall where - B: Body, - B::Data: Buf, - B::Error: fmt::Display, + B: Body, + B::Error: Into + Send, + D: Buf, { // Poll body for data, decoding (e.g. via Base64 if necessary) and returning frames // to the caller. If the caller is a client, it should look for trailers before @@ -247,10 +248,11 @@ where } } -impl Body for GrpcWebCall +impl Body for GrpcWebCall where - B: Body, - B::Error: fmt::Display, + B: Body, + B::Error: Into + Send, + D: Buf, { type Data = Bytes; type Error = Status; @@ -336,10 +338,11 @@ where } } -impl Stream for GrpcWebCall +impl Stream for GrpcWebCall where - B: Body, - B::Error: fmt::Display, + B: Body, + B::Error: Into + Send, + D: Buf, { type Item = Result, Status>; @@ -372,7 +375,8 @@ impl Encoding { } } -fn internal_error(e: impl std::fmt::Display) -> Status { +fn internal_error(e: impl Into + Send) -> Status { + let e = e.into(); Status::internal(format!("tonic-web: {e}")) } diff --git a/tonic-web/src/layer.rs b/tonic-web/src/layer.rs index ac171af8c..e13f087d9 100644 --- a/tonic-web/src/layer.rs +++ b/tonic-web/src/layer.rs @@ -1,22 +1,49 @@ -use super::GrpcWebService; +use std::error::Error; + +use super::{BoxError, GrpcWebService}; +use tonic::body::Body; use tower_layer::Layer; +use tower_service::Service; /// Layer implementing the grpc-web protocol. -#[derive(Debug, Default, Clone)] -pub struct GrpcWebLayer { - _priv: (), +#[derive(Debug)] +pub struct GrpcWebLayer { + _markers: std::marker::PhantomData ResBody>, +} + +impl Clone for GrpcWebLayer { + fn clone(&self) -> Self { + Self { + _markers: std::marker::PhantomData, + } + } } -impl GrpcWebLayer { +impl GrpcWebLayer { /// Create a new grpc-web layer. - pub fn new() -> GrpcWebLayer { - Self::default() + pub fn new() -> Self { + Self { + _markers: std::marker::PhantomData, + } + } +} + +impl Default for GrpcWebLayer { + fn default() -> Self { + Self::new() } } -impl Layer for GrpcWebLayer { - type Service = GrpcWebService; +impl Layer for GrpcWebLayer +where + S: Service, Response = http::Response> + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + Send, + ResBody: http_body::Body + Send + 'static, + ResBody::Error: Error + Send + Sync + 'static, +{ + type Service = GrpcWebService; fn layer(&self, inner: S) -> Self::Service { GrpcWebService::new(inner) diff --git a/tonic-web/src/lib.rs b/tonic-web/src/lib.rs index b8473f579..55cab1a06 100644 --- a/tonic-web/src/lib.rs +++ b/tonic-web/src/lib.rs @@ -80,7 +80,8 @@ mod client; mod layer; mod service; -type BoxError = Box; +/// Alias for a type-erased error type. +pub type BoxError = Box; pub(crate) mod util { pub(crate) mod base64 { diff --git a/tonic-web/src/service.rs b/tonic-web/src/service.rs index e0d75f4b3..b4af8009e 100644 --- a/tonic-web/src/service.rs +++ b/tonic-web/src/service.rs @@ -1,22 +1,37 @@ use core::fmt; +use std::error::Error; use std::future::Future; use std::pin::Pin; use std::task::{ready, Context, Poll}; +use bytes::Buf; use http::{header, HeaderMap, HeaderValue, Method, Request, Response, StatusCode, Version}; +use http_body::Body as HttpBody; use pin_project::pin_project; +use tonic::body::Body; use tonic::metadata::GRPC_CONTENT_TYPE; -use tonic::{body::Body, server::NamedService}; +use tonic::server::NamedService; use tower_service::Service; use tracing::{debug, trace}; use crate::call::content_types::is_grpc_web; use crate::call::{Encoding, GrpcWebCall}; +use crate::BoxError; /// Service implementing the grpc-web protocol. -#[derive(Debug, Clone)] -pub struct GrpcWebService { +#[derive(Debug)] +pub struct GrpcWebService { inner: S, + _markers: std::marker::PhantomData ResBody>, +} + +impl Clone for GrpcWebService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + _markers: std::marker::PhantomData, + } + } } #[derive(Debug, PartialEq)] @@ -37,19 +52,24 @@ enum RequestKind<'a> { Other(http::Version), } -impl GrpcWebService { +impl GrpcWebService { pub(crate) fn new(inner: S) -> Self { - GrpcWebService { inner } + GrpcWebService { + inner, + _markers: std::marker::PhantomData, + } } } -impl Service> for GrpcWebService +impl Service> for GrpcWebService where - S: Service, Response = Response>, - ReqBody: http_body::Body + Send + 'static, - ReqBody::Error: Into + fmt::Display, - ResBody: http_body::Body + Send + 'static, - ResBody::Error: Into + fmt::Display, + S: Service, Response = Response> + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + Send, + ReqBody: HttpBody + Send + 'static, + ReqBody::Error: Into + Send, + ResBody: HttpBody + Send + 'static, + ResBody::Error: Error + Send + Sync + 'static, { type Response = Response; type Error = S::Error; @@ -79,7 +99,9 @@ where ResponseFuture { case: Case::GrpcWeb { - future: self.inner.call(coerce_request(req, encoding)), + future: self + .inner + .call(coerce_request(req, encoding).map(Body::new)), accept, }, } @@ -154,11 +176,12 @@ impl Case { } } -impl Future for ResponseFuture +impl Future for ResponseFuture where - F: Future, E>>, - B: http_body::Body + Send + 'static, - B::Error: Into + fmt::Display, + F: Future, E>> + Send + 'static, + ResBody: HttpBody + Send + 'static, + ResBody::Error: Error + Send + Sync, + E: Into + Send, { type Output = Result, E>; @@ -169,7 +192,7 @@ where CaseProj::GrpcWeb { future, accept } => { let res = ready!(future.poll(cx))?; - Poll::Ready(Ok(coerce_response(res, *accept))) + Poll::Ready(Ok(coerce_response(res, *accept).map(Body::new))) } CaseProj::Other { future } => future.poll(cx).map_ok(|res| res.map(Body::new)), CaseProj::ImmediateResponse { res } => { @@ -180,7 +203,7 @@ where } } -impl NamedService for GrpcWebService { +impl NamedService for GrpcWebService { const NAME: &'static str = S::NAME; } @@ -207,11 +230,10 @@ impl<'a> RequestKind<'a> { // Mutating request headers to conform to a gRPC request is not really // necessary for us at this point. We could remove most of these except // maybe for inserting `header::TE`, which tonic should check? -fn coerce_request(mut req: Request, encoding: Encoding) -> Request -where - B: http_body::Body + Send + 'static, - B::Error: Into + fmt::Display, -{ +fn coerce_request( + mut req: Request, + encoding: Encoding, +) -> Request> { req.headers_mut().remove(header::CONTENT_LENGTH); req.headers_mut() @@ -225,17 +247,18 @@ where HeaderValue::from_static("identity,deflate,gzip"), ); - req.map(|b| Body::new(GrpcWebCall::request(b, encoding))) + req.map(|b| GrpcWebCall::request(b, encoding)) } -fn coerce_response(res: Response, encoding: Encoding) -> Response +fn coerce_response( + res: Response, + encoding: Encoding, +) -> Response> where - B: http_body::Body + Send + 'static, - B::Error: Into + fmt::Display, + ResBody: HttpBody + Send + 'static, + D: Buf, { - let mut res = res - .map(|b| GrpcWebCall::response(b, encoding)) - .map(Body::new); + let mut res = res.map(|b| GrpcWebCall::response(b, encoding)); res.headers_mut().insert( header::CONTENT_TYPE, @@ -252,6 +275,7 @@ mod tests { use http::header::{ ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, CONTENT_TYPE, ORIGIN, }; + use tonic::body::Body; use tower_layer::Layer as _; type BoxFuture = Pin> + Send>>; @@ -280,6 +304,9 @@ mod tests { fn enable(service: S) -> tower_http::cors::Cors> where S: Service, Response = http::Response>, + S: Send + 'static, + S::Future: Send + 'static, + S::Error: Into + Send, { tower_layer::Stack::new( crate::GrpcWebLayer::new(),