Skip to content

Commit 63f2b08

Browse files
committed
support client-side request_channel.
1 parent 7f0105e commit 63f2b08

File tree

8 files changed

+124
-34
lines changed

8 files changed

+124
-34
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ log = "0.4.8"
1515
bytes = "0.5.2"
1616
futures = "0.3.1"
1717
lazy_static = "1.4.0"
18-
# reactor_rs = {git = "https://github.com/jjeffcaii/reactor-rust", branch = "develop"}
18+
reactor_rs = {git = "https://github.com/jjeffcaii/reactor-rust", branch = "develop"}
1919

2020
[dependencies.tokio]
21-
version = "0.2.1"
21+
version = "0.2.2"
2222
default-features = false
2323
features = ["full"]
2424

src/errors.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::io;
55
#[derive(Debug)]
66
pub enum ErrorKind {
77
Internal(u32, &'static str),
8-
WithDescription(&'static str),
8+
WithDescription(String),
99
IO(io::Error),
1010
Cancelled(),
1111
Send(),
@@ -41,11 +41,18 @@ impl From<ErrorKind> for RSocketError {
4141
RSocketError { kind }
4242
}
4343
}
44+
impl From<String> for RSocketError {
45+
fn from(e: String) -> RSocketError {
46+
RSocketError {
47+
kind: ErrorKind::WithDescription(e),
48+
}
49+
}
50+
}
4451

4552
impl From<&'static str> for RSocketError {
4653
fn from(e: &'static str) -> RSocketError {
4754
RSocketError {
48-
kind: ErrorKind::WithDescription(e),
55+
kind: ErrorKind::WithDescription(String::from(e)),
4956
}
5057
}
5158
}

src/extension.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,7 @@ impl RoutingMetadata {
6666
}
6767
let size = bf.get_u8() as usize;
6868
if bf.len() < size {
69-
return Err(RSocketError::from(ErrorKind::WithDescription(
70-
"require more bytes!",
71-
)));
69+
return Err(RSocketError::from("require more bytes!"));
7270
}
7371
let tag = String::from_utf8(bf.split_to(size).to_vec()).unwrap();
7472
Ok(Some(tag))
@@ -131,24 +129,18 @@ impl CompositeMetadata {
131129
// Bad
132130
let mime_len = first as usize;
133131
if bs.len() < mime_len {
134-
return Err(RSocketError::from(ErrorKind::WithDescription(
135-
"bad COMPOSITE_METADATA bytes: missing required bytes!",
136-
)));
132+
return Err(RSocketError::from("broken COMPOSITE_METADATA bytes!"));
137133
}
138134
let front = bs.split_to(mime_len);
139135
String::from_utf8(front.to_vec()).unwrap()
140136
};
141137

142138
if bs.len() < 3 {
143-
return Err(RSocketError::from(ErrorKind::WithDescription(
144-
"bad COMPOSITE_METADATA bytes: missing required bytes!",
145-
)));
139+
return Err(RSocketError::from("broken COMPOSITE_METADATA bytes!"));
146140
}
147141
let payload_size = U24::read_advance(bs) as usize;
148142
if bs.len() < payload_size {
149-
return Err(RSocketError::from(ErrorKind::WithDescription(
150-
"bad COMPOSITE_METADATA bytes: missing required bytes!",
151-
)));
143+
return Err(RSocketError::from("broken COMPOSITE_METADATA bytes!"));
152144
}
153145
let p = bs.split_to(payload_size).freeze();
154146
Ok(Some(CompositeMetadata::new(m, p)))

src/spi.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub trait RSocket: Sync + Send {
1717
fn fire_and_forget(&self, req: Payload) -> Mono<()>;
1818
fn request_response(&self, req: Payload) -> Mono<Payload>;
1919
fn request_stream(&self, req: Payload) -> Flux<Payload>;
20-
// fn request_channel(&self, reqs: Flux<Payload>) -> Flux<Payload>;
20+
fn request_channel(&self, reqs: Flux<Payload>) -> Flux<Payload>;
2121
}
2222

2323
pub struct EchoRSocket;
@@ -43,6 +43,10 @@ impl RSocket for EchoRSocket {
4343
Ok(req),
4444
]))
4545
}
46+
fn request_channel(&self, reqs: Flux<Payload>) -> Flux<Payload> {
47+
info!("echo request_channel");
48+
reqs
49+
}
4650
}
4751

