Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add someip statistics api to unbound session
Browse files Browse the repository at this point in the history
kruss committed Aug 4, 2023
1 parent 434ebbc commit eb9cb6e
Showing 6 changed files with 418 additions and 40 deletions.
3 changes: 3 additions & 0 deletions application/apps/indexer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions application/apps/indexer/parsers/Cargo.toml
Original file line number Diff line number Diff line change
@@ -10,14 +10,17 @@ crossbeam-channel = "0.5"
chrono = "0.4"
chrono-tz = "0.8"
dlt-core = "0.14"
etherparse = "0.13"
humantime = "2.1"
lazy_static = "1.4"
log = "0.4.17"
memchr = "2.4"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
tokio-util = "0.7"
pcap-parser = "0.14"
rand = "0.8.5"
rustc-hash = "1.1"
# someip-messages = { path = "../../../../../someip"}
someip-messages = { git = "https://github.com/esrlabs/someip" }
# someip-payload = { path = "../../../../../someip-payload" }
376 changes: 375 additions & 1 deletion application/apps/indexer/parsers/src/someip.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{Error, LogMessage, ParseYield, Parser};
use std::{borrow::Cow, fmt, fmt::Display, io::Write, path::PathBuf};
use std::{borrow::Cow, fmt, fmt::Display, fs::File, io::Write, path::Path, path::PathBuf};
use tokio_util::sync::CancellationToken;

