From 8f65b6c805b9edcd5f9ae79f1b7b2954bf8d40ad Mon Sep 17 00:00:00 2001 From: Tyler Davis Date: Wed, 12 Jul 2017 11:50:25 -0700 Subject: [PATCH 1/3] Fixed test_mt and made TCPClient threadsafe --- Makefile.am | 3 ++- src/client/TCPClient.cpp | 28 ++++++++++++++++++++-------- src/client/TCPClient.h | 10 +++++++--- tests/object_store/test_mt.cpp | 19 +++++++++++-------- tests/test_mt.py | 11 +++++++++++ 5 files changed, 51 insertions(+), 20 deletions(-) create mode 100755 tests/test_mt.py diff --git a/Makefile.am b/Makefile.am index 55f1ea4d..b43ceb3b 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,7 +1,8 @@ AUTOMAKE_OPTIONS = foreign serial-tests SUBDIRS = src tests benchmarks TESTS = ./tests/basic_test.py ./tests/test_mem_exhaustion.py ./tests/test_store_simple.py \ - ./tests/test_cache_manager.py ./tests/test_iterator.py ./tests/test_store.py + ./tests/test_cache_manager.py ./tests/test_iterator.py ./tests/test_store.py \ + ./tests/test_mt.py source_dir = src/ DEFAULT_INCLUDES = -I$(source_dir) diff --git a/src/client/TCPClient.cpp b/src/client/TCPClient.cpp index 967b3f11..bb06c00d 100644 --- a/src/client/TCPClient.cpp +++ b/src/client/TCPClient.cpp @@ -14,6 +14,7 @@ #include "utils/utils.h" #include "common/Future.h" #include "common/Exception.h" +#include "common/Synchronization.h" namespace cirrus { @@ -103,15 +104,18 @@ cirrus::Future TCPClient::write_async(ObjectID oid, const void* data, auto msg_contents = message::TCPBladeMessage::CreateWrite(*builder, oid, data_fb_vector); + curr_txn_id_lock.wait(); + const int txn_id = curr_txn_id++; + curr_txn_id_lock.signal(); auto msg = message::TCPBladeMessage::CreateTCPBladeMessage( *builder, - curr_txn_id, + txn_id, 0, message::TCPBladeMessage::Message_Write, msg_contents.Union()); builder->Finish(msg); - return enqueue_message(builder); + return enqueue_message(builder, txn_id); } /** @@ -134,15 +138,19 @@ cirrus::Future TCPClient::read_async(ObjectID oid, void* data, // Create and send read request auto msg_contents = message::TCPBladeMessage::CreateRead(*builder, oid); + curr_txn_id_lock.wait(); + const int txn_id = curr_txn_id++; + curr_txn_id_lock.signal(); + auto msg = message::TCPBladeMessage::CreateTCPBladeMessage( *builder, - curr_txn_id, + txn_id, 0, message::TCPBladeMessage::Message_Read, msg_contents.Union()); builder->Finish(msg); - return enqueue_message(builder, data); + return enqueue_message(builder, txn_id, data); } /** @@ -191,15 +199,19 @@ bool TCPClient::remove(ObjectID oid) { // Create and send removal request auto msg_contents = message::TCPBladeMessage::CreateRemove(*builder, oid); + curr_txn_id_lock.wait(); + const int txn_id = curr_txn_id++; + curr_txn_id_lock.signal(); + auto msg = message::TCPBladeMessage::CreateTCPBladeMessage( *builder, - curr_txn_id, + txn_id, 0, message::TCPBladeMessage::Message_Remove, msg_contents.Union()); builder->Finish(msg); - cirrus::Future future = enqueue_message(builder); + cirrus::Future future = enqueue_message(builder, txn_id); return future.get(); } @@ -424,7 +436,7 @@ void TCPClient::process_send() { */ cirrus::Future TCPClient::enqueue_message( std::shared_ptr builder, - void *ptr) { + const int txn_id, void *ptr) { std::shared_ptr txn = std::make_shared(); txn->mem_for_read = ptr; @@ -433,7 +445,7 @@ cirrus::Future TCPClient::enqueue_message( map_lock.wait(); // Add to map - txn_map[curr_txn_id++] = txn; + txn_map[txn_id] = txn; // Release lock on map map_lock.signal(); diff --git a/src/client/TCPClient.h b/src/client/TCPClient.h index 584e51c3..1c659ac0 100644 --- a/src/client/TCPClient.h +++ b/src/client/TCPClient.h @@ -38,6 +38,7 @@ class TCPClient : public BladeClient { ssize_t send_all(int, const void*, size_t, int); cirrus::Future enqueue_message( std::shared_ptr builder, + const int txn_id, void *ptr = nullptr); void process_received(); void process_send(); @@ -63,9 +64,12 @@ class TCPClient : public BladeClient { error_code = std::make_shared(); } }; - - int sock = 0; /**< fd of the socket used to communicate w/ remote store */ - TxnID curr_txn_id = 0; /**< next txn_id to assign */ + /** fd of the socket used to communicate w/ remote store */ + int sock = 0; + /**< next txn_id to assign */ + TxnID curr_txn_id = 0; + /** Lock on the current transaction id. */ + cirrus::SpinLock curr_txn_id_lock; /** * Map that allows receiver thread to map transactions to their diff --git a/tests/object_store/test_mt.cpp b/tests/object_store/test_mt.cpp index 2eabf354..76c6efc7 100644 --- a/tests/object_store/test_mt.cpp +++ b/tests/object_store/test_mt.cpp @@ -28,8 +28,8 @@ cirrus::ostore::FullBladeObjectStoreTempl> store(IP, PORT, /** * Tests that behavior is as expected when multiple threads make get and put - * requests to the remote store. These clients all use the same instance of - * the store to connect. Currently not working. + * requests to the remote store. These threads all use the same instance of + * the store to connect. */ void test_mt() { cirrus::TimerFunction tf("connect time", true); @@ -39,20 +39,23 @@ void test_mt() { std::random_device rd; std::mt19937 gen(rd()); std::uniform_int_distribution<> dis(1, 10); - + int start = 0; + int stop = 10; for (int i = 0; i < N_THREADS; ++i) { - threads[i] = new std::thread([dis, gen]() { - for (int i = 0; i < 100; ++i) { + threads[i] = new std::thread([dis, gen, start, stop]() { + for (int i = start; i < stop; i++) { int rnd = std::rand(); struct cirrus::Dummy d(rnd); - store.put(1, d); - cirrus::Dummy d2 = store.get(1); + store.put(i, d); + cirrus::Dummy d2 = store.get(i); if (d2.id != rnd) - throw std::runtime_error("mismatch"); + throw std::runtime_error("Incorrect value returned."); } }); + start += 10; + stop += 10; } for (int i = 0; i < N_THREADS; ++i) diff --git a/tests/test_mt.py b/tests/test_mt.py new file mode 100755 index 00000000..93c4f96b --- /dev/null +++ b/tests/test_mt.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 + +import sys +import subprocess +import time +import test_runner + +# Set name of test to run +testPath = "./tests/object_store/test_mt" +# Call script to run the test +test_runner.runTest(testPath) From 684a9dc722cd2b99a9f16346c6340a204acc6bdb Mon Sep 17 00:00:00 2001 From: Tyler Davis Date: Thu, 13 Jul 2017 17:29:12 -0700 Subject: [PATCH 2/3] switched to atomic type --- src/client/TCPClient.cpp | 7 +------ src/client/TCPClient.h | 7 +++---- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/client/TCPClient.cpp b/src/client/TCPClient.cpp index bb06c00d..5eb6dff3 100644 --- a/src/client/TCPClient.cpp +++ b/src/client/TCPClient.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include "common/schemas/TCPBladeMessage_generated.h" #include "utils/logging.h" #include "utils/utils.h" @@ -104,9 +105,7 @@ cirrus::Future TCPClient::write_async(ObjectID oid, const void* data, auto msg_contents = message::TCPBladeMessage::CreateWrite(*builder, oid, data_fb_vector); - curr_txn_id_lock.wait(); const int txn_id = curr_txn_id++; - curr_txn_id_lock.signal(); auto msg = message::TCPBladeMessage::CreateTCPBladeMessage( *builder, txn_id, @@ -138,9 +137,7 @@ cirrus::Future TCPClient::read_async(ObjectID oid, void* data, // Create and send read request auto msg_contents = message::TCPBladeMessage::CreateRead(*builder, oid); - curr_txn_id_lock.wait(); const int txn_id = curr_txn_id++; - curr_txn_id_lock.signal(); auto msg = message::TCPBladeMessage::CreateTCPBladeMessage( *builder, @@ -199,9 +196,7 @@ bool TCPClient::remove(ObjectID oid) { // Create and send removal request auto msg_contents = message::TCPBladeMessage::CreateRemove(*builder, oid); - curr_txn_id_lock.wait(); const int txn_id = curr_txn_id++; - curr_txn_id_lock.signal(); auto msg = message::TCPBladeMessage::CreateTCPBladeMessage( *builder, diff --git a/src/client/TCPClient.h b/src/client/TCPClient.h index 1c659ac0..d4ae7e67 100644 --- a/src/client/TCPClient.h +++ b/src/client/TCPClient.h @@ -5,6 +5,7 @@ #include #include #include +#include #include "common/schemas/TCPBladeMessage_generated.h" #include "client/BladeClient.h" #include "common/Future.h" @@ -66,10 +67,8 @@ class TCPClient : public BladeClient { }; /** fd of the socket used to communicate w/ remote store */ int sock = 0; - /**< next txn_id to assign */ - TxnID curr_txn_id = 0; - /** Lock on the current transaction id. */ - cirrus::SpinLock curr_txn_id_lock; + /** next txn_id to assign */ + std::atomic curr_txn_id = {0}; /** * Map that allows receiver thread to map transactions to their From b8024f4fc27e46d3361e7d1049a354391c052dfe Mon Sep 17 00:00:00 2001 From: Tyler Davis Date: Tue, 18 Jul 2017 11:42:47 -0700 Subject: [PATCH 3/3] Added comments --- src/client/TCPClient.h | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/client/TCPClient.h b/src/client/TCPClient.h index d4ae7e67..160c38df 100644 --- a/src/client/TCPClient.h +++ b/src/client/TCPClient.h @@ -67,12 +67,16 @@ class TCPClient : public BladeClient { }; /** fd of the socket used to communicate w/ remote store */ int sock = 0; - /** next txn_id to assign */ + /** Next txn_id to assign to a txn_info. Used as a unique identifier. */ std::atomic curr_txn_id = {0}; /** * Map that allows receiver thread to map transactions to their - * completion information. + * completion information. When a message is added to the send queue, + * a struct txn_info is created and added to this map. This struct + * allows the receiver thread to place information regarding completion + * as well as data in a location that is accessible to the future + * corresponding to the transaction. */ std::map> txn_map; /**