From 224cb818ea9797b20cf0db25a3c2352e0a0bb160 Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Tue, 12 Dec 2023 18:38:40 +0800 Subject: [PATCH] feat(cpp-shell): make mlog_dump dump plog (#1760) Since Pegasus 2.5.0, the slog has been deprecated entirely and only the plog is left. The 'mlog_dump' tool in cpp-shell was formerly used to dump the slog; this patch changes it to dump the plog instead. --- src/replica/replica.h | 2 + src/shell/commands/debugger.cpp | 65 ++++++++++++++++++++------------- src/shell/config.ini | 2 +- src/shell/main.cpp | 3 +- src/tools/mutation_log_tool.cpp | 62 ++++++++++++++++++++++++------- src/tools/mutation_log_tool.h | 6 ++- src/utils/time_utils.cpp | 3 -- 7 files changed, 96 insertions(+), 47 deletions(-) diff --git a/src/replica/replica.h b/src/replica/replica.h index 2057fca6cf..c4f3d2ff49 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -92,6 +92,7 @@ class learn_notify_response; class learn_request; class learn_response; class learn_state; +class mutation_log_tool; class replica; class replica_backup_manager; class replica_bulk_loader; @@ -529,6 +530,7 @@ class replica : public serverlet, public ref_counter, public replica_ba private: friend class ::dsn::replication::test::test_checker; + friend class ::dsn::replication::mutation_log_tool; friend class ::dsn::replication::mutation_queue; friend class ::dsn::replication::replica_stub; friend class mock_replica; diff --git a/src/shell/commands/debugger.cpp b/src/shell/commands/debugger.cpp index 4febb46401..8bfe9b2958 100644 --- a/src/shell/commands/debugger.cpp +++ b/src/shell/commands/debugger.cpp @@ -20,11 +20,6 @@ // IWYU pragma: no_include // TODO(yingchun): refactor this after libfmt upgraded #include // IWYU pragma: keep -// IWYU pragma: no_include -// IWYU pragma: no_include -#if FMT_VERSION < 60000 -#include // IWYU pragma: keep -#endif #include // IWYU pragma: keep // IWYU pragma: no_include // IWYU pragma: no_include @@ -38,6 +33,9 @@ #include #include #include +// IWYU pragma: 
no_include +// IWYU pragma: no_include +#include #include #include #include @@ -45,6 +43,7 @@ #include #include "base/idl_utils.h" +#include "common/gpid.h" #include "common/replication.codes.h" #include "pegasus_key_schema.h" #include "pegasus_utils.h" @@ -78,7 +77,7 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args) {0, 0, 0, 0}}; bool detailed = false; - std::string input; + std::string plog_dir; std::string output; optind = 0; while (true) { @@ -92,7 +91,7 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args) detailed = true; break; case 'i': - input = optarg; + plog_dir = optarg; break; case 'o': output = optarg; @@ -101,12 +100,26 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args) return false; } } - if (input.empty()) { - fprintf(stderr, "ERROR: input is not specified\n"); + if (plog_dir.empty()) { + fmt::print(stderr, "ERROR: 'input' is not specified\n"); + return false; + } + if (!dsn::utils::filesystem::directory_exists(plog_dir)) { + fmt::print(stderr, "ERROR: '{}' is not a directory\n", plog_dir); return false; } - if (!dsn::utils::filesystem::directory_exists(input)) { - fprintf(stderr, "ERROR: input %s is not a directory\n", input.c_str()); + + const auto replica_path = std::filesystem::path(plog_dir).parent_path(); + const auto name = replica_path.filename().string(); + if (name.empty()) { + fmt::print(stderr, "ERROR: '{}' is not a valid plog directory\n", plog_dir); + return false; + } + + char app_type[128]; + int32_t app_id, pidx; + if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) { + fmt::print(stderr, "ERROR: '{}' is not a valid plog directory\n", plog_dir); return false; } @@ -116,7 +129,7 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args) } else { os_ptr = new std::ofstream(output); if (!*os_ptr) { - fprintf(stderr, "ERROR: open output file %s failed\n", output.c_str()); + fmt::print(stderr, "ERROR: open output file {} failed\n", 
output); delete os_ptr; return true; } @@ -218,11 +231,11 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args) } dsn::replication::mutation_log_tool tool; - bool ret = tool.dump(input, os, callback); + bool ret = tool.dump(plog_dir, dsn::gpid(app_id, pidx), os, callback); if (!ret) { - fprintf(stderr, "ERROR: dump failed\n"); + fmt::print(stderr, "ERROR: dump failed\n"); } else { - fprintf(stderr, "Done\n"); + fmt::print(stderr, "Done\n"); } if (os_ptr != &std::cout) { @@ -246,7 +259,7 @@ bool local_get(command_executor *e, shell_context *sc, arguments args) rocksdb::DB *db; rocksdb::Status status = rocksdb::DB::OpenForReadOnly(db_opts, db_path, &db); if (!status.ok()) { - fprintf(stderr, "ERROR: open db failed: %s\n", status.ToString().c_str()); + fmt::print(stderr, "ERROR: open db failed: {}\n", status.ToString()); return true; } @@ -257,15 +270,15 @@ bool local_get(command_executor *e, shell_context *sc, arguments args) rocksdb::ReadOptions rd_opts; status = db->Get(rd_opts, skey, &value); if (!status.ok()) { - fprintf(stderr, "ERROR: get failed: %s\n", status.ToString().c_str()); + fmt::print(stderr, "ERROR: get failed: {}\n", status.ToString()); } else { uint32_t expire_ts = pegasus::pegasus_extract_expire_ts(0, value); dsn::blob user_data; pegasus::pegasus_extract_user_data(0, std::move(value), user_data); - fprintf(stderr, - "%u : \"%s\"\n", - expire_ts, - pegasus::utils::c_escape_string(user_data, sc->escape_all).c_str()); + fmt::print(stderr, + "{} : \"{}\"\n", + expire_ts, + pegasus::utils::c_escape_string(user_data, sc->escape_all)); } delete db; @@ -282,7 +295,7 @@ bool rdb_key_str2hex(command_executor *e, shell_context *sc, arguments args) ::dsn::blob key; pegasus::pegasus_generate_key(key, hash_key, sort_key); rocksdb::Slice skey(key.data(), key.length()); - fprintf(stderr, "\"%s\"\n", skey.ToString(true).c_str()); + fmt::print(stderr, "\"{}\"\n", skey.ToString(true)); return true; } @@ -312,12 +325,12 @@ bool 
rdb_value_hex2str(command_executor *e, shell_context *sc, arguments args) auto expire_ts = static_cast(pegasus::pegasus_extract_expire_ts(0, pegasus_value)) + pegasus::utils::epoch_begin; // TODO(wutao): pass user specified version std::time_t tm(expire_ts); - fmt::print(stderr, "\nWhen to expire:\n {:%Y-%m-%d %H:%M:%S}\n", *std::localtime(&tm)); + fmt::print(stderr, "\nWhen to expire:\n {:%Y-%m-%d %H:%M:%S}\n", fmt::localtime(tm)); dsn::blob user_data; pegasus::pegasus_extract_user_data(0, std::move(pegasus_value), user_data); - fprintf(stderr, - "user_data:\n \"%s\"\n", - pegasus::utils::c_escape_string(user_data.to_string(), sc->escape_all).c_str()); + fmt::print(stderr, + "user_data:\n \"{}\"\n", + pegasus::utils::c_escape_string(user_data.to_string(), sc->escape_all)); return true; } diff --git a/src/shell/config.ini b/src/shell/config.ini index 6c6ab5adcd..8126688393 100644 --- a/src/shell/config.ini +++ b/src/shell/config.ini @@ -24,7 +24,7 @@ count = 1 [apps.mimic] type = dsn.app.mimic arguments = -pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER +pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER,THREAD_POOL_REPLICATION run = true count = 1 diff --git a/src/shell/main.cpp b/src/shell/main.cpp index 50c1614c10..75a52abc63 100644 --- a/src/shell/main.cpp +++ b/src/shell/main.cpp @@ -392,7 +392,8 @@ static command_executor commands[] = { { "mlog_dump", "dump mutation log dir", - "<-i|--input log_dir> [-o|--output file_name] [-d|--detailed]", + "<-i|--input log_dir(e.g. 
'/path/to/replica/reps/2.1.pegasus/plog/')> [-o|--output " + "file_name] [-d|--detailed]", mlog_dump, }, { diff --git a/src/tools/mutation_log_tool.cpp b/src/tools/mutation_log_tool.cpp index 3c1177d4d9..b68a4979b5 100644 --- a/src/tools/mutation_log_tool.cpp +++ b/src/tools/mutation_log_tool.cpp @@ -27,29 +27,62 @@ #include "mutation_log_tool.h" #include +#include #include +#include "common/fs_manager.h" #include "common/gpid.h" #include "consensus_types.h" +#include "dsn.layer2_types.h" +#include "fmt/core.h" #include "replica/mutation.h" #include "replica/mutation_log.h" +#include "replica/replica.h" +#include "replica/replica_stub.h" #include "runtime/rpc/rpc_message.h" #include "runtime/task/task_spec.h" #include "utils/autoref_ptr.h" #include "utils/blob.h" +#include "utils/defer.h" #include "utils/error_code.h" +#include "utils/filesystem.h" +#include "utils/flags.h" #include "utils/time_utils.h" namespace dsn { namespace replication { +DSN_DECLARE_int32(log_private_file_size_mb); + bool mutation_log_tool::dump( const std::string &log_dir, + gpid pid, std::ostream &output, std::function callback) { - mutation_log_ptr mlog = new mutation_log_shared(log_dir, 32, false); + std::string absolute_path; + if (!utils::filesystem::get_absolute_path(log_dir, absolute_path)) { + output << fmt::format("ERROR: get absolute path failed\n"); + return false; + } + std::string norm_path; + utils::filesystem::get_normalized_path(absolute_path, norm_path); + auto dn = std::make_unique(/* tag_ */ "", norm_path); + app_info ai; + ai.__set_app_type("pegasus"); + auto stub = std::make_unique(); + // Constructor of replica is private which can not be accessed by std::make_unique, so use raw + // pointer here. 
+ auto *rep = new replica(stub.get(), + pid, + ai, + dn.get(), + /* need_restore */ false, + /* is_duplication_follower */ false); + auto cleanup = dsn::defer([rep]() { delete rep; }); + auto mlog = + std::make_shared(log_dir, FLAGS_log_private_file_size_mb, pid, rep); error_code err = mlog->open( [mlog, &output, callback](int log_length, mutation_ptr &mu) -> bool { if (mlog->max_decree(mu->data.header.pid) == 0) { @@ -57,18 +90,19 @@ bool mutation_log_tool::dump( } char timestamp_buf[32] = {0}; utils::time_ms_to_string(mu->data.header.timestamp / 1000, timestamp_buf); - output << "mutation [" << mu->name() << "]: " - << "gpid=" << mu->data.header.pid.get_app_id() << "." - << mu->data.header.pid.get_partition_index() << ", " - << "ballot=" << mu->data.header.ballot << ", decree=" << mu->data.header.decree - << ", " - << "timestamp=" << timestamp_buf - << ", last_committed_decree=" << mu->data.header.last_committed_decree << ", " - << "log_offset=" << mu->data.header.log_offset << ", log_length=" << log_length - << ", " - << "update_count=" << mu->data.updates.size(); - if (callback && mu->data.updates.size() > 0) { - + output << fmt::format("mutation [{}]: gpid={}, ballot={}, decree={}, timestamp={}, " + "last_committed_decree={}, log_offset={}, log_length={}, " + "update_count={}\n", + mu->name(), + mu->data.header.pid, + mu->data.header.ballot, + mu->data.header.decree, + timestamp_buf, + mu->data.header.last_committed_decree, + mu->data.header.log_offset, + log_length, + mu->data.updates.size()); + if (callback && !mu->data.updates.empty()) { dsn::message_ex **batched_requests = (dsn::message_ex **)alloca(sizeof(dsn::message_ex *) * mu->data.updates.size()); int batched_count = 0; @@ -93,7 +127,7 @@ bool mutation_log_tool::dump( nullptr); mlog->close(); if (err != dsn::ERR_OK) { - output << "ERROR: dump mutation log failed, err = " << err.to_string() << std::endl; + output << fmt::format("ERROR: dump mutation log failed, err = {}\n", err); return false; } else { 
return true; diff --git a/src/tools/mutation_log_tool.h b/src/tools/mutation_log_tool.h index d313bdc4a4..e380be2afb 100644 --- a/src/tools/mutation_log_tool.h +++ b/src/tools/mutation_log_tool.h @@ -32,6 +32,7 @@ #include namespace dsn { +class gpid; class message_ex; namespace replication { @@ -41,9 +42,10 @@ class mutation_log_tool public: bool dump(const std::string &log_dir, + gpid pid, std::ostream &output, std::function callback); }; -} -} +} // namespace replication +} // namespace dsn diff --git a/src/utils/time_utils.cpp b/src/utils/time_utils.cpp index 711b4cd3d8..34504fc993 100644 --- a/src/utils/time_utils.cpp +++ b/src/utils/time_utils.cpp @@ -23,9 +23,6 @@ #include // IWYU pragma: keep // IWYU pragma: no_include // IWYU pragma: no_include -#if FMT_VERSION < 60000 -#include // IWYU pragma: keep -#endif #include // IWYU pragma: keep // IWYU pragma: no_include // IWYU pragma: no_include