Skip to content

Commit

Permalink
cr1
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Dec 14, 2023
1 parent ce8e0b6 commit 9db0895
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ DSN_DEFINE_int32(
replication,
gc_interval_ms,
30 * 1000,
"every what period (ms) we do garbage collection for dead replicas, on-disk state, log, etc.");
"every what period (ms) we do replica stat. The name contains 'gc' is for legacy reason.");
DSN_DEFINE_int32(replication,
fd_check_interval_seconds,
2,
Expand Down
56 changes: 24 additions & 32 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,10 @@ DSN_DEFINE_bool(replication,
log_shared_force_flush,
false,
"when write shared log, whether to flush file after write done");
DSN_DEFINE_bool(replication, gc_disabled, false, "whether to disable garbage collection");
DSN_DEFINE_bool(replication,
gc_disabled,
false,
"whether to disable replica stat. The name contains 'gc' is for legacy reason.");
DSN_DEFINE_bool(replication, disk_stat_disabled, false, "whether to disable disk stat");
DSN_DEFINE_bool(replication,
delay_for_fd_timeout_on_start,
Expand Down Expand Up @@ -504,12 +507,12 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
}
}

// gc
// replicas stat
if (!FLAGS_gc_disabled) {
_gc_timer_task = tasking::enqueue_timer(
_replicas_stat_timer_task = tasking::enqueue_timer(
LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
&_tracker,
[this] { on_gc(); },
[this] { on_replicas_stat(); },
std::chrono::milliseconds(FLAGS_gc_interval_ms),
0,
std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms)));
Expand Down Expand Up @@ -1430,17 +1433,6 @@ void replica_stub::response_client(gpid id,
}
}

// Test-only helper that schedules on_gc() through the task engine and keeps the
// resulting task handle in _gc_timer_task.
//
// It requires FLAGS_gc_disabled to be true. initialize() starts its own periodic
// task only when that flag is false, so the two paths do not both assign
// _gc_timer_task.
void replica_stub::init_gc_for_test()
{
// Precondition enforced by CHECK: the flag must be set (empty failure message).
CHECK(FLAGS_gc_disabled, "");

// NOTE(review): uses tasking::enqueue rather than tasking::enqueue_timer, so this
// presumably runs on_gc() once; the trailing arguments look like hash = 0 and a
// delay of FLAGS_gc_interval_ms -- confirm against the tasking::enqueue signature.
_gc_timer_task = tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
&_tracker,
[this] { on_gc(); },
0,
std::chrono::milliseconds(FLAGS_gc_interval_ms));
}

void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id)
{
std::pair<app_info, replica_info> closed_info;
Expand Down Expand Up @@ -1483,27 +1475,27 @@ void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id)
}
}

void replica_stub::on_gc()
void replica_stub::on_replicas_stat()
{
uint64_t start = dsn_now_ns();

replica_gc_info_map replica_gc_map;
replica_stat_info_by_gpid rep_stat_info_by_gpid;
{
zauto_read_lock l(_replicas_lock);
// A replica is removed from _replicas before it is closed by replica::close(),
// so it is safe to use a replica after fetching its ref pointer from _replicas.
for (const auto &rep_pair : _replicas) {
const replica_ptr &rep = rep_pair.second;

auto &replica_gc = replica_gc_map[rep_pair.first];
replica_gc.rep = rep;
replica_gc.status = rep->status();
replica_gc.plog = rep->private_log();
replica_gc.last_durable_decree = rep->last_durable_decree();
for (const auto &replica : _replicas) {
const auto &rep = replica.second;

auto &rep_stat_info = rep_stat_info_by_gpid[replica.first];
rep_stat_info.rep = rep;
rep_stat_info.status = rep->status();
rep_stat_info.plog = rep->private_log();
rep_stat_info.last_durable_decree = rep->last_durable_decree();
}
}

LOG_INFO("start to garbage collection, replica_count = {}", replica_gc_map.size());
LOG_INFO("start replicas statistics, replica_count = {}", rep_stat_info_by_gpid.size());

// gather learning statistics
uint64_t learning_count = 0;
Expand All @@ -1516,8 +1508,8 @@ void replica_stub::on_gc()
uint64_t splitting_max_duration_time_ms = 0;
uint64_t splitting_max_async_learn_time_ms = 0;
uint64_t splitting_max_copy_file_size = 0;
for (auto &kv : replica_gc_map) {
replica_ptr &rep = kv.second.rep;
for (const auto & [ _, rep_stat_info ] : rep_stat_info_by_gpid) {
const auto &rep = rep_stat_info.rep;
if (rep->status() == partition_status::PS_POTENTIAL_SECONDARY) {
learning_count++;
learning_max_duration_time_ms = std::max(
Expand Down Expand Up @@ -1560,7 +1552,7 @@ void replica_stub::on_gc()
splitting_max_async_learn_time_ms);
METRIC_VAR_SET(splitting_replicas_max_copy_file_bytes, splitting_max_copy_file_size);

LOG_INFO("finish to garbage collection, time_used_ns = {}", dsn_now_ns() - start);
LOG_INFO("finish replicas statistics, time used {}ns", dsn_now_ns() - start);
}

void replica_stub::on_disk_stat()
Expand Down Expand Up @@ -2417,9 +2409,9 @@ void replica_stub::close()
_disk_stat_timer_task = nullptr;
}

if (_gc_timer_task != nullptr) {
_gc_timer_task->cancel(true);
_gc_timer_task = nullptr;
if (_replicas_stat_timer_task != nullptr) {
_replicas_stat_timer_task->cancel(true);
_replicas_stat_timer_task = nullptr;
}

if (_mem_release_timer_task != nullptr) {
Expand Down
9 changes: 4 additions & 5 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,12 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
//
void on_meta_server_connected();
void on_meta_server_disconnected();
void on_gc();
void on_replicas_stat();
void on_disk_stat();

//
// routines published for test
//
void init_gc_for_test();
void set_meta_server_disconnected_for_test() { on_meta_server_disconnected(); }
void set_meta_server_connected_for_test(const configuration_query_by_node_response &config);
void set_replica_state_subscriber_for_test(replica_state_subscriber subscriber,
Expand Down Expand Up @@ -355,14 +354,14 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
replica_life_cycle get_replica_life_cycle(gpid id);
void on_gc_replica(replica_stub_ptr this_, gpid id);

struct replica_gc_info
struct replica_stat_info
{
replica_ptr rep;
partition_status::type status;
mutation_log_ptr plog;
decree last_durable_decree;
};
using replica_gc_info_map = std::unordered_map<gpid, replica_gc_info>;
using replica_stat_info_by_gpid = std::unordered_map<gpid, replica_stat_info>;

void response_client(gpid id,
bool is_read,
Expand Down Expand Up @@ -454,7 +453,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
// temporal states
::dsn::task_ptr _config_query_task;
::dsn::timer_task_ptr _config_sync_timer_task;
::dsn::task_ptr _gc_timer_task;
::dsn::task_ptr _replicas_stat_timer_task;
::dsn::task_ptr _disk_stat_timer_task;
::dsn::task_ptr _mem_release_timer_task;

Expand Down

0 comments on commit 9db0895

Please sign in to comment.