diff --git a/idl/duplication.thrift b/idl/duplication.thrift index 20562fd30c..604edd1d3b 100644 --- a/idl/duplication.thrift +++ b/idl/duplication.thrift @@ -22,6 +22,9 @@ namespace cpp dsn.replication namespace go admin namespace java org.apache.pegasus.replication +// Indicates which data of a table needs to be duplicated: +// * FULL: all of the data of the table needs to be duplicated. +// * INCREMENTAL: only incremental data of the table is duplicated. enum duplication_mode { FULL = 0, @@ -135,9 +138,13 @@ struct duplication_modify_response 2:i32 appid; } +// The states tracked for each partition during duplication. struct duplication_partition_state { + // The max decree of this partition that has been confirmed to be received by the follower. 1:i64 confirmed_decree; + + // The max decree that has been committed by this partition. 2:i64 last_committed_decree; } @@ -148,7 +155,7 @@ struct duplication_entry 3:string remote; 4:i64 create_ts; - // Used for syncing duplications(replica server -> meta server). + // Used for syncing duplications with partition-level progress (replica server -> meta server). // partition index => confirmed decree. 5:optional map progress; @@ -164,10 +171,10 @@ struct duplication_entry // For versions < v2.6.0, this must be the same with source replica_count. 9:optional i32 remote_replica_count; - // TODO(wangdan) + // TODO(wangdan): would be supported later. 10:optional duplication_mode mode; - // Used for listing duplications(client -> meta server). + // Used for listing duplications with partition-level details (client -> meta server). // partition index => partition states. 
11:optional map partition_states; } diff --git a/src/common/duplication_common.cpp b/src/common/duplication_common.cpp index c2f549a841..696308a64a 100644 --- a/src/common/duplication_common.cpp +++ b/src/common/duplication_common.cpp @@ -182,8 +182,8 @@ static nlohmann::json duplication_entry_to_json(const duplication_entry &ent) if (ent.__isset.progress) { nlohmann::json progress; - for (const auto &[partition_id, state] : ent.progress) { - progress[std::to_string(partition_id)] = state; + for (const auto &[partition_index, state] : ent.progress) { + progress[std::to_string(partition_index)] = state; } json["progress"] = progress; @@ -201,12 +201,12 @@ static nlohmann::json duplication_entry_to_json(const duplication_entry &ent) if (ent.__isset.partition_states) { nlohmann::json partition_states; - for (const auto &[partition_id, state] : ent.partition_states) { + for (const auto &[partition_index, state] : ent.partition_states) { nlohmann::json partition_state; partition_state["confirmed_decree"] = state.confirmed_decree; partition_state["last_committed_decree"] = state.last_committed_decree; - partition_states[std::to_string(partition_id)] = partition_state; + partition_states[std::to_string(partition_index)] = partition_state; } json["partition_states"] = partition_states; diff --git a/src/meta/duplication/duplication_info.h b/src/meta/duplication/duplication_info.h index 4e9b45ac7d..10106eedb9 100644 --- a/src/meta/duplication/duplication_info.h +++ b/src/meta/duplication/duplication_info.h @@ -152,6 +152,7 @@ class duplication_info // which is not thread safe for read. void append_as_entry(std::vector &entry_list) const; + // Build an entry including only duplication-level info. duplication_entry to_duplication_level_entry() const { duplication_entry entry; @@ -166,28 +167,32 @@ class duplication_info return entry; } + // Build an entry that includes, besides duplication-level info, the partition-level + // progress used for sync. 
duplication_entry to_partition_level_entry_for_sync() const { auto entry = to_duplication_level_entry(); entry.__isset.progress = true; - for (const auto &[partition_id, state] : _progress) { + for (const auto &[partition_index, state] : _progress) { if (!state.is_inited) { continue; } - entry.progress.emplace(partition_id, state.stored_decree); + entry.progress.emplace(partition_index, state.stored_decree); } return entry; } + // Build an entry that includes, besides duplication-level info, the partition-level + // detailed states used for list. duplication_entry to_partition_level_entry_for_list() const { auto entry = to_duplication_level_entry(); entry.__isset.partition_states = true; - for (const auto &[partition_id, state] : _progress) { + for (const auto &[partition_index, state] : _progress) { if (!state.is_inited) { continue; } @@ -196,7 +201,7 @@ class duplication_info partition_state.confirmed_decree = state.stored_decree; partition_state.last_committed_decree = state.last_committed_decree; - entry.partition_states.emplace(partition_id, partition_state); + entry.partition_states.emplace(partition_index, partition_state); } return entry; @@ -261,7 +266,7 @@ class duplication_info bool checkpoint_prepared{false}; }; - // partition_idx => progress + // partition_index => progress std::map _progress; uint64_t _last_progress_report_ms{0}; diff --git a/src/meta/test/duplication_info_test.cpp b/src/meta/test/duplication_info_test.cpp index cc7c6e46ee..a690d450d2 100644 --- a/src/meta/test/duplication_info_test.cpp +++ b/src/meta/test/duplication_info_test.cpp @@ -29,6 +29,7 @@ #include #include "gtest/gtest.h" +#include "gutil/map_util.h" #include "runtime/app_model.h" #include "test_util/test_util.h" #include "utils/flags.h" @@ -51,11 +52,46 @@ class duplication_info_test : public testing::Test dup._status = status; } - static void test_init_progress(duplication_info &dup, int partition_idx, decree expected_decree) + static void test_duplication_entry_for_sync(const 
duplication_info &dup, + int partition_index, + decree expected_confirmed_decree) { - dup.init_progress(partition_idx, expected_decree); + const auto &entry = dup.to_partition_level_entry_for_sync(); + ASSERT_TRUE(gutil::ContainsKey(entry.progress, partition_index)); + ASSERT_EQ(expected_confirmed_decree, gutil::FindOrDie(entry.progress, partition_index)); + } + + static void test_duplication_entry_for_list(const duplication_info &dup, + int partition_index, + decree expected_confirmed_decree, + decree expected_last_committed_decree) + { + const auto &entry = dup.to_partition_level_entry_for_list(); + ASSERT_TRUE(gutil::ContainsKey(entry.partition_states, partition_index)); + + const auto &state = gutil::FindOrDie(entry.partition_states, partition_index); + ASSERT_EQ(expected_confirmed_decree, state.confirmed_decree); + ASSERT_EQ(expected_last_committed_decree, state.last_committed_decree); + } + + static void test_duplication_entry(const duplication_info &dup, + int partition_index, + decree expected_confirmed_decree, + decree expected_last_committed_decree) + { + test_duplication_entry_for_sync(dup, partition_index, expected_confirmed_decree); + test_duplication_entry_for_list( + dup, partition_index, expected_confirmed_decree, expected_last_committed_decree); + } + + static void + test_init_progress(duplication_info &dup, int partition_index, decree expected_decree) + { + dup.init_progress(partition_index, expected_decree); + + ASSERT_TRUE(gutil::ContainsKey(dup._progress, partition_index)); - const auto &progress = dup._progress[partition_idx]; + const auto &progress = dup._progress[partition_index]; ASSERT_EQ(invalid_decree, progress.last_committed_decree); ASSERT_EQ(expected_decree, progress.volatile_decree); ASSERT_EQ(expected_decree, progress.stored_decree); @@ -63,6 +99,8 @@ class duplication_info_test : public testing::Test ASSERT_EQ(0, progress.last_progress_update_ms); ASSERT_TRUE(progress.is_inited); ASSERT_FALSE(progress.checkpoint_prepared); + + 
test_duplication_entry(dup, partition_index, expected_decree, invalid_decree); } static void test_alter_progress() @@ -96,6 +134,7 @@ class duplication_info_test : public testing::Test ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree); ASSERT_TRUE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); + test_duplication_entry(dup, 0, invalid_decree, 8); // Busy updating. entry.__set_last_committed_decree(15); @@ -109,6 +148,7 @@ class duplication_info_test : public testing::Test ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree); ASSERT_TRUE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); + test_duplication_entry(dup, 0, invalid_decree, 15); // Persist progress for partition 0. dup.persist_progress(0); @@ -118,6 +158,7 @@ class duplication_info_test : public testing::Test ASSERT_EQ(5, dup._progress[0].stored_decree); ASSERT_FALSE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); + test_duplication_entry(dup, 0, 5, 15); // Initialize progress for partition 1. test_init_progress(dup, 1, 5); @@ -130,6 +171,7 @@ class duplication_info_test : public testing::Test ASSERT_EQ(5, dup._progress[1].stored_decree); ASSERT_TRUE(dup._progress[1].is_altering); ASSERT_FALSE(dup._progress[1].checkpoint_prepared); + test_duplication_entry(dup, 1, 5, 15); // Persist progress for partition 1. dup.persist_progress(1); @@ -148,6 +190,7 @@ class duplication_info_test : public testing::Test ASSERT_FALSE(dup._progress[1].is_altering); // checkpoint_prepared would be updated successfully even if it is too frequent. ASSERT_TRUE(dup._progress[1].checkpoint_prepared); + test_duplication_entry(dup, 1, 10, 25); // Reduce last update timestamp to make it infrequent. 
dup._progress[1].last_progress_update_ms -= FLAGS_dup_progress_min_update_period_ms + 100; @@ -160,6 +203,7 @@ class duplication_info_test : public testing::Test ASSERT_EQ(10, dup._progress[1].stored_decree); ASSERT_TRUE(dup._progress[1].is_altering); ASSERT_TRUE(dup._progress[1].checkpoint_prepared); + test_duplication_entry(dup, 1, 10, 26); // Checkpoint are ready for both partition 0 and 1. ASSERT_TRUE(dup.all_checkpoint_has_prepared()); @@ -181,17 +225,24 @@ class duplication_info_test : public testing::Test ASSERT_EQ(duplication_status::DS_INIT, dup._status); ASSERT_EQ(duplication_status::DS_INIT, dup._next_status); - auto dup_ent = dup.to_partition_level_entry_for_sync(); - ASSERT_EQ(0, dup_ent.progress.size()); - ASSERT_EQ(kTestRemoteAppName, dup_ent.remote_app_name); - ASSERT_EQ(kTestRemoteReplicaCount, dup_ent.remote_replica_count); + { + const auto &entry = dup.to_partition_level_entry_for_sync(); + ASSERT_TRUE(entry.progress.empty()); + ASSERT_EQ(kTestRemoteAppName, entry.remote_app_name); + ASSERT_EQ(kTestRemoteReplicaCount, entry.remote_replica_count); + } - for (int i = 0; i < 4; i++) { + for (int i = 0; i < 4; ++i) { dup.init_progress(i, invalid_decree); } - for (auto kv : dup_ent.progress) { - ASSERT_EQ(invalid_decree, kv.second); + { + const auto &entry = dup.to_partition_level_entry_for_sync(); + ASSERT_EQ(4, entry.progress.size()); + for (int partition_index = 0; partition_index < 4; ++partition_index) { + ASSERT_TRUE(gutil::ContainsKey(entry.progress, partition_index)); + ASSERT_EQ(invalid_decree, gutil::FindOrDie(entry.progress, partition_index)); + } } dup.start(); diff --git a/src/meta/test/meta_duplication_service_test.cpp b/src/meta/test/meta_duplication_service_test.cpp index ef398b08fa..5e0ebe6b19 100644 --- a/src/meta/test/meta_duplication_service_test.cpp +++ b/src/meta/test/meta_duplication_service_test.cpp @@ -46,6 +46,7 @@ #include "dsn.layer2_types.h" #include "duplication_types.h" #include "gtest/gtest.h" +#include 
"gutil/map_util.h" #include "http/http_server.h" #include "http/http_status_code.h" #include "meta/duplication/duplication_info.h" @@ -233,9 +234,12 @@ class meta_duplication_service_test : public meta_test_base ASSERT_EQ(duplication_status::DS_INIT, dup->_status); ASSERT_EQ(duplication_status::DS_INIT, dup->_next_status); - auto ent = dup->to_partition_level_entry_for_sync(); - for (int j = 0; j < app->partition_count; j++) { - ASSERT_EQ(invalid_decree, ent.progress[j]); + const auto &entry = dup->to_partition_level_entry_for_sync(); + ASSERT_EQ(app->partition_count, entry.progress.size()); + for (int partition_index = 0; partition_index < app->partition_count; + ++partition_index) { + ASSERT_TRUE(gutil::ContainsKey(entry.progress, partition_index)); + ASSERT_EQ(invalid_decree, gutil::FindOrDie(entry.progress, partition_index)); } if (last_dup != 0) {