
Commit

fix clang-tidy, add timer and comments
empiredan committed Dec 19, 2024
1 parent 2fd42b9 commit caeafdc
Showing 2 changed files with 106 additions and 62 deletions.
160 changes: 101 additions & 59 deletions src/replica/replica_stub.cpp
@@ -31,11 +31,11 @@
#include <fmt/format.h>
#include <nlohmann/json.hpp>
#include <rapidjson/ostreamwrapper.h>
#include <stdio.h>
#include <stdlib.h>
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <iterator>
#include <mutex>
#include <queue>
@@ -88,7 +88,7 @@
#include "utils/strings.h"
#include "utils/synchronize.h"
#include "utils/threadpool_spec.h"
#include "utils/time_utils.h"
#include "utils/timer.h"
#ifdef DSN_ENABLE_GPERF
#include <gperftools/malloc_extension.h>
#elif defined(DSN_USE_JEMALLOC)
@@ -411,8 +411,11 @@ bool replica_stub::s_not_exit_on_log_failure = false;

namespace {

// Register commands that get/set flag configurations.
void register_flags_ctrl_command()
{
// For the reason why std::call_once is used here, please see the comments in
// replica_stub::register_ctrl_command() for details.
static std::once_flag flag;
std::call_once(flag, []() mutable {
dsn::command_manager::instance().add_global_cmd(
@@ -450,7 +453,7 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
bool is_long_subscriber /* = true*/)
: serverlet("replica_stub"),
_state(NS_Disconnected),
_replica_state_subscriber(subscriber),
_replica_state_subscriber(std::move(subscriber)),
_is_long_subscriber(is_long_subscriber),
_deny_client(false),
_verbose_client_log(false),
@@ -512,30 +515,34 @@ void replica_stub::initialize(bool clear /* = false*/)
std::vector<replica_stub::disk_replicas_info> replica_stub::get_all_disk_dirs() const
{
std::vector<disk_replicas_info> disks;
for (const auto &dn : _fs_manager.get_dir_nodes()) {
if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) {
for (const auto &disk_node : _fs_manager.get_dir_nodes()) {
if (dsn_unlikely(disk_node->status == disk_status::IO_ERROR)) {
// Skip disks with IO errors.
continue;
}

std::vector<std::string> sub_dirs;
CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, sub_dirs, false),
CHECK(dsn::utils::filesystem::get_subdirectories(disk_node->full_dir, sub_dirs, false),
"failed to get sub_directories in {}",
dn->full_dir);
disks.push_back(disk_replicas_info{dn.get(), std::move(sub_dirs)});
disk_node->full_dir);
disks.push_back(disk_replicas_info{disk_node.get(), std::move(sub_dirs)});
}

return disks;
}

// TaskCode: LPC_REPLICATION_INIT_LOAD
// ThreadPool: THREAD_POOL_LOCAL_APP
void replica_stub::load_replica(dir_node *dn,
void replica_stub::load_replica(dir_node *disk_node,
const std::string &replica_dir,
const size_t total_dir_count,
utils::ex_lock &reps_lock,
replica_map_by_gpid &reps)
replica_map_by_gpid &reps,
std::atomic<size_t> &finished_dir_count)
{
LOG_INFO("loading replica: tag={}, replica_dir={}", dn->tag, replica_dir);
SCOPED_LOG_TIMING(INFO, "on loading {}:{}", disk_node->tag, replica_dir);

LOG_INFO("loading replica: replica_dir={}:{}", disk_node->tag, replica_dir);

const auto *const worker = task::get_current_worker2();
if (worker != nullptr) {
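The SCOPED_LOG_TIMING calls added here and in load_replicas() below (pulled in via utils/timer.h, replacing the utils::chronograph used further down in initialize()) log how long the enclosing scope takes. The macro's real definition is not part of this diff; as a rough sketch only, an RAII scope timer along the following lines would give the same effect, and because it fires from its destructor it covers every early return without extra logging calls:

    // Minimal sketch of an RAII scope timer; this assumes nothing about the real
    // SCOPED_LOG_TIMING beyond "log the elapsed time when the scope ends".
    #include <chrono>
    #include <cstdio>
    #include <string>
    #include <utility>

    class scoped_timer
    {
    public:
        explicit scoped_timer(std::string label)
            : _label(std::move(label)), _start(std::chrono::steady_clock::now())
        {
        }

        ~scoped_timer()
        {
            const long long ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                     std::chrono::steady_clock::now() - _start)
                                     .count();
            // The real macro would go through the logging framework rather than stderr.
            std::fprintf(stderr, "%s took %lld ms\n", _label.c_str(), ms);
        }

        scoped_timer(const scoped_timer &) = delete;
        scoped_timer &operator=(const scoped_timer &) = delete;

    private:
        std::string _label;
        std::chrono::steady_clock::time_point _start;
    };

    // Usage, mirroring the call above: scoped_timer timer("on loading replicas");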
@@ -545,17 +552,24 @@ void replica_stub::load_replica(dir_node *dn,
"among multiple threads");
}

auto rep = load_replica(dn, replica_dir);
auto rep = load_replica(disk_node, replica_dir);
if (rep == nullptr) {
LOG_INFO("load replica failed: replica_dir={}:{}, progress={}/{}",
disk_node->tag,
replica_dir,
++finished_dir_count,
total_dir_count);
return;
}

LOG_INFO("{}@{}: load replica successfully, tag={}, replica_dir={}, last_durable_decree={}, "
"last_committed_decree={}, last_prepared_decree={}",
LOG_INFO("{}@{}: load replica successfully, replica_dir={}:{}, progress={}/{}, "
"last_durable_decree={}, last_committed_decree={}, last_prepared_decree={}",
rep->get_gpid(),
dsn_primary_host_port(),
dn->tag,
disk_node->tag,
replica_dir,
++finished_dir_count,
total_dir_count,
rep->last_durable_decree(),
rep->last_committed_decree(),
rep->last_prepared_decree());
@@ -574,9 +588,12 @@ void replica_stub::load_replica(dir_node *dn,

void replica_stub::load_replicas(replica_map_by_gpid &reps)
{
SCOPED_LOG_TIMING(INFO, "on loading replicas");

const auto &disks = get_all_disk_dirs();

// The index of currently loaded replica dir for each disk. Once current replica
// The max index of dirs that are currently being loaded for each disk. The dirs with
// higher indexes have not begun to be loaded (namely pushed into the queue).
std::vector<size_t> replica_dir_indexes(disks.size(), 0);

struct replica_dir_loader
@@ -586,9 +603,19 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps)
task_ptr load_replica_task;
};

//
// Each queue caches the in-flight tasks that load the dirs of its disk. Once a task
// is found to be finished (namely, a dir has been loaded), it is popped from
// the queue.
std::vector<std::queue<replica_dir_loader>> load_disk_queues(disks.size());

// TODO(wangdan): count the replica dirs that are loaded successfully and those that
// fail to load, and break the failures down by reason.
std::vector<std::atomic<size_t>> finished_replica_dirs(disks.size());
for (auto &count : finished_replica_dirs) {
count.store(0);
}

// The lock protecting `reps`, the output map of loaded replicas.
utils::ex_lock reps_lock;

while (true) {
@@ -602,80 +629,96 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps)
// both variables until clang has been upgraded to version 16 which could support
// that well:
//
// const auto &[dn, dirs] = disks[disk_index];
// const auto &[disk_node, replica_dirs] = disks[disk_index];
//
// For the docs of clang 16 please see:
//
// https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support.
const auto &dirs = disks[disk_index].replica_dirs;
const auto &replica_dirs = disks[disk_index].replica_dirs;

auto &replica_dir_index = replica_dir_indexes[disk_index];
if (replica_dir_index >= dirs.size()) {
if (replica_dir_index >= replica_dirs.size()) {
// All of the replicas for the disk `disks[disk_index]` have begun to be loaded,
// thus just skip.
++finished_disks;
continue;
}

const auto &dn = disks[disk_index].disk_node;
const auto &disk_node = disks[disk_index].disk_node;
auto &load_disk_queue = load_disk_queues[disk_index];
if (load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) {
// Loading replicas should be throttled to avoid saturating disk IO.
if (!load_disk_queue.front().load_replica_task->wait(
static_cast<int>(FLAGS_load_replica_max_wait_time_ms))) {
// There might be too many replicas that are being loaded which lead to
// slow disk IO.
LOG_WARNING("after {} ms, loading dir(index={}, path={}) is still not "
"finished, there are {} replicas being loaded for disk(index"
"={}, tag={}, path={}), skip dir(index={}, path={}), turn to "
"next disk",
// slow disk IO; thus turn to the next disk, and try to load dir
// `replica_dir_index` of this disk again in the next round.
LOG_WARNING("after {} ms, loading dir({}, {}/{}) is still not finished, "
"there are {} replicas being loaded for disk({}:{}, {}/{}), "
"now turn to next disk, and will begin to load dir({}, {}/{}) "
"soon",
FLAGS_load_replica_max_wait_time_ms,
load_disk_queue.front().replica_dir_index,
load_disk_queue.front().replica_dir_path,
load_disk_queue.front().replica_dir_index,
replica_dirs.size(),
load_disk_queue.size(),
disk_node->tag,
disk_node->full_dir,
disk_index,
dn->tag,
dn->full_dir,
disks.size(),
replica_dirs[replica_dir_index],
replica_dir_index,
dirs[replica_dir_index]);
replica_dirs.size());
continue;
}

// Continue to load a replica since we are within the limit now.
// Now that the queue size is within the limit again, continue to load a new replica dir.
load_disk_queue.pop();
}

if (dsn::replication::is_data_dir_invalid(dirs[replica_dir_index])) {
LOG_WARNING(
"ignore dir(index={}, path={})", replica_dir_index, dirs[replica_dir_index]);
if (dsn::replication::is_data_dir_invalid(replica_dirs[replica_dir_index])) {
LOG_WARNING("ignore dir({}, {}/{}) for disk({}:{}, {}/{})",
replica_dirs[replica_dir_index],
replica_dir_index,
replica_dirs.size(),
disk_node->tag,
disk_node->full_dir,
disk_index,
disks.size());
++replica_dir_index;
continue;
}

LOG_DEBUG("ready to load dir(index={}, path={}) for disk(index={}, tag={}, path={})",
LOG_DEBUG("ready to load dir({}, {}/{}) for disk({}:{}, {}/{})",
replica_dirs[replica_dir_index],
replica_dir_index,
dirs[replica_dir_index],
replica_dirs.size(),
disk_node->tag,
disk_node->full_dir,
disk_index,
dn->tag,
dn->full_dir);
disks.size());

load_disk_queue.push(replica_dir_loader{
replica_dir_index,
dirs[replica_dir_index],
replica_dirs[replica_dir_index],
tasking::create_task(
// Ensure that the thread pool is non-partitioned.
LPC_REPLICATION_INIT_LOAD,
&_tracker,
std::bind(static_cast<void (replica_stub::*)(dir_node *,
const std::string &,
const size_t,
utils::ex_lock &,
replica_map_by_gpid &)>(
replica_map_by_gpid &,
std::atomic<size_t> &)>(
&replica_stub::load_replica),
this,
dn,
dirs[replica_dir_index],
disk_node,
replica_dirs[replica_dir_index],
replica_dirs.size(),
std::ref(reps_lock),
std::ref(reps)))});
std::ref(reps),
std::ref(finished_replica_dirs[disk_index])))});

load_disk_queue.back().load_replica_task->enqueue();
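The loop above walks the disks round-robin: each disk keeps a queue of at most FLAGS_max_replicas_on_load_for_each_disk in-flight load tasks; when the queue is full, the oldest task is waited on for FLAGS_load_replica_max_wait_time_ms before giving up and moving to the next disk; and a per-disk std::atomic<size_t> counts finished dirs for the progress shown in the logs. A condensed sketch of the same throttling pattern, using std::async futures instead of dsn tasks (the names and limits below are illustrative, not the project's flags or APIs):

    #include <atomic>
    #include <chrono>
    #include <cstddef>
    #include <future>
    #include <queue>
    #include <string>
    #include <vector>

    // Stand-in for loading a single replica dir; the real code also merges the
    // loaded replica into a shared map under a lock.
    void load_one_dir(const std::string &dir, std::atomic<std::size_t> &finished)
    {
        // ... open and validate the replica located at `dir` ...
        ++finished;
    }

    void load_all(const std::vector<std::vector<std::string>> &dirs_per_disk)
    {
        constexpr std::size_t kMaxInFlightPerDisk = 4;         // stand-in for the per-disk limit flag
        constexpr auto kWait = std::chrono::milliseconds(10);  // stand-in for the max-wait flag

        std::vector<std::size_t> next_dir(dirs_per_disk.size(), 0);
        std::vector<std::queue<std::future<void>>> in_flight(dirs_per_disk.size());
        std::vector<std::atomic<std::size_t>> finished(dirs_per_disk.size());
        for (auto &count : finished) {
            count.store(0);
        }

        while (true) {
            std::size_t finished_disks = 0;
            for (std::size_t disk = 0; disk < dirs_per_disk.size(); ++disk) {
                const auto &dirs = dirs_per_disk[disk];
                if (next_dir[disk] >= dirs.size()) {
                    ++finished_disks; // every dir on this disk has been dispatched
                    continue;
                }

                auto &queue = in_flight[disk];
                if (queue.size() >= kMaxInFlightPerDisk) {
                    // Throttle: briefly wait on the oldest task; if it is still running,
                    // move on to the next disk and retry this dir in a later round.
                    if (queue.front().wait_for(kWait) != std::future_status::ready) {
                        continue;
                    }
                    queue.pop();
                }

                queue.push(std::async(std::launch::async,
                                      load_one_dir,
                                      std::cref(dirs[next_dir[disk]]),
                                      std::ref(finished[disk])));
                ++next_dir[disk];
            }

            if (finished_disks == dirs_per_disk.size()) {
                break; // all dirs dispatched; remaining futures drain on destruction
            }
        }
    }

The explicit const references into disks[disk_index], rather than the structured binding shown in the in-code comment, are a workaround for incomplete structured-binding support in clang before version 16, as that comment explains.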

@@ -783,13 +826,9 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
LOG_INFO("start to load replicas");

replica_map_by_gpid reps;

utils::chronograph chrono;
load_replicas(reps);

LOG_INFO("load replicas succeed, replica_count = {}, time_used = {} ms",
reps.size(),
chrono.duration_ms());
LOG_INFO("load replicas succeed, replica_count = {}", reps.size());

bool is_log_complete = true;
for (auto it = reps.begin(); it != reps.end(); ++it) {
@@ -2278,7 +2317,7 @@ bool replica_stub::validate_replica_dir(const std::string &dir,
return true;
}

replica_ptr replica_stub::load_replica(dir_node *dn, const std::string &replica_dir)
replica_ptr replica_stub::load_replica(dir_node *disk_node, const std::string &replica_dir)
{
FAIL_POINT_INJECT_F("mock_replica_load",
[&](std::string_view) -> replica * { return nullptr; });
@@ -2292,14 +2331,14 @@ replica_ptr replica_stub::load_replica(dir_node *dn, const std::string &replica_
}

// The replica's directory must exist when creating a replica.
CHECK_EQ(dn->replica_dir(ai.app_type, pid), replica_dir);
CHECK_EQ(disk_node->replica_dir(ai.app_type, pid), replica_dir);

auto *rep = new replica(this, pid, ai, dn, false);
auto *rep = new replica(this, pid, ai, disk_node, false);
const auto err = rep->initialize_on_load();
if (err != ERR_OK) {
LOG_ERROR("{}: load replica failed, tag={}, replica_dir={}, err={}",
rep->name(),
dn->tag,
disk_node->tag,
replica_dir,
err);
delete rep;
@@ -2315,7 +2354,10 @@ replica_ptr replica_stub::load_replica(dir_node *dn, const std::string &replica_
return nullptr;
}

LOG_INFO("{}: load replica succeed, tag={}, replica_dir={}", rep->name(), dn->tag, replica_dir);
LOG_INFO("{}: load replica succeed, tag={}, replica_dir={}",
rep->name(),
disk_node->tag,
replica_dir);
return rep;
}

@@ -2713,7 +2755,7 @@ replica_stub::exec_command_on_replica(const std::vector<std::string> &arg_str_li

gpid id;
if (id.parse_from(arg.c_str())) {
// app_id.partition_index
// Format: app_id.partition_index
required_ids.insert(id);
auto find = rs.find(id);
if (find != rs.end()) {
@@ -2723,15 +2765,15 @@ replica_stub::exec_command_on_replica(const std::vector<std::string> &arg_str_li
continue;
}

int pid = 0;
if (sscanf(arg.c_str(), "%d", &pid) != 1) {
// Must be app_id.
int32_t app_id = 0;
if (!buf2int32(arg, app_id)) {
return kInvalidArguments;
}

// app_id
for (const auto &[_, rep] : rs) {
id = rep->get_gpid();
if (id.get_app_id() == pid) {
if (id.get_app_id() == app_id) {
choosed_rs[id] = rep;
}
}
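The sscanf("%d", ...) parse is replaced with buf2int32; judging from the call site above it returns false on failure, presumably rejecting trailing garbage such as "1abc" that sscanf would accept. A rough standard-library equivalent of that stricter check (the function name here is illustrative, not the dsn utility itself):

    #include <charconv>
    #include <cstdint>
    #include <string_view>

    // Succeeds only if every character of `s` is consumed and the value fits in
    // an int32_t, which is stricter than sscanf("%d", ...).
    bool parse_int32(std::string_view s, int32_t &out)
    {
        const char *end = s.data() + s.size();
        const auto [ptr, ec] = std::from_chars(s.data(), end, out);
        return ec == std::errc() && ptr == end;
    }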
8 changes: 5 additions & 3 deletions src/replica/replica_stub.h
@@ -394,16 +394,18 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
// Load an existing replica which is located in `dn` with `replica_dir`. Usually each
// different `dn` represents a unique disk. `replica_dir` is the absolute path of the
// directory for a replica.
virtual replica_ptr load_replica(dir_node *dn, const std::string &replica_dir);
virtual replica_ptr load_replica(dir_node *disk_node, const std::string &replica_dir);

using replica_map_by_gpid = std::unordered_map<gpid, replica_ptr>;

// The same as the above `load_replica` function, except that this overload loads
// each replica into `reps` under the protection of `reps_lock`.
void load_replica(dir_node *dn,
void load_replica(dir_node *disk_node,
const std::string &replica_dir,
const size_t total_dir_count,
utils::ex_lock &reps_lock,
replica_map_by_gpid &reps);
replica_map_by_gpid &reps,
std::atomic<size_t> &finished_dir_count);

// Load all replicas simultaneously from all disks to `reps`.
void load_replicas(replica_map_by_gpid &reps);
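With both the single-dir load_replica() overload and the multi-parameter one declared here, taking the member function's address for task creation is ambiguous; the .cpp change above resolves that with a static_cast to the exact pointer-to-member type before passing it to std::bind. A self-contained sketch of the same technique (the class and values here are illustrative):

    #include <cstddef>
    #include <functional>
    #include <string>

    struct loader
    {
        void load(const std::string &dir) { (void)dir; }      // overload 1
        void load(const std::string &dir, std::size_t total)  // overload 2
        {
            (void)dir;
            (void)total;
        }
    };

    int main()
    {
        loader l;
        // &loader::load alone is ambiguous; the cast to the exact pointer-to-member
        // type selects the two-parameter overload, just as load_replicas() does
        // before handing the pointer to std::bind.
        auto task = std::bind(
            static_cast<void (loader::*)(const std::string &, std::size_t)>(&loader::load),
            &l,
            std::string("replica.dir"),
            std::size_t(16));
        task(); // calls l.load("replica.dir", 16)
        return 0;
    }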
