-
-
Notifications
You must be signed in to change notification settings - Fork 2.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce mpsc::Receiver peek #7156
base: master
Are you sure you want to change the base?
Conversation
There is unexpected behavior in the |
Thanks! I believe even today we could achieve a peekable receiver by using ReceiverStream. let (tx, rx) = tokio::sync::mpsc::channel::<usize>(10);
let mut stream = ReceiverStream::new(rx).peekable();
let jh = tokio::spawn(async move {
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
});
assert_eq!(stream.peek().await, Some(&1));
assert_eq!(stream.next().await, Some(1));
assert_eq!(stream.peek().await, Some(&2));
jh.await.unwrap(); My question here is whether we actually want a |
Thanks for the comment @mox692 👍 That's a good point. A few things come to mind:
That said, I'm open to feedback and further discussion on whether this approach is the best fit. Thanks again! |
* Fix docs * Corrected example docs for peek * Doc tests for try_peek
This PR introduces a
peek
method to thempsc::Receiver
. This allows consumers to inspect the next message in the queue without removing it, enabling use cases that require lookahead behavior. Therecv
method has been modified to utilise peek: if a message is available,recv
reads the value, advances the internal queue, and updates the semaphore accordingly.Motivation
Currently, the
mpsc::Receiver
does not provide a way to inspect the next message in the queue without consuming it. Users may want to make decisions based on the upcoming message before actually removing it from the channel.Solution
The peek method retrieves the next message without modifying the queue state. It works by:
block.peek(self.index)
.block.peek(slot_index)
checks if the slot atslot_index
is ready usingready_offset_read
. If ready, it returns a reference to the value without modifying the queue.