From 510dc78ac488f06b115b60a8fbc93d0b7dcf11eb Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 18 Dec 2024 20:42:03 +0800 Subject: [PATCH] fix clang-tidy and add command to update configurations for loading replicas dynamically --- src/replica/replica_stub.cpp | 155 ++++++++++++++++++++++------------- src/replica/replica_stub.h | 3 +- src/utils/command_manager.h | 41 +++++---- 3 files changed, 127 insertions(+), 72 deletions(-) diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 070f843f21..a01105db84 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -97,8 +97,18 @@ #include "remote_cmd/remote_command.h" #include "utils/fail_point.h" -static const char *kMaxConcurrentBulkLoadDownloadingCountDesc = - "The maximum concurrent bulk load downloading replica count"; +namespace { + +const char *kMaxConcurrentBulkLoadDownloadingCountDesc = + "The maximum concurrent bulk load downloading replica count."; + +const char *kMaxReplicasOnLoadForEachDiskDesc = + "The max number of replicas that are allowed to be loaded simultaneously for each disk dir."; + +const char *kLoadReplicaMaxWaitTimeMsDesc = "The max waiting time for replica loading to complete."; + +} // anonymous namespace + DSN_DEFINE_int32(replication, max_concurrent_bulk_load_downloading_count, 5, @@ -269,18 +279,15 @@ DSN_DECLARE_string(server_key); DSN_DEFINE_uint64(replication, max_replicas_on_load_for_each_disk, 256, - "The max number of replicas that are allowed to be loaded simultaneously " - "for each disk dir."); + kMaxReplicasOnLoadForEachDiskDesc); DSN_TAG_VARIABLE(max_replicas_on_load_for_each_disk, FT_MUTABLE); DSN_DEFINE_validator(max_replicas_on_load_for_each_disk, - [](uint64_t max_replicas_on_load_for_each_disk) -> bool { - return max_replicas_on_load_for_each_disk > 0; - }); + [](uint64_t value) -> bool { return value > 0; }); -DSN_DEFINE_uint64(replication, - load_replica_max_wait_time_ms, - 10, - "The max waiting time for replica loading to complete."); +DSN_DEFINE_uint64(replication, load_replica_max_wait_time_ms, 10, kLoadReplicaMaxWaitTimeMsDesc); +DSN_TAG_VARIABLE(load_replica_max_wait_time_ms, FT_MUTABLE); +DSN_DEFINE_validator(load_replica_max_wait_time_ms, + [](uint64_t value) -> bool { return value > 0; }); DSN_DEFINE_bool(replication, deny_client_on_start, @@ -401,6 +408,40 @@ namespace dsn { namespace replication { bool replica_stub::s_not_exit_on_log_failure = false; +namespace { + +void register_flags_ctrl_command() +{ + static std::once_flag flag; + static std::vector> cmds; + std::call_once(flag, []() mutable { + cmds.emplace_back(dsn::command_manager::instance().register_int_command( + FLAGS_max_replicas_on_load_for_each_disk, + FLAGS_max_replicas_on_load_for_each_disk, + "replica.max-replicas-on-load-for-each-disk", + kMaxReplicasOnLoadForEachDiskDesc)); + + cmds.emplace_back(dsn::command_manager::instance().register_int_command( + FLAGS_load_replica_max_wait_time_ms, + FLAGS_load_replica_max_wait_time_ms, + "replica.load-replica-max-wait-time-ms", + kLoadReplicaMaxWaitTimeMsDesc)); + + cmds.emplace_back(dsn::command_manager::instance().register_bool_command( + FLAGS_empty_write_disabled, + "replica.disable-empty-write", + "whether to disable empty writes")); + + cmds.emplace_back(::dsn::command_manager::instance().register_int_command( + FLAGS_max_concurrent_bulk_load_downloading_count, + FLAGS_max_concurrent_bulk_load_downloading_count, + "replica.max-concurrent-bulk-load-downloading-count", + kMaxConcurrentBulkLoadDownloadingCountDesc)); + }); +} + +} // anonymous namespace + replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, bool is_long_subscriber /* = true*/) : serverlet("replica_stub"), @@ -452,6 +493,8 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, _is_long_subscriber = is_long_subscriber; _failure_detector = nullptr; _state = NS_Disconnected; + + register_flags_ctrl_command(); } replica_stub::~replica_stub(void) { close(); } @@ -531,14 +574,17 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps) { const auto &disks = get_all_disk_dirs(); - std::vector dir_indexes(disks.size(), 0); + // The index of currently loaded replica dir for each disk. Once current replica + std::vector replica_dir_indexes(disks.size(), 0); struct replica_dir_loader { - size_t dir_index; - std::string dir_path; - task_ptr load_task; + size_t replica_dir_index; + std::string replica_dir_path; + task_ptr load_replica_task; }; + + // std::vector> load_disk_queues(disks.size()); utils::ex_lock reps_lock; @@ -561,8 +607,8 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps) // https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support. const auto &dirs = disks[disk_index].replica_dirs; - auto &dir_index = dir_indexes[disk_index]; - if (dir_index >= dirs.size()) { + auto &replica_dir_index = replica_dir_indexes[disk_index]; + if (replica_dir_index >= dirs.size()) { // All of the replicas for the disk `disks[disk_index]` have begun to be loaded, // thus just skip. ++finished_disks; @@ -573,7 +619,7 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps) auto &load_disk_queue = load_disk_queues[disk_index]; if (load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) { // Loading replicas should be throttled in case that disk IO is saturated. - if (!load_disk_queue.front().load_task->wait( + if (!load_disk_queue.front().load_replica_task->wait( static_cast(FLAGS_load_replica_max_wait_time_ms))) { // There might be too many replicas that are being loaded which lead to // slow disk IO. @@ -582,14 +628,14 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps) "={}, tag={}, path={}), skip dir(index={}, path={}), turn to " "next disk", FLAGS_load_replica_max_wait_time_ms, - load_disk_queue.front().dir_index, - load_disk_queue.front().dir_path, + load_disk_queue.front().replica_dir_index, + load_disk_queue.front().replica_dir_path, load_disk_queue.size(), disk_index, dn->tag, dn->full_dir, - dir_index, - dirs[dir_index]); + replica_dir_index, + dirs[replica_dir_index]); continue; } @@ -597,22 +643,23 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps) load_disk_queue.pop(); } - if (dsn::replication::is_data_dir_invalid(dirs[dir_index])) { - LOG_WARNING("ignore dir(index={}, path={})", dir_index, dirs[dir_index]); - ++dir_index; + if (dsn::replication::is_data_dir_invalid(dirs[replica_dir_index])) { + LOG_WARNING( + "ignore dir(index={}, path={})", replica_dir_index, dirs[replica_dir_index]); + ++replica_dir_index; continue; } LOG_DEBUG("ready to load dir(index={}, path={}) for disk(index={}, tag={}, path={})", - dir_index, - dirs[dir_index], + replica_dir_index, + dirs[replica_dir_index], disk_index, dn->tag, dn->full_dir); load_disk_queue.push(replica_dir_loader{ - dir_index, - dirs[dir_index], + replica_dir_index, + dirs[replica_dir_index], tasking::create_task( // Ensure that the thread pool is non-partitioned. LPC_REPLICATION_INIT_LOAD, @@ -624,13 +671,13 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps) &replica_stub::load_replica), this, dn, - dirs[dir_index], + dirs[replica_dir_index], std::ref(reps_lock), std::ref(reps)))}); - load_disk_queue.back().load_task->enqueue(); + load_disk_queue.back().load_replica_task->enqueue(); - ++dir_index; + ++replica_dir_index; } if (finished_disks >= disks.size()) { @@ -642,7 +689,7 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps) // All loading tasks have been in the queue. Just wait all tasks to be finished. for (auto &load_disk_queue : load_disk_queues) { while (!load_disk_queue.empty()) { - CHECK_TRUE(load_disk_queue.front().load_task->wait()); + CHECK_TRUE(load_disk_queue.front().load_replica_task->wait()); load_disk_queue.pop(); } } @@ -2593,11 +2640,6 @@ void replica_stub::register_ctrl_command() }); })); - _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command( - FLAGS_empty_write_disabled, - "replica.disable-empty-write", - "whether to disable empty writes")); - #ifdef DSN_ENABLE_GPERF _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command( _release_tcmalloc_memory, @@ -2632,21 +2674,18 @@ void replica_stub::register_ctrl_command() #elif defined(DSN_USE_JEMALLOC) register_jemalloc_ctrl_command(); #endif - _cmds.emplace_back(::dsn::command_manager::instance().register_int_command( - FLAGS_max_concurrent_bulk_load_downloading_count, - FLAGS_max_concurrent_bulk_load_downloading_count, - "replica.max-concurrent-bulk-load-downloading-count", - kMaxConcurrentBulkLoadDownloadingCountDesc)); }); } std::string -replica_stub::exec_command_on_replica(const std::vector &args, +replica_stub::exec_command_on_replica(const std::vector &arg_str_list, bool allow_empty_args, std::function func) { - if (args.empty() && !allow_empty_args) { - return std::string("invalid arguments"); + static const std::string kInvalidArguments("invalid arguments"); + + if (arg_str_list.empty() && !allow_empty_args) { + return kInvalidArguments; } replica_map_by_gpid rs; @@ -2657,17 +2696,19 @@ replica_stub::exec_command_on_replica(const std::vector &args, std::set required_ids; replica_map_by_gpid choosed_rs; - if (!args.empty()) { - for (int i = 0; i < args.size(); i++) { - std::vector arg_strs; - utils::split_args(args[i].c_str(), arg_strs, ','); - if (arg_strs.empty()) { - return std::string("invalid arguments"); + if (!arg_str_list.empty()) { + for (const auto &arg_str : arg_str_list) { + std::vector args; + utils::split_args(arg_str.c_str(), args, ','); + if (args.empty()) { + return kInvalidArguments; } - for (const std::string &arg : arg_strs) { - if (arg.empty()) + for (const std::string &arg : args) { + if (arg.empty()) { continue; + } + gpid id; int pid; if (id.parse_from(arg.c_str())) { @@ -2686,7 +2727,7 @@ replica_stub::exec_command_on_replica(const std::vector &args, } } } else { - return std::string("invalid arguments"); + return kInvalidArguments; } } } @@ -2706,8 +2747,10 @@ replica_stub::exec_command_on_replica(const std::vector &args, [rep, &func, &results_lock, &results]() { partition_status::type status = rep->status(); if (status != partition_status::PS_PRIMARY && - status != partition_status::PS_SECONDARY) + status != partition_status::PS_SECONDARY) { return; + } + std::string result = func(rep); ::dsn::zauto_lock l(results_lock); auto &value = results[rep->get_gpid()]; diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index c0bb7f43b6..a6cc9bd917 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -118,7 +118,8 @@ typedef rpc_holder add_new_disk_rpc namespace test { class test_checker; -} +} // namespace test + class cold_backup_context; class replica_split_manager; diff --git a/src/utils/command_manager.h b/src/utils/command_manager.h index b971522d01..828727bcb1 100644 --- a/src/utils/command_manager.h +++ b/src/utils/command_manager.h @@ -65,14 +65,12 @@ class command_manager : public ::dsn::utils::singleton // 'validator' is used to validate the new value. // The value is reset to 'default_value' if passing "DEFAULT" argument. template - WARN_UNUSED_RESULT std::unique_ptr register_int_command( - T &value, - T default_value, - const std::string &command, - const std::string &help, - std::function validator = [](int64_t new_value) -> bool { - return new_value >= 0; - }) + WARN_UNUSED_RESULT std::unique_ptr + register_int_command(T &value, + T default_value, + const std::string &command, + const std::string &help, + std::function::type)> validator) { return register_single_command( command, @@ -83,6 +81,19 @@ class command_manager : public ::dsn::utils::singleton }); } + template + WARN_UNUSED_RESULT std::unique_ptr register_int_command( + T &value, T default_value, const std::string &command, const std::string &help) + { + return register_int_command(value, + default_value, + command, + help, + [](typename std::remove_reference::type new_value) -> bool { + return new_value >= 0; + }); + } + // Register a single 'command' with the 'help' description, its arguments are described in // 'args'. std::unique_ptr @@ -133,11 +144,12 @@ class command_manager : public ::dsn::utils::singleton set_bool(bool &value, const std::string &name, const std::vector &args); template - static std::string set_int(T &value, - T default_value, - const std::string &name, - const std::vector &args, - const std::function &validator) + static std::string + set_int(T &value, + T default_value, + const std::string &name, + const std::vector &args, + const std::function::type)> &validator) { nlohmann::json msg; msg["error"] = "ok"; @@ -164,8 +176,7 @@ class command_manager : public ::dsn::utils::singleton // Invalid argument. T new_value = 0; - if (!internal::buf2signed(args[0], new_value) || - !validator(static_cast(new_value))) { + if (!buf2numeric(args[0], new_value) || !validator(new_value)) { msg["error"] = fmt::format("ERR: invalid argument '{}', the value is not acceptable", args[0]); return msg.dump(2);