From bd9a80f7b54298208d820eef2347f75905a6047c Mon Sep 17 00:00:00 2001 From: Al Liu Date: Sat, 27 Apr 2024 17:16:50 +0800 Subject: [PATCH] Cleanup and add more write skew tests --- Cargo.toml | 2 +- README.md | 4 +- async-skipdb/README-zh_CN.md | 4 +- async-skipdb/README.md | 4 +- async-skipdb/src/optimistic/tests.rs | 78 ---- async-skipdb/src/serializable/optimistic.rs | 21 +- .../src/serializable/optimistic/tests.rs | 78 ---- skipdb-core/Cargo.toml | 2 +- skipdb/README-zh_CN.md | 4 +- skipdb/README.md | 4 +- skipdb/src/optimistic/tests.rs | 118 +----- skipdb/src/optimistic/tests/write_skew.rs | 207 ++++++++++ skipdb/src/optimistic/write.rs | 2 - skipdb/src/serializable/optimistic.rs | 14 +- skipdb/src/serializable/optimistic/tests.rs | 118 +----- .../optimistic/tests/write_skew.rs | 207 ++++++++++ skipdb/src/serializable/serializable/tests.rs | 214 +--------- .../serializable/tests/write_skew.rs | 364 ++++++++++++++++++ txn-core/Cargo.toml | 1 + txn-core/src/sync.rs | 19 + txn-core/src/sync/hash_cm.rs | 81 ++-- 21 files changed, 864 insertions(+), 682 deletions(-) create mode 100644 skipdb/src/optimistic/tests/write_skew.rs create mode 100644 skipdb/src/serializable/optimistic/tests/write_skew.rs create mode 100644 skipdb/src/serializable/serializable/tests/write_skew.rs diff --git a/Cargo.toml b/Cargo.toml index 6ff7c2f..db6fff6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,7 @@ wmark = { path = "wmark", version = "0.1", default-features = false } txn-core = { path = "txn-core", version = "0.1", default-features = false } txn = { path = "txn", version = "0.1", default-features = false } async-txn = { path = "async-txn", version = "0.1", default-features = false } -skipdb-core = { path = "skipdb-core", version = "0.1", default-features = false } +skipdb-core = { path = "skipdb-core", version = "0.2", default-features = false } [workspace.metadata.docs.rs] diff --git a/README.md b/README.md index f8faa31..e9cf44e 100644 --- a/README.md +++ b/README.md @@ -61,13 +61,13 @@ This repository contains two kinds of in-memory key-value database which support Transactions are created by `SerializableDb::serializable_write` can handle all kinds of write skew correctly. - Transactions are created by `SerializableDb::optimistic_write` can handle all kinds of direct dependent write skew, but cannot handle all kinds of indirect dependent write skew e.g. range intersection between two concurrent transactions (see unit tests `write_skew_intersecting_data2` and `write_skew_intersecting_data3` for more details). + Transactions are created by `SerializableDb::optimistic_write` can handle all kinds of direct dependent write skew, but cannot handle all kinds of indirect dependent write skew e.g. https://wiki.postgresql.org/wiki/SSI#Intersecting_Data. 2. `OptimisticDb` Only support oncurrent execution of optimistic concurrency control, which means the write transaction cannot detect all kinds of write skew. - All kinds of direct dependent write skew can be handled correctly, but cannot handle all kinds of indirect dependent write skew e.g. range intersection between two concurrent transactions (see unit tests `write_skew_intersecting_data2` and `write_skew_intersecting_data3` for more details). + All kinds of direct dependent write skew can be handled correctly, but cannot handle all kinds of indirect dependent write skew e.g. https://wiki.postgresql.org/wiki/SSI#Intersecting_Data. ### Features diff --git a/async-skipdb/README-zh_CN.md b/async-skipdb/README-zh_CN.md index ebf9a9b..bd0b836 100644 --- a/async-skipdb/README-zh_CN.md +++ b/async-skipdb/README-zh_CN.md @@ -36,13 +36,13 @@ This crate contains two kinds of in-memory key-value database: Transactions are created by `SerializableDb::serializable_write` can handle all kinds of write skew correctly. - Transactions are created by `SerializableDb::optimistic_write` can handle all kinds of direct dependent write skew, but cannot handle all kinds of indirect dependent write skew e.g. range intersection between two concurrent transactions (see unit tests `write_skew_intersecting_data2` and `write_skew_intersecting_data3` for more details). + Transactions are created by `SerializableDb::optimistic_write` can handle all kinds of direct dependent write skew, but cannot handle all kinds of indirect dependent write skew e.g. https://wiki.postgresql.org/wiki/SSI#Intersecting_Data. 2. `OptimisticDb` Only support oncurrent execution of optimistic concurrency control, which means the write transaction cannot detect all kinds of write skew. - All kinds of direct dependent write skew can be handled correctly, but cannot handle all kinds of indirect dependent write skew e.g. range intersection between two concurrent transactions (see unit tests `write_skew_intersecting_data2` and `write_skew_intersecting_data3` for more details). + All kinds of direct dependent write skew can be handled correctly, but cannot handle all kinds of indirect dependent write skew e.g. https://wiki.postgresql.org/wiki/SSI#Intersecting_Data. ## Features diff --git a/async-skipdb/README.md b/async-skipdb/README.md index ebf9a9b..bd0b836 100644 --- a/async-skipdb/README.md +++ b/async-skipdb/README.md @@ -36,13 +36,13 @@ This crate contains two kinds of in-memory key-value database: Transactions are created by `SerializableDb::serializable_write` can handle all kinds of write skew correctly. - Transactions are created by `SerializableDb::optimistic_write` can handle all kinds of direct dependent write skew, but cannot handle all kinds of indirect dependent write skew e.g. range intersection between two concurrent transactions (see unit tests `write_skew_intersecting_data2` and `write_skew_intersecting_data3` for more details). + Transactions are created by `SerializableDb::optimistic_write` can handle all kinds of direct dependent write skew, but cannot handle all kinds of indirect dependent write skew e.g. https://wiki.postgresql.org/wiki/SSI#Intersecting_Data. 2. `OptimisticDb` Only support oncurrent execution of optimistic concurrency control, which means the write transaction cannot detect all kinds of write skew. - All kinds of direct dependent write skew can be handled correctly, but cannot handle all kinds of indirect dependent write skew e.g. range intersection between two concurrent transactions (see unit tests `write_skew_intersecting_data2` and `write_skew_intersecting_data3` for more details). + All kinds of direct dependent write skew can be handled correctly, but cannot handle all kinds of indirect dependent write skew e.g. https://wiki.postgresql.org/wiki/SSI#Intersecting_Data. ## Features diff --git a/async-skipdb/src/optimistic/tests.rs b/async-skipdb/src/optimistic/tests.rs index 774ac07..dc929a3 100644 --- a/async-skipdb/src/optimistic/tests.rs +++ b/async-skipdb/src/optimistic/tests.rs @@ -359,84 +359,6 @@ fn txn_write_skew_smol() { smol::block_on(txn_write_skew_in::()); } -// https://wiki.postgresql.org/wiki/SSI#Intersecting_Data -async fn txn_write_skew_intersecting_data_in() { - let db: OptimisticDb<&'static str, u64, S> = OptimisticDb::new().await; - - // Setup - let mut txn = db.write().await; - txn.insert("a1", 10).unwrap(); - txn.insert("a2", 20).unwrap(); - txn.insert("b1", 100).unwrap(); - txn.insert("b2", 200).unwrap(); - txn.commit().await.unwrap(); - assert_eq!(1, db.version().await); - - let mut txn1 = db.write().await; - let val = txn1 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('a') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - txn1.insert("b3", 30).unwrap(); - assert_eq!(30, val); - - let mut txn2 = db.write().await; - let val = txn2 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('b') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - txn2.insert("a3", 300).unwrap(); - assert_eq!(300, val); - txn2.commit().await.unwrap(); - txn1.commit().await.unwrap_err(); - - let mut txn3 = db.write().await; - let val = txn3 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('a') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - assert_eq!(330, val); -} - -#[tokio::test] -#[cfg(feature = "tokio")] -async fn txn_write_skew_intersecting_data_tokio() { - txn_write_skew_intersecting_data_in::().await; -} - -#[async_std::test] -#[cfg(feature = "async-std")] -async fn txn_write_skew_intersecting_data_async_std() { - txn_write_skew_intersecting_data_in::().await; -} - -#[test] -#[cfg(feature = "smol")] -fn txn_write_skew_intersecting_data_smol() { - smol::block_on(txn_write_skew_intersecting_data_in::()); -} - async fn txn_conflict_get_in() { let set_count = Arc::new(AtomicU32::new(0)); diff --git a/async-skipdb/src/serializable/optimistic.rs b/async-skipdb/src/serializable/optimistic.rs index ecc22c1..ba36456 100644 --- a/async-skipdb/src/serializable/optimistic.rs +++ b/async-skipdb/src/serializable/optimistic.rs @@ -1,7 +1,7 @@ use async_txn::{error::WtmError, PwmComparableRange}; use skipdb_core::rev_range::WriteTransactionRevRange; -use std::{convert::Infallible, future::Future, ops::Bound}; +use std::{convert::Infallible, future::Future}; use super::*; @@ -183,18 +183,14 @@ where &mut self, ) -> Result>, TransactionError> { let version = self.wtm.version(); - let (mut marker, pm) = self + let (marker, pm) = self .wtm .blocking_marker_with_pm() .ok_or(TransactionError::Discard)?; - - let start: Bound = Bound::Unbounded; - let end: Bound = Bound::Unbounded; - marker.mark_range((start, end)); let committed = self.db.inner.map.iter(version); let pendings = pm.iter(); - Ok(TransactionIter::new(pendings, committed, None)) + Ok(TransactionIter::new(pendings, committed, Some(marker))) } /// Iterate over the entries of the write transaction in reverse order. @@ -204,18 +200,19 @@ where ) -> Result>, TransactionError> { let version = self.wtm.version(); - let (mut marker, pm) = self + let (marker, pm) = self .wtm .blocking_marker_with_pm() .ok_or(TransactionError::Discard)?; - let start: Bound = Bound::Unbounded; - let end: Bound = Bound::Unbounded; - marker.mark_range((start, end)); let committed = self.db.inner.map.iter_rev(version); let pendings = pm.iter().rev(); - Ok(WriteTransactionRevIter::new(pendings, committed, None)) + Ok(WriteTransactionRevIter::new( + pendings, + committed, + Some(marker), + )) } /// Returns an iterator over the subset of entries of the database. diff --git a/async-skipdb/src/serializable/optimistic/tests.rs b/async-skipdb/src/serializable/optimistic/tests.rs index 51aac00..f6e2dc0 100644 --- a/async-skipdb/src/serializable/optimistic/tests.rs +++ b/async-skipdb/src/serializable/optimistic/tests.rs @@ -358,84 +358,6 @@ fn txn_write_skew_smol() { smol::block_on(txn_write_skew_in::()); } -// https://wiki.postgresql.org/wiki/SSI#Intersecting_Data -async fn txn_write_skew_intersecting_data_in() { - let db: SerializableDb<&'static str, u64, S> = SerializableDb::new().await; - - // Setup - let mut txn = db.optimistic_write().await; - txn.insert("a1", 10).unwrap(); - txn.insert("a2", 20).unwrap(); - txn.insert("b1", 100).unwrap(); - txn.insert("b2", 200).unwrap(); - txn.commit().await.unwrap(); - assert_eq!(1, db.version().await); - - let mut txn1 = db.optimistic_write().await; - let val = txn1 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('a') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - txn1.insert("b3", 30).unwrap(); - assert_eq!(30, val); - - let mut txn2 = db.optimistic_write().await; - let val = txn2 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('b') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - txn2.insert("a3", 300).unwrap(); - assert_eq!(300, val); - txn2.commit().await.unwrap(); - txn1.commit().await.unwrap_err(); - - let mut txn3 = db.optimistic_write().await; - let val = txn3 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('a') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - assert_eq!(330, val); -} - -#[tokio::test] -#[cfg(feature = "tokio")] -async fn txn_write_skew_intersecting_data_tokio() { - txn_write_skew_intersecting_data_in::().await; -} - -#[async_std::test] -#[cfg(feature = "async-std")] -async fn txn_write_skew_intersecting_data_async_std() { - txn_write_skew_intersecting_data_in::().await; -} - -#[test] -#[cfg(feature = "smol")] -fn txn_write_skew_intersecting_data_smol() { - smol::block_on(txn_write_skew_intersecting_data_in::()); -} - async fn txn_conflict_get_in() { let set_count = Arc::new(AtomicU32::new(0)); diff --git a/skipdb-core/Cargo.toml b/skipdb-core/Cargo.toml index e840e35..0230179 100644 --- a/skipdb-core/Cargo.toml +++ b/skipdb-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "skipdb-core" -version.workspace = true +version = "0.2.0" edition.workspace = true rust-version.workspace = true repository.workspace = true diff --git a/skipdb/README-zh_CN.md b/skipdb/README-zh_CN.md index ffd1b1d..eda4bee 100644 --- a/skipdb/README-zh_CN.md +++ b/skipdb/README-zh_CN.md @@ -36,13 +36,13 @@ This crate contains two kinds of in-memory key-value database: Transactions are created by `SerializableDb::serializable_write` can handle all kinds of write skew correctly. - Transactions are created by `SerializableDb::optimistic_write` can handle all kinds of direct dependent write skew, but cannot handle all kinds of indirect dependent write skew e.g. range intersection between two concurrent transactions (see unit tests `write_skew_intersecting_data2` and `write_skew_intersecting_data3` for more details). + Transactions are created by `SerializableDb::optimistic_write` can handle all kinds of direct dependent write skew, but cannot handle all kinds of indirect dependent write skew e.g. https://wiki.postgresql.org/wiki/SSI#Intersecting_Data. 2. `OptimisticDb` Only support oncurrent execution of optimistic concurrency control, which means the write transaction cannot detect all kinds of write skew. - All kinds of direct dependent write skew can be handled correctly, but cannot handle all kinds of indirect dependent write skew e.g. range intersection between two concurrent transactions (see unit tests `write_skew_intersecting_data2` and `write_skew_intersecting_data3` for more details). + All kinds of direct dependent write skew can be handled correctly, but cannot handle all kinds of indirect dependent write skew e.g. https://wiki.postgresql.org/wiki/SSI#Intersecting_Data. ## Features diff --git a/skipdb/README.md b/skipdb/README.md index ffd1b1d..eda4bee 100644 --- a/skipdb/README.md +++ b/skipdb/README.md @@ -36,13 +36,13 @@ This crate contains two kinds of in-memory key-value database: Transactions are created by `SerializableDb::serializable_write` can handle all kinds of write skew correctly. - Transactions are created by `SerializableDb::optimistic_write` can handle all kinds of direct dependent write skew, but cannot handle all kinds of indirect dependent write skew e.g. range intersection between two concurrent transactions (see unit tests `write_skew_intersecting_data2` and `write_skew_intersecting_data3` for more details). + Transactions are created by `SerializableDb::optimistic_write` can handle all kinds of direct dependent write skew, but cannot handle all kinds of indirect dependent write skew e.g. https://wiki.postgresql.org/wiki/SSI#Intersecting_Data. 2. `OptimisticDb` Only support oncurrent execution of optimistic concurrency control, which means the write transaction cannot detect all kinds of write skew. - All kinds of direct dependent write skew can be handled correctly, but cannot handle all kinds of indirect dependent write skew e.g. range intersection between two concurrent transactions (see unit tests `write_skew_intersecting_data2` and `write_skew_intersecting_data3` for more details). + All kinds of direct dependent write skew can be handled correctly, but cannot handle all kinds of indirect dependent write skew e.g. https://wiki.postgresql.org/wiki/SSI#Intersecting_Data. ## Features diff --git a/skipdb/src/optimistic/tests.rs b/skipdb/src/optimistic/tests.rs index 1a355dd..424c727 100644 --- a/skipdb/src/optimistic/tests.rs +++ b/skipdb/src/optimistic/tests.rs @@ -10,6 +10,8 @@ use wmark::Closer; use super::*; +mod write_skew; + #[test] fn begin_tx_readable() { let db: OptimisticDb<&'static str, Vec> = OptimisticDb::new(); @@ -175,122 +177,6 @@ fn txn_commit_with_callback() { std::thread::sleep(Duration::from_millis(10)); } -#[test] -fn txn_write_skew() { - // accounts - let a999 = 999; - let a888 = 888; - let db: OptimisticDb = OptimisticDb::new(); - - // Set balance to $100 in each account. - let mut txn = db.write(); - txn.insert(a999, 100).unwrap(); - txn.insert(a888, 100).unwrap(); - txn.commit().unwrap(); - assert_eq!(1, db.version()); - - let get_bal = |txn: &mut OptimisticTransaction, k: &u64| -> u64 { - let item = txn.get(k).unwrap().unwrap(); - let val = *item.value(); - val - }; - - // Start two transactions, each would read both accounts and deduct from one account. - let mut txn1 = db.write(); - - let mut sum = get_bal(&mut txn1, &a999); - sum += get_bal(&mut txn1, &a888); - assert_eq!(200, sum); - txn1.insert(a999, 0).unwrap(); // Deduct 100 from a999 - - // Let's read this back. - let mut sum = get_bal(&mut txn1, &a999); - assert_eq!(0, sum); - sum += get_bal(&mut txn1, &a888); - assert_eq!(100, sum); - // Don't commit yet. - - let mut txn2 = db.write(); - - let mut sum = get_bal(&mut txn2, &a999); - sum += get_bal(&mut txn2, &a888); - assert_eq!(200, sum); - txn2.insert(a888, 0).unwrap(); // Deduct 100 from a888 - - // Let's read this back. - let mut sum = get_bal(&mut txn2, &a999); - assert_eq!(100, sum); - sum += get_bal(&mut txn2, &a888); - assert_eq!(100, sum); - - // Commit both now. - txn1.commit().unwrap(); - txn2.commit().unwrap_err(); // This should fail - - assert_eq!(2, db.version()); -} - -// https://wiki.postgresql.org/wiki/SSI#Intersecting_Data -#[test] -fn txn_write_skew_intersecting_data() { - let db: OptimisticDb<&'static str, u64> = OptimisticDb::new(); - - // Setup - let mut txn = db.write(); - txn.insert("a1", 10).unwrap(); - txn.insert("a2", 20).unwrap(); - txn.insert("b1", 100).unwrap(); - txn.insert("b2", 200).unwrap(); - txn.commit().unwrap(); - assert_eq!(1, db.version()); - - let mut txn1 = db.write(); - let val = txn1 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('a') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - txn1.insert("b3", 30).unwrap(); - assert_eq!(30, val); - - let mut txn2 = db.write(); - let val = txn2 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('b') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - txn2.insert("a3", 300).unwrap(); - assert_eq!(300, val); - txn2.commit().unwrap(); - txn1.commit().unwrap_err(); - - let mut txn3 = db.write(); - let val = txn3 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('a') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - assert_eq!(330, val); -} - #[test] fn txn_conflict_get() { let set_count = Arc::new(AtomicU32::new(0)); diff --git a/skipdb/src/optimistic/tests/write_skew.rs b/skipdb/src/optimistic/tests/write_skew.rs new file mode 100644 index 0000000..a45528b --- /dev/null +++ b/skipdb/src/optimistic/tests/write_skew.rs @@ -0,0 +1,207 @@ +use super::*; + +#[test] +fn txn_write_skew() { + // accounts + let a999 = 999; + let a888 = 888; + let db: OptimisticDb = OptimisticDb::new(); + + // Set balance to $100 in each account. + let mut txn = db.write(); + txn.insert(a999, 100).unwrap(); + txn.insert(a888, 100).unwrap(); + txn.commit().unwrap(); + assert_eq!(1, db.version()); + + let get_bal = |txn: &mut OptimisticTransaction, k: &u64| -> u64 { + let item = txn.get(k).unwrap().unwrap(); + let val = *item.value(); + val + }; + + // Start two transactions, each would read both accounts and deduct from one account. + let mut txn1 = db.write(); + + let mut sum = get_bal(&mut txn1, &a999); + sum += get_bal(&mut txn1, &a888); + assert_eq!(200, sum); + txn1.insert(a999, 0).unwrap(); // Deduct 100 from a999 + + // Let's read this back. + let mut sum = get_bal(&mut txn1, &a999); + assert_eq!(0, sum); + sum += get_bal(&mut txn1, &a888); + assert_eq!(100, sum); + // Don't commit yet. + + let mut txn2 = db.write(); + + let mut sum = get_bal(&mut txn2, &a999); + sum += get_bal(&mut txn2, &a888); + assert_eq!(200, sum); + txn2.insert(a888, 0).unwrap(); // Deduct 100 from a888 + + // Let's read this back. + let mut sum = get_bal(&mut txn2, &a999); + assert_eq!(100, sum); + sum += get_bal(&mut txn2, &a888); + assert_eq!(100, sum); + + // Commit both now. + txn1.commit().unwrap(); + txn2.commit().unwrap_err(); // This should fail + + assert_eq!(2, db.version()); +} + +// https://wiki.postgresql.org/wiki/SSI#Black_and_White +#[test] +fn txn_write_skew_black_white() { + let db: OptimisticDb = OptimisticDb::new(); + + // Setup + let mut txn = db.write(); + for i in 1..=10 { + if i % 2 == 1 { + txn.insert(i, "black").unwrap(); + } else { + txn.insert(i, "white").unwrap(); + } + } + txn.commit().unwrap(); + + // txn1 + let mut txn1 = db.write(); + let indices = txn1 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "black" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn1.insert(i, "white").unwrap(); + } + + // txn2 + let mut txn2 = db.write(); + let indices = txn2 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "white" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn2.insert(i, "black").unwrap(); + } + txn2.commit().unwrap(); + txn1.commit().unwrap_err(); +} + +// https://wiki.postgresql.org/wiki/SSI#Overdraft_Protection +#[test] +fn txn_write_skew_overdraft_protection() { + let db: OptimisticDb<&'static str, u64> = OptimisticDb::new(); + + // Setup + let mut txn = db.write(); + txn.insert("kevin", 1000).unwrap(); + txn.commit().unwrap(); + + // txn1 + let mut txn1 = db.write(); + let money = *txn1.get(&"kevin").unwrap().unwrap().value(); + txn1.insert("kevin", money - 100).unwrap(); + + // txn2 + let mut txn2 = db.write(); + let money = *txn2.get(&"kevin").unwrap().unwrap().value(); + txn2.insert("kevin", money - 100).unwrap(); + + txn1.commit().unwrap(); + txn2.commit().unwrap_err(); +} + +// https://wiki.postgresql.org/wiki/SSI#Primary_Colors +#[test] +fn txn_write_skew_primary_colors() { + let db: OptimisticDb = OptimisticDb::new(); + + // Setup + let mut txn = db.write(); + for i in 1..=9000 { + if i % 3 == 1 { + txn.insert(i, "red").unwrap(); + } else if i % 3 == 2 { + txn.insert(i, "yellow").unwrap(); + } else { + txn.insert(i, "blue").unwrap(); + } + } + txn.commit().unwrap(); + + // txn1 + let mut txn1 = db.write(); + let indices = txn1 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "yellow" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn1.insert(i, "red").unwrap(); + } + + // txn2 + let mut txn2 = db.write(); + let indices = txn2 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "blue" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn2.insert(i, "yellow").unwrap(); + } + + // txn3 + let mut txn3 = db.write(); + let indices = txn3 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "blue" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn3.insert(i, "red").unwrap(); + } + + txn1.commit().unwrap(); + txn3.commit().unwrap_err(); + txn2.commit().unwrap_err(); +} diff --git a/skipdb/src/optimistic/write.rs b/skipdb/src/optimistic/write.rs index d9103d8..b019ab9 100644 --- a/skipdb/src/optimistic/write.rs +++ b/skipdb/src/optimistic/write.rs @@ -184,7 +184,6 @@ where ) -> Result>, TransactionError> { let version = self.wtm.version(); let (marker, pm) = self.wtm.marker_with_pm().ok_or(TransactionError::Discard)?; - let committed = self.db.inner.map.iter(version); let pendings = pm.iter(); @@ -201,7 +200,6 @@ where > { let version = self.wtm.version(); let (marker, pm) = self.wtm.marker_with_pm().ok_or(TransactionError::Discard)?; - let committed = self.db.inner.map.iter_rev(version); let pendings = pm.iter().rev(); diff --git a/skipdb/src/serializable/optimistic.rs b/skipdb/src/serializable/optimistic.rs index cb88276..b09e587 100644 --- a/skipdb/src/serializable/optimistic.rs +++ b/skipdb/src/serializable/optimistic.rs @@ -1,7 +1,7 @@ use skipdb_core::rev_range::WriteTransactionRevRange; use txn::{error::WtmError, PwmComparableRange}; -use std::{convert::Infallible, ops::Bound}; +use std::convert::Infallible; use super::*; @@ -166,14 +166,11 @@ where &mut self, ) -> Result>, TransactionError> { let version = self.wtm.version(); - let (mut marker, pm) = self.wtm.marker_with_pm().ok_or(TransactionError::Discard)?; - let start: Bound = Bound::Unbounded; - let end: Bound = Bound::Unbounded; - marker.mark_range((start, end)); + let (marker, pm) = self.wtm.marker_with_pm().ok_or(TransactionError::Discard)?; let committed = self.db.inner.map.iter(version); let pendings = pm.iter(); - Ok(TransactionIter::new(pendings, committed, None)) + Ok(TransactionIter::new(pendings, committed, Some(marker))) } /// Iterate over the entries of the write transaction in reverse order. @@ -183,10 +180,7 @@ where ) -> Result>, TransactionError> { let version = self.wtm.version(); - let (mut marker, pm) = self.wtm.marker_with_pm().ok_or(TransactionError::Discard)?; - let start: Bound = Bound::Unbounded; - let end: Bound = Bound::Unbounded; - marker.mark_range((start, end)); + let (marker, pm) = self.wtm.marker_with_pm().ok_or(TransactionError::Discard)?; let committed = self.db.inner.map.iter_rev(version); let pendings = pm.iter().rev(); diff --git a/skipdb/src/serializable/optimistic/tests.rs b/skipdb/src/serializable/optimistic/tests.rs index 146fd64..779a8a6 100644 --- a/skipdb/src/serializable/optimistic/tests.rs +++ b/skipdb/src/serializable/optimistic/tests.rs @@ -8,6 +8,8 @@ use wmark::Closer; use super::*; +mod write_skew; + #[test] fn begin_tx_readable() { let db: SerializableDb<&'static str, Vec> = SerializableDb::new(); @@ -172,122 +174,6 @@ fn txn_commit_with_callback() { std::thread::sleep(Duration::from_millis(10)); } -#[test] -fn txn_write_skew() { - // accounts - let a999 = 999; - let a888 = 888; - let db: SerializableDb = SerializableDb::new(); - - // Set balance to $100 in each account. - let mut txn = db.optimistic_write(); - txn.insert(a999, 100).unwrap(); - txn.insert(a888, 100).unwrap(); - txn.commit().unwrap(); - assert_eq!(1, db.version()); - - let get_bal = |txn: &mut OptimisticTransaction, k: &u64| -> u64 { - let item = txn.get(k).unwrap().unwrap(); - let val = *item.value(); - val - }; - - // Start two transactions, each would read both accounts and deduct from one account. - let mut txn1 = db.optimistic_write(); - - let mut sum = get_bal(&mut txn1, &a999); - sum += get_bal(&mut txn1, &a888); - assert_eq!(200, sum); - txn1.insert(a999, 0).unwrap(); // Deduct 100 from a999 - - // Let's read this back. - let mut sum = get_bal(&mut txn1, &a999); - assert_eq!(0, sum); - sum += get_bal(&mut txn1, &a888); - assert_eq!(100, sum); - // Don't commit yet. - - let mut txn2 = db.optimistic_write(); - - let mut sum = get_bal(&mut txn2, &a999); - sum += get_bal(&mut txn2, &a888); - assert_eq!(200, sum); - txn2.insert(a888, 0).unwrap(); // Deduct 100 from a888 - - // Let's read this back. - let mut sum = get_bal(&mut txn2, &a999); - assert_eq!(100, sum); - sum += get_bal(&mut txn2, &a888); - assert_eq!(100, sum); - - // Commit both now. - txn1.commit().unwrap(); - txn2.commit().unwrap_err(); // This should fail - - assert_eq!(2, db.version()); -} - -// https://wiki.postgresql.org/wiki/SSI#Intersecting_Data -#[test] -fn txn_write_skew_intersecting_data() { - let db: SerializableDb<&'static str, u64> = SerializableDb::new(); - - // Setup - let mut txn = db.optimistic_write(); - txn.insert("a1", 10).unwrap(); - txn.insert("a2", 20).unwrap(); - txn.insert("b1", 100).unwrap(); - txn.insert("b2", 200).unwrap(); - txn.commit().unwrap(); - assert_eq!(1, db.version()); - - let mut txn1 = db.optimistic_write(); - let val = txn1 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('a') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - txn1.insert("b3", 30).unwrap(); - assert_eq!(30, val); - - let mut txn2 = db.optimistic_write(); - let val = txn2 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('b') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - txn2.insert("a3", 300).unwrap(); - assert_eq!(300, val); - txn2.commit().unwrap(); - txn1.commit().unwrap_err(); - - let mut txn3 = db.optimistic_write(); - let val = txn3 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('a') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - assert_eq!(330, val); -} - #[test] fn txn_conflict_get() { let set_count = Arc::new(AtomicU32::new(0)); diff --git a/skipdb/src/serializable/optimistic/tests/write_skew.rs b/skipdb/src/serializable/optimistic/tests/write_skew.rs new file mode 100644 index 0000000..35cfd71 --- /dev/null +++ b/skipdb/src/serializable/optimistic/tests/write_skew.rs @@ -0,0 +1,207 @@ +use super::*; + +#[test] +fn txn_write_skew() { + // accounts + let a999 = 999; + let a888 = 888; + let db: SerializableDb = SerializableDb::new(); + + // Set balance to $100 in each account. + let mut txn = db.optimistic_write(); + txn.insert(a999, 100).unwrap(); + txn.insert(a888, 100).unwrap(); + txn.commit().unwrap(); + assert_eq!(1, db.version()); + + let get_bal = |txn: &mut OptimisticTransaction, k: &u64| -> u64 { + let item = txn.get(k).unwrap().unwrap(); + let val = *item.value(); + val + }; + + // Start two transactions, each would read both accounts and deduct from one account. + let mut txn1 = db.optimistic_write(); + + let mut sum = get_bal(&mut txn1, &a999); + sum += get_bal(&mut txn1, &a888); + assert_eq!(200, sum); + txn1.insert(a999, 0).unwrap(); // Deduct 100 from a999 + + // Let's read this back. + let mut sum = get_bal(&mut txn1, &a999); + assert_eq!(0, sum); + sum += get_bal(&mut txn1, &a888); + assert_eq!(100, sum); + // Don't commit yet. + + let mut txn2 = db.optimistic_write(); + + let mut sum = get_bal(&mut txn2, &a999); + sum += get_bal(&mut txn2, &a888); + assert_eq!(200, sum); + txn2.insert(a888, 0).unwrap(); // Deduct 100 from a888 + + // Let's read this back. + let mut sum = get_bal(&mut txn2, &a999); + assert_eq!(100, sum); + sum += get_bal(&mut txn2, &a888); + assert_eq!(100, sum); + + // Commit both now. + txn1.commit().unwrap(); + txn2.commit().unwrap_err(); // This should fail + + assert_eq!(2, db.version()); +} + +// https://wiki.postgresql.org/wiki/SSI#Black_and_White +#[test] +fn txn_write_skew_black_white() { + let db: SerializableDb = SerializableDb::new(); + + // Setup + let mut txn = db.optimistic_write(); + for i in 1..=10 { + if i % 2 == 1 { + txn.insert(i, "black").unwrap(); + } else { + txn.insert(i, "white").unwrap(); + } + } + txn.commit().unwrap(); + + // txn1 + let mut txn1 = db.optimistic_write(); + let indices = txn1 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "black" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn1.insert(i, "white").unwrap(); + } + + // txn2 + let mut txn2 = db.optimistic_write(); + let indices = txn2 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "white" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn2.insert(i, "black").unwrap(); + } + txn2.commit().unwrap(); + txn1.commit().unwrap_err(); +} + +// https://wiki.postgresql.org/wiki/SSI#Overdraft_Protection +#[test] +fn txn_write_skew_overdraft_protection() { + let db: SerializableDb<&'static str, u64> = SerializableDb::new(); + + // Setup + let mut txn = db.optimistic_write(); + txn.insert("kevin", 1000).unwrap(); + txn.commit().unwrap(); + + // txn1 + let mut txn1 = db.optimistic_write(); + let money = *txn1.get(&"kevin").unwrap().unwrap().value(); + txn1.insert("kevin", money - 100).unwrap(); + + // txn2 + let mut txn2 = db.optimistic_write(); + let money = *txn2.get(&"kevin").unwrap().unwrap().value(); + txn2.insert("kevin", money - 100).unwrap(); + + txn1.commit().unwrap(); + txn2.commit().unwrap_err(); +} + +// https://wiki.postgresql.org/wiki/SSI#Primary_Colors +#[test] +fn txn_write_skew_primary_colors() { + let db: SerializableDb = SerializableDb::new(); + + // Setup + let mut txn = db.optimistic_write(); + for i in 1..=9000 { + if i % 3 == 1 { + txn.insert(i, "red").unwrap(); + } else if i % 3 == 2 { + txn.insert(i, "yellow").unwrap(); + } else { + txn.insert(i, "blue").unwrap(); + } + } + txn.commit().unwrap(); + + // txn1 + let mut txn1 = db.optimistic_write(); + let indices = txn1 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "yellow" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn1.insert(i, "red").unwrap(); + } + + // txn2 + let mut txn2 = db.optimistic_write(); + let indices = txn2 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "blue" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn2.insert(i, "yellow").unwrap(); + } + + // txn3 + let mut txn3 = db.optimistic_write(); + let indices = txn3 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "blue" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn3.insert(i, "red").unwrap(); + } + + txn1.commit().unwrap(); + txn3.commit().unwrap_err(); + txn2.commit().unwrap_err(); +} diff --git a/skipdb/src/serializable/serializable/tests.rs b/skipdb/src/serializable/serializable/tests.rs index 63472c8..7ed56fc 100644 --- a/skipdb/src/serializable/serializable/tests.rs +++ b/skipdb/src/serializable/serializable/tests.rs @@ -8,6 +8,8 @@ use wmark::Closer; use super::*; +mod write_skew; + #[test] fn begin_tx_readable() { let db: SerializableDb<&'static str, Vec> = SerializableDb::new(); @@ -172,218 +174,6 @@ fn txn_commit_with_callback() { std::thread::sleep(Duration::from_millis(10)); } -#[test] -fn txn_write_skew() { - // accounts - let a999 = 999; - let a888 = 888; - let db: SerializableDb = SerializableDb::new(); - - // Set balance to $100 in each account. - let mut txn = db.serializable_write(); - txn.insert(a999, 100).unwrap(); - txn.insert(a888, 100).unwrap(); - txn.commit().unwrap(); - assert_eq!(1, db.version()); - - let get_bal = |txn: &mut SerializableTransaction, k: &u64| -> u64 { - let item = txn.get(k).unwrap().unwrap(); - let val = *item.value(); - val - }; - - // Start two transactions, each would read both accounts and deduct from one account. - let mut txn1 = db.serializable_write(); - - let mut sum = get_bal(&mut txn1, &a999); - sum += get_bal(&mut txn1, &a888); - assert_eq!(200, sum); - txn1.insert(a999, 0).unwrap(); // Deduct 100 from a999 - - // Let's read this back. - let mut sum = get_bal(&mut txn1, &a999); - assert_eq!(0, sum); - sum += get_bal(&mut txn1, &a888); - assert_eq!(100, sum); - // Don't commit yet. - - let mut txn2 = db.serializable_write(); - - let mut sum = get_bal(&mut txn2, &a999); - sum += get_bal(&mut txn2, &a888); - assert_eq!(200, sum); - txn2.insert(a888, 0).unwrap(); // Deduct 100 from a888 - - // Let's read this back. - let mut sum = get_bal(&mut txn2, &a999); - assert_eq!(100, sum); - sum += get_bal(&mut txn2, &a888); - assert_eq!(100, sum); - - // Commit both now. - txn1.commit().unwrap(); - txn2.commit().unwrap_err(); // This should fail - - assert_eq!(2, db.version()); -} - -// https://wiki.postgresql.org/wiki/SSI#Intersecting_Data -#[test] -fn txn_write_skew_intersecting_data() { - let db: SerializableDb<&'static str, u64> = SerializableDb::new(); - - // Setup - let mut txn = db.serializable_write(); - txn.insert("a1", 10).unwrap(); - txn.insert("a2", 20).unwrap(); - txn.insert("b1", 100).unwrap(); - txn.insert("b2", 200).unwrap(); - txn.commit().unwrap(); - assert_eq!(1, db.version()); - - let mut txn1 = db.serializable_write(); - let val = txn1 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('a') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - txn1.insert("b3", 30).unwrap(); - assert_eq!(30, val); - - let mut txn2 = db.serializable_write(); - let val = txn2 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('b') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - txn2.insert("a3", 300).unwrap(); - assert_eq!(300, val); - txn2.commit().unwrap(); - txn1.commit().unwrap_err(); - - let mut txn3 = db.serializable_write(); - let val = txn3 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('a') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - assert_eq!(330, val); -} - -// https://wiki.postgresql.org/wiki/SSI#Intersecting_Data -#[test] -fn txn_write_skew_intersecting_data2() { - let db: SerializableDb<&'static str, u64> = SerializableDb::new(); - - // Setup - let mut txn = db.serializable_write(); - txn.insert("a1", 10).unwrap(); - txn.insert("b1", 100).unwrap(); - txn.insert("b2", 200).unwrap(); - txn.commit().unwrap(); - assert_eq!(1, db.version()); - - // - let mut txn1 = db.serializable_write(); - let val = txn1 - .range("a".."b") - .unwrap() - .map(|ele| *ele.value()) - .sum::(); - txn1.insert("b3", 10).unwrap(); - assert_eq!(10, val); - - let mut txn2 = db.serializable_write(); - let val = txn2 - .range("b".."c") - .unwrap() - .map(|ele| *ele.value()) - .sum::(); - txn2.insert("a3", 300).unwrap(); - assert_eq!(300, val); - txn2.commit().unwrap(); - txn1.commit().unwrap_err(); - - let mut txn3 = db.serializable_write(); - let val = txn3 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('a') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - assert_eq!(310, val); -} - -// https://wiki.postgresql.org/wiki/SSI#Intersecting_Data -#[test] -fn txn_write_skew_intersecting_data3() { - let db: SerializableDb<&'static str, u64> = SerializableDb::new(); - - // Setup - let mut txn = db.serializable_write(); - txn.insert("b1", 100).unwrap(); - txn.insert("b2", 200).unwrap(); - txn.commit().unwrap(); - assert_eq!(1, db.version()); - - let mut txn1 = db.serializable_write(); - let val = txn1 - .range("a".."b") - .unwrap() - .map(|ele| *ele.value()) - .sum::(); - txn1.insert("b3", 0).unwrap(); - assert_eq!(0, val); - - let mut txn2 = db.serializable_write(); - let val = txn2 - .range("b".."c") - .unwrap() - .map(|ele| *ele.value()) - .sum::(); - txn2.insert("a3", 300).unwrap(); - assert_eq!(300, val); - txn2.commit().unwrap(); - txn1.commit().unwrap_err(); - - let mut txn3 = db.serializable_write(); - let val = txn3 - .iter() - .unwrap() - .filter_map(|ele| { - if ele.key().starts_with('a') { - Some(*ele.value()) - } else { - None - } - }) - .sum::(); - assert_eq!(300, val); -} - #[test] fn txn_conflict_get() { let set_count = Arc::new(AtomicU32::new(0)); diff --git a/skipdb/src/serializable/serializable/tests/write_skew.rs b/skipdb/src/serializable/serializable/tests/write_skew.rs new file mode 100644 index 0000000..0d9dd3d --- /dev/null +++ b/skipdb/src/serializable/serializable/tests/write_skew.rs @@ -0,0 +1,364 @@ +use super::*; + +#[test] +fn txn_write_skew() { + // accounts + let a999 = 999; + let a888 = 888; + let db: SerializableDb = SerializableDb::new(); + + // Set balance to $100 in each account. + let mut txn = db.serializable_write(); + txn.insert(a999, 100).unwrap(); + txn.insert(a888, 100).unwrap(); + txn.commit().unwrap(); + assert_eq!(1, db.version()); + + let get_bal = |txn: &mut SerializableTransaction, k: &u64| -> u64 { + let item = txn.get(k).unwrap().unwrap(); + let val = *item.value(); + val + }; + + // Start two transactions, each would read both accounts and deduct from one account. + let mut txn1 = db.serializable_write(); + + let mut sum = get_bal(&mut txn1, &a999); + sum += get_bal(&mut txn1, &a888); + assert_eq!(200, sum); + txn1.insert(a999, 0).unwrap(); // Deduct 100 from a999 + + // Let's read this back. + let mut sum = get_bal(&mut txn1, &a999); + assert_eq!(0, sum); + sum += get_bal(&mut txn1, &a888); + assert_eq!(100, sum); + // Don't commit yet. + + let mut txn2 = db.serializable_write(); + + let mut sum = get_bal(&mut txn2, &a999); + sum += get_bal(&mut txn2, &a888); + assert_eq!(200, sum); + txn2.insert(a888, 0).unwrap(); // Deduct 100 from a888 + + // Let's read this back. + let mut sum = get_bal(&mut txn2, &a999); + assert_eq!(100, sum); + sum += get_bal(&mut txn2, &a888); + assert_eq!(100, sum); + + // Commit both now. + txn1.commit().unwrap(); + txn2.commit().unwrap_err(); // This should fail + + assert_eq!(2, db.version()); +} + +// https://wiki.postgresql.org/wiki/SSI#Black_and_White +#[test] +fn txn_write_skew_black_white() { + let db: SerializableDb = SerializableDb::new(); + + // Setup + let mut txn = db.serializable_write(); + for i in 1..=10 { + if i % 2 == 1 { + txn.insert(i, "black").unwrap(); + } else { + txn.insert(i, "white").unwrap(); + } + } + txn.commit().unwrap(); + + // txn1 + let mut txn1 = db.serializable_write(); + let indices = txn1 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "black" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn1.insert(i, "white").unwrap(); + } + + // txn2 + let mut txn2 = db.serializable_write(); + let indices = txn2 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "white" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn2.insert(i, "black").unwrap(); + } + txn2.commit().unwrap(); + txn1.commit().unwrap_err(); +} + +// https://wiki.postgresql.org/wiki/SSI#Intersecting_Data +#[test] +fn txn_write_skew_intersecting_data() { + let db: SerializableDb<&'static str, u64> = SerializableDb::new(); + + // Setup + let mut txn = db.serializable_write(); + txn.insert("a1", 10).unwrap(); + txn.insert("a2", 20).unwrap(); + txn.insert("b1", 100).unwrap(); + txn.insert("b2", 200).unwrap(); + txn.commit().unwrap(); + assert_eq!(1, db.version()); + + let mut txn1 = db.serializable_write(); + let val = txn1 + .iter() + .unwrap() + .filter_map(|ele| { + if ele.key().starts_with('a') { + Some(*ele.value()) + } else { + None + } + }) + .sum::(); + txn1.insert("b3", 30).unwrap(); + assert_eq!(30, val); + + let mut txn2 = db.serializable_write(); + let val = txn2 + .iter() + .unwrap() + .filter_map(|ele| { + if ele.key().starts_with('b') { + Some(*ele.value()) + } else { + None + } + }) + .sum::(); + txn2.insert("a3", 300).unwrap(); + assert_eq!(300, val); + txn2.commit().unwrap(); + txn1.commit().unwrap_err(); + + let mut txn3 = db.serializable_write(); + let val = txn3 + .iter() + .unwrap() + .filter_map(|ele| { + if ele.key().starts_with('a') { + Some(*ele.value()) + } else { + None + } + }) + .sum::(); + assert_eq!(330, val); +} + +// https://wiki.postgresql.org/wiki/SSI#Intersecting_Data +#[test] +fn txn_write_skew_intersecting_data2() { + let db: SerializableDb<&'static str, u64> = SerializableDb::new(); + + // Setup + let mut txn = db.serializable_write(); + txn.insert("a1", 10).unwrap(); + txn.insert("b1", 100).unwrap(); + txn.insert("b2", 200).unwrap(); + txn.commit().unwrap(); + assert_eq!(1, db.version()); + + // + let mut txn1 = db.serializable_write(); + let val = txn1 + .range("a".."b") + .unwrap() + .map(|ele| *ele.value()) + .sum::(); + txn1.insert("b3", 10).unwrap(); + assert_eq!(10, val); + + let mut txn2 = db.serializable_write(); + let val = txn2 + .range("b".."c") + .unwrap() + .map(|ele| *ele.value()) + .sum::(); + txn2.insert("a3", 300).unwrap(); + assert_eq!(300, val); + txn2.commit().unwrap(); + txn1.commit().unwrap_err(); + + let mut txn3 = db.serializable_write(); + let val = txn3 + .iter() + .unwrap() + .filter_map(|ele| { + if ele.key().starts_with('a') { + Some(*ele.value()) + } else { + None + } + }) + .sum::(); + assert_eq!(310, val); +} + +// https://wiki.postgresql.org/wiki/SSI#Intersecting_Data +#[test] +fn txn_write_skew_intersecting_data3() { + let db: SerializableDb<&'static str, u64> = SerializableDb::new(); + + // Setup + let mut txn = db.serializable_write(); + txn.insert("b1", 100).unwrap(); + txn.insert("b2", 200).unwrap(); + txn.commit().unwrap(); + assert_eq!(1, db.version()); + + let mut txn1 = db.serializable_write(); + let val = txn1 + .range("a".."b") + .unwrap() + .map(|ele| *ele.value()) + .sum::(); + txn1.insert("b3", 0).unwrap(); + assert_eq!(0, val); + + let mut txn2 = db.serializable_write(); + let val = txn2 + .range("b".."c") + .unwrap() + .map(|ele| *ele.value()) + .sum::(); + txn2.insert("a3", 300).unwrap(); + assert_eq!(300, val); + txn2.commit().unwrap(); + txn1.commit().unwrap_err(); + + let mut txn3 = db.serializable_write(); + let val = txn3 + .iter() + .unwrap() + .filter_map(|ele| { + if ele.key().starts_with('a') { + Some(*ele.value()) + } else { + None + } + }) + .sum::(); + assert_eq!(300, val); +} + +// https://wiki.postgresql.org/wiki/SSI#Overdraft_Protection +#[test] +fn txn_write_skew_overdraft_protection() { + let db: SerializableDb<&'static str, u64> = SerializableDb::new(); + + // Setup + let mut txn = db.serializable_write(); + txn.insert("kevin", 1000).unwrap(); + txn.commit().unwrap(); + + // txn1 + let mut txn1 = db.serializable_write(); + let money = *txn1.get(&"kevin").unwrap().unwrap().value(); + txn1.insert("kevin", money - 100).unwrap(); + + // txn2 + let mut txn2 = db.serializable_write(); + let money = *txn2.get(&"kevin").unwrap().unwrap().value(); + txn2.insert("kevin", money - 100).unwrap(); + + txn1.commit().unwrap(); + txn2.commit().unwrap_err(); +} + +// https://wiki.postgresql.org/wiki/SSI#Primary_Colors +#[test] +fn txn_write_skew_primary_colors() { + let db: SerializableDb = SerializableDb::new(); + + // Setup + let mut txn = db.serializable_write(); + for i in 1..=9000 { + if i % 3 == 1 { + txn.insert(i, "red").unwrap(); + } else if i % 3 == 2 { + txn.insert(i, "yellow").unwrap(); + } else { + txn.insert(i, "blue").unwrap(); + } + } + txn.commit().unwrap(); + + // txn1 + let mut txn1 = db.serializable_write(); + let indices = txn1 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "yellow" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn1.insert(i, "red").unwrap(); + } + + // txn2 + let mut txn2 = db.serializable_write(); + let indices = txn2 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "blue" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn2.insert(i, "yellow").unwrap(); + } + + // txn3 + let mut txn3 = db.serializable_write(); + let indices = txn3 + .iter() + .unwrap() + .filter_map(|e| { + if e.value() == "blue" { + Some(*e.key()) + } else { + None + } + }) + .collect::>(); + for i in indices { + txn3.insert(i, "red").unwrap(); + } + + txn1.commit().unwrap(); + txn3.commit().unwrap_err(); + txn2.commit().unwrap_err(); +} diff --git a/txn-core/Cargo.toml b/txn-core/Cargo.toml index 2741e6c..cc7d508 100644 --- a/txn-core/Cargo.toml +++ b/txn-core/Cargo.toml @@ -17,6 +17,7 @@ std = ["alloc", "smallvec-wrapper/std", "indexmap/default", "thiserror"] [dependencies] ahash = "0.8" cheap-clone.workspace = true +either.workspace = true thiserror = { workspace = true, optional = true } indexmap = { workspace = true, optional = true } diff --git a/txn-core/src/sync.rs b/txn-core/src/sync.rs index 89359b9..dda58b6 100644 --- a/txn-core/src/sync.rs +++ b/txn-core/src/sync.rs @@ -58,6 +58,13 @@ impl<'a, C: CmRange> Marker<'a, C> { } } +impl<'a, C: CmIter> Marker<'a, C> { + /// Marks a key is operated. + pub fn mark_iter(&mut self) { + self.marker.mark_iter(); + } +} + impl<'a, C: CmComparable> Marker<'a, C> { /// Marks a key is operated. pub fn mark_comparable(&mut self, k: &Q) @@ -158,6 +165,18 @@ pub trait CmRange: Cm + Sized { fn mark_range(&mut self, range: impl RangeBounds<::Key>); } +/// A extended trait of the [`Cm`] trait that can be used to manage the iterator of keys. +pub trait CmIter: Cm + Sized { + /// Mark the iterator is operated, this is useful to detect the indirect conflict. + fn mark_iter(&mut self); +} + +impl CmIter for T { + fn mark_iter(&mut self) { + self.mark_range(..); + } +} + /// An optimized version of the [`Cm`] trait that if your conflict manager is depend on hash. pub trait CmEquivalent: Cm { /// Optimized version of [`mark_read`] that accepts borrowed keys. Optional to implement. diff --git a/txn-core/src/sync/hash_cm.rs b/txn-core/src/sync/hash_cm.rs index 95a68f2..4dec4f8 100644 --- a/txn-core/src/sync/hash_cm.rs +++ b/txn-core/src/sync/hash_cm.rs @@ -5,7 +5,13 @@ use crate::DefaultHasher; use super::*; -use indexmap::IndexMap; +use indexmap::IndexSet; + +#[derive(Clone, Copy, Debug)] +enum Read { + Single(u64), + All, +} /// Options for the [`HashCm`]. #[derive(Default, Clone, Copy, Debug, PartialEq, Eq, Hash)] @@ -38,8 +44,8 @@ impl HashCmOptions { /// A [`Cm`] conflict manager implementation that based on the [`Hash`](Hash). pub struct HashCm { - reads: MediumVec, - conflict_keys: IndexMap, S>, + reads: MediumVec, + conflict_keys: IndexSet, _k: core::marker::PhantomData, } @@ -67,12 +73,12 @@ where Ok(match options.capacity { Some(capacity) => Self { reads: MediumVec::with_capacity(capacity), - conflict_keys: IndexMap::with_capacity_and_hasher(capacity, options.hasher), + conflict_keys: IndexSet::with_capacity_and_hasher(capacity, options.hasher), _k: core::marker::PhantomData, }, None => Self { reads: MediumVec::new(), - conflict_keys: IndexMap::with_hasher(options.hasher), + conflict_keys: IndexSet::with_hasher(options.hasher), _k: core::marker::PhantomData, }, }) @@ -81,55 +87,33 @@ where #[inline] fn mark_read(&mut self, key: &K) { let fp = self.conflict_keys.hasher().hash_one(key); - self.reads.push(fp); + self.reads.push(Read::Single(fp)); } #[inline] fn mark_conflict(&mut self, key: &Self::Key) { let fp = self.conflict_keys.hasher().hash_one(key); - let idx = if self.reads.is_empty() { - None - } else { - Some(self.reads.len() - 1) - }; - self.conflict_keys.insert(fp, idx); + self.conflict_keys.insert(fp); } #[inline] fn has_conflict(&self, other: &Self) -> bool { - println!("read {:?}", self.reads); - println!("conflict {:?}", self.conflict_keys); if self.reads.is_empty() { return false; } // check if there is any direct conflict for ro in self.reads.iter() { - if other.conflict_keys.contains_key(ro) { - return true; - } - } - - // check if there is any indirect conflict - for i in self - .conflict_keys - .iter() - .filter_map(|(_, idx)| idx.map(|idx| idx)) - { - let happens_before_reads = &other.reads[..i]; - - for j in self - .conflict_keys - .iter() - .filter_map(|(_, idx)| idx.map(|idx| idx)) - { - let other_happens_before_reads = &other.reads[..j]; - - if happens_before_reads - .iter() - .any(|ro| other_happens_before_reads.contains(ro)) - { - return true; + match ro { + Read::Single(ro) => { + if other.conflict_keys.contains(ro) { + return true; + } + } + Read::All => { + if !other.conflict_keys.is_empty() { + return true; + } } } } @@ -145,6 +129,16 @@ where } } +impl CmIter for HashCm +where + S: BuildHasher, + K: Hash + Eq, +{ + fn mark_iter(&mut self) { + self.reads.push(Read::All); + } +} + impl CmEquivalent for HashCm where S: BuildHasher, @@ -157,7 +151,7 @@ where Q: Hash + Eq + ?Sized, { let fp = self.conflict_keys.hasher().hash_one(key); - self.reads.push(fp); + self.reads.push(Read::Single(fp)); } #[inline] @@ -167,12 +161,7 @@ where Q: Hash + Eq + ?Sized, { let fp = self.conflict_keys.hasher().hash_one(key); - let idx = if self.reads.is_empty() { - None - } else { - Some(self.reads.len() - 1) - }; - self.conflict_keys.insert(fp, idx); + self.conflict_keys.insert(fp); } }