Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ready] Fast Concurrent Deque Through Explicit Timestamping #127

Open
wants to merge 30 commits into
base: integration
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
9d2f2af
Init ts_deque
pr3sto Dec 4, 2018
4e31819
init deque_buffer
EduardBlees Dec 6, 2018
fff2b21
Merge branch 'develop' of github.com:EduardBlees/libcds into develop
EduardBlees Dec 6, 2018
c20d3c3
init timestamp
vikkiorrikki Dec 6, 2018
86ff17d
add Item and initialize method
pr3sto Dec 8, 2018
c7c96bf
try remove left impl
EduardBlees Dec 10, 2018
9f6d53e
threadcontext
EduardBlees Dec 10, 2018
92efc84
implement insert_right
pr3sto Dec 12, 2018
a136c71
timestamp hardware
vikkiorrikki Dec 12, 2018
0dcef53
insert_left implementation
pr3sto Dec 13, 2018
7fdf05c
try remove right impl
EduardBlees Dec 15, 2018
ab723b5
timestamp hardware interval
vikkiorrikki Dec 15, 2018
b2a8b91
implement ts_deque
pr3sto Dec 16, 2018
0178ecb
methods for fixing aba problem added
pr3sto Dec 16, 2018
2bb8dce
try remove fixes, use ABA functions
EduardBlees Dec 17, 2018
06542ae
fixes insert methods
pr3sto Dec 18, 2018
09052d7
timestamp atomic counter
vikkiorrikki Dec 19, 2018
546b8b3
assign threadcontext to each thread
pr3sto Dec 19, 2018
0e4fd30
code format
EduardBlees Dec 20, 2018
0181934
add clear, size and empty methods
pr3sto Dec 23, 2018
c4ecac4
prepare to pull request
pr3sto Dec 24, 2018
eb5e9f6
add unit tests
pr3sto Dec 24, 2018
a5ecc16
move asm code to compiler folder
pr3sto Dec 24, 2018
72f5ef0
fix
pr3sto Dec 24, 2018
ac02558
add copyright note
pr3sto Dec 25, 2018
430297f
fix emptyness check, thread_id
pr3sto Dec 27, 2018
ff40ba7
stress tests added
pr3sto Dec 28, 2018
5ee65b5
fix tests for different architectures
pr3sto Jan 16, 2019
d5ece4c
fix rdtsc and rdtscp for i386 arch
pr3sto Jan 16, 2019
2410e88
add rdtscp check
pr3sto Jan 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
try remove right impl
EduardBlees committed Dec 15, 2018
commit 7fdf05c48c998594ff41dfe5b18f6df7aefb9614
265 changes: 214 additions & 51 deletions cds/container/ts_deque_buffer.h
Original file line number Diff line number Diff line change
@@ -81,57 +81,53 @@ class TSDequeBuffer
}
}

/////////////////////////////////////////////////////////////////
// insert_right
/////////////////////////////////////////////////////////////////
inline std::atomic<uint64_t> *insert_left(T element) {
uint64_t thread_id = ThreadContext::get().thread_id();

// Create a new item.
Item *new_item = new Item();
timestamping_->init_top_atomic(new_item->timestamp);
new_item->data.store(element);
new_item->taken.store(0);
new_item->left.store(new_item);
// Items inserted at the left get negative indices. Thereby the
// order of items in the thread-local lists correspond with the
// order of indices, and we can use the sign of the index to
// determine on which side an item has been inserted.
new_item->index = -((*next_index_[thread_id])++);

// Determine leftmost not-taken item in the list. The new item is
// inserted to the left of that item.
Item* old_left = left_[thread_id]->load();

Item* left = old_left;
while (left->right.load() != left
&& left->taken.load()) {
left = left->right.load();
}

if (left->taken.load() && left->right.load() == left) {
left = old_left;
left->right.store(left);
Item* old_right = right_[thread_id]->load();
right_[thread_id]->store(left);
}

// Add the new item to the list.
new_item->right.store(left);
left->left.store(new_item);
left_[thread_id]->store(new_item);

// Return a pointer to the timestamp location of the item so that a
// timestamp can be added.
return new_item->timestamp;
}

/////////////////////////////////////////////////////////////////
// insert_right
/////////////////////////////////////////////////////////////////
inline std::atomic<uint64_t> *insert_left(T element)
{
uint64_t thread_id = ThreadContext::get().thread_id();

// Create a new item.
Item *new_item = new Item();
timestamping_->init_top_atomic(new_item->timestamp);
new_item->data.store(element);
new_item->taken.store(0);
new_item->left.store(new_item);
// Items inserted at the left get negative indices. Thereby the
// order of items in the thread-local lists correspond with the
// order of indices, and we can use the sign of the index to
// determine on which side an item has been inserted.
new_item->index = -((*next_index_[thread_id])++);

// Determine leftmost not-taken item in the list. The new item is
// inserted to the left of that item.
Item *old_left = left_[thread_id]->load();

Item *left = old_left;
while (left->right.load() != left && left->taken.load())
{
left = left->right.load();
}

if (left->taken.load() && left->right.load() == left)
{
left = old_left;
left->right.store(left);
Item *old_right = right_[thread_id]->load();
right_[thread_id]->store(left);
}

// Add the new item to the list.
new_item->right.store(left);
left->left.store(new_item);
left_[thread_id]->store(new_item);

// Return a pointer to the timestamp location of the item so that a
// timestamp can be added.
return new_item->timestamp;
}

inline std::atomic<uint64_t> *insert_right(T element)
{
uint64_t thread_id = scal::ThreadContext::get().thread_id();
uint64_t thread_id = ThreadContext::get().thread_id();

// Create a new item.
Item *new_item = new Item();
@@ -169,6 +165,72 @@ class TSDequeBuffer
return new_item->timestamp;
}

