Skip to content

fix: spawn tasks in Stream::buffered and Stream::buffer_unordered to max concurrency #2962

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 34 additions & 22 deletions futures-util/src/stream/stream/buffer_unordered.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::stream::{Fuse, FuturesUnordered, StreamExt};
use alloc::vec::Vec;
use core::fmt;
use core::num::NonZeroUsize;
use core::pin::Pin;
Expand All @@ -13,20 +14,23 @@ pin_project! {
/// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered)
/// method.
#[must_use = "streams do nothing unless polled"]
pub struct BufferUnordered<St>
pub struct BufferUnordered<St, F>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we fix this in FuturesUnordered? This could actually be a breaking change.

Copy link
Author

@andylokandy andylokandy Jul 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we fix this in FuturesUnordered?

I've tried to implement it in andylokandy@1d9fe21.

The short answer is no -- many components in futures -rs relies on the feature of FuturesUnordered not greedily poll internal futures. Instead, buffered is supposed to provide the ability to maximize the concurrency respecting the limit.

where
St: Stream,
St: Stream<Item = F>,
F: Future,
{
#[pin]
stream: Fuse<St>,
in_progress_queue: FuturesUnordered<St::Item>,
ready_queue: Vec<F::Output>,
max: Option<NonZeroUsize>,
}
}

impl<St> fmt::Debug for BufferUnordered<St>
impl<St, F> fmt::Debug for BufferUnordered<St, F>
where
St: Stream + fmt::Debug,
St: Stream<Item = F> + fmt::Debug,
F: Future,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BufferUnordered")
Expand All @@ -37,26 +41,27 @@ where
}
}

impl<St> BufferUnordered<St>
impl<St, F> BufferUnordered<St, F>
where
St: Stream,
St::Item: Future,
St: Stream<Item = F>,
F: Future,
{
pub(super) fn new(stream: St, n: Option<usize>) -> Self {
Self {
stream: super::Fuse::new(stream),
in_progress_queue: FuturesUnordered::new(),
ready_queue: Vec::new(),
max: n.and_then(NonZeroUsize::new),
}
}

delegate_access_inner!(stream, St, (.));
}

impl<St> Stream for BufferUnordered<St>
impl<St, F> Stream for BufferUnordered<St, F>
where
St: Stream,
St::Item: Future,
St: Stream<Item = F>,
F: Future,
{
type Item = <St::Item as Future>::Output;

Expand All @@ -72,14 +77,21 @@ where
}
}

