Skip to content

Commit

Permalink
feat(make_idempotent): implement idempotent writes for incr request
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Jan 21, 2025
1 parent c950ffe commit 1c3edb1
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 3 deletions.
15 changes: 15 additions & 0 deletions idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,26 @@ enum mutate_operation
MO_DELETE
}

// Marks the origin of a single-put request; carried by the optional `type` field of
// `update_request`.
enum update_type
{
// A general single-put request.
UT_PUT,
// A single-put request translated from a non-idempotent incr request.
UT_INCR
}

// The single-put request, just writes a key/value pair into storage, which is certainly
// idempotent.
struct update_request
{
// Key of the key/value pair to be written.
1:dsn.blob key;
// Value of the key/value pair to be written.
2:dsn.blob value;
// Expiration setting of the pair, in seconds.
// NOTE(review): presumably an absolute timestamp where 0 means "no TTL" -- confirm against
// the storage layer, it is not visible from this definition.
3:i32 expire_ts_seconds;

// This field marks the type of a single-put request. It is mainly used to tell a general
// single-put request apart from one translated from a non-idempotent atomic write request:
// - a general single-put request, if `type` is UT_PUT or is not set (the field is
// optional), or
// - a put request translated from a non-idempotent incr request, if `type` is UT_INCR.
4:optional update_type type;
}

struct update_response
Expand Down
152 changes: 151 additions & 1 deletion src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,105 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return resp.error;
}

// Translate an incr request, which is non-idempotent, into a single-put request, which is
// idempotent. Return the current status for RocksDB.
// Parameters:
// - req: the incr request to be translated.
// - err_resp: filled only when the translation fails, i.e. when a non-kOk status is returned.
// - update: filled with the translated single-put request only when kOk is returned.
int make_idempotent(const dsn::apps::incr_request &req,
                    dsn::apps::incr_response &err_resp,
                    dsn::apps::update_request &update)
{
    // Get old value from the RocksDB instance according to the provided key.
    db_get_context get_ctx;
    const int err = _rocksdb_wrapper->get(req.key.to_string_view(), &get_ctx);
    if (dsn_unlikely(err != rocksdb::Status::kOk)) {
        return make_error_response(err, err_resp);
    }

    if (!get_ctx.found || get_ctx.expired) {
        // Once the provided key is not found or has expired, its value is assumed to be 0
        // before incr; thus the final result of incr, i.e. req.increment, becomes the value
        // of the single-put request. A non-positive requested TTL is written as 0.
        return make_idempotent_request_for_incr(
            req.key, req.increment, req.expire_ts_seconds > 0 ? req.expire_ts_seconds : 0, update);
    }

    dsn::blob old_value;
    pegasus_extract_user_data(_pegasus_data_version, std::move(get_ctx.raw_value), old_value);

    int64_t new_int = 0;
    if (old_value.empty()) {
        // An empty old value is also considered as 0 before incr, thus req.increment is used
        // as the value for the single put.
        new_int = req.increment;
    } else {
        int64_t old_int = 0;
        if (!dsn::buf2int64(old_value, old_int)) {
            // Old value is not a valid int64.
            LOG_ERROR_PREFIX("incr failed: error = old value \"{}\" "
                             "is not an integer or out of range",
                             utils::c_escape_sensitive_string(old_value));
            return make_error_response(rocksdb::Status::kInvalidArgument, err_resp);
        }

        // Overflow of a signed addition is undefined behavior, which allows the compiler to
        // drop the range check below. Add in unsigned arithmetic instead, where wraparound
        // is well defined, and convert back: the result is the same wrapped value the check
        // and the log message expect.
        new_int = static_cast<int64_t>(static_cast<uint64_t>(old_int) +
                                       static_cast<uint64_t>(req.increment));
        if ((req.increment > 0 && new_int < old_int) ||
            (req.increment < 0 && new_int > old_int)) {
            // New value overflows, just respond with the old value.
            LOG_ERROR_PREFIX("incr failed: error = new value is out of range, "
                             "old_value = {}, increment = {}, new_value = {}",
                             old_int,
                             req.increment,
                             new_int);
            return make_error_response(rocksdb::Status::kInvalidArgument, old_int, err_resp);
        }
    }

    // Set new TTL:
    // - == 0: keep the TTL currently stored with the record;
    // - <  0: clear the TTL;
    // - >  0: use the requested TTL.
    uint32_t new_expire_ts = 0;
    if (req.expire_ts_seconds == 0) {
        new_expire_ts = get_ctx.expire_ts;
    } else if (req.expire_ts_seconds < 0) {
        new_expire_ts = 0;
    } else { // req.expire_ts_seconds > 0
        new_expire_ts = req.expire_ts_seconds;
    }

    return make_idempotent_request_for_incr(req.key, new_int, new_expire_ts, update);
}

