diff --git a/idl/rrdb.thrift b/idl/rrdb.thrift index da778f55e7..c0534785db 100644 --- a/idl/rrdb.thrift +++ b/idl/rrdb.thrift @@ -68,11 +68,26 @@ enum mutate_operation MO_DELETE } +enum update_type +{ + UT_PUT, + UT_INCR +} + +// The single-put request, just writes a key/value pair into storage, which is certainly +// idempotent. struct update_request { 1:dsn.blob key; 2:dsn.blob value; 3:i32 expire_ts_seconds; + + // This field marks the type of a single-put request, mainly used to differentiate a general + // single-put request from the one translated from a non-idempotent atomic write request: + // - a general single-put request, if `type` is UT_PUT or not set by default as it's + // optional, or + // - a put request translated from a non-idempotent incr request, if `type` is UT_INCR. + 4:optional update_type type; } struct update_response diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index 35a9c6399d..a48a3d2ead 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -171,6 +171,105 @@ class pegasus_write_service::impl : public dsn::replication::replica_base return resp.error; } + // Tranlate an incr request which is certainly non-idempotent into a single-put request + // which is certainly idempotent. Return current status for RocksDB. + int make_idempotent(const dsn::apps::incr_request &req, + dsn::apps::incr_response &err_resp, + dsn::apps::update_request &update) + { + // Get old value from the RocksDB instance according to the provided key. + db_get_context get_ctx; + int err = _rocksdb_wrapper->get(req.key.to_string_view(), &get_ctx); + if (dsn_unlikely(err != rocksdb::Status::kOk)) { + return make_error_response(err, err_resp); + } + + if (!get_ctx.found || get_ctx.expired) { + // Once the provided key is not found or has been expired, we could assume that + // its value is 0 before incr; thus the final result for incr could be set as + // the value of the single-put request, i.e. req.increment. + return make_idempotent_request_for_incr(req.key, + req.increment, + req.expire_ts_seconds > 0 ? req.expire_ts_seconds : 0, + update); + } + + dsn::blob old_value; + pegasus_extract_user_data(_pegasus_data_version, std::move(get_ctx.raw_value), old_value); + + int64_t new_int = 0; + if (old_value.empty()) { + // Old value is also considered as 0 before incr as above once it's empty, thus + // set req.increment as the value for single put. + new_int = req.increment; + } else { + int64_t old_int = 0; + if (!dsn::buf2int64(old_value, old_int)) { + // Old value is not valid int64. + LOG_ERROR_PREFIX("incr failed: error = old value \"{}\" " + "is not an integer or out of range", + utils::c_escape_sensitive_string(old_value)); + return make_error_response(rocksdb::Status::kInvalidArgument, err_resp); + } + + new_int = old_int + req.increment; + if ((req.increment > 0 && new_int < old_int) || + (req.increment < 0 && new_int > old_int)) { + // New value overflows, just respond with the old value. + LOG_ERROR_PREFIX("incr failed: error = new value is out of range, " + "old_value = {}, increment = {}, new_value = {}", + old_int, + req.increment, + new_int); + return make_error_response(rocksdb::Status::kInvalidArgument, old_int, err_resp); + } + } + + // Set new TTL. + uint32_t new_expire_ts = 0; + if (req.expire_ts_seconds == 0) { + new_expire_ts = get_ctx.expire_ts; + } else if (req.expire_ts_seconds < 0) { + new_expire_ts = 0; + } else { // req.expire_ts_seconds > 0 + new_expire_ts = req.expire_ts_seconds; + } + + return make_idempotent_request_for_incr(req.key, new_int, new_expire_ts, update); + } + + // Apply single-put request translated from incr request into RocksDB, and build response + // for incr. Return current status for RocksDB. + int put(const db_write_context &ctx, + const dsn::apps::update_request &update, + dsn::apps::incr_response &resp) + { + resp.app_id = get_gpid().get_app_id(); + resp.partition_index = get_gpid().get_partition_index(); + resp.decree = ctx.decree; + resp.server = _primary_host_port; + + auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); }); + + resp.error = _rocksdb_wrapper->write_batch_put_ctx( + ctx, update.key.to_string_view(), update.value.to_string_view(), static_cast(update.expire_ts_seconds)); + if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) { + return resp.error; + } + + resp.error = _rocksdb_wrapper->write(ctx.decree); + if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) { + return resp.error; + } + + CHECK(dsn::buf2int64(update.value, resp.new_value), + "invalid int64 value for put incr: key={}, value={}", + update.key, + update.value); + + return resp.error; + } + int incr(int64_t decree, const dsn::apps::incr_request &update, dsn::apps::incr_response &resp) { resp.app_id = get_gpid().get_app_id(); @@ -242,7 +341,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); }); resp.error = _rocksdb_wrapper->write_batch_put( decree, update.key.to_string_view(), std::to_string(new_value), new_expire_ts); - if (resp.error) { + if (resp.error != rocksdb::Status::kOk) { return resp.error; } @@ -250,6 +349,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base if (resp.error == rocksdb::Status::kOk) { resp.new_value = new_value; } + return resp.error; } @@ -569,6 +669,56 @@ class pegasus_write_service::impl : public dsn::replication::replica_base return raw_key; } + // Build a single-put request by provided int64-typed value. + static inline void make_idempotent_request(const dsn::blob &key, + int64_t value, + uint32_t expire_ts_seconds, + dsn::apps::update_type::type type, + dsn::apps::update_request &update) + { + update.key = key; + update.value = dsn::blob::create_from_numeric(value); + update.expire_ts_seconds = expire_ts_seconds; + update.__set_type(type); + } + + // Build corresponding single-put request for a incr request, and return current status + // for RocksDB, i.e. kOk. + static inline int make_idempotent_request_for_incr(const dsn::blob &key, + int64_t value, + uint32_t expire_ts_seconds, + dsn::apps::update_request &update) + { + make_idempotent_request( + key, value, expire_ts_seconds, dsn::apps::update_type::UT_INCR, update); + return rocksdb::Status::kOk; + } + + // Build incr response only for error, and return the current error status for RocksDB. + inline int make_error_response(int err, dsn::apps::incr_response &resp) + { + CHECK_NE_MSG(err, rocksdb::Status::kOk, "this incr response is built only for error"); + resp.error = err; + + const auto pid = get_gpid(); + resp.app_id = pid.get_app_id(); + resp.partition_index = pid.get_partition_index(); + + // Currently the mutation has not been assigned with valid decree, thus set to -1. + resp.decree = -1; + + resp.server = _primary_host_port; + + return err; + } + + // Build incr response as above, except that also set new value for response. + inline int make_error_response(int err, int64_t new_value, dsn::apps::incr_response &resp) + { + resp.new_value = new_value; + return make_error_response(err, resp); + } + // return true if the check type is supported static bool is_check_type_supported(::dsn::apps::cas_check_type::type check_type) { diff --git a/src/utils/blob.h b/src/utils/blob.h index 07675e9431..5b9d045c45 100644 --- a/src/utils/blob.h +++ b/src/utils/blob.h @@ -26,10 +26,12 @@ #pragma once -#include +#include #include - +#include #include +#include + #include #include @@ -126,6 +128,13 @@ class blob return {std::move(buf), static_cast(s->length())}; } + template ::value>::type> + [[nodiscard]] static blob create_from_numeric(TNum val) + { + return create_from_bytes(fmt::format("{}", val)); + } + void assign(const std::shared_ptr &buffer, int offset, unsigned int length) { _holder = buffer;