Skip to content

Commit

Permalink
Remove buf_redux
Browse files Browse the repository at this point in the history
  • Loading branch information
kruss committed Jan 27, 2025
1 parent 71c3395 commit 61adc97
Show file tree
Hide file tree
Showing 19 changed files with 387 additions and 458 deletions.
479 changes: 210 additions & 269 deletions application/apps/indexer/Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions application/apps/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ thiserror = "2.0"
lazy_static = "1.5"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
dlt-core = "0.18.0"
#dlt-core = "0.18.0"
dlt-core = { git = "https://github.com/kruss/dlt-core.git", branch = "remove_buf_redux" }
crossbeam-channel = "0.5"
futures = "0.3"
tokio-util = "0.7"
buf_redux = "0.8"
bufread = { git = "https://github.com/kruss/bufread" }
regex = "1"
grep-regex = "0.1"
rand = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion application/apps/indexer/addons/text_grep/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ path = "src/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { workspace = true, features = ["full"] }
buf_redux.workspace = true
bufread.workspace = true
tokio-util.workspace = true
tempfile.workspace = true
grep-searcher.workspace = true
Expand Down
73 changes: 48 additions & 25 deletions application/apps/indexer/addons/text_grep/src/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,62 @@
use buf_redux::{
do_read,
policy::{DoRead, ReaderPolicy},
Buffer,
};
// TODO this duplicates: application/apps/indexer/processor/src/search/buffer.rs
use bufread::BufReader;
use tokio_util::sync::CancellationToken;
use std::io::{BufRead, Read, Result, Seek, SeekFrom, Error, ErrorKind};

pub const REDUX_READER_CAPACITY: usize = 1024 * 1024;
pub const REDUX_MIN_BUFFER_SPACE: usize = 10 * 1024;
const BIN_READER_CAPACITY: usize = 1024 * 1024;
const BIN_MIN_BUFFER_SPACE: usize = 10 * 1024;

#[derive(Debug)]
pub struct CancallableMinBuffered(pub (usize, CancellationToken));
pub struct CancellableBufReader<R> {
buffer: BufReader<R>,
cancel: CancellationToken
}

