Skip to content

Commit 5cddb8c

Browse files
authored
feat: change transport api (#35)
1 parent 65b3900 commit 5cddb8c

File tree

14 files changed

+185
-338
lines changed

14 files changed

+185
-338
lines changed

examples/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,4 @@ path = "tls/server.rs"
4848

4949
[[example]]
5050
name = "tls-client"
51-
path = "tls/client.rs"
51+
path = "tls/client.rs"

rsocket-test/tests/test_clients.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ fn test_websocket() {
7474
exec_request_response(&cli).await;
7575
exec_request_stream(&cli).await;
7676
exec_request_channel(&cli).await;
77-
cli.close();
7877
});
7978
}
8079

@@ -120,7 +119,6 @@ fn test_tcp() {
120119
exec_request_response(&cli).await;
121120
exec_request_stream(&cli).await;
122121
exec_request_channel(&cli).await;
123-
cli.close();
124122
});
125123
}
126124

@@ -176,7 +174,6 @@ fn test_unix() {
176174
exec_request_response(&cli).await;
177175
exec_request_stream(&cli).await;
178176
exec_request_channel(&cli).await;
179-
cli.close();
180177
});
181178
}
182179

rsocket-transport-tcp/src/connection/tcp.rs

Lines changed: 8 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,23 @@
1-
use super::codec::LengthBasedFrameCodec;
2-
use futures::stream::{SplitSink, SplitStream};
31
use futures::{SinkExt, StreamExt};
4-
use rsocket_rust::async_trait;
5-
use rsocket_rust::frame::Frame;
6-
use rsocket_rust::transport::{Connection, Reader, Writer};
7-
use rsocket_rust::{error::RSocketError, Result};
82
use tokio::net::TcpStream;
93
use tokio_util::codec::Framed;
104

5+
use rsocket_rust::error::RSocketError;
6+
use rsocket_rust::transport::{Connection, FrameSink, FrameStream};
7+
8+
use super::codec::LengthBasedFrameCodec;
9+
1110
#[derive(Debug)]
1211
pub struct TcpConnection {
1312
stream: TcpStream,
1413
}
1514

