Skip to content

Commit

Permalink
feat: add CAS
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Dec 3, 2023
1 parent ea9c878 commit 4642fbe
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 42 deletions.
5 changes: 3 additions & 2 deletions examples/actix-triplestore/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,14 @@ async fn list_by_verb(
let mut edges = vec![];

for item in all {
let (key, _) = item?;
let (key, value) = item?;

let composite_key = std::str::from_utf8(&key).unwrap();
let verb_key = composite_key.split(':').nth(4).unwrap();
let object_key = composite_key.split(':').nth(6).unwrap();
let relation_data =
serde_json::from_str::<serde_json::Value>(std::str::from_utf8(&item).unwrap()).unwrap();
serde_json::from_str::<serde_json::Value>(std::str::from_utf8(&value).unwrap())
.unwrap();

let object_data = data
.db
Expand Down
12 changes: 12 additions & 0 deletions src/segment/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ mod tests {
)?;

for key in (0u64..ITEM_COUNT).map(u64::to_be_bytes) {
// NOTE: It's just a test
#[allow(clippy::expect_used)]
let item = iter.next().expect("item should exist")?;
assert_eq!(key, &*item.key);
}
Expand All @@ -221,6 +223,8 @@ mod tests {
)?;

for key in (0u64..ITEM_COUNT).rev().map(u64::to_be_bytes) {
// NOTE: It's just a test
#[allow(clippy::expect_used)]
let item = iter.next_back().expect("item should exist")?;
assert_eq!(key, &*item.key);
}
Expand All @@ -238,6 +242,8 @@ mod tests {
)?;

for key in (0..5_000).map(u64::to_be_bytes) {
// NOTE: It's just a test
#[allow(clippy::expect_used)]
let item = iter.next().expect("item should exist")?;
assert_eq!(key, &*item.key);
}
Expand All @@ -253,6 +259,8 @@ mod tests {
)?;

for key in (1_000..5_000).rev().map(u64::to_be_bytes) {
// NOTE: It's just a test
#[allow(clippy::expect_used)]
let item = iter.next_back().expect("item should exist")?;
assert_eq!(key, &*item.key);
}
Expand All @@ -270,6 +278,8 @@ mod tests {
)?;

for key in (1_000..5_000).map(u64::to_be_bytes) {
// NOTE: It's just a test
#[allow(clippy::expect_used)]
let item = iter.next().expect("item should exist")?;
assert_eq!(key, &*item.key);
}
Expand All @@ -287,6 +297,8 @@ mod tests {
)?;

for key in (1_000..5_000).rev().map(u64::to_be_bytes) {
// NOTE: It's just a test
#[allow(clippy::expect_used)]
let item = iter.next_back().expect("item should exist")?;
assert_eq!(key, &*item.key);
}
Expand Down
195 changes: 155 additions & 40 deletions src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ use std::{
};
use std_semaphore::Semaphore;

pub struct CompareAndSwapError {
/// The value currently in the tree that caused the CAS error
pub prev: Option<Vec<u8>>,

/// The value that was proposed
pub next: Option<Vec<u8>>,
}

pub type CompareAndSwapResult = Result<(), CompareAndSwapError>;

/// A log-structured merge tree (LSM-tree/LSMT)
///
/// The tree is internally synchronized (Send + Sync), so it does not need to be wrapped in a lock nor an Arc.
Expand Down Expand Up @@ -932,76 +942,181 @@ impl Tree {
self.lsn.fetch_add(1, std::sync::atomic::Ordering::AcqRel)
}

/// Compare-and-swap an entry
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn compare_and_swap<K: AsRef<[u8]>>(
&self,
key: K,
expected: Option<&Vec<u8>>,
next: Option<&Vec<u8>>,
) -> crate::Result<CompareAndSwapResult> {
let key = key.as_ref();

match self.get(key)? {
Some(current_value) => {
match expected {
Some(expected_value) => {
// We expected Some and got Some
// Check if the value is as expected
if current_value != *expected_value {
return Ok(Err(CompareAndSwapError {
prev: Some(current_value),
next: next.cloned(),
}));
}

// Set or delete the object now
match next {
Some(next_value) => {
self.insert(key, next_value.clone())?;
Ok(Ok(()))
}
None => {
self.remove(key)?;
Ok(Ok(()))
}
}
}
None => {
// We expected Some but got None
// CAS error!
Ok(Err(CompareAndSwapError {
prev: None,
next: next.cloned(),
}))
}
}
}
None => match expected {
Some(_) => {
// We expected Some but got None
// CAS error!
Ok(Err(CompareAndSwapError {
prev: None,
next: next.cloned(),
}))
}
None => match next {
// We expected None and got None
// Set or delete the object now
Some(next_value) => {
self.insert(key, next_value.clone())?;
Ok(Ok(()))
}
None => {
self.remove(key)?;
Ok(Ok(()))
}
},
},
}
}

/// Atomically fetches and updates an item if it exists.
///
/// Returns the previous value if the item exists.
///
/// # Examples
///
/// ```
/// # let folder = tempfile::tempdir()?;
/// use lsm_tree::{Config, Tree};
///
/// let tree = Config::new(folder).open()?;
/// tree.insert("key", "a")?;
///
/// let prev = tree.fetch_update("key", |_| Some("b"))?.expect("item should exist");
/// assert_eq!("a".as_bytes(), prev);
///
/// let item = tree.get("key")?.expect("item should exist");
/// assert_eq!("b".as_bytes(), item);
///
/// let prev = tree.fetch_update("key", |_| None::<String>)?.expect("item should exist");
/// assert_eq!("b".as_bytes(), prev);
///
/// assert!(tree.is_empty()?);
/// #
/// # Ok::<(), lsm_tree::Error>(())
/// ```
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn fetch_update<K: AsRef<[u8]>, F: Fn(&[u8]) -> Vec<u8>>(
pub fn fetch_update<K: AsRef<[u8]>, V: Into<Vec<u8>>, F: Fn(Option<&Vec<u8>>) -> Option<V>>(
&self,
key: K,
f: F,
) -> crate::Result<Option<Vec<u8>>> {
// TODO: fully lock all shards
let key = key.as_ref();

let shard = self.journal.lock_shard();
let mut fetched = self.get(key)?;

loop {
let expected = fetched.as_ref();
let next = f(expected).map(Into::into);

Ok(match self.get_internal_entry(key, true)? {
Some(item) => {
let updated_value = f(&item.value);

self.append_entry(
shard,
Value {
key: item.key,
value: updated_value,
is_tombstone: false,
seqno: self.increment_lsn(),
},
)?;

Some(item.value)
match self.compare_and_swap(key, expected, next.as_ref())? {
Ok(_) => return Ok(fetched),
Err(err) => {
fetched = err.prev;
}
}
None => None,
})
}
}

/// Atomically fetches and updates an item if it exists.
///
/// Returns the updated value if the item exists.
///
/// # Examples
///
/// ```
/// # let folder = tempfile::tempdir()?;
/// use lsm_tree::{Config, Tree};
///
/// let tree = Config::new(folder).open()?;
/// tree.insert("key", "a")?;
///
/// let prev = tree.update_fetch("key", |_| Some("b"))?.expect("item should exist");
/// assert_eq!("b".as_bytes(), prev);
///
/// let item = tree.get("key")?.expect("item should exist");
/// assert_eq!("b".as_bytes(), item);
///
/// let prev = tree.update_fetch("key", |_| None::<String>)?;
/// assert_eq!(None, prev);
///
/// assert!(tree.is_empty()?);
/// #
/// # Ok::<(), lsm_tree::Error>(())
/// ```
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn update_fetch<K: AsRef<[u8]>, F: Fn(&[u8]) -> Vec<u8>>(
pub fn update_fetch<K: AsRef<[u8]>, V: Into<Vec<u8>>, F: Fn(Option<&Vec<u8>>) -> Option<V>>(
&self,
key: K,
f: F,
) -> crate::Result<Option<Vec<u8>>> {
// TODO: fully lock all shards
let key = key.as_ref();

let shard = self.journal.lock_shard();
let mut fetched = self.get(key)?;

loop {
let expected = fetched.as_ref();
let next = f(expected).map(Into::into);

Ok(match self.get_internal_entry(key, true)? {
Some(item) => {
let updated_value = f(&item.value);

self.append_entry(
shard,
Value {
key: item.key,
value: updated_value.clone(),
is_tombstone: false,
seqno: self.increment_lsn(),
},
)?;

Some(updated_value)
match self.compare_and_swap(key, expected, next.as_ref())? {
Ok(_) => return Ok(next),
Err(err) => {
fetched = err.prev;
}
}
None => None,
})
}
}

/// Force-starts a memtable flush thread.
Expand Down

0 comments on commit 4642fbe

Please sign in to comment.