Skip to content

Commit a4b1801

Browse files
authored
Merge pull request #8 from rsocket/develop
Support fragmentation.
2 parents d150bd7 + 2babf66 commit a4b1801

40 files changed

+1420
-391
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
4545

4646
RSocketFactory::receive()
4747
.transport(TcpServerTransport::from(addr))
48-
.acceptor(|setup, _socket| {
48+
.acceptor(Box::new(|setup, _socket| {
4949
info!("accept setup: {:?}", setup);
5050
Ok(Box::new(EchoRSocket))
5151
// Or you can reject setup
5252
// Err(From::from("SETUP_NOT_ALLOW"))
53-
})
53+
}))
5454
.on_start(|| info!("+++++++ echo server started! +++++++"))
5555
.serve()
5656
.await
@@ -67,7 +67,7 @@ use rsocket_rust_transport_tcp::TcpClientTransport;
6767
#[test]
6868
async fn test() {
6969
let cli = RSocketFactory::connect()
70-
.acceptor(|| Box::new(EchoRSocket))
70+
.acceptor(Box::new(|| Box::new(EchoRSocket)))
7171
.transport(TcpClientTransport::from("127.0.0.1:7878"))
7272
.setup(Payload::from("READY!"))
7373
.mime_type("text/plain", "text/plain")

examples/Cargo.toml

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,25 @@ name = "examples"
33
version = "0.0.0"
44
authors = ["Jeffsky <[email protected]>"]
55
edition = "2018"
6+
publish = false
67

78
[dev-dependencies]
8-
rsocket_rust = "0.5.0"
9-
rsocket_rust_transport_tcp = "0.5.0"
10-
rsocket_rust_transport_websocket = "0.5.0"
119
log = "0.4.8"
1210
env_logger = "0.7.1"
1311
futures = "0.3.4"
12+
clap = "2.33.0"
13+
14+
[dev-dependencies.rsocket_rust]
15+
path = "../rsocket"
16+
17+
[dev-dependencies.rsocket_rust_transport_tcp]
18+
path = "../rsocket-transport-tcp"
19+
20+
[dev-dependencies.rsocket_rust_transport_websocket]
21+
path = "../rsocket-transport-websocket"
1422

1523
[dev-dependencies.tokio]
16-
version = "0.2.11"
24+
version = "0.2.16"
1725
default-features = false
1826
features = ["full"]
1927

@@ -24,3 +32,7 @@ path = "echo.rs"
2432
[[example]]
2533
name = "proxy"
2634
path = "proxy.rs"
35+
36+
[[example]]
37+
name = "cli"
38+
path = "cli.rs"

examples/cli.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use rsocket_rust::prelude::*;
2+
use rsocket_rust_transport_tcp::TcpClientTransport;
3+
use std::error::Error;
4+
5+
#[tokio::main]
6+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
7+
let client = RSocketFactory::connect()
8+
.transport(TcpClientTransport::from("127.0.0.1:7878"))
9+
.acceptor(Box::new(|| {
10+
// Return a responder.
11+
Box::new(EchoRSocket)
12+
}))
13+
.start()
14+
.await
15+
.expect("Connect failed!");
16+
17+
let req = Payload::builder().set_data_utf8("Ping!").build();
18+
let res = client.request_response(req).await.expect("Requet failed!");
19+
println!("request success: response={:?}", res);
20+
21+
Ok(())
22+
}

examples/echo.rs

Lines changed: 199 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,209 @@
11
#[macro_use]
22
extern crate log;
33

4+
use clap::{App, Arg, SubCommand};
45
use rsocket_rust::prelude::*;
5-
use rsocket_rust_transport_tcp::TcpServerTransport;
6-
use std::env;
6+
use rsocket_rust_transport_tcp::{TcpClientTransport, TcpServerTransport};
77
use std::error::Error;
8+
use std::fs;
9+
10+
enum RequestMode {
11+
FNF,
12+
REQUEST,
13+
STREAM,
14+
CHANNEL,
15+
}
816

