Skip to content

Commit

Permalink
rename replicas type
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Dec 18, 2024
1 parent 6329ecb commit be70e2e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 33 deletions.
32 changes: 18 additions & 14 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ std::vector<replica_stub::disk_replicas_info> replica_stub::get_all_disk_dirs()
void replica_stub::load_replica(dir_node *dn,
const std::string &replica_dir,
utils::ex_lock &reps_lock,
replicas &reps)
replica_map_by_gpid &reps)
{
LOG_INFO("loading replica: tag={}, replica_dir={}", dn->tag, replica_dir);

Expand Down Expand Up @@ -518,14 +518,16 @@ void replica_stub::load_replica(dir_node *dn,
utils::auto_lock<utils::ex_lock> l(reps_lock);
const auto rep_iter = reps.find(rep->get_gpid());
CHECK(rep_iter == reps.end(),
"conflict replica dir: {} <--> {}",
"{}@{}: newly loaded dir {} conflicts with existing {} while loading replica",
rep->get_gpid(),
dsn_primary_host_port(),
rep->dir(),
rep_iter->second->dir());

reps.emplace(rep->get_gpid(), rep);
}

void replica_stub::load_replicas(replicas &reps)
void replica_stub::load_replicas(replica_map_by_gpid &reps)
{
const auto &disks = get_all_disk_dirs();

Expand Down Expand Up @@ -615,8 +617,10 @@ void replica_stub::load_replicas(replicas &reps)
// Ensure that the thread pool is non-partitioned.
LPC_REPLICATION_INIT_LOAD,
&_tracker,
std::bind(static_cast<void (replica_stub::*)(
dir_node *, const std::string &, utils::ex_lock &, replicas &)>(
std::bind(static_cast<void (replica_stub::*)(dir_node *,
const std::string &,
utils::ex_lock &,
replica_map_by_gpid &)>(
&replica_stub::load_replica),
this,
dn,
Expand Down Expand Up @@ -729,7 +733,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
// Start to load replicas in available data directories.
LOG_INFO("start to load replicas");

replicas reps;
replica_map_by_gpid reps;

utils::chronograph chrono;
load_replicas(reps);
Expand Down Expand Up @@ -894,7 +898,7 @@ dsn::error_code replica_stub::on_kill_replica(gpid id)
{
LOG_INFO("kill replica: gpid = {}", id);
if (id.get_app_id() == -1 || id.get_partition_index() == -1) {
replicas rs;
replica_map_by_gpid rs;
{
zauto_read_lock l(_replicas_lock);
rs = _replicas;
Expand Down Expand Up @@ -1521,7 +1525,7 @@ void replica_stub::on_node_query_reply(error_code err,
resp.partitions.size(),
resp.gc_replicas.size());

replicas reps;
replica_map_by_gpid reps;
{
zauto_read_lock rl(_replicas_lock);
reps = _replicas;
Expand Down Expand Up @@ -1671,7 +1675,7 @@ void replica_stub::on_meta_server_disconnected()

_state = NS_Disconnected;

replicas reps;
replica_map_by_gpid reps;
{
zauto_read_lock rl(_replicas_lock);
reps = _replicas;
Expand Down Expand Up @@ -2064,7 +2068,7 @@ void replica_stub::open_replica(
METRIC_VAR_DECREMENT(opening_replicas);

CHECK(_replicas.find(id) == _replicas.end(), "replica {} is already in _replicas", id);
_replicas.insert(replicas::value_type(rep->get_gpid(), rep));
_replicas.insert(replica_map_by_gpid::value_type(rep->get_gpid(), rep));
METRIC_VAR_INCREMENT(total_replicas);

_closed_replicas.erase(id);
Expand Down Expand Up @@ -2645,14 +2649,14 @@ replica_stub::exec_command_on_replica(const std::vector<std::string> &args,
return std::string("invalid arguments");
}

replicas rs;
replica_map_by_gpid rs;
{
zauto_read_lock l(_replicas_lock);
rs = _replicas;
}

std::set<gpid> required_ids;
replicas choosed_rs;
replica_map_by_gpid choosed_rs;
if (!args.empty()) {
for (int i = 0; i < args.size(); i++) {
std::vector<std::string> arg_strs;
Expand Down Expand Up @@ -2915,7 +2919,7 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid,
CHECK_NOTNULL(dn, "");
auto *rep = new replica(this, child_pid, *app, dn, false);
rep->_config.status = partition_status::PS_INACTIVE;
_replicas.insert(replicas::value_type(child_pid, rep));
_replicas.insert(replica_map_by_gpid::value_type(child_pid, rep));
LOG_INFO("mock create_child_replica_if_not_found succeed");
return rep;
});
Expand All @@ -2934,7 +2938,7 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid,
} else {
replica *rep = new_replica(child_pid, *app, false, false, parent_dir);
if (rep != nullptr) {
auto pr = _replicas.insert(replicas::value_type(child_pid, rep));
auto pr = _replicas.insert(replica_map_by_gpid::value_type(child_pid, rep));
CHECK(pr.second, "child replica {} has been existed", rep->name());
METRIC_VAR_INCREMENT(total_replicas);
_closed_replicas.erase(child_pid);
Expand Down
36 changes: 19 additions & 17 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#pragma once

#include <gtest/gtest_prod.h>
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <functional>
Expand Down Expand Up @@ -122,11 +121,10 @@ class test_checker;
}
class cold_backup_context;
class replica_split_manager;
typedef std::function<void(const ::dsn::host_port & /*from*/,
const replica_configuration & /*new_config*/,
bool /*is_closing*/)>
replica_state_subscriber;
typedef std::unordered_map<gpid, replica_ptr> replicas;

using replica_state_subscriber = std::function<void(const ::dsn::host_port & /*from*/,
const replica_configuration & /*new_config*/,
bool /*is_closing*/)>;

class replica_stub;

Expand Down Expand Up @@ -397,15 +395,17 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
// directory for a replica.
virtual replica_ptr load_replica(dir_node *dn, const std::string &replica_dir);

using replica_map_by_gpid = std::unordered_map<gpid, replica_ptr>;

// The same as the above `load_replica` function, except that this function is to load
// each replica to `reps` with protection from `reps_lock`.
void load_replica(dir_node *dn,
const std::string &replica_dir,
utils::ex_lock &reps_lock,
replicas &reps);
replica_map_by_gpid &reps);

// Load all replicas simultaneously from all disks to `reps`.
void load_replicas(replicas &reps);
void load_replicas(replica_map_by_gpid &reps);

// Clean up the memory state and on disk data if creating replica failed.
void clear_on_failure(replica *rep);
Expand Down Expand Up @@ -497,17 +497,19 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
FRIEND_TEST(replica_test, test_auto_trash_of_corruption);
FRIEND_TEST(replica_test, test_clear_on_failure);

typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
closing_replicas; // <gpid, <close_task, replica, app_info, replica_info> >
typedef std::map<gpid, std::pair<app_info, replica_info>>
closed_replicas; // <gpid, <app_info, replica_info> >
using opening_replica_map_by_gpid = std::unordered_map<gpid, task_ptr>;

// `task_ptr` is the task being closed.
using closing_replica_map_by_gpid =
std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>;

using closed_replica_map_by_gpid = std::map<gpid, std::pair<app_info, replica_info>>;

mutable zrwlock_nr _replicas_lock;
replicas _replicas;
opening_replicas _opening_replicas;
closing_replicas _closing_replicas;
closed_replicas _closed_replicas;
replica_map_by_gpid _replicas;
opening_replica_map_by_gpid _opening_replicas;
closing_replica_map_by_gpid _closing_replicas;
closed_replica_map_by_gpid _closed_replicas;

::dsn::host_port _primary_host_port;
// The stringify of '_primary_host_port', used by logging usually.
Expand Down
4 changes: 2 additions & 2 deletions src/replica/test/load_replicas_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class mock_load_replica : public replica_stub
PRESERVE_FLAG(max_replicas_on_load_for_each_disk);
FLAGS_max_replicas_on_load_for_each_disk = max_replicas_on_load_for_each_disk;

replicas actual_loaded_replicas;
replica_stub::replica_map_by_gpid actual_loaded_replicas;
load_replicas(actual_loaded_replicas);
ASSERT_EQ(_expected_loaded_replicas, actual_loaded_replicas);

Expand Down Expand Up @@ -197,7 +197,7 @@ class mock_load_replica : public replica_stub
std::vector<size_t> _disk_loaded_replicas_for_order;

mutable std::mutex _mtx;
replicas _expected_loaded_replicas;
replica_stub::replica_map_by_gpid _expected_loaded_replicas;

DISALLOW_COPY_AND_ASSIGN(mock_load_replica);
DISALLOW_MOVE_AND_ASSIGN(mock_load_replica);
Expand Down

0 comments on commit be70e2e

Please sign in to comment.