From 19badf10f5d7bfb5d83e8b14b037b6f51b12e655 Mon Sep 17 00:00:00 2001 From: Ye Cao Date: Mon, 2 Sep 2024 14:02:35 +0800 Subject: [PATCH] Support to spill objects with LRU strategy by vineyardd itself. (#1983) Fixes https://github.com/v6d-io/v6d/issues/1972 Signed-off-by: Ye Cao Signed-off-by: vegetableysm Co-authored-by: vegetableysm --- python/client.cc | 16 ++ python/pybind11_docs.cc | 26 +++ python/pybind11_docs.h | 2 + python/vineyard/core/client.py | 10 + src/client/ds/blob.cc | 1 + src/client/ds/i_object.cc | 8 +- src/server/async/socket_server.cc | 38 ++- src/server/memory/usage.h | 142 +++++++++--- src/server/util/spill_file.cc | 8 - src/server/util/spill_file.h | 3 + test/concurrent_lru_spill_test.cc | 342 +++++++++++++++++++++++++++ test/lru_spill_test.cc | 373 ++++++++++++++++++++++++++++++ test/lru_test.cc | 2 +- test/release_test.cc | 19 +- test/runner.py | 34 +++ test/spill_test.cc | 14 +- 16 files changed, 967 insertions(+), 71 deletions(-) create mode 100644 test/concurrent_lru_spill_test.cc create mode 100644 test/lru_spill_test.cc diff --git a/python/client.cc b/python/client.cc index 797b7fe21..6f0f5fc62 100644 --- a/python/client.cc +++ b/python/client.cc @@ -826,6 +826,22 @@ void bind_client(py::module& mod) { throw_on_error(self->Fork(*client)); return client; }) + .def( + "release_object", + [](Client* self, ObjectIDWrapper const& object_id) { + self->Release(object_id); + }, + "object_id"_a, doc::IPCClient_release_object) + .def( + "release_objects", + [](Client* self, std::vector const& object_ids) { + std::vector unwrapped_object_ids(object_ids.size()); + for (size_t idx = 0; idx < object_ids.size(); ++idx) { + unwrapped_object_ids[idx] = object_ids[idx]; + } + return self->Release(object_ids); + }, + "object_ids"_a, doc::IPCClient_release_objects) .def("__enter__", [](Client* self) { return self; }) .def("__exit__", [](Client* self, py::object, py::object, py::object) { // DO NOTHING diff --git a/python/pybind11_docs.cc b/python/pybind11_docs.cc index 1c3cd43e4..792b08386 100644 --- a/python/pybind11_docs.cc +++ b/python/pybind11_docs.cc @@ -1102,6 +1102,32 @@ Find the corresponding blob (if exists) of the given pointer. ObjectID )doc"; +const char* IPCClient_release_object = R"doc( +.. method:: release_object(object_id: ObjectID) -> None + :noindex: + +Release the object's ref count in the client side, which means the object can be +spilled to the disk from the server. + +Parameters: + object_id: ObjectID + The object id to release. + +)doc"; + +const char* IPCClient_release_objects = R"doc( +.. method:: release_objects(object_ids: List[ObjectID]) -> None + :noindex: + +Release the objects' ref count in the client side, which means these objects can be +spilled to the disk from the server. + +Parameters: + object_ids: List[ObjectID] + The object id list to release. + +)doc"; + const char* IPCClient_close = R"doc( Close the client. )doc"; diff --git a/python/pybind11_docs.h b/python/pybind11_docs.h index 070084320..68dad8d45 100644 --- a/python/pybind11_docs.h +++ b/python/pybind11_docs.h @@ -129,6 +129,8 @@ extern const char* IPCClient_list_metadatas; extern const char* IPCClient_allocated_size; extern const char* IPCClient_is_shared_memory; extern const char* IPCClient_find_shared_memory; +extern const char* IPCClient_release_object; +extern const char* IPCClient_release_objects; extern const char* IPCClient_close; extern const char* RPCClient; diff --git a/python/vineyard/core/client.py b/python/vineyard/core/client.py index dcc78dc15..2bb0b1c2f 100644 --- a/python/vineyard/core/client.py +++ b/python/vineyard/core/client.py @@ -622,6 +622,16 @@ def get_metas( metas.append(self.get_meta(object_id, sync_remote)) return metas + @_apply_docstring(IPCClient.release_object) + def release_object(self, object_id: ObjectID) -> None: + if self.has_ipc_client(): + self._ipc_client.release_object(object_id) + + @_apply_docstring(IPCClient.release_objects) + def release_objects(self, object_ids: List[ObjectID]) -> None: + if self.has_ipc_client(): + self._ipc_client.release_objects(object_ids) + @_apply_docstring(IPCClient.list_objects) def list_objects( self, pattern: str, regex: bool = False, limit: int = 5 diff --git a/src/client/ds/blob.cc b/src/client/ds/blob.cc index 057d76f7a..86d6eb02b 100644 --- a/src/client/ds/blob.cc +++ b/src/client/ds/blob.cc @@ -380,6 +380,7 @@ Status BufferSet::EmplaceBuffer(ObjectID const id, void BufferSet::Extend(BufferSet const& others) { for (auto const& kv : others.buffers_) { buffers_.emplace(kv.first, kv.second); + buffer_ids_.emplace(kv.first); } } diff --git a/src/client/ds/i_object.cc b/src/client/ds/i_object.cc index cfe116a6b..82083af83 100644 --- a/src/client/ds/i_object.cc +++ b/src/client/ds/i_object.cc @@ -17,6 +17,7 @@ limitations under the License. #include "client/client.h" #include "client/client_base.h" +#include "client/ds/blob.h" namespace vineyard { @@ -70,7 +71,12 @@ std::shared_ptr ObjectBuilder::Seal(Client& client) { Status ObjectBuilder::Seal(Client& client, std::shared_ptr& object) { RETURN_ON_ERROR(this->_Seal(client, object)); - return client.PostSeal(object->meta()); + auto const& meta = object->meta(); + RETURN_ON_ERROR(client.PostSeal(object->meta())); + for (auto const& buffer_id : meta.GetBufferSet()->AllBufferIds()) { + RETURN_ON_ERROR(client.Release(buffer_id)); + } + return Status::OK(); } std::shared_ptr ObjectBuilder::_Seal(Client& client) { diff --git a/src/server/async/socket_server.cc b/src/server/async/socket_server.cc index 5eede2cb0..95e26c616 100644 --- a/src/server/async/socket_server.cc +++ b/src/server/async/socket_server.cc @@ -447,7 +447,13 @@ bool SocketConnection::doCreateBuffers(const json& root) { for (auto const& size : sizes) { ObjectID object_id; std::shared_ptr object; - RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object)); + Status status = bulk_store_->Create(size, object_id, object); + if (!status.ok()) { + for (auto const& object : objects) { + bulk_store_->Delete(object->id()); + } + RESPONSE_ON_ERROR(status); + } object_ids.emplace_back(object_id); objects.emplace_back(object); } @@ -570,6 +576,12 @@ bool SocketConnection::doGetBuffers(const json& root) { RESPONSE_ON_ERROR(bulk_store_->GetUnsafe(ids, unsafe, objects)); RESPONSE_ON_ERROR(bulk_store_->AddDependency( std::unordered_set(ids.begin(), ids.end()), this->getConnId())); + for (size_t i = 0; i < objects.size(); ++i) { + if (objects[i]->pointer == nullptr) { + RESPONSE_ON_ERROR( + bulk_store_->ReloadColdObject(ids[i], objects[i], false)); + } + } std::vector fd_to_send; for (auto object : objects) { @@ -679,6 +691,7 @@ bool SocketConnection::doCreateRemoteBuffer(const json& root) { ObjectID object_id; RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object)); RESPONSE_ON_ERROR(bulk_store_->Seal(object_id)); + RESPONSE_ON_ERROR(bulk_store_->AddDependency(object_id, this->getConnId())); if (use_rdma) { std::string message_out; @@ -690,6 +703,8 @@ bool SocketConnection::doCreateRemoteBuffer(const json& root) { ReceiveRemoteBuffers( socket_, {object}, compress, [self, object](const Status& status) -> Status { + VINEYARD_DISCARD(self->bulk_store_->RemoveDependency( + object->object_id, self->getConnId())); std::string message_out; if (status.ok()) { WriteCreateBufferReply(object->object_id, object, -1, @@ -731,6 +746,7 @@ bool SocketConnection::doCreateRemoteBuffers(const json& root) { std::shared_ptr object; RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object)); RESPONSE_ON_ERROR(bulk_store_->Seal(object_id)); + RESPONSE_ON_ERROR(bulk_store_->AddDependency(object_id, this->getConnId())); object_ids.emplace_back(object_id); objects.emplace_back(object); } @@ -746,6 +762,10 @@ bool SocketConnection::doCreateRemoteBuffers(const json& root) { ReceiveRemoteBuffers( socket_, objects, compress, [self, object_ids, objects](const Status& status) -> Status { + VINEYARD_DISCARD(self->bulk_store_->RemoveDependency( + std::unordered_set(object_ids.begin(), + object_ids.end()), + self->getConnId())); std::string message_out; if (status.ok()) { WriteCreateBuffersReply(object_ids, objects, std::vector{}, @@ -817,6 +837,9 @@ bool SocketConnection::doGetRemoteBuffers(const json& root) { self->UnlockTransmissionObjects(ids); return Status::OK(); }); + std::unordered_set ids_set(ids.begin(), ids.end()); + VINEYARD_DISCARD( + self->bulk_store_->RemoveDependency(ids_set, self->getConnId())); return Status::OK(); }); } else { @@ -1858,10 +1881,15 @@ bool SocketConnection::doReleaseBlobsWithRDMA(const json& root) { std::vector ids; TRY_READ_REQUEST(ReadReleaseBlobsWithRDMARequest, root, ids); - this->UnlockTransmissionObjects(ids); - std::string message_out; - WriteReleaseBlobsWithRDMAReply(message_out); - this->doWrite(message_out); + boost::asio::post(server_ptr_->GetIOContext(), [self, ids]() { + self->server_ptr_->UnlockTransmissionObjects(ids); + std::unordered_set id_set(ids.begin(), ids.end()); + VINEYARD_DISCARD( + self->bulk_store_->RemoveDependency(id_set, self->getConnId())); + std::string message_out; + WriteReleaseBlobsWithRDMAReply(message_out); + self->doWrite(message_out); + }); return false; } diff --git a/src/server/memory/usage.h b/src/server/memory/usage.h index 5d7cc21a2..33f4a1b81 100644 --- a/src/server/memory/usage.h +++ b/src/server/memory/usage.h @@ -24,6 +24,7 @@ limitations under the License. #include #include #include +#include #include "flat_hash_map/flat_hash_map.hpp" #include "libcuckoo/cuckoohash_map.hh" @@ -193,7 +194,7 @@ class ColdObjectTracker * - `Unref(ID id)` Remove the designated id from lru. * - `PopLeastUsed()` Get the least used blob id. If no object in structure, * then status will be Invalids. - * - `CheckExist(ID id)` Check the existence of id. + * - `CheckIsSpilled(ID id)` Check whether the object is spilled. */ class LRU { public: @@ -206,21 +207,21 @@ class ColdObjectTracker ~LRU() = default; void Ref(const ID id, const std::shared_ptr

& payload) { - std::lock_guard locked(mu_); - auto it = map_.find(id); - if (it == map_.end()) { - list_.emplace_front(id, payload); - map_.emplace(id, list_.begin()); + std::lock_guard locked(lru_field_mu_); + auto it = ref_list_iter_map_.find(id); + if (it == ref_list_iter_map_.end()) { + ref_list_.emplace_front(id, payload); + ref_list_iter_map_.emplace(id, ref_list_.begin()); } else { - list_.erase(it->second); - list_.emplace_front(id, payload); - it->second = list_.begin(); + ref_list_.erase(it->second); + ref_list_.emplace_front(id, payload); + it->second = ref_list_.begin(); } } - bool CheckExist(const ID id) const { - std::lock_guard locked(mu_); - return map_.find(id) != map_.end(); + bool CheckIsSpilled(const ID id) const { + std::lock_guard locked(lru_field_mu_); + return spilled_obj_.find(id) != spilled_obj_.end(); } /** @@ -234,9 +235,9 @@ class ColdObjectTracker */ Status Unref(const ID id, const bool fast_delete, const std::shared_ptr& bulk_store) { - std::lock_guard locked(mu_); - auto it = map_.find(id); - if (it == map_.end()) { + std::lock_guard locked(lru_field_mu_); + auto it = ref_list_iter_map_.find(id); + if (it == ref_list_iter_map_.end()) { auto spilled = spilled_obj_.find(id); if (spilled == spilled_obj_.end()) { return Status::OK(); @@ -245,6 +246,15 @@ class ColdObjectTracker // NB: explicitly copy the std::shared_ptr as the iterator is not // stable. auto payload = spilled->second; + payload->pointer = bulk_store->AllocateMemoryWithSpill( + payload->data_size, &payload->store_fd, &payload->map_size, + &payload->data_offset); + if (payload->pointer == nullptr) { + return Status::NotEnoughMemory( + "Failed to allocate memory of size " + + std::to_string(payload->data_size) + + " while unref spilling file"); + } RETURN_ON_ERROR(bulk_store->ReloadPayload(id, payload)); } else { RETURN_ON_ERROR(bulk_store->DeletePayloadFile(id)); @@ -252,19 +262,19 @@ class ColdObjectTracker spilled_obj_.erase(spilled); return Status::OK(); } else { - list_.erase(it->second); - map_.erase(it); + ref_list_.erase(it->second); + ref_list_iter_map_.erase(it); return Status::OK(); } } Status SpillFor(const size_t sz, const std::shared_ptr& bulk_store) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); size_t spilled_sz = 0; auto status = Status::OK(); - auto it = list_.rbegin(); + auto it = ref_list_.rbegin(); std::map> pinned_objects; - while (it != list_.rend()) { + while (it != ref_list_.rend()) { if (it->second->IsPinned()) { // bypass pinned pinned_objects.emplace(it->first, it->second); @@ -275,9 +285,9 @@ class ColdObjectTracker auto s = this->spill(it->first, it->second, bulk_store); if (s.ok()) { spilled_sz += it->second->data_size; - map_.erase(it->first); + ref_list_iter_map_.erase(it->first); } else if (s.IsObjectSpilled()) { - map_.erase(it->first); + ref_list_iter_map_.erase(it->first); } else { status += s; break; @@ -287,9 +297,9 @@ class ColdObjectTracker break; } } - auto popped_size = std::distance(list_.rbegin(), it); + auto popped_size = std::distance(ref_list_.rbegin(), it); while (popped_size-- > 0) { - list_.pop_back(); + ref_list_.pop_back(); } // restore pinned objects for (auto const& item : pinned_objects) { @@ -307,7 +317,7 @@ class ColdObjectTracker Status SpillObjects( const std::map>& objects, const std::shared_ptr& bulk_store) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); auto status = Status::OK(); for (auto const& item : objects) { if (item.second->IsPinned()) { @@ -322,7 +332,7 @@ class ColdObjectTracker Status ReloadObjects( const std::map>& objects, const bool pin, const std::shared_ptr& bulk_store) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); auto status = Status::OK(); for (auto const& item : objects) { status += this->reload(item.first, item.second, pin, bulk_store); @@ -330,8 +340,16 @@ class ColdObjectTracker return status; } + Status ReloadObject(const ObjectID& object_id, + const std::shared_ptr& payload, const bool pin, + const std::shared_ptr& bulk_store) { + std::lock_guard locked(lru_field_mu_); + auto status = this->reload(object_id, payload, pin, bulk_store); + return status; + } + bool CheckSpilled(const ID& id) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); return spilled_obj_.find(id) != spilled_obj_.end(); } @@ -339,7 +357,7 @@ class ColdObjectTracker Status spill(const ObjectID object_id, const std::shared_ptr& payload, const std::shared_ptr& bulk_store) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); if (payload->is_spilled) { return Status::ObjectSpilled(object_id); } @@ -350,7 +368,7 @@ class ColdObjectTracker Status reload(const ObjectID object_id, const std::shared_ptr& payload, const bool pin, const std::shared_ptr& bulk_store) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); if (pin) { payload->Pin(); } @@ -366,11 +384,12 @@ class ColdObjectTracker return bulk_store->ReloadPayload(object_id, payload); } - mutable std::recursive_mutex mu_; - // protected by mu_ - lru_map_t map_; - lru_list_t list_; + mutable std::recursive_mutex lru_field_mu_; + // protected by lru_field_mu_ + lru_map_t ref_list_iter_map_; + lru_list_t ref_list_; ska::flat_hash_map> spilled_obj_; + friend class ColdObjectTracker; }; public: @@ -426,11 +445,25 @@ class ColdObjectTracker return Status::OK(); } + /** + * @brief Add a blob list to the cold object list + */ + Status MarkAsCold(const std::vector& ids, + const std::vector>& payloads) { + for (size_t i = 0; i < payloads.size(); i++) { + const std::shared_ptr

& payload = payloads[i]; + if (payload->IsSealed()) { + cold_obj_lru_.Ref(ids[i], payload); + } + } + return Status::OK(); + } + /** * @brief check if a blob is in-use. Return true if it is in-use. */ Status IsInUse(const ID id, bool& is_in_use) { - if (cold_obj_lru_.CheckExist(id)) { + if (cold_obj_lru_.CheckIsSpilled(id)) { is_in_use = false; } else { is_in_use = true; @@ -479,6 +512,33 @@ class ColdObjectTracker return cold_obj_lru_.SpillObjects(objects, shared_from_self()); } + /** + * @brief Triggered when been requested to spill specified object to disk. + * @param object reloaded blob + */ + Status ReloadColdObject(const ObjectID& object_id, + const std::shared_ptr& payload, + const bool pin) { + if (spill_path_.empty() || payload->data_size == 0) { + return Status::OK(); // bypass, as spill is not enabled + } + payload->pointer = + AllocateMemoryWithSpill(payload->data_size, &payload->store_fd, + &payload->map_size, &payload->data_offset); + if (payload->pointer == nullptr) { + return Status::NotEnoughMemory("Failed to allocate memory of size " + + std::to_string(payload->data_size) + + " while reload spilling file"); + } + Status status = + cold_obj_lru_.ReloadObject(object_id, payload, pin, shared_from_self()); + if (!status.ok()) { + BulkAllocator::Free(payload->pointer, payload->data_size); + payload->pointer = nullptr; + } + return status; + } + /** * @brief Triggered when been requested to spill specified objects to disk. * @param objects reloaded blobs @@ -503,6 +563,14 @@ class ColdObjectTracker */ uint8_t* AllocateMemoryWithSpill(const size_t size, int* fd, int64_t* map_size, ptrdiff_t* offset) { + /* + * FIXME: + * Because the sequence of getting spill lock and lru mu_ lock is + * non-deterministic, we use the same lock to protect both of them + * to avoid deadlock. Maybe there exists a better solution. + */ + std::lock_guardcold_obj_lru_.lru_field_mu_)> locked( + this->cold_obj_lru_.lru_field_mu_); uint8_t* pointer = nullptr; pointer = self().AllocateMemory(size, fd, map_size, offset); // no spill will be conducted @@ -516,19 +584,17 @@ class ColdObjectTracker // 2. memory usage is above upper bound if (pointer == nullptr || BulkAllocator::Allocated() >= self().mem_spill_upper_bound_) { - std::unique_lock locked(spill_mu_); - int64_t min_spill_size = 0; if (pointer == nullptr) { min_spill_size = size - (BulkAllocator::GetFootprintLimit() - BulkAllocator::Allocated()); } + if (BulkAllocator::Allocated() > self().mem_spill_lower_bound_) { min_spill_size = std::max(min_spill_size, BulkAllocator::Allocated() - self().mem_spill_lower_bound_); } - auto s = SpillColdObjectFor(min_spill_size); if (!s.ok()) { DLOG(ERROR) << "Error during spilling cold object: " << s.ToString(); @@ -581,6 +647,7 @@ class ColdObjectTracker io::SpillFileReader reader(spill_path_); RETURN_ON_ERROR(reader.Read(payload, shared_from_self())); } + payload->is_spilled = false; return this->DeletePayloadFile(id); } @@ -617,7 +684,6 @@ class ColdObjectTracker lru_t cold_obj_lru_; std::string spill_path_; - std::mutex spill_mu_; }; } // namespace detail diff --git a/src/server/util/spill_file.cc b/src/server/util/spill_file.cc index b7ab68348..cc24197fd 100644 --- a/src/server/util/spill_file.cc +++ b/src/server/util/spill_file.cc @@ -96,14 +96,6 @@ Status SpillFileReader::Read(const std::shared_ptr& payload, ObjectIDToString(payload->object_id)); } } - payload->pointer = bulk_store->AllocateMemoryWithSpill( - payload->data_size, &(payload->store_fd), &(payload->map_size), - &(payload->data_offset)); - if (payload->pointer == nullptr) { - return Status::NotEnoughMemory("Failed to allocate memory of size " + - std::to_string(payload->data_size) + - " while reload spilling file"); - } RETURN_ON_ERROR(io_adaptor_->Read(payload->pointer, payload->data_size)); RETURN_ON_ERROR(Delete_(payload->object_id)); io_adaptor_ = nullptr; diff --git a/src/server/util/spill_file.h b/src/server/util/spill_file.h index 7d26a2547..bbf3a2e32 100644 --- a/src/server/util/spill_file.h +++ b/src/server/util/spill_file.h @@ -17,7 +17,10 @@ limitations under the License. #define SRC_SERVER_UTIL_SPILL_FILE_H_ #include +#include +#include #include +#include #include "common/memory/payload.h" #include "common/util/arrow.h" diff --git a/test/concurrent_lru_spill_test.cc b/test/concurrent_lru_spill_test.cc new file mode 100644 index 000000000..59624d155 --- /dev/null +++ b/test/concurrent_lru_spill_test.cc @@ -0,0 +1,342 @@ +/** Copyright 2020-2023 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include +#include + +#include "basic/ds/array.h" +#include "client/client.h" +#include "client/ds/object_meta.h" +#include "client/rpc_client.h" +#include "common/util/logging.h" +#include "common/util/status.h" +#include "common/util/uuid.h" + +using namespace vineyard; // NOLINT +using namespace std; // NOLINT + +constexpr double delta = 1E-10; + +template +ObjectID GetObjectID(const std::shared_ptr>& sealed_array) { + return ObjectIDFromString(sealed_array->meta() + .MetaData()["buffer_"]["id"] + .template get_ref()); +} + +template +vector InitArray(int size, std::function init_func) { + std::vector array; + array.resize(size); + for (int i = 0; i < size; i++) { + array[i] = init_func(i); + } + return array; +} + +void ConcurrentPutWithClient(std::string ipc_socket) { + const int array_size = 250; + const int num_objects = 500; + const int num_threads = 10; + auto create_and_seal_array = [&](Client& c) { + auto double_array = InitArray(array_size, [](int i) { return i; }); + ArrayBuilder builder(c, double_array); + auto sealed_array = + std::dynamic_pointer_cast>(builder.Seal(c)); + }; + + Client client; + VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + + auto worker = [&]() { + Client client; + VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + for (int i = 0; i < num_objects; ++i) { + create_and_seal_array(client); + } + }; + std::vector threads; + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back(worker); + } + + for (auto& thread : threads) { + thread.join(); + } + + VINEYARD_CHECK_OK(client.Clear()); +} + +void ConcurrentGetWithClient(std::string ipc_socket) { + const int array_size = 250; + const int num_objects = 500; + const int num_threads = 10; + auto create_and_seal_array = [&](Client& c) { + auto double_array = InitArray(array_size, [](int i) { return i; }); + ArrayBuilder builder(c, double_array); + auto sealed_array = + std::dynamic_pointer_cast>(builder.Seal(c)); + return GetObjectID(sealed_array); + }; + Client client; + VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + + std::vector objects; + for (int i = 0; i < num_objects * num_threads; i++) { + objects.push_back(create_and_seal_array(client)); + } + + auto worker = [&](std::vector ids) { + Client client; + VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + for (int i = 0; i < num_objects * num_threads; ++i) { + std::shared_ptr object; + ObjectID id = ids[i]; + VINEYARD_CHECK_OK(client.GetObject(id, object)); + } + }; + std::vector threads; + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back(worker, objects); + } + + for (auto& thread : threads) { + thread.join(); + } + VINEYARD_CHECK_OK(client.Clear()); +} + +void ConcurrentPutWithRPCClient(std::string rpc_endpoint) { + const int array_size = 250; + const int num_objects = 500; + const int num_threads = 10; + + auto double_array = InitArray(array_size, [](int i) { return i; }); + auto create_remote_blob = [&](RPCClient& c) { + auto remote_blob_writer = std::make_shared( + double_array.size() * sizeof(double)); + std::memcpy(remote_blob_writer->data(), double_array.data(), + double_array.size() * sizeof(double)); + ObjectMeta meta; + VINEYARD_CHECK_OK(c.CreateRemoteBlob(remote_blob_writer, meta)); + }; + + RPCClient rpc_client; + VINEYARD_CHECK_OK(rpc_client.Connect(rpc_endpoint)); + + auto worker = [&]() { + RPCClient rpc_client; + VINEYARD_CHECK_OK(rpc_client.Connect(rpc_endpoint)); + for (int i = 0; i < num_objects; ++i) { + create_remote_blob(rpc_client); + } + }; + std::vector threads; + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back(worker); + } + + for (auto& thread : threads) { + thread.join(); + } + + VINEYARD_CHECK_OK(rpc_client.Clear()); +} + +void ConcurrentGetWithRPCClient(std::string rpc_endpoint) { + const int array_size = 250; + const int num_objects = 500; + const int num_threads = 10; + + auto double_array = InitArray(array_size, [](int i) { return i; }); + auto create_remote_blob = [&](RPCClient& c) { + auto remote_blob_writer = std::make_shared( + double_array.size() * sizeof(double)); + std::memcpy(remote_blob_writer->data(), double_array.data(), + double_array.size() * sizeof(double)); + ObjectMeta meta; + VINEYARD_CHECK_OK(c.CreateRemoteBlob(remote_blob_writer, meta)); + return meta.GetId(); + }; + + RPCClient rpc_client; + VINEYARD_CHECK_OK(rpc_client.Connect(rpc_endpoint)); + + std::vector objects; + for (int i = 0; i < num_objects * num_threads; i++) { + objects.push_back(create_remote_blob(rpc_client)); + } + auto worker = [&](std::vector ids) { + RPCClient rpc_client; + VINEYARD_CHECK_OK(rpc_client.Connect(rpc_endpoint)); + for (int i = 0; i < num_objects * num_threads; ++i) { + ObjectID id = ids[i]; + rpc_client.GetObject(id); + } + }; + std::vector threads; + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back(worker, objects); + } + + for (auto& thread : threads) { + thread.join(); + } + + VINEYARD_CHECK_OK(rpc_client.Clear()); +} + +void ConcurrentGetAndPut(std::string ipc_socket, std::string rpc_endpoint) { + const int num_threads = 20; + const int num_objects = 500; + const int array_size = 250; + const int initial_objects = 100; + + std::vector object_ids; + std::mutex object_ids_mutex; + + auto create_and_seal_array = [&](Client& c) { + try { + auto double_array = InitArray( + array_size, [](int i) { return static_cast(i); }); + ArrayBuilder builder(c, double_array); + auto sealed_array = + std::dynamic_pointer_cast>(builder.Seal(c)); + return GetObjectID(sealed_array); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + return InvalidObjectID(); + } + }; + + auto create_remote_blob = [&](RPCClient& c) { + try { + auto double_array = InitArray( + array_size, [](int i) { return static_cast(i); }); + auto remote_blob_writer = std::make_shared( + double_array.size() * sizeof(double)); + std::memcpy(remote_blob_writer->data(), double_array.data(), + double_array.size() * sizeof(double)); + ObjectMeta blob_meta; + VINEYARD_CHECK_OK(c.CreateRemoteBlob(remote_blob_writer, blob_meta)); + return blob_meta.GetId(); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + return InvalidObjectID(); + } + }; + + Client client; + VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + for (int i = 0; i < initial_objects; i++) { + object_ids.push_back(create_and_seal_array(client)); + } + + auto worker = [&](int id, std::vector object_ids) { + Client client; + VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + RPCClient rpc_client; + VINEYARD_CHECK_OK(rpc_client.Connect(rpc_endpoint)); + std::vector ids; + + for (int i = 0; i < num_objects; ++i) { + if (id % 2 == 0) { + if (i % 3 == 0) { + ObjectID new_id = create_and_seal_array(client); + if (new_id != InvalidObjectID()) { + VINEYARD_DISCARD(client.Release(new_id)); + } + + { + std::lock_guard lock(object_ids_mutex); + object_ids.push_back(new_id); + } + } else { + ObjectID id_to_get; + { + std::lock_guard lock(object_ids_mutex); + id_to_get = object_ids[rand() % object_ids.size()]; + } + if (id_to_get != InvalidObjectID()) { + std::shared_ptr object = client.GetObject(id_to_get); + VINEYARD_DISCARD(client.Release(object->id())); + } + } + } else { + if (i % 3 == 0) { + ObjectID new_id = create_remote_blob(rpc_client); + { + std::lock_guard lock(object_ids_mutex); + object_ids.push_back(new_id); + } + } else { + ObjectID id_to_get; + { + std::lock_guard lock(object_ids_mutex); + id_to_get = object_ids[rand() % object_ids.size()]; + } + if (id_to_get != InvalidObjectID()) { + std::shared_ptr object = rpc_client.GetObject(id_to_get); + } + } + } + } + client.Disconnect(); + rpc_client.Disconnect(); + }; + + std::vector threads; + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back(worker, i, object_ids); + } + + for (auto& thread : threads) { + thread.join(); + } + + VINEYARD_CHECK_OK(client.Clear()); + client.Disconnect(); +} + +int main(int argc, char** argv) { + if (argc < 3) { + printf("usage ./concurrent_lru_spill_test "); + return 1; + } + std::string ipc_socket = std::string(argv[1]); + std::string rpc_endpoint = std::string(argv[2]); + + LOG(INFO) << "Start concurrent put test with IPCClient ..."; + ConcurrentPutWithClient(ipc_socket); + LOG(INFO) << "Passed concurrent put test with IPCClient"; + + LOG(INFO) << "Start concurrent get test with IPCClient ..."; + ConcurrentGetWithClient(ipc_socket); + LOG(INFO) << "Passed concurrent get test with IPCClient"; + + LOG(INFO) << "Start concurrent put test with RPCClient ..."; + ConcurrentPutWithRPCClient(rpc_endpoint); + LOG(INFO) << "Passed concurrent put test with RPCClient"; + + LOG(INFO) << "Start concurrent get test with RPCClient ..."; + ConcurrentGetWithRPCClient(rpc_endpoint); + LOG(INFO) << "Passed concurrent get test with RPCClient"; + + LOG(INFO) << "Start concurrent get and put test ..."; + ConcurrentGetAndPut(ipc_socket, rpc_endpoint); + LOG(INFO) << "Passed concurrent get and put test"; + + LOG(INFO) << "Passed concurrent lru spill tests ..."; +} diff --git a/test/lru_spill_test.cc b/test/lru_spill_test.cc new file mode 100644 index 000000000..19aa5a059 --- /dev/null +++ b/test/lru_spill_test.cc @@ -0,0 +1,373 @@ +/** Copyright 2020-2023 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include +#include + +#include "basic/ds/array.h" +#include "client/client.h" +#include "client/ds/object_meta.h" +#include "client/rpc_client.h" +#include "common/util/logging.h" +#include "common/util/status.h" +#include "common/util/uuid.h" + +using namespace vineyard; // NOLINT +using namespace std; // NOLINT + +constexpr double delta = 1E-10; + +template +ObjectID GetObjectID(const std::shared_ptr>& sealed_array) { + return ObjectIDFromString(sealed_array->meta() + .MetaData()["buffer_"]["id"] + .template get_ref()); +} + +template +vector InitArray(int size, std::function init_func) { + std::vector array; + array.resize(size); + for (int i = 0; i < size; i++) { + array[i] = init_func(i); + } + return array; +} + +/** + * @brief Test whether the LRU mechanism of spill works. + * + * @note Suppose the vineyard server memory is 8000 bytes, + * the spill_high_watermark is 0.8, and the spill_low_watermark is 0.5. + * In this test, we create 10 arrays, each of which has 250 doubles. + * So the vineyardd server can only hold 3 arrays at most, when it + * reaches the high watermark, it will spill the least recently used + * 3 arrays to the disk. + * The test steps are as follows: + * 1. IPCClient create: Array1, Array2 => (Array1, Array2) + * 2. RPCClient create: Array3, Array4 => (Array1, Array2, Array3, Array4) + * => Spill Array1, Array2. => (Array3, Array4) + * 3. RPCClient get: Array2. => (Array3, Array4, Array2) + * 4. IPCClient create: Array5 => (Array3, Array4, Array2, Array5) => + * Spill Array3, Array4. => (Array2, Array5) + * 5. IPCClient get: Array2 => (Array5, Array2) + * 6. IPCClient get: Array4 => (Array5, Array2, Array4) + * 7. RPCClient create: Array6 => (Array5, Array2, Array4, Array6) => + * Spill Array5, Array2. => (Array4, Array6) + * 8. RPCClient get: Array1 => (Array4, Array6, Array1) + * 9. RPCClient get: Array3 => (Array4, Array6, Array1, Array3) => Spill + * Array4, Array6 => (Array1, Array3) + * 10. RPCClient get: Array6 => (Array1, Array3, Array6) + * 11. IPCClient get: Array3, Array4 => (Array1, Array6, Array3, Array4) + * => Spill Array1, Array6. => (Array3, Array4) + * 12. RPCClient create: Array7 => (Array3, Array4, Array7) + * 13. IPCClient create: Array8, Array9, Array10 => (Array3, Array4, + * Array7, Array8) => Spill Array3, Array4. => (Array7, Array8) => (Array7, + * Array8, Array9, Array10) => Spill Array7, Array8. => (Array9, Array10) + * 14. RPCClient get: Array5, Array3, Array1 => (Array9, Array10, Array5, + * Array3) => Spill Array9, Array10. => (Array5, Array3, Array1) + * 15. IPCClient get: Array2, Array4, Array6 => (Array5, Array3, Array1, + * Array2) => Spill Array5, Array3. => (Array1, Array2, Array4, Array6) => Spill + * Array1, Array2. => (Array4, Array6) + * + */ + +void LRUTest(Client& client, RPCClient& rpc_client) { + auto double_array = InitArray(250, [](int i) { return i; }); + /* step1: IPCClient create: Array1, Array2 => (Array1, Array2) */ + ArrayBuilder builder1(client, double_array); + auto sealed_double_array1 = + std::dynamic_pointer_cast>(builder1.Seal(client)); + VINEYARD_CHECK_OK(client.Release(sealed_double_array1->id())); + + ArrayBuilder builder2(client, double_array); + auto sealed_double_array2 = + std::dynamic_pointer_cast>(builder2.Seal(client)); + VINEYARD_CHECK_OK(client.Release(sealed_double_array2->id())); + + /* step2: RPCClient create: Array3, Array4 */ + auto remote_blob_writer3 = + std::make_shared(double_array.size() * sizeof(double)); + std::memcpy(remote_blob_writer3->data(), double_array.data(), + double_array.size() * sizeof(double)); + ObjectMeta blob_meta3; + VINEYARD_CHECK_OK( + rpc_client.CreateRemoteBlob(remote_blob_writer3, blob_meta3)); + + auto remote_blob_writer4 = + std::make_shared(double_array.size() * sizeof(double)); + std::memcpy(remote_blob_writer4->data(), double_array.data(), + double_array.size() * sizeof(double)); + ObjectMeta blob_meta4; + VINEYARD_CHECK_OK( + rpc_client.CreateRemoteBlob(remote_blob_writer4, blob_meta4)); + + // (Array1, Array2, Array3, Array4) => Spill Array1, Array2. => (Array3, + // Array4) + bool is_spilled{false}; + VINEYARD_CHECK_OK( + client.IsSpilled(GetObjectID(sealed_double_array1), is_spilled)); + CHECK(is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK( + client.IsSpilled(GetObjectID(sealed_double_array2), is_spilled)); + CHECK(is_spilled); + + /* step3: RPCClient get: Array2. => (Array3, Array4, Array2) */ + std::shared_ptr object; + VINEYARD_CHECK_OK(rpc_client.GetObject(sealed_double_array2->id(), object)); + + /* step4: IPCClient create: Array5 */ + ArrayBuilder builder5(client, double_array); + auto sealed_double_array5 = + std::dynamic_pointer_cast>(builder5.Seal(client)); + VINEYARD_CHECK_OK(client.Release(sealed_double_array5->id())); + + // (Array3, Array4, Array2, Array5) => Spill Array3, Array4. => (Array2, + // Array5) + is_spilled = false; + VINEYARD_CHECK_OK(client.IsSpilled(blob_meta3.GetId(), is_spilled)); + CHECK(is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK(client.IsSpilled(blob_meta4.GetId(), is_spilled)); + CHECK(is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK( + client.IsSpilled(GetObjectID(sealed_double_array5), is_spilled)); + CHECK(!is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK( + client.IsSpilled(GetObjectID(sealed_double_array2), is_spilled)); + CHECK(!is_spilled); + + /* step5: IPCClient get: Array2 => (Array5, Array2) */ + auto obj = client.GetObject(sealed_double_array2->id()); + VINEYARD_CHECK_OK(client.Release(sealed_double_array2->id())); + CHECK(obj != nullptr); + + /* step6: IPCClient get: Array4 => (Array5, Array2, Array4) */ + obj = client.GetObject(blob_meta4.GetId()); + VINEYARD_CHECK_OK(client.Release(blob_meta4.GetId())); + CHECK(obj != nullptr); + + /* step7: RPCClient create: Array6 */ + auto remote_blob_writer6 = + std::make_shared(double_array.size() * sizeof(double)); + std::memcpy(remote_blob_writer6->data(), double_array.data(), + double_array.size() * sizeof(double)); + ObjectMeta blob_meta6; + VINEYARD_CHECK_OK( + rpc_client.CreateRemoteBlob(remote_blob_writer6, blob_meta6)); + + // (Array5, Array2, Array4, Array6) => Spill Array5, Array2. => (Array4, + // Array6) + is_spilled = false; + VINEYARD_CHECK_OK( + client.IsSpilled(GetObjectID(sealed_double_array5), is_spilled)); + CHECK(is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK( + client.IsSpilled(GetObjectID(sealed_double_array2), is_spilled)); + CHECK(is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK(client.IsSpilled(blob_meta4.GetId(), is_spilled)); + CHECK(!is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK(client.IsSpilled(blob_meta6.GetId(), is_spilled)); + CHECK(!is_spilled); + + /* step8: RPCClient get: Array1 => (Array4, Array6, Array1) */ + VINEYARD_CHECK_OK(rpc_client.GetObject(sealed_double_array1->id(), object)); + + /* step9: RPCClient get: Array3 */ + VINEYARD_CHECK_OK(rpc_client.GetObject(blob_meta3.GetId(), object)); + + // (Array4, Array6, Array1, Array3) => Spill Array4, Array6 => (Array1, + // Array3) + is_spilled = false; + VINEYARD_CHECK_OK(client.IsSpilled(blob_meta4.GetId(), is_spilled)); + CHECK(is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK(client.IsSpilled(blob_meta6.GetId(), is_spilled)); + CHECK(is_spilled); + + /* step10: RPCClient get: Array6 => (Array1, Array3, Array6) */ + VINEYARD_CHECK_OK(rpc_client.GetObject(blob_meta6.GetId(), object)); + + /* step11: IPCClient get: Array3, Array4 */ + obj = client.GetObject(blob_meta3.GetId()); + VINEYARD_CHECK_OK(client.Release(blob_meta3.GetId())); + CHECK(obj != nullptr); + + obj = client.GetObject(blob_meta4.GetId()); + VINEYARD_CHECK_OK(client.Release(blob_meta4.GetId())); + CHECK(obj != nullptr); + + // (Array1, Array6, Array3, Array4) => Spill Array1, Array6. => (Array3, + // Array4) + is_spilled = false; + VINEYARD_CHECK_OK( + client.IsSpilled(GetObjectID(sealed_double_array1), is_spilled)); + CHECK(is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK(client.IsSpilled(blob_meta6.GetId(), is_spilled)); + CHECK(is_spilled); + + /* step12: RPCClient create: Array7 */ + auto remote_blob_writer7 = + std::make_shared(double_array.size() * sizeof(double)); + std::memcpy(remote_blob_writer7->data(), double_array.data(), + double_array.size() * sizeof(double)); + ObjectMeta blob_meta7; + VINEYARD_CHECK_OK( + rpc_client.CreateRemoteBlob(remote_blob_writer7, blob_meta7)); + + // (Array3, Array4) => (Array3, Array4, Array7) + is_spilled = false; + VINEYARD_CHECK_OK(client.IsSpilled(blob_meta3.GetId(), is_spilled)); + CHECK(!is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK(client.IsSpilled(blob_meta4.GetId(), is_spilled)); + CHECK(!is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK(client.IsSpilled(blob_meta7.GetId(), is_spilled)); + CHECK(!is_spilled); + + /* step13: IPCClient create: Array8, Array9, Array10 */ + ArrayBuilder builder8(client, double_array); + auto sealed_double_array8 = + std::dynamic_pointer_cast>(builder8.Seal(client)); + VINEYARD_CHECK_OK(client.Release(sealed_double_array8->id())); + + ArrayBuilder builder9(client, double_array); + auto sealed_double_array9 = + std::dynamic_pointer_cast>(builder9.Seal(client)); + VINEYARD_CHECK_OK(client.Release(sealed_double_array9->id())); + + ArrayBuilder builder10(client, double_array); + auto sealed_double_array10 = + std::dynamic_pointer_cast>(builder10.Seal(client)); + VINEYARD_CHECK_OK(client.Release(sealed_double_array10->id())); + // (Array3, Array4, Array7, Array8) => Spill Array3, Array4. => (Array7, + // Array8) (Array7, Array8, Array9, Array10) => Spill Array7, Array8. => + // (Array9, Array10) + is_spilled = false; + VINEYARD_CHECK_OK(client.IsSpilled(blob_meta3.GetId(), is_spilled)); + CHECK(is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK(client.IsSpilled(blob_meta4.GetId(), is_spilled)); + CHECK(is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK(client.IsSpilled(blob_meta7.GetId(), is_spilled)); + CHECK(is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK( + client.IsSpilled(GetObjectID(sealed_double_array8), is_spilled)); + CHECK(is_spilled); + + /* step14: RPCClient get: Array5, Array3, Array1 */ + VINEYARD_CHECK_OK(rpc_client.GetObject(sealed_double_array5->id(), object)); + + VINEYARD_CHECK_OK(rpc_client.GetObject(blob_meta3.GetId(), object)); + + VINEYARD_CHECK_OK(rpc_client.GetObject(sealed_double_array1->id(), object)); + // (Array9, Array10, Array5, Array3) => Spill Array9, Array10 => (Array5, + // Array3) (Array5, Array3) => (Array5, Array3, Array1) + is_spilled = false; + VINEYARD_CHECK_OK( + client.IsSpilled(GetObjectID(sealed_double_array9), is_spilled)); + CHECK(is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK( + client.IsSpilled(GetObjectID(sealed_double_array10), is_spilled)); + CHECK(is_spilled); + + /* step15: IPCClient get: Array2, Array4, Array6 */ + obj = client.GetObject(sealed_double_array2->id()); + VINEYARD_CHECK_OK(client.Release(sealed_double_array2->id())); + CHECK(obj != nullptr); + + obj = client.GetObject(blob_meta4.GetId()); + VINEYARD_CHECK_OK(client.Release(blob_meta4.GetId())); + CHECK(obj != nullptr); + + obj = client.GetObject(blob_meta6.GetId()); + VINEYARD_CHECK_OK(client.Release(blob_meta6.GetId())); + CHECK(obj != nullptr); + + // (Array5, Array3, Array1, Array2) => Spill Array5, Array3. => (Array1, + // Array2) + is_spilled = false; + VINEYARD_CHECK_OK( + client.IsSpilled(GetObjectID(sealed_double_array5), is_spilled)); + CHECK(is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK(client.IsSpilled(blob_meta3.GetId(), is_spilled)); + CHECK(is_spilled); + + // (Array1, Array2, Array4, Array6) => Spill Array1, Array2. => (Array4, + // Array6) + is_spilled = false; + VINEYARD_CHECK_OK( + client.IsSpilled(GetObjectID(sealed_double_array1), is_spilled)); + CHECK(is_spilled); + + is_spilled = false; + VINEYARD_CHECK_OK( + client.IsSpilled(GetObjectID(sealed_double_array2), is_spilled)); + CHECK(is_spilled); +} + +int main(int argc, char** argv) { + if (argc < 3) { + printf("usage ./lru_spill_test "); + return 1; + } + std::string ipc_socket = std::string(argv[1]); + std::string rpc_endpoint = std::string(argv[2]); + + Client client; + VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + LOG(INFO) << "Connected to IPCServer: " << ipc_socket; + + RPCClient rpc_client; + VINEYARD_CHECK_OK(rpc_client.Connect(rpc_endpoint)); + LOG(INFO) << "Connected to RPCServer: " << rpc_endpoint; + + LRUTest(client, rpc_client); + VINEYARD_CHECK_OK(client.Clear()); + VINEYARD_CHECK_OK(rpc_client.Clear()); + + client.Disconnect(); + rpc_client.Disconnect(); + + LOG(INFO) << "Passed lru spill tests ..."; +} diff --git a/test/lru_test.cc b/test/lru_test.cc index 094b16fac..05519d45c 100644 --- a/test/lru_test.cc +++ b/test/lru_test.cc @@ -42,7 +42,7 @@ void BasicTest() { } { for (int i = 0; i < 1000; i++) { - CHECK(lru_.CheckExist(i)); + CHECK(!lru_.CheckIsSpilled(i)); } } } diff --git a/test/release_test.cc b/test/release_test.cc index b09d6d180..9a4cb9fb6 100644 --- a/test/release_test.cc +++ b/test/release_test.cc @@ -66,9 +66,9 @@ int main(int argc, char** argv) { bool is_in_use{false}; VINEYARD_CHECK_OK(client1.IsInUse(blob_id, is_in_use)); CHECK(is_in_use); - VINEYARD_CHECK_OK(client1.Release({id, blob_id})); + VINEYARD_CHECK_OK(client1.Release({id})); VINEYARD_CHECK_OK(client1.IsInUse(blob_id, is_in_use)); - CHECK(!is_in_use); + CHECK(is_in_use); } { // single client @@ -80,12 +80,9 @@ int main(int argc, char** argv) { auto blob = client1.GetObject(blob_id); CHECK(blob != nullptr); - VINEYARD_CHECK_OK(client1.Release({id})); + VINEYARD_CHECK_OK(client1.Release({obj->id()})); VINEYARD_CHECK_OK(client1.IsInUse(blob_id, is_in_use)); CHECK(is_in_use); - VINEYARD_CHECK_OK(client1.Release({blob_id})); - VINEYARD_CHECK_OK(client1.IsInUse(blob_id, is_in_use)); - CHECK(!is_in_use); } { // multiple clients @@ -93,13 +90,13 @@ int main(int argc, char** argv) { CHECK(blob1 != nullptr); auto blob2 = client2.GetObject(blob_id); CHECK(blob2 != nullptr); - VINEYARD_CHECK_OK(client1.Release({blob_id})); + VINEYARD_CHECK_OK(client1.Release({blob1->id()})); bool is_in_use{false}; VINEYARD_CHECK_OK(client1.IsInUse(blob_id, is_in_use)); CHECK(is_in_use); - VINEYARD_CHECK_OK(client2.Release({blob_id})); + VINEYARD_CHECK_OK(client2.Release({blob2->id()})); VINEYARD_CHECK_OK(client2.IsInUse(blob_id, is_in_use)); - CHECK(!is_in_use); + CHECK(is_in_use); } { // diamond reference count @@ -122,7 +119,7 @@ int main(int argc, char** argv) { CHECK(is_in_use); VINEYARD_CHECK_OK(client1.Release({copy_id})); VINEYARD_CHECK_OK(client1.IsInUse(blob_id, is_in_use)); - CHECK(!is_in_use); + CHECK(is_in_use); } { // auto release in disconnection @@ -134,7 +131,7 @@ int main(int argc, char** argv) { client1.Disconnect(); sleep(5); VINEYARD_CHECK_OK(client2.IsInUse(blob_id, is_in_use)); - CHECK(!is_in_use); + CHECK(is_in_use); } LOG(INFO) << "Passed release tests..."; diff --git a/test/runner.py b/test/runner.py index bc42c23f9..a02d4223b 100755 --- a/test/runner.py +++ b/test/runner.py @@ -538,6 +538,36 @@ def run_vineyard_spill_tests(meta, allocator, endpoints, tests): run_test(tests, 'spill_test') +def run_vineyard_lru_spill_tests(meta, allocator, endpoints, tests): + meta_prefix = 'vineyard_test_%s' % time.time() + metadata_settings = make_metadata_settings(meta, endpoints, meta_prefix) + with start_vineyardd( + metadata_settings, + ['--allocator', allocator], + size=8000, + default_ipc_socket=VINEYARD_CI_IPC_SOCKET, + spill_path='/tmp/spill_path', + spill_upper_rate=0.8, + spill_lower_rate=0.5, + ) as (_, rpc_socket_port): + run_test(tests, 'lru_spill_test', '127.0.0.1:%d' % rpc_socket_port) + + +def run_vineyard_concurrent_lru_spill_tests(meta, allocator, endpoints, tests): + meta_prefix = 'vineyard_test_%s' % time.time() + metadata_settings = make_metadata_settings(meta, endpoints, meta_prefix) + with start_vineyardd( + metadata_settings, + ['--allocator', allocator], + size="10Mi", + default_ipc_socket=VINEYARD_CI_IPC_SOCKET, + spill_path='/tmp/spill_path', + spill_upper_rate=0.8, + spill_lower_rate=0.5, + ) as (_, rpc_socket_port): + run_test(tests, 'concurrent_lru_spill_test', '127.0.0.1:%d' % rpc_socket_port) + + def run_etcd_member_tests(meta, allocator, endpoints, tests): """ Here we start 2 vineyard instances, @@ -1180,6 +1210,10 @@ def execute_tests(args): if args.with_cpp: run_vineyard_cpp_tests(args.meta, args.allocator, endpoints, args.tests) run_vineyard_spill_tests(args.meta, args.allocator, endpoints, args.tests) + run_vineyard_lru_spill_tests(args.meta, args.allocator, endpoints, args.tests) + run_vineyard_concurrent_lru_spill_tests( + args.meta, args.allocator, endpoints, args.tests + ) if args.with_graph: run_graph_tests(args.meta, args.allocator, endpoints, args.tests) diff --git a/test/spill_test.cc b/test/spill_test.cc index d95614098..48d185fa6 100644 --- a/test/spill_test.cc +++ b/test/spill_test.cc @@ -73,9 +73,9 @@ void BasicTest(Client& client) { bool is_spilled{false}; bool is_in_use{false}; - VINEYARD_CHECK_OK(client.Release({id1, blob_id})); + VINEYARD_CHECK_OK(client.Release({id1})); VINEYARD_CHECK_OK(client.IsInUse(blob_id, is_in_use)); - CHECK(!is_in_use); + CHECK(is_in_use); ArrayBuilder builder2(client, double_array); auto sealed_double_array2 = @@ -86,7 +86,7 @@ void BasicTest(Client& client) { CHECK(is_spilled); VINEYARD_CHECK_OK(client.IsInUse(blob_id2, is_in_use)); CHECK(is_in_use); - VINEYARD_CHECK_OK(client.Release({id2, blob_id2})); + VINEYARD_CHECK_OK(client.Release({id2})); LOG(INFO) << "Finish basic test ..."; } @@ -111,7 +111,7 @@ void ReloadTest(Client& client) { bool is_in_use{false}; VINEYARD_CHECK_OK(client.IsInUse(bid, is_in_use)); CHECK(is_in_use); - VINEYARD_CHECK_OK(client.Release({id, bid})); + VINEYARD_CHECK_OK(client.Release({id})); LOG(INFO) << "Finish reload test, case 1 ..."; } { @@ -124,7 +124,7 @@ void ReloadTest(Client& client) { bool is_in_use{false}; VINEYARD_CHECK_OK(client.IsInUse(bid1, is_in_use)); CHECK(is_in_use); - VINEYARD_CHECK_OK(client.Release({id1, bid1})); + VINEYARD_CHECK_OK(client.Release({id1})); LOG(INFO) << "Finish reload test, case 2 ..."; } { @@ -137,7 +137,7 @@ void ReloadTest(Client& client) { bool is_in_use{false}; VINEYARD_CHECK_OK(client.IsInUse(bid2, is_in_use)); CHECK(is_in_use); - VINEYARD_CHECK_OK(client.Release({id2, bid2})); + VINEYARD_CHECK_OK(client.Release({id2})); LOG(INFO) << "Finish reload test, case 3 ..."; } { @@ -150,7 +150,7 @@ void ReloadTest(Client& client) { bool is_in_use{false}; VINEYARD_CHECK_OK(client.IsInUse(bid3, is_in_use)); CHECK(is_in_use); - VINEYARD_CHECK_OK(client.Release({id3, bid3})); + VINEYARD_CHECK_OK(client.Release({id3})); LOG(INFO) << "Finish reload test, case 4 ..."; }