Skip to content

Commit 6950459

Browse files
authored
fix: register client-side responder correctly (#36)
1 parent 5cddb8c commit 6950459

File tree

6 files changed

+33
-30
lines changed

6 files changed

+33
-30
lines changed

rsocket/src/core/client.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::payload::{Payload, SetupPayload, SetupPayloadBuilder};
1414
use crate::runtime;
1515
use crate::spi::{ClientResponder, Flux, RSocket};
1616
use crate::transport::{
17-
self, Acceptor, Connection, DuplexSocket, FrameSink, FrameStream, Splitter, Transport,
17+
self, Connection, DuplexSocket, FrameSink, FrameStream, Splitter, Transport,
1818
};
1919
use crate::Result;
2020

@@ -127,7 +127,11 @@ where
127127
let mut socket = DuplexSocket::new(1, snd_tx, splitter).await;
128128

129129
let mut cloned_socket = socket.clone();
130-
let acceptor: Option<Acceptor> = self.responder.map(|it| Acceptor::Simple(Arc::new(it)));
130+
131+
if let Some(f) = self.responder {
132+
let responder = f();
133+
socket.bind_responder(responder).await;
134+
}
131135

132136
let conn = tp.connect().await?;
133137
let (mut sink, mut stream) = conn.split();
@@ -191,7 +195,7 @@ where
191195

192196
runtime::spawn(async move {
193197
while let Some(next) = read_rx.next().await {
194-
if let Err(e) = cloned_socket.dispatch(next, &acceptor).await {
198+
if let Err(e) = cloned_socket.dispatch(next, None).await {
195199
error!("dispatch frame failed: {}", e);
196200
break;
197201
}

rsocket/src/core/server.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ use crate::frame::{self, Frame};
33
use crate::payload::SetupPayload;
44
use crate::runtime;
55
use crate::spi::{RSocket, ServerResponder};
6-
use crate::transport::{
7-
Acceptor, Connection, DuplexSocket, ServerTransport, Splitter, Transport, MIN_MTU,
8-
};
6+
use crate::transport::{Connection, DuplexSocket, ServerTransport, Splitter, Transport, MIN_MTU};
97
use crate::utils::EmptyRSocket;
108
use crate::Result;
119
use futures::{SinkExt, StreamExt};
@@ -70,7 +68,7 @@ where
7068
{
7169
pub async fn serve(mut self) -> Result<()> {
7270
let mut server_transport = self.transport.take().expect("missing transport");
73-
let acceptor = self.on_setup.map(|v| Acceptor::Generate(Arc::new(v)));
71+
// let acceptor = self.on_setup.map(|v| Acceptor::Generate(Arc::new(v)));
7472

7573
let mtu = self.mtu;
7674

@@ -80,6 +78,7 @@ where
8078
invoke();
8179
}
8280

81+
let acceptor = Arc::new(self.on_setup);
8382
while let Some(next) = server_transport.next().await {
8483
match next {
8584
Ok(tp) => {
@@ -99,7 +98,7 @@ where
9998
}
10099

101100
#[inline]
102-
async fn on_transport(mtu: usize, tp: C, acceptor: Option<Acceptor>) -> Result<()> {
101+
async fn on_transport(mtu: usize, tp: C, acceptor: Arc<Option<ServerResponder>>) -> Result<()> {
103102
// Establish connection.
104103
let conn = tp.connect().await?;
105104
let (mut writer, mut reader) = conn.split();
@@ -148,7 +147,7 @@ where
148147
});
149148

150149
while let Some(frame) = read_rx.next().await {
151-
if let Err(e) = socket.dispatch(frame, &acceptor).await {
150+
if let Err(e) = socket.dispatch(frame, acceptor.as_ref().as_ref()).await {
152151
error!("dispatch incoming frame failed: {}", e);
153152
break;
154153
}

rsocket/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ pub use async_stream::stream;
9696
/// A re-export of [`async-trait`](https://docs.rs/async-trait) for use with RSocket trait implementation.
9797
pub use async_trait::async_trait;
9898

99+
#[macro_use]
100+
extern crate anyhow;
99101
#[macro_use]
100102
extern crate log;
101103
#[macro_use]

rsocket/src/transport/socket.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use super::spi::*;
44
use crate::error::{self, RSocketError};
55
use crate::frame::{self, Body, Frame};
66
use crate::payload::{Payload, SetupPayload};
7-
use crate::spi::{Flux, RSocket};
7+
use crate::spi::{Flux, RSocket, ServerResponder};
88
use crate::utils::EmptyRSocket;
99
use crate::{runtime, Result};
1010
use async_stream::stream;
@@ -103,7 +103,7 @@ impl DuplexSocket {
103103
pub(crate) async fn dispatch(
104104
&mut self,
105105
frame: Frame,
106-
acceptor: &Option<Acceptor>,
106+
acceptor: Option<&ServerResponder>,
107107
) -> Result<()> {
108108
if let Some(frame) = self.join_frame(frame).await {
109109
self.process_once(frame, acceptor).await;
@@ -112,7 +112,7 @@ impl DuplexSocket {
112112
}
113113

114114
#[inline]
115-
async fn process_once(&mut self, msg: Frame, acceptor: &Option<Acceptor>) {
115+
async fn process_once(&mut self, msg: Frame, acceptor: Option<&ServerResponder>) {
116116
let sid = msg.get_stream_id();
117117
let flag = msg.get_flag();
118118
debug_frame(false, &msg);
@@ -343,10 +343,14 @@ impl DuplexSocket {
343343
}
344344
}
345345

346+
pub(crate) async fn bind_responder(&self, responder: Box<dyn RSocket>) {
347+
self.responder.set(responder).await;
348+
}
349+
346350
#[inline]
347351
async fn on_setup(
348352
&self,
349-
acceptor: &Option<Acceptor>,
353+
acceptor: Option<&ServerResponder>,
350354
sid: u32,
351355
flag: u16,
352356
setup: SetupPayload,
@@ -356,11 +360,7 @@ impl DuplexSocket {
356360
self.responder.set(Box::new(EmptyRSocket)).await;
357361
Ok(())
358362
}
359-
Some(Acceptor::Simple(gen)) => {
360-
self.responder.set(gen()).await;
361-
Ok(())
362-
}
363-
Some(Acceptor::Generate(gen)) => match gen(setup, Box::new(self.clone())) {
363+
Some(gen) => match gen(setup, Box::new(self.clone())) {
364364
Ok(it) => {
365365
self.responder.set(it).await;
366366
Ok(())
@@ -414,7 +414,7 @@ impl DuplexSocket {
414414
Err(e) => {
415415
let sending = frame::Error::builder(sid, 0)
416416
.set_code(error::ERR_APPLICATION)
417-
.set_data(Bytes::from("TODO: should be error details"))
417+
.set_data(Bytes::from(e.to_string()))
418418
.build();
419419
if let Err(e) = tx.send(sending) {
420420
error!("respond REQUEST_RESPONSE failed: {}", e);

rsocket/src/transport/spi.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,6 @@ use crate::spi::{ClientResponder, RSocket, ServerResponder};
1515
use crate::{error::RSocketError, frame::Frame};
1616
use crate::{Error, Result};
1717

18-
#[derive(Clone)]
19-
pub(crate) enum Acceptor {
20-
Simple(Arc<ClientResponder>),
21-
Generate(Arc<ServerResponder>),
22-
}
23-
2418
pub type FrameSink = dyn Sink<Frame, Error = RSocketError> + Send + Unpin;
2519
pub type FrameStream = dyn Stream<Item = StdResult<Frame, RSocketError>> + Send + Unpin;
2620

rsocket/src/utils.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,23 +60,27 @@ pub(crate) struct EmptyRSocket;
6060
#[async_trait]
6161
impl RSocket for EmptyRSocket {
6262
async fn metadata_push(&self, _req: Payload) -> Result<()> {
63-
Err(RSocketError::ApplicationException("UNIMPLEMENT".into()).into())
63+
Err(anyhow!("UNIMPLEMENT"))
6464
}
6565

6666
async fn fire_and_forget(&self, _req: Payload) -> Result<()> {
67-
Err(RSocketError::ApplicationException("UNIMPLEMENT".into()).into())
67+
Err(anyhow!("UNIMPLEMENT"))
6868
}
6969

7070
async fn request_response(&self, _req: Payload) -> Result<Option<Payload>> {
71-
Err(RSocketError::ApplicationException("UNIMPLEMENT".into()).into())
71+
Err(anyhow!("UNIMPLEMENT"))
7272
}
7373

7474
fn request_stream(&self, _req: Payload) -> Flux<Result<Payload>> {
75-
Box::pin(futures::stream::empty())
75+
Box::pin(stream! {
76+
yield Err(anyhow!("UNIMPLEMENT"));
77+
})
7678
}
7779

7880
fn request_channel(&self, _reqs: Flux<Result<Payload>>) -> Flux<Result<Payload>> {
79-
Box::pin(futures::stream::empty())
81+
Box::pin(stream! {
82+
yield Err(anyhow!("UNIMPLEMENT"));
83+
})
8084
}
8185
}
8286

0 commit comments

Comments
 (0)