use someip_messages::*;
use someip_payload::{
@@ -11,6 +12,9 @@ use someip_payload::{
use log::{debug, error};
use serde::Serialize;

use pcap_parser::{traits::PcapReaderIterator, PcapBlockOwned, PcapError, PcapNGReader};
use rustc_hash::FxHashMap;

/// A parser for SOME/IP log messages.
pub struct SomeipParser {
model: Option<FibexModel>,
@@ -337,6 +341,250 @@ impl Display for SomeipLogMessage {
}
}

/// Represents the statistic of a SOME/IP trace.
#[derive(Serialize, Debug, PartialEq, Clone)]
pub struct SomeipStatistic {
/** Statistic on service-ids and related method-ids */
pub services: Vec<SomeipStatisticItem>,
/** Statistic on message-types and related return-codes */
pub messages: Vec<SomeipStatisticItem>,
}

#[derive(Serialize, Debug, PartialEq, Clone)]
pub struct SomeipStatisticItem {
pub item: SomeipStatisticDetail,
pub details: Vec<SomeipStatisticDetail>,
}

#[derive(Serialize, Debug, PartialEq, Clone)]
pub struct SomeipStatisticDetail {
pub id: usize,
pub num: usize,
}

impl SomeipStatistic {
pub fn new() -> Self {
Self {
services: vec![],
messages: vec![],
}
}

pub fn merge(&mut self, incomes: Self) {
Self::merge_items(&mut self.services, &incomes.services);
Self::merge_items(&mut self.messages, &incomes.messages);
}

fn merge_items(owner: &mut Vec<SomeipStatisticItem>, incomes: &[SomeipStatisticItem]) {
incomes.iter().for_each(|income_item| {
if let Some(existed_item) = owner
.iter_mut()
.find(|owner_item| owner_item.item.id == income_item.item.id)
{
existed_item.item.num += income_item.item.num;
Self::merge_details(&mut existed_item.details, &income_item.details);
} else {
owner.push(SomeipStatisticItem {
item: SomeipStatisticDetail {
id: income_item.item.id,
num: income_item.item.num,
},
details: income_item.details.clone(),
});
}
});
}

fn merge_details(owner: &mut Vec<SomeipStatisticDetail>, incomes: &[SomeipStatisticDetail]) {
incomes.iter().for_each(|income_detail| {
if let Some(existed_detail) = owner
.iter_mut()
.find(|owner_detail| owner_detail.id == income_detail.id)
{
existed_detail.num += income_detail.num;
} else {
owner.push(SomeipStatisticDetail {
id: income_detail.id,
num: income_detail.num,
});
}
});
}
}

impl Default for SomeipStatistic {
fn default() -> Self {
Self::new()
}
}

pub fn read_someip_statistic_from_pcapng(
path: &Path,
cancel: &CancellationToken,
) -> Result<SomeipStatistic, Error> {
let mut services: StatisticItemMap = FxHashMap::default();
let mut messages: StatisticItemMap = FxHashMap::default();
let mut error: Option<String> = None;

match File::open(path) {
Ok(file) => {
let mut reader =
PcapNGReader::new(65536, file).map_err(|e| Error::Parse(format!("{e}")))?;

loop {
if cancel.is_cancelled() {
break;
}
match reader.next() {
Ok((offset, block)) => {
match block {
PcapBlockOwned::NG(pcap_parser::Block::EnhancedPacket(ref epb)) => {
if let Err(e) = read_someip_statistic_from_pcapng_block(
epb.data,
&mut services,
&mut messages,
) {
error = Some(e.to_string());
break;
}
}
PcapBlockOwned::NG(pcap_parser::Block::SimplePacket(ref spb)) => {
if let Err(e) = read_someip_statistic_from_pcapng_block(
spb.data,
&mut services,
&mut messages,
) {
error = Some(e.to_string());
break;
}
}
_ => {
// skipped
}
}
reader.consume(offset);
}
Err(PcapError::Eof) => {
break;
}
Err(PcapError::Incomplete) => {
reader.refill().expect("pcapng refill failed");
// continue;
}
Err(e) => {
let msg = e.to_string();
error!("pcapng at offset {} : {}", reader.consumed(), msg);
error = Some(msg);
break;
}
}
}
}
Err(e) => {
error = Some(e.to_string());
}
}

if let Some(err) = error {
return Err(Error::Parse(err.to_string()));
}

let result = SomeipStatistic {
services: map_statistic(&services),
messages: map_statistic(&messages),
};

Ok(result)
}

type StatisticItemMap = FxHashMap<usize, (usize, StatisticDetailMap)>;
type StatisticDetailMap = FxHashMap<usize, usize>;

fn add_statistic(item_map: &mut StatisticItemMap, item_id: usize, detail_id: usize) {
if let Some((item_value, detail_map)) = item_map.get_mut(&item_id) {
*item_value += 1;
if let Some(detail_value) = detail_map.get_mut(&detail_id) {
*detail_value += 1;
} else {
detail_map.insert(detail_id, 1);
}
} else {
let mut detail_map: StatisticDetailMap = FxHashMap::default();
detail_map.insert(detail_id, 1);
item_map.insert(item_id, (1, detail_map));
}
}

fn map_statistic(item_map: &StatisticItemMap) -> Vec<SomeipStatisticItem> {
let mut item_vec: Vec<SomeipStatisticItem> = Vec::new();

for (item_id, (item_value, detail_map)) in item_map.iter() {
let mut item_statistic = SomeipStatisticItem {
item: SomeipStatisticDetail {
id: *item_id,
num: *item_value,
},
details: Vec::new(),
};

for (detail_id, detail_value) in detail_map.iter() {
item_statistic.details.push(SomeipStatisticDetail {
id: *detail_id,
num: *detail_value,
});
}
item_vec.push(item_statistic);
}

item_vec
}

fn read_someip_statistic_from_pcapng_block(
data: &[u8],
services: &mut StatisticItemMap,
messages: &mut StatisticItemMap,
) -> Result<(), Error> {
match etherparse::SlicedPacket::from_ethernet(data) {
Ok(value) => {
let payload = value.payload;
let total_len = payload.len();
let mut offset: usize = 0;
while total_len - offset >= Header::LENGTH {
match Header::from_slice(payload) {
Ok(header) => {
let message_len = header.message_len();
if total_len - offset >= message_len {
debug!("read someip statistic: {:?}", header.message_id());
add_statistic(
services,
header.message_id.service_id as usize,
header.message_id.method_id as usize,
);
add_statistic(
messages,
u8::from(header.message_type()) as usize,
u8::from(header.return_code()) as usize,
);
offset += message_len;
} else {
return Err(Error::Parse(format!(
"incomplete message ({} / {} bytes)",
message_len,
total_len - offset
)));
}
}
Err(e) => {
return Err(Error::Parse(e.to_string()));
}
}
}
Ok(())
}
Err(e) => Err(Error::Parse(e.to_string())),
}
}

#[cfg(test)]
mod test {
use super::*;
@@ -604,4 +852,130 @@ mod test {
panic!("unexpected parse yield");
}
}

#[test]
fn test_merge_statistics() {
let mut s1 = SomeipStatistic::new();
s1.services.push(SomeipStatisticItem {
item: SomeipStatisticDetail { id: 10001, num: 1 },
details: vec![
SomeipStatisticDetail { id: 1, num: 1 },
SomeipStatisticDetail { id: 2, num: 2 },
],
});
s1.messages.push(SomeipStatisticItem {
item: SomeipStatisticDetail { id: 10, num: 5 },
details: vec![
SomeipStatisticDetail { id: 0, num: 3 },
SomeipStatisticDetail { id: 1, num: 2 },
],
});

let mut s2 = SomeipStatistic::new();
s2.services.push(SomeipStatisticItem {
item: SomeipStatisticDetail { id: 10001, num: 1 },
details: vec![
SomeipStatisticDetail { id: 1, num: 3 },
SomeipStatisticDetail { id: 3, num: 3 },
],
});
s2.services.push(SomeipStatisticItem {
item: SomeipStatisticDetail { id: 10002, num: 3 },
details: vec![SomeipStatisticDetail { id: 5, num: 7 }],
});
s2.messages.push(SomeipStatisticItem {
item: SomeipStatisticDetail { id: 10, num: 1 },
details: vec![SomeipStatisticDetail { id: 9, num: 1 }],
});
s2.messages.push(SomeipStatisticItem {
item: SomeipStatisticDetail { id: 11, num: 3 },
details: vec![
SomeipStatisticDetail { id: 0, num: 1 },
SomeipStatisticDetail { id: 9, num: 2 },
],
});

s1.merge(s2);

assert_eq!(
s1,
SomeipStatistic {
services: [
SomeipStatisticItem {
item: SomeipStatisticDetail { id: 10001, num: 2 },
details: [
SomeipStatisticDetail { id: 1, num: 4 },
SomeipStatisticDetail { id: 2, num: 2 },
SomeipStatisticDetail { id: 3, num: 3 }
]
.to_vec()
},
SomeipStatisticItem {
item: SomeipStatisticDetail { id: 10002, num: 3 },
details: [SomeipStatisticDetail { id: 5, num: 7 }].to_vec()
}
]
.to_vec(),
messages: [
SomeipStatisticItem {
item: SomeipStatisticDetail { id: 10, num: 6 },
details: [
SomeipStatisticDetail { id: 0, num: 3 },
SomeipStatisticDetail { id: 1, num: 2 },
SomeipStatisticDetail { id: 9, num: 1 }
]
.to_vec()
},
SomeipStatisticItem {
item: SomeipStatisticDetail { id: 11, num: 3 },
details: [
SomeipStatisticDetail { id: 0, num: 1 },
SomeipStatisticDetail { id: 9, num: 2 }
]
.to_vec()
}
]
.to_vec()
}
);
}

const SOMEIP_PCAPNG_FILE: &str = "../../../../application/developing/resources/someip.pcapng";

#[test]
fn test_read_someip_statistic_from_pcapng() {
let _ = env_logger::try_init();

let path = Path::new(SOMEIP_PCAPNG_FILE);
let cancel = CancellationToken::new();

match read_someip_statistic_from_pcapng(&path, &cancel) {
Ok(statistic) => {
assert_eq!(
statistic,
SomeipStatistic {
services: [
SomeipStatisticItem {
item: SomeipStatisticDetail { id: 123, num: 22 },
details: [SomeipStatisticDetail { id: 32773, num: 22 }].to_vec()
},
SomeipStatisticItem {
item: SomeipStatisticDetail { id: 65535, num: 33 },
details: [SomeipStatisticDetail { id: 33024, num: 33 }].to_vec()
}
]
.to_vec(),
messages: [SomeipStatisticItem {
item: SomeipStatisticDetail { id: 2, num: 55 },
details: [SomeipStatisticDetail { id: 0, num: 55 }].to_vec()
}]
.to_vec()
}
);
}
Err(error) => {
panic!("{}", format!("{error}"));
}
}
}
}
71 changes: 33 additions & 38 deletions application/apps/indexer/session/src/unbound/commands/someip.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,39 @@
use super::CommandOutcome;
use crate::{events::ComputationError, unbound::signal::Signal};
use std::path::Path;

use parsers::someip::{read_someip_statistic_from_pcapng, SomeipStatistic};

pub fn get_someip_statistic(
_files: Vec<String>,
_signal: Signal,
files: Vec<String>,
signal: Signal,
) -> Result<CommandOutcome<String>, ComputationError> {
Err(ComputationError::OperationNotSupported("NYI".into()))
// use parsers::someip::{read_someip_statistic_from_pcapng, SomeipStatistic};
// use log::{error, warn};
// use std::path::Path;

// let mut statistic = SomeipStatistic::new();
// let mut error: Option<String> = None;
// warn!("Getting statistic for: {files:?}");
// files.iter().for_each(|file| {
// if error.is_some() {
// return;
// }
// if signal.is_cancelling() {
// return;
// }
// match read_someip_statistic_from_pcapng(Path::new(&file), &signal.token()) {
// Ok(result) => {
// statistic.merge(result);
// }
// Err(err) => {
// error = Some(err.to_string());
// }
// }
// });
// if let Some(err) = error {
// error!("Fail to get statistic for: {files:?}");
// return Err(ComputationError::IoOperation(err));
// }
// if signal.is_cancelling() {
// warn!("Operation of geting statistic for: {files:?} has been cancelled");
// return Ok(CommandOutcome::Cancelled);
// }
// Ok(CommandOutcome::Finished(
// serde_json::to_string(&statistic)
// .map_err(|e| ComputationError::IoOperation(e.to_string()))?,
// ))
let mut statistic = SomeipStatistic::new();
let mut error: Option<String> = None;
files.iter().for_each(|file| {
if error.is_some() {
return;
}
if signal.is_cancelling() {
return;
}
match read_someip_statistic_from_pcapng(Path::new(&file), &signal.token()) {
Ok(result) => {
statistic.merge(result);
}
Err(err) => {
error = Some(err.to_string());
}
}
});
if let Some(err) = error {
return Err(ComputationError::IoOperation(err));
}
if signal.is_cancelling() {
return Ok(CommandOutcome::Cancelled);
}
Ok(CommandOutcome::Finished(
serde_json::to_string(&statistic)
.map_err(|e| ComputationError::IoOperation(e.to_string()))?,
))
}
3 changes: 3 additions & 0 deletions application/apps/rustcore/rs-bindings/Cargo.lock
2 changes: 1 addition & 1 deletion application/apps/rustcore/ts-bindings/spec/defaults.json
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@
},
"jobs": {
"regular": {
"execute_only": [1,2,3,4,5],
"execute_only": [],
"list": {
"1": "Test 1. Cancelation testing",
"2": "Test 2. Wrong sequence test",

0 comments on commit eb9cb6e

Please sign in to comment.