917
#[tokio::main]
1018
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1119
env_logger::builder().format_timestamp_millis().init();
12-
let addr = env::args().nth(1).unwrap_or("127.0.0.1:7878".to_string());
13-
14-
RSocketFactory::receive()
15-
.transport(TcpServerTransport::from(addr))
16-
.acceptor(|setup, _socket| {
17-
info!("accept setup: {:?}", setup);
18-
Ok(Box::new(EchoRSocket))
19-
// Or you can reject setup
20-
// Err(From::from("SETUP_NOT_ALLOW"))
21-
})
22-
.on_start(|| info!("+++++++ echo server started! +++++++"))
23-
.serve()
24-
.await
20+
21+
let cli = App::new("echo")
22+
.version("0.0.0")
23+
.author("Jeffsky <[email protected]>")
24+
.about("An echo tool for RSocket.")
25+
.subcommand(
26+
SubCommand::with_name("serve")
27+
.about("serve an echo server")
28+
.arg(
29+
Arg::with_name("mtu")
30+
.long("mtu")
31+
.required(false)
32+
.takes_value(true)
33+
.help("Fragment mtu."),
34+
)
35+
.arg(
36+
Arg::with_name("URL")
37+
.required(true)
38+
.index(1)
39+
.help("connect url"),
40+
),
41+
)
42+
.subcommand(
43+
SubCommand::with_name("connect")
44+
.about("connect to echo server")
45+
.arg(
46+
Arg::with_name("input")
47+
.short("i")
48+
.long("input")
49+
.required(false)
50+
.takes_value(true)
51+
.help("Input payload data."),
52+
)
53+
.arg(
54+
Arg::with_name("mtu")
55+
.long("mtu")
56+
.required(false)
57+
.takes_value(true)
58+
.help("Fragment mtu."),
59+
)
60+
.arg(
61+
Arg::with_name("request")
62+
.long("request")
63+
.required(false)
64+
.takes_value(false)
65+
.help("request_response mode."),
66+
)
67+
.arg(
68+
Arg::with_name("channel")
69+
.long("channel")
70+
.required(false)
71+
.takes_value(false)
72+
.help("request_channel mode."),
73+
)
74+
.arg(
75+
Arg::with_name("stream")
76+
.long("stream")
77+
.required(false)
78+
.takes_value(false)
79+
.help("request_stream mode."),
80+
)
81+
.arg(
82+
Arg::with_name("fnf")
83+
.long("fnf")
84+
.required(false)
85+
.takes_value(false)
86+
.help("fire_and_forget mode."),
87+
)
88+
.arg(
89+
Arg::with_name("URL")
90+
.required(true)
91+
.index(1)
92+
.help("connect url"),
93+
),
94+
)
95+
.arg(
96+
Arg::with_name("debug")
97+
.short("d")
98+
.help("print debug information verbosely"),
99+
)
100+
.get_matches();
101+
102+
match cli.subcommand() {
103+
("serve", Some(flags)) => {
104+
let addr = flags.value_of("URL").expect("Missing URL");
105+
let mtu: usize = flags
106+
.value_of("mtu")
107+
.map(|it| it.parse().expect("Invalid mtu string!"))
108+
.unwrap_or(0);
109+
RSocketFactory::receive()
110+
.transport(TcpServerTransport::from(addr))
111+
.fragment(mtu)
112+
.acceptor(Box::new(|setup, _socket| {
113+
info!("accept setup: {:?}", setup);
114+
Ok(Box::new(EchoRSocket))
115+
// Or you can reject setup
116+
// Err(From::from("SETUP_NOT_ALLOW"))
117+
}))
118+
.on_start(Box::new(|| info!("+++++++ echo server started! +++++++")))
119+
.serve()
120+
.await
121+
}
122+
("connect", Some(flags)) => {
123+
let mut modes: Vec<RequestMode> = vec![];
124+
125+
if flags.is_present("stream") {
126+
modes.push(RequestMode::STREAM);
127+
}
128+
if flags.is_present("fnf") {
129+
modes.push(RequestMode::FNF);
130+
}
131+
if flags.is_present("channel") {
132+
modes.push(RequestMode::CHANNEL);
133+
}
134+
135+
if flags.is_present("request") {
136+
modes.push(RequestMode::REQUEST);
137+
}
138+
139+
if modes.len() > 1 {
140+
error!("duplicated request mode: use one of --fnf/--request/--stream/--channel.");
141+
return Ok(());
142+
}
143+
144+
let mtu: usize = flags
145+
.value_of("mtu")
146+
.map(|it| it.parse().expect("Invalid mtu string!"))
147+
.unwrap_or(0);
148+
149+
let addr = flags.value_of("URL").expect("Missing URL");
150+
let cli = RSocketFactory::connect()
151+
.fragment(mtu)
152+
.transport(TcpClientTransport::from(addr))
153+
.start()
154+
.await
155+
.expect("Connect failed!");
156+
let mut bu = Payload::builder();
157+
if let Some(data) = flags.value_of("input") {
158+
if data.starts_with("@") {
159+
let file_content =
160+
fs::read_to_string(&data[1..].to_owned()).expect("Read file failed.");
161+
bu = bu.set_data_utf8(&file_content);
162+
} else {
163+
bu = bu.set_data_utf8(data);
164+
}
165+
}
166+
let req = bu.build();
167+
168+
match modes.pop().unwrap_or(RequestMode::REQUEST) {
169+
RequestMode::FNF => {
170+
cli.fire_and_forget(req).await;
171+
}
172+
RequestMode::STREAM => {
173+
let mut results = cli.request_stream(req);
174+
loop {
175+
match results.next().await {
176+
Some(Ok(v)) => info!("{:?}", v),
177+
Some(Err(e)) => {
178+
error!("STREAM_RESPONSE FAILED: {:?}", e);
179+
break;
180+
}
181+
None => break,
182+
}
183+
}
184+
}
185+
RequestMode::CHANNEL => {
186+
let mut results =
187+
cli.request_channel(Box::pin(futures::stream::iter(vec![Ok(req)])));
188+
loop {
189+
match results.next().await {
190+
Some(Ok(v)) => info!("{:?}", v),
191+
Some(Err(e)) => {
192+
error!("CHANNEL_RESPONSE FAILED: {:?}", e);
193+
break;
194+
}
195+
None => break,
196+
}
197+
}
198+
}
199+
RequestMode::REQUEST => {
200+
let res = cli.request_response(req).await.expect("Request failed!");
201+
info!("{:?}", res);
202+
}
203+
}
204+
205+
Ok(())
206+
}
207+
_ => Ok(()),
208+
}
25209
}

