Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.6.0 #123

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
matrix:
rust_version:
- stable
- "1.74.0" # MSRV
- "1.75.0" # MSRV
os:
- ubuntu-latest
- windows-latest
Expand Down Expand Up @@ -63,7 +63,7 @@ jobs:
matrix:
rust_version:
- stable
- "1.74.0" # MSRV
- "1.75.0" # MSRV
os:
- ubuntu-latest
- windows-latest
Expand Down
12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
name = "fjall"
description = "LSM-based key-value storage engine"
license = "MIT OR Apache-2.0"
version = "2.5.0"
version = "2.6.0"
edition = "2021"
rust-version = "1.74.0"
rust-version = "1.75.0"
readme = "README.md"
include = ["src/**/*", "LICENSE-APACHE", "LICENSE-MIT", "README.md"]
repository = "https://github.com/fjall-rs/fjall"
Expand All @@ -17,18 +17,18 @@ name = "fjall"
path = "src/lib.rs"

[features]
default = ["bloom", "single_writer_tx", "lz4"]
default = ["single_writer_tx", "lz4"]
lz4 = ["lsm-tree/lz4"]
miniz = ["lsm-tree/miniz"]
bloom = ["lsm-tree/bloom"]
bytes = ["lsm-tree/bytes"]
single_writer_tx = []
ssi_tx = []
__internal_whitebox = []
bytes = ["lsm-tree/bytes"]

[dependencies]
byteorder = "1.5.0"
lsm-tree = { version = "2.5.0", default-features = false }
lsm-tree = { git = "https://github.com/fjall-rs/lsm-tree", branch = "2.6.0", default-features = false, features = [
] }
log = "0.4.21"
std-semaphore = "0.1.0"
tempfile = "3.10.1"
Expand Down
11 changes: 1 addition & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<a href="https://crates.io/crates/fjall">
<img src="https://img.shields.io/crates/v/fjall?color=blue" alt="Crates.io" />
</a>
<img src="https://img.shields.io/badge/MSRV-1.74.0-blue" alt="MSRV" />
<img src="https://img.shields.io/badge/MSRV-1.75.0-blue" alt="MSRV" />
<a href="https://deps.rs/repo/github/fjall-rs/fjall">
<img src="https://deps.rs/repo/github/fjall-rs/fjall/status.svg" alt="dependency status" />
</a>
Expand Down Expand Up @@ -147,15 +147,6 @@ Uses [`bytes`](https://github.com/tokio-rs/bytes) as the underlying `Slice` type

*Disabled by default.*

### bloom *[deprecated]*

Uses bloom filters to reduce disk I/O when serving point reads, but increases memory usage.

*Enabled by default.*

> Will be removed in the future.
> If you are absolutely, 100% sure you do not need bloom filters: they will be togglable on a per-partition basis.

## Stable disk format

The disk format is stable as of 1.0.0.
Expand Down
22 changes: 17 additions & 5 deletions src/compaction/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,45 @@
use super::manager::CompactionManager;
use crate::snapshot_tracker::SnapshotTracker;
use lsm_tree::AbstractTree;
use std::sync::atomic::AtomicUsize;
use std::{
sync::atomic::{AtomicU64, AtomicUsize},
time::Instant,
};

/// Runs a single run of compaction.
pub fn run(
compaction_manager: &CompactionManager,
snapshot_tracker: &SnapshotTracker,
compaction_counter: &AtomicUsize,
time_compacting: &AtomicU64,
) {
use std::sync::atomic::Ordering::Relaxed;

let Some(item) = compaction_manager.pop() else {
return;
};

log::trace!(
"compactor: calling compaction strategy for partition {:?}",
item.0.name
item.0.name,
);

let strategy = item.config.compaction_strategy.clone();

// TODO: loop if there's more work to do

compaction_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
compaction_counter.fetch_add(1, Relaxed);

let start = Instant::now();

if let Err(e) = item
.tree
.compact(strategy.inner(), snapshot_tracker.get_seqno_safe_to_gc())
{
log::error!("Compaction failed: {e:?}");
};
compaction_counter.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}

time_compacting.fetch_add(start.elapsed().as_micros() as u64, Relaxed);

compaction_counter.fetch_sub(1, Relaxed);
}
1 change: 1 addition & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use lsm_tree::{DecodeError, EncodeError};

