Skip to content

Commit

Permalink
BINEX: CRC and BNXI fixes (#275)
Browse files Browse the repository at this point in the history
* Prepare for CRC field management
  -  Working on CRC16/CRC32 implementation
* Improve Decoder implementation
* Fixed ephemeris encoding/decoding
* Fixed 1-4 BNXI
* Introduced StreamElement
* Correct and improve geodetic marker frames
  - Reflect that only comments may be repeated
* Introduce ClosedSourceMeta
* Update readme

---------

Signed-off-by: Guillaume W. Bres <[email protected]>
  • Loading branch information
gwbres authored Nov 2, 2024
1 parent c317d8a commit d6ed7be
Show file tree
Hide file tree
Showing 26 changed files with 1,865 additions and 1,291 deletions.
2 changes: 2 additions & 0 deletions binex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ rustdoc-args = ["--cfg", "docrs", "--generate-link-to-definition"]

[dependencies]
log = "0.4"
md-5 = "0.10"
thiserror = "1"
lazy_static = "1.4"
flate2 = { version = "1.0.34", optional = true }
hifitime = { version = "4.0.0-alpha", features = ["serde", "std"] }

Expand Down
12 changes: 4 additions & 8 deletions binex/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,11 @@

BINEX is a simple library to decode and encode BINEX messages.
BINEX stands for BINary EXchange and is the "real time" stream oriented
version of the RINEX format.
version of the RINEX format. It is to this day, the only open source protocol
to encode GNSS and navigation data.

RINEX is a readable text format which is based on line termination and allows describing
from the minimum requirement for GNSS navigation up to very precise navigation and
other side GNSS applications.

BINEX is a binary stream (non readable) conversion to that, dedicated to GNSS receivers and hardware interfacing.
Like RINEX, it is an open source format, the specifications are described by
[UNAVCO here](https://www.unavco.org/data/gps-gnss/data-formats/binex).
While RINEX is readable and based on line termination, BINEX is real-time and
hardware orientated (at the GNSS receiver firmware level).

This library allows easy message encoding and decoding, and aims at providing seamless
convertion from RINEX back and forth.
Expand Down
5 changes: 4 additions & 1 deletion binex/benches/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ pub fn criterion_benchmark(c: &mut Criterion) {
let t0 = Epoch::from_gpst_seconds(10.0);
let meta = MonumentGeoMetadata::RNX2BIN;

let record = MonumentGeoRecord::new(t0, meta)
let mut record = MonumentGeoRecord::default()
.with_comment("This is a test")
.with_climatic_info("basic info")
.with_geophysical_info("another field")
.with_user_id("Test");

record.epoch = t0;
record.meta = meta;

let record = Record::new_monument_geo(record);
let msg = Message::new(true, TimeResolution::QuarterSecond, false, false, record);

Expand Down
5 changes: 4 additions & 1 deletion binex/benches/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ pub fn criterion_benchmark(c: &mut Criterion) {
let t0 = Epoch::from_gpst_seconds(10.0);
let meta = MonumentGeoMetadata::RNX2BIN;

let record = MonumentGeoRecord::new(t0, meta)
let mut record = MonumentGeoRecord::default()
.with_comment("This is a test")
.with_climatic_info("basic info")
.with_geophysical_info("another field")
.with_user_id("Test");

record.epoch = t0;
record.meta = meta;

let record = Record::new_monument_geo(record);
let msg = Message::new(true, TimeResolution::QuarterSecond, false, false, record);

Expand Down
229 changes: 99 additions & 130 deletions binex/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::io::{Error as IoError, Read};
#[cfg(feature = "flate2")]
use flate2::read::GzDecoder;

use crate::{message::Message, Error};
use log::warn;
// use log::warn;

use crate::prelude::{ClosedSourceElement, Error, Message, StreamElement};

/// Abstraction for Plain or Compressed [R]
enum Reader<R: Read> {
Expand Down Expand Up @@ -38,41 +39,25 @@ impl<R: Read> Read for Reader<R> {
}
}

/// Decoder FSM
#[derive(Debug, Copy, Clone, Default, PartialEq)]
enum State {
/// Everything is OK we're consuming data
#[default]
Parsing,
/// Partial frame is found in internal Buffer.
/// We need a secondary read to complete this message
IncompleteMessage,
/// Partial frame was found in internal Buffer.
/// But the total expected payload exceeds our internal buffer capacity.
/// [Decoder] is currently limited to parsing [Message] that fits
/// in the buffer entirely. This may not apply to very length (> 1 MB) messages
/// which is the case of signal observations for example - that we do not support at the moment.
/// In this case, we proceed to trash (consume the Input interface), complete the message
/// we do not know how to interprate & move on to next message.
IncompleteTrashing,
}

/// [BINEX] Stream Decoder. Use this structure to decode all messages streamed
/// on a readable interface.
pub struct Decoder<R: Read> {
/// Internal state
state: State,
/// BINEX Stream Decoder. Use this structure to decode a serie
/// of [StreamElement]s streamed over any [Read]able interface.
pub struct Decoder<'a, R: Read> {
/// Write pointer
wr_ptr: usize,
/// Read pointer
rd_ptr: usize,
/// Internal buffer
buffer: Vec<u8>,
/// Reached EOS
eos: bool,
/// Internal buffer. Buffer is sized to fully contain
/// the "worst case" open source [Message].
buf: [u8; 4096],
/// [R]
reader: Reader<R>,
/// Used when partial frame is saved within Buffer
size_to_complete: usize,
/// Reference to past [ClosedSourceElement] (if any)
past_element: Option<ClosedSourceElement<'a>>,
}

impl<R: Read> Decoder<R> {
impl<'a, R: Read> Decoder<'a, R> {
/// Creates a new BINEX [Decoder] from [R] readable interface,
/// ready to parse incoming bytes.
/// ```
Expand All @@ -85,6 +70,7 @@ impl<R: Read> Decoder<R> {
/// let mut fd = File::open("../test_resources/BIN/mfle20190130.bnx")
/// .unwrap();
///
/// // Two generics: with M the internal buffer depth
/// let mut decoder = Decoder::new(fd);
///
/// // Consume data stream
Expand All @@ -94,7 +80,7 @@ impl<R: Read> Decoder<R> {
/// // do something
/// },
/// Some(Err(e)) => match e {
/// Error::IoError(e) => {
/// Error::IoError => {
/// // any I/O error should be handled
/// // and user should react accordingly,
/// break;
Expand All @@ -104,9 +90,6 @@ impl<R: Read> Decoder<R> {
/// // - reversed streams are not supported yet
/// // - little endian streams are not supported yet
/// },
/// Error::InvalidStartofStream => {
/// // other errors give meaningful information
/// },
/// _ => {},
/// },
/// None => {
Expand All @@ -118,11 +101,12 @@ impl<R: Read> Decoder<R> {
/// ```
pub fn new(reader: R) -> Self {
Self {
rd_ptr: 1024,
size_to_complete: 0,
eos: false,
rd_ptr: 0,
wr_ptr: 0,
buf: [0; 4096],
past_element: None,
reader: reader.into(),
state: State::default(),
buffer: [0; 1024].to_vec(),
}
}

Expand All @@ -149,7 +133,7 @@ impl<R: Read> Decoder<R> {
/// // do something
/// },
/// Some(Err(e)) => match e {
/// Error::IoError(e) => {
/// Error::IoError => {
/// // any I/O error should be handled
/// // and user should react accordingly,
/// break;
Expand All @@ -159,9 +143,6 @@ impl<R: Read> Decoder<R> {
/// // - reversed streams are not supported yet
/// // - little endian streams are not supported yet
/// },
/// Error::InvalidStartofStream => {
/// // other errors give meaningful information
/// },
/// _ => {},
/// },
/// None => {
Expand All @@ -173,108 +154,96 @@ impl<R: Read> Decoder<R> {
/// ```
pub fn new_gzip(reader: R) -> Self {
Self {
rd_ptr: 1024,
size_to_complete: 0,
state: State::default(),
buffer: [0; 1024].to_vec(),
eos: false,
rd_ptr: 0,
wr_ptr: 0,
buf: [0; 4096],
past_element: None,
reader: GzDecoder::new(reader).into(),
}
}
}

impl<R: Read> Iterator for Decoder<R> {
type Item = Result<Message, Error>;
/// Parse next message contained in stream
fn next(&mut self) -> Option<Self::Item> {
// parse internal buffer
while self.rd_ptr < 1024 && self.state == State::Parsing {
//println!("parsing: rd={}/wr={}", self.rd_ptr, 1024);
//println!("workbuf: {:?}", &self.buffer[self.rd_ptr..]);
impl<'a, R: Read> Iterator for Decoder<'a, R> {
type Item = Result<StreamElement<'a>, Error>;

match Message::decode(&self.buffer[self.rd_ptr..]) {
Ok(msg) => {
// one message fully decoded
// - increment pointer so we can move on to the next
// - and expose to User.
self.rd_ptr += msg.encoding_size();
return Some(Ok(msg));
},
Err(Error::IncompleteMessage(mlen)) => {
//print!("INCOMPLETE: rd_ptr={}/mlen={}", self.rd_ptr, mlen);
// buffer contains partial message
/// Parse next [StreamElement] contained in this BINEX stream.
fn next(&mut self) -> Option<Self::Item> {
// always try to fill internal buffer
let size = self.reader.read(&mut self.buf[self.wr_ptr..]).ok()?;
self.wr_ptr += size;
//println!("wr_ptr={}", self.wr_ptr);

// [IF] mlen (size to complete) fits in self.buffer
self.size_to_complete = mlen - self.rd_ptr;
if self.size_to_complete < 1024 {
// Then next .read() will complete this message
// and we will then be able to complete the parsing.
// Shift current content (rd_ptr=>0) and preserve then move on to Reading.
self.buffer.copy_within(self.rd_ptr..1024, 0);
self.state = State::IncompleteMessage;
} else {
// OR
// NB: some messages can be very lengthy (some MB)
// especially the signal sampling that we do not support yet.
// In this case, we simply trash the remaning amount of bytes,
// message is lost and we move on to the next SYNC
warn!("library limitation: unprocessed message");
self.state = State::IncompleteTrashing;
//println!("need to trash {} bytes", self.size_to_complete);
}
},
Err(Error::NoSyncByte) => {
// no SYNC in entire buffer
//println!(".decode no-sync");
// prepare for next read
self.rd_ptr = 1024;
//self.buffer.clear();
self.buffer = [0; 1024].to_vec();
},
Err(_) => {
// decoding error: unsupported message
// Keep iterating the buffer
self.rd_ptr += 1;
},
}
if size == 0 {
self.eos = true;
}

// read data: fill in buffer
match self.reader.read_exact(&mut self.buffer) {
Ok(_) => {
match self.state {
State::Parsing => {},
State::IncompleteMessage => {
// complete frame, move on to parsing
self.state = State::Parsing;
// try to consume one message
match Message::decode(&self.buf[self.rd_ptr..]) {
Ok(msg) => {
// one message fully decoded
// - increment pointer
// - expose to user
self.rd_ptr += msg.encoding_size();

// terminates possible [ClosedSourceElement] serie
self.past_element = None;

Some(Ok(msg.into()))
},
Err(e) => {
match e {
Error::NoSyncByte => {
// buffer does not even contain the sync byte:
// we can safely discard everything
self.wr_ptr = 0;
self.rd_ptr = 0;
if self.eos == true {
// consumed everything and EOS has been reached
return None;
}
},
State::IncompleteTrashing => {
if self.size_to_complete == 0 {
// trashed completely.
self.state = State::Parsing;
//println!("back to parsing");
Error::NonSupportedMesssage(mlen) => {
self.rd_ptr += mlen;

if self.rd_ptr > 4096 {
self.rd_ptr = 0;
self.wr_ptr = 0;
}

if self.eos == true {
// consumed everything and EOS has been reached
return None;
}
},
Error::IncompleteMessage(mlen) => {
// decoded partial valid frame
if self.rd_ptr + mlen > 4096 {
// frame would not fit in buffer:
// abort: we do not support that scenario.
// This should never happen anyway: internal buffer should be sized correctly.
self.buf = [0; 4096];
self.wr_ptr = 0;
self.rd_ptr = 0;
return Some(Err(Error::TooLargeInternalLimitation));
} else {
if self.size_to_complete < 1024 {
//println!("shiting {} bytes", self.size_to_complete);
// preserved content (shift left)
// and permit the refill that will conclude this message
self.buf.copy_within(self.rd_ptr.., 0);

// discard remaning bytes from buffer
// and move on to parsing to analyze new content
self.buffer.copy_within(self.size_to_complete.., 0);
self.state = State::Parsing;
//println!("back to parsing");
} else {
self.size_to_complete =
self.size_to_complete.saturating_add_signed(-1024);
//println!("size to trash: {}", self.size_to_complete);
}
self.wr_ptr -= self.rd_ptr;
self.rd_ptr = 0;
return Some(Err(Error::IncompleteMessage(mlen)));
}
},
_ => {
// bad content that does not look like valid BINEX.
// This is very inefficient. If returned error would increment
// the internal pointer, we could directly move on to next interesting bytes.
self.rd_ptr += 1;
},
}
// read success
self.rd_ptr = 0; // reset pointer & prepare for next Iter
Some(Err(Error::NotEnoughBytes))
},
Err(_) => {
None // EOS
Some(Err(e))
},
}
}
Expand Down
Loading

0 comments on commit d6ed7be

Please sign in to comment.