Skip to content

Commit

Permalink
collect all events for all sids
Browse files Browse the repository at this point in the history
  • Loading branch information
toshipp committed Dec 31, 2021
1 parent b018406 commit 3b9b71a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tstools"
version = "0.1.5"
version = "0.1.6"
authors = ["toshi_pp <[email protected]>"]
edition = "2021"

Expand Down
16 changes: 8 additions & 8 deletions src/cmd/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ fn try_into_event(eit: psi::EventInformationSection) -> Result<Vec<Event>> {
Ok(events)
}

async fn find_service_id<S: Stream<Item = ts::TSPacket> + Unpin>(s: &mut S) -> Result<u16> {
async fn find_service_ids<S: Stream<Item = ts::TSPacket> + Unpin>(s: &mut S) -> Result<Vec<u16>> {
let sdt_stream = s.filter(|packet| packet.pid == psi::SDT_PID);
let mut buffer = psi::Buffer::new(sdt_stream);
loop {
Expand All @@ -141,7 +141,7 @@ async fn find_service_id<S: Stream<Item = ts::TSPacket> + Unpin>(s: &mut S) -> R
let table_id = bytes[0];
if table_id == psi::SELF_STREAM_TABLE_ID {
match psi::ServiceDescriptionSection::parse(bytes) {
Ok(sdt) => return Ok(sdt.services[0].service_id),
Ok(sdt) => return Ok(sdt.services.iter().map(|s| s.service_id).collect()),
Err(e) => info!("sdt parse error: {:?}", e),
}
}
Expand All @@ -153,7 +153,7 @@ async fn find_service_id<S: Stream<Item = ts::TSPacket> + Unpin>(s: &mut S) -> R
}

fn packets_to_events<S: Stream<Item = ts::TSPacket> + Unpin>(
sid: u16,
sids: Vec<u16>,
s: S,
) -> impl Stream<Item = Result<Vec<Event>>> {
psi::Buffer::new(s).filter_map(move |bytes| match bytes {
Expand All @@ -163,7 +163,7 @@ fn packets_to_events<S: Stream<Item = ts::TSPacket> + Unpin>(
if 0x4e <= table_id && table_id <= 0x6f {
match psi::EventInformationSection::parse(bytes) {
Ok(eit) => {
if eit.service_id == sid {
if sids.contains(&eit.service_id) {
if let Ok(events) = try_into_event(eit) {
return Some(Ok(events));
}
Expand All @@ -181,15 +181,15 @@ fn packets_to_events<S: Stream<Item = ts::TSPacket> + Unpin>(
}

fn into_event_stream<S: Stream<Item = ts::TSPacket> + Send + 'static + Unpin>(
service_id: u16,
service_ids: Vec<u16>,
mut s: S,
) -> impl Stream<Item = Result<Vec<Event>>> {
let (event_tx, event_rx) = channel(1);
let mut tx_map = HashMap::new();
for pid in ts::EIT_PIDS.iter() {
let (tx, rx) = channel(1);
tx_map.insert(pid, tx);
let mut events_stream = packets_to_events(service_id, ReceiverStream::new(rx));
let mut events_stream = packets_to_events(service_ids.clone(), ReceiverStream::new(rx));
let event_tx = event_tx.clone();
tokio::spawn(async move {
while let Some(events) = events_stream.next().await {
Expand Down Expand Up @@ -230,9 +230,9 @@ pub async fn run(input: Option<PathBuf>) -> Result<()> {
let packets = FramedRead::new(input, ts::TSPacketDecoder::new());
let packets = strip_error_packets(packets);
let mut cueable_packets = cueable(packets);
let sid = find_service_id(&mut cueable_packets).await?;
let sids = find_service_ids(&mut cueable_packets).await?;
let packets = cueable_packets.cue_up();
let events = into_event_stream(sid, packets);
let events = into_event_stream(sids, packets);
let event_map = into_event_map(events).await?;
for e in event_map.values() {
println!("{}", serde_json::to_string(e)?);
Expand Down

0 comments on commit 3b9b71a

Please sign in to comment.