Skip to content

feat: Add StreamExt::switch#2997

Open
Hywan wants to merge 1 commit intorust-lang:masterfrom
Hywan:feat-util-switch
Open

feat: Add StreamExt::switch#2997
Hywan wants to merge 1 commit intorust-lang:masterfrom
Hywan:feat-util-switch

Conversation

@Hywan
Copy link

@Hywan Hywan commented Mar 5, 2026

This patch introduces the Switch combinator stream: it flattens a higher-order stream into a first-order stream.

This combinator flattens a stream of streams, i.e. an outer stream yielding inner streams. This combinator always keeps the most recently yielded inner stream, and yields items from it, until the outer stream produces a new inner stream, at which point the inner stream to yield items from is switched to the new one.

Examples

An empty (outer) stream can be switched:

use futures::stream::{self, Empty, StreamExt, Switch};

let stream: Switch<Empty<Empty<()>>> = stream::empty().switch();

assert!(stream.collect::<Vec<_>>().await.is_empty());

An outer stream can produce several inner streams — once switched, the last one is immediately selected:

use futures::stream::{self, StreamExt};

let stream = stream::iter([
    stream::iter([1, 2, 3]),
    stream::iter([4, 5, 6]),
    stream::iter([7, 8, 9]),
])
.switch();

assert_eq!(vec![7, 8, 9], stream.collect::<Vec<_>>().await);

One of the most interesting usecase is when an outer stream produces new inner streams dynamically. Let's imagine the outer stream produces inner streams yielding a sequence of integers starting from a particular value. For example:

  • outer stream yields 7:
    • inner stream yields 7, 8, 9, 10, 11…
  • outer stream yields 42:
    • inner stream yields 42, 43, 44, 45, 46…
use futures::channel::mpsc;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
use std::pin::pin;

// `receiver` is the outer stream: it yields integers received by
// `sender`.
let (sender, receiver) = mpsc::unbounded::<u8>();

let mut stream = pin!(receiver
    // For every new received value from `sender`…
    .map(|init_value| {
        // … let's create a new inner stream.
        //
        // First off, the inner stream starts with `init_value`. Then,
        // it continues with incremented integers.
        let mut next_value = init_value;

        stream::poll_fn(move |_| {
            let current_value = next_value;
            next_value += 1;

            Poll::Ready(Some(current_value))
        })
    })
    .switch());

// `stream` is pending until the outer stream yields something.
sender.unbounded_send(7).unwrap();

// `stream` has switched to the inner stream.
assert_eq!(vec![7, 8, 9, 10, 11], stream.by_ref().take(5).collect::<Vec<_>>().await);
assert_eq!(vec![12, 13, 14, 15, 16], stream.by_ref().take(5).collect::<Vec<_>>().await);

// The outer stream will yield a new value, which will create a new
// inner stream.
sender.unbounded_send(42).unwrap();

// `stream` has been “reset” and will produce a new inner stream.
assert_eq!(vec![42, 43, 44, 45, 46], stream.take(5).collect::<Vec<_>>().await);

This is a port of async_rx::Switch, written by @jplatte, and maintained by @jplatte and I. The test suite has been improved compared to the original repository.

I believe this combinator is useful when one wants to generate streams dynamically (the inner streams) based on another input (managed by the outer stream).

This patch introduces the `Switch` combinator stream: it flattens a
higher-order stream into a first-order stream.

This combinator flattens a stream of streams, i.e. an outer stream
yielding inner streams. This combinator always keeps the most recently
yielded inner stream, and yields items from it, until the outer stream
produces a new inner stream, at which point the inner stream to yield
items from is switched to the new one.

Examples
--------

An empty (outer) stream can be switched:

```
\# futures::executor::block_on(async {
use futures::stream::{self, Empty, StreamExt, Switch};

let stream: Switch<Empty<Empty<()>>> = stream::empty().switch();

assert!(stream.collect::<Vec<_>>().await.is_empty());
\# });
```

An outer stream can produce several inner streams — once switched, the
last one is immediately selected:

```
\# futures::executor::block_on(async {
use futures::stream::{self, StreamExt};

let stream = stream::iter([
    stream::iter([1, 2, 3]),
    stream::iter([4, 5, 6]),
    stream::iter([7, 8, 9]),
])
.switch();

assert_eq!(vec![7, 8, 9], stream.collect::<Vec<_>>().await);
\# });
```

One of the most interesting usecase is when an outer stream produces new
inner  streams dynamically. Let's imagine the outer stream produces inner
streams yielding a sequence of integers starting from a particular value.
For example:

- outer stream yields 7:
  - inner stream yields 7, 8, 9, 10, 11…
- outer stream yields 42:
  - inner stream yields 42, 43, 44, 45, 46…

```
\# futures::executor::block_on(async {
use futures::channel::mpsc;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
use std::pin::pin;

// `receiver` is the outer stream: it yields integers received by
// `sender`.
let (sender, receiver) = mpsc::unbounded::<u8>();

let mut stream = pin!(receiver
    // For every new received value from `sender`…
    .map(|init_value| {
        // … let's create a new inner stream.
        //
        // First off, the inner stream starts with `init_value`. Then,
        // it continues with incremented integers.
        let mut next_value = init_value;

        stream::poll_fn(move |_| {
            let current_value = next_value;
            next_value += 1;

            Poll::Ready(Some(current_value))
        })
    })
    .switch());

// `stream` is pending until the outer stream yields something.
sender.unbounded_send(7).unwrap();

// `stream` has switched to the inner stream.
assert_eq!(vec![7, 8, 9, 10, 11], stream.by_ref().take(5).collect::<Vec<_>>().await);
assert_eq!(vec![12, 13, 14, 15, 16], stream.by_ref().take(5).collect::<Vec<_>>().await);

// The outer stream will yield a new value, which will create a new
// inner stream.
sender.unbounded_send(42).unwrap();

// `stream` has been “reset” and will produce a new inner stream.
assert_eq!(vec![42, 43, 44, 45, 46], stream.take(5).collect::<Vec<_>>().await);
\# });
```
@rustbot rustbot added the A-stream Area: futures::stream label Mar 5, 2026
@Hywan Hywan marked this pull request as ready for review March 5, 2026 13:44
@rustbot rustbot added the S-waiting-on-review Status: Awaiting review from the assignee but also interested parties. label Mar 5, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

A-stream Area: futures::stream S-waiting-on-review Status: Awaiting review from the assignee but also interested parties.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants