Skip to content

Commit

Permalink
doki broki
Browse files Browse the repository at this point in the history
  • Loading branch information
decahedron1 committed Feb 8, 2024
1 parent 0b7fac6 commit 9fc2981
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 43 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ simd-json = { version = "0.13", optional = true }
url = { version = "2.5", optional = true }
rand = { version = "0.8", optional = true }
regex = { version = "1.10", optional = true }
async-stream = "0.3"

[dev-dependencies]
anyhow = "1.0"
Expand Down
80 changes: 48 additions & 32 deletions examples/youtube.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,64 @@
use std::future::IntoFuture;
use std::{future::IntoFuture, time::Duration};

use brainrot::youtube::{self, YouTubeChatPageProcessor};
use tokio::time::sleep;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let (options, cont) = youtube::get_options_from_live_page("S144F6Cifyc").await?;
let (options, cont) = youtube::get_options_from_live_page("J2YmJL0PX5M").await?;
let initial_chat = youtube::fetch_yt_chat_page(&options, &cont).await?;
let topic = initial_chat.continuation_contents.as_ref().unwrap().live_chat_continuation.continuations[0]
if let Some(invalidation_continuation) = initial_chat.continuation_contents.as_ref().unwrap().live_chat_continuation.continuations[0]
.invalidation_continuation_data
.as_ref()
.unwrap()
.invalidation_id
.topic
.to_owned();
let subscriber = youtube::SignalerChannel::new(topic).await?;
let (mut receiver, _handle) = subscriber.spawn_event_subscriber().await?;
tokio::spawn(async move {
let mut processor = YouTubeChatPageProcessor::new(initial_chat, &options).unwrap();
for msg in &processor {
println!("{}: {}", msg.author.display_name, msg.runs.iter().map(|c| c.to_string()).collect::<String>());
}
{
let topic = invalidation_continuation.invalidation_id.topic.to_owned();
let subscriber = youtube::SignalerChannel::new(topic).await?;
let (mut receiver, _handle) = subscriber.spawn_event_subscriber().await?;
tokio::spawn(async move {
let mut processor = YouTubeChatPageProcessor::new(initial_chat, &options).unwrap();
for msg in &processor {
println!("{}: {}", msg.author.display_name, msg.runs.iter().map(|c| c.to_string()).collect::<String>());
}

while receiver.recv().await.is_ok() {
match processor.cont().await {
Some(Ok(s)) => {
processor = s;
for msg in &processor {
println!("{}: {}", msg.author.display_name, msg.runs.iter().map(|c| c.to_string()).collect::<String>());
}
while receiver.recv().await.is_ok() {
match processor.cont().await {
Some(Ok(s)) => {
processor = s;
for msg in &processor {
println!("{}: {}", msg.author.display_name, msg.runs.iter().map(|c| c.to_string()).collect::<String>());
}

subscriber.refresh_topic(processor.signaler_topic.as_ref().unwrap()).await;
}
Some(Err(e)) => {
eprintln!("{e:?}");
break;
}
None => {
eprintln!("none");
break;
subscriber.refresh_topic(processor.signaler_topic.as_ref().unwrap()).await;
}
Some(Err(e)) => {
eprintln!("{e:?}");
break;
}
None => {
eprintln!("none");
break;
}
}
}
});
_handle.into_future().await.unwrap();
} else if let Some(timed_continuation) = initial_chat.continuation_contents.as_ref().unwrap().live_chat_continuation.continuations[0]
.timed_continuation_data
.as_ref()
{
let timeout = timed_continuation.timeout_ms as u64;
let mut processor = YouTubeChatPageProcessor::new(initial_chat, &options).unwrap();
loop {
for msg in &processor {
println!("{}: {}", msg.author.display_name, msg.runs.iter().map(|c| c.to_string()).collect::<String>());
}
sleep(Duration::from_millis(timeout as _)).await;
match processor.cont().await {
Some(Ok(e)) => processor = e,
_ => break
}
}
});
_handle.into_future().await.unwrap();
}
println!("???");
Ok(())
}
71 changes: 60 additions & 11 deletions src/youtube/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::VecDeque, sync::OnceLock};

use futures_util::{Stream, StreamExt};

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (macos-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (macos-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (macos-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (windows-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (windows-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (windows-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (macos-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (macos-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (macos-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (windows-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (windows-latest, stable)

unused import: `StreamExt`

Check warning on line 3 in src/youtube/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (windows-latest, stable)

unused import: `StreamExt`
use regex::Regex;
use reqwest::{
header::{self, HeaderMap, HeaderValue},
Expand Down Expand Up @@ -154,15 +155,17 @@ unsafe impl<'r> Send for YouTubeChatPageProcessor<'r> {}
impl<'r> YouTubeChatPageProcessor<'r> {
pub fn new(response: GetLiveChatResponse, request_options: &'r RequestOptions) -> Result<Self, YouTubeError> {
let continuation_token = if request_options.live_status {
response
let continuation = &response
.continuation_contents
.as_ref()
.ok_or(YouTubeError::MissingContinuationContents)?
.live_chat_continuation
.continuations[0]
.continuations[0];
continuation
.invalidation_continuation_data
.as_ref()
.map(|x| x.continuation.to_owned())
.or_else(|| continuation.timed_continuation_data.as_ref().map(|x| x.continuation.to_owned()))
} else {
response
.continuation_contents
Expand All @@ -175,15 +178,10 @@ impl<'r> YouTubeChatPageProcessor<'r> {
.map(|x| x.continuation.to_owned())
};
let signaler_topic = if request_options.live_status {
Some(
response.continuation_contents.as_ref().unwrap().live_chat_continuation.continuations[0]
.invalidation_continuation_data
.as_ref()
.unwrap()
.invalidation_id
.topic
.to_owned()
)
response.continuation_contents.as_ref().unwrap().live_chat_continuation.continuations[0]
.invalidation_continuation_data
.as_ref()
.map(|c| c.invalidation_id.topic.to_owned())
} else {
None
};
Expand Down Expand Up @@ -297,6 +295,7 @@ impl<'r> Iterator for &YouTubeChatPageProcessor<'r> {

pub async fn fetch_yt_chat_page(options: &RequestOptions, continuation: impl AsRef<str>) -> Result<GetLiveChatResponse, YouTubeError> {
let body = GetLiveChatBody::new(continuation.as_ref(), &options.client_version, "WEB");
println!("{}", simd_json::to_string(&body)?);
let response: GetLiveChatResponse = get_http_client()
.post(Url::parse_with_params(
if options.live_status { TANGO_LIVE_ENDPOINT } else { TANGO_REPLAY_ENDPOINT },
Expand All @@ -307,5 +306,55 @@ pub async fn fetch_yt_chat_page(options: &RequestOptions, continuation: impl AsR
.await?
.simd_json()
.await?;
println!(
"{}",
Url::parse_with_params(
if options.live_status { TANGO_LIVE_ENDPOINT } else { TANGO_REPLAY_ENDPOINT },
[("key", options.api_key.as_str()), ("prettyPrint", "false")]
)?
);
Ok(response)
}

pub async fn stream(
options: &RequestOptions,
continuation: impl AsRef<str>
) -> Result<impl Stream<Item = Result<ChatMessage, YouTubeError>> + '_, YouTubeError> {
let initial_chat = fetch_yt_chat_page(options, continuation).await?;
let topic = initial_chat.continuation_contents.as_ref().unwrap().live_chat_continuation.continuations[0]
.invalidation_continuation_data
.as_ref()
.unwrap()
.invalidation_id
.topic
.to_owned();
let subscriber = SignalerChannel::new(topic).await?;
let (mut receiver, _handle) = subscriber.spawn_event_subscriber().await?;
Ok(async_stream::try_stream! {
let mut processor = YouTubeChatPageProcessor::new(initial_chat, options).unwrap();
for msg in &processor {
yield msg;
}

while receiver.recv().await.is_ok() {
match processor.cont().await {
Some(Ok(s)) => {
processor = s;
for msg in &processor {
yield msg;
}

subscriber.refresh_topic(processor.signaler_topic.as_ref().unwrap()).await;
}
Some(Err(e)) => {
eprintln!("{e:?}");
break;
}
None => {
eprintln!("none");
break;
}
}
}
})
}

0 comments on commit 9fc2981

Please sign in to comment.