Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new_metircs): collect the number of primary and secondary replicas #2161

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 51 additions & 9 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,26 @@ METRIC_DEFINE_gauge_int64(server,
dsn::metric_unit::kReplicas,
"The number of closing replicas");

METRIC_DEFINE_gauge_int64(server,
inactive_replicas,
dsn::metric_unit::kReplicas,
"The number of inactive replicas");

METRIC_DEFINE_gauge_int64(server,
error_replicas,
dsn::metric_unit::kReplicas,
"The number of replicas with errors");

METRIC_DEFINE_gauge_int64(server,
primary_replicas,
dsn::metric_unit::kReplicas,
"The number of primary replicas");

METRIC_DEFINE_gauge_int64(server,
secondary_replicas,
dsn::metric_unit::kReplicas,
"The number of secondary replicas");

METRIC_DEFINE_gauge_int64(server,
learning_replicas,
dsn::metric_unit::kReplicas,
Expand Down Expand Up @@ -236,7 +256,6 @@ DSN_DECLARE_int32(fd_beacon_interval_seconds);
DSN_DECLARE_int32(fd_check_interval_seconds);
DSN_DECLARE_int32(fd_grace_seconds);
DSN_DECLARE_int32(fd_lease_seconds);
DSN_DECLARE_int32(gc_interval_ms);
DSN_DECLARE_string(data_dirs);
DSN_DECLARE_string(encryption_cluster_key_name);
DSN_DECLARE_string(server_key);
Expand Down Expand Up @@ -322,6 +341,13 @@ bool check_mem_release_max_reserved_mem_percentage(int32_t value)
DSN_DEFINE_validator(mem_release_max_reserved_mem_percentage,
&check_mem_release_max_reserved_mem_percentage);

DSN_DEFINE_uint32(replication,
replicas_stat_interval_ms,
30000,
"period in milliseconds that stats for replicas are calculated");
DSN_TAG_VARIABLE(replicas_stat_interval_ms, FT_MUTABLE);
DSN_DEFINE_validator(replicas_stat_interval_ms, [](uint32_t value) -> bool { return value > 0; });

DSN_DEFINE_string(
pegasus.server,
hadoop_kms_url,
Expand Down Expand Up @@ -368,6 +394,10 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
METRIC_VAR_INIT_server(total_replicas),
METRIC_VAR_INIT_server(opening_replicas),
METRIC_VAR_INIT_server(closing_replicas),
METRIC_VAR_INIT_server(inactive_replicas),
METRIC_VAR_INIT_server(error_replicas),
METRIC_VAR_INIT_server(primary_replicas),
METRIC_VAR_INIT_server(secondary_replicas),
METRIC_VAR_INIT_server(learning_replicas),
METRIC_VAR_INIT_server(learning_replicas_max_duration_ms),
METRIC_VAR_INIT_server(learning_replicas_max_copy_file_bytes),
Expand Down Expand Up @@ -605,9 +635,9 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
&_tracker,
[this] { on_replicas_stat(); },
std::chrono::milliseconds(FLAGS_gc_interval_ms),
std::chrono::milliseconds(FLAGS_replicas_stat_interval_ms),
0,
std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms)));
std::chrono::milliseconds(rand::next_u32(0, FLAGS_replicas_stat_interval_ms)));
}

// disk stat
Expand Down Expand Up @@ -1625,26 +1655,30 @@ void replica_stub::on_replicas_stat()
LOG_INFO("start replicas statistics, replica_count = {}", rep_stat_info_by_gpid.size());

// statistic learning info
uint64_t learning_count = 0;
uint64_t learning_max_duration_time_ms = 0;
uint64_t learning_max_copy_file_size = 0;
uint64_t bulk_load_running_count = 0;
uint64_t bulk_load_max_ingestion_time_ms = 0;
uint64_t bulk_load_max_duration_time_ms = 0;
uint64_t splitting_count = 0;
uint64_t splitting_max_duration_time_ms = 0;
uint64_t splitting_max_async_learn_time_ms = 0;
uint64_t splitting_max_copy_file_size = 0;

