From f5c322f01e6e20df1425f1e4df70dd2a43d7b77a Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Thu, 16 Jan 2025 12:37:07 +0800 Subject: [PATCH] refactor: specify data column family explicitly for RocksDB wrapper (#2182) Currently the wrapper operates RocksDB data still by default column family handler. To make data/meta column family both specified explicitly while accessing RocksDB instance, introduce data column family handler into the wrapper. --- src/server/rocksdb_wrapper.cpp | 19 ++++++++++++------- src/server/rocksdb_wrapper.h | 1 + 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index 078b2b9d8a..6f5ba8bc34 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -61,6 +61,7 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server) : replica_base(server), _db(server->_db), _rd_opts(server->_data_cf_rd_opts), + _data_cf(server->_data_cf), _meta_cf(server->_meta_cf), _pegasus_data_version(server->_pegasus_data_version), METRIC_VAR_INIT_replica(read_expired_values), @@ -78,7 +79,8 @@ int rocksdb_wrapper::get(std::string_view raw_key, /*out*/ db_get_context *ctx) { FAIL_POINT_INJECT_F("db_get", [](std::string_view) -> int { return FAIL_DB_GET; }); - rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), &(ctx->raw_value)); + rocksdb::Status s = + _db->Get(_rd_opts, _data_cf, utils::to_rocksdb_slice(raw_key), &ctx->raw_value); if (dsn_likely(s.ok())) { // success ctx->found = true; @@ -94,7 +96,8 @@ int rocksdb_wrapper::get(std::string_view raw_key, /*out*/ db_get_context *ctx) return rocksdb::Status::kOk; } - dsn::blob hash_key, sort_key; + dsn::blob hash_key; + dsn::blob sort_key; pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); LOG_ERROR_ROCKSDB("Get", s.ToString(), @@ -152,9 +155,10 @@ int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx, rocksdb::SliceParts skey_parts(&skey, 1); rocksdb::SliceParts svalue = _value_generator->generate_value( _pegasus_data_version, value, db_expire_ts(expire_sec), new_timetag); - rocksdb::Status s = _write_batch->Put(skey_parts, svalue); + rocksdb::Status s = _write_batch->Put(_data_cf, skey_parts, svalue); if (dsn_unlikely(!s.ok())) { - ::dsn::blob hash_key, sort_key; + dsn::blob hash_key; + dsn::blob sort_key; pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); LOG_ERROR_ROCKSDB("WriteBatchPut", s.ToString(), @@ -199,9 +203,10 @@ int rocksdb_wrapper::write_batch_delete(int64_t decree, std::string_view raw_key FAIL_POINT_INJECT_F("db_write_batch_delete", [](std::string_view) -> int { return FAIL_DB_WRITE_BATCH_DELETE; }); - rocksdb::Status s = _write_batch->Delete(utils::to_rocksdb_slice(raw_key)); + rocksdb::Status s = _write_batch->Delete(_data_cf, utils::to_rocksdb_slice(raw_key)); if (dsn_unlikely(!s.ok())) { - dsn::blob hash_key, sort_key; + dsn::blob hash_key; + dsn::blob sort_key; pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); LOG_ERROR_ROCKSDB("write_batch_delete", s.ToString(), @@ -223,7 +228,7 @@ int rocksdb_wrapper::ingest_files(int64_t decree, ifo.move_files = true; ifo.ingest_behind = ingest_behind; ifo.write_global_seqno = FLAGS_rocksdb_write_global_seqno; - rocksdb::Status s = _db->IngestExternalFile(sst_file_list, ifo); + rocksdb::Status s = _db->IngestExternalFile(_data_cf, sst_file_list, ifo); if (dsn_unlikely(!s.ok())) { LOG_ERROR_ROCKSDB("IngestExternalFile", s.ToString(), diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h index c73f5cb918..f905dea936 100644 --- a/src/server/rocksdb_wrapper.h +++ b/src/server/rocksdb_wrapper.h @@ -81,6 +81,7 @@ class rocksdb_wrapper : public dsn::replication::replica_base std::unique_ptr _value_generator; std::unique_ptr _write_batch; std::unique_ptr _wt_opts; + rocksdb::ColumnFamilyHandle *_data_cf; rocksdb::ColumnFamilyHandle *_meta_cf; const uint32_t _pegasus_data_version;