Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Dec 3, 2024
1 parent 566789a commit a6f751b
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 25 deletions.
13 changes: 10 additions & 3 deletions idl/duplication.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ namespace cpp dsn.replication
namespace go admin
namespace java org.apache.pegasus.replication

// Indicate which data of a table needs to be duplicated:
// * FULL: all of the data of the table needs to be duplicated.
// * INCREMENTAL: only incremental data of the table would be duplicated.
enum duplication_mode
{
FULL = 0,
Expand Down Expand Up @@ -135,9 +138,13 @@ struct duplication_modify_response
2:i32 appid;
}

// The states tracking each partition for duplication.
struct duplication_partition_state
{
// The max decree of this partition that has been confirmed to be received by follower.
1:i64 confirmed_decree;

// The max decree that has been committed by this partition.
2:i64 last_committed_decree;
}

Expand All @@ -148,7 +155,7 @@ struct duplication_entry
3:string remote;
4:i64 create_ts;

// Used for syncing duplications(replica server -> meta server).
// Used for syncing duplications with partition-level progress (replica server -> meta server).
// partition index => confirmed decree.
5:optional map<i32, i64> progress;

Expand All @@ -164,10 +171,10 @@ struct duplication_entry
// For versions < v2.6.0, this must be the same with source replica_count.
9:optional i32 remote_replica_count;

// TODO(wangdan)
// TODO(wangdan): would be supported later.
10:optional duplication_mode mode;

// Used for listing duplications(client -> meta server).
// Used for listing duplications with partition-level details (client -> meta server).
// partition index => partition states.
11:optional map<i32, duplication_partition_state> partition_states;
}
Expand Down
8 changes: 4 additions & 4 deletions src/common/duplication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ static nlohmann::json duplication_entry_to_json(const duplication_entry &ent)