examples/proxy.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,18 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1212

1313
RSocketFactory::receive()
1414
.transport(TcpServerTransport::from("127.0.0.1:7979"))
15-
.acceptor(|setup, _sending_socket| {
15+
.acceptor(Box::new(|setup, _sending_socket| {
1616
info!("incoming socket: setup={:?}", setup);
1717
Ok(Box::new(block_on(async move {
1818
RSocketFactory::connect()
1919
.transport(TcpClientTransport::from("127.0.0.1:7878"))
20-
.acceptor(|| Box::new(EchoRSocket))
20+
.acceptor(Box::new(|| Box::new(EchoRSocket)))
2121
.setup(Payload::from("I'm Rust!"))
2222
.start()
2323
.await
2424
.unwrap()
2525
})))
26-
})
26+
}))
2727
.serve()
2828
.await
2929
}

rsocket-test/Cargo.toml

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,27 @@ name = "rsocket_rust_test"
33
version = "0.0.0"
44
authors = ["Jeffsky <[email protected]>"]
55
edition = "2018"
6+
publish = false
67

78
[dev-dependencies]
89
log = "0.4"
910
futures = "0.3.4"
1011
env_logger = "0.7.1"
11-
rsocket_rust = { version = "0.5.0", features = ["frame"] }
12-
rsocket_rust_transport_tcp = { version = "0.5.0" }
13-
rsocket_rust_transport_websocket = { version = "0.5.0" }
1412
bytes = "0.5.4"
1513
hex = "0.4.2"
1614
rand = "0.7.3"
1715

16+
[dev-dependencies.rsocket_rust]
17+
path = "../rsocket"
18+
features = ["frame"]
19+
20+
[dev-dependencies.rsocket_rust_transport_tcp]
21+
path = "../rsocket-transport-tcp"
22+
23+
[dev-dependencies.rsocket_rust_transport_websocket]
24+
path = "../rsocket-transport-websocket"
25+
1826
[dev-dependencies.tokio]
19-
version = "0.2.11"
27+
version = "0.2.13"
2028
default-features = false
2129
features = ["full"]

0 commit comments

Comments
 (0)