Skip to content

Commit

Permalink
ignore buffer errors on events
Browse files Browse the repository at this point in the history
  • Loading branch information
toshipp committed Jan 29, 2023
1 parent dcc28b9 commit 7609612
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tstools"
version = "0.1.7"
version = "0.1.8"
authors = ["toshi_pp <[email protected]>"]
edition = "2021"

Expand Down
19 changes: 12 additions & 7 deletions src/cmd/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ async fn find_service_ids<S: Stream<Item = ts::TSPacket> + Unpin>(s: &mut S) ->
}
}
}
Some(Err(e)) => return Err(e),
Some(Err(e)) => {
info!("find_service_id: {:?}", e);
}
None => bail!("no sid found"),
}
}
Expand All @@ -155,7 +157,7 @@ async fn find_service_ids<S: Stream<Item = ts::TSPacket> + Unpin>(s: &mut S) ->
fn packets_to_events<S: Stream<Item = ts::TSPacket> + Unpin>(
sids: Vec<u16>,
s: S,
) -> impl Stream<Item = Result<Vec<Event>>> {
) -> impl Stream<Item = Vec<Event>> {
psi::Buffer::new(s).filter_map(move |bytes| match bytes {
Ok(bytes) => {
let bytes = &bytes[..];
Expand All @@ -165,7 +167,7 @@ fn packets_to_events<S: Stream<Item = ts::TSPacket> + Unpin>(
Ok(eit) => {
if sids.contains(&eit.service_id) {
if let Ok(events) = try_into_event(eit) {
return Some(Ok(events));
return Some(events);
}
}
}
Expand All @@ -176,14 +178,17 @@ fn packets_to_events<S: Stream<Item = ts::TSPacket> + Unpin>(
}
None
}
Err(e) => Some(Err(e)),
Err(e) => {
info!("packets_to_events: {:?}", e);
None
}
})
}

fn into_event_stream<S: Stream<Item = ts::TSPacket> + Send + 'static + Unpin>(
service_ids: Vec<u16>,
mut s: S,
) -> impl Stream<Item = Result<Vec<Event>>> {
) -> impl Stream<Item = Vec<Event>> {
let (event_tx, event_rx) = channel(1);
let mut tx_map = HashMap::new();
for pid in ts::EIT_PIDS.iter() {
Expand Down Expand Up @@ -213,11 +218,11 @@ fn into_event_stream<S: Stream<Item = ts::TSPacket> + Send + 'static + Unpin>(
ReceiverStream::new(event_rx)
}

async fn into_event_map<S: Stream<Item = Result<Vec<Event>>> + Unpin>(
async fn into_event_map<S: Stream<Item = Vec<Event>> + Unpin>(
mut s: S,
) -> Result<BTreeMap<u16, Event>> {
let mut out = BTreeMap::new();
while let Some(events) = s.try_next().await? {
while let Some(events) = s.next().await {
for event in events.into_iter() {
out.insert(event.id, event);
}
Expand Down

0 comments on commit 7609612

Please sign in to comment.