if (ent.__isset.progress) {
nlohmann::json progress;
for (const auto &[partition_id, state] : ent.progress) {
progress[std::to_string(partition_id)] = state;
for (const auto &[partition_index, state] : ent.progress) {
progress[std::to_string(partition_index)] = state;
}

json["progress"] = progress;
Expand All @@ -201,12 +201,12 @@ static nlohmann::json duplication_entry_to_json(const duplication_entry &ent)

if (ent.__isset.partition_states) {
nlohmann::json partition_states;
for (const auto &[partition_id, state] : ent.partition_states) {
for (const auto &[partition_index, state] : ent.partition_states) {
nlohmann::json partition_state;
partition_state["confirmed_decree"] = state.confirmed_decree;
partition_state["last_committed_decree"] = state.last_committed_decree;

partition_states[std::to_string(partition_id)] = partition_state;
partition_states[std::to_string(partition_index)] = partition_state;
}

json["partition_states"] = partition_states;
Expand Down
15 changes: 10 additions & 5 deletions src/meta/duplication/duplication_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ class duplication_info
// which is not thread safe for read.
void append_as_entry(std::vector<duplication_entry> &entry_list) const;

// Build an entry including only duplication-level info.
duplication_entry to_duplication_level_entry() const
{
duplication_entry entry;
Expand All @@ -166,28 +167,32 @@ class duplication_info
return entry;
}

// Build an entry including also partition-level progress used for sync besides
// duplication-level info.
duplication_entry to_partition_level_entry_for_sync() const
{
auto entry = to_duplication_level_entry();

entry.__isset.progress = true;
for (const auto &[partition_id, state] : _progress) {
for (const auto &[partition_index, state] : _progress) {
if (!state.is_inited) {
continue;
}

entry.progress.emplace(partition_id, state.stored_decree);
entry.progress.emplace(partition_index, state.stored_decree);
}

return entry;
}

// Build an entry including also partition-level detailed states used for list
// besides duplication-level info.
duplication_entry to_partition_level_entry_for_list() const
{
auto entry = to_duplication_level_entry();

entry.__isset.partition_states = true;
for (const auto &[partition_id, state] : _progress) {
for (const auto &[partition_index, state] : _progress) {
if (!state.is_inited) {
continue;
}
Expand All @@ -196,7 +201,7 @@ class duplication_info
partition_state.confirmed_decree = state.stored_decree;
partition_state.last_committed_decree = state.last_committed_decree;

entry.partition_states.emplace(partition_id, partition_state);
entry.partition_states.emplace(partition_index, partition_state);
}

return entry;
Expand Down Expand Up @@ -261,7 +266,7 @@ class duplication_info
bool checkpoint_prepared{false};
};

// partition_idx => progress
// partition_index => progress
std::map<int, partition_progress> _progress;

uint64_t _last_progress_report_ms{0};
Expand Down
71 changes: 61 additions & 10 deletions src/meta/test/duplication_info_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <boost/algorithm/string/replace.hpp>

#include "gtest/gtest.h"
#include "gutil/map_util.h"
#include "runtime/app_model.h"
#include "test_util/test_util.h"
#include "utils/flags.h"
Expand All @@ -51,18 +52,55 @@ class duplication_info_test : public testing::Test
dup._status = status;
}

static void test_init_progress(duplication_info &dup, int partition_idx, decree expected_decree)
static void test_duplication_entry_for_sync(const duplication_info &dup,
int partition_index,
decree expected_confirmed_decree)
{
dup.init_progress(partition_idx, expected_decree);
const auto &entry = dup.to_partition_level_entry_for_sync();
ASSERT_TRUE(gutil::ContainsKey(entry.progress, partition_index));
ASSERT_EQ(expected_confirmed_decree, gutil::FindOrDie(entry.progress, partition_index));
}

static void test_duplication_entry_for_list(const duplication_info &dup,
int partition_index,
decree expected_confirmed_decree,
decree expected_last_committed_decree)
{
const auto &entry = dup.to_partition_level_entry_for_list();
ASSERT_TRUE(gutil::ContainsKey(entry.partition_states, partition_index));

const auto &state = gutil::FindOrDie(entry.partition_states, partition_index);
ASSERT_EQ(expected_confirmed_decree, state.confirmed_decree);
ASSERT_EQ(expected_last_committed_decree, state.last_committed_decree);
}

static void test_duplication_entry(const duplication_info &dup,
int partition_index,
decree expected_confirmed_decree,
decree expected_last_committed_decree)
{
test_duplication_entry_for_sync(dup, partition_index, expected_confirmed_decree);
test_duplication_entry_for_list(
dup, partition_index, expected_confirmed_decree, expected_last_committed_decree);
}

static void
test_init_progress(duplication_info &dup, int partition_index, decree expected_decree)
{
dup.init_progress(partition_index, expected_decree);

ASSERT_TRUE(gutil::ContainsKey(dup._progress, partition_index));

const auto &progress = dup._progress[partition_idx];
const auto &progress = dup._progress[partition_index];
ASSERT_EQ(invalid_decree, progress.last_committed_decree);
ASSERT_EQ(expected_decree, progress.volatile_decree);
ASSERT_EQ(expected_decree, progress.stored_decree);
ASSERT_FALSE(progress.is_altering);
ASSERT_EQ(0, progress.last_progress_update_ms);
ASSERT_TRUE(progress.is_inited);
ASSERT_FALSE(progress.checkpoint_prepared);

test_duplication_entry(dup, partition_index, expected_decree, invalid_decree);
}

static void test_alter_progress()
Expand Down Expand Up @@ -96,6 +134,7 @@ class duplication_info_test : public testing::Test
ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree);
ASSERT_TRUE(dup._progress[0].is_altering);
ASSERT_TRUE(dup._progress[0].checkpoint_prepared);
test_duplication_entry(dup, 0, invalid_decree, 8);

// Busy updating.
entry.__set_last_committed_decree(15);
Expand All @@ -109,6 +148,7 @@ class duplication_info_test : public testing::Test
ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree);
ASSERT_TRUE(dup._progress[0].is_altering);
ASSERT_TRUE(dup._progress[0].checkpoint_prepared);
test_duplication_entry(dup, 0, invalid_decree, 15);

// Persist progress for partition 0.
dup.persist_progress(0);
Expand All @@ -118,6 +158,7 @@ class duplication_info_test : public testing::Test
ASSERT_EQ(5, dup._progress[0].stored_decree);
ASSERT_FALSE(dup._progress[0].is_altering);
ASSERT_TRUE(dup._progress[0].checkpoint_prepared);
test_duplication_entry(dup, 0, 5, 15);

// Initialize progress for partition 1.
test_init_progress(dup, 1, 5);
Expand All @@ -130,6 +171,7 @@ class duplication_info_test : public testing::Test
ASSERT_EQ(5, dup._progress[1].stored_decree);
ASSERT_TRUE(dup._progress[1].is_altering);
ASSERT_FALSE(dup._progress[1].checkpoint_prepared);
test_duplication_entry(dup, 1, 5, 15);

// Persist progress for partition 1.
dup.persist_progress(1);
Expand All @@ -148,6 +190,7 @@ class duplication_info_test : public testing::Test
ASSERT_FALSE(dup._progress[1].is_altering);
// checkpoint_prepared would be updated successfully even if it is too frequent.
ASSERT_TRUE(dup._progress[1].checkpoint_prepared);
test_duplication_entry(dup, 1, 10, 25);

// Reduce last update timestamp to make it infrequent.
dup._progress[1].last_progress_update_ms -= FLAGS_dup_progress_min_update_period_ms + 100;
Expand All @@ -160,6 +203,7 @@ class duplication_info_test : public testing::Test
ASSERT_EQ(10, dup._progress[1].stored_decree);
ASSERT_TRUE(dup._progress[1].is_altering);
ASSERT_TRUE(dup._progress[1].checkpoint_prepared);
test_duplication_entry(dup, 1, 10, 26);

// Checkpoint are ready for both partition 0 and 1.
ASSERT_TRUE(dup.all_checkpoint_has_prepared());
Expand All @@ -181,17 +225,24 @@ class duplication_info_test : public testing::Test
ASSERT_EQ(duplication_status::DS_INIT, dup._status);
ASSERT_EQ(duplication_status::DS_INIT, dup._next_status);

auto dup_ent = dup.to_partition_level_entry_for_sync();
ASSERT_EQ(0, dup_ent.progress.size());
ASSERT_EQ(kTestRemoteAppName, dup_ent.remote_app_name);
ASSERT_EQ(kTestRemoteReplicaCount, dup_ent.remote_replica_count);
{
const auto &entry = dup.to_partition_level_entry_for_sync();
ASSERT_TRUE(entry.progress.empty());
ASSERT_EQ(kTestRemoteAppName, entry.remote_app_name);
ASSERT_EQ(kTestRemoteReplicaCount, entry.remote_replica_count);
}

for (int i = 0; i < 4; i++) {
for (int i = 0; i < 4; ++i) {
dup.init_progress(i, invalid_decree);
}

for (auto kv : dup_ent.progress) {
ASSERT_EQ(invalid_decree, kv.second);
{
const auto &entry = dup.to_partition_level_entry_for_sync();
ASSERT_EQ(4, entry.progress.size());
for (int partition_index = 0; partition_index < 4; ++partition_index) {
ASSERT_TRUE(gutil::ContainsKey(entry.progress, partition_index));
ASSERT_EQ(invalid_decree, gutil::FindOrDie(entry.progress, partition_index));
}
}

dup.start();
Expand Down
10 changes: 7 additions & 3 deletions src/meta/test/meta_duplication_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "dsn.layer2_types.h"
#include "duplication_types.h"
#include "gtest/gtest.h"
#include "gutil/map_util.h"
#include "http/http_server.h"
#include "http/http_status_code.h"
#include "meta/duplication/duplication_info.h"
Expand Down Expand Up @@ -233,9 +234,12 @@ class meta_duplication_service_test : public meta_test_base
ASSERT_EQ(duplication_status::DS_INIT, dup->_status);
ASSERT_EQ(duplication_status::DS_INIT, dup->_next_status);

auto ent = dup->to_partition_level_entry_for_sync();
for (int j = 0; j < app->partition_count; j++) {
ASSERT_EQ(invalid_decree, ent.progress[j]);
const auto &entry = dup->to_partition_level_entry_for_sync();
ASSERT_EQ(app->partition_count, entry.progress.size());
for (int partition_index = 0; partition_index < app->partition_count;
++partition_index) {
ASSERT_TRUE(gutil::ContainsKey(entry.progress, partition_index));
ASSERT_EQ(invalid_decree, gutil::FindOrDie(entry.progress, partition_index));
}

if (last_dup != 0) {
Expand Down

0 comments on commit a6f751b

Please sign in to comment.