Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce mpsc::Receiver peek #7156

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open

Introduce mpsc::Receiver peek #7156

wants to merge 6 commits into from

Conversation

srxg
Copy link

@srxg srxg commented Feb 14, 2025

This PR introduces a peek method to the mpsc::Receiver. This allows consumers to inspect the next message in the queue without removing it, enabling use cases that require lookahead behavior. The recv method has been modified to utilise peek: if a message is available, recv reads the value, advances the internal queue, and updates the semaphore accordingly.

Motivation

Currently, the mpsc::Receiver does not provide a way to inspect the next message in the queue without consuming it. Users may want to make decisions based on the upcoming message before actually removing it from the channel.

Solution

The peek method retrieves the next message without modifying the queue state. It works by:

  1. Calling try_peeking_ahead() to determine the correct block to peek from, handling block transitions if needed.
  2. Returning the peeked value via block.peek(self.index).
  3. block.peek(slot_index) checks if the slot at slot_index is ready using ready_offset_read. If ready, it returns a reference to the value without modifying the queue.

@github-actions github-actions bot added the R-loom-sync Run loom sync tests on this PR label Feb 14, 2025
@srxg
Copy link
Author

srxg commented Feb 14, 2025

There is unexpected behavior in the contention/bounded_full benchmark in benches/sync_mpsc.rs. Occasionally, tx.send() fails with SendError, and the sender reports as closed. However, recv() never appears to return None. Further investigation is needed.

@Darksonn Darksonn added A-tokio Area: The main tokio crate M-sync Module: tokio/sync labels Feb 14, 2025
@srxg srxg marked this pull request as ready for review February 14, 2025 18:38
@mox692
Copy link
Member

mox692 commented Feb 24, 2025

Thanks!

I believe even today we could achieve a peekable receiver by using ReceiverStream.

let (tx, rx) = tokio::sync::mpsc::channel::<usize>(10);

let mut stream = ReceiverStream::new(rx).peekable();

let jh = tokio::spawn(async move {
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
});

assert_eq!(stream.peek().await, Some(&1));
assert_eq!(stream.next().await, Some(1));
assert_eq!(stream.peek().await, Some(&2));

jh.await.unwrap();

My question here is whether we actually want a peek method on mpsc.

@srxg
Copy link
Author

srxg commented Feb 25, 2025

Thanks for the comment @mox692 👍

That's a good point. A few things come to mind:

  1. Peekable::peek requires mutable access, whereas the proposed Receiver::peek only requires &self. This is more aligned with the non-destructive intent of the operation, imo. Although, if changes are required to the proposed peek which require mutable access, then we can certainly revisit this.
  2. Receiver::peek, simplifies things for users who don't need the full stream functionality but still want peek behaviour.

That said, I'm open to feedback and further discussion on whether this approach is the best fit. Thanks again!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-sync Module: tokio/sync R-loom-sync Run loom sync tests on this PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants