-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add async commit to Transaction, and allow reject rules to be provided
- Loading branch information
Saxon Parker
committed
Sep 16, 2019
1 parent
4b69dae
commit 3c59260
Showing
4 changed files
with
274 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,257 @@ | ||
Adds methods to perform transactions asynchronously. | ||
|
||
From: Saxon Parker <[email protected]> | ||
|
||
|
||
--- | ||
GNUmakefile | 4 +++ | ||
src/Transaction.cc | 48 ++++++++++++++++++++++++++++++-- | ||
src/Transaction.h | 21 ++++++++++++-- | ||
src/TransactionTest.cc | 73 ++++++++++++++++++++++++++++++++++++++++++++++++ | ||
4 files changed, 142 insertions(+), 4 deletions(-) | ||
|
||
diff --git a/GNUmakefile b/GNUmakefile | ||
index 50d491f6..201131cc 100644 | ||
--- a/GNUmakefile | ||
+++ b/GNUmakefile | ||
@@ -431,7 +431,10 @@ INSTALL_INCLUDES := \ | ||
src/BoostIntrusive.h \ | ||
src/Buffer.h \ | ||
src/ClientException.h \ | ||
+ src/ClientTransactionManager.h \ | ||
src/CodeLocation.h \ | ||
+ src/Common.h \ | ||
+ src/Context.h \ | ||
src/CoordinatorClient.h \ | ||
src/CoordinatorRpcWrapper.h \ | ||
src/Crc32C.h \ | ||
@@ -467,6 +470,7 @@ INSTALL_INCLUDES := \ | ||
src/SpinLock.h \ | ||
src/Status.h \ | ||
src/TestLog.h \ | ||
+ src/Transaction.h \ | ||
src/Transport.h \ | ||
src/Tub.h \ | ||
src/WireFormat.h \ | ||
diff --git a/src/Transaction.cc b/src/Transaction.cc | ||
index c12867e3..037339be 100644 | ||
--- a/src/Transaction.cc | ||
+++ b/src/Transaction.cc | ||
@@ -104,6 +104,44 @@ Transaction::commitAndSync() | ||
return commit(); | ||
} | ||
|
||
+void | ||
+Transaction::commitAsync() | ||
+{ | ||
+ if (!commitStarted) { | ||
+ commitStarted = true; | ||
+ ramcloud->transactionManager->startTransactionTask(taskPtr); | ||
+ } | ||
+} | ||
+ | ||
+bool | ||
+Transaction::commitReady() | ||
+{ | ||
+ return taskPtr->allDecisionsSent(); | ||
+} | ||
+ | ||
+bool | ||
+Transaction::syncReady() | ||
+{ | ||
+ return taskPtr->isReady(); | ||
+} | ||
+ | ||
+void | ||
+Transaction::poll() | ||
+{ | ||
+ ramcloud->transactionManager->poll(); | ||
+} | ||
+ | ||
+bool | ||
+Transaction::result() | ||
+{ | ||
+ if (expect_false(taskPtr->getDecision() == | ||
+ WireFormat::TxDecision::UNDECIDED)) { | ||
+ ClientException::throwException(HERE, STATUS_INTERNAL_ERROR); | ||
+ } | ||
+ | ||
+ return (taskPtr->getDecision() == WireFormat::TxDecision::COMMIT); | ||
+} | ||
+ | ||
/** | ||
* Read the current contents of an object as part of this transaction. | ||
* | ||
@@ -144,7 +182,7 @@ Transaction::read(uint64_t tableId, const void* key, uint16_t keyLength, | ||
* Size in bytes of the key. | ||
*/ | ||
void | ||
-Transaction::remove(uint64_t tableId, const void* key, uint16_t keyLength) | ||
+Transaction::remove(uint64_t tableId, const void* key, uint16_t keyLength, const RejectRules* rejectRules) | ||
{ | ||
if (expect_false(commitStarted)) { | ||
throw TxOpAfterCommit(HERE); | ||
@@ -165,6 +203,9 @@ Transaction::remove(uint64_t tableId, const void* key, uint16_t keyLength) | ||
} | ||
|
||
entry->type = ClientTransactionTask::CacheEntry::REMOVE; | ||
+ if (rejectRules != NULL) { | ||
+ entry->rejectRules = *rejectRules; | ||
+ } | ||
} | ||
|
||
/** | ||
@@ -187,7 +228,7 @@ Transaction::remove(uint64_t tableId, const void* key, uint16_t keyLength) | ||
*/ | ||
void | ||
Transaction::write(uint64_t tableId, const void* key, uint16_t keyLength, | ||
- const void* buf, uint32_t length) | ||
+ const void* buf, uint32_t length, const RejectRules* rejectRules) | ||
{ | ||
if (expect_false(commitStarted)) { | ||
throw TxOpAfterCommit(HERE); | ||
@@ -212,6 +253,9 @@ Transaction::write(uint64_t tableId, const void* key, uint16_t keyLength, | ||
} | ||
|
||
entry->type = ClientTransactionTask::CacheEntry::WRITE; | ||
+ if (rejectRules != NULL) { | ||
+ entry->rejectRules = *rejectRules; | ||
+ } | ||
} | ||
|
||
/** | ||
diff --git a/src/Transaction.h b/src/Transaction.h | ||
index 4283fcf6..786bda79 100644 | ||
--- a/src/Transaction.h | ||
+++ b/src/Transaction.h | ||
@@ -59,13 +59,30 @@ class Transaction { | ||
void sync(); | ||
bool commitAndSync(); | ||
|
||
+ /** | ||
+ * This set of methods is used to issue a commit asynchronously. | ||
+ * Sample code: | ||
+ * Transaction t; | ||
+ * ... add operations ... | ||
+ * t.commitAsync(); | ||
+ * while (!t.commitReady()) | ||
+ * t.poll(); | ||
+ * if (!t.result()) | ||
+ * std::cerr << "transaction failed" << std::endl; | ||
+ */ | ||
+ void commitAsync(); | ||
+ bool commitReady(); | ||
+ bool syncReady(); | ||
+ bool result(); | ||
+ void poll(); | ||
+ | ||
void read(uint64_t tableId, const void* key, uint16_t keyLength, | ||
Buffer* value, bool* objectExists = NULL); | ||
|
||
- void remove(uint64_t tableId, const void* key, uint16_t keyLength); | ||
+ void remove(uint64_t tableId, const void* key, uint16_t keyLength, const RejectRules* rejectRules = NULL); | ||
|
||
void write(uint64_t tableId, const void* key, uint16_t keyLength, | ||
- const void* buf, uint32_t length); | ||
+ const void* buf, uint32_t length, const RejectRules* rejectRules = NULL); | ||
|
||
/** | ||
* Encapsulates the state of a Transaction::read operation, | ||
diff --git a/src/TransactionTest.cc b/src/TransactionTest.cc | ||
index 724b7f23..ee36eabf 100644 | ||
--- a/src/TransactionTest.cc | ||
+++ b/src/TransactionTest.cc | ||
@@ -118,6 +118,35 @@ TEST_F(TransactionTest, commit_basic) { | ||
EXPECT_TRUE(transaction->commitStarted); | ||
} | ||
|
||
+TEST_F(TransactionTest, commit_async_basic) { | ||
+ ramcloud->write(tableId1, "0", 1, "abcdef", 6); | ||
+ | ||
+ Buffer value; | ||
+ transaction->read(tableId1, "0", 1, &value); | ||
+ transaction->write(tableId1, "0", 1, "hello", 5); | ||
+ | ||
+ EXPECT_FALSE(transaction->commitStarted); | ||
+ EXPECT_EQ(ClientTransactionTask::INIT, | ||
+ transaction->taskPtr.get()->state); | ||
+ | ||
+ transaction->commitAsync(); | ||
+ while (!transaction->commitReady()) | ||
+ transaction->poll(); | ||
+ | ||
+ EXPECT_TRUE(transaction->result()); | ||
+ | ||
+ EXPECT_EQ(ClientTransactionTask::DONE, | ||
+ transaction->taskPtr.get()->state); | ||
+ EXPECT_TRUE(transaction->commitStarted); | ||
+ | ||
+ // Check that commit does not wait for decision rpcs to return. | ||
+ transaction->taskPtr.get()->state = ClientTransactionTask::DECISION; | ||
+ EXPECT_TRUE(transaction->commit()); | ||
+ EXPECT_EQ(ClientTransactionTask::DECISION, | ||
+ transaction->taskPtr.get()->state); | ||
+ EXPECT_TRUE(transaction->commitStarted); | ||
+} | ||
+ | ||
TEST_F(TransactionTest, commit_abort) { | ||
ramcloud->write(tableId1, "0", 1, "abcdef", 6); | ||
|
||
@@ -343,6 +372,25 @@ TEST_F(TransactionTest, remove) { | ||
EXPECT_EQ(entry, task->findCacheEntry(key)); | ||
} | ||
|
||
+TEST_F(TransactionTest, remove_with_reject) { | ||
+ ramcloud->write(tableId1, "0", 1, "abcdef", 6); | ||
+ ramcloud->write(tableId1, "0", 1, "ghijkl", 6); | ||
+ | ||
+ RejectRules rr; | ||
+ rr.versionNeGiven = 1; | ||
+ transaction->remove(tableId1, "0", 1, &rr); | ||
+ | ||
+ EXPECT_FALSE(transaction->commitStarted); | ||
+ EXPECT_EQ(ClientTransactionTask::INIT, | ||
+ transaction->taskPtr.get()->state); | ||
+ | ||
+ // The commit should fail because of the reject rules | ||
+ EXPECT_FALSE(transaction->commit()); | ||
+ EXPECT_EQ(ClientTransactionTask::DONE, | ||
+ transaction->taskPtr.get()->state); | ||
+ EXPECT_TRUE(transaction->commitStarted); | ||
+} | ||
+ | ||
TEST_F(TransactionTest, remove_afterCommit) { | ||
transaction->commitStarted = true; | ||
EXPECT_THROW(transaction->remove(1, "test", 4), | ||
@@ -382,6 +430,31 @@ TEST_F(TransactionTest, write) { | ||
EXPECT_EQ(entry, task->findCacheEntry(key)); | ||
} | ||
|
||
+TEST_F(TransactionTest, write_with_reject) { | ||
+ ramcloud->write(tableId1, "0", 1, "abcdef", 6); | ||
+ ramcloud->write(tableId1, "0", 1, "ghijkl", 6); | ||
+ | ||
+ RejectRules rr; | ||
+ rr.versionNeGiven = 1; | ||
+ transaction->write(tableId1, "0", 1, "mnopqrs", 6, &rr); | ||
+ | ||
+ EXPECT_FALSE(transaction->commitStarted); | ||
+ EXPECT_EQ(ClientTransactionTask::INIT, | ||
+ transaction->taskPtr.get()->state); | ||
+ | ||
+ // The commit should fail because of the reject rules | ||
+ EXPECT_FALSE(transaction->commit()); | ||
+ EXPECT_EQ(ClientTransactionTask::DONE, | ||
+ transaction->taskPtr.get()->state); | ||
+ EXPECT_TRUE(transaction->commitStarted); | ||
+ | ||
+ Buffer value; | ||
+ ramcloud->read(tableId1, "0", 1, &value); | ||
+ EXPECT_EQ("ghijkl", string(reinterpret_cast<const char*>( | ||
+ value.getRange(0, value.size())), | ||
+ value.size())); | ||
+} | ||
+ | ||
TEST_F(TransactionTest, write_afterCommit) { | ||
transaction->commitStarted = true; | ||
EXPECT_THROW(transaction->write(1, "test", 4, "hello", 5), |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,9 @@ | ||
# This series applies on GIT commit 3f4a0ef1f881184da116faa7a70e2a14681dc303 | ||
# This series applies on GIT commit e7089bc1332e311ad3f2451ebc681cc20fe29ff4 | ||
disable-log-before-main.patch | ||
fix-dpdk-mbuf-release.patch | ||
dpdk-config.patch | ||
multiop-includes.patch | ||
remove-java.patch | ||
make-rc-unit-test.patch | ||
flaky-test_fix.patch | ||
async-transaction.patch |