Skip to content

Commit d4e4a2a

Browse files
authored
Initial simple grpc channel implementation (#2338)
1 parent cde96ac commit d4e4a2a

File tree

23 files changed

+2204
-165
lines changed

23 files changed

+2204
-165
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ members = [
3333
resolver = "2"
3434

3535
[workspace.package]
36-
rust-version = "1.75"
36+
rust-version = "1.86"
3737

3838
[workspace.lints.rust]
3939
missing_debug_implementations = "warn"

grpc/Cargo.toml

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,33 @@ authors = ["gRPC Authors"]
66
license = "MIT"
77

88
[dependencies]
9-
url = "2.5.0"
10-
tokio = { version = "1.37.0", features = ["sync", "rt", "net", "time", "macros"] }
11-
tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = ["codegen"] }
9+
bytes = "1.10.1"
1210
futures-core = "0.3.31"
13-
serde_json = "1.0.140"
14-
serde = "1.0.219"
11+
futures-util = "0.3.31"
1512
hickory-resolver = { version = "0.25.1", optional = true }
16-
rand = "0.9"
13+
http = "1.1.0"
14+
http-body = "1.0.1"
15+
hyper = { version = "1.6.0", features = ["client", "http2"] }
16+
hyper-util = "0.1.14"
17+
once_cell = "1.19.0"
1718
parking_lot = "0.12.4"
18-
bytes = "1.10.1"
19+
pin-project-lite = "0.2.16"
20+
rand = "0.9"
21+
serde = { version = "1.0.219", features = ["derive"] }
22+
serde_json = "1.0.140"
23+
socket2 = "0.5.10"
24+
tokio = { version = "1.37.0", features = ["sync", "rt", "net", "time", "macros"] }
25+
tokio-stream = "0.1.17"
26+
tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = ["codegen", "transport"] }
27+
tower = "0.5.2"
28+
tower-service = "0.3.3"
29+
url = "2.5.0"
1930

2031
[dev-dependencies]
32+
async-stream = "0.3.6"
33+
tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = ["prost", "server", "router"] }
2134
hickory-server = "0.25.2"
35+
prost = "0.13.5"
2236

2337
[features]
2438
default = ["dns"]
@@ -28,4 +42,6 @@ dns = ["dep:hickory-resolver"]
2842
allowed_external_types = [
2943
"tonic::*",
3044
"futures_core::stream::Stream",
31-
]
45+
"tokio::sync::oneshot::Sender",
46+
"once_cell::sync::Lazy",
47+
]

grpc/examples/inmemory.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use std::any::Any;
2+
3+
use futures_util::stream::StreamExt;
4+
use grpc::service::{Message, Request, Response, Service};
5+
use grpc::{client::ChannelOptions, inmemory};
6+
use tonic::async_trait;
7+
8+
struct Handler {}
9+
10+
#[derive(Debug)]
11+
struct MyReqMessage(String);
12+
13+
impl Message for MyReqMessage {}
14+
15+
#[derive(Debug)]
16+
struct MyResMessage(String);
17+
impl Message for MyResMessage {}
18+
19+
#[async_trait]
20+
impl Service for Handler {
21+
async fn call(&self, method: String, request: Request) -> Response {
22+
let mut stream = request.into_inner();
23+
let output = async_stream::try_stream! {
24+
while let Some(req) = stream.next().await {
25+
yield Box::new(MyResMessage(format!(
26+
"Server: responding to: {}; msg: {}",
27+
method, (req as Box<dyn Any>).downcast_ref::<MyReqMessage>().unwrap().0,
28+
))) as Box<dyn Message>;
29+
}
30+
};
31+
32+
Response::new(Box::pin(output))
33+
}
34+
}
35+
36+
#[tokio::main]
37+
async fn main() {
38+
inmemory::reg();
39+
40+
// Spawn the server.
41+
let lis = inmemory::Listener::new();
42+
let mut srv = grpc::server::Server::new();
43+
srv.set_handler(Handler {});
44+
let lis_clone = lis.clone();
45+
tokio::task::spawn(async move {
46+
srv.serve(&lis_clone).await;
47+
println!("serve returned for listener 1!");
48+
});
49+
50+
println!("Creating channel for {}", lis.target());
51+
let chan_opts = ChannelOptions::default();
52+
let chan = grpc::client::Channel::new(lis.target().as_str(), None, chan_opts);
53+
54+
let outbound = async_stream::stream! {
55+
yield Box::new(MyReqMessage("My Request 1".to_string())) as Box<dyn Message>;
56+
yield Box::new(MyReqMessage("My Request 2".to_string()));
57+
yield Box::new(MyReqMessage("My Request 3".to_string()));
58+
};
59+
60+
let req = Request::new(Box::pin(outbound));
61+
let res = chan.call("/some/method".to_string(), req).await;
62+
let mut res = res.into_inner();
63+
64+
while let Some(resp) = res.next().await {
65+
println!(
66+
"CALL RESPONSE: {}",
67+
(resp.unwrap() as Box<dyn Any>)
68+
.downcast_ref::<MyResMessage>()
69+
.unwrap()
70+
.0,
71+
);
72+
}
73+
lis.close().await;
74+
}

