diff --git a/application/apps/indexer/addons/dlt-tools/src/lib.rs b/application/apps/indexer/addons/dlt-tools/src/lib.rs index 7bd83adb12..0c83b144c8 100644 --- a/application/apps/indexer/addons/dlt-tools/src/lib.rs +++ b/application/apps/indexer/addons/dlt-tools/src/lib.rs @@ -37,7 +37,7 @@ pub async fn scan_dlt_ft( let reader = BufReader::new(&input); let source = BinaryByteSource::new(reader); let parser = DltParser::new(filter.map(|f| f.into()), None, None, with_storage_header); - let mut producer = MessageProducer::new(parser, source, None); + let mut producer = MessageProducer::new(parser, source, Vec::new(), None); let stream = producer.as_stream(); pin_mut!(stream); diff --git a/application/apps/indexer/indexer_cli/src/interactive.rs b/application/apps/indexer/indexer_cli/src/interactive.rs index a12215d399..054eb3aaa1 100644 --- a/application/apps/indexer/indexer_cli/src/interactive.rs +++ b/application/apps/indexer/indexer_cli/src/interactive.rs @@ -45,7 +45,7 @@ pub(crate) async fn handle_interactive_session(input: Option) { static RECEIVER: &str = "127.0.0.1:5000"; let udp_source = UdpSource::new(RECEIVER, vec![]).await.unwrap(); let dlt_parser = DltParser::new(None, None, None, false); - let mut dlt_msg_producer = MessageProducer::new(dlt_parser, udp_source, None); + let mut dlt_msg_producer = MessageProducer::new(dlt_parser, udp_source, Vec::new(), None); let msg_stream = dlt_msg_producer.as_stream(); pin_mut!(msg_stream); loop { diff --git a/application/apps/indexer/indexer_cli/src/main.rs b/application/apps/indexer/indexer_cli/src/main.rs index 949642d4a3..eda3113df5 100644 --- a/application/apps/indexer/indexer_cli/src/main.rs +++ b/application/apps/indexer/indexer_cli/src/main.rs @@ -806,7 +806,7 @@ pub async fn main() -> Result<()> { let dlt_parser = DltParser::new(None, None, None, true); let reader = BufReader::new(&in_file); let source = BinaryByteSource::new(reader); - let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, None); + let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, Vec::new(), None); let cancel = CancellationToken::new(); export_raw( Box::pin(dlt_msg_producer.as_stream()), @@ -1371,7 +1371,7 @@ async fn count_dlt_messages(input: &Path) -> Result { let source = BinaryByteSource::new(second_reader); - let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, None); + let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, Vec::new(), None); let msg_stream = dlt_msg_producer.as_stream(); Ok(msg_stream.count().await as u64) } else { @@ -1388,7 +1388,7 @@ async fn detect_messages_type(input: &Path) -> Result { let buf_reader = BufReader::new(fs::File::open(input)?); let source = BinaryByteSource::new(buf_reader); let dlt_parser = DltRangeParser::new(); - let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, None); + let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, Vec::new(), None); let msg_stream = dlt_msg_producer.as_stream(); pin_mut!(msg_stream); let mut item_count = 0usize; @@ -1434,7 +1434,8 @@ async fn detect_messages_type(input: &Path) -> Result { let some_parser = SomeipParser::new(); match PcapngByteSource::new(fs::File::open(input)?) { Ok(source) => { - let mut some_msg_producer = MessageProducer::new(some_parser, source, None); + let mut some_msg_producer = + MessageProducer::new(some_parser, source, Vec::new(), None); let msg_stream = some_msg_producer.as_stream(); pin_mut!(msg_stream); let mut item_count = 0usize; @@ -1473,7 +1474,8 @@ async fn detect_messages_type(input: &Path) -> Result { // let buf_reader = BufReader::new(fs::File::open(&input)?); match PcapngByteSource::new(fs::File::open(input)?) { Ok(source) => { - let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, None); + let mut dlt_msg_producer = + MessageProducer::new(dlt_parser, source, Vec::new(), None); let msg_stream = dlt_msg_producer.as_stream(); pin_mut!(msg_stream); let mut item_count = 0usize; @@ -1525,7 +1527,7 @@ async fn detect_messages_type(input: &Path) -> Result { let txt_parser = StringTokenizer {}; let buf_reader = BufReader::new(fs::File::open(input)?); let source = BinaryByteSource::new(buf_reader); - let mut txt_msg_producer = MessageProducer::new(txt_parser, source, None); + let mut txt_msg_producer = MessageProducer::new(txt_parser, source, Vec::new(), None); let msg_stream = txt_msg_producer.as_stream(); pin_mut!(msg_stream); let mut item_count = 0usize; diff --git a/application/apps/indexer/parsers/src/dlt/mod.rs b/application/apps/indexer/parsers/src/dlt/mod.rs index dae82f4ce4..51172ec473 100644 --- a/application/apps/indexer/parsers/src/dlt/mod.rs +++ b/application/apps/indexer/parsers/src/dlt/mod.rs @@ -1,7 +1,7 @@ pub mod attachment; pub mod fmt; -use crate::{dlt::fmt::FormattableMessage, Error, LogMessage, ParseYield, Parser}; +use crate::{dlt::fmt::FormattableMessage, Error, LogMessage, ParseYield, Parser, ParserAlias}; use byteorder::{BigEndian, WriteBytesExt}; pub use dlt_core::{ dlt::LogLevel, @@ -64,7 +64,7 @@ impl LogMessage for RawMessage { } } -#[derive(Default)] +#[derive(Default, Debug)] pub struct DltParser<'m> { pub filter_config: Option, pub fibex_metadata: Option<&'m FibexMetadata>, @@ -120,6 +120,7 @@ impl<'m> Parser> for DltParser<'m> { &mut self, input: &'b [u8], timestamp: Option, + mut nested: impl FnMut(&'b [u8], ParserAlias) -> Option, ) -> Result<(&'b [u8], Option>>), Error> { match dlt_message(input, self.filter_config.as_ref(), self.with_storage_header) .map_err(|e| Error::Parse(format!("{e}")))? @@ -145,6 +146,12 @@ impl<'m> Parser> for DltParser<'m> { options: self.fmt_options, }; self.offset += input.len() - rest.len(); + + debug_assert!({ + let _ = nested(&[0, 0, 0, 0, 0], ParserAlias::SomeIp); + true + }); + Ok(( rest, if let Some(attachment) = attachment { @@ -163,6 +170,7 @@ impl Parser for DltRangeParser { &mut self, input: &'b [u8], _timestamp: Option, + _nested: impl FnMut(&'b [u8], ParserAlias) -> Option, ) -> Result<(&'b [u8], Option>), Error> { let (rest, consumed) = dlt_consume_msg(input).map_err(|e| Error::Parse(format!("{e}")))?; let msg = consumed.map(|c| { @@ -183,6 +191,7 @@ impl Parser for DltRawParser { &mut self, input: &'b [u8], _timestamp: Option, + _nested: impl FnMut(&'b [u8], ParserAlias) -> Option, ) -> Result<(&'b [u8], Option>), Error> { let (rest, consumed) = dlt_consume_msg(input).map_err(|e| Error::Parse(format!("{e}")))?; let msg = consumed.map(|c| RawMessage { diff --git a/application/apps/indexer/parsers/src/lib.rs b/application/apps/indexer/parsers/src/lib.rs index 1b7fce8efe..493312da57 100644 --- a/application/apps/indexer/parsers/src/lib.rs +++ b/application/apps/indexer/parsers/src/lib.rs @@ -8,6 +8,15 @@ use thiserror::Error; extern crate log; +#[derive(Debug)] +pub enum ParserKind { + SomeIp(someip::SomeipParser), +} + +pub enum ParserAlias { + SomeIp, +} + #[derive(Error, Debug)] pub enum Error { #[error("Parse error: {0}")] @@ -44,6 +53,7 @@ pub trait Parser { &mut self, input: &'a [u8], timestamp: Option, + nested: impl FnMut(&'a [u8], ParserAlias) -> Option, ) -> Result<(&'a [u8], Option>), Error>; } diff --git a/application/apps/indexer/parsers/src/someip.rs b/application/apps/indexer/parsers/src/someip.rs index 1601017d78..aaf1570af2 100644 --- a/application/apps/indexer/parsers/src/someip.rs +++ b/application/apps/indexer/parsers/src/someip.rs @@ -1,4 +1,4 @@ -use crate::{Error, LogMessage, ParseYield, Parser}; +use crate::{Error, LogMessage, ParseYield, Parser, ParserAlias}; use std::{borrow::Cow, fmt, fmt::Display, io::Write, path::PathBuf}; use someip_messages::*; @@ -12,6 +12,7 @@ use log::{debug, error}; use serde::Serialize; /// A parser for SOME/IP log messages. +#[derive(Debug)] pub struct SomeipParser { model: Option, } @@ -53,6 +54,7 @@ impl Parser for SomeipParser { &mut self, input: &'a [u8], timestamp: Option, + _nested: impl FnMut(&'a [u8], ParserAlias) -> Option, ) -> Result<(&'a [u8], Option>), Error> { let time = timestamp.unwrap_or(0); match Message::from_slice(input) { @@ -407,7 +409,11 @@ mod test { ]; let mut parser = SomeipParser::new(); - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser + .parse(input, None, |_: &[u8], _: ParserAlias| -> Option { + None + }) + .unwrap(); assert!(output.is_empty()); @@ -428,7 +434,11 @@ mod test { ]; let mut parser = SomeipParser::new(); - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser + .parse(input, None, |_: &[u8], _: ParserAlias| -> Option { + None + }) + .unwrap(); assert!(output.is_empty()); @@ -449,7 +459,11 @@ mod test { ]; let mut parser = SomeipParser::new(); - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser + .parse(input, None, |_: &[u8], _: ParserAlias| -> Option { + None + }) + .unwrap(); assert!(output.is_empty()); @@ -474,7 +488,11 @@ mod test { let model = test_model(); let mut parser = SomeipParser { model: Some(model) }; - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser + .parse(input, None, |_: &[u8], _: ParserAlias| -> Option { + None + }) + .unwrap(); assert!(output.is_empty()); @@ -498,7 +516,11 @@ mod test { ]; let mut parser = SomeipParser::new(); - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser + .parse(input, None, |_: &[u8], _: ParserAlias| -> Option { + None + }) + .unwrap(); assert!(output.is_empty()); @@ -524,7 +546,11 @@ mod test { let model = test_model(); let mut parser = SomeipParser { model: Some(model) }; - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser + .parse(input, None, |_: &[u8], _: ParserAlias| -> Option { + None + }) + .unwrap(); assert!(output.is_empty()); @@ -550,7 +576,11 @@ mod test { ]; let mut parser = SomeipParser::new(); - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser + .parse(input, None, |_: &[u8], _: ParserAlias| -> Option { + None + }) + .unwrap(); assert!(output.is_empty()); @@ -592,7 +622,11 @@ mod test { ]; let mut parser = SomeipParser::new(); - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser + .parse(input, None, |_: &[u8], _: ParserAlias| -> Option { + None + }) + .unwrap(); assert!(output.is_empty()); diff --git a/application/apps/indexer/parsers/src/text.rs b/application/apps/indexer/parsers/src/text.rs index 6a09a52b9c..04bbb75260 100644 --- a/application/apps/indexer/parsers/src/text.rs +++ b/application/apps/indexer/parsers/src/text.rs @@ -1,4 +1,4 @@ -use crate::{Error, LogMessage, ParseYield, Parser}; +use crate::{Error, LogMessage, ParseYield, Parser, ParserAlias}; use serde::Serialize; use std::{fmt, io::Write}; @@ -31,6 +31,7 @@ where &mut self, input: &'b [u8], _timestamp: Option, + _nested: impl FnMut(&'b [u8], ParserAlias) -> Option, ) -> Result<(&'b [u8], Option>), Error> { // TODO: support non-utf8 encodings use memchr::memchr; @@ -58,18 +59,32 @@ where fn test_string_tokenizer() { let mut parser = StringTokenizer {}; let content = b"hello\nworld\n"; - let (rest_1, first_msg) = parser.parse(content, None).unwrap(); + let (rest_1, first_msg) = parser + .parse( + content, + None, + |_: &[u8], _: ParserAlias| -> Option { None }, + ) + .unwrap(); match first_msg { Some(ParseYield::Message(StringMessage { content })) if content.eq("hello") => {} _ => panic!("First message did not match"), } println!("rest_1 = {:?}", String::from_utf8_lossy(rest_1)); - let (rest_2, second_msg) = parser.parse(rest_1, None).unwrap(); + let (rest_2, second_msg) = parser + .parse(rest_1, None, |_: &[u8], _: ParserAlias| -> Option { + None + }) + .unwrap(); match second_msg { Some(ParseYield::Message(StringMessage { content })) if content.eq("world") => {} _ => panic!("Second message did not match"), } - let (rest_3, third_msg) = parser.parse(rest_2, None).unwrap(); + let (rest_3, third_msg) = parser + .parse(rest_2, None, |_: &[u8], _: ParserAlias| -> Option { + None + }) + .unwrap(); println!("rest_3 = {:?}", String::from_utf8_lossy(rest_3)); assert!(third_msg.is_none()); } diff --git a/application/apps/indexer/processor/src/text_source.rs b/application/apps/indexer/processor/src/text_source.rs index 974e3bbfeb..93686c932a 100644 --- a/application/apps/indexer/processor/src/text_source.rs +++ b/application/apps/indexer/processor/src/text_source.rs @@ -258,7 +258,7 @@ impl TextFileSource { file_part: &FilePart, ) -> Result, GrabError> { Ok(String::from_utf8_lossy(read_buf) - .split(|c| c == '\n') + .split('\n') .take(file_part.total_lines - file_part.lines_to_drop) .skip(file_part.lines_to_skip) .map(|s| s.to_string()) diff --git a/application/apps/indexer/session/src/handlers/export_raw.rs b/application/apps/indexer/session/src/handlers/export_raw.rs index e81e20d59b..7f08bec6d7 100644 --- a/application/apps/indexer/session/src/handlers/export_raw.rs +++ b/application/apps/indexer/session/src/handlers/export_raw.rs @@ -144,7 +144,7 @@ async fn export( } else { SomeipParser::new() }; - let mut producer = MessageProducer::new(parser, source, None); + let mut producer = MessageProducer::new(parser, source, Vec::new(), None); export_runner( Box::pin(producer.as_stream()), dest, @@ -163,7 +163,7 @@ async fn export( fmt_options.as_ref(), settings.with_storage_header, ); - let mut producer = MessageProducer::new(parser, source, None); + let mut producer = MessageProducer::new(parser, source, Vec::new(), None); export_runner( Box::pin(producer.as_stream()), dest, @@ -175,7 +175,7 @@ async fn export( .await } ParserType::Text => { - let mut producer = MessageProducer::new(StringTokenizer {}, source, None); + let mut producer = MessageProducer::new(StringTokenizer {}, source, Vec::new(), None); export_runner( Box::pin(producer.as_stream()), dest, diff --git a/application/apps/indexer/session/src/handlers/observing/mod.rs b/application/apps/indexer/session/src/handlers/observing/mod.rs index 48d8b1a250..b6be1b0a1e 100644 --- a/application/apps/indexer/session/src/handlers/observing/mod.rs +++ b/application/apps/indexer/session/src/handlers/observing/mod.rs @@ -10,7 +10,7 @@ use parsers::{ dlt::{fmt::FormatOptions, DltParser}, someip::SomeipParser, text::StringTokenizer, - LogMessage, MessageStreamItem, ParseYield, Parser, + LogMessage, MessageStreamItem, ParseYield, Parser, ParserKind, }; use sources::{ factory::ParserType, @@ -85,11 +85,11 @@ async fn run_source_intern( } None => SomeipParser::new(), }; - let producer = MessageProducer::new(someip_parser, source, rx_sde); + let producer = MessageProducer::new(someip_parser, source, Vec::new(), rx_sde); run_producer(operation_api, state, source_id, producer, rx_tail).await } ParserType::Text => { - let producer = MessageProducer::new(StringTokenizer {}, source, rx_sde); + let producer = MessageProducer::new(StringTokenizer {}, source, Vec::new(), rx_sde); run_producer(operation_api, state, source_id, producer, rx_tail).await } ParserType::Dlt(settings) => { @@ -100,7 +100,14 @@ async fn run_source_intern( fmt_options.as_ref(), settings.with_storage_header, ); - let producer = MessageProducer::new(dlt_parser, source, rx_sde); + let nested: Vec = + vec![ParserKind::SomeIp(match &settings.fibex_file_paths { + Some(paths) => { + SomeipParser::from_fibex_files(paths.iter().map(PathBuf::from).collect()) + } + None => SomeipParser::new(), + })]; + let producer = MessageProducer::new(dlt_parser, source, nested, rx_sde); run_producer(operation_api, state, source_id, producer, rx_tail).await } } diff --git a/application/apps/indexer/sources/src/producer.rs b/application/apps/indexer/sources/src/producer.rs index 6e1e99e8d4..4d17d04dde 100644 --- a/application/apps/indexer/sources/src/producer.rs +++ b/application/apps/indexer/sources/src/producer.rs @@ -1,7 +1,10 @@ use crate::{sde::SdeMsg, ByteSource, ReloadInfo, SourceFilter}; use async_stream::stream; use log::warn; -use parsers::{Error as ParserError, LogMessage, MessageStreamItem, Parser}; +use parsers::{ + Error as ParserError, LogMessage, MessageStreamItem, ParseYield, Parser, ParserAlias, + ParserKind, +}; use std::marker::PhantomData; use tokio::{ select, @@ -27,6 +30,7 @@ where byte_source: D, index: usize, parser: P, + nested: Vec, filter: Option, last_seen_ts: Option, _phantom_data: Option>, @@ -38,11 +42,12 @@ where impl, D: ByteSource> MessageProducer { /// create a new producer by plugging into a byte source - pub fn new(parser: P, source: D, rx_sde: Option) -> Self { + pub fn new(parser: P, source: D, nested: Vec, rx_sde: Option) -> Self { MessageProducer { byte_source: source, index: 0, parser, + nested, filter: None, last_seen_ts: None, _phantom_data: None, @@ -124,9 +129,30 @@ impl, D: ByteSource> MessageProducer { self.done = true; return Some((0, MessageStreamItem::Done)); } + let parsers = &mut self.nested; + let nested = |data: &[u8], target: ParserAlias| -> Option { + match target { + ParserAlias::SomeIp => { + let parser = parsers.iter_mut().find_map(|parser| match parser { + ParserKind::SomeIp(parser) => Some(parser), + #[allow(unreachable_patterns)] + _ => None, + })?; + let (_, Some(ParseYield::Message(item))) = parser + .parse(data, None, |_: &[u8], _: ParserAlias| -> Option { + None + }) + .ok()? + else { + return None; + }; + Some(item.to_string()) + } + } + }; match self .parser - .parse(self.byte_source.current_slice(), self.last_seen_ts) + .parse(self.byte_source.current_slice(), self.last_seen_ts, nested) { Ok((rest, Some(m))) => { let consumed = available - rest.len();