Skip to content

Initial simple grpc channel implementation #2338

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ members = [
resolver = "2"

[workspace.package]
rust-version = "1.75"
rust-version = "1.86"

[workspace.lints.rust]
missing_debug_implementations = "warn"
Expand Down
32 changes: 24 additions & 8 deletions grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,33 @@ authors = ["gRPC Authors"]
license = "MIT"

[dependencies]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a follow up we should relax the versions required here unless we need a feature in a specific patch.

url = "2.5.0"
tokio = { version = "1.37.0", features = ["sync", "rt", "net", "time", "macros"] }
tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = ["codegen"] }
bytes = "1.10.1"
futures-core = "0.3.31"
serde_json = "1.0.140"
serde = "1.0.219"
futures-util = "0.3.31"
hickory-resolver = { version = "0.25.1", optional = true }
rand = "0.9"
http = "1.1.0"
http-body = "1.0.1"
hyper = { version = "1.6.0", features = ["client", "http2"] }
hyper-util = "0.1.14"
once_cell = "1.19.0"
parking_lot = "0.12.4"
bytes = "1.10.1"
pin-project-lite = "0.2.16"
rand = "0.9"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
socket2 = "0.5.10"
tokio = { version = "1.37.0", features = ["sync", "rt", "net", "time", "macros"] }
tokio-stream = "0.1.17"
tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = ["codegen", "transport"] }
tower = "0.5.2"
tower-service = "0.3.3"
url = "2.5.0"

[dev-dependencies]
async-stream = "0.3.6"
tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = ["prost", "server", "router"] }
hickory-server = "0.25.2"
prost = "0.13.5"

[features]
default = ["dns"]
Expand All @@ -28,4 +42,6 @@ dns = ["dep:hickory-resolver"]
allowed_external_types = [
"tonic::*",
"futures_core::stream::Stream",
]
"tokio::sync::oneshot::Sender",
"once_cell::sync::Lazy",
]
74 changes: 74 additions & 0 deletions grpc/examples/inmemory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use std::any::Any;

use futures_util::stream::StreamExt;
use grpc::service::{Message, Request, Response, Service};
use grpc::{client::ChannelOptions, inmemory};
use tonic::async_trait;

struct Handler {}

#[derive(Debug)]
struct MyReqMessage(String);

impl Message for MyReqMessage {}

#[derive(Debug)]
struct MyResMessage(String);
impl Message for MyResMessage {}

#[async_trait]
impl Service for Handler {
async fn call(&self, method: String, request: Request) -> Response {
let mut stream = request.into_inner();
let output = async_stream::try_stream! {
while let Some(req) = stream.next().await {
yield Box::new(MyResMessage(format!(
"Server: responding to: {}; msg: {}",
method, (req as Box<dyn Any>).downcast_ref::<MyReqMessage>().unwrap().0,
))) as Box<dyn Message>;
}
};

Response::new(Box::pin(output))
}
}

#[tokio::main]
async fn main() {
inmemory::reg();

// Spawn the server.
let lis = inmemory::Listener::new();
let mut srv = grpc::server::Server::new();
srv.set_handler(Handler {});
let lis_clone = lis.clone();
tokio::task::spawn(async move {
srv.serve(&lis_clone).await;
println!("serve returned for listener 1!");
});

println!("Creating channel for {}", lis.target());
let chan_opts = ChannelOptions::default();
let chan = grpc::client::Channel::new(lis.target().as_str(), None, chan_opts);

let outbound = async_stream::stream! {
yield Box::new(MyReqMessage("My Request 1".to_string())) as Box<dyn Message>;
yield Box::new(MyReqMessage("My Request 2".to_string()));
yield Box::new(MyReqMessage("My Request 3".to_string()));
};

let req = Request::new(Box::pin(outbound));
let res = chan.call("/some/method".to_string(), req).await;
let mut res = res.into_inner();

while let Some(resp) = res.next().await {
println!(
"CALL RESPONSE: {}",
(resp.unwrap() as Box<dyn Any>)
.downcast_ref::<MyResMessage>()
.unwrap()
.0,
);
}
lis.close().await;
}
101 changes: 101 additions & 0 deletions grpc/examples/multiaddr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use std::any::Any;

use futures_util::StreamExt;
use grpc::service::{Message, Request, Response, Service};
use grpc::{client::ChannelOptions, inmemory};
use tonic::async_trait;

struct Handler {
id: String,
}

#[derive(Debug)]
struct MyReqMessage(String);

impl Message for MyReqMessage {}

#[derive(Debug)]
struct MyResMessage(String);
impl Message for MyResMessage {}

#[async_trait]
impl Service for Handler {
async fn call(&self, method: String, request: Request) -> Response {
let id = self.id.clone();
let mut stream = request.into_inner();
let output = async_stream::try_stream! {
while let Some(req) = stream.next().await {
yield Box::new(MyResMessage(format!(
"Server {}: responding to: {}; msg: {}",
id, method, (req as Box<dyn Any>).downcast_ref::<MyReqMessage>().unwrap().0,
))) as Box<dyn Message>;
}
};

Response::new(Box::pin(output))
}
}

#[tokio::main]
async fn main() {
inmemory::reg();

// Spawn the first server.
let lis1 = inmemory::Listener::new();
let mut srv = grpc::server::Server::new();
srv.set_handler(Handler { id: lis1.id() });
let lis1_clone = lis1.clone();
tokio::task::spawn(async move {
srv.serve(&lis1_clone).await;
println!("serve returned for listener 1!");
});

// Spawn the second server.
let lis2 = inmemory::Listener::new();
let mut srv = grpc::server::Server::new();
srv.set_handler(Handler { id: lis2.id() });
let lis2_clone = lis2.clone();
tokio::task::spawn(async move {
srv.serve(&lis2_clone).await;
println!("serve returned for listener 2!");
});

// Spawn the third server.
let lis3 = inmemory::Listener::new();
let mut srv = grpc::server::Server::new();
srv.set_handler(Handler { id: lis3.id() });
let lis3_clone = lis3.clone();
tokio::task::spawn(async move {
srv.serve(&lis3_clone).await;
println!("serve returned for listener 3!");
});

let target = String::from("inmemory:///dummy");
println!("Creating channel for {target}");
let chan_opts = ChannelOptions::default();
let chan = grpc::client::Channel::new(target.as_str(), None, chan_opts);

let outbound = async_stream::stream! {
yield Box::new(MyReqMessage("My Request 1".to_string())) as Box<dyn Message>;
yield Box::new(MyReqMessage("My Request 2".to_string()));
yield Box::new(MyReqMessage("My Request 3".to_string()));
};

let req = Request::new(Box::pin(outbound));
let res = chan.call("/some/method".to_string(), req).await;
let mut res = res.into_inner();

while let Some(resp) = res.next().await {
println!(
"CALL RESPONSE: {}",
(resp.unwrap() as Box<dyn Any>)
.downcast_ref::<MyResMessage>()
.unwrap()
.0,
);
}

lis1.close().await;
lis2.close().await;
lis3.close().await;
}
2 changes: 1 addition & 1 deletion grpc/src/attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@

/// A key-value store for arbitrary configuration data between multiple
/// pluggable components.
#[derive(Debug, Default, Clone)]
#[derive(Debug, Default, Clone, PartialEq, PartialOrd, Eq, Ord)]
pub struct Attributes;
Loading
Loading