Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tests/web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ tonic-prost = { path = "../../tonic-prost" }

[dev-dependencies]
tonic-web = { path = "../../tonic-web" }
tower-layer = "0.3"
tower-service = "0.3"

[build-dependencies]
tonic-prost-build = { path = "../../tonic-prost-build" }
94 changes: 94 additions & 0 deletions tests/web/tests/grpc_web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,29 @@ async fn binary_request() {
assert_eq!(&trailers[..], b"grpc-status:0\r\n");
}

#[tokio::test]
async fn binary_request_reverse_proxy() {
let server_url = spawn_reverse_proxy().await;
let client = Client::builder(TokioExecutor::new()).build_http();

let req = build_request(server_url, "grpc-web", "grpc-web");
let res = client.request(req).await.unwrap();
let content_type = res.headers().get(header::CONTENT_TYPE).unwrap().clone();
let content_type = content_type.to_str().unwrap();

assert_eq!(res.status(), StatusCode::OK);
assert_eq!(content_type, "application/grpc-web+proto");

let (message, trailers) = decode_body(res.into_body(), content_type).await;
let expected = Output {
id: 1,
desc: "one".to_owned(),
};

assert_eq!(message, expected);
assert_eq!(&trailers[..], b"grpc-status:0\r\n");
}

#[tokio::test]
async fn text_request() {
let server_url = spawn().await;
Expand Down Expand Up @@ -84,6 +107,77 @@ async fn spawn() -> String {
url
}

/// Spawn two servers, one serving the gRPC API and another acting as a grpc-web proxy
async fn spawn_reverse_proxy() -> String {
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioIo;
use tower_layer::Layer;
use tower_service::Service;

// Set up gRPC service
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
let listener = TcpListener::bind(addr).await.expect("listener");
let url = format!("http://{}", listener.local_addr().unwrap());
let listener_stream = TcpListenerStream::new(listener);

drop(tokio::spawn(async move {
Server::builder()
.add_service(TestServer::new(Svc))
.serve_with_incoming(listener_stream)
.await
.unwrap()
}));

// Set up proxy to the above service that applies tonic-web
let addr2 = SocketAddr::from(([127, 0, 0, 1], 0));
let http_client = Client::builder(TokioExecutor::new())
.http2_only(true)
.build_http();
let listener2 = TcpListener::bind(addr2).await.expect("listener");
let url2 = format!("http://{}", listener2.local_addr().unwrap());

let backend_url = url.clone();

drop(tokio::spawn(async move {
loop {
let (stream, _) = listener2.accept().await.unwrap();
let io = TokioIo::new(stream);
let client = http_client.clone();
let backend = backend_url.clone();

tokio::spawn(async move {
let svc = GrpcWebLayer::new().layer(client.clone());
let hyper_svc = hyper::service::service_fn(move |mut req: Request<Incoming>| {
let mut svc = svc.clone();
let backend = backend.clone();
async move {
// Rewrite URI to point to backend
let path = req
.uri()
.path_and_query()
.map(|pq| pq.as_str())
.unwrap_or("/");
let new_uri = format!("{}{}", backend, path).parse().unwrap();
*req.uri_mut() = new_uri;

let req = req.map(Body::new);
svc.call(req).await
}
});

if let Err(err) = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new())
.serve_connection(io, hyper_svc)
.await
{
eprintln!("Error serving connection: {:?}", err);
}
});
}
}));

url2
}

fn encode_body() -> Bytes {
let input = Input {
id: 1,
Expand Down
28 changes: 16 additions & 12 deletions tonic-web/src/call.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::fmt;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

Expand All @@ -10,6 +9,8 @@ use pin_project::pin_project;
use tokio_stream::Stream;
use tonic::Status;

use crate::BoxError;

use self::content_types::*;

// A grpc header is u8 (flag) + u32 (msg len)
Expand Down Expand Up @@ -158,11 +159,11 @@ impl<B> GrpcWebCall<B> {
}
}

impl<B> GrpcWebCall<B>
impl<B, D> GrpcWebCall<B>
where
B: Body,
B::Data: Buf,
B::Error: fmt::Display,
B: Body<Data = D>,
B::Error: Into<BoxError> + Send,
D: Buf,
{
// Poll body for data, decoding (e.g. via Base64 if necessary) and returning frames
// to the caller. If the caller is a client, it should look for trailers before
Expand Down Expand Up @@ -247,10 +248,11 @@ where
}
}

impl<B> Body for GrpcWebCall<B>
impl<B, D> Body for GrpcWebCall<B>
where
B: Body,
B::Error: fmt::Display,
B: Body<Data = D>,
B::Error: Into<BoxError> + Send,
D: Buf,
{
type Data = Bytes;
type Error = Status;
Expand Down Expand Up @@ -336,10 +338,11 @@ where
}
}

impl<B> Stream for GrpcWebCall<B>
impl<B, D> Stream for GrpcWebCall<B>
where
B: Body,
B::Error: fmt::Display,
B: Body<Data = D>,
B::Error: Into<BoxError> + Send,
D: Buf,
{
type Item = Result<Frame<Bytes>, Status>;

Expand Down Expand Up @@ -372,7 +375,8 @@ impl Encoding {
}
}

fn internal_error(e: impl std::fmt::Display) -> Status {
fn internal_error(e: impl Into<BoxError> + Send) -> Status {
let e = e.into();
Status::internal(format!("tonic-web: {e}"))
}

Expand Down
45 changes: 36 additions & 9 deletions tonic-web/src/layer.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,49 @@
use super::GrpcWebService;
use std::error::Error;

use super::{BoxError, GrpcWebService};
use tonic::body::Body;

use tower_layer::Layer;
use tower_service::Service;

/// Layer implementing the grpc-web protocol.
#[derive(Debug, Default, Clone)]
pub struct GrpcWebLayer {
_priv: (),
#[derive(Debug)]
pub struct GrpcWebLayer<ResBody = Body> {
_markers: std::marker::PhantomData<fn() -> ResBody>,
}

impl<ResBody> Clone for GrpcWebLayer<ResBody> {
fn clone(&self) -> Self {
Self {
_markers: std::marker::PhantomData,
}
}
}

impl GrpcWebLayer {
impl<ResBody> GrpcWebLayer<ResBody> {
/// Create a new grpc-web layer.
pub fn new() -> GrpcWebLayer {
Self::default()
pub fn new() -> Self {
Self {
_markers: std::marker::PhantomData,
}
}
}

impl<ResBody> Default for GrpcWebLayer<ResBody> {
fn default() -> Self {
Self::new()
}
}

impl<S> Layer<S> for GrpcWebLayer {
type Service = GrpcWebService<S>;
impl<S, ResBody> Layer<S> for GrpcWebLayer<ResBody>
where
S: Service<http::Request<Body>, Response = http::Response<ResBody>> + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send,
ResBody: http_body::Body<Data = bytes::Bytes> + Send + 'static,
ResBody::Error: Error + Send + Sync + 'static,
{
type Service = GrpcWebService<S, ResBody>;

fn layer(&self, inner: S) -> Self::Service {
GrpcWebService::new(inner)
Expand Down
3 changes: 2 additions & 1 deletion tonic-web/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ mod client;
mod layer;
mod service;

type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// Alias for a type-erased error type.
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;

pub(crate) mod util {
pub(crate) mod base64 {
Expand Down
Loading
Loading