// Attempt to pull the next value from the in_progress_queue
match this.in_progress_queue.poll_next_unpin(cx) {
x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
Poll::Ready(None) => {}
// Try to poll all ready futures in the in_progress_queue.
loop {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this fix could be incorrect, as it assumes that tasks are finished in order. And it introduces extra costs.

I have similar issues in opendal, and my final solution is to find a way to track the status of futures and subtract the already finished ones from the maximum concurrent limit.

Do you think it's a good idea to implement similiar thing in FuturesUnordered?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this fix could be incorrect, as it assumes that tasks are finished in order.

Would you like to elaborate more why you think it assumes that tasks are finished in order?

match this.in_progress_queue.poll_next_unpin(cx) {
Poll::Ready(Some(output)) => {
this.ready_queue.push(output);
}
Poll::Ready(None) => break,
Poll::Pending => break,
}
}

// If more values are still coming from the stream, we're not done yet
if this.stream.is_done() {
if let Some(output) = this.ready_queue.pop() {
// If we have any ready outputs, return the first one.
Poll::Ready(Some(output))
} else if this.stream.is_done() && this.in_progress_queue.is_empty() {
Poll::Ready(None)
} else {
Poll::Pending
Expand All @@ -98,10 +110,10 @@ where
}
}

impl<St> FusedStream for BufferUnordered<St>
impl<St, F> FusedStream for BufferUnordered<St, F>
where
St: Stream,
St::Item: Future,
St: Stream<Item = F>,
F: Future,
{
fn is_terminated(&self) -> bool {
self.in_progress_queue.is_terminated() && self.stream.is_terminated()
Expand All @@ -110,10 +122,10 @@ where

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for BufferUnordered<S>
impl<S, F, Item> Sink<Item> for BufferUnordered<S, F>
where
S: Stream + Sink<Item>,
S::Item: Future,
S: Stream<Item = F> + Sink<Item>,
F: Future,
{
type Error = S::Error;

Expand Down
59 changes: 34 additions & 25 deletions futures-util/src/stream/stream/buffered.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::stream::{Fuse, FusedStream, FuturesOrdered, StreamExt};
use alloc::collections::VecDeque;
use core::fmt;
use core::num::NonZeroUsize;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::ready;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
Expand All @@ -13,22 +13,23 @@ use pin_project_lite::pin_project;
pin_project! {
/// Stream for the [`buffered`](super::StreamExt::buffered) method.
#[must_use = "streams do nothing unless polled"]
pub struct Buffered<St>
pub struct Buffered<St, F>
where
St: Stream,
St::Item: Future,
St: Stream<Item = F>,
F: Future,
Comment on lines +16 to +19
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a breaking change?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems ok since we are on 0.4-alpha

{
#[pin]
stream: Fuse<St>,
in_progress_queue: FuturesOrdered<St::Item>,
ready_queue: VecDeque<F::Output>,
max: Option<NonZeroUsize>,
}
}

impl<St> fmt::Debug for Buffered<St>
impl<St, F> fmt::Debug for Buffered<St, F>
where
St: Stream + fmt::Debug,
St::Item: Future,
St: Stream<Item = F> + fmt::Debug,
F: Future,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Buffered")
Expand All @@ -39,26 +40,27 @@ where
}
}

impl<St> Buffered<St>
impl<St, F> Buffered<St, F>
where
St: Stream,
St::Item: Future,
St: Stream<Item = F>,
F: Future,
{
pub(super) fn new(stream: St, n: Option<usize>) -> Self {
Self {
stream: super::Fuse::new(stream),
in_progress_queue: FuturesOrdered::new(),
ready_queue: VecDeque::new(),
max: n.and_then(NonZeroUsize::new),
}
}

delegate_access_inner!(stream, St, (.));
}

impl<St> Stream for Buffered<St>
impl<St, F> Stream for Buffered<St, F>
where
St: Stream,
St::Item: Future,
St: Stream<Item = F>,
F: Future,
{
type Item = <St::Item as Future>::Output;

Expand All @@ -74,14 +76,21 @@ where
}
}

// Attempt to pull the next value from the in_progress_queue
let res = this.in_progress_queue.poll_next_unpin(cx);
if let Some(val) = ready!(res) {
return Poll::Ready(Some(val));
// Try to poll all ready futures in the in_progress_queue.
loop {
match this.in_progress_queue.poll_next_unpin(cx) {
Poll::Ready(Some(output)) => {
this.ready_queue.push_back(output);
}
Poll::Ready(None) => break,
Poll::Pending => break,
}
}

// If more values are still coming from the stream, we're not done yet
if this.stream.is_done() {
if let Some(output) = this.ready_queue.pop_front() {
// If we have any ready outputs, return the first one.
Poll::Ready(Some(output))
} else if this.stream.is_done() && this.in_progress_queue.is_empty() {
Poll::Ready(None)
} else {
Poll::Pending
Expand All @@ -100,10 +109,10 @@ where
}
}

impl<St> FusedStream for Buffered<St>
impl<St, F> FusedStream for Buffered<St, F>
where
St: Stream,
St::Item: Future,
St: Stream<Item = F>,
F: Future,
{
fn is_terminated(&self) -> bool {
self.stream.is_done() && self.in_progress_queue.is_terminated()
Expand All @@ -112,10 +121,10 @@ where

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for Buffered<S>
impl<S, F, Item> Sink<Item> for Buffered<S, F>
where
S: Stream + Sink<Item>,
S::Item: Future,
S: Stream<Item = F> + Sink<Item>,
F: Future,
{
type Error = S::Error;

Expand Down
10 changes: 6 additions & 4 deletions futures-util/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1487,10 +1487,11 @@ pub trait StreamExt: Stream {
/// library is activated, and it is activated by default.
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn buffered(self, n: impl Into<Option<usize>>) -> Buffered<Self>
fn buffered<F>(self, n: impl Into<Option<usize>>) -> Buffered<Self, F>
where
Self::Item: Future,
Self: Sized,
Self: Stream<Item = F>,
F: Future,
{
assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n.into()))
}
Expand Down Expand Up @@ -1536,10 +1537,11 @@ pub trait StreamExt: Stream {
/// ```
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn buffer_unordered(self, n: impl Into<Option<usize>>) -> BufferUnordered<Self>
fn buffer_unordered<F>(self, n: impl Into<Option<usize>>) -> BufferUnordered<Self, F>
where
Self::Item: Future,
Self: Sized,
Self: Stream<Item = F>,
F: Future,
{
assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n.into()))
}
Expand Down
39 changes: 21 additions & 18 deletions futures/tests/auto_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1112,24 +1112,27 @@ mod stream {
assert_not_impl!(AndThen<PhantomPinned, (), ()>: Unpin);
assert_not_impl!(AndThen<(), PhantomPinned, ()>: Unpin);

assert_impl!(BufferUnordered<SendStream<()>>: Send);
assert_not_impl!(BufferUnordered<SendStream>: Send);
assert_not_impl!(BufferUnordered<LocalStream>: Send);
assert_impl!(BufferUnordered<SyncStream<()>>: Sync);
assert_not_impl!(BufferUnordered<SyncStream>: Sync);
assert_not_impl!(BufferUnordered<LocalStream>: Sync);
assert_impl!(BufferUnordered<UnpinStream>: Unpin);
assert_not_impl!(BufferUnordered<PinnedStream>: Unpin);

assert_impl!(Buffered<SendStream<SendFuture<()>>>: Send);
assert_not_impl!(Buffered<SendStream<SendFuture>>: Send);
assert_not_impl!(Buffered<SendStream<LocalFuture>>: Send);
assert_not_impl!(Buffered<LocalStream<SendFuture<()>>>: Send);
assert_impl!(Buffered<SyncStream<SendSyncFuture<()>>>: Sync);
assert_not_impl!(Buffered<SyncStream<SyncFuture<()>>>: Sync);
assert_not_impl!(Buffered<LocalStream<SendSyncFuture<()>>>: Sync);
assert_impl!(Buffered<UnpinStream<PinnedFuture>>: Unpin);
assert_not_impl!(Buffered<PinnedStream<PinnedFuture>>: Unpin);
assert_impl!(BufferUnordered<SendStream<SendFuture<()>>, SendFuture<()>>: Send);
assert_not_impl!(BufferUnordered<SendStream<SendFuture>, SendFuture>: Send);
assert_not_impl!(BufferUnordered<SendStream<LocalFuture>, LocalFuture>: Send);
assert_not_impl!(BufferUnordered<LocalStream<LocalFuture>, LocalFuture>: Send);
assert_impl!(BufferUnordered<SyncStream<SendSyncFuture<()>>, SendSyncFuture<()>>: Sync);
assert_not_impl!(BufferUnordered<SyncStream<SyncFuture<()>>, SyncFuture<()>>: Sync);
assert_not_impl!(BufferUnordered<SyncStream<LocalFuture>, LocalFuture>: Sync);
assert_not_impl!(BufferUnordered<LocalStream<LocalFuture>, LocalFuture>: Sync);
assert_impl!(BufferUnordered<UnpinStream<UnpinFuture>, UnpinFuture>: Unpin);
assert_not_impl!(BufferUnordered<PinnedStream<PinnedFuture>, PinnedFuture>: Unpin);

assert_impl!(Buffered<SendStream<SendFuture<()>>, SendFuture<()>>: Send);
assert_not_impl!(Buffered<SendStream<SendFuture>, SendFuture>: Send);
assert_not_impl!(Buffered<SendStream<LocalFuture>, LocalFuture>: Send);
assert_not_impl!(Buffered<LocalStream<LocalFuture>, LocalFuture>: Send);
assert_impl!(Buffered<SyncStream<SendSyncFuture<()>>, SendSyncFuture<()>>: Sync);
assert_not_impl!(Buffered<SyncStream<SyncFuture<()>>, SyncFuture<()>>: Sync);
assert_not_impl!(Buffered<SyncStream<LocalFuture>, LocalFuture>: Sync);
assert_not_impl!(Buffered<LocalStream<LocalFuture>, LocalFuture>: Sync);
assert_impl!(Buffered<UnpinStream<UnpinFuture>, UnpinFuture>: Unpin);
assert_not_impl!(Buffered<PinnedStream<PinnedFuture>, PinnedFuture>: Unpin);

assert_impl!(CatchUnwind<SendStream>: Send);
assert_not_impl!(CatchUnwind<LocalStream>: Send);
Expand Down
Loading