Skip to content

fix: spawn tasks in Stream::buffered and Stream::buffer_unordered to max concurrency #2962

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

andylokandy
Copy link

@andylokandy andylokandy commented Jul 22, 2025

Closes #2961

@rustbot rustbot added A-stream Area: futures::stream S-waiting-on-review Status: Awaiting review from the assignee but also interested parties. labels Jul 22, 2025
Copy link
Contributor

@tisonkun tisonkun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

cc @taiki-e

@andylokandy andylokandy marked this pull request as draft July 23, 2025 09:42
@rustbot rustbot removed the S-waiting-on-review Status: Awaiting review from the assignee but also interested parties. label Jul 23, 2025
@andylokandy andylokandy marked this pull request as ready for review July 23, 2025 17:17
@rustbot rustbot added the S-waiting-on-review Status: Awaiting review from the assignee but also interested parties. label Jul 23, 2025
@@ -13,20 +14,23 @@ pin_project! {
/// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered)
/// method.
#[must_use = "streams do nothing unless polled"]
pub struct BufferUnordered<St>
pub struct BufferUnordered<St, F>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we fix this in FuturesUnordered? This could actually be a breaking change.

Copy link
Author

@andylokandy andylokandy Jul 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we fix this in FuturesUnordered?

I've tried to implement it in andylokandy@1d9fe21.

The short answer is no -- many components in futures -rs relies on the feature of FuturesUnordered not greedily poll internal futures. Instead, buffered is supposed to provide the ability to maximize the concurrency respecting the limit.

x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
Poll::Ready(None) => {}
// Try to poll all ready futures in the in_progress_queue.
loop {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this fix could be incorrect, as it assumes that tasks are finished in order. And it introduces extra costs.

I have similar issues in opendal, and my final solution is to find a way to track the status of futures and subtract the already finished ones from the maximum concurrent limit.

Do you think it's a good idea to implement similiar thing in FuturesUnordered?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this fix could be incorrect, as it assumes that tasks are finished in order.

Would you like to elaborate more why you think it assumes that tasks are finished in order?

Comment on lines +16 to +19
pub struct Buffered<St, F>
where
St: Stream,
St::Item: Future,
St: Stream<Item = F>,
F: Future,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a breaking change?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems ok since we are on 0.4-alpha

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-stream Area: futures::stream S-waiting-on-review Status: Awaiting review from the assignee but also interested parties.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Buffered and BufferUnordered do not always reach to max concurrency
4 participants