Skip to content

Commit

Permalink
Merge pull request #108 from fjall-rs/2.5.0
Browse files Browse the repository at this point in the history
2.5.0
  • Loading branch information
marvin-j97 authored Jan 8, 2025
2 parents 80a7b99 + 4e92845 commit fa09be9
Show file tree
Hide file tree
Showing 18 changed files with 132 additions and 93 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "fjall"
description = "LSM-based key-value storage engine"
license = "MIT OR Apache-2.0"
version = "2.4.4"
version = "2.5.0"
edition = "2021"
rust-version = "1.74.0"
readme = "README.md"
Expand All @@ -28,7 +28,7 @@ bytes = ["lsm-tree/bytes"]

[dependencies]
byteorder = "1.5.0"
lsm-tree = { version = "2.4.0", default-features = false }
lsm-tree = { version = "2.5.0", default-features = false }
log = "0.4.21"
std-semaphore = "0.1.0"
tempfile = "3.10.1"
Expand Down
2 changes: 1 addition & 1 deletion examples/permuterm/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ fn main() -> Result<()> {

for word in WORDS {
for term in permuterm(word) {
db.insert(format!("{term}#{word}"), word).unwrap();
db.insert(format!("{term}#{word}"), *word).unwrap();
}
}

Expand Down
4 changes: 2 additions & 2 deletions examples/tx-blob-cas/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl Cas {
let rc = self.decrement_ref_count(&mut tx, &prev_content_hash)?;
if rc == 0 {
// No more references
tx.remove(&self.blobs, &prev_content_hash);
tx.remove(&self.blobs, prev_content_hash);
}
}

Expand Down Expand Up @@ -156,7 +156,7 @@ impl Cas {
let rc = self.decrement_ref_count(&mut tx, &content_hash)?;
if rc == 0 {
// No more references
tx.remove(&self.blobs, &content_hash);
tx.remove(&self.blobs, content_hash);
}

tx.commit()?;
Expand Down
5 changes: 3 additions & 2 deletions examples/tx-mpmc-queue/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ fn main() -> fjall::Result<()> {
// Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete
// would be good for this type of workload
if let Some((key, _)) = tx.first_key_value(&tasks)? {
tx.remove(&tasks, &key);
let task_id = std::str::from_utf8(&key).unwrap().to_owned();

tx.remove(&tasks, key);

tx.commit()?;

let task_id = std::str::from_utf8(&key).unwrap();
println!("consumer {idx} completed task {task_id}");

counter.fetch_add(1, Relaxed);
Expand Down
7 changes: 4 additions & 3 deletions examples/tx-partition-move/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ fn main() -> fjall::Result<()> {
// Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete
// would be good for this type of workload
if let Some((key, value)) = tx.first_key_value(&src)? {
tx.remove(&src, &key);
tx.insert(&dst, &key, &value);
let task_id = std::str::from_utf8(&key).unwrap().to_owned();

tx.remove(&src, key.clone());
tx.insert(&dst, key, value);

tx.commit()?;

let task_id = std::str::from_utf8(&key).unwrap();
println!("consumer {idx} moved {task_id}");

let ms = rng.gen_range(10..100);
Expand Down
5 changes: 3 additions & 2 deletions examples/tx-ssi-mpmc-queue/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,14 @@ fn main() -> fjall::Result<()> {
// Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete
// would be good for this type of workload
if let Some((key, _)) = tx.first_key_value(&tasks)? {
tx.remove(&tasks, &key);
let task_id = std::str::from_utf8(&key).unwrap().to_owned();

tx.remove(&tasks, key);

if tx.commit()?.is_ok() {
counter.fetch_add(1, Relaxed);
}

let task_id = std::str::from_utf8(&key).unwrap();
println!("consumer {idx} completed task {task_id}");

let ms = rng.gen_range(50..200);
Expand Down
7 changes: 4 additions & 3 deletions examples/tx-ssi-partition-move/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ fn main() -> fjall::Result<()> {
// Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete
// would be good for this type of workload
if let Some((key, value)) = tx.first_key_value(&src)? {
tx.remove(&src, &key);
tx.insert(&dst, &key, &value);
let task_id = std::str::from_utf8(&key).unwrap().to_owned();

tx.remove(&src, key.clone());
tx.insert(&dst, key, value);

tx.commit()?.ok();

let task_id = std::str::from_utf8(&key).unwrap();
println!("consumer {idx} moved {task_id}");

let ms = rng.gen_range(10..100);
Expand Down
22 changes: 7 additions & 15 deletions src/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod item;

use crate::{Keyspace, PartitionHandle, PersistMode};
use item::Item;
use lsm_tree::{AbstractTree, ValueType};
use lsm_tree::{AbstractTree, UserKey, UserValue, ValueType};
use std::{collections::HashSet, sync::Arc};

/// Partition key (a.k.a. column family, locality group)
Expand Down Expand Up @@ -52,28 +52,20 @@ impl Batch {
}

/// Inserts a key-value pair into the batch
pub fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(
pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&mut self,
p: &PartitionHandle,
key: K,
value: V,
) {
self.data.push(Item::new(
p.name.clone(),
key.as_ref(),
value.as_ref(),
ValueType::Value,
));
self.data
.push(Item::new(p.name.clone(), key, value, ValueType::Value));
}

/// Adds a tombstone marker for a key
pub fn remove<K: AsRef<[u8]>>(&mut self, p: &PartitionHandle, key: K) {
self.data.push(Item::new(
p.name.clone(),
key.as_ref(),
vec![],
ValueType::Tombstone,
));
pub fn remove<K: Into<UserKey>>(&mut self, p: &PartitionHandle, key: K) {
self.data
.push(Item::new(p.name.clone(), key, vec![], ValueType::Tombstone));
}

/// Commits the batch to the [`Keyspace`] atomically
Expand Down
9 changes: 8 additions & 1 deletion src/compaction/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
use super::manager::CompactionManager;
use crate::snapshot_tracker::SnapshotTracker;
use lsm_tree::AbstractTree;
use std::sync::atomic::AtomicUsize;

/// Runs a single run of compaction.
pub fn run(compaction_manager: &CompactionManager, snapshot_tracker: &SnapshotTracker) {
pub fn run(
compaction_manager: &CompactionManager,
snapshot_tracker: &SnapshotTracker,
compaction_counter: &AtomicUsize,
) {
let Some(item) = compaction_manager.pop() else {
return;
};
Expand All @@ -21,10 +26,12 @@ pub fn run(compaction_manager: &CompactionManager, snapshot_tracker: &SnapshotTr

// TODO: loop if there's more work to do

compaction_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if let Err(e) = item
.tree
.compact(strategy.inner(), snapshot_tracker.get_seqno_safe_to_gc())
{
log::error!("Compaction failed: {e:?}");
};
compaction_counter.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}
5 changes: 4 additions & 1 deletion src/flush/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ pub fn run(
flush_manager.dequeue_tasks(partition.name.clone(), created_segments.len());

write_buffer_manager.free(memtables_size);
compaction_manager.notify(partition);

for _ in 0..parallelism {
compaction_manager.notify(partition.clone());
}
}
}
Err(e) => {
Expand Down
20 changes: 19 additions & 1 deletion src/keyspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ pub struct KeyspaceInner {
/// True if fsync failed
pub(crate) is_poisoned: Arc<AtomicBool>,

/// Active compaction counter
pub(crate) active_compaction_count: Arc<AtomicUsize>,

#[doc(hidden)]
pub snapshot_tracker: SnapshotTracker,
}
Expand Down Expand Up @@ -194,6 +197,14 @@ impl Keyspace {
self.write_buffer_manager.get()
}

/// Reports how many compactions are in progress at the moment of the call.
///
/// The counter is read with relaxed ordering, so the result is only a
/// point-in-time snapshot and may already be outdated when it is used.
#[doc(hidden)]
#[must_use]
pub fn active_compactions(&self) -> usize {
    use std::sync::atomic::Ordering;

    let in_flight = &self.active_compaction_count;
    in_flight.load(Ordering::Relaxed)
}

/// Returns the amount of journals on disk.
///
/// # Examples
Expand Down Expand Up @@ -580,6 +591,7 @@ impl Keyspace {
write_buffer_manager: WriteBufferManager::default(),
is_poisoned: Arc::default(),
snapshot_tracker: SnapshotTracker::default(),
active_compaction_count: Arc::default(),
};

let keyspace = Self(Arc::new(inner));
Expand Down Expand Up @@ -713,6 +725,7 @@ impl Keyspace {
write_buffer_manager: WriteBufferManager::default(),
is_poisoned: Arc::default(),
snapshot_tracker: SnapshotTracker::default(),
active_compaction_count: Arc::default(),
};

// NOTE: Lastly, fsync .fjall marker, which contains the version
Expand Down Expand Up @@ -792,6 +805,7 @@ impl Keyspace {
let stop_signal = self.stop_signal.clone();
let thread_counter = self.active_background_threads.clone();
let snapshot_tracker = self.snapshot_tracker.clone();
let compaction_counter = self.active_compaction_count.clone();

thread_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

Expand All @@ -802,7 +816,11 @@ impl Keyspace {
log::trace!("compaction: waiting for work");
compaction_manager.wait_for();

crate::compaction::worker::run(&compaction_manager, &snapshot_tracker);
crate::compaction::worker::run(
&compaction_manager,
&snapshot_tracker,
&compaction_counter,
);
}

log::trace!("compaction thread: exiting because keyspace is dropping");
Expand Down
18 changes: 11 additions & 7 deletions src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,15 +866,19 @@ impl PartitionHandle {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> crate::Result<()> {
pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&self,
key: K,
value: V,
) -> crate::Result<()> {
use std::sync::atomic::Ordering;

if self.is_deleted.load(Ordering::Relaxed) {
return Err(crate::Error::PartitionDeleted);
}

let key = key.as_ref();
let value = value.as_ref();
let key = key.into();
let value = value.into();

let mut journal_writer = self.journal.get_writer();

Expand All @@ -885,7 +889,7 @@ impl PartitionHandle {
return Err(crate::Error::Poisoned);
}

journal_writer.write_raw(&self.name, key, value, lsm_tree::ValueType::Value, seqno)?;
journal_writer.write_raw(&self.name, &key, &value, lsm_tree::ValueType::Value, seqno)?;

if !self.config.manual_journal_persist {
journal_writer
Expand Down Expand Up @@ -943,14 +947,14 @@ impl PartitionHandle {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<()> {
pub fn remove<K: Into<UserKey>>(&self, key: K) -> crate::Result<()> {
use std::sync::atomic::Ordering;

if self.is_deleted.load(Ordering::Relaxed) {
return Err(crate::Error::PartitionDeleted);
}

let key = key.as_ref();
let key = key.into();

let mut journal_writer = self.journal.get_writer();

Expand All @@ -961,7 +965,7 @@ impl PartitionHandle {
return Err(crate::Error::Poisoned);
}

journal_writer.write_raw(&self.name, key, &[], lsm_tree::ValueType::Tombstone, seqno)?;
journal_writer.write_raw(&self.name, &key, &[], lsm_tree::ValueType::Tombstone, seqno)?;

if !self.config.manual_journal_persist {
journal_writer
Expand Down
1 change: 1 addition & 0 deletions src/partition/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ impl lsm_tree::coding::Decode for CreateOptions {
l0_threshold,
target_size,
level_ratio,
..Default::default()
})
}
1 => {
Expand Down
10 changes: 5 additions & 5 deletions src/tx/conflict_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ impl ConflictManager {
}
}

pub fn mark_read(&mut self, partition: &PartitionKey, key: &Slice) {
self.push_read(partition, Read::Single(key.clone()));
pub fn mark_read(&mut self, partition: &PartitionKey, key: Slice) {
self.push_read(partition, Read::Single(key));
}

pub fn mark_conflict(&mut self, partition: &PartitionKey, key: &[u8]) {
pub fn mark_conflict(&mut self, partition: &PartitionKey, key: Slice) {
if let Some(tbl) = self.conflict_keys.get_mut(partition) {
tbl.insert(key.into());
tbl.insert(key);
} else {
self.conflict_keys
.entry(partition.clone())
.or_default()
.insert(key.into());
.insert(key);
}
}

Expand Down
Loading

0 comments on commit fa09be9

Please sign in to comment.