16-
struct InnerWriter {
17-
sink: SplitSink<Framed<TcpStream, LengthBasedFrameCodec>, Frame>,
18-
}
19-
20-
struct InnerReader {
21-
stream: SplitStream<Framed<TcpStream, LengthBasedFrameCodec>>,
22-
}
23-
24-
#[async_trait]
25-
impl Writer for InnerWriter {
26-
async fn write(&mut self, frame: Frame) -> Result<()> {
27-
match self.sink.send(frame).await {
28-
Ok(()) => Ok(()),
29-
Err(e) => Err(RSocketError::IO(e).into()),
30-
}
31-
}
32-
}
33-
34-
#[async_trait]
35-
impl Reader for InnerReader {
36-
async fn read(&mut self) -> Option<Result<Frame>> {
37-
self.stream
38-
.next()
39-
.await
40-
.map(|next| next.map_err(|e| RSocketError::IO(e).into()))
41-
}
42-
}
43-
4415
impl Connection for TcpConnection {
45-
fn split(
46-
self,
47-
) -> (
48-
Box<dyn Writer + Send + Unpin>,
49-
Box<dyn Reader + Send + Unpin>,
50-
) {
16+
fn split(self) -> (Box<FrameSink>, Box<FrameStream>) {
5117
let (sink, stream) = Framed::new(self.stream, LengthBasedFrameCodec).split();
5218
(
53-
Box::new(InnerWriter { sink }),
54-
Box::new(InnerReader { stream }),
19+
Box::new(sink.sink_map_err(|e| RSocketError::Other(e.into()))),
20+
Box::new(stream.map(|next| next.map_err(|e| RSocketError::Other(e.into())))),
5521
)
5622
}
5723
}

rsocket-transport-tcp/src/connection/tls.rs

Lines changed: 5 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
use super::codec::LengthBasedFrameCodec;
2-
use futures::stream::{SplitSink, SplitStream};
32
use futures::{SinkExt, StreamExt};
4-
use rsocket_rust::async_trait;
5-
use rsocket_rust::frame::Frame;
6-
use rsocket_rust::transport::{Connection, Reader, Writer};
7-
use rsocket_rust::{error::RSocketError, Result};
3+
use rsocket_rust::error::RSocketError;
4+
use rsocket_rust::transport::{Connection, FrameSink, FrameStream};
85
use tokio::net::TcpStream;
96
use tokio_native_tls::TlsStream;
107
use tokio_util::codec::Framed;
@@ -14,45 +11,12 @@ pub struct TlsConnection {
1411
stream: TlsStream<TcpStream>,
1512
}
1613

17-
struct InnerWriter {
18-
sink: SplitSink<Framed<TlsStream<TcpStream>, LengthBasedFrameCodec>, Frame>,
19-
}
20-
21-
struct InnerReader {
22-
stream: SplitStream<Framed<TlsStream<TcpStream>, LengthBasedFrameCodec>>,
23-
}
24-
25-
#[async_trait]
26-
impl Writer for InnerWriter {
27-
async fn write(&mut self, frame: Frame) -> Result<()> {
28-
match self.sink.send(frame).await {
29-
Ok(()) => Ok(()),
30-
Err(e) => Err(RSocketError::IO(e).into()),
31-
}
32-
}
33-
}
34-
35-
#[async_trait]
36-
impl Reader for InnerReader {
37-
async fn read(&mut self) -> Option<Result<Frame>> {
38-
self.stream
39-
.next()
40-
.await
41-
.map(|next| next.map_err(|e| RSocketError::IO(e).into()))
42-
}
43-
}
44-
4514
impl Connection for TlsConnection {
46-
fn split(
47-
self,
48-
) -> (
49-
Box<dyn Writer + Send + Unpin>,
50-
Box<dyn Reader + Send + Unpin>,
51-
) {
15+
fn split(self) -> (Box<FrameSink>, Box<FrameStream>) {
5216
let (sink, stream) = Framed::new(self.stream, LengthBasedFrameCodec).split();
5317
(
54-
Box::new(InnerWriter { sink }),
55-
Box::new(InnerReader { stream }),
18+
Box::new(sink.sink_map_err(|e| RSocketError::Other(e.into()))),
19+
Box::new(stream.map(|it| it.map_err(|e| RSocketError::Other(e.into())))),
5620
)
5721
}
5822
}

rsocket-transport-tcp/src/connection/uds.rs

Lines changed: 5 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
use super::codec::LengthBasedFrameCodec;
2-
use futures::stream::{SplitSink, SplitStream};
32
use futures::{SinkExt, StreamExt};
4-
use rsocket_rust::async_trait;
5-
use rsocket_rust::frame::Frame;
6-
use rsocket_rust::transport::{Connection, Reader, Writer};
7-
use rsocket_rust::{error::RSocketError, Result};
3+
use rsocket_rust::error::RSocketError;
4+
use rsocket_rust::transport::{Connection, FrameSink, FrameStream};
85
use tokio::net::UnixStream;
96
use tokio_util::codec::Framed;
107

@@ -13,49 +10,16 @@ pub struct UnixConnection {
1310
stream: UnixStream,
1411
}
1512

16-
struct InnerWriter {
17-
sink: SplitSink<Framed<UnixStream, LengthBasedFrameCodec>, Frame>,
18-
}
19-
20-
struct InnerReader {
21-
stream: SplitStream<Framed<UnixStream, LengthBasedFrameCodec>>,
22-
}
23-
2413
impl Connection for UnixConnection {
25-
fn split(
26-
self,
27-
) -> (
28-
Box<dyn Writer + Send + Unpin>,
29-
Box<dyn Reader + Send + Unpin>,
30-
) {
14+
fn split(self) -> (Box<FrameSink>, Box<FrameStream>) {
3115
let (sink, stream) = Framed::new(self.stream, LengthBasedFrameCodec).split();
3216
(
33-
Box::new(InnerWriter { sink }),
34-
Box::new(InnerReader { stream }),
17+
Box::new(sink.sink_map_err(|e| RSocketError::Other(e.into()))),
18+
Box::new(stream.map(|it| it.map_err(|e| RSocketError::Other(e.into())))),
3519
)
3620
}
3721
}
3822

39-
#[async_trait]
40-
impl Writer for InnerWriter {
41-
async fn write(&mut self, frame: Frame) -> Result<()> {
42-
match self.sink.send(frame).await {
43-
Ok(()) => Ok(()),
44-
Err(e) => Err(RSocketError::IO(e).into()),
45-
}
46-
}
47-
}
48-
49-
#[async_trait]
50-
impl Reader for InnerReader {
51-
async fn read(&mut self) -> Option<Result<Frame>> {
52-
self.stream
53-
.next()
54-
.await
55-
.map(|it| it.map_err(|e| RSocketError::IO(e).into()))
56-
}
57-
}
58-
5923
impl From<UnixStream> for UnixConnection {
6024
fn from(stream: UnixStream) -> UnixConnection {
6125
UnixConnection { stream }

rsocket-transport-wasm/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@ description = "WASM Websocket RSocket transport implementation."
1212
[dependencies]
1313
bytes = "0.6.0"
1414
wasm-bindgen-futures = "0.4.19"
15-
futures-channel = "0.3.8"
16-
futures-util = "0.3.8"
15+
futures-channel = "0.3.9"
16+
futures-util = "0.3.9"
1717
js-sys = "0.3.46"
1818
serde = "1.0.118"
1919
serde_derive = "1.0.118"
2020
async-trait = "0.1.42"
21+
log = "0.4.11"
2122

2223
[dependencies.rsocket_rust]
2324
path = "../rsocket"

rsocket-transport-wasm/src/client.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,6 @@ use wasm_bindgen::prelude::*;
1515
use wasm_bindgen::JsCast;
1616
use web_sys::{ErrorEvent, Event, FileReader, MessageEvent, ProgressEvent, WebSocket};
1717

18-
macro_rules! console_log {
19-
($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
20-
}
21-
22-
#[wasm_bindgen]
23-
extern "C" {
24-
#[wasm_bindgen(js_namespace = console)]
25-
fn log(s: &str);
26-
}
27-
2818
pub struct WebsocketClientTransport {
2919
url: String,
3020
}
@@ -104,15 +94,15 @@ impl Transport for WebsocketClientTransport {
10494

10595
// on error
10696
let on_error = Closure::wrap(Box::new(move |e: ErrorEvent| {
107-
console_log!("websocket error: {}", e.message());
97+
log::error!("websocket error: {}", e.message());
10898
})
10999
as Box<dyn FnMut(ErrorEvent)>);
110100
ws.set_onerror(Some(on_error.as_ref().unchecked_ref()));
111101
on_error.forget();
112102

113103
// on_close
114104
let on_close = Closure::once(Box::new(move |_e: Event| {
115-
console_log!("websocket closed");
105+
log::info!("websocket closed");
116106
}) as Box<dyn FnMut(Event)>);
117107
ws.set_onclose(Some(on_close.as_ref().unchecked_ref()));
118108
on_close.forget();
Lines changed: 5 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,25 @@
1-
use async_trait::async_trait;
21
use futures_channel::mpsc;
32
use futures_util::{SinkExt, StreamExt};
4-
use rsocket_rust::transport::{Connection, Reader, Writer};
5-
use rsocket_rust::{error::RSocketError, frame::Frame, Result};
3+
use rsocket_rust::transport::{Connection, FrameSink, FrameStream};
4+
use rsocket_rust::{error::RSocketError, frame::Frame};
65

76
#[derive(Debug)]
87
pub struct WebsocketConnection {
98
rx: mpsc::Receiver<Frame>,
109
tx: mpsc::Sender<Frame>,
1110
}
1211

13-
struct InnerWriter {
14-
tx: mpsc::Sender<Frame>,
15-
}
16-
17-
struct InnerReader {
18-
rx: mpsc::Receiver<Frame>,
19-
}
20-
2112
impl WebsocketConnection {
2213
pub(crate) fn new(tx: mpsc::Sender<Frame>, rx: mpsc::Receiver<Frame>) -> WebsocketConnection {
2314
WebsocketConnection { rx, tx }
2415
}
2516
}
2617

27-
#[async_trait]
28-
impl Writer for InnerWriter {
29-
async fn write(&mut self, frame: Frame) -> Result<()> {
30-
match self.tx.send(frame).await {
31-
Ok(()) => Ok(()),
32-
Err(e) => Err(RSocketError::Other(e.into()).into()),
33-
}
34-
}
35-
}
36-
37-
#[async_trait]
38-
impl Reader for InnerReader {
39-
async fn read(&mut self) -> Option<Result<Frame>> {
40-
self.rx.next().await.map(|frame| Ok(frame))
41-
}
42-
}
43-
4418
impl Connection for WebsocketConnection {
45-
fn split(
46-
self,
47-
) -> (
48-
Box<dyn Writer + Send + Unpin>,
49-
Box<dyn Reader + Send + Unpin>,
50-
) {
19+
fn split(self) -> (Box<FrameSink>, Box<FrameStream>) {
5120
(
52-
Box::new(InnerWriter { tx: self.tx }),
53-
Box::new(InnerReader { rx: self.rx }),
21+
Box::new(self.tx.sink_map_err(|e| RSocketError::Other(e.into()))),
22+
Box::new(self.rx.map(|it| Ok(it))),
5423
)
5524
}
5625
}

0 commit comments

Comments
 (0)