diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 6b21ba83fd..070f843f21 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -488,7 +488,7 @@ std::vector replica_stub::get_all_disk_dirs() void replica_stub::load_replica(dir_node *dn, const std::string &replica_dir, utils::ex_lock &reps_lock, - replicas &reps) + replica_map_by_gpid &reps) { LOG_INFO("loading replica: tag={}, replica_dir={}", dn->tag, replica_dir); @@ -518,14 +518,16 @@ void replica_stub::load_replica(dir_node *dn, utils::auto_lock l(reps_lock); const auto rep_iter = reps.find(rep->get_gpid()); CHECK(rep_iter == reps.end(), - "conflict replica dir: {} <--> {}", + "{}@{}: newly loaded dir {} conflicts with existing {} while loading replica", + rep->get_gpid(), + dsn_primary_host_port(), rep->dir(), rep_iter->second->dir()); reps.emplace(rep->get_gpid(), rep); } -void replica_stub::load_replicas(replicas &reps) +void replica_stub::load_replicas(replica_map_by_gpid &reps) { const auto &disks = get_all_disk_dirs(); @@ -615,8 +617,10 @@ void replica_stub::load_replicas(replicas &reps) // Ensure that the thread pool is non-partitioned. LPC_REPLICATION_INIT_LOAD, &_tracker, - std::bind(static_cast( + std::bind(static_cast( &replica_stub::load_replica), this, dn, @@ -729,7 +733,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f // Start to load replicas in available data directories. LOG_INFO("start to load replicas"); - replicas reps; + replica_map_by_gpid reps; utils::chronograph chrono; load_replicas(reps); @@ -894,7 +898,7 @@ dsn::error_code replica_stub::on_kill_replica(gpid id) { LOG_INFO("kill replica: gpid = {}", id); if (id.get_app_id() == -1 || id.get_partition_index() == -1) { - replicas rs; + replica_map_by_gpid rs; { zauto_read_lock l(_replicas_lock); rs = _replicas; @@ -1521,7 +1525,7 @@ void replica_stub::on_node_query_reply(error_code err, resp.partitions.size(), resp.gc_replicas.size()); - replicas reps; + replica_map_by_gpid reps; { zauto_read_lock rl(_replicas_lock); reps = _replicas; @@ -1671,7 +1675,7 @@ void replica_stub::on_meta_server_disconnected() _state = NS_Disconnected; - replicas reps; + replica_map_by_gpid reps; { zauto_read_lock rl(_replicas_lock); reps = _replicas; @@ -2064,7 +2068,7 @@ void replica_stub::open_replica( METRIC_VAR_DECREMENT(opening_replicas); CHECK(_replicas.find(id) == _replicas.end(), "replica {} is already in _replicas", id); - _replicas.insert(replicas::value_type(rep->get_gpid(), rep)); + _replicas.insert(replica_map_by_gpid::value_type(rep->get_gpid(), rep)); METRIC_VAR_INCREMENT(total_replicas); _closed_replicas.erase(id); @@ -2645,14 +2649,14 @@ replica_stub::exec_command_on_replica(const std::vector &args, return std::string("invalid arguments"); } - replicas rs; + replica_map_by_gpid rs; { zauto_read_lock l(_replicas_lock); rs = _replicas; } std::set required_ids; - replicas choosed_rs; + replica_map_by_gpid choosed_rs; if (!args.empty()) { for (int i = 0; i < args.size(); i++) { std::vector arg_strs; @@ -2915,7 +2919,7 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid, CHECK_NOTNULL(dn, ""); auto *rep = new replica(this, child_pid, *app, dn, false); rep->_config.status = partition_status::PS_INACTIVE; - _replicas.insert(replicas::value_type(child_pid, rep)); + _replicas.insert(replica_map_by_gpid::value_type(child_pid, rep)); LOG_INFO("mock create_child_replica_if_not_found succeed"); return rep; }); @@ -2934,7 +2938,7 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid, } else { replica *rep = new_replica(child_pid, *app, false, false, parent_dir); if (rep != nullptr) { - auto pr = _replicas.insert(replicas::value_type(child_pid, rep)); + auto pr = _replicas.insert(replica_map_by_gpid::value_type(child_pid, rep)); CHECK(pr.second, "child replica {} has been existed", rep->name()); METRIC_VAR_INCREMENT(total_replicas); _closed_replicas.erase(child_pid); diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index dc6ac4074e..c0bb7f43b6 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -27,7 +27,6 @@ #pragma once #include -#include #include #include #include @@ -122,11 +121,10 @@ class test_checker; } class cold_backup_context; class replica_split_manager; -typedef std::function - replica_state_subscriber; -typedef std::unordered_map replicas; + +using replica_state_subscriber = std::function; class replica_stub; @@ -397,15 +395,17 @@ class replica_stub : public serverlet, public ref_counter // directory for a replica. virtual replica_ptr load_replica(dir_node *dn, const std::string &replica_dir); + using replica_map_by_gpid = std::unordered_map; + // The same as the above `load_replica` function, except that this function is to load // each replica to `reps` with protection from `reps_lock`. void load_replica(dir_node *dn, const std::string &replica_dir, utils::ex_lock &reps_lock, - replicas &reps); + replica_map_by_gpid &reps); // Load all replicas simultaneously from all disks to `reps`. - void load_replicas(replicas &reps); + void load_replicas(replica_map_by_gpid &reps); // Clean up the memory state and on disk data if creating replica failed. void clear_on_failure(replica *rep); @@ -497,17 +497,19 @@ class replica_stub : public serverlet, public ref_counter FRIEND_TEST(replica_test, test_auto_trash_of_corruption); FRIEND_TEST(replica_test, test_clear_on_failure); - typedef std::unordered_map opening_replicas; - typedef std::unordered_map> - closing_replicas; // > - typedef std::map> - closed_replicas; // > + using opening_replica_map_by_gpid = std::unordered_map; + + // `task_ptr` is the task being closed. + using closing_replica_map_by_gpid = + std::unordered_map>; + + using closed_replica_map_by_gpid = std::map>; mutable zrwlock_nr _replicas_lock; - replicas _replicas; - opening_replicas _opening_replicas; - closing_replicas _closing_replicas; - closed_replicas _closed_replicas; + replica_map_by_gpid _replicas; + opening_replica_map_by_gpid _opening_replicas; + closing_replica_map_by_gpid _closing_replicas; + closed_replica_map_by_gpid _closed_replicas; ::dsn::host_port _primary_host_port; // The stringify of '_primary_host_port', used by logging usually. diff --git a/src/replica/test/load_replicas_test.cpp b/src/replica/test/load_replicas_test.cpp index a6ca7aecc3..2e6c08808a 100644 --- a/src/replica/test/load_replicas_test.cpp +++ b/src/replica/test/load_replicas_test.cpp @@ -103,7 +103,7 @@ class mock_load_replica : public replica_stub PRESERVE_FLAG(max_replicas_on_load_for_each_disk); FLAGS_max_replicas_on_load_for_each_disk = max_replicas_on_load_for_each_disk; - replicas actual_loaded_replicas; + replica_stub::replica_map_by_gpid actual_loaded_replicas; load_replicas(actual_loaded_replicas); ASSERT_EQ(_expected_loaded_replicas, actual_loaded_replicas); @@ -197,7 +197,7 @@ class mock_load_replica : public replica_stub std::vector _disk_loaded_replicas_for_order; mutable std::mutex _mtx; - replicas _expected_loaded_replicas; + replica_stub::replica_map_by_gpid _expected_loaded_replicas; DISALLOW_COPY_AND_ASSIGN(mock_load_replica); DISALLOW_MOVE_AND_ASSIGN(mock_load_replica);