/// Errors that may occur in the storage engine
#[derive(Debug)]
#[non_exhaustive]
pub enum Error {
/// Error inside LSM-tree
Storage(lsm_tree::Error),
Expand Down
2 changes: 1 addition & 1 deletion src/journal/marker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub fn serialize_marker_item<W: Write>(

// NOTE: Truncation is okay and actually needed
#[allow(clippy::cast_possible_truncation)]
writer.write_u8(partition.as_bytes().len() as u8)?;
writer.write_u8(partition.len() as u8)?;
writer.write_all(partition.as_bytes())?;

// NOTE: Truncation is okay and actually needed
Expand Down
17 changes: 16 additions & 1 deletion src/keyspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{
fs::{remove_dir_all, File},
path::Path,
sync::{
atomic::{AtomicBool, AtomicUsize},
atomic::{AtomicBool, AtomicU64, AtomicUsize},
Arc, RwLock,
},
};
Expand Down Expand Up @@ -81,6 +81,9 @@ pub struct KeyspaceInner {
/// Active compaction counter
pub(crate) active_compaction_count: Arc<AtomicUsize>,

/// Time spent in compactions (in µs)
pub(crate) time_compacting: Arc<AtomicU64>,

#[doc(hidden)]
pub snapshot_tracker: SnapshotTracker,
}
Expand Down Expand Up @@ -197,6 +200,14 @@ impl Keyspace {
self.write_buffer_manager.get()
}

/// Returns the accumulated time spent in compactions so far, in microseconds (µs).
///
/// The value is read from the keyspace's shared `time_compacting` atomic
/// counter using relaxed ordering.
#[doc(hidden)]
#[must_use]
pub fn time_compacting(&self) -> u64 {
    use std::sync::atomic::Ordering::Relaxed;

    self.time_compacting.load(Relaxed)
}

/// Returns the number of active compactions currently running.
#[doc(hidden)]
#[must_use]
Expand Down Expand Up @@ -592,6 +603,7 @@ impl Keyspace {
is_poisoned: Arc::default(),
snapshot_tracker: SnapshotTracker::default(),
active_compaction_count: Arc::default(),
time_compacting: Arc::default(),
};

let keyspace = Self(Arc::new(inner));
Expand Down Expand Up @@ -726,6 +738,7 @@ impl Keyspace {
is_poisoned: Arc::default(),
snapshot_tracker: SnapshotTracker::default(),
active_compaction_count: Arc::default(),
time_compacting: Arc::default(),
};

// NOTE: Lastly, fsync .fjall marker, which contains the version
Expand Down Expand Up @@ -806,6 +819,7 @@ impl Keyspace {
let thread_counter = self.active_background_threads.clone();
let snapshot_tracker = self.snapshot_tracker.clone();
let compaction_counter = self.active_compaction_count.clone();
let time_compacting = self.time_compacting.clone();

thread_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

Expand All @@ -820,6 +834,7 @@ impl Keyspace {
&compaction_manager,
&snapshot_tracker,
&compaction_counter,
&time_compacting,
);
}

Expand Down
61 changes: 45 additions & 16 deletions src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ impl PartitionHandle {
.data_block_size(config.data_block_size)
.index_block_size(config.index_block_size)
.level_count(config.level_count)
.compression(config.compression);
.compression(config.compression)
.bloom_bits_per_key(config.bloom_bits_per_key);

if let Some(kv_opts) = &config.kv_separation {
base_config = base_config
Expand All @@ -263,11 +264,6 @@ impl PartitionHandle {
.blob_file_target_size(kv_opts.file_target_size);
}

#[cfg(feature = "bloom")]
{
base_config = base_config.bloom_bits_per_key(config.bloom_bits_per_key);
}

let tree = match config.tree_type {
lsm_tree::TreeType::Standard => AnyTree::Standard(base_config.open()?),
lsm_tree::TreeType::Blob => AnyTree::Blob(base_config.open_as_blob_tree()?),
Expand Down Expand Up @@ -339,23 +335,29 @@ impl PartitionHandle {
/// ```
#[must_use]
pub fn iter(&self) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
self.tree.iter().map(|item| item.map_err(Into::into))
self.tree
.iter(None, None)
.map(|item| item.map_err(Into::into))
}

/// Returns an iterator that scans through the entire partition, returning only keys.
///
/// Avoid using this function, or limit it as otherwise it may scan a lot of items.
#[must_use]
pub fn keys(&self) -> impl DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static {
self.tree.keys().map(|item| item.map_err(Into::into))
self.tree
.keys(None, None)
.map(|item| item.map_err(Into::into))
}

/// Returns an iterator that scans through the entire partition, returning only values.
///
/// Avoid using this function, or limit it as otherwise it may scan a lot of items.
#[must_use]
pub fn values(&self) -> impl DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static {
self.tree.values().map(|item| item.map_err(Into::into))
self.tree
.values(None, None)
.map(|item| item.map_err(Into::into))
}

/// Returns an iterator over a range of items.
Expand All @@ -381,7 +383,9 @@ impl PartitionHandle {
&'a self,
range: R,
) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
self.tree.range(range).map(|item| item.map_err(Into::into))
self.tree
.range(range, None, None)
.map(|item| item.map_err(Into::into))
}

/// Returns an iterator over a prefixed set of items.
Expand All @@ -408,7 +412,7 @@ impl PartitionHandle {
prefix: K,
) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
self.tree
.prefix(prefix)
.prefix(prefix, None, None)
.map(|item| item.map_err(Into::into))
}

Expand Down Expand Up @@ -536,7 +540,7 @@ impl PartitionHandle {
///
/// Will return `Err` if an IO error occurs.
pub fn contains_key<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<bool> {
self.tree.contains_key(key).map_err(Into::into)
self.tree.contains_key(key, None).map_err(Into::into)
}

/// Retrieves an item from the partition.
Expand All @@ -561,7 +565,7 @@ impl PartitionHandle {
///
/// Will return `Err` if an IO error occurs.
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<lsm_tree::UserValue>> {
Ok(self.tree.get(key)?)
Ok(self.tree.get(key, None)?)
}

/// Retrieves the size of an item from the partition.
Expand All @@ -586,7 +590,7 @@ impl PartitionHandle {
///
/// Will return `Err` if an IO error occurs.
pub fn size_of<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<u32>> {
Ok(self.tree.size_of(key)?)
Ok(self.tree.size_of(key, None)?)
}

/// Returns the first key-value pair in the partition.
Expand Down Expand Up @@ -614,7 +618,7 @@ impl PartitionHandle {
///
/// Will return `Err` if an IO error occurs.
pub fn first_key_value(&self) -> crate::Result<Option<KvPair>> {
Ok(self.tree.first_key_value()?)
Ok(self.tree.first_key_value(None, None)?)
}

/// Returns the last key-value pair in the partition.
Expand Down Expand Up @@ -642,7 +646,32 @@ impl PartitionHandle {
///
/// Will return `Err` if an IO error occurs.
pub fn last_key_value(&self) -> crate::Result<Option<KvPair>> {
Ok(self.tree.last_key_value()?)
Ok(self.tree.last_key_value(None, None)?)
}

/// Returns `true` if the underlying LSM-tree is key-value-separated,
/// i.e. if the partition's tree is the `AnyTree::Blob` variant.
///
/// Any other tree variant yields `false`.
///
/// See [`CreateOptions::with_kv_separation`] for more information.
///
/// # Examples
///
/// ```
/// # use fjall::{Config, Keyspace, PartitionCreateOptions};
/// #
/// # let folder = tempfile::tempdir()?;
/// # let keyspace = Config::new(folder).open()?;
/// let tree1 = keyspace.open_partition("default", PartitionCreateOptions::default())?;
/// assert!(!tree1.is_kv_separated());
///
/// let blob_cfg = PartitionCreateOptions::default().with_kv_separation(Default::default());
/// let tree2 = keyspace.open_partition("blobs", blob_cfg)?;
/// assert!(tree2.is_kv_separated());
/// #
/// # Ok::<(), fjall::Error>(())
/// ```
#[must_use]
pub fn is_kv_separated(&self) -> bool {
// Pure variant check: no I/O, nothing is moved out of `self.tree`.
matches!(self.tree, crate::AnyTree::Blob(_))
}

// NOTE: Used in tests
Expand Down
Loading
Loading