Skip to content

Commit 24e3424

Browse files
nanoqshnotgull
authored andcommitted
feat: add StreamExt::map_while
1 parent 83bcf1e commit 24e3424

File tree

1 file changed

+55
-0
lines changed

1 file changed

+55
-0
lines changed

src/stream.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,6 +1137,33 @@ pub trait StreamExt: Stream {
11371137
}
11381138
}
11391139

1140+
/// Maps items while `predicate` returns [`Some`].
1141+
///
1142+
/// # Examples
1143+
///
1144+
/// ```
1145+
/// use futures_lite::stream::{self, StreamExt};
1146+
///
1147+
/// # spin_on::spin_on(async {
1148+
/// let s = stream::iter(vec![1, 2, 0, 3]);
1149+
/// let mut s = s.map_while(|x: u32| x.checked_sub(1));
1150+
///
1151+
/// assert_eq!(s.next().await, Some(0));
1152+
/// assert_eq!(s.next().await, Some(1));
1153+
/// assert_eq!(s.next().await, None);
1154+
/// # });
1155+
/// ```
1156+
fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>
1157+
where
1158+
Self: Sized,
1159+
P: FnMut(Self::Item) -> Option<B>,
1160+
{
1161+
MapWhile {
1162+
stream: self,
1163+
predicate,
1164+
}
1165+
}
1166+
11401167
/// Skips the first `n` items of the stream.
11411168
///
11421169
/// # Examples
@@ -2790,6 +2817,34 @@ where
27902817
}
27912818
}
27922819

2820+
pin_project! {
2821+
/// Stream for the [`StreamExt::map_while()`] method.
2822+
#[derive(Clone, Debug)]
2823+
#[must_use = "streams do nothing unless polled"]
2824+
pub struct MapWhile<S, P> {
2825+
#[pin]
2826+
stream: S,
2827+
predicate: P,
2828+
}
2829+
}
2830+
2831+
impl<B, S, P> Stream for MapWhile<S, P>
2832+
where
2833+
S: Stream,
2834+
P: FnMut(S::Item) -> Option<B>,
2835+
{
2836+
type Item = B;
2837+
2838+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2839+
let this = self.project();
2840+
2841+
match ready!(this.stream.poll_next(cx)) {
2842+
Some(v) => Poll::Ready((this.predicate)(v)),
2843+
None => Poll::Ready(None),
2844+
}
2845+
}
2846+
}
2847+
27932848
pin_project! {
27942849
/// Stream for the [`StreamExt::skip()`] method.
27952850
#[derive(Clone, Debug)]

0 commit comments

Comments
 (0)