Skip to content

Commit

Permalink
refactor: use async-stream-lite
Browse files Browse the repository at this point in the history
  • Loading branch information
decahedron1 committed Nov 16, 2024
1 parent 0fc2624 commit 7d5cc70
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ simd-json = { version = "0.13", optional = true }
url = { version = "2.5", optional = true }
rand = { version = "0.8", optional = true }
regex = { version = "1.10", optional = true }
async-stream = "0.3"
async-stream-lite = "0.1"
pin-project-lite = "0.2"

[dev-dependencies]
Expand Down
58 changes: 29 additions & 29 deletions src/youtube/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashSet, io::BufRead, pin::Pin, sync::OnceLock, time::Duration};
use std::{collections::HashSet, io::BufRead, sync::OnceLock, time::Duration};

use futures_util::Stream;
use async_stream_lite::try_async_stream;
use futures_util::stream::BoxStream;
use reqwest::header::{self, HeaderMap, HeaderValue};
use simd_json::base::{ValueAsContainer, ValueAsScalar};
use thiserror::Error;
Expand Down Expand Up @@ -131,41 +132,39 @@ impl<'r> IntoIterator for ActionChunk<'r> {
}
}

pub async fn stream(options: &ChatContext) -> Result<Pin<Box<dyn Stream<Item = Result<Action, Error>> + '_>>, Error> {
pub async fn stream(options: &ChatContext) -> Result<BoxStream<'_, Result<Action, Error>>, Error> {
let initial_chat = GetLiveChatResponse::fetch(options, &options.initial_continuation).await?;

let (mut yield_tx, yield_rx) = unsafe { async_stream::__private::yielder::pair() };

Ok(Box::pin(async_stream::__private::AsyncStream::new(yield_rx, async move {
Ok(Box::pin(try_async_stream(|r#yield| async move {
let mut seen_messages = HashSet::new();

match &initial_chat.continuation_contents.as_ref().unwrap().live_chat_continuation.continuations[0] {
Continuation::Invalidation { invalidation_id, .. } => {
let topic = invalidation_id.topic.to_owned();

let mut chunk = ActionChunk::new(initial_chat, options).unwrap();
let mut chunk = ActionChunk::new(initial_chat, options)?;

let mut channel = SignalerChannelInner::with_topic(topic, options.tango_api_key.as_ref().unwrap());
channel.choose_server().await.unwrap();
channel.init_session().await.unwrap();
channel.choose_server().await?;
channel.init_session().await?;

for action in chunk.iter() {
match action {
Action::AddChatItem { item, .. } => {
if !seen_messages.contains(item.id()) {
yield_tx.send(Ok(action.to_owned())).await;
r#yield(action.to_owned()).await;
seen_messages.insert(item.id().to_owned());
}
}
Action::ReplayChat { actions, .. } => {
for action in actions {
if let Action::AddChatItem { .. } = action.action {
yield_tx.send(Ok(action.action.to_owned())).await;
r#yield(action.action.to_owned()).await;
}
}
}
action => {
yield_tx.send(Ok(action.to_owned())).await;
r#yield(action.to_owned()).await;
}
}
}
Expand All @@ -181,28 +180,28 @@ pub async fn stream(options: &ChatContext) -> Result<Pin<Box<dyn Stream<Item = R
match action {
Action::AddChatItem { item, .. } => {
if !seen_messages.contains(item.id()) {
yield_tx.send(Ok(action.to_owned())).await;
r#yield(action.to_owned()).await;
seen_messages.insert(item.id().to_owned());
}
}
Action::ReplayChat { actions, .. } => {
for action in actions {
if let Action::AddChatItem { .. } = action.action {
yield_tx.send(Ok(action.action.to_owned())).await;
r#yield(action.action.to_owned()).await;
}
}
}
action => {
yield_tx.send(Ok(action.to_owned())).await;
r#yield(action.to_owned()).await;
}
}
}

let mut req = {
channel.reset();
channel.choose_server().await.unwrap();
channel.init_session().await.unwrap();
channel.get_session_stream().await.unwrap()
channel.choose_server().await?;
channel.init_session().await?;
channel.get_session_stream().await?
};
loop {
match req.chunk().await {
Expand All @@ -226,19 +225,19 @@ pub async fn stream(options: &ChatContext) -> Result<Pin<Box<dyn Stream<Item = R
match action {
Action::AddChatItem { item, .. } => {
if !seen_messages.contains(item.id()) {
yield_tx.send(Ok(action.to_owned())).await;
r#yield(action.to_owned()).await;
seen_messages.insert(item.id().to_owned());
}
}
Action::ReplayChat { actions, .. } => {
for action in actions {
if let Action::AddChatItem { .. } = action.action {
yield_tx.send(Ok(action.action.to_owned())).await;
r#yield(action.action.to_owned()).await;
}
}
}
action => {
yield_tx.send(Ok(action.to_owned())).await;
r#yield(action.to_owned()).await;
}
}
}
Expand All @@ -255,22 +254,22 @@ pub async fn stream(options: &ChatContext) -> Result<Pin<Box<dyn Stream<Item = R
}
}
Continuation::Replay { .. } => {
let mut chunk = ActionChunk::new(initial_chat, options).unwrap();
let mut chunk = ActionChunk::new(initial_chat, options)?;
loop {
for action in chunk.iter() {
match action {
Action::AddChatItem { .. } => {
yield_tx.send(Ok(action.to_owned())).await;
r#yield(action.to_owned()).await;
}
Action::ReplayChat { actions, .. } => {
for action in actions {
if let Action::AddChatItem { .. } = action.action {
yield_tx.send(Ok(action.action.to_owned())).await;
r#yield(action.action.to_owned()).await;
}
}
}
action => {
yield_tx.send(Ok(action.to_owned())).await;
r#yield(action.to_owned()).await;
}
}
}
Expand All @@ -282,25 +281,25 @@ pub async fn stream(options: &ChatContext) -> Result<Pin<Box<dyn Stream<Item = R
}
Continuation::Timed { timeout_ms, .. } => {
let timeout = Duration::from_millis(*timeout_ms as _);
let mut chunk = ActionChunk::new(initial_chat, options).unwrap();
let mut chunk = ActionChunk::new(initial_chat, options)?;
loop {
for action in chunk.iter() {
match action {
Action::AddChatItem { item, .. } => {
if !seen_messages.contains(item.id()) {
yield_tx.send(Ok(action.to_owned())).await;
r#yield(action.to_owned()).await;
seen_messages.insert(item.id().to_owned());
}
}
Action::ReplayChat { actions, .. } => {
for action in actions {
if let Action::AddChatItem { .. } = action.action {
yield_tx.send(Ok(action.action.to_owned())).await;
r#yield(action.action.to_owned()).await;
}
}
}
action => {
yield_tx.send(Ok(action.to_owned())).await;
r#yield(action.to_owned()).await;
}
}
}
Expand All @@ -313,5 +312,6 @@ pub async fn stream(options: &ChatContext) -> Result<Pin<Box<dyn Stream<Item = R
}
Continuation::PlayerSeek { .. } => panic!("player seek should not be first continuation")
}
Ok(())
})))
}

0 comments on commit 7d5cc70

Please sign in to comment.