4852
pub struct EmptyRSocket;
@@ -67,7 +71,10 @@ impl RSocket for EmptyRSocket {
6771
}
6872

6973
fn request_stream(&self, req: Payload) -> Flux<Payload> {
70-
Box::pin(futures::stream::empty())
74+
Box::pin(futures::stream::iter(vec![Err(self.must_failed())]))
75+
}
76+
fn request_channel(&self, mut reqs: Flux<Payload>) -> Flux<Payload> {
77+
Box::pin(futures::stream::iter(vec![Err(self.must_failed())]))
7178
}
7279
}
7380

src/transport/socket.rs

Lines changed: 74 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,14 @@ struct Responder {
3131
inner: Arc<RwLock<Box<dyn RSocket>>>,
3232
}
3333

34+
type Single = oneshot::Sender<RSocketResult<Payload>>;
35+
type Multi = mpsc::UnboundedSender<RSocketResult<Payload>>;
36+
3437
#[derive(Debug)]
3538
enum Handler {
36-
Request(oneshot::Sender<RSocketResult<Payload>>),
37-
Stream(mpsc::UnboundedSender<RSocketResult<Payload>>),
39+
Request(Single),
40+
Stream(Multi),
41+
Channel(Multi),
3842
}
3943

4044
#[derive(Debug)]
@@ -164,6 +168,18 @@ impl DuplexSocket {
164168
senders.insert(sid, Handler::Stream(sender));
165169
}
166170
}
171+
Handler::Channel(sender) => {
172+
// TODO: support channel
173+
if flag & frame::FLAG_NEXT != 0 {
174+
sender.clone().send(Ok(input)).unwrap();
175+
}
176+
if flag & frame::FLAG_COMPLETE != 0 {
177+
// steam end
178+
drop(sender);
179+
} else {
180+
senders.insert(sid, Handler::Stream(sender));
181+
}
182+
}
167183
};
168184
}
169185

@@ -257,6 +273,16 @@ impl DuplexSocket {
257273
}
258274
tx.send(sending.build()).unwrap();
259275
}
276+
277+
fn request_channel22(&self, mut reqs: Flux<Payload>) -> Flux<Payload> {
278+
let sid = self.seq.next();
279+
let tx = self.tx.clone();
280+
// register handler
281+
let (sender, receiver) = mpsc::unbounded_channel::<RSocketResult<Payload>>();
282+
self.register_handler(sid, Handler::Stream(sender));
283+
tokio::spawn(async move { while let Some(req) = reqs.next().await {} });
284+
Box::pin(receiver)
285+
}
260286
}
261287

