Skip to content

Commit

Permalink
feat(cpp-shell): make mlog_dump dump plog (#1760)
Browse files Browse the repository at this point in the history
Since Pegasus 2.5.0, the slog has been deprecated entirely, the plog is left.
The 'mlog_dump' tool in cpp-shell is used to dump slog ever, this patch transfer
it to dump plog.
  • Loading branch information
acelyc111 authored Dec 12, 2023
1 parent 5ea2a8d commit 224cb81
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 47 deletions.
2 changes: 2 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class learn_notify_response;
class learn_request;
class learn_response;
class learn_state;
class mutation_log_tool;
class replica;
class replica_backup_manager;
class replica_bulk_loader;
Expand Down Expand Up @@ -529,6 +530,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

private:
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::mutation_log_tool;
friend class ::dsn::replication::mutation_queue;
friend class ::dsn::replication::replica_stub;
friend class mock_replica;
Expand Down
65 changes: 39 additions & 26 deletions src/shell/commands/debugger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@
// IWYU pragma: no_include <bits/getopt_core.h>
// TODO(yingchun): refactor this after libfmt upgraded
#include <fmt/chrono.h> // IWYU pragma: keep
// IWYU pragma: no_include <fmt/core.h>
// IWYU pragma: no_include <fmt/format.h>
#if FMT_VERSION < 60000
#include <fmt/time.h> // IWYU pragma: keep
#endif
#include <fmt/printf.h> // IWYU pragma: keep
// IWYU pragma: no_include <algorithm>
// IWYU pragma: no_include <iterator>
Expand All @@ -38,13 +33,17 @@
#include <stdint.h>
#include <stdio.h>
#include <ctime>
// IWYU pragma: no_include <fmt/core.h>
// IWYU pragma: no_include <fmt/format.h>
#include <filesystem>
#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

#include "base/idl_utils.h"
#include "common/gpid.h"
#include "common/replication.codes.h"
#include "pegasus_key_schema.h"
#include "pegasus_utils.h"
Expand Down Expand Up @@ -78,7 +77,7 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args)
{0, 0, 0, 0}};

bool detailed = false;
std::string input;
std::string plog_dir;
std::string output;
optind = 0;
while (true) {
Expand All @@ -92,7 +91,7 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args)
detailed = true;
break;
case 'i':
input = optarg;
plog_dir = optarg;
break;
case 'o':
output = optarg;
Expand All @@ -101,12 +100,26 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args)
return false;
}
}
if (input.empty()) {
fprintf(stderr, "ERROR: input is not specified\n");
if (plog_dir.empty()) {
fmt::print(stderr, "ERROR: 'input' is not specified\n");
return false;
}
if (!dsn::utils::filesystem::directory_exists(plog_dir)) {
fmt::print(stderr, "ERROR: '{}' is not a directory\n", plog_dir);
return false;
}
if (!dsn::utils::filesystem::directory_exists(input)) {
fprintf(stderr, "ERROR: input %s is not a directory\n", input.c_str());

const auto replica_path = std::filesystem::path(plog_dir).parent_path();
const auto name = replica_path.filename().string();
if (name.empty()) {
fmt::print(stderr, "ERROR: '{}' is not a valid plog directory\n", plog_dir);
return false;
}

char app_type[128];
int32_t app_id, pidx;
if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) {
fmt::print(stderr, "ERROR: '{}' is not a valid plog directory\n", plog_dir);
return false;
}

Expand All @@ -116,7 +129,7 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args)
} else {
os_ptr = new std::ofstream(output);
if (!*os_ptr) {
fprintf(stderr, "ERROR: open output file %s failed\n", output.c_str());
fmt::print(stderr, "ERROR: open output file {} failed\n", output);
delete os_ptr;
return true;
}
Expand Down Expand Up @@ -218,11 +231,11 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args)
}

dsn::replication::mutation_log_tool tool;
bool ret = tool.dump(input, os, callback);
bool ret = tool.dump(plog_dir, dsn::gpid(app_id, pidx), os, callback);
if (!ret) {
fprintf(stderr, "ERROR: dump failed\n");
fmt::print(stderr, "ERROR: dump failed\n");
} else {
fprintf(stderr, "Done\n");
fmt::print(stderr, "Done\n");
}