impl CancallableMinBuffered {
/// Set the number of bytes to ensure are in the buffer.
pub fn set_min(&mut self, min: usize) {
self.0 .0 = min;
impl<R> CancellableBufReader<R> {
pub fn new(reader: R, cancel: CancellationToken) -> Self {
CancellableBufReader {
buffer: BufReader::new(
BIN_READER_CAPACITY,
BIN_MIN_BUFFER_SPACE,
reader),
cancel
}
}
}

impl ReaderPolicy for CancallableMinBuffered {
fn before_read(&mut self, buffer: &mut Buffer) -> DoRead {
// do nothing if we have enough data
if buffer.len() >= self.0 .0 {
do_read!(false)
impl<R: Read> Read for CancellableBufReader<R> {
fn read(&mut self, buffer: &mut [u8]) -> Result<usize> {
if self.cancel.is_cancelled() {
return Ok(0);
}

self.buffer.read(buffer)
}
}

let cap = buffer.capacity();
impl<R: Read> BufRead for CancellableBufReader<R> {
fn fill_buf(&mut self) -> Result<&[u8]> {
if self.cancel.is_cancelled() {
return Ok(&[][..]);
}

self.buffer.fill_buf()
}

// if there's enough room but some of it's stuck after the head
if buffer.usable_space() < self.0 .0 && buffer.free_space() >= self.0 .0 {
buffer.make_room();
} else if cap < self.0 .0 {
buffer.reserve(self.0 .0 - cap);
fn consume(&mut self, size: usize) {
if self.cancel.is_cancelled() {
return;
}

DoRead(true)
self.buffer.consume(size)
}
}

impl<R: Seek> Seek for CancellableBufReader<R> {
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
if self.cancel.is_cancelled() {
return Err(Error::from(ErrorKind::NotFound));
}

self.buffer.seek(pos)
}
}
8 changes: 2 additions & 6 deletions application/apps/indexer/addons/text_grep/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
//!
//! # Dependencies
//!
//! - `buf_redux`: Provides buffered I/O functionality.
//! - `grep_regex`: Facilitates regular expression searching.
//! - `grep_searcher`: Implements file searching capabilities.
//! - `regex`: Provides regular expression support.
Expand All @@ -81,8 +80,7 @@
//! - Both case-sensitive and case-insensitive searches are supported based on user preference.
pub mod buffer;
use crate::buffer::{CancallableMinBuffered, REDUX_MIN_BUFFER_SPACE, REDUX_READER_CAPACITY};
use buf_redux::BufReader;
use crate::buffer::CancellableBufReader;
use grep_regex::{RegexMatcher, RegexMatcherBuilder};
use grep_searcher::{sinks::UTF8, Searcher};
use regex::Regex;
Expand Down Expand Up @@ -214,9 +212,7 @@ fn process_file(
) -> Result<SearchResult, GrepError> {
let mut pattern_counts = HashMap::new();
let file = File::open(file_path)?;
let reader = BufReader::with_capacity(REDUX_READER_CAPACITY, file).set_policy(
CancallableMinBuffered((REDUX_MIN_BUFFER_SPACE, cancel_token.clone())),
);
let reader = CancellableBufReader::new(file, cancel_token.clone());
let mut searcher = Searcher::new();
searcher
.search_reader(
Expand Down
2 changes: 1 addition & 1 deletion application/apps/indexer/processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"

[dependencies]
bincode = "1.3"
buf_redux = { git = "https://github.com/DmitryAstafyev/buf_redux.git" }
bufread.workspace = true
bytecount = "0.6"
futures.workspace = true
grep-regex.workspace = true
Expand Down
75 changes: 47 additions & 28 deletions application/apps/indexer/processor/src/search/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,62 @@
use buf_redux::{
do_read,
policy::{DoRead, ReaderPolicy},
Buffer,
};
// TODO this duplicates: application/apps/indexer/addons/text_grep/src/buffer.rs
use bufread::BufReader;
use tokio_util::sync::CancellationToken;
use std::io::{BufRead, Read, Result, Seek, SeekFrom, Error, ErrorKind};

pub const REDUX_READER_CAPACITY: usize = 1024 * 1024;
pub const REDUX_MIN_BUFFER_SPACE: usize = 10 * 1024;
const BIN_READER_CAPACITY: usize = 1024 * 1024;
const BIN_MIN_BUFFER_SPACE: usize = 10 * 1024;

#[derive(Debug)]
pub struct CancallableMinBuffered(pub (usize, CancellationToken));
pub struct CancellableBufReader<R> {
buffer: BufReader<R>,
cancel: CancellationToken
}

impl CancallableMinBuffered {
/// Set the number of bytes to ensure are in the buffer.
pub fn set_min(&mut self, min: usize) {
self.0 .0 = min;
impl<R> CancellableBufReader<R> {
pub fn new(reader: R, cancel: CancellationToken) -> Self {
CancellableBufReader {
buffer: BufReader::new(
BIN_READER_CAPACITY,
BIN_MIN_BUFFER_SPACE,
reader),
cancel
}
}
}

impl ReaderPolicy for CancallableMinBuffered {
fn before_read(&mut self, buffer: &mut Buffer) -> DoRead {
// do nothing if we have enough data
if buffer.len() >= self.0 .0 {
do_read!(false)
impl<R: Read> Read for CancellableBufReader<R> {
fn read(&mut self, buffer: &mut [u8]) -> Result<usize> {
if self.cancel.is_cancelled() {
return Ok(0);
}

self.buffer.read(buffer)
}
}

let cap = buffer.capacity();
impl<R: Read> BufRead for CancellableBufReader<R> {
fn fill_buf(&mut self) -> Result<&[u8]> {
if self.cancel.is_cancelled() {
return Ok(&[][..]);
}

self.buffer.fill_buf()
}

// if there's enough room but some of it's stuck after the head
if buffer.usable_space() < self.0 .0 && buffer.free_space() >= self.0 .0 {
buffer.make_room();
} else if cap < self.0 .0 {
buffer.reserve(self.0 .0 - cap);
fn consume(&mut self, size: usize) {
if self.cancel.is_cancelled() {
return;
}

DoRead(true)
self.buffer.consume(size)
}
}

fn is_paused(&mut self) -> bool {
self.0 .1.is_cancelled()
impl<R: Seek> Seek for CancellableBufReader<R> {
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
if self.cancel.is_cancelled() {
return Err(Error::from(ErrorKind::NotFound));
}

self.buffer.seek(pos)
}
}
}
10 changes: 3 additions & 7 deletions application/apps/indexer/processor/src/search/searchers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::{
grabber::GrabError,
search::{
buffer::{CancallableMinBuffered, REDUX_MIN_BUFFER_SPACE, REDUX_READER_CAPACITY},
buffer::CancellableBufReader,
error::SearchError,
},
};
use buf_redux::BufReader as ReduxReader;
use grep_regex::RegexMatcher;
use grep_searcher::{sinks::UTF8, Searcher};
use std::{
Expand Down Expand Up @@ -68,7 +67,7 @@ impl<State: SearchState> BaseSearcher<State> {
&mut self,
rows_count: u64,
read_bytes: u64,
cancallation: CancellationToken,
cancel_token: CancellationToken,
mut f: F,
) -> Result<Range<usize>, SearchError>
where
Expand Down Expand Up @@ -101,10 +100,7 @@ impl<State: SearchState> BaseSearcher<State> {
let in_file = File::open(&self.file_path).map_err(|_| {
GrabError::IoOperation(format!("Could not open file {:?}", self.file_path))
})?;
let mut in_file_reader =
ReduxReader::with_capacity(REDUX_READER_CAPACITY, in_file).set_policy(
CancallableMinBuffered((REDUX_MIN_BUFFER_SPACE, cancallation)),
);
let mut in_file_reader = CancellableBufReader::new(in_file, cancel_token);
in_file_reader
.seek(SeekFrom::Start(self.bytes_read))
.map_err(|_| {
Expand Down
9 changes: 4 additions & 5 deletions application/apps/indexer/processor/src/text_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::grabber::{
identify_byte_range, ByteRange, ComputationResult, FilePart, GrabError, GrabMetadata,
LineRange, Slot,
};
use buf_redux::{policy::MinBuffered, BufReader as ReduxReader};
use bufread::BufReader;
use log::{debug, error};
use std::{
fs,
Expand All @@ -12,8 +12,8 @@ use std::{
};
use tokio_util::sync::CancellationToken;

const REDUX_READER_CAPACITY: usize = 1024 * 32;
const REDUX_MIN_BUFFER_SPACE: usize = 10 * 1024;
const BIN_READER_CAPACITY: usize = 1024 * 32;
const BIN_MIN_BUFFER_SPACE: usize = 10 * 1024;

#[derive(Debug)]
pub struct TextFileSource {
Expand Down Expand Up @@ -135,8 +135,7 @@ impl TextFileSource {
&self.path, byte_offset
))
})?;
let mut reader = ReduxReader::with_capacity(REDUX_READER_CAPACITY, f)
.set_policy(MinBuffered(REDUX_MIN_BUFFER_SPACE));
let mut reader = BufReader::new(BIN_READER_CAPACITY, BIN_MIN_BUFFER_SPACE, f);
let mut pending: Option<Slot> = None;
loop {
if let Some(shutdown_token) = &shutdown_token {
Expand Down
2 changes: 1 addition & 1 deletion application/apps/indexer/sources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"

[dependencies]
async-stream = "0.3"
buf_redux.workspace = true
bufread.workspace = true
bytes = "1.3"
etherparse = "0.16"
futures.workspace = true
Expand Down
20 changes: 10 additions & 10 deletions application/apps/indexer/sources/src/binary/pcap/legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use crate::{
binary::pcap::debug_block, ByteSource, Error as SourceError, ReloadInfo, SourceFilter,
TransportProtocol,
};
use buf_redux::Buffer;
use bufread::DeqBuffer;
use etherparse::{SlicedPacket, TransportSlice};
use log::{debug, error, trace};
use pcap_parser::{traits::PcapReaderIterator, LegacyPcapReader, PcapBlockOwned, PcapError};
use std::io::Read;

pub struct PcapLegacyByteSource<R: Read> {
pcap_reader: LegacyPcapReader<R>,
buffer: Buffer,
buffer: DeqBuffer,
last_know_timestamp: Option<u64>,
total: usize,
}
Expand All @@ -20,7 +20,7 @@ impl<R: Read> PcapLegacyByteSource<R> {
Ok(Self {
pcap_reader: LegacyPcapReader::new(65536, reader)
.map_err(|e| SourceError::Setup(format!("{e}")))?,
buffer: Buffer::new(),
buffer: DeqBuffer::new(8192),
last_know_timestamp: None,
total: 0,
})
Expand Down Expand Up @@ -104,8 +104,8 @@ impl<R: Read + Send + Sync> ByteSource for PcapLegacyByteSource<R> {
}),
) => {
let actual_tp: TransportProtocol = actual.into();
let received_bytes = self.buffer.copy_from_slice(payload);
let available_bytes = self.buffer.len();
let received_bytes = self.buffer.write_from(payload);
let available_bytes = self.buffer.read_available();
if actual_tp == *wanted {
Ok(Some(ReloadInfo::new(
received_bytes,
Expand All @@ -123,8 +123,8 @@ impl<R: Read + Send + Sync> ByteSource for PcapLegacyByteSource<R> {
}
}
_ => {
let copied = self.buffer.copy_from_slice(payload);
let available_bytes = self.buffer.len();
let copied = self.buffer.write_from(payload);
let available_bytes = self.buffer.read_available();
Ok(Some(ReloadInfo::new(
copied,
available_bytes,
Expand All @@ -145,15 +145,15 @@ impl<R: Read + Send + Sync> ByteSource for PcapLegacyByteSource<R> {
}

fn current_slice(&self) -> &[u8] {
self.buffer.buf()
self.buffer.read_slice()
}

fn consume(&mut self, offset: usize) {
self.buffer.consume(offset);
self.buffer.read_done(offset);
}

fn len(&self) -> usize {
self.buffer.len()
self.buffer.read_available()
}
}

Expand Down
Loading

0 comments on commit 61adc97

Please sign in to comment.