From 9db429e120a1997329ee60ea7108132803f91e88 Mon Sep 17 00:00:00 2001 From: "kevin.russ" Date: Mon, 27 Jan 2025 11:05:05 +0100 Subject: [PATCH] Add bufread as an addon --- application/apps/indexer/Cargo.lock | 50 ++- application/apps/indexer/Cargo.toml | 6 +- .../apps/indexer/addons/bufread/Cargo.toml | 19 + .../apps/indexer/addons/bufread/README.md | 2 + .../addons/bufread/benches/bench_config.rs | 23 ++ .../addons/bufread/benches/buffer_benches.rs | 62 +++ .../addons/bufread/benches/reader_benches.rs | 46 +++ .../apps/indexer/addons/bufread/src/lib.rs | 390 ++++++++++++++++++ .../indexer/addons/bufread/tests/proto.rs | 153 +++++++ .../addons/bufread/tests/proto_tests.rs | 58 +++ .../addons/bufread/tests/reader_proptest.rs | 53 +++ .../apps/indexer/addons/text_grep/Cargo.toml | 2 +- application/apps/indexer/processor/Cargo.toml | 2 +- application/apps/indexer/sources/Cargo.toml | 2 +- application/apps/indexer/stypes/Cargo.toml | 2 +- application/apps/protocol/Cargo.lock | 3 +- .../apps/rustcore/rs-bindings/Cargo.lock | 45 +- 17 files changed, 902 insertions(+), 16 deletions(-) create mode 100644 application/apps/indexer/addons/bufread/Cargo.toml create mode 100644 application/apps/indexer/addons/bufread/README.md create mode 100644 application/apps/indexer/addons/bufread/benches/bench_config.rs create mode 100644 application/apps/indexer/addons/bufread/benches/buffer_benches.rs create mode 100644 application/apps/indexer/addons/bufread/benches/reader_benches.rs create mode 100644 application/apps/indexer/addons/bufread/src/lib.rs create mode 100644 application/apps/indexer/addons/bufread/tests/proto.rs create mode 100644 application/apps/indexer/addons/bufread/tests/proto_tests.rs create mode 100644 application/apps/indexer/addons/bufread/tests/reader_proptest.rs diff --git a/application/apps/indexer/Cargo.lock b/application/apps/indexer/Cargo.lock index e913d1af9..a8c5802ca 100644 --- a/application/apps/indexer/Cargo.lock +++ b/application/apps/indexer/Cargo.lock @@ -447,10 +447,25 @@ dependencies = [ "serde", ] +[[package]] +name = "buf_redux" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f" +dependencies = [ + "memchr", + "safemem", + "slice-deque", +] + [[package]] name = "bufread" -version = "0.1.0" -source = "git+https://github.com/kruss/bufread#5dec80376c0cbbbf3c4f864cf88b965cceb67c3b" +version = "0.1.1" +dependencies = [ + "criterion", + "proptest", + "rand", +] [[package]] name = "bumpalo" @@ -892,9 +907,10 @@ dependencies = [ [[package]] name = "dlt-core" version = "0.18.0" -source = "git+https://github.com/kruss/dlt-core.git?branch=remove_buf_redux#87c7cea8be5224790ba8457ff531b4b6804c1298" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b304e32f1164b8c2ef1dc746b32d321f25f88a32672f0f5bcba2df0f70a3b70" dependencies = [ - "bufread", + "buf_redux", "byteorder", "bytes", "derive_more", @@ -1628,6 +1644,15 @@ dependencies = [ "value-bag", ] +[[package]] +name = "mach" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86dd2487cdfea56def77b88438a2c915fb45113c5319bfe7e14306ca4cd0b0e1" +dependencies = [ + "libc", +] + [[package]] name = "mach2" version = "0.4.2" @@ -2475,6 +2500,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "safemem" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" + [[package]] name = "same-file" version = "1.0.6" @@ -2635,6 +2666,17 @@ dependencies = [ "autocfg", ] +[[package]] +name = "slice-deque" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffddf594f5f597f63533d897427a570dbaa9feabaaa06595b74b71b7014507d7" +dependencies = [ + "libc", + "mach", + "winapi", +] + [[package]] name = "smallvec" version = "1.13.2" diff --git a/application/apps/indexer/Cargo.toml b/application/apps/indexer/Cargo.toml index 8bd797d89..1b04260d2 100644 --- a/application/apps/indexer/Cargo.toml +++ b/application/apps/indexer/Cargo.toml @@ -6,6 +6,7 @@ members = [ "addons/someip-tools", "addons/file-tools", "addons/text_grep", + "addons/bufread", "indexer_base", "indexer_cli", "merging", @@ -24,12 +25,10 @@ thiserror = "2.0" lazy_static = "1.5" tokio = { version = "1", features = ["full"] } tokio-stream = "0.1" -#dlt-core = "0.18.0" -dlt-core = { git = "https://github.com/kruss/dlt-core.git", branch = "remove_buf_redux" } +dlt-core = "0.18.0" crossbeam-channel = "0.5" futures = "0.3" tokio-util = "0.7" -bufread = { git = "https://github.com/kruss/bufread" } regex = "1" grep-regex = "0.1" rand = "0.8" @@ -45,6 +44,7 @@ envvars = "0.1" # Support for `html_reports` needs running the benchmarks via `cargo-criterion` tool. criterion = { version = "0.5", features = ["html_reports"] } insta = { version = "1.41", features = ["yaml"] } +proptest = "1.6" # `insta` crate and its dependency `similar` will be compiled once and run many times, # therefore it's suggested to compile them with more optimizations for faster runs. diff --git a/application/apps/indexer/addons/bufread/Cargo.toml b/application/apps/indexer/addons/bufread/Cargo.toml new file mode 100644 index 000000000..64cda961b --- /dev/null +++ b/application/apps/indexer/addons/bufread/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "bufread" +version = "0.1.1" +edition = "2021" + +[dependencies] + +[dev-dependencies] +criterion.workspace = true +proptest.workspace = true +rand.workspace = true + +[[bench]] +name = "buffer_benches" +harness = false + +[[bench]] +name = "reader_benches" +harness = false diff --git a/application/apps/indexer/addons/bufread/README.md b/application/apps/indexer/addons/bufread/README.md new file mode 100644 index 000000000..901eec8c5 --- /dev/null +++ b/application/apps/indexer/addons/bufread/README.md @@ -0,0 +1,2 @@ +# bufread +A buffered reader implementation in Rust with special behavior. diff --git a/application/apps/indexer/addons/bufread/benches/bench_config.rs b/application/apps/indexer/addons/bufread/benches/bench_config.rs new file mode 100644 index 000000000..3fbc00d7d --- /dev/null +++ b/application/apps/indexer/addons/bufread/benches/bench_config.rs @@ -0,0 +1,23 @@ +// Copyright (c) 2025 ESR Labs GmbH. All rights reserved. +// +// NOTICE: All information contained herein is, and remains +// the property of E.S.R.Labs and its suppliers, if any. +// The intellectual and technical concepts contained herein are +// proprietary to E.S.R.Labs and its suppliers and may be covered +// by German and Foreign Patents, patents in process, and are protected +// by trade secret or copyright law. +// Dissemination of this information or reproduction of this material +// is strictly forbidden unless prior written permission is obtained +// from E.S.R.Labs. + +use criterion::Criterion; +use std::time::Duration; + +pub fn bench_config(sample_size: usize) -> Criterion { + Criterion::default() + .warm_up_time(Duration::from_secs(5)) + .measurement_time(Duration::from_secs(10)) + .sample_size(sample_size) + .significance_level(0.01) + .noise_threshold(0.05) +} diff --git a/application/apps/indexer/addons/bufread/benches/buffer_benches.rs b/application/apps/indexer/addons/bufread/benches/buffer_benches.rs new file mode 100644 index 000000000..a8106f839 --- /dev/null +++ b/application/apps/indexer/addons/bufread/benches/buffer_benches.rs @@ -0,0 +1,62 @@ +// Copyright (c) 2025 ESR Labs GmbH. All rights reserved. +// +// NOTICE: All information contained herein is, and remains +// the property of E.S.R.Labs and its suppliers, if any. +// The intellectual and technical concepts contained herein are +// proprietary to E.S.R.Labs and its suppliers and may be covered +// by German and Foreign Patents, patents in process, and are protected +// by trade secret or copyright law. +// Dissemination of this information or reproduction of this material +// is strictly forbidden unless prior written permission is obtained +// from E.S.R.Labs. + +use criterion::{criterion_group, criterion_main, Criterion}; +use std::hint::black_box; + +#[path = "./bench_config.rs"] +mod bench_config; +use bench_config::bench_config; + +use bufread::DeqBuffer; + +fn write_buffer(buffer: &mut DeqBuffer, src: &[u8]) { + while buffer.write_available() >= src.len() { + buffer.write_from(src); + } +} + +fn write_buffer_benchmark(c: &mut Criterion) { + let mut buffer = DeqBuffer::new(50 * 1000); + let src: [u8; 100] = [1; 100]; + + c.bench_function("write_buffer", |b| { + b.iter(|| write_buffer(black_box(&mut buffer), black_box(&src))) + }); +} + +fn write_read_buffer(buffer: &mut DeqBuffer, src: &[u8], dst: &mut [u8]) { + while buffer.write_available() >= src.len() { + buffer.write_from(src); + } + while buffer.read_available() != 0 { + buffer.read_to(dst); + } +} + +fn write_read_buffer_benchmark(c: &mut Criterion) { + let mut buffer = DeqBuffer::new(50 * 1000); + let src: [u8; 100] = [1; 100]; + let mut dst: [u8; 100] = [1; 100]; + + c.bench_function("write_read_buffer", |b| { + b.iter(|| write_read_buffer(black_box(&mut buffer), black_box(&src), black_box(&mut dst))) + }); +} + +criterion_group! { + name = benches; + config = bench_config(1000); + targets = write_buffer_benchmark, write_read_buffer_benchmark +} + +criterion_main!(benches); diff --git a/application/apps/indexer/addons/bufread/benches/reader_benches.rs b/application/apps/indexer/addons/bufread/benches/reader_benches.rs new file mode 100644 index 000000000..703b77ed8 --- /dev/null +++ b/application/apps/indexer/addons/bufread/benches/reader_benches.rs @@ -0,0 +1,46 @@ +#![allow(dead_code)] + +// Copyright (c) 2025 ESR Labs GmbH. All rights reserved. +// +// NOTICE: All information contained herein is, and remains +// the property of E.S.R.Labs and its suppliers, if any. +// The intellectual and technical concepts contained herein are +// proprietary to E.S.R.Labs and its suppliers and may be covered +// by German and Foreign Patents, patents in process, and are protected +// by trade secret or copyright law. +// Dissemination of this information or reproduction of this material +// is strictly forbidden unless prior written permission is obtained +// from E.S.R.Labs. + +use criterion::{criterion_group, criterion_main, Criterion}; +use std::hint::black_box; + +#[path = "./bench_config.rs"] +mod bench_config; +use bench_config::bench_config; + +#[path = "../tests/proto.rs"] +mod proto; + +use bufread::BufReader; +use proto::{Parser, Source, MAX_PACKET_LEN}; + +fn proto_benchmark(c: &mut Criterion) { + let source_min_size = 100 * MAX_PACKET_LEN; + let buffer_max_size = 3 * MAX_PACKET_LEN; + let buffer_min_size = MAX_PACKET_LEN; + + let source = Source::fixed(source_min_size); + let reader = BufReader::new(buffer_max_size, buffer_min_size, source.data()); + let mut parser = Parser::new(reader); + + c.bench_function("proto", |b| b.iter(|| Parser::run(black_box(&mut parser)))); +} + +criterion_group! { + name = benches; + config = bench_config(1000); + targets = proto_benchmark +} + +criterion_main!(benches); diff --git a/application/apps/indexer/addons/bufread/src/lib.rs b/application/apps/indexer/addons/bufread/src/lib.rs new file mode 100644 index 000000000..216fd199d --- /dev/null +++ b/application/apps/indexer/addons/bufread/src/lib.rs @@ -0,0 +1,390 @@ +// Copyright (c) 2025 ESR Labs GmbH. All rights reserved. +// +// NOTICE: All information contained herein is, and remains +// the property of E.S.R.Labs and its suppliers, if any. +// The intellectual and technical concepts contained herein are +// proprietary to E.S.R.Labs and its suppliers and may be covered +// by German and Foreign Patents, patents in process, and are protected +// by trade secret or copyright law. +// Dissemination of this information or reproduction of this material +// is strictly forbidden unless prior written permission is obtained +// from E.S.R.Labs. + +use std::{ + cmp::min, + io::{BufRead, Read, Result, Seek, SeekFrom}, + ptr::copy, +}; + +/// A buffered reader that could refill while still some remaining data is stored. +pub struct BufReader { + reader: R, + buffer: DeqBuffer, + min: usize, +} + +impl BufReader { + /// Creates a new reader with the given maximum space and minimum bytes to be buffered ahead. + pub fn new(max: usize, min: usize, reader: R) -> Self { + BufReader { + reader, + buffer: DeqBuffer::new(max), + min, + } + } + + /// Returns the total capacity of the inner buffer. + pub fn capacity(&self) -> usize { + self.buffer.capacity() + } + + /// Returns the number of currently available bytes of the inner buffer. + pub fn len(&self) -> usize { + self.buffer.read_available() + } + + /// Answers if the inner buffer is currently empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Returns the current slice to read from of the inner buffer. + pub fn buffer(&self) -> &[u8] { + self.buffer.read_slice() + } +} + +impl Read for BufReader { + fn read(&mut self, buffer: &mut [u8]) -> Result { + if self.buffer.write_available() < self.min { + self.buffer.flush(); + } + + let size = self.fill_buf()?.read(buffer)?; + self.consume(size); + Ok(size) + } +} + +impl BufRead for BufReader { + fn fill_buf(&mut self) -> Result<&[u8]> { + if self.buffer.read_available() < self.min { + self.buffer.flush(); + } + + if let Ok(size) = self.reader.read(self.buffer.write_slice()) { + self.buffer.write_done(size); + } + + Ok(self.buffer.read_slice()) + } + + fn consume(&mut self, size: usize) { + self.buffer.read_done(size); + } +} + +impl Seek for BufReader { + fn seek(&mut self, pos: SeekFrom) -> Result { + let result: u64; + + if let SeekFrom::Current(pos) = pos { + result = self + .reader + .seek(SeekFrom::Current(pos - self.len() as i64))?; + } else { + result = self.reader.seek(pos)?; + } + + self.buffer.clear(); + Ok(result) + } +} + +/// A fixed buffer that could be used as a byte dequeue. +pub struct DeqBuffer { + slice: Box<[u8]>, + start: usize, + end: usize, +} + +impl DeqBuffer { + /// Creates a new buffer with the given maximum space. + pub fn new(size: usize) -> Self { + let mut vec = Vec::with_capacity(size); + let cap = vec.capacity(); + unsafe { + vec.set_len(cap); + } + + DeqBuffer { + slice: vec.into(), + start: 0, + end: 0, + } + } + + /// Returns the total capacity of the buffer. + pub fn capacity(&self) -> usize { + self.slice.len() + } + + /// Clears the buffer by consuming all available bytes. + pub fn clear(&mut self) -> usize { + let size = self.read_available(); + self.read_done(size) + } + + /// Reads from this buffer into the given output + /// and returns the number of bytes being read. + pub fn read_to(&mut self, buffer: &mut [u8]) -> usize { + let size = min(self.read_available(), buffer.len()); + buffer[..size].copy_from_slice(&self.read_slice()[..size]); + self.read_done(size) + } + + /// Returns the number of currently available bytes for reading. + pub fn read_available(&self) -> usize { + self.end - self.start + } + + /// Returns the current slice to read from. + pub fn read_slice(&self) -> &[u8] { + &self.slice[self.start..self.end] + } + + /// Sets the amount of newly read bytes. + pub fn read_done(&mut self, size: usize) -> usize { + let before = self.start; + + self.start = min(self.start + size, self.slice.len()); + self.start - before + } + + /// Writes from the given input into this buffer + /// and returns the number of bytes being written. + pub fn write_from(&mut self, buffer: &[u8]) -> usize { + let size = min(self.write_available(), buffer.len()); + self.write_slice()[..size].copy_from_slice(&buffer[..size]); + self.write_done(size) + } + + /// Returns the number of currently available bytes for writing. + pub fn write_available(&self) -> usize { + self.slice.len() - self.end + } + + /// Returns the current slice to write to. + pub fn write_slice(&mut self) -> &mut [u8] { + &mut self.slice[self.end..] + } + + /// Sets the amount of newly written bytes. + pub fn write_done(&mut self, size: usize) -> usize { + let before = self.end; + + self.end = min(self.end + size, self.slice.len()); + self.end - before + } + + /// Moves any remaining bytes within the buffer to its front. + pub fn flush(&mut self) -> usize { + let before = self.start; + + if self.start != 0 { + unsafe { + copy( + self.read_slice().as_ptr(), + self.slice.as_mut_ptr(), + self.read_available(), + ); + } + self.end -= self.start; + self.start = 0; + } + + before + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Cursor; + + #[test] + fn test_buffer() { + let max_size = 1000; + let chunk_size = 100; + + let mut buffer = DeqBuffer::new(max_size); + assert_eq!(max_size, buffer.capacity()); + assert_eq!(max_size, buffer.write_available()); + assert_eq!(0, buffer.read_available()); + + // write first chunk + let chunk: [u8; 100] = [1; 100]; + assert_eq!(chunk_size, buffer.write_from(&chunk)); + assert_eq!(max_size - chunk_size, buffer.write_available()); + assert_eq!(chunk_size, buffer.read_available()); + + // read first chunk + let mut temp: [u8; 100] = [0; 100]; + assert_eq!(chunk_size, buffer.read_to(&mut temp)); + assert_eq!(chunk, temp); + assert_eq!(max_size - chunk_size, buffer.write_available()); + assert_eq!(0, buffer.read_available()); + + // write second chunk + let chunk: [u8; 100] = [2; 100]; + assert_eq!(chunk_size, buffer.write_from(&chunk)); + assert_eq!(max_size - chunk_size * 2, buffer.write_available()); + assert_eq!(chunk_size, buffer.read_available()); + + // read second chunk + let mut temp: [u8; 100] = [0; 100]; + assert_eq!(chunk_size, buffer.read_to(&mut temp)); + assert_eq!(chunk, temp); + assert_eq!(max_size - chunk_size * 2, buffer.write_available()); + assert_eq!(0, buffer.read_available()); + + // write third chunk + let chunk: [u8; 100] = [3; 100]; + assert_eq!(chunk_size, buffer.write_from(&chunk)); + assert_eq!(max_size - chunk_size * 3, buffer.write_available()); + assert_eq!(chunk_size, buffer.read_available()); + + // flush with rest + assert_eq!(chunk_size * 2, buffer.flush()); + assert_eq!(max_size - chunk_size, buffer.write_available()); + assert_eq!(chunk_size, buffer.read_available()); + + // read third chunk + let mut temp: [u8; 100] = [0; 100]; + assert_eq!(chunk_size, buffer.read_to(&mut temp)); + assert_eq!(chunk, temp); + assert_eq!(max_size - chunk_size, buffer.write_available()); + assert_eq!(0, buffer.read_available()); + + // flush all + assert_eq!(chunk_size, buffer.flush()); + assert_eq!(max_size, buffer.write_available()); + assert_eq!(0, buffer.read_available()); + + // write all + for i in 0..max_size { + buffer.write_slice()[i] = 255 as u8; + } + assert_eq!(max_size, buffer.write_done(max_size)); + assert_eq!(0, buffer.write_available()); + assert_eq!(max_size, buffer.read_available()); + + // read first chunk + for i in 0..chunk_size { + assert_eq!(255 as u8, buffer.read_slice()[i]); + } + assert_eq!(chunk_size, buffer.read_done(chunk_size)); + assert_eq!(0, buffer.write_available()); + assert_eq!(max_size - chunk_size, buffer.read_available()); + + // flush with rest + assert_eq!(chunk_size, buffer.flush()); + assert_eq!(chunk_size, buffer.write_available()); + assert_eq!(max_size - chunk_size, buffer.read_available()); + + // read rest + for i in 0..(max_size - chunk_size) { + assert_eq!(255 as u8, buffer.read_slice()[i]); + } + assert_eq!( + max_size - chunk_size, + buffer.read_done(max_size - chunk_size) + ); + assert_eq!(chunk_size, buffer.write_available()); + assert_eq!(0, buffer.read_available()); + + // write chunk and clear + let chunk: [u8; 100] = [1; 100]; + assert_eq!(chunk_size, buffer.write_from(&chunk)); + assert_eq!(chunk_size, buffer.clear()); + assert_eq!(0, buffer.write_available()); + assert_eq!(0, buffer.read_available()); + + // flush all + assert_eq!(max_size, buffer.flush()); + assert_eq!(max_size, buffer.write_available()); + assert_eq!(0, buffer.read_available()); + } + + #[test] + fn test_reader() { + let input: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]; + let max_size = 5; + let min_size = 2; + + let mut reader = BufReader::new(max_size, min_size, input); + assert_eq!(max_size, reader.capacity()); + assert!(reader.is_empty()); + + let mut output = [0, 0, 0]; + + // read first chunk fills buffer + assert_eq!(reader.read(&mut output).unwrap(), 3); + assert_eq!(output, [1, 2, 3]); + + // read second chunk flushes buffer + assert_eq!(reader.read(&mut output).unwrap(), 3); + assert_eq!(output, [4, 5, 6]); + + // refill buffer if below minimum + assert_eq!(reader.fill_buf().unwrap(), [7, 8]); + reader.consume(1); + assert_eq!(reader.fill_buf().unwrap(), [8, 9, 10, 11, 12]); + + // read third chunk from buffer + assert_eq!(reader.read(&mut output).unwrap(), 3); + assert_eq!(output, [8, 9, 10]); + + // drop some bytes and refill buffer + reader.consume(1); + assert_eq!(reader.read(&mut output).unwrap(), 3); + assert_eq!(output, [12, 13, 14]); + + // read until end + let mut rest = [0, 0, 0]; + assert_eq!(reader.read(&mut rest).unwrap(), 1); + assert_eq!(rest, [15, 0, 0]); + assert_eq!(reader.fill_buf().unwrap(), []); + } + + #[test] + fn test_seek() { + let input: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9]; + let mut reader = BufReader::new(2, 1, Cursor::new(input)); + + // Seek from start + assert_eq!(reader.seek(SeekFrom::Start(2)).unwrap(), 2); + assert_eq!(reader.fill_buf().unwrap(), &[3, 4][..]); + + // Seek from current + assert_eq!(reader.seek(SeekFrom::Current(1)).unwrap(), 3); + assert_eq!(reader.fill_buf().unwrap(), &[4, 5][..]); + + // Seek empty + assert_eq!(reader.seek(SeekFrom::Current(0)).unwrap(), 3); + assert_eq!(reader.fill_buf().unwrap(), &[4, 5][..]); + + // Seek reverse + assert_eq!(reader.seek(SeekFrom::Current(-1)).unwrap(), 2); + assert_eq!(reader.fill_buf().unwrap(), &[3, 4][..]); + + // Seek after consuming bytes + reader.consume(1); + assert_eq!(reader.seek(SeekFrom::Current(1)).unwrap(), 4); + assert_eq!(reader.fill_buf().unwrap(), &[5, 6][..]); + + // Seek to end + assert_eq!(reader.seek(SeekFrom::Start(9)).unwrap(), 9); + assert_eq!(reader.fill_buf().unwrap(), &[][..]); + } +} diff --git a/application/apps/indexer/addons/bufread/tests/proto.rs b/application/apps/indexer/addons/bufread/tests/proto.rs new file mode 100644 index 000000000..dacf6fbd3 --- /dev/null +++ b/application/apps/indexer/addons/bufread/tests/proto.rs @@ -0,0 +1,153 @@ +// Copyright (c) 2025 ESR Labs GmbH. All rights reserved. +// +// NOTICE: All information contained herein is, and remains +// the property of E.S.R.Labs and its suppliers, if any. +// The intellectual and technical concepts contained herein are +// proprietary to E.S.R.Labs and its suppliers and may be covered +// by German and Foreign Patents, patents in process, and are protected +// by trade secret or copyright law. +// Dissemination of this information or reproduction of this material +// is strictly forbidden unless prior written permission is obtained +// from E.S.R.Labs. + +/* + Contains a pseudo protocol for testing, with packages consisting of + an u16 header with the length of the payload after the header + providing a total packet length within the range of + size_of::() to size_of::() + u16::MAX bytes. +*/ + +use bufread::BufReader; +use rand::prelude::*; +use std::{ + io::{BufRead, Result}, + mem::size_of, +}; + +/// The fixed header length of the protocol. +pub const HEADER_LEN: usize = size_of::() as usize; +/// The maximum payload length of the protocol. +const MAX_PAYLOAD_LEN: usize = u16::MAX as usize; +/// The maximum packet length of the protocol. +pub const MAX_PACKET_LEN: usize = HEADER_LEN + MAX_PAYLOAD_LEN; + +/// A source for pseudo protocol packages. +pub struct Source { + data: Vec, + num_packets: usize, +} + +impl Source { + /// Creates a new source with a deterministic package layout and adds packages + /// until reaching at least the given minimal data size. + pub fn fixed(min_size: usize) -> Self { + let mut data = Vec::new(); + let mut num_packets = 0; + + while data.len() < min_size { + let payload_len = num_packets % 1024; + data.append(&mut Self::create_packet(payload_len)); + num_packets += 1; + } + + Source { data, num_packets } + } + + /// Creates a new source with a random package layout and adds packages + /// until reaching at least the given minimal data size. + pub fn random(min_size: usize) -> Self { + let mut data = Vec::new(); + let mut num_packets = 0; + + while data.len() < min_size { + let payload_len: usize = rand::thread_rng().gen_range(0..MAX_PAYLOAD_LEN); + data.append(&mut Self::create_packet(payload_len)); + num_packets += 1; + } + + Source { data, num_packets } + } + + /// Creates a new packet with the given payload length. + fn create_packet(payload_len: usize) -> Vec { + assert!(payload_len <= MAX_PAYLOAD_LEN); + + let packet_len = HEADER_LEN + payload_len; + let mut packet: Vec = Vec::with_capacity(packet_len); + unsafe { + packet.set_len(packet_len); + } + + let header = (payload_len as u16).to_be_bytes().to_vec(); + packet[0] = header[0]; + packet[1] = header[1]; + + rand::thread_rng().fill_bytes(&mut packet[HEADER_LEN..]); + packet + } + + /// Returns the contained data slice. + pub fn data(&self) -> &[u8] { + self.data.as_slice() + } + + /// Returns the length of the contained data. + pub fn data_len(&self) -> usize { + self.data.len() + } + + /// Returns the number of the contained packages. + pub fn num_packets(&self) -> usize { + self.num_packets + } +} + +/// A parser for pseudo protocol packages. +pub struct Parser<'a> { + reader: BufReader<&'a [u8]>, +} + +impl<'a> Parser<'a> { + /// Creates a new parser for the given source. + pub fn new(reader: BufReader<&'a [u8]>) -> Self { + Parser { reader } + } + + /// Parses the next package from the source, if available. + /// + /// Returns the total length of the package being parsed, or + /// a zero-length if at EOF. + pub fn next(&mut self) -> Result { + let buffer = self.reader.fill_buf()?; + if buffer.is_empty() { + return Ok(0); + } + + let mut header = [0; HEADER_LEN]; + header[0] = buffer[0]; + header[1] = buffer[1]; + + let payload_len = u16::from_be_bytes(header) as usize; + let packet_len = HEADER_LEN + payload_len; + self.reader.consume(packet_len); + + Ok(packet_len) + } + + /// Runs a parser and returns the total number of packets and bytes being read. + pub fn run(parser: &mut Parser) -> Result<(usize, usize)> { + let mut result: (usize, usize) = (0, 0); + + loop { + let size = parser.next()?; + if size == 0 { + break; + } + + result.0 += 1; + result.1 += size; + } + + Ok(result) + } +} diff --git a/application/apps/indexer/addons/bufread/tests/proto_tests.rs b/application/apps/indexer/addons/bufread/tests/proto_tests.rs new file mode 100644 index 000000000..d957bc027 --- /dev/null +++ b/application/apps/indexer/addons/bufread/tests/proto_tests.rs @@ -0,0 +1,58 @@ +// Copyright (c) 2025 ESR Labs GmbH. All rights reserved. +// +// NOTICE: All information contained herein is, and remains +// the property of E.S.R.Labs and its suppliers, if any. +// The intellectual and technical concepts contained herein are +// proprietary to E.S.R.Labs and its suppliers and may be covered +// by German and Foreign Patents, patents in process, and are protected +// by trade secret or copyright law. +// Dissemination of this information or reproduction of this material +// is strictly forbidden unless prior written permission is obtained +// from E.S.R.Labs. + +#[path = "./proto.rs"] +mod proto; + +#[cfg(test)] +mod tests { + use super::proto::{Parser, Source, MAX_PACKET_LEN}; + use bufread::BufReader; + + const SOURCE_MIN_SIZE: usize = 10 * MAX_PACKET_LEN; + const BUFFER_MAX_SIZE: usize = 3 * MAX_PACKET_LEN; + const BUFFER_MIN_SIZE: usize = MAX_PACKET_LEN; + + #[test] + fn test_fixed_source() { + let source = Source::fixed(SOURCE_MIN_SIZE); + let reader = BufReader::new(BUFFER_MAX_SIZE, BUFFER_MIN_SIZE, source.data()); + let mut parser = Parser::new(reader); + + match Parser::run(&mut parser) { + Ok(result) => { + assert_eq!(source.num_packets(), result.0); + assert_eq!(source.data_len(), result.1); + } + Err(error) => { + panic!("{}", error); + } + } + } + + #[test] + fn test_random_source() { + let source = Source::random(SOURCE_MIN_SIZE); + let reader = BufReader::new(BUFFER_MAX_SIZE, BUFFER_MIN_SIZE, source.data()); + let mut parser = Parser::new(reader); + + match Parser::run(&mut parser) { + Ok(result) => { + assert_eq!(source.num_packets(), result.0); + assert_eq!(source.data_len(), result.1); + } + Err(error) => { + panic!("{}", error); + } + } + } +} diff --git a/application/apps/indexer/addons/bufread/tests/reader_proptest.rs b/application/apps/indexer/addons/bufread/tests/reader_proptest.rs new file mode 100644 index 000000000..01825222a --- /dev/null +++ b/application/apps/indexer/addons/bufread/tests/reader_proptest.rs @@ -0,0 +1,53 @@ +#![allow(dead_code)] + +// Copyright (c) 2025 ESR Labs GmbH. All rights reserved. +// +// NOTICE: All information contained herein is, and remains +// the property of E.S.R.Labs and its suppliers, if any. +// The intellectual and technical concepts contained herein are +// proprietary to E.S.R.Labs and its suppliers and may be covered +// by German and Foreign Patents, patents in process, and are protected +// by trade secret or copyright law. +// Dissemination of this information or reproduction of this material +// is strictly forbidden unless prior written permission is obtained +// from E.S.R.Labs. + +#[path = "./proto.rs"] +mod proto; + +#[cfg(test)] +mod tests { + use super::proto::{Parser, Source, MAX_PACKET_LEN}; + use bufread::BufReader; + use proptest::prelude::*; + use proptest::test_runner::FileFailurePersistence; + + proptest! { + #![proptest_config(ProptestConfig::with_failure_persistence(FileFailurePersistence::Off))] + #[test] + fn reader_proptest( + source_min_size in (MAX_PACKET_LEN / 2)..(3 * MAX_PACKET_LEN), + buffer_max_size in MAX_PACKET_LEN..(2 * MAX_PACKET_LEN) + ) { + let buffer_min_size = MAX_PACKET_LEN; + + let source = Source::fixed(source_min_size); + let reader = BufReader::new(buffer_max_size, buffer_min_size, source.data()); + let mut parser = Parser::new(reader); + + match Parser::run(&mut parser) { + Ok(result) => { + if source.num_packets() != result.0 { + panic!("num packets does not match: {} != {}", source.num_packets(), result.0); + } + if source.data_len() != result.1 { + panic!("source len does not match: {} != {}", source.data_len(), result.1); + } + } + Err(error) => { + panic!("{}", error); + } + } + } + } +} diff --git a/application/apps/indexer/addons/text_grep/Cargo.toml b/application/apps/indexer/addons/text_grep/Cargo.toml index 903cfcf35..08a9ae0d5 100644 --- a/application/apps/indexer/addons/text_grep/Cargo.toml +++ b/application/apps/indexer/addons/text_grep/Cargo.toml @@ -11,7 +11,7 @@ path = "src/lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] tokio = { workspace = true, features = ["full"] } -bufread.workspace = true +bufread = { path = "../bufread" } tokio-util.workspace = true tempfile.workspace = true grep-searcher.workspace = true diff --git a/application/apps/indexer/processor/Cargo.toml b/application/apps/indexer/processor/Cargo.toml index dd0de5346..96f494db3 100644 --- a/application/apps/indexer/processor/Cargo.toml +++ b/application/apps/indexer/processor/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] bincode = "1.3" -bufread.workspace = true +bufread = { path = "../addons/bufread" } bytecount = "0.6" futures.workspace = true grep-regex.workspace = true diff --git a/application/apps/indexer/sources/Cargo.toml b/application/apps/indexer/sources/Cargo.toml index bf1a74a1d..c7e4b905a 100644 --- a/application/apps/indexer/sources/Cargo.toml +++ b/application/apps/indexer/sources/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] async-stream = "0.3" -bufread.workspace = true +bufread = { path = "../addons/bufread" } bytes = "1.3" etherparse = "0.16" futures.workspace = true diff --git a/application/apps/indexer/stypes/Cargo.toml b/application/apps/indexer/stypes/Cargo.toml index e61036c89..05c759e31 100644 --- a/application/apps/indexer/stypes/Cargo.toml +++ b/application/apps/indexer/stypes/Cargo.toml @@ -35,7 +35,7 @@ envvars = { workspace = true, optional = true } tokio = { workspace = true } walkdir = { workspace = true } node-bindgen = { git = "https://github.com/infinyon/node-bindgen.git", branch="master" } -proptest = "1.5" +proptest = { workspace = true } paste = "1.0" uuid = { workspace = true, features = ["serde", "v4"] } remove_dir_all = "1.0" diff --git a/application/apps/protocol/Cargo.lock b/application/apps/protocol/Cargo.lock index 251582e50..e28ca0e75 100644 --- a/application/apps/protocol/Cargo.lock +++ b/application/apps/protocol/Cargo.lock @@ -66,7 +66,8 @@ dependencies = [ [[package]] name = "dlt-core" version = "0.18.0" -source = "git+https://github.com/kruss/dlt-core.git?branch=remove_buf_redux#87c7cea8be5224790ba8457ff531b4b6804c1298" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b304e32f1164b8c2ef1dc746b32d321f25f88a32672f0f5bcba2df0f70a3b70" dependencies = [ "byteorder", "bytes", diff --git a/application/apps/rustcore/rs-bindings/Cargo.lock b/application/apps/rustcore/rs-bindings/Cargo.lock index c12274499..f8284ad2b 100644 --- a/application/apps/rustcore/rs-bindings/Cargo.lock +++ b/application/apps/rustcore/rs-bindings/Cargo.lock @@ -344,10 +344,20 @@ dependencies = [ "serde", ] +[[package]] +name = "buf_redux" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f" +dependencies = [ + "memchr", + "safemem", + "slice-deque", +] + [[package]] name = "bufread" -version = "0.1.0" -source = "git+https://github.com/kruss/bufread#5dec80376c0cbbbf3c4f864cf88b965cceb67c3b" +version = "0.1.1" [[package]] name = "bumpalo" @@ -631,9 +641,10 @@ dependencies = [ [[package]] name = "dlt-core" version = "0.18.0" -source = "git+https://github.com/kruss/dlt-core.git?branch=remove_buf_redux#87c7cea8be5224790ba8457ff531b4b6804c1298" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b304e32f1164b8c2ef1dc746b32d321f25f88a32672f0f5bcba2df0f70a3b70" dependencies = [ - "bufread", + "buf_redux", "byteorder", "bytes", "derive_more", @@ -1264,6 +1275,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "mach" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86dd2487cdfea56def77b88438a2c915fb45113c5319bfe7e14306ca4cd0b0e1" +dependencies = [ + "libc", +] + [[package]] name = "mach2" version = "0.4.2" @@ -2000,6 +2020,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "safemem" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" + [[package]] name = "same-file" version = "1.0.6" @@ -2207,6 +2233,17 @@ dependencies = [ "autocfg", ] +[[package]] +name = "slice-deque" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffddf594f5f597f63533d897427a570dbaa9feabaaa06595b74b71b7014507d7" +dependencies = [ + "libc", + "mach", + "winapi", +] + [[package]] name = "smallvec" version = "1.13.2"