Mention Unpin requirement in Stream::by_ref() #2995

@svix-jbrown

Maybe this is just me being silly, but I found the following behavior very surprising:

Given this code:

use futures::stream::{self, Stream, StreamExt};

async fn demo<S: Stream<Item = u32>>(mut stream: S) {
    let sum = stream
        .by_ref()
        .take(2)
        .fold(0, |a, b| async move { a + b })
        .await;
    assert_eq!(sum, 3);
}

fn main() {
    futures::executor::block_on(demo(stream::iter(1..5)));
}

(which is basically the example from the documentation, but with the stream moved to a parameter instead of a literal)

You get this totally unhelpful compile failure:

$ cargo run
   Compiling stream_ref_test v0.1.0 (/Users/jbrown/tmp/2026-02-27/stream_ref_test)
error[E0507]: cannot move out of a mutable reference
    --> src/main.rs:4:15
     |
   4 |       let sum = stream
     |  _______________^
   5 | |         .by_ref()
     | |_________________^ move occurs because value has type `S`, which does not implement the `Copy` trait
   6 |           .take(2)
     |            ------- value moved due to this method call
     |
note: `futures::StreamExt::take` takes ownership of the receiver `self`, which moves value
    --> /Users/jbrown/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.32/src/stream/stream/mod.rs:1176:13
     |
1176 |     fn take(self, n: usize) -> Take<Self>
     |             ^^^^
help: if `S` implemented `Clone`, you could clone the value
    --> src/main.rs:3:15
     |
   3 |   async fn demo<S: Stream<Item = u32>>(mut stream: S) {
     |                 ^ consider constraining this type parameter with `Clone`
   4 |       let sum = stream
     |  _______________-
   5 | |         .by_ref()
     | |_________________- you could clone this value

For more information about this error, try `rustc --explain E0507`.
error: could not compile `stream_ref_test` (bin "stream_ref_test") due to 1 previous error

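That's because the impl for Stream on &mut S has an Unpin bound, so in order to use by_ref() with a stream, the stream has to be Unpin. Without that bound, the &mut S returned by by_ref() is not itself a Stream, so method resolution auto-derefs to S and tries to call take() on the stream by value, which is where the "cannot move out of a mutable reference" comes from. Adding the bound makes it compile: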

use futures::stream::{self, Stream, StreamExt};

async fn demo<S: Stream<Item = u32> + Unpin>(mut stream: S) {
    let sum = stream
        .by_ref()
        .take(2)
        .fold(0, |a, b| async move { a + b })
        .await;
    assert_eq!(sum, 3);
}

fn main() {
    futures::executor::block_on(demo(stream::iter(1..5)));
}

I recognize that the book talks about Unpin a bit, but it might also be worth mentioning the Unpin requirement in the docs for StreamExt::by_ref()? What do y'all think?
