Skip to content

Commit aa970fe

Browse files
committed
Replace genawaiter based nice impl with manual state machine based ugly impl.
1 parent 9392eb3 commit aa970fe

File tree

1 file changed

+40
-35
lines changed

1 file changed

+40
-35
lines changed

src/util/irpc.rs

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use std::{future::Future, io};
22

3-
use genawaiter::sync::Gen;
4-
use irpc::{channel::mpsc, RpcMessage};
5-
use n0_future::{Stream, StreamExt};
3+
use irpc::{
4+
channel::{mpsc, RecvError},
5+
RpcMessage,
6+
};
7+
use n0_future::{stream, Stream, StreamExt};
68

79
/// Trait for an enum that has three variants, item, error, and done.
810
///
@@ -135,38 +137,41 @@ where
135137
E: From<irpc::Error>,
136138
E: From<irpc::channel::RecvError>,
137139
{
138-
Gen::new(move |co| async move {
139-
let mut rx = match self.await {
140-
Ok(rx) => rx,
141-
Err(e) => {
142-
co.yield_(Err(E::from(e))).await;
143-
return;
144-
}
145-
};
146-
loop {
147-
match rx.recv().await {
148-
Ok(Some(item)) => match item.into_result_opt() {
149-
Some(Ok(i)) => co.yield_(Ok(i)).await,
150-
Some(Err(e)) => {
151-
co.yield_(Err(E::from(e))).await;
152-
break;
153-
}
154-
None => break,
155-
},
156-
Ok(None) => {
157-
co.yield_(Err(E::from(irpc::channel::RecvError::Io(io::Error::new(
158-
io::ErrorKind::UnexpectedEof,
159-
"unexpected end of stream",
160-
)))))
161-
.await;
162-
break;
163-
}
164-
Err(e) => {
165-
co.yield_(Err(E::from(e))).await;
166-
break;
167-
}
168-
}
140+
enum State<S, T> {
141+
Init(S),
142+
Receiving(mpsc::Receiver<T>),
143+
Done,
144+
}
145+
fn eof() -> RecvError {
146+
io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected end of stream").into()
147+
}
148+
async fn process_recv<S, T, E>(
149+
mut rx: mpsc::Receiver<T>,
150+
) -> Option<(std::result::Result<T::Item, E>, State<S, T>)>
151+
where
152+
T: IrpcStreamItem,
153+
E: From<T::Error>,
154+
E: From<irpc::Error>,
155+
E: From<RecvError>,
156+
{
157+
match rx.recv().await {
158+
Ok(Some(item)) => match item.into_result_opt()? {
159+
Ok(i) => Some((Ok(i), State::Receiving(rx))),
160+
Err(e) => Some((Err(E::from(e)), State::Done)),
161+
},
162+
Ok(None) => Some((Err(E::from(eof())), State::Done)),
163+
Err(e) => Some((Err(E::from(e)), State::Done)),
164+
}
165+
}
166+
Box::pin(stream::unfold(State::Init(self), |state| async move {
167+
match state {
168+
State::Init(fut) => match fut.await {
169+
Ok(rx) => process_recv(rx).await,
170+
Err(e) => Some((Err(E::from(e)), State::Done)),
171+
},
172+
State::Receiving(rx) => process_recv(rx).await,
173+
State::Done => None,
169174
}
170-
})
175+
}))
171176
}
172177
}

0 commit comments

Comments
 (0)