Skip to content

Commit

Permalink
feat(duplication): support duplication entry for multiple purposes
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Dec 2, 2024
1 parent d9f2600 commit f641578
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 17 deletions.
13 changes: 12 additions & 1 deletion idl/duplication.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,21 @@ struct duplication_modify_response
2:i32 appid;
}

struct duplication_partition_state
{
1:i64 confirmed_decree;
2:i64 last_committed_decree;
}

struct duplication_entry
{
1:i32 dupid;
2:duplication_status status;
3:string remote;
4:i64 create_ts;

// partition_index => confirmed decree
// Used for syncing duplications(replica server -> meta server).
// partition index => confirmed decree.
5:optional map<i32, i64> progress;

7:optional duplication_fail_mode fail_mode;
Expand All @@ -150,6 +157,10 @@ struct duplication_entry
// For versions >= v2.6.0, this could be specified by client.
// For versions < v2.6.0, this must be the same with source replica_count.
9:optional i32 remote_replica_count;

// Used for listing duplications(client -> meta server).
// partition index => partition states.
10:optional map<i32, duplication_partition_state> partition_states;
}

// This request is sent from client to meta.
Expand Down
6 changes: 3 additions & 3 deletions src/meta/duplication/duplication_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ void duplication_info::persist_status()

std::string duplication_info::to_string() const
{
return duplication_entry_to_string(to_duplication_entry());
return duplication_entry_to_string(to_duplication_entry(duplication_entry_type::kPartitionLevelList));
}

blob duplication_info::to_json_blob() const
Expand Down Expand Up @@ -299,11 +299,11 @@ void duplication_info::append_if_valid_for_query(
{
zauto_read_lock l(_lock);

entry_list.emplace_back(to_duplication_entry());
entry_list.emplace_back(to_duplication_entry(duplication_entry_type::kDuplicationLevelInfo));
duplication_entry &ent = entry_list.back();
// the confirmed decree is not useful for displaying
// the overall state of duplication
ent.__isset.progress = false;
// ent.__isset.progress = false;
}

} // namespace dsn::replication
42 changes: 36 additions & 6 deletions src/meta/duplication/duplication_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <fmt/core.h>
#include <algorithm>
#include <cstdint>
#include <functional>
#include <iosfwd>
#include <map>
#include <memory>
Expand Down Expand Up @@ -47,6 +48,13 @@ class duplication_info;

using duplication_info_s_ptr = std::shared_ptr<duplication_info>;

enum class duplication_entry_type: int
{
kDuplicationLevelInfo,
kPartitionLevelSync,
kPartitionLevelList,
};

/// This class is thread-safe.
class duplication_info
{
Expand Down Expand Up @@ -152,7 +160,7 @@ class duplication_info
void append_if_valid_for_query(const app_state &app,
/*out*/ std::vector<duplication_entry> &entry_list) const;

duplication_entry to_duplication_entry() const
duplication_entry to_duplication_entry(duplication_entry_type type) const
{
duplication_entry entry;
entry.dupid = id;
Expand All @@ -162,13 +170,24 @@ class duplication_info
entry.__set_fail_mode(_fail_mode);
entry.__set_remote_app_name(remote_app_name);
entry.__set_remote_replica_count(remote_replica_count);

if (type == duplication_entry_type::kDuplicationLevelInfo) {
return entry;
}

if (type == duplication_entry_type::kPartitionLevelSync) {
entry.__isset.progress = true;
for (const auto &kv : _progress) {
if (!kv.second.is_inited) {
continue;
}
entry.progress[kv.first] = kv.second.stored_decree;
insert_into_entry([](int partition_id, const partition_progress &partition_state, duplication_entry& entry){
entry.progress.emplace(partition_id, partition_state.stored_decree);
}, entry);
}
else if (type == duplication_entry_type::kPartitionLevelList) {
entry.__isset.partition_states= true;
insert_into_entry([](int partition_id, const partition_progress &partition_state, duplication_entry& entry){
entry.partition_states.emplace(partition_id, {partition_state.stored_decree, partition_state.last_committed_decree});
}, entry);
}

return entry;
}

Expand Down Expand Up @@ -208,6 +227,17 @@ class duplication_info
// To json encoded string.
std::string to_string() const;

void insert_into_entry(std::function<int, const partition_progress &, duplication_entry&> inserter, duplication_entry &entry) const
{
for (const auto &[partition_id, partition_state] : _progress) {
if (!partition_state.is_inited) {
continue;
}

inserter(partition_id,partition_state, entry);
}
}

friend class duplication_info_test;
friend class meta_duplication_service_test;

Expand Down
2 changes: 1 addition & 1 deletion src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ void meta_duplication_service::duplication_sync(duplication_sync_rpc rpc)
}
}

response.dup_map[app_id][dup_id] = dup->to_duplication_entry();
response.dup_map[app_id][dup_id] = dup->to_duplication_entry(duplication_entry_type::kPartitionLevelSync);

// report progress periodically for each duplications
dup->report_progress_if_time_up();
Expand Down
2 changes: 1 addition & 1 deletion src/meta/test/duplication_info_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class duplication_info_test : public testing::Test
ASSERT_EQ(duplication_status::DS_INIT, dup._status);
ASSERT_EQ(duplication_status::DS_INIT, dup._next_status);

auto dup_ent = dup.to_duplication_entry();
auto dup_ent = dup.to_duplication_entry(duplication_entry_type::kPartitionLevelSync);
ASSERT_EQ(0, dup_ent.progress.size());
ASSERT_EQ(kTestRemoteAppName, dup_ent.remote_app_name);
ASSERT_EQ(kTestRemoteReplicaCount, dup_ent.remote_replica_count);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/test/meta_duplication_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class meta_duplication_service_test : public meta_test_base
ASSERT_EQ(duplication_status::DS_INIT, dup->_status);
ASSERT_EQ(duplication_status::DS_INIT, dup->_next_status);

auto ent = dup->to_duplication_entry();
auto ent = dup->to_duplication_entry(duplication_entry_type::kPartitionLevelSync);
for (int j = 0; j < app->partition_count; j++) {
ASSERT_EQ(invalid_decree, ent.progress[j]);
}
Expand Down
10 changes: 6 additions & 4 deletions src/replica/duplication/replica_duplicator_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,23 +114,25 @@ void replica_duplicator_manager::sync_duplication(const duplication_entry &ent)
dupid_t dupid = ent.dupid;
duplication_status::type next_status = ent.status;

replica_duplicator_u_ptr &dup = _duplications[dupid];
if (dup == nullptr) {
auto &dup = _duplications[dupid];
if (!dup) {
if (!is_duplication_status_invalid(next_status)) {
dup = std::make_unique<replica_duplicator>(ent, _replica);
} else {
LOG_ERROR_PREFIX("illegal duplication status: {}",
duplication_status_to_string(next_status));
}
} else {

return;
}

// update progress
duplication_progress newp = dup->progress().set_confirmed_decree(it->second);
CHECK_EQ_PREFIX(dup->update_progress(newp), error_s::ok());
dup->update_status_if_needed(next_status);
if (ent.__isset.fail_mode) {
dup->update_fail_mode(ent.fail_mode);
}
}
}

decree replica_duplicator_manager::min_confirmed_decree() const
Expand Down

0 comments on commit f641578

Please sign in to comment.