diff --git a/Cargo.toml b/Cargo.toml index 9f2e75b9..12052cd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "fjall" description = "LSM-based key-value storage engine" license = "MIT OR Apache-2.0" -version = "2.4.4" +version = "2.5.0" edition = "2021" rust-version = "1.74.0" readme = "README.md" @@ -28,7 +28,7 @@ bytes = ["lsm-tree/bytes"] [dependencies] byteorder = "1.5.0" -lsm-tree = { version = "2.4.0", default-features = false } +lsm-tree = { version = "2.5.0", default-features = false } log = "0.4.21" std-semaphore = "0.1.0" tempfile = "3.10.1" diff --git a/examples/permuterm/src/main.rs b/examples/permuterm/src/main.rs index ed346e45..fa87762f 100644 --- a/examples/permuterm/src/main.rs +++ b/examples/permuterm/src/main.rs @@ -80,7 +80,7 @@ fn main() -> Result<()> { for word in WORDS { for term in permuterm(word) { - db.insert(format!("{term}#{word}"), word).unwrap(); + db.insert(format!("{term}#{word}"), *word).unwrap(); } } diff --git a/examples/tx-blob-cas/src/main.rs b/examples/tx-blob-cas/src/main.rs index bb434086..510c967d 100644 --- a/examples/tx-blob-cas/src/main.rs +++ b/examples/tx-blob-cas/src/main.rs @@ -125,7 +125,7 @@ impl Cas { let rc = self.decrement_ref_count(&mut tx, &prev_content_hash)?; if rc == 0 { // No more references - tx.remove(&self.blobs, &prev_content_hash); + tx.remove(&self.blobs, prev_content_hash); } } @@ -156,7 +156,7 @@ impl Cas { let rc = self.decrement_ref_count(&mut tx, &content_hash)?; if rc == 0 { // No more references - tx.remove(&self.blobs, &content_hash); + tx.remove(&self.blobs, content_hash); } tx.commit()?; diff --git a/examples/tx-mpmc-queue/src/main.rs b/examples/tx-mpmc-queue/src/main.rs index a3397ffa..591b9e9d 100644 --- a/examples/tx-mpmc-queue/src/main.rs +++ b/examples/tx-mpmc-queue/src/main.rs @@ -63,11 +63,12 @@ fn main() -> fjall::Result<()> { // Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete // would be good for this type of workload if let Some((key, _)) = tx.first_key_value(&tasks)? { - tx.remove(&tasks, &key); + let task_id = std::str::from_utf8(&key).unwrap().to_owned(); + + tx.remove(&tasks, key); tx.commit()?; - let task_id = std::str::from_utf8(&key).unwrap(); println!("consumer {idx} completed task {task_id}"); counter.fetch_add(1, Relaxed); diff --git a/examples/tx-partition-move/src/main.rs b/examples/tx-partition-move/src/main.rs index 3d80d494..07af8079 100644 --- a/examples/tx-partition-move/src/main.rs +++ b/examples/tx-partition-move/src/main.rs @@ -33,12 +33,13 @@ fn main() -> fjall::Result<()> { // Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete // would be good for this type of workload if let Some((key, value)) = tx.first_key_value(&src)? { - tx.remove(&src, &key); - tx.insert(&dst, &key, &value); + let task_id = std::str::from_utf8(&key).unwrap().to_owned(); + + tx.remove(&src, key.clone()); + tx.insert(&dst, key, value); tx.commit()?; - let task_id = std::str::from_utf8(&key).unwrap(); println!("consumer {idx} moved {task_id}"); let ms = rng.gen_range(10..100); diff --git a/examples/tx-ssi-mpmc-queue/src/main.rs b/examples/tx-ssi-mpmc-queue/src/main.rs index 8a9e3541..37de49b9 100644 --- a/examples/tx-ssi-mpmc-queue/src/main.rs +++ b/examples/tx-ssi-mpmc-queue/src/main.rs @@ -65,13 +65,14 @@ fn main() -> fjall::Result<()> { // Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete // would be good for this type of workload if let Some((key, _)) = tx.first_key_value(&tasks)? { - tx.remove(&tasks, &key); + let task_id = std::str::from_utf8(&key).unwrap().to_owned(); + + tx.remove(&tasks, key); if tx.commit()?.is_ok() { counter.fetch_add(1, Relaxed); } - let task_id = std::str::from_utf8(&key).unwrap(); println!("consumer {idx} completed task {task_id}"); let ms = rng.gen_range(50..200); diff --git a/examples/tx-ssi-partition-move/src/main.rs b/examples/tx-ssi-partition-move/src/main.rs index 358927bf..714e0ab4 100644 --- a/examples/tx-ssi-partition-move/src/main.rs +++ b/examples/tx-ssi-partition-move/src/main.rs @@ -33,12 +33,13 @@ fn main() -> fjall::Result<()> { // Something like SingleDelete https://github.com/facebook/rocksdb/wiki/Single-Delete // would be good for this type of workload if let Some((key, value)) = tx.first_key_value(&src)? { - tx.remove(&src, &key); - tx.insert(&dst, &key, &value); + let task_id = std::str::from_utf8(&key).unwrap().to_owned(); + + tx.remove(&src, key.clone()); + tx.insert(&dst, key, value); tx.commit()?.ok(); - let task_id = std::str::from_utf8(&key).unwrap(); println!("consumer {idx} moved {task_id}"); let ms = rng.gen_range(10..100); diff --git a/src/batch/mod.rs b/src/batch/mod.rs index 3391225e..0462108b 100644 --- a/src/batch/mod.rs +++ b/src/batch/mod.rs @@ -6,7 +6,7 @@ pub mod item; use crate::{Keyspace, PartitionHandle, PersistMode}; use item::Item; -use lsm_tree::{AbstractTree, ValueType}; +use lsm_tree::{AbstractTree, UserKey, UserValue, ValueType}; use std::{collections::HashSet, sync::Arc}; /// Partition key (a.k.a. column family, locality group) @@ -52,28 +52,20 @@ impl Batch { } /// Inserts a key-value pair into the batch - pub fn insert, V: AsRef<[u8]>>( + pub fn insert, V: Into>( &mut self, p: &PartitionHandle, key: K, value: V, ) { - self.data.push(Item::new( - p.name.clone(), - key.as_ref(), - value.as_ref(), - ValueType::Value, - )); + self.data + .push(Item::new(p.name.clone(), key, value, ValueType::Value)); } /// Adds a tombstone marker for a key - pub fn remove>(&mut self, p: &PartitionHandle, key: K) { - self.data.push(Item::new( - p.name.clone(), - key.as_ref(), - vec![], - ValueType::Tombstone, - )); + pub fn remove>(&mut self, p: &PartitionHandle, key: K) { + self.data + .push(Item::new(p.name.clone(), key, vec![], ValueType::Tombstone)); } /// Commits the batch to the [`Keyspace`] atomically diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index 100fb8f4..e56a832c 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -5,9 +5,14 @@ use super::manager::CompactionManager; use crate::snapshot_tracker::SnapshotTracker; use lsm_tree::AbstractTree; +use std::sync::atomic::AtomicUsize; /// Runs a single run of compaction. -pub fn run(compaction_manager: &CompactionManager, snapshot_tracker: &SnapshotTracker) { +pub fn run( + compaction_manager: &CompactionManager, + snapshot_tracker: &SnapshotTracker, + compaction_counter: &AtomicUsize, +) { let Some(item) = compaction_manager.pop() else { return; }; @@ -21,10 +26,12 @@ pub fn run(compaction_manager: &CompactionManager, snapshot_tracker: &SnapshotTr // TODO: loop if there's more work to do + compaction_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if let Err(e) = item .tree .compact(strategy.inner(), snapshot_tracker.get_seqno_safe_to_gc()) { log::error!("Compaction failed: {e:?}"); }; + compaction_counter.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); } diff --git a/src/flush/worker.rs b/src/flush/worker.rs index 815233bb..cc6b5f95 100644 --- a/src/flush/worker.rs +++ b/src/flush/worker.rs @@ -152,7 +152,10 @@ pub fn run( flush_manager.dequeue_tasks(partition.name.clone(), created_segments.len()); write_buffer_manager.free(memtables_size); - compaction_manager.notify(partition); + + for _ in 0..parallelism { + compaction_manager.notify(partition.clone()); + } } } Err(e) => { diff --git a/src/keyspace.rs b/src/keyspace.rs index 595efc13..6189c486 100644 --- a/src/keyspace.rs +++ b/src/keyspace.rs @@ -78,6 +78,9 @@ pub struct KeyspaceInner { /// True if fsync failed pub(crate) is_poisoned: Arc, + /// Active compaction conter + pub(crate) active_compaction_count: Arc, + #[doc(hidden)] pub snapshot_tracker: SnapshotTracker, } @@ -194,6 +197,14 @@ impl Keyspace { self.write_buffer_manager.get() } + /// Returns the number of active compactions currently running. + #[doc(hidden)] + #[must_use] + pub fn active_compactions(&self) -> usize { + self.active_compaction_count + .load(std::sync::atomic::Ordering::Relaxed) + } + /// Returns the amount of journals on disk. /// /// # Examples @@ -580,6 +591,7 @@ impl Keyspace { write_buffer_manager: WriteBufferManager::default(), is_poisoned: Arc::default(), snapshot_tracker: SnapshotTracker::default(), + active_compaction_count: Arc::default(), }; let keyspace = Self(Arc::new(inner)); @@ -713,6 +725,7 @@ impl Keyspace { write_buffer_manager: WriteBufferManager::default(), is_poisoned: Arc::default(), snapshot_tracker: SnapshotTracker::default(), + active_compaction_count: Arc::default(), }; // NOTE: Lastly, fsync .fjall marker, which contains the version @@ -792,6 +805,7 @@ impl Keyspace { let stop_signal = self.stop_signal.clone(); let thread_counter = self.active_background_threads.clone(); let snapshot_tracker = self.snapshot_tracker.clone(); + let compaction_counter = self.active_compaction_count.clone(); thread_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); @@ -802,7 +816,11 @@ impl Keyspace { log::trace!("compaction: waiting for work"); compaction_manager.wait_for(); - crate::compaction::worker::run(&compaction_manager, &snapshot_tracker); + crate::compaction::worker::run( + &compaction_manager, + &snapshot_tracker, + &compaction_counter, + ); } log::trace!("compaction thread: exiting because keyspace is dropping"); diff --git a/src/partition/mod.rs b/src/partition/mod.rs index 912581fd..21bf6887 100644 --- a/src/partition/mod.rs +++ b/src/partition/mod.rs @@ -866,15 +866,19 @@ impl PartitionHandle { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn insert, V: AsRef<[u8]>>(&self, key: K, value: V) -> crate::Result<()> { + pub fn insert, V: Into>( + &self, + key: K, + value: V, + ) -> crate::Result<()> { use std::sync::atomic::Ordering; if self.is_deleted.load(Ordering::Relaxed) { return Err(crate::Error::PartitionDeleted); } - let key = key.as_ref(); - let value = value.as_ref(); + let key = key.into(); + let value = value.into(); let mut journal_writer = self.journal.get_writer(); @@ -885,7 +889,7 @@ impl PartitionHandle { return Err(crate::Error::Poisoned); } - journal_writer.write_raw(&self.name, key, value, lsm_tree::ValueType::Value, seqno)?; + journal_writer.write_raw(&self.name, &key, &value, lsm_tree::ValueType::Value, seqno)?; if !self.config.manual_journal_persist { journal_writer @@ -943,14 +947,14 @@ impl PartitionHandle { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn remove>(&self, key: K) -> crate::Result<()> { + pub fn remove>(&self, key: K) -> crate::Result<()> { use std::sync::atomic::Ordering; if self.is_deleted.load(Ordering::Relaxed) { return Err(crate::Error::PartitionDeleted); } - let key = key.as_ref(); + let key = key.into(); let mut journal_writer = self.journal.get_writer(); @@ -961,7 +965,7 @@ impl PartitionHandle { return Err(crate::Error::Poisoned); } - journal_writer.write_raw(&self.name, key, &[], lsm_tree::ValueType::Tombstone, seqno)?; + journal_writer.write_raw(&self.name, &key, &[], lsm_tree::ValueType::Tombstone, seqno)?; if !self.config.manual_journal_persist { journal_writer diff --git a/src/partition/options.rs b/src/partition/options.rs index df7570b9..977cc10e 100644 --- a/src/partition/options.rs +++ b/src/partition/options.rs @@ -234,6 +234,7 @@ impl lsm_tree::coding::Decode for CreateOptions { l0_threshold, target_size, level_ratio, + ..Default::default() }) } 1 => { diff --git a/src/tx/conflict_manager.rs b/src/tx/conflict_manager.rs index c2b1f6c9..78ca52b8 100644 --- a/src/tx/conflict_manager.rs +++ b/src/tx/conflict_manager.rs @@ -31,18 +31,18 @@ impl ConflictManager { } } - pub fn mark_read(&mut self, partition: &PartitionKey, key: &Slice) { - self.push_read(partition, Read::Single(key.clone())); + pub fn mark_read(&mut self, partition: &PartitionKey, key: Slice) { + self.push_read(partition, Read::Single(key)); } - pub fn mark_conflict(&mut self, partition: &PartitionKey, key: &[u8]) { + pub fn mark_conflict(&mut self, partition: &PartitionKey, key: Slice) { if let Some(tbl) = self.conflict_keys.get_mut(partition) { - tbl.insert(key.into()); + tbl.insert(key); } else { self.conflict_keys .entry(partition.clone()) .or_default() - .insert(key.into()); + .insert(key); } } diff --git a/src/tx/partition.rs b/src/tx/partition.rs index ac9ac395..e41b8107 100644 --- a/src/tx/partition.rs +++ b/src/tx/partition.rs @@ -3,7 +3,7 @@ // (found in the LICENSE-* files in the repository) use crate::{gc::GarbageCollection, PartitionHandle, TxKeyspace}; -use lsm_tree::{gc::Report as GcReport, KvPair, UserValue}; +use lsm_tree::{gc::Report as GcReport, KvPair, UserKey, UserValue}; use std::path::PathBuf; /// Access to a partition of a transactional keyspace @@ -63,7 +63,7 @@ impl TransactionalPartitionHandle { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn take>(&self, key: K) -> crate::Result> { + pub fn take>(&self, key: K) -> crate::Result> { self.fetch_update(key, |_| None) } @@ -121,11 +121,13 @@ impl TransactionalPartitionHandle { /// /// Will return `Err` if an IO error occurs. #[allow(unused_mut)] - pub fn fetch_update, F: FnMut(Option<&UserValue>) -> Option>( + pub fn fetch_update, F: FnMut(Option<&UserValue>) -> Option>( &self, key: K, mut f: F, ) -> crate::Result> { + let key: UserKey = key.into(); + #[cfg(feature = "single_writer_tx")] { let mut tx = self.keyspace.write_tx(); @@ -139,7 +141,7 @@ impl TransactionalPartitionHandle { #[cfg(feature = "ssi_tx")] loop { let mut tx = self.keyspace.write_tx()?; - let prev = tx.fetch_update(self, key.as_ref(), &mut f)?; + let prev = tx.fetch_update(self, key.clone(), &mut f)?; if tx.commit()?.is_ok() { return Ok(prev); } @@ -200,11 +202,13 @@ impl TransactionalPartitionHandle { /// /// Will return `Err` if an IO error occurs. #[allow(unused_mut)] - pub fn update_fetch, F: FnMut(Option<&UserValue>) -> Option>( + pub fn update_fetch, F: FnMut(Option<&UserValue>) -> Option>( &self, key: K, mut f: F, ) -> crate::Result> { + let key = key.into(); + #[cfg(feature = "single_writer_tx")] { let mut tx = self.keyspace.write_tx(); @@ -217,7 +221,7 @@ impl TransactionalPartitionHandle { #[cfg(feature = "ssi_tx")] loop { let mut tx = self.keyspace.write_tx()?; - let updated = tx.update_fetch(self, key.as_ref(), &mut f)?; + let updated = tx.update_fetch(self, key.clone(), &mut f)?; if tx.commit()?.is_ok() { return Ok(updated); } @@ -251,7 +255,11 @@ impl TransactionalPartitionHandle { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn insert, V: AsRef<[u8]>>(&self, key: K, value: V) -> crate::Result<()> { + pub fn insert, V: Into>( + &self, + key: K, + value: V, + ) -> crate::Result<()> { #[cfg(feature = "single_writer_tx")] { let mut tx = self.keyspace.write_tx(); @@ -263,7 +271,7 @@ impl TransactionalPartitionHandle { #[cfg(feature = "ssi_tx")] { let mut tx = self.keyspace.write_tx()?; - tx.insert(self, key.as_ref(), value.as_ref()); + tx.insert(self, key, value); tx.commit()?.expect("blind insert should not conflict ever"); Ok(()) } @@ -296,7 +304,7 @@ impl TransactionalPartitionHandle { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn remove>(&self, key: K) -> crate::Result<()> { + pub fn remove>(&self, key: K) -> crate::Result<()> { #[cfg(feature = "single_writer_tx")] { let mut tx = self.keyspace.write_tx(); @@ -308,7 +316,7 @@ impl TransactionalPartitionHandle { #[cfg(feature = "ssi_tx")] { let mut tx = self.keyspace.write_tx()?; - tx.remove(self, key.as_ref()); + tx.remove(self, key); tx.commit()?.expect("blind remove should not conflict ever"); Ok(()) } diff --git a/src/tx/write/mod.rs b/src/tx/write/mod.rs index 75796c8e..b3a08221 100644 --- a/src/tx/write/mod.rs +++ b/src/tx/write/mod.rs @@ -59,7 +59,7 @@ impl BaseTransaction { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub(super) fn take>( + pub(super) fn take>( &mut self, partition: &TxPartitionHandle, key: K, @@ -75,7 +75,7 @@ impl BaseTransaction { /// /// Will return `Err` if an IO error occurs. pub(super) fn update_fetch< - K: AsRef<[u8]>, + K: Into, F: FnMut(Option<&UserValue>) -> Option, >( &mut self, @@ -83,16 +83,17 @@ impl BaseTransaction { key: K, mut f: F, ) -> crate::Result> { + let key = key.into(); let prev = self.get(partition, &key)?; let updated = f(prev.as_ref()); - if let Some(value) = &updated { + if let Some(value) = updated.clone() { // NOTE: Skip insert if the value hasn't changed - if updated != prev { - self.insert(partition, &key, value); + if prev.as_ref() != Some(&value) { + self.insert(partition, key, value); } } else if prev.is_some() { - self.remove(partition, &key); + self.remove(partition, key); } Ok(updated) @@ -106,7 +107,7 @@ impl BaseTransaction { /// /// Will return `Err` if an IO error occurs. pub(super) fn fetch_update< - K: AsRef<[u8]>, + K: Into, F: FnMut(Option<&UserValue>) -> Option, >( &mut self, @@ -114,16 +115,17 @@ impl BaseTransaction { key: K, mut f: F, ) -> crate::Result> { + let key = key.into(); let prev = self.get(partition, &key)?; let updated = f(prev.as_ref()); - if let Some(value) = &updated { + if let Some(value) = updated { // NOTE: Skip insert if the value hasn't changed - if updated != prev { - self.insert(partition, &key, value); + if prev.as_ref() != Some(&value) { + self.insert(partition, key, value); } } else if prev.is_some() { - self.remove(partition, &key); + self.remove(partition, key); } Ok(prev) @@ -350,7 +352,7 @@ impl BaseTransaction { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub(super) fn insert, V: AsRef<[u8]>>( + pub(super) fn insert, V: Into>( &mut self, partition: &TxPartitionHandle, key: K, @@ -361,8 +363,8 @@ impl BaseTransaction { .entry(partition.inner.name.clone()) .or_default() .insert(lsm_tree::InternalValue::from_components( - key.as_ref(), - value.as_ref(), + key, + value, // NOTE: Just take the max seqno, which should never be reached // that way, the write is definitely always the newest SeqNo::MAX, @@ -378,13 +380,13 @@ impl BaseTransaction { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub(super) fn remove>(&mut self, partition: &TxPartitionHandle, key: K) { + pub(super) fn remove>(&mut self, partition: &TxPartitionHandle, key: K) { // TODO: PERF: slow?? self.memtables .entry(partition.inner.name.clone()) .or_default() .insert(lsm_tree::InternalValue::new_tombstone( - key.as_ref(), + key, // NOTE: Just take the max seqno, which should never be reached // that way, the write is definitely always the newest SeqNo::MAX, diff --git a/src/tx/write/single_writer.rs b/src/tx/write/single_writer.rs index be9557d4..5be788fb 100644 --- a/src/tx/write/single_writer.rs +++ b/src/tx/write/single_writer.rs @@ -58,7 +58,7 @@ impl<'a> WriteTransaction<'a> { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn take>( + pub fn take>( &mut self, partition: &TxPartitionHandle, key: K, @@ -116,7 +116,7 @@ impl<'a> WriteTransaction<'a> { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn update_fetch, F: FnMut(Option<&UserValue>) -> Option>( + pub fn update_fetch, F: FnMut(Option<&UserValue>) -> Option>( &mut self, partition: &TxPartitionHandle, key: K, @@ -175,7 +175,7 @@ impl<'a> WriteTransaction<'a> { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn fetch_update, F: FnMut(Option<&UserValue>) -> Option>( + pub fn fetch_update, F: FnMut(Option<&UserValue>) -> Option>( &mut self, partition: &TxPartitionHandle, key: K, @@ -555,7 +555,7 @@ impl<'a> WriteTransaction<'a> { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn insert, V: AsRef<[u8]>>( + pub fn insert, V: Into>( &mut self, partition: &TxPartitionHandle, key: K, @@ -598,7 +598,7 @@ impl<'a> WriteTransaction<'a> { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn remove>(&mut self, partition: &TxPartitionHandle, key: K) { + pub fn remove>(&mut self, partition: &TxPartitionHandle, key: K) { self.inner.remove(partition, key); } diff --git a/src/tx/write/ssi.rs b/src/tx/write/ssi.rs index 33ea14e3..fa8e2f78 100644 --- a/src/tx/write/ssi.rs +++ b/src/tx/write/ssi.rs @@ -72,7 +72,7 @@ impl WriteTransaction { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn take>( + pub fn take>( &mut self, partition: &TxPartitionHandle, key: K, @@ -130,17 +130,17 @@ impl WriteTransaction { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn update_fetch, F: FnMut(Option<&UserValue>) -> Option>( + pub fn update_fetch, F: FnMut(Option<&UserValue>) -> Option>( &mut self, partition: &TxPartitionHandle, key: K, f: F, ) -> crate::Result> { - let key = key.as_ref(); + let key: UserKey = key.into(); - let updated = self.inner.update_fetch(partition, key, f)?; + let updated = self.inner.update_fetch(partition, key.clone(), f)?; - self.cm.mark_read(&partition.inner.name, &key.into()); + self.cm.mark_read(&partition.inner.name, key.clone()); self.cm.mark_conflict(&partition.inner.name, key); Ok(updated) @@ -196,17 +196,17 @@ impl WriteTransaction { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn fetch_update, F: FnMut(Option<&UserValue>) -> Option>( + pub fn fetch_update, F: FnMut(Option<&UserValue>) -> Option>( &mut self, partition: &TxPartitionHandle, key: K, f: F, ) -> crate::Result> { - let key = key.as_ref(); + let key = key.into(); - let prev = self.inner.fetch_update(partition, key, f)?; + let prev = self.inner.fetch_update(partition, key.clone(), f)?; - self.cm.mark_read(&partition.inner.name, &key.into()); + self.cm.mark_read(&partition.inner.name, key.clone()); self.cm.mark_conflict(&partition.inner.name, key); Ok(prev) @@ -253,7 +253,7 @@ impl WriteTransaction { let res = self.inner.get(partition, key.as_ref())?; self.cm - .mark_read(&partition.inner.name, &key.as_ref().into()); + .mark_read(&partition.inner.name, key.as_ref().into()); Ok(res) } @@ -338,7 +338,7 @@ impl WriteTransaction { let contains = self.inner.contains_key(partition, key.as_ref())?; self.cm - .mark_read(&partition.inner.name, &key.as_ref().into()); + .mark_read(&partition.inner.name, key.as_ref().into()); Ok(contains) } @@ -628,15 +628,15 @@ impl WriteTransaction { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn insert, V: AsRef<[u8]>>( + pub fn insert, V: Into>( &mut self, partition: &TxPartitionHandle, key: K, value: V, ) { - let key = key.as_ref(); + let key: UserKey = key.into(); - self.inner.insert(partition, key, value); + self.inner.insert(partition, key.clone(), value); self.cm.mark_conflict(&partition.inner.name, key); } @@ -674,10 +674,10 @@ impl WriteTransaction { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn remove>(&mut self, partition: &TxPartitionHandle, key: K) { - let key = key.as_ref(); + pub fn remove>(&mut self, partition: &TxPartitionHandle, key: K) { + let key: UserKey = key.into(); - self.inner.remove(partition, key); + self.inner.remove(partition, key.clone()); self.cm.mark_conflict(&partition.inner.name, key); }