262288
impl RSocket for DuplexSocket {
@@ -272,9 +298,7 @@ impl RSocket for DuplexSocket {
272298
let sending = bu.build();
273299
match tx.send(sending) {
274300
Ok(()) => Ok(()),
275-
Err(_e) => Err(RSocketError::from(ErrorKind::WithDescription(
276-
"send metadata_push failed",
277-
))),
301+
Err(_e) => Err(RSocketError::from("send metadata_push failed")),
278302
}
279303
})
280304
}
@@ -294,9 +318,7 @@ impl RSocket for DuplexSocket {
294318
let sending = bu.build();
295319
match tx.send(sending) {
296320
Ok(()) => Ok(()),
297-
Err(_e) => Err(RSocketError::from(ErrorKind::WithDescription(
298-
"send fire_and_forget failed",
299-
))),
321+
Err(_e) => Err(RSocketError::from("send fire_and_forget failed")),
300322
}
301323
})
302324
}
@@ -324,9 +346,7 @@ impl RSocket for DuplexSocket {
324346
Box::pin(async move {
325347
match rx.await {
326348
Ok(v) => v,
327-
Err(_e) => Err(RSocketError::from(ErrorKind::WithDescription(
328-
"request_response failed",
329-
))),
349+
Err(_e) => Err(RSocketError::from("request_response failed")),
330350
}
331351
})
332352
}
@@ -352,6 +372,45 @@ impl RSocket for DuplexSocket {
352372
});
353373
Box::pin(receiver)
354374
}
375+
376+
fn request_channel(&self, mut reqs: Flux<Payload>) -> Flux<Payload> {
377+
let sid = self.seq.next();
378+
let tx = self.tx.clone();
379+
// register handler
380+
let (sender, receiver) = mpsc::unbounded_channel::<RSocketResult<Payload>>();
381+
self.register_handler(sid, Handler::Channel(sender));
382+
tokio::spawn(async move {
383+
let mut sent: u64 = 0;
384+
while let Some(it) = reqs.next().await {
385+
// TODO: check Err
386+
let (d, m) = it.unwrap().split();
387+
sent += 1;
388+
let sending = if sent == 1 {
389+
let mut bu = frame::RequestChannel::builder(sid, frame::FLAG_NEXT);
390+
if let Some(b) = d {
391+
bu = bu.set_data(b);
392+
}
393+
if let Some(b) = m {
394+
bu = bu.set_metadata(b);
395+
}
396+
bu.build()
397+
} else {
398+
let mut bu = frame::Payload::builder(sid, frame::FLAG_NEXT);
399+
if let Some(b) = d {
400+
bu = bu.set_data(b);
401+
}
402+
if let Some(b) = m {
403+
bu = bu.set_metadata(b);
404+
}
405+
bu.build()
406+
};
407+
tx.send(sending).unwrap();
408+
}
409+
let sending = frame::Payload::builder(sid, frame::FLAG_COMPLETE).build();
410+
tx.send(sending).unwrap();
411+
});
412+
Box::pin(receiver)
413+
}
355414
}
356415

357416
impl Handlers {
@@ -405,4 +464,8 @@ impl RSocket for Responder {
405464
let inner = self.inner.read().unwrap();
406465
(*inner).request_stream(req)
407466
}
467+
fn request_channel(&self, reqs: Flux<Payload>) -> Flux<Payload> {
468+
let inner = self.inner.read().unwrap();
469+
(*inner).request_channel(reqs)
470+
}
408471
}

src/x/client.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,4 +136,7 @@ impl RSocket for Client {
136136
fn request_stream(&self, req: Payload) -> Flux<Payload> {
137137
self.socket.request_stream(req)
138138
}
139+
fn request_channel(&self, reqs: Flux<Payload>) -> Flux<Payload> {
140+
self.socket.request_channel(reqs)
141+
}
139142
}

tests/clients.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
extern crate rsocket_rust;
22
#[macro_use]
33
extern crate log;
4+
use futures::prelude::*;
5+
use futures::stream;
46
use rsocket_rust::prelude::*;
57

68
#[tokio::main]
@@ -15,10 +17,11 @@ async fn test_client() {
1517
.start()
1618
.await
1719
.unwrap();
18-
exec_metadata_push(&cli).await;
19-
exec_fire_and_forget(&cli).await;
20-
exec_request_response(&cli).await;
21-
exec_request_stream(&cli).await;
20+
// exec_metadata_push(&cli).await;
21+
// exec_fire_and_forget(&cli).await;
22+
// exec_request_response(&cli).await;
23+
// exec_request_stream(&cli).await;
24+
exec_request_channel(&cli).await;
2225
cli.close();
2326
}
2427

@@ -59,3 +62,18 @@ async fn exec_request_stream(socket: &Client) {
5962
}
6063
}
6164
}
65+
66+
async fn exec_request_channel(socket: &Client) {
67+
let mut sends = vec![];
68+
for i in 0..10 {
69+
let pa = Payload::builder()
70+
.set_data_utf8(&format!("Hello#{}", i))
71+
.set_metadata_utf8("RUST")
72+
.build();
73+
sends.push(Ok(pa));
74+
}
75+
let mut results = socket.request_channel(Box::pin(stream::iter(sends)));
76+
while let Some(v) = results.next().await {
77+
println!("====> next in channel: {:?}", v);
78+
}
79+
}

tests/routing_metadata_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
extern crate rsocket_rust;
22

3-
use bytes::{Bytes, BytesMut};
3+
use bytes::BytesMut;
44
use rsocket_rust::extension::RoutingMetadata;
55

66
#[test]

0 commit comments

Comments
 (0)