if (os_ptr != &std::cout) {
Expand All @@ -246,7 +259,7 @@ bool local_get(command_executor *e, shell_context *sc, arguments args)
rocksdb::DB *db;
rocksdb::Status status = rocksdb::DB::OpenForReadOnly(db_opts, db_path, &db);
if (!status.ok()) {
fprintf(stderr, "ERROR: open db failed: %s\n", status.ToString().c_str());
fmt::print(stderr, "ERROR: open db failed: {}\n", status.ToString());
return true;
}

Expand All @@ -257,15 +270,15 @@ bool local_get(command_executor *e, shell_context *sc, arguments args)
rocksdb::ReadOptions rd_opts;
status = db->Get(rd_opts, skey, &value);
if (!status.ok()) {
fprintf(stderr, "ERROR: get failed: %s\n", status.ToString().c_str());
fmt::print(stderr, "ERROR: get failed: {}\n", status.ToString());
} else {
uint32_t expire_ts = pegasus::pegasus_extract_expire_ts(0, value);
dsn::blob user_data;
pegasus::pegasus_extract_user_data(0, std::move(value), user_data);
fprintf(stderr,
"%u : \"%s\"\n",
expire_ts,
pegasus::utils::c_escape_string(user_data, sc->escape_all).c_str());
fmt::print(stderr,
"{} : \"{}\"\n",
expire_ts,
pegasus::utils::c_escape_string(user_data, sc->escape_all));
}

delete db;
Expand All @@ -282,7 +295,7 @@ bool rdb_key_str2hex(command_executor *e, shell_context *sc, arguments args)
::dsn::blob key;
pegasus::pegasus_generate_key(key, hash_key, sort_key);
rocksdb::Slice skey(key.data(), key.length());
fprintf(stderr, "\"%s\"\n", skey.ToString(true).c_str());
fmt::print(stderr, "\"{}\"\n", skey.ToString(true));
return true;
}

Expand Down Expand Up @@ -312,12 +325,12 @@ bool rdb_value_hex2str(command_executor *e, shell_context *sc, arguments args)
auto expire_ts = static_cast<int64_t>(pegasus::pegasus_extract_expire_ts(0, pegasus_value)) +
pegasus::utils::epoch_begin; // TODO(wutao): pass user specified version
std::time_t tm(expire_ts);
fmt::print(stderr, "\nWhen to expire:\n {:%Y-%m-%d %H:%M:%S}\n", *std::localtime(&tm));
fmt::print(stderr, "\nWhen to expire:\n {:%Y-%m-%d %H:%M:%S}\n", fmt::localtime(tm));

dsn::blob user_data;
pegasus::pegasus_extract_user_data(0, std::move(pegasus_value), user_data);
fprintf(stderr,
"user_data:\n \"%s\"\n",
pegasus::utils::c_escape_string(user_data.to_string(), sc->escape_all).c_str());
fmt::print(stderr,
"user_data:\n \"{}\"\n",
pegasus::utils::c_escape_string(user_data.to_string(), sc->escape_all));
return true;
}
2 changes: 1 addition & 1 deletion src/shell/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ count = 1
[apps.mimic]
type = dsn.app.mimic
arguments =
pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER
pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER,THREAD_POOL_REPLICATION
run = true
count = 1