grpc/examples/multiaddr.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use std::any::Any;
2+
3+
use futures_util::StreamExt;
4+
use grpc::service::{Message, Request, Response, Service};
5+
use grpc::{client::ChannelOptions, inmemory};
6+
use tonic::async_trait;
7+
8+
struct Handler {
9+
id: String,
10+
}
11+
12+
#[derive(Debug)]
13+
struct MyReqMessage(String);
14+
15+
impl Message for MyReqMessage {}
16+
17+
#[derive(Debug)]
18+
struct MyResMessage(String);
19+
impl Message for MyResMessage {}
20+
21+
#[async_trait]
22+
impl Service for Handler {
23+
async fn call(&self, method: String, request: Request) -> Response {
24+
let id = self.id.clone();
25+
let mut stream = request.into_inner();
26+
let output = async_stream::try_stream! {
27+
while let Some(req) = stream.next().await {
28+
yield Box::new(MyResMessage(format!(
29+
"Server {}: responding to: {}; msg: {}",
30+
id, method, (req as Box<dyn Any>).downcast_ref::<MyReqMessage>().unwrap().0,
31+
))) as Box<dyn Message>;
32+
}
33+
};
34+
35+
Response::new(Box::pin(output))
36+
}
37+
}
38+
39+
#[tokio::main]
40+
async fn main() {
41+
inmemory::reg();
42+
43+
// Spawn the first server.
44+
let lis1 = inmemory::Listener::new();
45+
let mut srv = grpc::server::Server::new();
46+
srv.set_handler(Handler { id: lis1.id() });
47+
let lis1_clone = lis1.clone();
48+
tokio::task::spawn(async move {
49+
srv.serve(&lis1_clone).await;
50+
println!("serve returned for listener 1!");
51+
});
52+
53+
// Spawn the second server.
54+
let lis2 = inmemory::Listener::new();
55+
let mut srv = grpc::server::Server::new();
56+
srv.set_handler(Handler { id: lis2.id() });
57+
let lis2_clone = lis2.clone();
58+
tokio::task::spawn(async move {
59+
srv.serve(&lis2_clone).await;
60+
println!("serve returned for listener 2!");
61+
});
62+
63+
// Spawn the third server.
64+
let lis3 = inmemory::Listener::new();
65+
let mut srv = grpc::server::Server::new();
66+
srv.set_handler(Handler { id: lis3.id() });
67+
let lis3_clone = lis3.clone();
68+
tokio::task::spawn(async move {
69+
srv.serve(&lis3_clone).await;
70+
println!("serve returned for listener 3!");
71+
});
72+
73+
let target = String::from("inmemory:///dummy");
74+
println!("Creating channel for {target}");
75+
let chan_opts = ChannelOptions::default();
76+
let chan = grpc::client::Channel::new(target.as_str(), None, chan_opts);
77+
78+
let outbound = async_stream::stream! {
79+
yield Box::new(MyReqMessage("My Request 1".to_string())) as Box<dyn Message>;
80+
yield Box::new(MyReqMessage("My Request 2".to_string()));
81+
yield Box::new(MyReqMessage("My Request 3".to_string()));
82+
};
83+
84+
let req = Request::new(Box::pin(outbound));
85+
let res = chan.call("/some/method".to_string(), req).await;
86+
let mut res = res.into_inner();
87+
88+
while let Some(resp) = res.next().await {
89+
println!(
90+
"CALL RESPONSE: {}",
91+
(resp.unwrap() as Box<dyn Any>)
92+
.downcast_ref::<MyResMessage>()
93+
.unwrap()
94+
.0,
95+
);
96+
}
97+
98+
lis1.close().await;
99+
lis2.close().await;
100+
lis3.close().await;
101+
}

grpc/src/attributes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@
2424

2525
/// A key-value store for arbitrary configuration data between multiple
2626
/// pluggable components.
27-
#[derive(Debug, Default, Clone)]
27+
#[derive(Debug, Default, Clone, PartialEq, PartialOrd, Eq, Ord)]
2828
pub struct Attributes;

0 commit comments

Comments
 (0)