Skip to content

Commit

Permalink
Add support for reading DLT messages.
Browse files Browse the repository at this point in the history
This change includes:
- new API to read DLT messages from a source
- new API to read DLT messages from a stream (#34)
- new API to collect generic DLT statistics  (#31)
- removing of any dependency to the buf_redux crate
- consolidation of the crate's feature names
- increase of the crate's version to 0.20.0
  • Loading branch information
kruss committed Feb 26, 2025
1 parent 25706ea commit 5206e33
Show file tree
Hide file tree
Showing 15 changed files with 566 additions and 315 deletions.
15 changes: 14 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.20.0] - 2025-02-25

### Added

- API to read DLT messages from a source
- API to read DLT messages from a stream
- API to collect generic DLT statistics

### Changed

- Removed buf_redux dependency
- Cleanup feature names

## [0.19.2] - 2025-02-06

### Changed
Expand Down Expand Up @@ -36,7 +49,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- Add feature "serde-support", which adds to crate's types Serialize/Deserialize
- Add feature "serialization", which adds to crate's types Serialize/Deserialize

## [0.17.0] - 2024-10-04

Expand Down
22 changes: 15 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "dlt-core"
version = "0.19.2"
version = "0.20.0"
authors = ["esrlabs.com"]
edition = "2021"
description = """
Expand All @@ -10,7 +10,6 @@ license = "Apache-2.0"
repository = "https://github.com/esrlabs/dlt-core"

[dependencies]
buf_redux = { version = "0.8.4", optional = true }
byteorder = "1.4"
bytes = "1.0"
log = "0.4"
Expand All @@ -21,25 +20,29 @@ rustc-hash = { version = "2.1", optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = { version = "1.0", optional = true }
thiserror = "1.0"
futures = "0.3"

[features]
default = []
statistics = ["buf_redux", "rustc-hash"]
fibex_parser = ["quick-xml"]
debug_parser = []
serde-support = ["serde", "serde_json"]
statistics = ["rustc-hash"]
fibex = ["quick-xml"]
debug = []
serialization = ["serde", "serde_json"]
stream = []

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tarpaulin_include)'] }

[dev-dependencies]
buf_redux = "0.8.4"
criterion = { version = "0.4", features = ["html_reports"] }
dirs = "4.0"
env_logger = "0.10"
pretty_assertions = "1.3"
proptest = "1.6"
proptest-derive = "0.5"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
tokio-util = "0.7"

[[bench]]
name = "dlt_benchmarks"
Expand All @@ -48,3 +51,8 @@ harness = false
[[example]]
name = "file_parser"
path = "examples/file_parser.rs"

[[example]]
name = "dlt_statistics"
path = "examples/dlt_statistics.rs"
required-features = ["statistics"]
63 changes: 16 additions & 47 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Add this to your `Cargo.toml`:

```toml
[dependencies]
dlt_core = "0.19"
dlt_core = "0.20"
```

This is an example of how to parse a message and serialize it back to a byte array.
Expand Down Expand Up @@ -69,68 +69,35 @@ The following example can be run with `cargo run --example file_parser --release

<!-- example start -->
```rust
use buf_redux::{policy::MinBuffered, BufReader};
use dlt_core::parse::{dlt_message, DltParseError};
use std::{env, fs, fs::File, io::BufRead, path::PathBuf, time::Instant};

const BIN_READER_CAPACITY: usize = 10 * 1024 * 1024;
const BIN_MIN_BUFFER_SPACE: usize = 10 * 1024;
use dlt_core::read::{read_message, DltMessageReader};
use std::{env, fs, fs::File, path::PathBuf, time::Instant};

fn main() {
// collect input file details
let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("No filename given"));
let dlt_file = File::open(&dlt_file_path).expect("could not open file");
let source_file_size = fs::metadata(&dlt_file_path).expect("file size error").len();
// create a reader that maintains a minimum amount of bytes in it's buffer
let mut reader = BufReader::with_capacity(BIN_READER_CAPACITY, dlt_file)
.set_policy(MinBuffered(BIN_MIN_BUFFER_SPACE));
// now parse all file content
let mut parsed = 0usize;
let mut dlt_reader = DltMessageReader::new(dlt_file, true);
let mut message_count = 0usize;
let start = Instant::now();
loop {
let consumed: usize = match reader.fill_buf() {
Ok(content) => {
if content.is_empty() {
println!("empty content after {} parsed messages", parsed);
break;
}
let available = content.len();

match dlt_message(content, None, true) {
Ok((rest, _maybe_msg)) => {
let consumed = available - rest.len();
parsed += 1;
consumed
}
Err(DltParseError::IncompleteParse { needed }) => {
println!("parse incomplete, needed: {:?}", needed);
return;
}
Err(DltParseError::ParsingHickup(reason)) => {
println!("parse error: {}", reason);
4 //skip 4 bytes
}
Err(DltParseError::Unrecoverable(cause)) => {
println!("unrecoverable parse failure: {}", cause);
return;
}
}
match read_message(&mut dlt_reader, None).expect("read dlt message") {
Some(_message) => {
message_count += 1;
}
Err(e) => {
println!("Error reading: {}", e);
return;
None => {
break;
}
};
reader.consume(consumed);
}

// print some stats
let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0;
let file_size_in_mb = source_file_size as f64 / 1024.0 / 1024.0;
let amount_per_second: f64 = file_size_in_mb / duration_in_s;
println!(
"parsing {} messages took {:.3}s! ({:.3} MB/s)",
parsed, duration_in_s, amount_per_second
message_count, duration_in_s, amount_per_second
);
}

Expand All @@ -150,11 +117,13 @@ Below is the revised and improved English version of the documentation:

* **`statistics`**: Enables the `statistics` module, which scans the source data and provides a summary of its contents. This gives you an overview of the number of messages and their content.

* **`fibex_parser`**: Enables the `fibex` module, which allows to parse configurations for non-verbose messages from a fibex model.
* **`fibex`**: Enables the `fibex` module, which allows to parse configurations for non-verbose messages from a fibex model.

* **`debug`**: Adds additional log output for debugging purposes.

* **`debug_parser`**: Adds additional log output for debugging purposes.
* **`serialization`**: Adds `Serialize` and `Deserialize` implementations (via `serde`) to all public types. This feature is useful if you need to encode or decode these types for transmission or storage.

* **`serde-support`**: Adds `Serialize` and `Deserialize` implementations (via `serde`) to all public types. This feature is useful if you need to encode or decode these types for transmission or storage.
* **`stream`**: Provides API for parsing DLT messages from streams.

## Example users

Expand Down
34 changes: 34 additions & 0 deletions examples/dlt_statistics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use dlt_core::{
parse::DltParseError,
statistics::{collect_dlt_statistics, StatisticCollector},
};
use std::{env, fs, path::PathBuf, time::Instant};

pub struct MessageCounter {
count: usize,
}

impl StatisticCollector for MessageCounter {
fn collect_message(&mut self, _message: &[u8]) -> Result<(), DltParseError> {
self.count += 1;
Ok(())
}
}

fn main() {
// collect input file details
let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("No filename given"));
let source_file_size = fs::metadata(&dlt_file_path).expect("file size error").len();
// now parse all file content
let mut collector = MessageCounter { count: 0 };
let start = Instant::now();
collect_dlt_statistics(&dlt_file_path, &mut collector).expect("collect dlt statistics");
// print some stats
let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0;
let file_size_in_mb = source_file_size as f64 / 1024.0 / 1024.0;
let amount_per_second: f64 = file_size_in_mb / duration_in_s;
println!(
"parsing {} messages took {:.3}s! ({:.3} MB/s)",
collector.count, duration_in_s, amount_per_second
);
}
53 changes: 10 additions & 43 deletions examples/file_parser.rs
Original file line number Diff line number Diff line change
@@ -1,64 +1,31 @@
use buf_redux::{policy::MinBuffered, BufReader};
use dlt_core::parse::{dlt_message, DltParseError};
use std::{env, fs, fs::File, io::BufRead, path::PathBuf, time::Instant};

const BIN_READER_CAPACITY: usize = 10 * 1024 * 1024;
const BIN_MIN_BUFFER_SPACE: usize = 10 * 1024;
use dlt_core::read::{read_message, DltMessageReader};
use std::{env, fs, fs::File, path::PathBuf, time::Instant};

fn main() {
// collect input file details
let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("No filename given"));
let dlt_file = File::open(&dlt_file_path).expect("could not open file");
let source_file_size = fs::metadata(&dlt_file_path).expect("file size error").len();
// create a reader that maintains a minimum amount of bytes in it's buffer
let mut reader = BufReader::with_capacity(BIN_READER_CAPACITY, dlt_file)
.set_policy(MinBuffered(BIN_MIN_BUFFER_SPACE));
// now parse all file content
let mut parsed = 0usize;
let mut dlt_reader = DltMessageReader::new(dlt_file, true);
let mut message_count = 0usize;
let start = Instant::now();
loop {
let consumed: usize = match reader.fill_buf() {
Ok(content) => {
if content.is_empty() {
println!("empty content after {} parsed messages", parsed);
break;
}
let available = content.len();

match dlt_message(content, None, true) {
Ok((rest, _maybe_msg)) => {
let consumed = available - rest.len();
parsed += 1;
consumed
}
Err(DltParseError::IncompleteParse { needed }) => {
println!("parse incomplete, needed: {:?}", needed);
return;
}
Err(DltParseError::ParsingHickup(reason)) => {
println!("parse error: {}", reason);
4 //skip 4 bytes
}
Err(DltParseError::Unrecoverable(cause)) => {
println!("unrecoverable parse failure: {}", cause);
return;
}
}
match read_message(&mut dlt_reader, None).expect("read dlt message") {
Some(_message) => {
message_count += 1;
}
Err(e) => {
println!("Error reading: {}", e);
return;
None => {
break;
}
};
reader.consume(consumed);
}

// print some stats
let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0;
let file_size_in_mb = source_file_size as f64 / 1024.0 / 1024.0;
let amount_per_second: f64 = file_size_in_mb / duration_in_s;
println!(
"parsing {} messages took {:.3}s! ({:.3} MB/s)",
parsed, duration_in_s, amount_per_second
message_count, duration_in_s, amount_per_second
);
}
Loading

0 comments on commit 5206e33

Please sign in to comment.