// Apply single-put request translated from incr request into RocksDB, and build response
// for incr. Return current status for RocksDB.
int put(const db_write_context &ctx,
        const dsn::apps::update_request &update,
        dsn::apps::incr_response &resp)
{
    // Fill in the fields of the incr response that do not depend on the write result.
    const auto pid = get_gpid();
    resp.app_id = pid.get_app_id();
    resp.partition_index = pid.get_partition_index();
    resp.decree = ctx.decree;
    resp.server = _primary_host_port;

    // The write batch is cleared on every exit path of this function.
    auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });

    // Stage the key/value pair into the write batch.
    const auto expire_ts = static_cast<uint32_t>(update.expire_ts_seconds);
    resp.error = _rocksdb_wrapper->write_batch_put_ctx(
        ctx, update.key.to_string_view(), update.value.to_string_view(), expire_ts);
    if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
        return resp.error;
    }

    // Apply the staged batch.
    resp.error = _rocksdb_wrapper->write(ctx.decree);
    if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
        return resp.error;
    }

    // The value that has just been written is the result of incr; it is expected to be a
    // valid int64 since the request was translated from an incr request.
    CHECK(dsn::buf2int64(update.value, resp.new_value),
          "invalid int64 value for put incr: key={}, value={}",
          update.key,
          update.value);

    return resp.error;
}

int incr(int64_t decree, const dsn::apps::incr_request &update, dsn::apps::incr_response &resp)
{
resp.app_id = get_gpid().get_app_id();
Expand Down Expand Up @@ -242,14 +341,15 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });
resp.error = _rocksdb_wrapper->write_batch_put(
decree, update.key.to_string_view(), std::to_string(new_value), new_expire_ts);
if (resp.error) {
if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}

resp.error = _rocksdb_wrapper->write(decree);
if (resp.error == rocksdb::Status::kOk) {
resp.new_value = new_value;
}

return resp.error;
}

Expand Down Expand Up @@ -569,6 +669,56 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return raw_key;
}

// Build a single-put request by provided int64-typed value.
static inline void make_idempotent_request(const dsn::blob &key,
                                           int64_t value,
                                           uint32_t expire_ts_seconds,
                                           dsn::apps::update_type::type type,
                                           dsn::apps::update_request &update)
{
    // Mark which kind of request this single put was built from.
    update.__set_type(type);

    // The Thrift field is declared as i32, hence the explicit conversion.
    update.expire_ts_seconds = static_cast<int32_t>(expire_ts_seconds);

    // The numeric value is stored in its textual form.
    update.value = dsn::blob::create_from_numeric(value);
    update.key = key;
}

// Build the corresponding single-put request for an incr request, and return the current
// status for RocksDB, i.e. kOk.
static inline int make_idempotent_request_for_incr(const dsn::blob &key,
                                                   int64_t value,
                                                   uint32_t expire_ts_seconds,
                                                   dsn::apps::update_request &update)
{
    // A put translated from incr is always tagged as UT_INCR.
    constexpr auto kType = dsn::apps::update_type::UT_INCR;
    make_idempotent_request(key, value, expire_ts_seconds, kType, update);

    // Building the request cannot fail.
    return rocksdb::Status::kOk;
}

// Build incr response only for error, and return the current error status for RocksDB.
inline int make_error_response(int err, dsn::apps::incr_response &resp)
{
    // This helper must never be used to report success.
    CHECK_NE_MSG(err, rocksdb::Status::kOk, "this incr response is built only for error");

    // Identify the partition and the server that produced the error.
    const auto pid = get_gpid();
    resp.partition_index = pid.get_partition_index();
    resp.app_id = pid.get_app_id();
    resp.server = _primary_host_port;

    // Currently the mutation has not been assigned a valid decree, thus set it to -1.
    resp.decree = -1;

    resp.error = err;
    return resp.error;
}

// Build incr response as above, except that also set new value for response.
inline int make_error_response(int err, int64_t new_value, dsn::apps::incr_response &resp)
{
    // Fill in the common error fields first, then attach the value to be reported.
    const int status = make_error_response(err, resp);
    resp.new_value = new_value;
    return status;
}

// return true if the check type is supported
static bool is_check_type_supported(::dsn::apps::cas_check_type::type check_type)
{
Expand Down
13 changes: 11 additions & 2 deletions src/utils/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@

#pragma once

#include <memory>
#include <fmt/core.h>
#include <cstring>

#include <memory>
#include <string_view>
#include <type_traits>

#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TProtocol.h>

Expand Down Expand Up @@ -126,6 +128,13 @@ class blob
return {std::move(buf), static_cast<unsigned int>(s->length())};
}

// Build a blob holding the textual form of an arithmetic value, as rendered by the default
// "{}" format of fmt.
template <typename TNum, typename = std::enable_if_t<std::is_arithmetic_v<TNum>>>
[[nodiscard]] static blob create_from_numeric(TNum val)
{
    auto text = fmt::format("{}", val);
    return create_from_bytes(std::move(text));
}

void assign(const std::shared_ptr<char> &buffer, int offset, unsigned int length)
{
_holder = buffer;
Expand Down

0 comments on commit 1c3edb1

Please sign in to comment.