Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed test_mt and made TCPClient threadsafe #83

Merged
merged 4 commits into from
Jul 20, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
AUTOMAKE_OPTIONS = foreign serial-tests
SUBDIRS = src tests benchmarks
TESTS = ./tests/basic_test.py ./tests/test_mem_exhaustion.py ./tests/test_store_simple.py \
./tests/test_cache_manager.py ./tests/test_iterator.py ./tests/test_store.py
./tests/test_cache_manager.py ./tests/test_iterator.py ./tests/test_store.py \
./tests/test_mt.py

source_dir = src/
DEFAULT_INCLUDES = -I$(source_dir)
Expand Down
28 changes: 20 additions & 8 deletions src/client/TCPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "utils/utils.h"
#include "common/Future.h"
#include "common/Exception.h"
#include "common/Synchronization.h"

namespace cirrus {

Expand Down Expand Up @@ -103,15 +104,18 @@ cirrus::Future TCPClient::write_async(ObjectID oid, const void* data,
auto msg_contents = message::TCPBladeMessage::CreateWrite(*builder,
oid,
data_fb_vector);
curr_txn_id_lock.wait();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need this. There are atomic types in C++

http://en.cppreference.com/w/cpp/atomic/atomic

const int txn_id = curr_txn_id++;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand, you use a map txn_id->txn_info to keep the state of a put/get operation. This is necessary because our network layer works as an event handler.

I think this needs to be briefly explained in the code. Probably in a comment to txn_map.

curr_txn_id_lock.signal();
auto msg = message::TCPBladeMessage::CreateTCPBladeMessage(
*builder,
curr_txn_id,
txn_id,
0,
message::TCPBladeMessage::Message_Write,
msg_contents.Union());
builder->Finish(msg);

return enqueue_message(builder);
return enqueue_message(builder, txn_id);
}

/**
Expand All @@ -134,15 +138,19 @@ cirrus::Future TCPClient::read_async(ObjectID oid, void* data,
// Create and send read request
auto msg_contents = message::TCPBladeMessage::CreateRead(*builder, oid);

curr_txn_id_lock.wait();
const int txn_id = curr_txn_id++;
curr_txn_id_lock.signal();

auto msg = message::TCPBladeMessage::CreateTCPBladeMessage(
*builder,
curr_txn_id,
txn_id,
0,
message::TCPBladeMessage::Message_Read,
msg_contents.Union());
builder->Finish(msg);

return enqueue_message(builder, data);
return enqueue_message(builder, txn_id, data);
}

/**
Expand Down Expand Up @@ -191,15 +199,19 @@ bool TCPClient::remove(ObjectID oid) {
// Create and send removal request
auto msg_contents = message::TCPBladeMessage::CreateRemove(*builder, oid);

curr_txn_id_lock.wait();
const int txn_id = curr_txn_id++;
curr_txn_id_lock.signal();

auto msg = message::TCPBladeMessage::CreateTCPBladeMessage(
*builder,
curr_txn_id,
txn_id,
0,
message::TCPBladeMessage::Message_Remove,
msg_contents.Union());
builder->Finish(msg);

cirrus::Future future = enqueue_message(builder);
cirrus::Future future = enqueue_message(builder, txn_id);
return future.get();
}

Expand Down Expand Up @@ -424,7 +436,7 @@ void TCPClient::process_send() {
*/
cirrus::Future TCPClient::enqueue_message(
std::shared_ptr<flatbuffers::FlatBufferBuilder> builder,
void *ptr) {
const int txn_id, void *ptr) {
std::shared_ptr<struct txn_info> txn = std::make_shared<struct txn_info>();

txn->mem_for_read = ptr;
Expand All @@ -433,7 +445,7 @@ cirrus::Future TCPClient::enqueue_message(
map_lock.wait();

// Add to map
txn_map[curr_txn_id++] = txn;
txn_map[txn_id] = txn;

// Release lock on map
map_lock.signal();
Expand Down
10 changes: 7 additions & 3 deletions src/client/TCPClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class TCPClient : public BladeClient {
ssize_t send_all(int, const void*, size_t, int);
cirrus::Future enqueue_message(
std::shared_ptr<flatbuffers::FlatBufferBuilder> builder,
const int txn_id,
void *ptr = nullptr);
void process_received();
void process_send();
Expand All @@ -63,9 +64,12 @@ class TCPClient : public BladeClient {
error_code = std::make_shared<cirrus::ErrorCodes>();
}
};

int sock = 0; /**< fd of the socket used to communicate w/ remote store */
TxnID curr_txn_id = 0; /**< next txn_id to assign */
/** fd of the socket used to communicate w/ remote store */
int sock = 0;
/**< next txn_id to assign */
TxnID curr_txn_id = 0;
/** Lock on the current transaction id. */
cirrus::SpinLock curr_txn_id_lock;

/**
* Map that allows receiver thread to map transactions to their
Expand Down
19 changes: 11 additions & 8 deletions tests/object_store/test_mt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ cirrus::ostore::FullBladeObjectStoreTempl<cirrus::Dummy<SIZE>> store(IP, PORT,

/**
* Tests that behavior is as expected when multiple threads make get and put
* requests to the remote store. These clients all use the same instance of
* the store to connect. Currently not working.
* requests to the remote store. These threads all use the same instance of
* the store to connect.
*/
void test_mt() {
cirrus::TimerFunction tf("connect time", true);
Expand All @@ -39,20 +39,23 @@ void test_mt() {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 10);

int start = 0;
int stop = 10;
for (int i = 0; i < N_THREADS; ++i) {
threads[i] = new std::thread([dis, gen]() {
for (int i = 0; i < 100; ++i) {
threads[i] = new std::thread([dis, gen, start, stop]() {
for (int i = start; i < stop; i++) {
int rnd = std::rand();
struct cirrus::Dummy<SIZE> d(rnd);

store.put(1, d);
cirrus::Dummy<SIZE> d2 = store.get(1);
store.put(i, d);
cirrus::Dummy<SIZE> d2 = store.get(i);

if (d2.id != rnd)
throw std::runtime_error("mismatch");
throw std::runtime_error("Incorrect value returned.");
}
});
start += 10;
stop += 10;
}

for (int i = 0; i < N_THREADS; ++i)
Expand Down
11 changes: 11 additions & 0 deletions tests/test_mt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/usr/bin/env python3

import sys
import subprocess
import time
import test_runner

# Set name of test to run
testPath = "./tests/object_store/test_mt"
# Call script to run the test
test_runner.runTest(testPath)