Skip to content

Commit 566a770

Browse files
committed
support server-side request_channel.
1 parent 63f2b08 commit 566a770

File tree

7 files changed

+107
-35
lines changed

7 files changed

+107
-35
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "rsocket_rust"
3-
version = "0.2.0"
3+
version = "0.3.0"
44
authors = ["Jeffsky <[email protected]>"]
55
edition = "2018"
66
license = "Apache-2.0"
@@ -15,7 +15,7 @@ log = "0.4.8"
1515
bytes = "0.5.2"
1616
futures = "0.3.1"
1717
lazy_static = "1.4.0"
18-
reactor_rs = {git = "https://github.com/jjeffcaii/reactor-rust", branch = "develop"}
18+
# reactor_rs = {git = "https://github.com/jjeffcaii/reactor-rust", branch = "develop"}
1919

2020
[dependencies.tokio]
2121
version = "0.2.2"

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,12 @@ async fn test() {
8080
- [x] REQUEST_FNF
8181
- [x] REQUEST_RESPONSE
8282
- [x] REQUEST_STREAM
83-
- [ ] REQUEST_CHANNEL
83+
- [x] REQUEST_CHANNEL
8484
- Transport
8585
- [x] TCP
8686
- [ ] Websocket
8787
- Reactor
8888
- [ ] ...
8989
- High Level APIs
90-
- [ ] Client
91-
- [ ] Server
90+
- [x] Client
91+
- [x] Server

src/spi.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use futures::{Sink, SinkExt, Stream, StreamExt};
77
use std::future::Future;
88
use std::pin::Pin;
99
use std::sync::Arc;
10+
use tokio::sync::mpsc;
1011

1112
// TODO: switch to reactor-rust.
1213
pub type Mono<T> = Pin<Box<dyn Send + Sync + Future<Output = RSocketResult<T>>>>;
@@ -24,28 +25,37 @@ pub struct EchoRSocket;
2425

2526
impl RSocket for EchoRSocket {
2627
fn metadata_push(&self, req: Payload) -> Mono<()> {
27-
info!("echo metadata_push: {:?}", req);
28+
info!("{:?}", req);
2829
Box::pin(future::ok::<(), RSocketError>(()))
2930
}
3031
fn fire_and_forget(&self, req: Payload) -> Mono<()> {
31-
info!("echo fire_and_forget: {:?}", req);
32+
info!("{:?}", req);
3233
Box::pin(future::ok::<(), RSocketError>(()))
3334
}
3435
fn request_response(&self, req: Payload) -> Mono<Payload> {
35-
info!("echo request_response: {:?}", req);
36+
info!("{:?}", req);
3637
Box::pin(future::ok::<Payload, RSocketError>(req))
3738
}
3839
fn request_stream(&self, req: Payload) -> Flux<Payload> {
39-
info!("echo request_stream: {:?}", req);
40+
info!("{:?}", req);
4041
Box::pin(futures::stream::iter(vec![
4142
Ok(req.clone()),
4243
Ok(req.clone()),
4344
Ok(req),
4445
]))
4546
}
46-
fn request_channel(&self, reqs: Flux<Payload>) -> Flux<Payload> {
47-
info!("echo request_channel");
48-
reqs
47+
fn request_channel(&self, mut reqs: Flux<Payload>) -> Flux<Payload> {
48+
let (sender, receiver) = mpsc::unbounded_channel::<RSocketResult<Payload>>();
49+
tokio::spawn(async move {
50+
while let Some(it) = reqs.next().await {
51+
let pa = it.unwrap();
52+
info!("{:?}", pa);
53+
sender.send(Ok(pa)).unwrap();
54+
}
55+
});
56+
Box::pin(receiver)
57+
// or returns directly
58+
// reqs
4959
}
5060
}
5161

@@ -70,10 +80,10 @@ impl RSocket for EmptyRSocket {
7080
Box::pin(future::err(self.must_failed()))
7181
}
7282

73-
fn request_stream(&self, req: Payload) -> Flux<Payload> {
83+
fn request_stream(&self, _req: Payload) -> Flux<Payload> {
7484
Box::pin(futures::stream::iter(vec![Err(self.must_failed())]))
7585
}
76-
fn request_channel(&self, mut reqs: Flux<Payload>) -> Flux<Payload> {
86+
fn request_channel(&self, _reqs: Flux<Payload>) -> Flux<Payload> {
7787
Box::pin(futures::stream::iter(vec![Err(self.must_failed())]))
7888
}
7989
}

src/transport/misc.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::{
99
collections::HashMap,
1010
future::Future,
1111
sync::{
12-
atomic::{AtomicU32, Ordering},
12+
atomic::{AtomicI64, AtomicU32, Ordering},
1313
Arc, Mutex, RwLock,
1414
},
1515
};
@@ -37,3 +37,30 @@ impl From<u32> for StreamID {
3737
StreamID::new(v)
3838
}
3939
}
40+
41+
#[derive(Debug, Clone)]
42+
pub(crate) struct Counter {
43+
inner: Arc<AtomicI64>,
44+
}
45+
46+
impl Counter {
47+
pub(crate) fn new(value: i64) -> Counter {
48+
Counter {
49+
inner: Arc::new(AtomicI64::new(value)),
50+
}
51+
}
52+
53+
pub(crate) fn count_down(&self) -> i64 {
54+
let c = self.inner.clone();
55+
c.fetch_add(-1, Ordering::SeqCst)
56+
}
57+
}
58+
59+
#[inline]
60+
pub(crate) fn debug_frame(snd: bool, f: &frame::Frame) {
61+
if snd {
62+
debug!("===> SND: {:?}", f);
63+
} else {
64+
debug!("<=== RCV: {:?}", f);
65+
}
66+
}

src/transport/socket.rs

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::misc::StreamID;
1+
use super::misc::{self, Counter, StreamID};
22
use super::spi::{Rx, Transport, Tx};
33
use crate::errors::{ErrorKind, RSocketError};
44
use crate::frame::{self, Body, Frame};
@@ -12,6 +12,7 @@ use std::env;
1212
use std::error::Error;
1313
use std::future::Future;
1414
use std::pin::Pin;
15+
use std::ptr;
1516
use std::sync::{Arc, Mutex, RwLock};
1617
use tokio::net::TcpListener;
1718
use tokio::prelude::*;
@@ -33,12 +34,13 @@ struct Responder {
3334

3435
type Single = oneshot::Sender<RSocketResult<Payload>>;
3536
type Multi = mpsc::UnboundedSender<RSocketResult<Payload>>;
37+
type MultiReceiver = mpsc::UnboundedReceiver<RSocketResult<Payload>>;
3638

3739
#[derive(Debug)]
3840
enum Handler {
3941
Request(Single),
4042
Stream(Multi),
41-
Channel(Multi),
43+
Channel((Multi, Counter)),
4244
}
4345

4446
#[derive(Debug)]
@@ -95,7 +97,7 @@ impl DuplexSocket {
9597
while let Some(msg) = rx.recv().await {
9698
let sid = msg.get_stream_id();
9799
let flag = msg.get_flag();
98-
debug!("<--- RCV: {:?}", msg);
100+
misc::debug_frame(false, &msg);
99101
match msg.get_body() {
100102
Body::Setup(v) => self.on_setup(&acceptor, sid, flag, SetupPayload::from(v)),
101103
Body::Resume(v) => {
@@ -122,6 +124,8 @@ impl DuplexSocket {
122124
}
123125
Body::RequestChannel(v) => {
124126
// TODO: support channel
127+
let input = Payload::from(v);
128+
self.on_request_channel(sid, flag, input).await;
125129
}
126130
Body::Payload(v) => {
127131
let input = Payload::from(v);
@@ -159,25 +163,19 @@ impl DuplexSocket {
159163
Handler::Request(sender) => sender.send(Ok(input)).unwrap(),
160164
Handler::Stream(sender) => {
161165
if flag & frame::FLAG_NEXT != 0 {
162-
sender.clone().send(Ok(input)).unwrap();
166+
sender.send(Ok(input)).unwrap();
163167
}
164-
if flag & frame::FLAG_COMPLETE != 0 {
165-
// steam end
166-
drop(sender);
167-
} else {
168+
if flag & frame::FLAG_COMPLETE == 0 {
168169
senders.insert(sid, Handler::Stream(sender));
169170
}
170171
}
171-
Handler::Channel(sender) => {
172+
Handler::Channel((sender, cdl)) => {
172173
// TODO: support channel
173174
if flag & frame::FLAG_NEXT != 0 {
174-
sender.clone().send(Ok(input)).unwrap();
175+
sender.send(Ok(input)).unwrap();
175176
}
176-
if flag & frame::FLAG_COMPLETE != 0 {
177-
// steam end
178-
drop(sender);
179-
} else {
180-
senders.insert(sid, Handler::Stream(sender));
177+
if flag & frame::FLAG_COMPLETE == 0 {
178+
senders.insert(sid, Handler::Channel((sender, cdl)));
181179
}
182180
}
183181
};
@@ -256,6 +254,39 @@ impl DuplexSocket {
256254
});
257255
}
258256

257+
#[inline]
258+
async fn on_request_channel(&self, sid: u32, flag: u16, first: Payload) {
259+
let responder = self.responder.clone();
260+
let tx = self.tx.clone();
261+
let (sender, receiver) = mpsc::unbounded_channel::<RSocketResult<Payload>>();
262+
sender.send(Ok(first)).unwrap();
263+
let cdl = Counter::new(2);
264+
self.register_handler(sid, Handler::Channel((sender, cdl.clone())));
265+
tokio::spawn(async move {
266+
// respond client channel
267+
let inputs: Flux<Payload> = Box::pin(receiver);
268+
let mut outputs = responder.request_channel(inputs);
269+
// TODO: support custom RequestN.
270+
let request_n = frame::RequestN::builder(sid, 0).build();
271+
tx.send(request_n).unwrap();
272+
273+
while let Some(v) = outputs.next().await {
274+
let (d, m) = v.unwrap().split();
275+
let mut bu = frame::Payload::builder(sid, frame::FLAG_NEXT);
276+
if let Some(b) = d {
277+
bu = bu.set_data(b);
278+
}
279+
if let Some(b) = m {
280+
bu = bu.set_metadata(b);
281+
}
282+
let sending = bu.build();
283+
tx.send(sending).unwrap();
284+
}
285+
let complete = frame::Payload::builder(sid, frame::FLAG_COMPLETE).build();
286+
tx.send(complete).unwrap();
287+
});
288+
}
289+
259290
#[inline]
260291
async fn on_metadata_push(&self, input: Payload) {
261292
if let Err(e) = self.responder.clone().metadata_push(input).await {
@@ -378,7 +409,8 @@ impl RSocket for DuplexSocket {
378409
let tx = self.tx.clone();
379410
// register handler
380411
let (sender, receiver) = mpsc::unbounded_channel::<RSocketResult<Payload>>();
381-
self.register_handler(sid, Handler::Channel(sender));
412+
let cdl = Counter::new(2);
413+
self.register_handler(sid, Handler::Channel((sender, cdl.clone())));
382414
tokio::spawn(async move {
383415
let mut sent: u64 = 0;
384416
while let Some(it) = reqs.next().await {
@@ -406,6 +438,7 @@ impl RSocket for DuplexSocket {
406438
};
407439
tx.send(sending).unwrap();
408440
}
441+
cdl.count_down();
409442
let sending = frame::Payload::builder(sid, frame::FLAG_COMPLETE).build();
410443
tx.send(sending).unwrap();
411444
});

src/transport/tcp.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::codec::RFrameCodec;
2+
use super::misc;
23
use super::spi::{Rx, Tx};
34
use crate::frame::Frame;
45
use futures::{Sink, SinkExt, Stream, StreamExt};
@@ -30,6 +31,7 @@ pub async fn process(socket: TcpStream, mut inputs: Rx, outputs: Tx) {
3031
});
3132
// loop write
3233
while let Some(it) = inputs.recv().await {
34+
misc::debug_frame(true, &it);
3335
writer.send(it).await.unwrap()
3436
}
3537
}

tests/clients.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ async fn test_client() {
1717
.start()
1818
.await
1919
.unwrap();
20-
// exec_metadata_push(&cli).await;
21-
// exec_fire_and_forget(&cli).await;
22-
// exec_request_response(&cli).await;
23-
// exec_request_stream(&cli).await;
20+
exec_metadata_push(&cli).await;
21+
exec_fire_and_forget(&cli).await;
22+
exec_request_response(&cli).await;
23+
exec_request_stream(&cli).await;
2424
exec_request_channel(&cli).await;
2525
cli.close();
2626
}

0 commit comments

Comments
 (0)