Skip to content

Commit

Permalink
fix load replicas and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Dec 16, 2024
1 parent ae9445a commit ae08126
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 18 deletions.
43 changes: 28 additions & 15 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,15 @@ void replica_stub::load_replicas(replicas &reps)
const auto &disks = get_all_disk_dirs();

std::vector<size_t> dir_indexes(disks.size(), 0);
std::vector<std::queue<std::pair<std::string, task_ptr>>> load_disk_queues(disks.size());

struct replica_dir_loader
{
size_t dir_index;
std::string dir_path;
task_ptr load_task;
};
std::vector<std::queue<replica_dir_loader>> load_disk_queues(disks.size());

utils::ex_lock reps_lock;

while (true) {
Expand Down Expand Up @@ -559,15 +567,17 @@ void replica_stub::load_replicas(replicas &reps)
auto &load_disk_queue = load_disk_queues[disk_index];
if (load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) {
// Loading replicas should be throttled in case that disk IO is saturated.
if (!load_disk_queue.front().second->wait(
if (!load_disk_queue.front().load_task->wait(
static_cast<int>(FLAGS_load_replica_max_wait_time_ms))) {
// There might be too many replicas that are being loaded which lead to
// slow disk IO.
LOG_WARNING("after {} ms, loading dir({}) is still not finished, there are "
"{} replicas being loaded for disk(index={}, tag={}, path={}), "
"skip dir(index={}, path={}), turn to next disk",
LOG_WARNING("after {} ms, loading dir(index={}, path={}) is still not "
"finished, there are {} replicas being loaded for disk(index"
"={}, tag={}, path={}), skip dir(index={}, path={}), turn to "
"next disk",
FLAGS_load_replica_max_wait_time_ms,
load_disk_queue.front().first,
load_disk_queue.front().dir_index,
load_disk_queue.front().dir_path,
load_disk_queue.size(),
disk_index,
dn->tag,
Expand All @@ -581,9 +591,9 @@ void replica_stub::load_replicas(replicas &reps)
load_disk_queue.pop();
}

const auto &dir = dirs[dir_index++];
if (dsn::replication::is_data_dir_invalid(dir)) {
LOG_WARNING("ignore dir {}", dir);
if (dsn::replication::is_data_dir_invalid(dirs[dir_index])) {
LOG_WARNING("ignore dir(index={}, path={})", dir_index, dirs[dir_index]);
++dir_index;
continue;
}

Expand All @@ -594,8 +604,9 @@ void replica_stub::load_replicas(replicas &reps)
dn->tag,
dn->full_dir);

load_disk_queue.emplace(
dir,
load_disk_queue.push(replica_dir_loader{
dir_index,
dirs[dir_index],
tasking::create_task(
// Ensure that the thread pool is non-partitioned.
LPC_REPLICATION_INIT_LOAD,
Expand All @@ -605,11 +616,13 @@ void replica_stub::load_replicas(replicas &reps)
&replica_stub::load_replica),
this,
dn,
dir,
dirs[dir_index],
std::ref(reps_lock),
std::ref(reps))));
std::ref(reps)))});

load_disk_queue.back().load_task->enqueue();

load_disk_queue.back().second->enqueue();
++dir_index;
}

if (finished_disks >= disks.size()) {
Expand All @@ -621,7 +634,7 @@ void replica_stub::load_replicas(replicas &reps)
// All loading tasks have been in the queue. Just wait all tasks to be finished.
for (auto &load_disk_queue : load_disk_queues) {
while (!load_disk_queue.empty()) {
CHECK_TRUE(load_disk_queue.front().second->wait());
CHECK_TRUE(load_disk_queue.front().load_task->wait());
load_disk_queue.pop();
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/replica/test/load_replicas_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,13 @@ class LoadReplicasTest : public testing::TestWithParam<load_replicas_case>
public:
LoadReplicasTest()
{
_stub.remove_disk_dirs();

const auto &load_case = GetParam();
_stub.initialize(load_case.dirs_by_tag, load_case.replicas_by_tag);
}

~LoadReplicasTest() override = default;

void TearDown() override { _stub.remove_disk_dirs(); }
~LoadReplicasTest() override { _stub.remove_disk_dirs(); }

void test_load_replicas(bool test_load_order, uint64_t max_replicas_on_load_for_each_disk)
{
Expand Down

0 comments on commit ae08126

Please sign in to comment.