diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 3f2428f9ad..101e91b629 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -31,11 +31,11 @@ #include #include #include -#include -#include #include #include #include +#include +#include #include #include #include @@ -88,7 +88,7 @@ #include "utils/strings.h" #include "utils/synchronize.h" #include "utils/threadpool_spec.h" -#include "utils/time_utils.h" +#include "utils/timer.h" #ifdef DSN_ENABLE_GPERF #include #elif defined(DSN_USE_JEMALLOC) @@ -411,8 +411,11 @@ bool replica_stub::s_not_exit_on_log_failure = false; namespace { +// Register commands that get/set flag configurations. void register_flags_ctrl_command() { + // For the reason why std::call_once is used, please see the comments in + // replica_stub::register_ctrl_command() for details. static std::once_flag flag; std::call_once(flag, []() mutable { dsn::command_manager::instance().add_global_cmd( @@ -450,7 +453,7 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, bool is_long_subscriber /* = true*/) : serverlet("replica_stub"), _state(NS_Disconnected), - _replica_state_subscriber(subscriber), + _replica_state_subscriber(std::move(subscriber)), _is_long_subscriber(is_long_subscriber), _deny_client(false), _verbose_client_log(false), @@ -512,17 +515,17 @@ void replica_stub::initialize(bool clear /* = false*/) std::vector replica_stub::get_all_disk_dirs() const { std::vector disks; - for (const auto &dn : _fs_manager.get_dir_nodes()) { - if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) { + for (const auto &disk_node : _fs_manager.get_dir_nodes()) { + if (dsn_unlikely(disk_node->status == disk_status::IO_ERROR)) { // Skip disks with IO errors. 
continue; } std::vector sub_dirs; - CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, sub_dirs, false), + CHECK(dsn::utils::filesystem::get_subdirectories(disk_node->full_dir, sub_dirs, false), "failed to get sub_directories in {}", - dn->full_dir); - disks.push_back(disk_replicas_info{dn.get(), std::move(sub_dirs)}); + disk_node->full_dir); + disks.push_back(disk_replicas_info{disk_node.get(), std::move(sub_dirs)}); } return disks; @@ -530,12 +533,16 @@ std::vector replica_stub::get_all_disk_dirs() // TaskCode: LPC_REPLICATION_INIT_LOAD // ThreadPool: THREAD_POOL_LOCAL_APP -void replica_stub::load_replica(dir_node *dn, +void replica_stub::load_replica(dir_node *disk_node, const std::string &replica_dir, + const size_t total_dir_count, utils::ex_lock &reps_lock, - replica_map_by_gpid &reps) + replica_map_by_gpid &reps, + std::atomic &finished_dir_count) { - LOG_INFO("loading replica: tag={}, replica_dir={}", dn->tag, replica_dir); + SCOPED_LOG_TIMING(INFO, "on loading {}:{}", disk_node->tag, replica_dir); + + LOG_INFO("loading replica: replica_dir={}:{}", disk_node->tag, replica_dir); const auto *const worker = task::get_current_worker2(); if (worker != nullptr) { @@ -545,17 +552,24 @@ void replica_stub::load_replica(dir_node *dn, "among multiple threads"); } - auto rep = load_replica(dn, replica_dir); + auto rep = load_replica(disk_node, replica_dir); if (rep == nullptr) { + LOG_INFO("load replica failed: replica_dir={}:{}, progress={}/{}", + disk_node->tag, + replica_dir, + ++finished_dir_count, + total_dir_count); return; } - LOG_INFO("{}@{}: load replica successfully, tag={}, replica_dir={}, last_durable_decree={}, " - "last_committed_decree={}, last_prepared_decree={}", + LOG_INFO("{}@{}: load replica successfully, replica_dir={}:{}, progress={}/{}, " + "last_durable_decree={}, last_committed_decree={}, last_prepared_decree={}", rep->get_gpid(), dsn_primary_host_port(), - dn->tag, + disk_node->tag, replica_dir, + ++finished_dir_count, + 
total_dir_count, rep->last_durable_decree(), rep->last_committed_decree(), rep->last_prepared_decree()); @@ -574,9 +588,12 @@ void replica_stub::load_replica(dir_node *dn, void replica_stub::load_replicas(replica_map_by_gpid &reps) { + SCOPED_LOG_TIMING(INFO, "on loading replicas"); + const auto &disks = get_all_disk_dirs(); - // The index of currently loaded replica dir for each disk. Once current replica + // The max index of dirs that are currently being loaded for each disk. The dirs with + // higher indexes have not begun to be loaded (namely pushed into the queue). std::vector replica_dir_indexes(disks.size(), 0); struct replica_dir_loader @@ -586,9 +603,19 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps) task_ptr load_replica_task; }; - // + // Each queue would cache the tasks that load dirs for each disk. Once the task is + // found finished (namely a dir has been loaded successfully), it would be popped from + // the queue. std::vector> load_disk_queues(disks.size()); + // TODO(wangdan): calculate the number of successful or failed loading of replica dirs, + // and the number for each reason if failed. + std::vector> finished_replica_dirs(disks.size()); + for (auto &count : finished_replica_dirs) { + count.store(0); + } + + // The lock for operations on the loaded replicas as output. utils::ex_lock reps_lock; while (true) { @@ -602,80 +629,96 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps) // both variables until clang has been upgraded to version 16 which could support // that well: // - // const auto &[dn, dirs] = disks[disk_index]; + // const auto &[disk_node, replica_dirs] = disks[disk_index]; // // For the docs of clang 16 please see: // // https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support. 
- const auto &dirs = disks[disk_index].replica_dirs; + const auto &replica_dirs = disks[disk_index].replica_dirs; auto &replica_dir_index = replica_dir_indexes[disk_index]; - if (replica_dir_index >= dirs.size()) { + if (replica_dir_index >= replica_dirs.size()) { // All of the replicas for the disk `disks[disk_index]` have begun to be loaded, // thus just skip. ++finished_disks; continue; } - const auto &dn = disks[disk_index].disk_node; + const auto &disk_node = disks[disk_index].disk_node; auto &load_disk_queue = load_disk_queues[disk_index]; if (load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) { // Loading replicas should be throttled in case that disk IO is saturated. if (!load_disk_queue.front().load_replica_task->wait( static_cast(FLAGS_load_replica_max_wait_time_ms))) { // There might be too many replicas that are being loaded which lead to - // slow disk IO. - LOG_WARNING("after {} ms, loading dir(index={}, path={}) is still not " - "finished, there are {} replicas being loaded for disk(index" - "={}, tag={}, path={}), skip dir(index={}, path={}), turn to " - "next disk", + // slow disk IO, thus turn to load replicas of next disk, and try to load + // dir `replica_dir_index` of this disk in the next round. + LOG_WARNING("after {} ms, loading dir({}, {}/{}) is still not finished, " + "there are {} replicas being loaded for disk({}:{}, {}/{}), " + "now turn to next disk, and will begin to load dir({}, {}/{}) " + "soon", FLAGS_load_replica_max_wait_time_ms, - load_disk_queue.front().replica_dir_index, load_disk_queue.front().replica_dir_path, + load_disk_queue.front().replica_dir_index, + replica_dirs.size(), load_disk_queue.size(), + disk_node->tag, + disk_node->full_dir, disk_index, - dn->tag, - dn->full_dir, + disks.size(), + replica_dirs[replica_dir_index], replica_dir_index, - dirs[replica_dir_index]); + replica_dirs.size()); continue; } - // Continue to load a replica since we are within the limit now. 
+ // Now the queue size is within the limit again, continue to load a new replica dir. load_disk_queue.pop(); } - if (dsn::replication::is_data_dir_invalid(dirs[replica_dir_index])) { - LOG_WARNING( - "ignore dir(index={}, path={})", replica_dir_index, dirs[replica_dir_index]); + if (dsn::replication::is_data_dir_invalid(replica_dirs[replica_dir_index])) { + LOG_WARNING("ignore dir({}, {}/{}) for disk({}:{}, {}/{})", + replica_dirs[replica_dir_index], + replica_dir_index, + replica_dirs.size(), + disk_node->tag, + disk_node->full_dir, + disk_index, + disks.size()); ++replica_dir_index; continue; } - LOG_DEBUG("ready to load dir(index={}, path={}) for disk(index={}, tag={}, path={})", + LOG_DEBUG("ready to load dir({}, {}/{}) for disk({}:{}, {}/{})", + replica_dirs[replica_dir_index], replica_dir_index, - dirs[replica_dir_index], + replica_dirs.size(), + disk_node->tag, + disk_node->full_dir, disk_index, - dn->tag, - dn->full_dir); + disks.size()); load_disk_queue.push(replica_dir_loader{ replica_dir_index, - dirs[replica_dir_index], + replica_dirs[replica_dir_index], tasking::create_task( // Ensure that the thread pool is non-partitioned. 
LPC_REPLICATION_INIT_LOAD, &_tracker, std::bind(static_cast( + replica_map_by_gpid &, + std::atomic &)>( &replica_stub::load_replica), this, - dn, - dirs[replica_dir_index], + disk_node, + replica_dirs[replica_dir_index], + replica_dirs.size(), std::ref(reps_lock), - std::ref(reps)))}); + std::ref(reps), + std::ref(finished_replica_dirs[disk_index])))}); load_disk_queue.back().load_replica_task->enqueue(); @@ -783,13 +826,9 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f LOG_INFO("start to load replicas"); replica_map_by_gpid reps; - - utils::chronograph chrono; load_replicas(reps); - LOG_INFO("load replicas succeed, replica_count = {}, time_used = {} ms", - reps.size(), - chrono.duration_ms()); + LOG_INFO("load replicas succeed, replica_count = {}", reps.size()); bool is_log_complete = true; for (auto it = reps.begin(); it != reps.end(); ++it) { @@ -2278,7 +2317,7 @@ bool replica_stub::validate_replica_dir(const std::string &dir, return true; } -replica_ptr replica_stub::load_replica(dir_node *dn, const std::string &replica_dir) +replica_ptr replica_stub::load_replica(dir_node *disk_node, const std::string &replica_dir) { FAIL_POINT_INJECT_F("mock_replica_load", [&](std::string_view) -> replica * { return nullptr; }); @@ -2292,14 +2331,14 @@ replica_ptr replica_stub::load_replica(dir_node *dn, const std::string &replica_ } // The replica's directory must exist when creating a replica. 
- CHECK_EQ(dn->replica_dir(ai.app_type, pid), replica_dir); + CHECK_EQ(disk_node->replica_dir(ai.app_type, pid), replica_dir); - auto *rep = new replica(this, pid, ai, dn, false); + auto *rep = new replica(this, pid, ai, disk_node, false); const auto err = rep->initialize_on_load(); if (err != ERR_OK) { LOG_ERROR("{}: load replica failed, tag={}, replica_dir={}, err={}", rep->name(), - dn->tag, + disk_node->tag, replica_dir, err); delete rep; @@ -2315,7 +2354,10 @@ replica_ptr replica_stub::load_replica(dir_node *dn, const std::string &replica_ return nullptr; } - LOG_INFO("{}: load replica succeed, tag={}, replica_dir={}", rep->name(), dn->tag, replica_dir); + LOG_INFO("{}: load replica succeed, tag={}, replica_dir={}", + rep->name(), + disk_node->tag, + replica_dir); return rep; } @@ -2713,7 +2755,7 @@ replica_stub::exec_command_on_replica(const std::vector &arg_str_li gpid id; if (id.parse_from(arg.c_str())) { - // app_id.partition_index + // Format: app_id.partition_index required_ids.insert(id); auto find = rs.find(id); if (find != rs.end()) { @@ -2723,15 +2765,15 @@ replica_stub::exec_command_on_replica(const std::vector &arg_str_li continue; } - int pid = 0; - if (sscanf(arg.c_str(), "%d", &pid) != 1) { + // Must be app_id. + int32_t app_id = 0; + if (!buf2int32(arg, app_id)) { return kInvalidArguments; } - // app_id for (const auto &[_, rep] : rs) { id = rep->get_gpid(); - if (id.get_app_id() == pid) { + if (id.get_app_id() == app_id) { choosed_rs[id] = rep; } } diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index c798f39142..ed6702fa04 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -394,16 +394,18 @@ class replica_stub : public serverlet, public ref_counter // Load an existing replica which is located in `dn` with `replica_dir`. Usually each // different `dn` represents a unique disk. `replica_dir` is the absolute path of the // directory for a replica. 
- virtual replica_ptr load_replica(dir_node *dn, const std::string &replica_dir); + virtual replica_ptr load_replica(dir_node *disk_node, const std::string &replica_dir); using replica_map_by_gpid = std::unordered_map; // The same as the above `load_replica` function, except that this function is to load // each replica to `reps` with protection from `reps_lock`. - void load_replica(dir_node *dn, + void load_replica(dir_node *disk_node, const std::string &replica_dir, + const size_t total_dir_count, utils::ex_lock &reps_lock, - replica_map_by_gpid &reps); + replica_map_by_gpid &reps, + std::atomic &finished_dir_count); // Load all replicas simultaneously from all disks to `reps`. void load_replicas(replica_map_by_gpid &reps);