Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BINEX: CRC and BNXI fixes #275

Merged
merged 15 commits into from
Nov 2, 2024
2 changes: 2 additions & 0 deletions binex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ rustdoc-args = ["--cfg", "docrs", "--generate-link-to-definition"]

[dependencies]
log = "0.4"
md-5 = "0.10"
thiserror = "1"
lazy_static = "1.4"
flate2 = { version = "1.0.34", optional = true }
hifitime = { version = "4.0.0-alpha", features = ["serde", "std"] }

Expand Down
12 changes: 4 additions & 8 deletions binex/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,11 @@

BINEX is a simple library to decode and encode BINEX messages.
BINEX stands for BINary EXchange and is the "real time" stream oriented
version of the RINEX format.
version of the RINEX format. It is to this day, the only open source protocol
to encode GNSS and navigation data.

RINEX is a readable text format which is based on line termination and allows describing
from the minimum requirement for GNSS navigation up to very precise navigation and
other side GNSS applications.

BINEX is a binary stream (non readable) conversion to that, dedicated to GNSS receivers and hardware interfacing.
Like RINEX, it is an open source format, the specifications are described by
[UNAVCO here](https://www.unavco.org/data/gps-gnss/data-formats/binex).
While RINEX is readable and based on line termination, BINEX is real-time and
hardware orientated (at the GNSS receiver firmware level).

This library allows easy message encoding and decoding, and aims at providing seamless
convertion from RINEX back and forth.
Expand Down
5 changes: 4 additions & 1 deletion binex/benches/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ pub fn criterion_benchmark(c: &mut Criterion) {
let t0 = Epoch::from_gpst_seconds(10.0);
let meta = MonumentGeoMetadata::RNX2BIN;

let record = MonumentGeoRecord::new(t0, meta)
let mut record = MonumentGeoRecord::default()
.with_comment("This is a test")
.with_climatic_info("basic info")
.with_geophysical_info("another field")
.with_user_id("Test");

record.epoch = t0;
record.meta = meta;

let record = Record::new_monument_geo(record);
let msg = Message::new(true, TimeResolution::QuarterSecond, false, false, record);

Expand Down
5 changes: 4 additions & 1 deletion binex/benches/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ pub fn criterion_benchmark(c: &mut Criterion) {
let t0 = Epoch::from_gpst_seconds(10.0);
let meta = MonumentGeoMetadata::RNX2BIN;

let record = MonumentGeoRecord::new(t0, meta)
let mut record = MonumentGeoRecord::default()
.with_comment("This is a test")
.with_climatic_info("basic info")
.with_geophysical_info("another field")
.with_user_id("Test");

record.epoch = t0;
record.meta = meta;

let record = Record::new_monument_geo(record);
let msg = Message::new(true, TimeResolution::QuarterSecond, false, false, record);

Expand Down
229 changes: 99 additions & 130 deletions binex/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::io::{Error as IoError, Read};
#[cfg(feature = "flate2")]
use flate2::read::GzDecoder;

use crate::{message::Message, Error};
use log::warn;
// use log::warn;

use crate::prelude::{ClosedSourceElement, Error, Message, StreamElement};

/// Abstraction for Plain or Compressed [R]
enum Reader<R: Read> {
Expand Down Expand Up @@ -38,41 +39,25 @@ impl<R: Read> Read for Reader<R> {
}
}

/// Decoder FSM
#[derive(Debug, Copy, Clone, Default, PartialEq)]
enum State {
/// Everything is OK we're consuming data
#[default]
Parsing,
/// Partial frame is found in internal Buffer.
/// We need a secondary read to complete this message
IncompleteMessage,
/// Partial frame was found in internal Buffer.
/// But the total expected payload exceeds our internal buffer capacity.
/// [Decoder] is currently limited to parsing [Message] that fits
/// in the buffer entirely. This may not apply to very length (> 1 MB) messages
/// which is the case of signal observations for example - that we do not support at the moment.
/// In this case, we proceed to trash (consume the Input interface), complete the message
/// we do not know how to interprate & move on to next message.
IncompleteTrashing,
}

/// [BINEX] Stream Decoder. Use this structure to decode all messages streamed
/// on a readable interface.
pub struct Decoder<R: Read> {
/// Internal state
state: State,
/// BINEX Stream Decoder. Use this structure to decode a serie
/// of [StreamElement]s streamed over any [Read]able interface.
pub struct Decoder<'a, R: Read> {
/// Write pointer
wr_ptr: usize,
/// Read pointer
rd_ptr: usize,
/// Internal buffer
buffer: Vec<u8>,
/// Reached EOS
eos: bool,
/// Internal buffer. Buffer is sized to fully contain
/// the "worst case" open source [Message].
buf: [u8; 4096],
/// [R]
reader: Reader<R>,
/// Used when partial frame is saved within Buffer
size_to_complete: usize,
/// Reference to past [ClosedSourceElement] (if any)
past_element: Option<ClosedSourceElement<'a>>,
}

impl<R: Read> Decoder<R> {
impl<'a, R: Read> Decoder<'a, R> {
/// Creates a new BINEX [Decoder] from [R] readable interface,
/// ready to parse incoming bytes.
/// ```
Expand All @@ -85,6 +70,7 @@ impl<R: Read> Decoder<R> {
/// let mut fd = File::open("../test_resources/BIN/mfle20190130.bnx")
/// .unwrap();
///
/// // Two generics: with M the internal buffer depth
/// let mut decoder = Decoder::new(fd);
///
/// // Consume data stream
Expand All @@ -94,7 +80,7 @@ impl<R: Read> Decoder<R> {
/// // do something
/// },
/// Some(Err(e)) => match e {
/// Error::IoError(e) => {
/// Error::IoError => {
/// // any I/O error should be handled
/// // and user should react accordingly,
/// break;
Expand All @@ -104,9 +90,6 @@ impl<R: Read> Decoder<R> {
/// // - reversed streams are not supported yet
/// // - little endian streams are not supported yet
/// },
/// Error::InvalidStartofStream => {
/// // other errors give meaningful information
/// },
/// _ => {},
/// },
/// None => {
Expand All @@ -118,11 +101,12 @@ impl<R: Read> Decoder<R> {
/// ```
pub fn new(reader: R) -> Self {
Self {
rd_ptr: 1024,
size_to_complete: 0,
eos: false,
rd_ptr: 0,
wr_ptr: 0,
buf: [0; 4096],
past_element: None,
reader: reader.into(),
state: State::default(),
buffer: [0; 1024].to_vec(),
}
}

Expand All @@ -149,7 +133,7 @@ impl<R: Read> Decoder<R> {
/// // do something
/// },
/// Some(Err(e)) => match e {
/// Error::IoError(e) => {
/// Error::IoError => {
/// // any I/O error should be handled
/// // and user should react accordingly,
/// break;
Expand All @@ -159,9 +143,6 @@ impl<R: Read> Decoder<R> {
/// // - reversed streams are not supported yet
/// // - little endian streams are not supported yet
/// },
/// Error::InvalidStartofStream => {
/// // other errors give meaningful information
/// },
/// _ => {},
/// },
/// None => {
Expand All @@ -173,108 +154,96 @@ impl<R: Read> Decoder<R> {
/// ```
pub fn new_gzip(reader: R) -> Self {
Self {
rd_ptr: 1024,
size_to_complete: 0,
state: State::default(),
buffer: [0; 1024].to_vec(),
eos: false,
rd_ptr: 0,
wr_ptr: 0,
buf: [0; 4096],
past_element: None,
reader: GzDecoder::new(reader).into(),
}
}
}

impl<R: Read> Iterator for Decoder<R> {
type Item = Result<Message, Error>;
/// Parse next message contained in stream
fn next(&mut self) -> Option<Self::Item> {
// parse internal buffer
while self.rd_ptr < 1024 && self.state == State::Parsing {
//println!("parsing: rd={}/wr={}", self.rd_ptr, 1024);
//println!("workbuf: {:?}", &self.buffer[self.rd_ptr..]);
impl<'a, R: Read> Iterator for Decoder<'a, R> {
type Item = Result<StreamElement<'a>, Error>;

match Message::decode(&self.buffer[self.rd_ptr..]) {
Ok(msg) => {
// one message fully decoded
// - increment pointer so we can move on to the next
// - and expose to User.
self.rd_ptr += msg.encoding_size();
return Some(Ok(msg));
},
Err(Error::IncompleteMessage(mlen)) => {
//print!("INCOMPLETE: rd_ptr={}/mlen={}", self.rd_ptr, mlen);
// buffer contains partial message
/// Parse next [StreamElement] contained in this BINEX stream.
fn next(&mut self) -> Option<Self::Item> {
// always try to fill internal buffer
let size = self.reader.read(&mut self.buf[self.wr_ptr..]).ok()?;
self.wr_ptr += size;
//println!("wr_ptr={}", self.wr_ptr);

// [IF] mlen (size to complete) fits in self.buffer
self.size_to_complete = mlen - self.rd_ptr;
if self.size_to_complete < 1024 {
// Then next .read() will complete this message
// and we will then be able to complete the parsing.
// Shift current content (rd_ptr=>0) and preserve then move on to Reading.
self.buffer.copy_within(self.rd_ptr..1024, 0);
self.state = State::IncompleteMessage;
} else {
// OR
// NB: some messages can be very lengthy (some MB)
// especially the signal sampling that we do not support yet.
// In this case, we simply trash the remaning amount of bytes,
// message is lost and we move on to the next SYNC
warn!("library limitation: unprocessed message");
self.state = State::IncompleteTrashing;
//println!("need to trash {} bytes", self.size_to_complete);
}
},
Err(Error::NoSyncByte) => {
// no SYNC in entire buffer
//println!(".decode no-sync");
// prepare for next read
self.rd_ptr = 1024;
//self.buffer.clear();
self.buffer = [0; 1024].to_vec();
},
Err(_) => {
// decoding error: unsupported message
// Keep iterating the buffer
self.rd_ptr += 1;
},
}
if size == 0 {
self.eos = true;
}

// read data: fill in buffer
match self.reader.read_exact(&mut self.buffer) {
Ok(_) => {
match self.state {
State::Parsing => {},
State::IncompleteMessage => {
// complete frame, move on to parsing
self.state = State::Parsing;
// try to consume one message
match Message::decode(&self.buf[self.rd_ptr..]) {
Ok(msg) => {
// one message fully decoded
// - increment pointer
// - expose to user
self.rd_ptr += msg.encoding_size();

// terminates possible [ClosedSourceElement] serie
self.past_element = None;

Some(Ok(msg.into()))
},
Err(e) => {
match e {
Error::NoSyncByte => {
// buffer does not even contain the sync byte:
// we can safely discard everything
self.wr_ptr = 0;
self.rd_ptr = 0;
if self.eos == true {
// consumed everything and EOS has been reached
return None;
}
},
State::IncompleteTrashing => {
if self.size_to_complete == 0 {
// trashed completely.
self.state = State::Parsing;
//println!("back to parsing");
Error::NonSupportedMesssage(mlen) => {
self.rd_ptr += mlen;

if self.rd_ptr > 4096 {
self.rd_ptr = 0;
self.wr_ptr = 0;
}

if self.eos == true {
// consumed everything and EOS has been reached
return None;
}
},
Error::IncompleteMessage(mlen) => {
// decoded partial valid frame
if self.rd_ptr + mlen > 4096 {
// frame would not fit in buffer:
// abort: we do not support that scenario.
// This should never happen anyway: internal buffer should be sized correctly.
self.buf = [0; 4096];
self.wr_ptr = 0;
self.rd_ptr = 0;
return Some(Err(Error::TooLargeInternalLimitation));
} else {
if self.size_to_complete < 1024 {
//println!("shiting {} bytes", self.size_to_complete);
// preserved content (shift left)
// and permit the refill that will conclude this message
self.buf.copy_within(self.rd_ptr.., 0);

// discard remaning bytes from buffer
// and move on to parsing to analyze new content
self.buffer.copy_within(self.size_to_complete.., 0);
self.state = State::Parsing;
//println!("back to parsing");
} else {
self.size_to_complete =
self.size_to_complete.saturating_add_signed(-1024);
//println!("size to trash: {}", self.size_to_complete);
}
self.wr_ptr -= self.rd_ptr;
self.rd_ptr = 0;
return Some(Err(Error::IncompleteMessage(mlen)));
}
},
_ => {
// bad content that does not look like valid BINEX.
// This is very inefficient. If returned error would increment
// the internal pointer, we could directly move on to next interesting bytes.
self.rd_ptr += 1;
},
}
// read success
self.rd_ptr = 0; // reset pointer & prepare for next Iter
Some(Err(Error::NotEnoughBytes))
},
Err(_) => {
None // EOS
Some(Err(e))
},
}
}
Expand Down
Loading
Loading