std::map<partition_status::type, size_t> status_counts;
for (const auto &[_, rep_stat_info] : rep_stat_info_by_gpid) {
const auto &rep = rep_stat_info.rep;
++status_counts[rep->status()];

if (rep->status() == partition_status::PS_POTENTIAL_SECONDARY) {
learning_count++;
learning_max_duration_time_ms = std::max(
learning_max_duration_time_ms, rep->_potential_secondary_states.duration_ms());
learning_max_copy_file_size =
std::max(learning_max_copy_file_size,
rep->_potential_secondary_states.learning_copy_file_size);

continue;
}

if (rep->status() == partition_status::PS_PRIMARY ||
rep->status() == partition_status::PS_SECONDARY) {
if (rep->get_bulk_loader()->get_bulk_load_status() != bulk_load_status::BLS_INVALID) {
Expand All @@ -1654,26 +1688,34 @@ void replica_stub::on_replicas_stat()
bulk_load_max_duration_time_ms =
std::max(bulk_load_max_duration_time_ms, rep->get_bulk_loader()->duration_ms());
}

continue;
}

// splitting_max_copy_file_size, rep->_split_states.copy_file_size
if (rep->status() == partition_status::PS_PARTITION_SPLIT) {
splitting_count++;
splitting_max_duration_time_ms =
std::max(splitting_max_duration_time_ms, rep->_split_states.total_ms());
splitting_max_async_learn_time_ms =
std::max(splitting_max_async_learn_time_ms, rep->_split_states.async_learn_ms());
splitting_max_copy_file_size =
std::max(splitting_max_copy_file_size, rep->_split_states.splitting_copy_file_size);

continue;
}
}

METRIC_VAR_SET(learning_replicas, learning_count);
METRIC_VAR_SET(inactive_replicas, status_counts[partition_status::PS_INACTIVE]);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These METRIC_VAR_SET will be skipped if using continue above, is it in purpose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continue is in for loop, and METRIC_VAR_SET is out of for loop, right ?

METRIC_VAR_SET(error_replicas, status_counts[partition_status::PS_ERROR]);
METRIC_VAR_SET(primary_replicas, status_counts[partition_status::PS_PRIMARY]);
METRIC_VAR_SET(secondary_replicas, status_counts[partition_status::PS_SECONDARY]);
METRIC_VAR_SET(learning_replicas, status_counts[partition_status::PS_POTENTIAL_SECONDARY]);
METRIC_VAR_SET(learning_replicas_max_duration_ms, learning_max_duration_time_ms);
METRIC_VAR_SET(learning_replicas_max_copy_file_bytes, learning_max_copy_file_size);
METRIC_VAR_SET(bulk_load_running_count, bulk_load_running_count);
METRIC_VAR_SET(bulk_load_ingestion_max_duration_ms, bulk_load_max_ingestion_time_ms);
METRIC_VAR_SET(bulk_load_max_duration_ms, bulk_load_max_duration_time_ms);
METRIC_VAR_SET(splitting_replicas, splitting_count);
METRIC_VAR_SET(splitting_replicas, status_counts[partition_status::PS_PARTITION_SPLIT]);
METRIC_VAR_SET(splitting_replicas_max_duration_ms, splitting_max_duration_time_ms);
METRIC_VAR_SET(splitting_replicas_async_learn_max_duration_ms,
splitting_max_async_learn_time_ms);
Expand Down
4 changes: 4 additions & 0 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,10 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
METRIC_VAR_DECLARE_gauge_int64(opening_replicas);
METRIC_VAR_DECLARE_gauge_int64(closing_replicas);

METRIC_VAR_DECLARE_gauge_int64(inactive_replicas);
METRIC_VAR_DECLARE_gauge_int64(error_replicas);
METRIC_VAR_DECLARE_gauge_int64(primary_replicas);
METRIC_VAR_DECLARE_gauge_int64(secondary_replicas);
METRIC_VAR_DECLARE_gauge_int64(learning_replicas);
METRIC_VAR_DECLARE_gauge_int64(learning_replicas_max_duration_ms);
METRIC_VAR_DECLARE_gauge_int64(learning_replicas_max_copy_file_bytes);
Expand Down
Loading