// Helper function which returns true if the item was inserted at the left.
inline bool inserted_left(Item *item)
{
return item->index.load() < 0;
}

// Helper function which returns true if the item was inserted at the right.
inline bool inserted_right(Item *item)
{
return item->index.load() > 0;
}

// Helper function which returns true if item1 is more left than item2.
inline bool is_more_left(Item *item1, uint64_t *timestamp1, Item *item2, uint64_t *timestamp2)
{
if (inserted_left(item2))
{
if (inserted_left(item1))
{
return timestamping_->is_later(timestamp1, timestamp2);
}
else
{
return false;
}
}
else
{
if (inserted_left(item1))
{
return true;
}
else
{
return timestamping_->is_later(timestamp2, timestamp1);
}
}
}

// Helper function which returns true if item1 is more right than item2.
inline bool is_more_right(Item *item1, uint64_t *timestamp1, Item *item2, uint64_t *timestamp2)
{
if (inserted_right(item2))
{
if (inserted_right(item1))
{
return timestamping_->is_later(timestamp1, timestamp2);
}
else
{
return false;
}
}
else
{
if (inserted_right(item1))
{
return true;
}
else
{
return timestamping_->is_later(timestamp2, timestamp1);
}
}
}

bool try_remove_left(T *element, uint64_t *invocation_time)
{
// Initialize the data needed for the emptiness check.
@@ -225,7 +287,7 @@ class TSDequeBuffer
// Try to adjust the remove pointer. It does not matter if
// this CAS fails.
left_[tmp_buffer_index]->compare_exchange_weak(
tmp_left, (Item *)add_next_aba(item, tmp_left, 0));
tmp_left, item);
*element = item->data.load();
return true;
}
@@ -261,7 +323,7 @@ class TSDequeBuffer
// Try to adjust the remove pointer. It does not matter if
// this CAS fails.
left_[buffer_index]->compare_exchange_weak(
old_left, (Item *)add_next_aba(result, old_left, 0));
old_left, item);

*element = result->data.load();
return true;
@@ -295,5 +357,106 @@ class TSDequeBuffer

bool try_remove_right(T *element, uint64_t *invocation_time)
{
// Initialize the data needed for the emptiness check.
uint64_t thread_id = ThreadContext::get().thread_id();
Item **emptiness_check_left =
emptiness_check_left_[thread_id];
Item **emptiness_check_right =
emptiness_check_right_[thread_id];
bool empty = true;
// Initialize the result pointer to NULL, which means that no
// element has been removed.
Item *result = NULL;
// Indicates the index which contains the youngest item.
uint64_t buffer_index = -1;
// Memory on the stack frame where timestamps of items can be stored
// temporarily.
uint64_t tmp_timestamp[2][2];
// Index in the tmp_timestamp array whihc is not used at the moment.
uint64_t tmp_index = 1;
timestamping_->init_sentinel(tmp_timestamp[0]);
uint64_t *timestamp = tmp_timestamp[0];
// Stores the value of the remove pointer of a thead-local buffer
// before the buffer is actually accessed.
Item *old_right = NULL;

// Read the start time of the iteration. Items which were timestamped
// after the start time and inserted at the left are not removed.
uint64_t start_time[2];
timestamping_->read_time(start_time);
// We start iterating over the thread-local lists at a random index.
uint64_t start = hwrand();
// We iterate over all thead-local buffers
for (uint64_t i = 0; i < num_threads_; i++)
{

uint64_t tmp_buffer_index = (start + i) % num_threads_;
// We get the remove/insert pointer of the current thread-local buffer.
Item *tmp_right = right_[tmp_buffer_index]->load();
// We get the youngest element from that thread-local buffer.
Item *item = get_right_item(tmp_buffer_index);
// If we found an element, we compare it to the youngest element
// we have found until now.
if (item != NULL)
{
empty = false;
uint64_t *item_timestamp;
timestamping_->load_timestamp(tmp_timestamp[tmp_index], item->timestamp);
item_timestamp = tmp_timestamp[tmp_index];

if (inserted_right(item) && !timestamping_->is_later(invocation_time, item_timestamp))
{
uint64_t expected = 0;
if (item->taken.load() == 0 && item->taken.compare_exchange_weak(expected, 1))
{
// Try to adjust the remove pointer. It does not matter if
// this CAS fails.
right_[tmp_buffer_index]->compare_exchange_weak(
tmp_right, item);
*element = item->data.load();
return true;
}
else
{
item = get_right_item(tmp_buffer_index);
if (item != NULL)
{
timestamping_->load_timestamp(tmp_timestamp[tmp_index], item->timestamp);
item_timestamp = tmp_timestamp[tmp_index];
}
}
}

if (item != NULL && (result == NULL || is_more_right(item, item_timestamp, result, timestamp)))
{
// We found a new youngest element, so we remember it.
result = item;
buffer_index = tmp_buffer_index;
timestamp = item_timestamp;
tmp_index ^= 1;
old_right = tmp_right;
}
}
else
{
// No element was found, work on the emptiness check.
if (emptiness_check_right[tmp_buffer_index] != tmp_right)
{
empty = false;
emptiness_check_right[tmp_buffer_index] =
tmp_right;
}
Item *tmp_left = left_[tmp_buffer_index]->load();
if (emptiness_check_left[tmp_buffer_index] != tmp_left)
{
empty = false;
emptiness_check_left[tmp_buffer_index] =
tmp_left;
}
}
}

*element = (T)NULL;
return !empty;
}
};