Expand Down
3 changes: 2 additions & 1 deletion src/shell/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,8 @@ static command_executor commands[] = {
{
"mlog_dump",
"dump mutation log dir",
"<-i|--input log_dir> [-o|--output file_name] [-d|--detailed]",
"<-i|--input log_dir(e.g. '/path/to/replica/reps/2.1.pegasus/plog/')> [-o|--output "
"file_name] [-d|--detailed]",
mlog_dump,
},
{
Expand Down
62 changes: 48 additions & 14 deletions src/tools/mutation_log_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,48 +27,82 @@
#include "mutation_log_tool.h"

#include <alloca.h>
#include <memory>
#include <vector>

#include "common/fs_manager.h"
#include "common/gpid.h"
#include "consensus_types.h"
#include "dsn.layer2_types.h"
#include "fmt/core.h"
#include "replica/mutation.h"
#include "replica/mutation_log.h"
#include "replica/replica.h"
#include "replica/replica_stub.h"
#include "runtime/rpc/rpc_message.h"
#include "runtime/task/task_spec.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/defer.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/time_utils.h"

namespace dsn {
namespace replication {

DSN_DECLARE_int32(log_private_file_size_mb);

bool mutation_log_tool::dump(
const std::string &log_dir,
gpid pid,
std::ostream &output,
std::function<void(int64_t decree, int64_t timestamp, dsn::message_ex **requests, int count)>
callback)
{
mutation_log_ptr mlog = new mutation_log_shared(log_dir, 32, false);
std::string absolute_path;
if (!utils::filesystem::get_absolute_path(log_dir, absolute_path)) {
output << fmt::format("ERROR: get absolute path failed\n");
return false;
}
std::string norm_path;
utils::filesystem::get_normalized_path(absolute_path, norm_path);
auto dn = std::make_unique<dir_node>(/* tag_ */ "", norm_path);
app_info ai;
ai.__set_app_type("pegasus");
auto stub = std::make_unique<replica_stub>();
// Constructor of replica is private which can not be accessed by std::make_unique, so use raw
// pointer here.
auto *rep = new replica(stub.get(),
pid,
ai,
dn.get(),
/* need_restore */ false,
/* is_duplication_follower */ false);
auto cleanup = dsn::defer([rep]() { delete rep; });
auto mlog =
std::make_shared<mutation_log_private>(log_dir, FLAGS_log_private_file_size_mb, pid, rep);
error_code err = mlog->open(
[mlog, &output, callback](int log_length, mutation_ptr &mu) -> bool {
if (mlog->max_decree(mu->data.header.pid) == 0) {
mlog->set_valid_start_offset_on_open(mu->data.header.pid, 0);
}
char timestamp_buf[32] = {0};
utils::time_ms_to_string(mu->data.header.timestamp / 1000, timestamp_buf);
output << "mutation [" << mu->name() << "]: "
<< "gpid=" << mu->data.header.pid.get_app_id() << "."
<< mu->data.header.pid.get_partition_index() << ", "
<< "ballot=" << mu->data.header.ballot << ", decree=" << mu->data.header.decree
<< ", "
<< "timestamp=" << timestamp_buf
<< ", last_committed_decree=" << mu->data.header.last_committed_decree << ", "
<< "log_offset=" << mu->data.header.log_offset << ", log_length=" << log_length
<< ", "
<< "update_count=" << mu->data.updates.size();
if (callback && mu->data.updates.size() > 0) {

output << fmt::format("mutation [{}]: gpid={}, ballot={}, decree={}, timestamp={}, "
"last_committed_decree={}, log_offset={}, log_length={}, "
"update_count={}\n",
mu->name(),
mu->data.header.pid,
mu->data.header.ballot,
mu->data.header.decree,
timestamp_buf,
mu->data.header.last_committed_decree,
mu->data.header.log_offset,
log_length,
mu->data.updates.size());
if (callback && !mu->data.updates.empty()) {
dsn::message_ex **batched_requests =
(dsn::message_ex **)alloca(sizeof(dsn::message_ex *) * mu->data.updates.size());
int batched_count = 0;
Expand All @@ -93,7 +127,7 @@ bool mutation_log_tool::dump(
nullptr);
mlog->close();
if (err != dsn::ERR_OK) {
output << "ERROR: dump mutation log failed, err = " << err.to_string() << std::endl;
output << fmt::format("ERROR: dump mutation log failed, err = {}\n", err);
return false;
} else {
return true;
Expand Down
6 changes: 4 additions & 2 deletions src/tools/mutation_log_tool.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <string>

namespace dsn {
class gpid;
class message_ex;

namespace replication {
Expand All @@ -41,9 +42,10 @@ class mutation_log_tool
public:
bool
dump(const std::string &log_dir,
gpid pid,
std::ostream &output,
std::function<void(
int64_t decree, int64_t timestamp, dsn::message_ex **requests, int count)> callback);
};
}
}
} // namespace replication
} // namespace dsn
3 changes: 0 additions & 3 deletions src/utils/time_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
#include <fmt/chrono.h> // IWYU pragma: keep
// IWYU pragma: no_include <fmt/core.h>
// IWYU pragma: no_include <fmt/format.h>
#if FMT_VERSION < 60000
#include <fmt/time.h> // IWYU pragma: keep
#endif
#include <fmt/printf.h> // IWYU pragma: keep
// IWYU pragma: no_include <algorithm>
// IWYU pragma: no_include <iterator>
Expand Down

0 comments on commit 224cb81

Please sign in to comment.