From 606b1fc097e573c5206e958dfb966e416ccfbc7f Mon Sep 17 00:00:00 2001 From: "kevin.russ" Date: Wed, 19 Feb 2025 13:36:15 +0100 Subject: [PATCH] Add support for reading DLT messages. This change includes: - new API to read DLT messages from a source - new API to read DLT messages from a stream (#34) - new API to collect generic DLT statistics (#31) - removing of any dependency to the buf_redux crate - consolidation of the crate's feature names - increase of the crate's version to 0.20.0 --- CHANGELOG.md | 15 +- Cargo.toml | 22 +- README.md | 63 +--- examples/dlt_statistics.rs | 37 ++ examples/file_parser.rs | 53 +-- src/dlt.rs | 48 +-- src/fibex/mod.rs | 2 +- src/filtering.rs | 6 +- src/lib.rs | 5 +- src/parse.rs | 27 +- src/read.rs | 137 +++++++ src/statistics.rs | 672 +++++++++++++++------------------- src/stream.rs | 140 +++++++ src/tests/dlt_parse_tests.rs | 18 +- src/tests/mod.rs | 20 +- src/tests/statistics_tests.rs | 2 +- 16 files changed, 735 insertions(+), 532 deletions(-) create mode 100644 examples/dlt_statistics.rs create mode 100644 src/read.rs create mode 100644 src/stream.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index cbdf797..03d60d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.20.0] - 2025-02-25 + +### Added + +- API to read DLT messages from a source +- API to read DLT messages from a stream +- API to collect generic DLT statistics + +### Changed + +- Removed buf_redux dependency +- Cleanup feature names + ## [0.19.2] - 2025-02-06 ### Changed @@ -36,7 +49,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- Add feature "serde-support", which adds to crate's types Serialize/Deserialize +- Add feature "serialization", which adds to crate's types Serialize/Deserialize ## [0.17.0] - 2024-10-04 diff --git a/Cargo.toml b/Cargo.toml index b400daa..8565906 100644 --- a/Cargo.toml +++ 
b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dlt-core" -version = "0.19.2" +version = "0.20.0" authors = ["esrlabs.com"] edition = "2021" description = """ @@ -10,7 +10,6 @@ license = "Apache-2.0" repository = "https://github.com/esrlabs/dlt-core" [dependencies] -buf_redux = { version = "0.8.4", optional = true } byteorder = "1.4" bytes = "1.0" log = "0.4" @@ -21,25 +20,29 @@ rustc-hash = { version = "2.1", optional = true } serde = { version = "1.0", features = ["derive"], optional = true } serde_json = { version = "1.0", optional = true } thiserror = "1.0" +futures = "0.3" [features] default = [] -statistics = ["buf_redux", "rustc-hash"] -fibex_parser = ["quick-xml"] -debug_parser = [] -serde-support = ["serde", "serde_json"] +statistics = ["rustc-hash"] +fibex = ["quick-xml"] +debug = [] +serialization = ["serde", "serde_json"] +stream = [] [lints.rust] unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tarpaulin_include)'] } [dev-dependencies] -buf_redux = "0.8.4" criterion = { version = "0.4", features = ["html_reports"] } dirs = "4.0" env_logger = "0.10" pretty_assertions = "1.3" proptest = "1.6" proptest-derive = "0.5" +tokio = { version = "1", features = ["full"] } +tokio-stream = "0.1" +tokio-util = "0.7" [[bench]] name = "dlt_benchmarks" @@ -48,3 +51,8 @@ harness = false [[example]] name = "file_parser" path = "examples/file_parser.rs" + +[[example]] +name = "dlt_statistics" +path = "examples/dlt_statistics.rs" +required-features = ["statistics"] diff --git a/README.md b/README.md index 3954ee3..517cc3c 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Add this to your `Cargo.toml`: ```toml [dependencies] -dlt_core = "0.19" +dlt_core = "0.20" ``` This is an example of how to parse a message and serialize it back to a byte array. 
@@ -69,68 +69,35 @@ The following example can be run with `cargo run --example file_parser --release ```rust -use buf_redux::{policy::MinBuffered, BufReader}; -use dlt_core::parse::{dlt_message, DltParseError}; -use std::{env, fs, fs::File, io::BufRead, path::PathBuf, time::Instant}; - -const BIN_READER_CAPACITY: usize = 10 * 1024 * 1024; -const BIN_MIN_BUFFER_SPACE: usize = 10 * 1024; +use dlt_core::read::{read_message, DltMessageReader}; +use std::{env, fs, fs::File, path::PathBuf, time::Instant}; fn main() { // collect input file details let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("No filename given")); let dlt_file = File::open(&dlt_file_path).expect("could not open file"); let source_file_size = fs::metadata(&dlt_file_path).expect("file size error").len(); - // create a reader that maintains a minimum amount of bytes in it's buffer - let mut reader = BufReader::with_capacity(BIN_READER_CAPACITY, dlt_file) - .set_policy(MinBuffered(BIN_MIN_BUFFER_SPACE)); // now parse all file content - let mut parsed = 0usize; + let mut dlt_reader = DltMessageReader::new(dlt_file, true); + let mut message_count = 0usize; let start = Instant::now(); loop { - let consumed: usize = match reader.fill_buf() { - Ok(content) => { - if content.is_empty() { - println!("empty content after {} parsed messages", parsed); - break; - } - let available = content.len(); - - match dlt_message(content, None, true) { - Ok((rest, _maybe_msg)) => { - let consumed = available - rest.len(); - parsed += 1; - consumed - } - Err(DltParseError::IncompleteParse { needed }) => { - println!("parse incomplete, needed: {:?}", needed); - return; - } - Err(DltParseError::ParsingHickup(reason)) => { - println!("parse error: {}", reason); - 4 //skip 4 bytes - } - Err(DltParseError::Unrecoverable(cause)) => { - println!("unrecoverable parse failure: {}", cause); - return; - } - } + match read_message(&mut dlt_reader, None).expect("read dlt message") { + Some(_message) => { + message_count += 1; } 
- Err(e) => { - println!("Error reading: {}", e); - return; + None => { + break; } }; - reader.consume(consumed); } - // print some stats let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0; let file_size_in_mb = source_file_size as f64 / 1024.0 / 1024.0; let amount_per_second: f64 = file_size_in_mb / duration_in_s; println!( "parsing {} messages took {:.3}s! ({:.3} MB/s)", - parsed, duration_in_s, amount_per_second + message_count, duration_in_s, amount_per_second ); } @@ -150,11 +117,13 @@ Below is the revised and improved English version of the documentation: * **`statistics`**: Enables the `statistics` module, which scans the source data and provides a summary of its contents. This gives you an overview of the number of messages and their content. -* **`fibex_parser`**: Enables the `fibex` module, which allows to parse configurations for non-verbose messages from a fibex model. +* **`fibex`**: Enables the `fibex` module, which allows to parse configurations for non-verbose messages from a fibex model. + +* **`debug`**: Adds additional log output for debugging purposes. -* **`debug_parser`**: Adds additional log output for debugging purposes. +* **`serialization`**: Adds `Serialize` and `Deserialize` implementations (via `serde`) to all public types. This feature is useful if you need to encode or decode these types for transmission or storage. -* **`serde-support`**: Adds `Serialize` and `Deserialize` implementations (via `serde`) to all public types. This feature is useful if you need to encode or decode these types for transmission or storage. +* **`stream`**: Provides API for parsing DLT messages from streams. 
## Example users diff --git a/examples/dlt_statistics.rs b/examples/dlt_statistics.rs new file mode 100644 index 0000000..ab893cf --- /dev/null +++ b/examples/dlt_statistics.rs @@ -0,0 +1,37 @@ +use dlt_core::{ + parse::DltParseError, + read::DltMessageReader, + statistics::{collect_statistics, Statistic, StatisticCollector}, +}; +use std::{env, fs, fs::File, path::PathBuf, time::Instant}; + +pub struct MessageCounter { + count: usize, +} + +impl StatisticCollector for MessageCounter { + fn collect_statistic(&mut self, _statistic: Statistic) -> Result<(), DltParseError> { + self.count += 1; + Ok(()) + } +} + +fn main() { + // collect input file details + let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("No filename given")); + let dlt_file = File::open(&dlt_file_path).expect("could not open file"); + let source_file_size = fs::metadata(&dlt_file_path).expect("file size error").len(); + // now scan all file content + let mut dlt_reader = DltMessageReader::new(dlt_file, true); + let mut dlt_collector = MessageCounter { count: 0 }; + let start = Instant::now(); + collect_statistics(&mut dlt_reader, &mut dlt_collector).expect("collect dlt statistics"); + // print some stats + let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0; + let file_size_in_mb = source_file_size as f64 / 1024.0 / 1024.0; + let amount_per_second: f64 = file_size_in_mb / duration_in_s; + println!( + "parsing {} messages took {:.3}s! 
({:.3} MB/s)", + dlt_collector.count, duration_in_s, amount_per_second + ); +} diff --git a/examples/file_parser.rs b/examples/file_parser.rs index 7aade21..7d2df78 100644 --- a/examples/file_parser.rs +++ b/examples/file_parser.rs @@ -1,64 +1,31 @@ -use buf_redux::{policy::MinBuffered, BufReader}; -use dlt_core::parse::{dlt_message, DltParseError}; -use std::{env, fs, fs::File, io::BufRead, path::PathBuf, time::Instant}; - -const BIN_READER_CAPACITY: usize = 10 * 1024 * 1024; -const BIN_MIN_BUFFER_SPACE: usize = 10 * 1024; +use dlt_core::read::{read_message, DltMessageReader}; +use std::{env, fs, fs::File, path::PathBuf, time::Instant}; fn main() { // collect input file details let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("No filename given")); let dlt_file = File::open(&dlt_file_path).expect("could not open file"); let source_file_size = fs::metadata(&dlt_file_path).expect("file size error").len(); - // create a reader that maintains a minimum amount of bytes in it's buffer - let mut reader = BufReader::with_capacity(BIN_READER_CAPACITY, dlt_file) - .set_policy(MinBuffered(BIN_MIN_BUFFER_SPACE)); // now parse all file content - let mut parsed = 0usize; + let mut dlt_reader = DltMessageReader::new(dlt_file, true); + let mut message_count = 0usize; let start = Instant::now(); loop { - let consumed: usize = match reader.fill_buf() { - Ok(content) => { - if content.is_empty() { - println!("empty content after {} parsed messages", parsed); - break; - } - let available = content.len(); - - match dlt_message(content, None, true) { - Ok((rest, _maybe_msg)) => { - let consumed = available - rest.len(); - parsed += 1; - consumed - } - Err(DltParseError::IncompleteParse { needed }) => { - println!("parse incomplete, needed: {:?}", needed); - return; - } - Err(DltParseError::ParsingHickup(reason)) => { - println!("parse error: {}", reason); - 4 //skip 4 bytes - } - Err(DltParseError::Unrecoverable(cause)) => { - println!("unrecoverable parse failure: {}", 
cause); - return; - } - } + match read_message(&mut dlt_reader, None).expect("read dlt message") { + Some(_message) => { + message_count += 1; } - Err(e) => { - println!("Error reading: {}", e); - return; + None => { + break; } }; - reader.consume(consumed); } - // print some stats let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0; let file_size_in_mb = source_file_size as f64 / 1024.0 / 1024.0; let amount_per_second: f64 = file_size_in_mb / duration_in_s; println!( "parsing {} messages took {:.3}s! ({:.3} MB/s)", - parsed, duration_in_s, amount_per_second + message_count, duration_in_s, amount_per_second ); } diff --git a/src/dlt.rs b/src/dlt.rs index 8d5320f..2f9aceb 100644 --- a/src/dlt.rs +++ b/src/dlt.rs @@ -42,7 +42,7 @@ pub enum Error { /// Used to express the byte order of DLT data type fields #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)] @@ -56,7 +56,7 @@ pub enum Endianness { /// represents a DLT message including all headers #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone, PartialEq)] @@ -69,7 +69,7 @@ pub struct Message { /// Storage header is used in case of dlt entries stored in file #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone, PartialEq)] @@ -82,7 +82,7 @@ pub struct StorageHeader { /// The Standard Header shall be in big endian format #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone, PartialEq)] @@ -107,7 +107,7 @@ pub struct StandardHeader { /// /// The Extended Header shall be in big endian format #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] 
#[derive(Debug, Clone, PartialEq)] @@ -144,7 +144,7 @@ pub struct ExtendedHeader { /// and payload. The payload contains of the Service ID and the contained parameters. /// #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone, PartialEq)] @@ -177,7 +177,7 @@ pub enum PayloadContent { } #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone, PartialEq)] @@ -343,7 +343,7 @@ impl StandardHeader { /// Representation of log levels used in DLT log messages #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, PartialEq, PartialOrd, Clone, Copy)] @@ -378,7 +378,7 @@ impl AsRef for LogLevel { /// In case the dlt message contains tracing information, the Trace-Type /// indicates different kinds of trace message types #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, PartialEq, Clone)] @@ -414,7 +414,7 @@ impl AsRef for ApplicationTraceType { /// In case the dlt message contains networking information, /// the Trace-Type indicates different kinds of network message types #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, PartialEq, Clone)] @@ -457,7 +457,7 @@ const CTRL_TYPE_RESPONSE: u8 = 0x2; /// In case the dlt message contains control information, /// the Trace-Type indicates different kinds of control message types #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, PartialEq, Clone)] @@ -498,7 +498,7 @@ impl ControlType { /// Part of the extended header, distinguishes log, trace and controll messages #[cfg_attr( - feature = "serde-support", + feature = "serialization", 
derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, PartialEq, Clone)] @@ -559,7 +559,7 @@ impl ExtendedHeader { /// Fixed-Point representation. only supports 32 bit and 64 bit values /// according to the spec 128 bit are possible but we don't support it #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, PartialEq, Clone)] @@ -578,7 +578,7 @@ pub(crate) fn fixed_point_value_width(v: &FixedPointValue) -> usize { /// Represents the value of an DLT argument #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, PartialEq, Clone)] @@ -603,7 +603,7 @@ pub enum Value { /// Defines what string type is used, `ASCII` or `UTF8` #[allow(clippy::upper_case_acronyms)] #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone, PartialEq)] @@ -620,7 +620,7 @@ pub enum StringCoding { /// Represents the bit width of a floatingpoint value type #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone, PartialEq, Copy)] @@ -639,7 +639,7 @@ pub(crate) fn float_width_to_type_length(width: FloatWidth) -> TypeLength { /// Represents the bit width of a value type #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone, PartialEq, Copy)] @@ -677,7 +677,7 @@ impl TypeLength { /// /// the Array type is not yet supported and honestly I never saw anyone using it #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone, PartialEq)] @@ -724,7 +724,7 @@ pub enum TypeInfoKind { /// number of characters of the associated name or unit filed. The unit /// information is to add only in some data types. 
#[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone, PartialEq)] @@ -910,7 +910,7 @@ impl TryFrom for TypeInfo { /// * i64 bit if Type Length (TYLE) equals 4 /// * i128 bit if Type Length (TYLE) equals 5 (unsupported) #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone, PartialEq)] @@ -927,7 +927,7 @@ pub struct FixedPoint { /// itself, it is needed to provide information like size and type /// of the variable. This information is contained in the `type_info` field. #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone, PartialEq)] @@ -1528,7 +1528,7 @@ fn payload_content_len(content: &PayloadContent) -> usize { /// Configuration options for the extended header #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone, PartialEq)] @@ -1540,7 +1540,7 @@ pub struct ExtendedHeaderConfig { /// Configuration options for a DLT message, used when constructing a message #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone, PartialEq)] @@ -1557,7 +1557,7 @@ pub struct MessageConfig { #[inline] fn dbg_bytes_with_info(_name: &str, _bytes: &[u8], _info: Option<&str>) { - #[cfg(feature = "debug_parser")] + #[cfg(feature = "debug")] { trace!( "writing {}: {} {:02X?} {}", diff --git a/src/fibex/mod.rs b/src/fibex/mod.rs index db33032..ebe4bf1 100644 --- a/src/fibex/mod.rs +++ b/src/fibex/mod.rs @@ -55,7 +55,7 @@ pub enum Error { /// Contains all the paths of fibex files that should be combined into the model #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug)] diff --git 
a/src/filtering.rs b/src/filtering.rs index 2bc6a92..b0afedd 100644 --- a/src/filtering.rs +++ b/src/filtering.rs @@ -25,7 +25,7 @@ use std::{collections::HashSet, iter::FromIterator}; /// only this is possible: /// - `app-id is_one_of ["abc","foo"] AND log-level <= DEBUG` #[cfg_attr( - feature = "serde-support", + feature = "serialization", derive(serde::Serialize, serde::Deserialize) )] #[derive(Debug, Clone)] @@ -96,8 +96,8 @@ impl From<&DltFilterConfig> for ProcessedDltFilterConfig { } } -/// Read filter config from a json file. Available only with feature "serde-support" -#[cfg(feature = "serde-support")] +/// Read filter config from a json file. Available only with feature "serialization" +#[cfg(feature = "serialization")] pub fn read_filter_options(f: &mut std::fs::File) -> Option { use std::io::Read; diff --git a/src/lib.rs b/src/lib.rs index 4c96995..c1ea2dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,10 +20,13 @@ extern crate log; pub mod dlt; -#[cfg(feature = "fibex_parser")] +#[cfg(feature = "fibex")] pub mod fibex; pub mod filtering; pub mod parse; +pub mod read; +#[cfg(feature = "stream")] +pub mod stream; #[cfg(not(tarpaulin_include))] pub mod service_id; diff --git a/src/parse.rs b/src/parse.rs index 86458ea..229c54d 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -747,7 +747,7 @@ fn dlt_payload( #[inline] fn dbg_parsed(_name: &str, _before: &[u8], _after: &[u8], _value: &T) { - // #[cfg(feature = "debug_parser")] + // #[cfg(feature = "debug")] { let input_len = _before.len(); let now_len = _after.len(); @@ -794,7 +794,7 @@ pub enum ParsedMessage { /// /// 3500001F 45435500 3F88623A 16014150 5000434F 4E001100 00000472 656D6F (31 byte) /// -------------------------------------------- -///
: 35 00 001F 45435500 3F88623A +/// `
: 35 00 001F 45435500 3F88623A` /// header type = 0x35 = 0b0011 0101 /// UEH: 1 - > using extended header /// MSBF: 0 - > little endian @@ -806,7 +806,7 @@ pub enum ParsedMessage { /// ecu-id = 45435500 = "ECU " /// timestamp = 3F88623A = 106590265.0 ms since ECU startup (~30 h) /// -------------------------------------------- -/// : 16014150 5000434F 4E00 +/// `: 16014150 5000434F 4E00` /// message-info MSIN = 0x16 = 0b0001 0110 /// 0 -> non-verbose /// 011 (MSTP Message Type) = 0x3 = Dlt Control Message @@ -987,21 +987,6 @@ pub(crate) fn validated_payload_length( Ok(payload_length) } -#[cfg(feature = "statistics")] -pub(crate) fn skip_till_after_next_storage_header( - input: &[u8], -) -> Result<(&[u8], u64), DltParseError> { - match forward_to_next_storage_header(input) { - Some((consumed, rest)) => { - let (after_storage_header, skipped_bytes) = skip_storage_header(rest)?; - Ok((after_storage_header, consumed + skipped_bytes)) - } - None => Err(DltParseError::ParsingHickup( - "did not find another storage header".into(), - )), - } -} - #[allow(clippy::useless_conversion)] /// Remove the storage header from the input if present pub fn skip_storage_header(input: &[u8]) -> Result<(&[u8], u64), DltParseError> { @@ -1240,3 +1225,9 @@ pub fn construct_arguments( } Ok(arguments) } + +/// Parse the DLT message length from a slice containing a DLT standard header. +pub(crate) fn parse_length(input: &[u8]) -> IResult<&[u8], usize, DltParseError> { + let (rest, (_, length)) = tuple((take(2usize), be_u16))(input)?; + Ok((rest, length as usize)) +} diff --git a/src/read.rs b/src/read.rs new file mode 100644 index 0000000..6fdfb48 --- /dev/null +++ b/src/read.rs @@ -0,0 +1,137 @@ +// Copyright 2021 by Accenture ESR +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! # dlt reading support +use crate::{ + dlt::{HEADER_MIN_LENGTH, STORAGE_HEADER_LENGTH}, + filtering::ProcessedDltFilterConfig, + parse::{dlt_message, parse_length, DltParseError, ParsedMessage}, +}; +use std::io::{BufReader, Read}; + +const DEFAULT_BUFFER_CAPACITY: usize = 10 * 1024 * 1024; +const DEFAULT_MESSAGE_MAX_LEN: usize = 10 * 1024; + +/// Read and parse the next DLT message from the given reader, if any +pub fn read_message( + reader: &mut DltMessageReader, + filter_config_opt: Option<&ProcessedDltFilterConfig>, +) -> Result, DltParseError> { + let with_storage_header = reader.with_storage_header(); + let slice = reader.next_message_slice()?; + + if !slice.is_empty() { + Ok(Some( + dlt_message(slice, filter_config_opt, with_storage_header)?.1, + )) + } else { + Ok(None) + } +} + +/// Buffered reader for DLT message slices from a source. +pub struct DltMessageReader { + source: BufReader, + with_storage_header: bool, + buffer: Vec, +} + +impl DltMessageReader { + /// Create a new reader for the given source. + pub fn new(source: S, with_storage_header: bool) -> Self { + DltMessageReader::with_capacity( + DEFAULT_BUFFER_CAPACITY, + DEFAULT_MESSAGE_MAX_LEN, + source, + with_storage_header, + ) + } + + /// Create a new reader for the given source and specified capacities. 
+    pub fn with_capacity(
+        buffer_capacity: usize,
+        message_max_len: usize,
+        source: S,
+        with_storage_header: bool,
+    ) -> Self {
+        DltMessageReader {
+            source: BufReader::with_capacity(buffer_capacity, source),
+            with_storage_header,
+            buffer: vec![0u8; message_max_len],
+        }
+    }
+
+    /// Read the next message slice from the source,
+    /// or return an empty slice if no more message could be read.
+    pub fn next_message_slice(&mut self) -> Result<&[u8], DltParseError> {
+        let storage_len = if self.with_storage_header {
+            STORAGE_HEADER_LENGTH as usize
+        } else {
+            0
+        };
+        let header_len = storage_len + HEADER_MIN_LENGTH as usize;
+
+        if self
+            .source
+            .read_exact(&mut self.buffer[..header_len])
+            .is_err()
+        {
+            return Ok(&[]);
+        }
+
+        let (_, message_len) = parse_length(&self.buffer[storage_len..header_len])?;
+        let total_len = storage_len + message_len;
+
+        self.source
+            .read_exact(&mut self.buffer[header_len..total_len])?;
+
+        Ok(&self.buffer[..total_len])
+    }
+
+    /// Answer if message slices contain a `StorageHeader`. 
+ pub fn with_storage_header(&self) -> bool { + self.with_storage_header + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tests::DLT_ROUNDTRIP_MESSAGE; + + #[test] + fn test_next_message() { + let mut reader = DltMessageReader::new(DLT_ROUNDTRIP_MESSAGE, true); + assert!(reader.with_storage_header()); + + let message = reader.next_message_slice().expect("message"); + assert_eq!(DLT_ROUNDTRIP_MESSAGE, message); + + assert!(reader.next_message_slice().expect("message").is_empty()); + } + + #[test] + fn test_read_message() { + let mut reader = DltMessageReader::new(DLT_ROUNDTRIP_MESSAGE, true); + assert!(reader.with_storage_header()); + + if let Some(ParsedMessage::Item(message)) = + read_message(&mut reader, None).expect("message") + { + assert_eq!(DLT_ROUNDTRIP_MESSAGE, message.as_bytes()); + } + + assert_eq!(None, read_message(&mut reader, None).expect("message")) + } +} diff --git a/src/statistics.rs b/src/statistics.rs index 680bfa6..0c076ae 100644 --- a/src/statistics.rs +++ b/src/statistics.rs @@ -14,416 +14,352 @@ //! 
# rapidly gather statistics info of a dlt source use crate::{ - dlt::{LogLevel, MessageType}, - parse::{ - dlt_consume_msg, dlt_extended_header, dlt_standard_header, - skip_till_after_next_storage_header, validated_payload_length, DltParseError, - }, + dlt::{ExtendedHeader, LogLevel, MessageType, StandardHeader, StorageHeader}, + parse::{dlt_extended_header, dlt_standard_header, dlt_storage_header, DltParseError}, + read::DltMessageReader, }; -use buf_redux::{policy::MinBuffered, BufReader as ReduxReader}; -use nom::bytes::streaming::take; -use rustc_hash::FxHashMap; -use std::{ - fs, - io::{BufRead, Read}, - path::Path, -}; - -pub(crate) const BIN_READER_CAPACITY: usize = 10 * 1024 * 1024; -pub(crate) const BIN_MIN_BUFFER_SPACE: usize = 10 * 1024; - -/// Parse out the `StatisticRowInfo` for the next DLT message in a byte array -pub fn dlt_statistic_row_info( - input: &[u8], - with_storage_header: bool, -) -> Result<(&[u8], StatisticRowInfo), DltParseError> { - let (after_storage_header, _) = if with_storage_header { - skip_till_after_next_storage_header(input)? 
- } else { - (input, 0) - }; - let (after_storage_and_normal_header, header) = dlt_standard_header(after_storage_header)?; - - let payload_length = match validated_payload_length(&header, input.len()) { - Ok(length) => length, - Err(_e) => { - return Ok(( - after_storage_and_normal_header, - StatisticRowInfo { - app_id_context_id: None, - ecu_id: header.ecu_id, - level: None, - verbose: false, - }, - )); - } - }; - if !header.has_extended_header { - // no app id, skip rest - let (after_message, _) = - take::(payload_length)(after_storage_and_normal_header)?; - return Ok(( - after_message, - StatisticRowInfo { - app_id_context_id: None, - ecu_id: header.ecu_id, - level: None, - verbose: false, - }, - )); - } +use std::io::Read; - let (after_headers, extended_header) = dlt_extended_header(after_storage_and_normal_header)?; - // skip payload - let (after_message, _) = take::(payload_length)(after_headers)?; - let level = match extended_header.message_type { - MessageType::Log(level) => Some(level), - _ => None, - }; - Ok(( - after_message, - StatisticRowInfo { - app_id_context_id: Some((extended_header.application_id, extended_header.context_id)), - ecu_id: header.ecu_id, - level, - verbose: extended_header.verbose, - }, - )) +/// Trait for a DLT statistics collector. +pub trait StatisticCollector { + fn collect_statistic(&mut self, statistic: Statistic) -> Result<(), DltParseError>; } -/// Shows how many messages per log level where found -#[cfg_attr( - feature = "serde-support", - derive(serde::Serialize, serde::Deserialize) -)] -#[derive(Debug, Default, Clone)] -pub struct LevelDistribution { - pub non_log: usize, - pub log_fatal: usize, - pub log_error: usize, - pub log_warning: usize, - pub log_info: usize, - pub log_debug: usize, - pub log_verbose: usize, - pub log_invalid: usize, +/// Available statistics on a DLT message. +pub struct Statistic<'a> { + /// The `LogLevel` of the message, if any. 
+ pub log_level: Option, + /// The `StorageHeader` of the message, if any. + pub storage_header: Option, + /// The `StandardHeader` of the message. + pub standard_header: StandardHeader, + /// The `ExtendedHeader` of the message, if any. + pub extended_header: Option, + /// The remaining payload of the message after all headers. + pub payload: &'a [u8], + /// Answers if the message's payload is verbose. + pub is_verbose: bool, } -impl LevelDistribution { - pub fn new(level: Option) -> LevelDistribution { - let all_zero = Default::default(); - match level { - None => LevelDistribution { - non_log: 1, - ..all_zero - }, - Some(LogLevel::Fatal) => LevelDistribution { - log_fatal: 1, - ..all_zero - }, - Some(LogLevel::Error) => LevelDistribution { - log_error: 1, - ..all_zero - }, - Some(LogLevel::Warn) => LevelDistribution { - log_warning: 1, - ..all_zero - }, - Some(LogLevel::Info) => LevelDistribution { - log_info: 1, - ..all_zero - }, - Some(LogLevel::Debug) => LevelDistribution { - log_debug: 1, - ..all_zero - }, - Some(LogLevel::Verbose) => LevelDistribution { - log_verbose: 1, - ..all_zero - }, - _ => LevelDistribution { - log_invalid: 1, - ..all_zero - }, +/// Collect DLT statistics from the given reader. 
+pub fn collect_statistics( + reader: &mut DltMessageReader, + collector: &mut impl StatisticCollector, +) -> Result<(), DltParseError> { + let with_storage_header = reader.with_storage_header(); + + loop { + let slice = reader.next_message_slice()?; + if slice.is_empty() { + break; } - } - pub fn merge(&mut self, outside: &LevelDistribution) { - self.non_log += outside.non_log; - self.log_fatal += outside.log_fatal; - self.log_error += outside.log_error; - self.log_warning += outside.log_warning; - self.log_info += outside.log_info; - self.log_debug += outside.log_debug; - self.log_verbose += outside.log_verbose; - self.log_invalid += outside.log_invalid; + let (rest_before_standard_header, storage_header) = if with_storage_header { + let result = dlt_storage_header(slice)?; + let rest = result.0; + let header = if let Some(header) = result.1 { + Some(header.0) + } else { + None + }; + (rest, header) + } else { + (slice, None) + }; + + let (rest_after_standard_header, standard_header) = + dlt_standard_header(rest_before_standard_header)?; + + let (rest_after_all_headers, extended_header, log_level, is_verbose) = + if standard_header.has_extended_header { + let result = dlt_extended_header(rest_after_standard_header)?; + let rest = result.0; + let header = result.1; + let level = match header.message_type { + MessageType::Log(level) => Some(level), + _ => None, + }; + let verbose = header.verbose; + (rest, Some(header), level, verbose) + } else { + (rest_after_standard_header, None, None, false) + }; + + collector.collect_statistic(Statistic { + log_level, + storage_header, + standard_header, + extended_header, + payload: rest_after_all_headers, + is_verbose, + })?; } -} -type IdMap = FxHashMap; - -/// Includes the `LevelDistribution` for all `app-ids`, `context-ids` and -/// `ecu_ids` -#[cfg_attr( - feature = "serde-support", - derive(serde::Serialize, serde::Deserialize) -)] -#[derive(Debug)] -pub struct StatisticInfo { - pub app_ids: Vec<(String, 
LevelDistribution)>, - pub context_ids: Vec<(String, LevelDistribution)>, - pub ecu_ids: Vec<(String, LevelDistribution)>, - pub contained_non_verbose: bool, + Ok(()) } -impl StatisticInfo { - pub fn new() -> Self { - Self { - app_ids: vec![], - context_ids: vec![], - ecu_ids: vec![], - contained_non_verbose: false, - } +/// Contains the common DLT statistics. +pub mod common { + use super::*; + use rustc_hash::FxHashMap; + + type IdMap = FxHashMap; + + /// Collector for the `StatisticInfo` statistics. + pub struct StatisticInfoCollector { + app_ids: IdMap, + context_ids: IdMap, + ecu_ids: IdMap, + contained_non_verbose: bool, } - pub fn merge(&mut self, stat: StatisticInfo) { - StatisticInfo::merge_levels(&mut self.app_ids, stat.app_ids); - StatisticInfo::merge_levels(&mut self.context_ids, stat.context_ids); - StatisticInfo::merge_levels(&mut self.ecu_ids, stat.ecu_ids); - self.contained_non_verbose = self.contained_non_verbose || stat.contained_non_verbose; + impl StatisticInfoCollector { + /// Create a new statistics collector. + pub fn new() -> Self { + StatisticInfoCollector { + app_ids: FxHashMap::default(), + context_ids: FxHashMap::default(), + ecu_ids: FxHashMap::default(), + contained_non_verbose: false, + } + } + + /// Finalize and return the collected statistics. 
+ pub fn collect(self) -> StatisticInfo { + StatisticInfo { + app_ids: self + .app_ids + .into_iter() + .collect::>(), + context_ids: self + .context_ids + .into_iter() + .collect::>(), + ecu_ids: self + .ecu_ids + .into_iter() + .collect::>(), + contained_non_verbose: self.contained_non_verbose, + } + } } - fn merge_levels( - owner: &mut Vec<(String, LevelDistribution)>, - incomes: Vec<(String, LevelDistribution)>, - ) { - incomes.iter().for_each(|(income_id, income)| { - if let Some((_, existed)) = owner.iter_mut().find(|(owner_id, _)| owner_id == income_id) - { - existed.merge(income); - } else { - owner.push((income_id.to_owned(), income.clone())); + impl StatisticCollector for StatisticInfoCollector { + fn collect_statistic(&mut self, statistic: Statistic) -> Result<(), DltParseError> { + let log_level = statistic.log_level; + + match statistic.standard_header.ecu_id { + Some(id) => add_for_level(log_level, &mut self.ecu_ids, id), + None => add_for_level(log_level, &mut self.ecu_ids, "NONE".to_string()), + }; + + if let Some(extended_header) = statistic.extended_header { + add_for_level(log_level, &mut self.app_ids, extended_header.application_id); + add_for_level(log_level, &mut self.context_ids, extended_header.context_id); } - }); + + self.contained_non_verbose = self.contained_non_verbose || !statistic.is_verbose; + + Ok(()) + } } -} -impl Default for StatisticInfo { - fn default() -> Self { - Self::new() + /// Some common statistics about collected messages. + /// Includes the `LevelDistribution` for `app-ids`, `context-ids` and `ecu_ids`. 
+ #[cfg_attr( + feature = "serialization", + derive(serde::Serialize, serde::Deserialize) + )] + #[derive(Debug)] + pub struct StatisticInfo { + pub app_ids: Vec<(String, LevelDistribution)>, + pub context_ids: Vec<(String, LevelDistribution)>, + pub ecu_ids: Vec<(String, LevelDistribution)>, + pub contained_non_verbose: bool, } -} -/// Stats about a row in a DLT file -#[cfg_attr( - feature = "serde-support", - derive(serde::Serialize, serde::Deserialize) -)] -#[derive(Debug)] -pub struct StatisticRowInfo { - pub app_id_context_id: Option<(String, String)>, - pub ecu_id: Option, - pub level: Option, - pub verbose: bool, -} + impl StatisticInfo { + pub fn new() -> Self { + Self { + app_ids: vec![], + context_ids: vec![], + ecu_ids: vec![], + contained_non_verbose: false, + } + } -/// Read in a DLT file and collect some statistics about it -pub fn collect_dlt_stats(in_file: &Path) -> Result { - let f = fs::File::open(in_file)?; + pub fn merge(&mut self, stat: StatisticInfo) { + StatisticInfo::merge_levels(&mut self.app_ids, stat.app_ids); + StatisticInfo::merge_levels(&mut self.context_ids, stat.context_ids); + StatisticInfo::merge_levels(&mut self.ecu_ids, stat.ecu_ids); + self.contained_non_verbose = self.contained_non_verbose || stat.contained_non_verbose; + } - let mut reader = ReduxReader::with_capacity(BIN_READER_CAPACITY, f) - .set_policy(MinBuffered(BIN_MIN_BUFFER_SPACE)); + fn merge_levels( + owner: &mut Vec<(String, LevelDistribution)>, + incomes: Vec<(String, LevelDistribution)>, + ) { + incomes.iter().for_each(|(income_id, income)| { + if let Some((_, existed)) = + owner.iter_mut().find(|(owner_id, _)| owner_id == income_id) + { + existed.merge(income); + } else { + owner.push((income_id.to_owned(), income.clone())); + } + }); + } + } - let mut app_ids: IdMap = FxHashMap::default(); - let mut context_ids: IdMap = FxHashMap::default(); - let mut ecu_ids: IdMap = FxHashMap::default(); - let mut contained_non_verbose = false; - loop { - match 
read_one_dlt_message_info(&mut reader, true) { - Ok(Some(( - consumed, - StatisticRowInfo { - app_id_context_id: Some((app_id, context_id)), - ecu_id: ecu, - level, - verbose, + impl Default for StatisticInfo { + fn default() -> Self { + Self::new() + } + } + + /// Shows how many messages per log level where found + #[cfg_attr( + feature = "serialization", + derive(serde::Serialize, serde::Deserialize) + )] + #[derive(Debug, Default, Clone)] + pub struct LevelDistribution { + pub non_log: usize, + pub log_fatal: usize, + pub log_error: usize, + pub log_warning: usize, + pub log_info: usize, + pub log_debug: usize, + pub log_verbose: usize, + pub log_invalid: usize, + } + + impl LevelDistribution { + pub fn new(level: Option) -> LevelDistribution { + let all_zero = Default::default(); + match level { + None => LevelDistribution { + non_log: 1, + ..all_zero }, - ))) => { - contained_non_verbose = contained_non_verbose || !verbose; - reader.consume(consumed as usize); - add_for_level(level, &mut app_ids, app_id); - add_for_level(level, &mut context_ids, context_id); - match ecu { - Some(id) => add_for_level(level, &mut ecu_ids, id), - None => add_for_level(level, &mut ecu_ids, "NONE".to_string()), - }; - } - Ok(Some(( - consumed, - StatisticRowInfo { - app_id_context_id: None, - ecu_id: ecu, - level, - verbose, + Some(LogLevel::Fatal) => LevelDistribution { + log_fatal: 1, + ..all_zero + }, + Some(LogLevel::Error) => LevelDistribution { + log_error: 1, + ..all_zero + }, + Some(LogLevel::Warn) => LevelDistribution { + log_warning: 1, + ..all_zero + }, + Some(LogLevel::Info) => LevelDistribution { + log_info: 1, + ..all_zero + }, + Some(LogLevel::Debug) => LevelDistribution { + log_debug: 1, + ..all_zero + }, + Some(LogLevel::Verbose) => LevelDistribution { + log_verbose: 1, + ..all_zero + }, + _ => LevelDistribution { + log_invalid: 1, + ..all_zero }, - ))) => { - contained_non_verbose = contained_non_verbose || !verbose; - reader.consume(consumed as usize); - 
add_for_level(level, &mut app_ids, "NONE".to_string()); - add_for_level(level, &mut context_ids, "NONE".to_string()); - match ecu { - Some(id) => add_for_level(level, &mut ecu_ids, id), - None => add_for_level(level, &mut ecu_ids, "NONE".to_string()), - }; - } - Ok(None) => { - break; - } - Err(e) => { - // we couldn't parse the message. try to skip it and find the next. - debug!("stats...try to skip and continue parsing: {}", e); - match e { - DltParseError::ParsingHickup(reason) => { - // we couldn't parse the message. try to skip it and find the next. - reader.consume(4); // at least skip the magic DLT pattern - debug!( - "error parsing 1 dlt message, try to continue parsing: {}", - reason - ); - } - _ => return Err(e), - } } } - } - let res = StatisticInfo { - app_ids: app_ids - .into_iter() - .collect::>(), - context_ids: context_ids - .into_iter() - .collect::>(), - ecu_ids: ecu_ids - .into_iter() - .collect::>(), - contained_non_verbose, - }; - Ok(res) -} -fn read_one_dlt_message_info( - reader: &mut ReduxReader, - with_storage_header: bool, -) -> Result, DltParseError> { - match reader.fill_buf() { - Ok(content) => { - if content.is_empty() { - return Ok(None); - } - let available = content.len(); - let r = dlt_statistic_row_info(content, with_storage_header)?; - let consumed = available - r.0.len(); - Ok(Some((consumed as u64, r.1))) + pub fn merge(&mut self, outside: &LevelDistribution) { + self.non_log += outside.non_log; + self.log_fatal += outside.log_fatal; + self.log_error += outside.log_error; + self.log_warning += outside.log_warning; + self.log_info += outside.log_info; + self.log_debug += outside.log_debug; + self.log_verbose += outside.log_verbose; + self.log_invalid += outside.log_invalid; } - Err(e) => Err(DltParseError::ParsingHickup(format!( - "error while parsing dlt messages: {}", - e - ))), } -} -fn add_for_level(level: Option, ids: &mut IdMap, id: String) { - if let Some(n) = ids.get_mut(&id) { - match level { - Some(LogLevel::Fatal) => 
{ - *n = LevelDistribution { - log_fatal: n.log_fatal + 1, - ..*n + fn add_for_level(level: Option, ids: &mut IdMap, id: String) { + if let Some(n) = ids.get_mut(&id) { + match level { + Some(LogLevel::Fatal) => { + *n = LevelDistribution { + log_fatal: n.log_fatal + 1, + ..*n + } } - } - Some(LogLevel::Error) => { - *n = LevelDistribution { - log_error: n.log_error + 1, - ..*n + Some(LogLevel::Error) => { + *n = LevelDistribution { + log_error: n.log_error + 1, + ..*n + } } - } - Some(LogLevel::Warn) => { - *n = LevelDistribution { - log_warning: n.log_warning + 1, - ..*n + Some(LogLevel::Warn) => { + *n = LevelDistribution { + log_warning: n.log_warning + 1, + ..*n + } } - } - Some(LogLevel::Info) => { - *n = LevelDistribution { - log_info: n.log_info + 1, - ..*n + Some(LogLevel::Info) => { + *n = LevelDistribution { + log_info: n.log_info + 1, + ..*n + } + } + Some(LogLevel::Debug) => { + *n = LevelDistribution { + log_debug: n.log_debug + 1, + ..*n + }; + } + Some(LogLevel::Verbose) => { + *n = LevelDistribution { + log_verbose: n.log_verbose + 1, + ..*n + }; + } + Some(LogLevel::Invalid(_)) => { + *n = LevelDistribution { + log_invalid: n.log_invalid + 1, + ..*n + }; + } + None => { + *n = LevelDistribution { + non_log: n.non_log + 1, + ..*n + }; } } - Some(LogLevel::Debug) => { - *n = LevelDistribution { - log_debug: n.log_debug + 1, - ..*n - }; - } - Some(LogLevel::Verbose) => { - *n = LevelDistribution { - log_verbose: n.log_verbose + 1, - ..*n - }; - } - Some(LogLevel::Invalid(_)) => { - *n = LevelDistribution { - log_invalid: n.log_invalid + 1, - ..*n - }; - } - None => { - *n = LevelDistribution { - non_log: n.non_log + 1, - ..*n - }; - } + } else { + ids.insert(id, LevelDistribution::new(level)); } - } else { - ids.insert(id, LevelDistribution::new(level)); } } -/// Count the dlt messages in a file. 
This assumes that messages are stored with using a `StorageHeader` -pub fn count_dlt_messages(input: &Path) -> Result { - if input.exists() { - let f = fs::File::open(input)?; +#[cfg(test)] +mod tests { + use super::common::*; + use super::*; + use crate::tests::DLT_ROUNDTRIP_MESSAGE; - let mut reader = ReduxReader::with_capacity(BIN_READER_CAPACITY, f) - .set_policy(MinBuffered(BIN_MIN_BUFFER_SPACE)); + #[test] + fn test_collect_statistics() { + let mut reader = DltMessageReader::new(DLT_ROUNDTRIP_MESSAGE, true); + let mut collector = StatisticInfoCollector::new(); - let mut msg_cnt: u64 = 0; - loop { - match reader.fill_buf() { - Ok(content) => { - if content.is_empty() { - break; - } - if let Ok((_rest, Some(consumed))) = dlt_consume_msg(content) { - reader.consume(consumed as usize); - msg_cnt += 1; - } else { - break; - } - } - Err(e) => { - trace!("no more content"); - return Err(DltParseError::Unrecoverable(format!( - "error for filling buffer with dlt messages: {:?}", - e - ))); - } - } - } - Ok(msg_cnt) - } else { - Err(DltParseError::Unrecoverable(format!( - "Couldn't find dlt file: {:?}", - input - ))) + collect_statistics(&mut reader, &mut collector).expect("collect statistics"); + let stats = collector.collect(); + + assert_eq!(1, stats.app_ids.len()); + assert_eq!(1, stats.context_ids.len()); + assert_eq!(1, stats.ecu_ids.len()); + assert!(!stats.contained_non_verbose); } } diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..3787d94 --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,140 @@ +// Copyright 2021 by Accenture ESR +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! # dlt streaming support +use crate::{ + dlt::{HEADER_MIN_LENGTH, STORAGE_HEADER_LENGTH}, + filtering::ProcessedDltFilterConfig, + parse::{dlt_message, parse_length, DltParseError, ParsedMessage}, +}; +use futures::{AsyncRead, AsyncReadExt}; + +const DEFAULT_MESSAGE_MAX_LEN: usize = 10 * 1024; + +/// Async read and parse the next DLT message from the given reader, if any. +pub async fn read_message( + reader: &mut DltStreamReader, + filter_config_opt: Option<&ProcessedDltFilterConfig>, +) -> Result, DltParseError> { + let with_storage_header = reader.with_storage_header(); + let slice = reader.next_message_slice().await?; + + if !slice.is_empty() { + Ok(Some( + dlt_message(slice, filter_config_opt, with_storage_header)?.1, + )) + } else { + Ok(None) + } +} + +/// Async reader for DLT message slices from a source. +pub struct DltStreamReader { + source: S, + with_storage_header: bool, + buffer: Vec, +} + +impl DltStreamReader { + /// Create a new reader for the given source. + pub fn new(source: S, with_storage_header: bool) -> Self { + DltStreamReader::with_capacity(DEFAULT_MESSAGE_MAX_LEN, source, with_storage_header) + } + + /// Create a new reader for the given source and specified capacity. + pub fn with_capacity(message_max_len: usize, source: S, with_storage_header: bool) -> Self { + DltStreamReader { + source, + with_storage_header, + buffer: vec![0u8; message_max_len], + } + } + + /// Async read the next message slice from the source, + /// or return an empty slice if no more message could be read. 
+ pub async fn next_message_slice(&mut self) -> Result<&[u8], DltParseError> { + let storage_len = if self.with_storage_header { + STORAGE_HEADER_LENGTH as usize + } else { + 0 + }; + let header_len = storage_len + HEADER_MIN_LENGTH as usize; + + if self + .source + .read_exact(&mut self.buffer[..header_len]) + .await + .is_err() + { + return Ok(&[]); + } + + let (_, message_len) = parse_length(&self.buffer[storage_len..header_len])?; + let total_len = storage_len + message_len; + + self.source + .read_exact(&mut self.buffer[header_len..total_len]) + .await?; + + Ok(&self.buffer[..total_len]) + } + + /// Answer if message slices contain a `StorageHeader´. + pub fn with_storage_header(&self) -> bool { + self.with_storage_header + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tests::DLT_ROUNDTRIP_MESSAGE; + use futures::{stream, TryStreamExt}; + + #[tokio::test] + async fn test_next_message() { + let stream = stream::iter([Ok(DLT_ROUNDTRIP_MESSAGE)]); + let mut input = stream.into_async_read(); + let mut reader = DltStreamReader::new(&mut input, true); + assert!(reader.with_storage_header()); + + let message = reader.next_message_slice().await.expect("message"); + assert_eq!(DLT_ROUNDTRIP_MESSAGE, message); + + assert!(reader + .next_message_slice() + .await + .expect("message") + .is_empty()); + } + + #[tokio::test] + async fn test_read_message() { + let stream = stream::iter([Ok(DLT_ROUNDTRIP_MESSAGE)]); + let mut input = stream.into_async_read(); + let mut reader = DltStreamReader::new(&mut input, true); + assert!(reader.with_storage_header()); + + if let Some(ParsedMessage::Item(message)) = + read_message(&mut reader, None).await.expect("message") + { + assert_eq!(DLT_ROUNDTRIP_MESSAGE, message.as_bytes()); + } + + assert_eq!( + None, + read_message(&mut reader, None).await.expect("message") + ) + } +} diff --git a/src/tests/dlt_parse_tests.rs b/src/tests/dlt_parse_tests.rs index 1c18605..5aad333 100644 --- a/src/tests/dlt_parse_tests.rs +++ 
b/src/tests/dlt_parse_tests.rs @@ -77,23 +77,7 @@ mod tests { #[test] fn test_dlt_roundtrip_msg() { init_logging(); - #[rustfmt::skip] - let raw1: Vec = vec![ - 0x44, 0x4C, 0x54, 0x01, 0x46, 0x93, 0x01, 0x5D, 0x79, 0x39, 0x0E, 0x00, 0x48, 0x46, - 0x50, 0x50, 0x3D, 0x1E, 0x00, 0xA8, 0x48, 0x46, 0x50, 0x50, 0x00, 0x00, 0x02, 0x48, - 0x00, 0x1C, 0x76, 0x49, 0x51, 0x08, 0x50, 0x61, 0x72, 0x61, 0x76, 0x63, 0x73, 0x6F, - 0x00, 0x82, 0x00, 0x00, 0x1A, 0x00, 0x5B, 0x35, 0x38, 0x34, 0x3A, 0x20, 0x53, 0x6F, - 0x6D, 0x65, 0x49, 0x70, 0x50, 0x6F, 0x73, 0x69, 0x78, 0x43, 0x6C, 0x69, 0x65, 0x6E, - 0x74, 0x5D, 0x20, 0x00, 0x00, 0x82, 0x00, 0x00, 0x12, 0x00, 0x53, 0x65, 0x6E, 0x64, - 0x53, 0x6F, 0x6D, 0x65, 0x49, 0x70, 0x4D, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x00, - 0x00, 0x82, 0x00, 0x00, 0x02, 0x00, 0x3A, 0x00, 0x23, 0x00, 0x00, 0x00, 0x10, 0x01, - 0x00, 0x00, 0x00, 0x82, 0x00, 0x00, 0x11, 0x00, 0x3A, 0x20, 0x69, 0x6E, 0x73, 0x74, - 0x61, 0x6E, 0x63, 0x65, 0x5F, 0x69, 0x64, 0x20, 0x30, 0x78, 0x00, 0x42, 0x00, 0x01, - 0x00, 0x01, 0x00, 0x00, 0x82, 0x00, 0x00, 0x17, 0x00, 0x20, 0x6D, 0x65, 0x6D, 0x6F, - 0x72, 0x79, 0x20, 0x62, 0x75, 0x66, 0x66, 0x65, 0x72, 0x20, 0x6C, 0x65, 0x6E, 0x67, - 0x74, 0x68, 0x20, 0x00, 0x44, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, - ]; + let raw1 = crate::tests::DLT_ROUNDTRIP_MESSAGE; match dlt_message(&raw1[..], None, true) { Ok((_rest, ParsedMessage::Item(msg))) => { let msg_bytes = msg.as_bytes(); diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 1879a30..ceac9da 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -14,7 +14,25 @@ #[macro_use] mod dlt_tests; mod dlt_parse_tests; -#[cfg(feature = "fibex_parser")] +#[cfg(feature = "fibex")] mod fibex_tests; #[cfg(feature = "statistics")] mod statistics_tests; + +#[rustfmt::skip] +pub(crate) static DLT_ROUNDTRIP_MESSAGE: &[u8] = &[ + 0x44, 0x4C, 0x54, 0x01, 0x46, 0x93, 0x01, 0x5D, 0x79, 0x39, 0x0E, 0x00, 0x48, 0x46, + 0x50, 0x50, 0x3D, 0x1E, 0x00, 0xA8, 0x48, 0x46, 
0x50, 0x50, 0x00, 0x00, 0x02, 0x48, + 0x00, 0x1C, 0x76, 0x49, 0x51, 0x08, 0x50, 0x61, 0x72, 0x61, 0x76, 0x63, 0x73, 0x6F, + 0x00, 0x82, 0x00, 0x00, 0x1A, 0x00, 0x5B, 0x35, 0x38, 0x34, 0x3A, 0x20, 0x53, 0x6F, + 0x6D, 0x65, 0x49, 0x70, 0x50, 0x6F, 0x73, 0x69, 0x78, 0x43, 0x6C, 0x69, 0x65, 0x6E, + 0x74, 0x5D, 0x20, 0x00, 0x00, 0x82, 0x00, 0x00, 0x12, 0x00, 0x53, 0x65, 0x6E, 0x64, + 0x53, 0x6F, 0x6D, 0x65, 0x49, 0x70, 0x4D, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x00, + 0x00, 0x82, 0x00, 0x00, 0x02, 0x00, 0x3A, 0x00, 0x23, 0x00, 0x00, 0x00, 0x10, 0x01, + 0x00, 0x00, 0x00, 0x82, 0x00, 0x00, 0x11, 0x00, 0x3A, 0x20, 0x69, 0x6E, 0x73, 0x74, + 0x61, 0x6E, 0x63, 0x65, 0x5F, 0x69, 0x64, 0x20, 0x30, 0x78, 0x00, 0x42, 0x00, 0x01, + 0x00, 0x01, 0x00, 0x00, 0x82, 0x00, 0x00, 0x17, 0x00, 0x20, 0x6D, 0x65, 0x6D, 0x6F, + 0x72, 0x79, 0x20, 0x62, 0x75, 0x66, 0x66, 0x65, 0x72, 0x20, 0x6C, 0x65, 0x6E, 0x67, + 0x74, 0x68, 0x20, 0x00, 0x44, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, +]; diff --git a/src/tests/statistics_tests.rs b/src/tests/statistics_tests.rs index 9ba3904..ab505c8 100644 --- a/src/tests/statistics_tests.rs +++ b/src/tests/statistics_tests.rs @@ -15,7 +15,7 @@ mod tests { use crate::{ dlt::LogLevel, - statistics::{LevelDistribution, StatisticInfo}, + statistics::common::{LevelDistribution, StatisticInfo}, }; fn get_stat_entities() -> Vec<(String, LevelDistribution)> {