Skip to content

Conversation

srxg
Copy link
Contributor

@srxg srxg commented Feb 14, 2025

This PR introduces a peek method to the mpsc::Receiver. This allows consumers to inspect the next message in the queue without removing it, enabling use cases that require lookahead behavior. The recv method has been modified to utilise peek: if a message is available, recv reads the value, advances the internal queue, and updates the semaphore accordingly.

Motivation

Currently, the mpsc::Receiver does not provide a way to inspect the next message in the queue without consuming it. Users may want to make decisions based on the upcoming message before actually removing it from the channel.

Solution

The peek method retrieves the next message without modifying the queue state. It works by:

  1. Calling try_peeking_ahead() to determine the correct block to peek from, handling block transitions if needed.
  2. Returning the peeked value via block.peek(self.index).
  3. block.peek(slot_index) checks if the slot at slot_index is ready using ready_offset_read. If ready, it returns a reference to the value without modifying the queue.

@github-actions github-actions bot added the R-loom-sync Run loom sync tests on this PR label Feb 14, 2025
@srxg
Copy link
Contributor Author

srxg commented Feb 14, 2025

There is unexpected behavior in the contention/bounded_full benchmark in benches/sync_mpsc.rs. Occasionally, tx.send() fails with SendError, and the sender reports as closed. However, recv() never appears to return None. Further investigation is needed.

@Darksonn Darksonn added A-tokio Area: The main tokio crate M-sync Module: tokio/sync labels Feb 14, 2025
@srxg srxg marked this pull request as ready for review February 14, 2025 18:38
@mox692
Copy link
Member

mox692 commented Feb 24, 2025

Thanks!

I believe even today we could achieve a peekable receiver by using ReceiverStream.

let (tx, rx) = tokio::sync::mpsc::channel::<usize>(10);

let mut stream = ReceiverStream::new(rx).peekable();

let jh = tokio::spawn(async move {
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
});

assert_eq!(stream.peek().await, Some(&1));
assert_eq!(stream.next().await, Some(1));
assert_eq!(stream.peek().await, Some(&2));

jh.await.unwrap();

My question here is whether we actually want a peek method on mpsc.

@srxg
Copy link
Contributor Author

srxg commented Feb 25, 2025

Thanks for the comment @mox692 👍

That's a good point. A few things come to mind:

  1. Peekable::peek requires mutable access, whereas the proposed Receiver::peek only requires &self. This is more aligned with the non-destructive intent of the operation, imo. Although, if changes are required to the proposed peek which require mutable access, then we can certainly revisit this.
  2. Receiver::peek, simplifies things for users who don't need the full stream functionality but still want peek behaviour.

That said, I'm open to feedback and further discussion on whether this approach is the best fit. Thanks again!

@srxg srxg closed this Aug 11, 2025
@github-actions github-actions bot removed the R-loom-sync Run loom sync tests on this PR label Aug 11, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-sync Module: tokio/sync
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants