Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cpp-shell): make mlog_dump dump plog #1760

Merged
merged 15 commits into from
Dec 12, 2023
2 changes: 2 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class learn_notify_response;
class learn_request;
class learn_response;
class learn_state;
class mutation_log_tool;
class replica;
class replica_backup_manager;
class replica_bulk_loader;
Expand Down Expand Up @@ -529,6 +530,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

private:
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::mutation_log_tool;
friend class ::dsn::replication::mutation_queue;
friend class ::dsn::replication::replica_stub;
friend class mock_replica;
Expand Down
65 changes: 39 additions & 26 deletions src/shell/commands/debugger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@
// IWYU pragma: no_include <bits/getopt_core.h>
// TODO(yingchun): refactor this after libfmt upgraded
#include <fmt/chrono.h> // IWYU pragma: keep
// IWYU pragma: no_include <fmt/core.h>
// IWYU pragma: no_include <fmt/format.h>
#if FMT_VERSION < 60000
#include <fmt/time.h> // IWYU pragma: keep
#endif
#include <fmt/printf.h> // IWYU pragma: keep
// IWYU pragma: no_include <algorithm>
// IWYU pragma: no_include <iterator>
Expand All @@ -38,13 +33,17 @@
#include <stdint.h>
#include <stdio.h>
#include <ctime>
// IWYU pragma: no_include <fmt/core.h>
// IWYU pragma: no_include <fmt/format.h>
#include <filesystem>
#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

#include "base/idl_utils.h"
#include "common/gpid.h"
#include "common/replication.codes.h"
#include "pegasus_key_schema.h"
#include "pegasus_utils.h"
Expand Down Expand Up @@ -78,7 +77,7 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args)
{0, 0, 0, 0}};

bool detailed = false;
std::string input;
std::string plog_dir;
std::string output;
optind = 0;
while (true) {
Expand All @@ -92,7 +91,7 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args)
detailed = true;
break;
case 'i':
input = optarg;
plog_dir = optarg;
break;
case 'o':
output = optarg;
Expand All @@ -101,12 +100,26 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args)
return false;
}
}
if (input.empty()) {
fprintf(stderr, "ERROR: input is not specified\n");
if (plog_dir.empty()) {
fmt::print(stderr, "ERROR: 'input' is not specified\n");
return false;
}
if (!dsn::utils::filesystem::directory_exists(plog_dir)) {
fmt::print(stderr, "ERROR: '{}' is not a directory\n", plog_dir);
return false;
}
if (!dsn::utils::filesystem::directory_exists(input)) {
fprintf(stderr, "ERROR: input %s is not a directory\n", input.c_str());

const auto replica_path = std::filesystem::path(plog_dir).parent_path();
const auto name = replica_path.filename().string();
if (name.empty()) {
fmt::print(stderr, "ERROR: '{}' is not a valid plog directory\n", plog_dir);
return false;
}

char app_type[128];
int32_t app_id, pidx;
if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) {
fmt::print(stderr, "ERROR: '{}' is not a valid plog directory\n", plog_dir);
return false;
}

Expand All @@ -116,7 +129,7 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args)
} else {
os_ptr = new std::ofstream(output);
if (!*os_ptr) {
fprintf(stderr, "ERROR: open output file %s failed\n", output.c_str());
fmt::print(stderr, "ERROR: open output file {} failed\n", output);
delete os_ptr;
return true;
}
Expand Down Expand Up @@ -218,11 +231,11 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args)
}

dsn::replication::mutation_log_tool tool;
bool ret = tool.dump(input, os, callback);
bool ret = tool.dump(plog_dir, dsn::gpid(app_id, pidx), os, callback);
if (!ret) {
fprintf(stderr, "ERROR: dump failed\n");
fmt::print(stderr, "ERROR: dump failed\n");
} else {
fprintf(stderr, "Done\n");
fmt::print(stderr, "Done\n");
}

if (os_ptr != &std::cout) {
Expand All @@ -246,7 +259,7 @@ bool local_get(command_executor *e, shell_context *sc, arguments args)
rocksdb::DB *db;
rocksdb::Status status = rocksdb::DB::OpenForReadOnly(db_opts, db_path, &db);
if (!status.ok()) {
fprintf(stderr, "ERROR: open db failed: %s\n", status.ToString().c_str());
fmt::print(stderr, "ERROR: open db failed: {}\n", status.ToString());
return true;
}

Expand All @@ -257,15 +270,15 @@ bool local_get(command_executor *e, shell_context *sc, arguments args)
rocksdb::ReadOptions rd_opts;
status = db->Get(rd_opts, skey, &value);
if (!status.ok()) {
fprintf(stderr, "ERROR: get failed: %s\n", status.ToString().c_str());
fmt::print(stderr, "ERROR: get failed: {}\n", status.ToString());
} else {
uint32_t expire_ts = pegasus::pegasus_extract_expire_ts(0, value);
dsn::blob user_data;
pegasus::pegasus_extract_user_data(0, std::move(value), user_data);
fprintf(stderr,
"%u : \"%s\"\n",
expire_ts,
pegasus::utils::c_escape_string(user_data, sc->escape_all).c_str());
fmt::print(stderr,
"{} : \"{}\"\n",
expire_ts,
pegasus::utils::c_escape_string(user_data, sc->escape_all));
}

delete db;
Expand All @@ -282,7 +295,7 @@ bool rdb_key_str2hex(command_executor *e, shell_context *sc, arguments args)
::dsn::blob key;
pegasus::pegasus_generate_key(key, hash_key, sort_key);
rocksdb::Slice skey(key.data(), key.length());
fprintf(stderr, "\"%s\"\n", skey.ToString(true).c_str());
fmt::print(stderr, "\"{}\"\n", skey.ToString(true));
return true;
}

Expand Down Expand Up @@ -312,12 +325,12 @@ bool rdb_value_hex2str(command_executor *e, shell_context *sc, arguments args)
auto expire_ts = static_cast<int64_t>(pegasus::pegasus_extract_expire_ts(0, pegasus_value)) +
pegasus::utils::epoch_begin; // TODO(wutao): pass user specified version
std::time_t tm(expire_ts);
fmt::print(stderr, "\nWhen to expire:\n {:%Y-%m-%d %H:%M:%S}\n", *std::localtime(&tm));
fmt::print(stderr, "\nWhen to expire:\n {:%Y-%m-%d %H:%M:%S}\n", fmt::localtime(tm));

dsn::blob user_data;
pegasus::pegasus_extract_user_data(0, std::move(pegasus_value), user_data);
fprintf(stderr,
"user_data:\n \"%s\"\n",
pegasus::utils::c_escape_string(user_data.to_string(), sc->escape_all).c_str());
fmt::print(stderr,
"user_data:\n \"{}\"\n",
pegasus::utils::c_escape_string(user_data.to_string(), sc->escape_all));
return true;
}
2 changes: 1 addition & 1 deletion src/shell/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ count = 1
[apps.mimic]
type = dsn.app.mimic
arguments =
pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER
pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER,THREAD_POOL_REPLICATION
run = true
count = 1

Expand Down
3 changes: 2 additions & 1 deletion src/shell/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,8 @@ static command_executor commands[] = {
{
"mlog_dump",
"dump mutation log dir",
"<-i|--input log_dir> [-o|--output file_name] [-d|--detailed]",
"<-i|--input log_dir(e.g. '/path/to/replica/reps/2.1.pegasus/plog/')> [-o|--output "
"file_name] [-d|--detailed]",
mlog_dump,
},
{
Expand Down
62 changes: 48 additions & 14 deletions src/tools/mutation_log_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,48 +27,82 @@
#include "mutation_log_tool.h"

#include <alloca.h>
#include <memory>
#include <vector>

#include "common/fs_manager.h"
#include "common/gpid.h"
#include "consensus_types.h"
#include "dsn.layer2_types.h"
#include "fmt/core.h"
#include "replica/mutation.h"
#include "replica/mutation_log.h"
#include "replica/replica.h"
#include "replica/replica_stub.h"
#include "runtime/rpc/rpc_message.h"
#include "runtime/task/task_spec.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/defer.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/time_utils.h"

namespace dsn {
namespace replication {

DSN_DECLARE_int32(log_private_file_size_mb);

bool mutation_log_tool::dump(
const std::string &log_dir,
gpid pid,
std::ostream &output,
std::function<void(int64_t decree, int64_t timestamp, dsn::message_ex **requests, int count)>
callback)
{
mutation_log_ptr mlog = new mutation_log_shared(log_dir, 32, false);
std::string absolute_path;
if (!utils::filesystem::get_absolute_path(log_dir, absolute_path)) {
output << fmt::format("ERROR: get absolute path failed\n");
return false;
}
std::string norm_path;
utils::filesystem::get_normalized_path(absolute_path, norm_path);
auto dn = std::make_unique<dir_node>(/* tag_ */ "", norm_path);
app_info ai;
ai.__set_app_type("pegasus");
auto stub = std::make_unique<replica_stub>();
// Constructor of replica is private which can not be accessed by std::make_unique, so use raw
// pointer here.
auto *rep = new replica(stub.get(),
pid,
ai,
dn.get(),
/* need_restore */ false,
/* is_duplication_follower */ false);
auto cleanup = dsn::defer([rep]() { delete rep; });
auto mlog =
std::make_shared<mutation_log_private>(log_dir, FLAGS_log_private_file_size_mb, pid, rep);
error_code err = mlog->open(
[mlog, &output, callback](int log_length, mutation_ptr &mu) -> bool {
if (mlog->max_decree(mu->data.header.pid) == 0) {
mlog->set_valid_start_offset_on_open(mu->data.header.pid, 0);
}
char timestamp_buf[32] = {0};
utils::time_ms_to_string(mu->data.header.timestamp / 1000, timestamp_buf);
output << "mutation [" << mu->name() << "]: "
<< "gpid=" << mu->data.header.pid.get_app_id() << "."
<< mu->data.header.pid.get_partition_index() << ", "
<< "ballot=" << mu->data.header.ballot << ", decree=" << mu->data.header.decree
<< ", "
<< "timestamp=" << timestamp_buf
<< ", last_committed_decree=" << mu->data.header.last_committed_decree << ", "
<< "log_offset=" << mu->data.header.log_offset << ", log_length=" << log_length
<< ", "
<< "update_count=" << mu->data.updates.size();
if (callback && mu->data.updates.size() > 0) {

output << fmt::format("mutation [{}]: gpid={}, ballot={}, decree={}, timestamp={}, "
"last_committed_decree={}, log_offset={}, log_length={}, "
"update_count={}\n",
mu->name(),
mu->data.header.pid,
mu->data.header.ballot,
mu->data.header.decree,
timestamp_buf,
mu->data.header.last_committed_decree,
mu->data.header.log_offset,
log_length,
mu->data.updates.size());
if (callback && !mu->data.updates.empty()) {
dsn::message_ex **batched_requests =
(dsn::message_ex **)alloca(sizeof(dsn::message_ex *) * mu->data.updates.size());
int batched_count = 0;
Expand All @@ -93,7 +127,7 @@ bool mutation_log_tool::dump(
nullptr);
mlog->close();
if (err != dsn::ERR_OK) {
output << "ERROR: dump mutation log failed, err = " << err.to_string() << std::endl;
output << fmt::format("ERROR: dump mutation log failed, err = {}\n", err);
return false;
} else {
return true;
Expand Down
6 changes: 4 additions & 2 deletions src/tools/mutation_log_tool.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <string>

namespace dsn {
class gpid;
class message_ex;

namespace replication {
Expand All @@ -41,9 +42,10 @@ class mutation_log_tool
public:
bool
dump(const std::string &log_dir,
gpid pid,
std::ostream &output,
std::function<void(
int64_t decree, int64_t timestamp, dsn::message_ex **requests, int count)> callback);
};
}
}
} // namespace replication
} // namespace dsn
3 changes: 0 additions & 3 deletions src/utils/time_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
#include <fmt/chrono.h> // IWYU pragma: keep
// IWYU pragma: no_include <fmt/core.h>
// IWYU pragma: no_include <fmt/format.h>
#if FMT_VERSION < 60000
#include <fmt/time.h> // IWYU pragma: keep
#endif
#include <fmt/printf.h> // IWYU pragma: keep
// IWYU pragma: no_include <algorithm>
// IWYU pragma: no_include <iterator>
Expand Down
Loading