Skip to content

Commit 833d275

Browse files
committed
support encoding/decoding messages
1 parent c6e42eb commit 833d275

File tree

11 files changed

+313
-40
lines changed

11 files changed

+313
-40
lines changed

grpc/examples/inmemory.rs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::any::Any;
22

3-
use grpc::service::{Message, Request, Response, Service};
3+
use grpc::service::{Message, MessageAllocator, Request, Response, Service};
44
use grpc::{client::ChannelOptions, inmemory};
55
use tokio_stream::StreamExt;
66
use tonic::async_trait;
@@ -10,12 +10,37 @@ struct Handler {}
1010
#[derive(Debug)]
1111
struct MyReqMessage(String);
1212

13+
impl Message for MyReqMessage {
14+
fn encode(&self, _: &mut bytes::BytesMut) -> Result<(), String> {
15+
Err("not implemented".to_string())
16+
}
17+
18+
fn decode(&mut self, _: &bytes::Bytes) -> Result<(), String> {
19+
Err("not implemented".to_string())
20+
}
21+
}
22+
1323
#[derive(Debug)]
1424
struct MyResMessage(String);
1525

26+
impl Message for MyResMessage {
27+
fn encode(&self, _: &mut bytes::BytesMut) -> Result<(), String> {
28+
Err("not implemented".to_string())
29+
}
30+
31+
fn decode(&mut self, _: &bytes::Bytes) -> Result<(), String> {
32+
Err("not implemented".to_string())
33+
}
34+
}
35+
1636
#[async_trait]
1737
impl Service for Handler {
18-
async fn call(&self, method: String, request: Request) -> Response {
38+
async fn call(
39+
&self,
40+
method: String,
41+
request: Request,
42+
_: Box<dyn MessageAllocator>,
43+
) -> Response {
1944
let mut stream = request.into_inner();
2045
let output = async_stream::try_stream! {
2146
while let Some(req) = stream.next().await {
@@ -30,6 +55,15 @@ impl Service for Handler {
3055
}
3156
}
3257

58+
#[derive(Debug, Default)]
59+
struct MyResMessageAllocator {}
60+
61+
impl MessageAllocator for MyResMessageAllocator {
62+
fn allocate(&self) -> Box<dyn Message> {
63+
Box::new(MyResMessage(String::new()))
64+
}
65+
}
66+
3367
#[tokio::main]
3468
async fn main() {
3569
inmemory::reg();
@@ -55,7 +89,13 @@ async fn main() {
5589
};
5690

5791
let req = Request::new(Box::pin(outbound));
58-
let res = chan.call("/some/method".to_string(), req).await;
92+
let res = chan
93+
.call(
94+
"/some/method".to_string(),
95+
req,
96+
Box::new(MyResMessageAllocator {}),
97+
)
98+
.await;
5999
let mut res = res.into_inner();
60100

61101
while let Some(resp) = res.next().await {

grpc/examples/multiaddr.rs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::any::Any;
22

3-
use grpc::service::{Message, Request, Response, Service};
3+
use grpc::service::{Message, MessageAllocator, Request, Response, Service};
44
use grpc::{client::ChannelOptions, inmemory};
55
use tokio_stream::StreamExt;
66
use tonic::async_trait;
@@ -15,9 +15,43 @@ struct MyReqMessage(String);
1515
#[derive(Debug)]
1616
struct MyResMessage(String);
1717

18+
impl Message for MyReqMessage {
19+
fn encode(&self, _: &mut bytes::BytesMut) -> Result<(), String> {
20+
Err("not implemented".to_string())
21+
}
22+
23+
fn decode(&mut self, _: &bytes::Bytes) -> Result<(), String> {
24+
Err("not implemented".to_string())
25+
}
26+
}
27+
28+
#[derive(Debug, Default)]
29+
struct MyResMessageAllocator {}
30+
31+
impl Message for MyResMessage {
32+
fn encode(&self, _: &mut bytes::BytesMut) -> Result<(), String> {
33+
Err("not implemented".to_string())
34+
}
35+
36+
fn decode(&mut self, _: &bytes::Bytes) -> Result<(), String> {
37+
Err("not implemented".to_string())
38+
}
39+
}
40+
41+
impl MessageAllocator for MyResMessageAllocator {
42+
fn allocate(&self) -> Box<dyn Message> {
43+
Box::new(MyResMessage(String::new()))
44+
}
45+
}
46+
1847
#[async_trait]
1948
impl Service for Handler {
20-
async fn call(&self, method: String, request: Request) -> Response {
49+
async fn call(
50+
&self,
51+
method: String,
52+
request: Request,
53+
_: Box<dyn MessageAllocator>,
54+
) -> Response {
2155
let id = self.id.clone();
2256
let mut stream = request.into_inner();
2357
let output = async_stream::try_stream! {
@@ -79,7 +113,13 @@ async fn main() {
79113
};
80114

81115
let req = Request::new(Box::pin(outbound));
82-
let res = chan.call("/some/method".to_string(), req).await;
116+
let res = chan
117+
.call(
118+
"/some/method".to_string(),
119+
req,
120+
Box::new(MyResMessageAllocator {}),
121+
)
122+
.await;
83123
let mut res = res.into_inner();
84124

85125
while let Some(resp) = res.next().await {

grpc/src/client/channel.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ use serde_json::json;
4242
use tonic::async_trait;
4343
use url::Url; // NOTE: http::Uri requires non-empty authority portion of URI
4444

45-
use crate::attributes::Attributes;
4645
use crate::rt;
4746
use crate::service::{Request, Response, Service};
47+
use crate::{attributes::Attributes, service::MessageAllocator};
4848
use crate::{client::ConnectivityState, rt::Runtime};
4949
use crate::{credentials::Credentials, rt::default_runtime};
5050

@@ -204,9 +204,14 @@ impl Channel {
204204
s.clone().unwrap()
205205
}
206206

207-
pub async fn call(&self, method: String, request: Request) -> Response {
207+
pub async fn call(
208+
&self,
209+
method: String,
210+
request: Request,
211+
response_allocator: Box<dyn MessageAllocator>,
212+
) -> Response {
208213
let ac = self.get_or_create_active_channel();
209-
ac.call(method, request).await
214+
ac.call(method, request, response_allocator).await
210215
}
211216
}
212217

@@ -302,7 +307,12 @@ impl ActiveChannel {
302307
})
303308
}
304309

305-
async fn call(&self, method: String, request: Request) -> Response {
310+
async fn call(
311+
&self,
312+
method: String,
313+
request: Request,
314+
response_allocator: Box<dyn MessageAllocator>,
315+
) -> Response {
306316
// TODO: pre-pick tasks (e.g. deadlines, interceptors, retry)
307317
let mut i = self.picker.iter();
308318
loop {
@@ -314,7 +324,12 @@ impl ActiveChannel {
314324
if let Some(sc) = (pr.subchannel.as_ref() as &dyn Any)
315325
.downcast_ref::<ExternalSubchannel>()
316326
{
317-
return sc.isc.as_ref().unwrap().call(method, request).await;
327+
return sc
328+
.isc
329+
.as_ref()
330+
.unwrap()
331+
.call(method, request, response_allocator)
332+
.await;
318333
} else {
319334
panic!("picked subchannel is not an implementation provided by the channel");
320335
}

grpc/src/client/load_balancing/test_utils.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,16 @@ pub(crate) fn new_request() -> Request {
4040
)))
4141
}
4242

43+
impl Message for EmptyMessage {
44+
fn encode(&self, buf: &mut bytes::BytesMut) -> Result<(), String> {
45+
Ok(())
46+
}
47+
48+
fn decode(&mut self, buf: &bytes::Bytes) -> Result<(), String> {
49+
Ok(())
50+
}
51+
}
52+
4353
// A test subchannel that forwards connect calls to a channel.
4454
// This allows tests to verify when a subchannel is asked to connect.
4555
pub(crate) struct TestSubchannel {

grpc/src/client/name_resolution/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use std::{
3939
};
4040

4141
mod backoff;
42-
mod dns;
42+
pub mod dns;
4343
mod registry;
4444
pub use registry::global_registry;
4545
use url::Url;

grpc/src/client/subchannel.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::{
1212
transport::{ConnectedTransport, TransportOptions},
1313
},
1414
rt::{BoxedTaskHandle, Runtime},
15-
service::{Request, Response, Service},
15+
service::{MessageAllocator, Request, Response, Service},
1616
};
1717
use core::panic;
1818
use std::time::{Duration, Instant};
@@ -192,7 +192,12 @@ struct InnerSubchannel {
192192

193193
#[async_trait]
194194
impl Service for InternalSubchannel {
195-
async fn call(&self, method: String, request: Request) -> Response {
195+
async fn call(
196+
&self,
197+
method: String,
198+
request: Request,
199+
response_allocator: Box<dyn MessageAllocator>,
200+
) -> Response {
196201
let svc = self.inner.lock().unwrap().state.connected_transport();
197202
if svc.is_none() {
198203
// TODO(easwars): Change the signature of this method to return a
@@ -201,7 +206,7 @@ impl Service for InternalSubchannel {
201206
}
202207

203208
let svc = svc.unwrap().clone();
204-
return svc.call(method, request).await;
209+
return svc.call(method, request, response_allocator).await;
205210
}
206211
}
207212

grpc/src/client/transport/tonic/mod.rs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ use crate::rt::BoxedTaskHandle;
88
use crate::rt::Runtime;
99
use crate::rt::TcpOptions;
1010
use crate::service::Message;
11+
use crate::service::MessageAllocator;
1112
use crate::service::Request as GrpcRequest;
1213
use crate::service::Response as GrpcResponse;
1314
use crate::{client::name_resolution::TCP_IP_NETWORK_TYPE, service::Service};
1415
use bytes::Bytes;
16+
use bytes::BytesMut;
1517
use http::uri::PathAndQuery;
1618
use http::Request as HttpRequest;
1719
use http::Response as HttpResponse;
@@ -63,7 +65,12 @@ impl Drop for TonicTransport {
6365

6466
#[async_trait]
6567
impl Service for TonicTransport {
66-
async fn call(&self, method: String, request: GrpcRequest) -> GrpcResponse {
68+
async fn call(
69+
&self,
70+
method: String,
71+
request: GrpcRequest,
72+
response_allocator: Box<dyn MessageAllocator>,
73+
) -> GrpcResponse {
6774
let Ok(path) = PathAndQuery::from_maybe_shared(method) else {
6875
let err = Status::internal("Failed to parse path");
6976
return create_error_response(err);
@@ -78,7 +85,7 @@ impl Service for TonicTransport {
7885
};
7986
let request = convert_request(request);
8087
let response = grpc.streaming(request, path, BytesCodec {}).await;
81-
convert_response(response)
88+
convert_response(response, response_allocator)
8289
}
8390
}
8491

@@ -92,9 +99,11 @@ fn convert_request(req: GrpcRequest) -> TonicRequest<Pin<Box<dyn Stream<Item = B
9299
let (metadata, extensions, stream) = req.into_parts();
93100

94101
let bytes_stream = Box::pin(stream.filter_map(|msg| {
95-
if let Ok(bytes) = (msg as Box<dyn Any>).downcast::<Bytes>() {
96-
Some(*bytes)
102+
let mut buf = BytesMut::with_capacity(msg.encoded_message_size_hint().unwrap_or(0));
103+
if let Ok(()) = msg.encode(&mut buf) {
104+
Some(buf.freeze())
97105
} else {
106+
// TODO: Handle encoding failures.
98107
// If it fails, log the error and return None to filter it out.
99108
eprintln!("A message could not be downcast to Bytes and was skipped.");
100109
None
@@ -104,7 +113,10 @@ fn convert_request(req: GrpcRequest) -> TonicRequest<Pin<Box<dyn Stream<Item = B
104113
TonicRequest::from_parts(metadata, extensions, bytes_stream as _)
105114
}
106115

107-
fn convert_response(res: Result<TonicResponse<Streaming<Bytes>>, Status>) -> GrpcResponse {
116+
fn convert_response(
117+
res: Result<TonicResponse<Streaming<Bytes>>, Status>,
118+
allocator: Box<dyn MessageAllocator>,
119+
) -> GrpcResponse {
108120
let response = match res {
109121
Ok(s) => s,
110122
Err(e) => {
@@ -113,11 +125,14 @@ fn convert_response(res: Result<TonicResponse<Streaming<Bytes>>, Status>) -> Grp
113125
}
114126
};
115127
let (metadata, stream, extensions) = response.into_parts();
116-
let message_stream: BoxStream<Box<dyn Message>> = Box::pin(stream.map(|msg| {
117-
msg.map(|b| {
118-
let msg: Box<dyn Message> = Box::new(b);
119-
msg
120-
})
128+
let allocator: Arc<dyn MessageAllocator> = Arc::from(allocator);
129+
let allocator_copy = allocator.clone();
130+
let message_stream: BoxStream<Box<dyn Message>> = Box::pin(stream.map(move |msg| {
131+
let allocator = allocator_copy.clone();
132+
let buf = msg?;
133+
let mut msg = allocator.allocate();
134+
msg.decode(&buf).map_err(|s| Status::internal(s))?;
135+
Ok(msg)
121136
}));
122137
TonicResponse::from_parts(metadata, message_stream, extensions)
123138
}

0 commit comments

Comments
 (0)