From 536375a2aea395d6641692fc6df5e3c21c0d97cc Mon Sep 17 00:00:00 2001 From: Violeta Stepanyan <82702795+VioletaStepanyan@users.noreply.github.com> Date: Mon, 15 May 2023 20:59:51 +0400 Subject: [PATCH 1/6] Add: Exception handling in flight client. --- src/flight_client.cpp | 379 +++++++++++++++++++++++------------------- 1 file changed, 210 insertions(+), 169 deletions(-) diff --git a/src/flight_client.cpp b/src/flight_client.cpp index ead1e66be..782166302 100644 --- a/src/flight_client.cpp +++ b/src/flight_client.cpp @@ -1311,31 +1311,36 @@ void ustore_collection_create(ustore_collection_create_t* c_ptr) { auto name_len = c.name ? std::strlen(c.name) : 0; return_error_if_m(name_len, c.error, args_wrong_k, "Default collection is always present"); - rpc_client_t& db = *reinterpret_cast(c.db); - - arf::Action action; - fmt::format_to(std::back_inserter(action.type), "{}?{}={}", kFlightColCreate, kParamCollectionName, c.name); - if (c.config) - action.body = std::make_shared(std::string_view {c.config}); + try { + rpc_client_t& db = *reinterpret_cast(c.db); - ar::Result> maybe_stream; - { - std::lock_guard lk(db.arena_lock); - arrow_mem_pool_t pool(db.arena); - arf::FlightCallOptions options = arrow_call_options(pool); - maybe_stream = db.flight->DoAction(options, action); + arf::Action action; + fmt::format_to(std::back_inserter(action.type), "{}?{}={}", kFlightColCreate, kParamCollectionName, c.name); + if (c.config) + action.body = std::make_shared(std::string_view {c.config}); + + ar::Result> maybe_stream; + { + std::lock_guard lk(db.arena_lock); + arrow_mem_pool_t pool(db.arena); + arf::FlightCallOptions options = arrow_call_options(pool); + maybe_stream = db.flight->DoAction(options, action); + } + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + auto& stream_ptr = maybe_stream.ValueUnsafe(); + ar::Result> maybe_id = stream_ptr->Next(); + return_error_if_m(maybe_id.ok(), c.error, network_k, "No response received"); + + auto& id_ptr = maybe_id.ValueUnsafe(); + return_error_if_m(id_ptr->body->size() == sizeof(ustore_collection_t), + c.error, + error_unknown_k, + "Inadequate response"); + std::memcpy(c.id, id_ptr->body->data(), sizeof(ustore_collection_t)); + } + catch (...) { + *c.error = "Collection Create Failure"; } - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - auto& stream_ptr = maybe_stream.ValueUnsafe(); - ar::Result> maybe_id = stream_ptr->Next(); - return_error_if_m(maybe_id.ok(), c.error, network_k, "No response received"); - - auto& id_ptr = maybe_id.ValueUnsafe(); - return_error_if_m(id_ptr->body->size() == sizeof(ustore_collection_t), - c.error, - error_unknown_k, - "Inadequate response"); - std::memcpy(c.id, id_ptr->body->data(), sizeof(ustore_collection_t)); } void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { @@ -1343,77 +1348,88 @@ void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { ustore_collection_drop_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - std::string_view mode; - switch (c.mode) { - case ustore_drop_vals_k: mode = kParamDropModeValues; break; - case ustore_drop_keys_vals_k: mode = kParamDropModeContents; break; - case ustore_drop_keys_vals_handle_k: mode = kParamDropModeCollection; break; - } + try { + std::string_view mode; + switch (c.mode) { + case ustore_drop_vals_k: mode = kParamDropModeValues; break; + case ustore_drop_keys_vals_k: mode = kParamDropModeContents; break; + case ustore_drop_keys_vals_handle_k: mode = kParamDropModeCollection; break; + } - rpc_client_t& db = *reinterpret_cast(c.db); + rpc_client_t& db = *reinterpret_cast(c.db); - arf::Action action; - fmt::format_to(std::back_inserter(action.type), - "{}?{}=0x{:0>16x}&{}={}", - kFlightColDrop, - kParamCollectionID, - c.id, - kParamDropMode, - mode); - - std::lock_guard lk(db.arena_lock); - arrow_mem_pool_t pool(db.arena); - arf::FlightCallOptions options = arrow_call_options(pool); - ar::Result> maybe_stream = db.flight->DoAction(options, action); - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + arf::Action action; + fmt::format_to(std::back_inserter(action.type), + "{}?{}=0x{:0>16x}&{}={}", + kFlightColDrop, + kParamCollectionID, + c.id, + kParamDropMode, + mode); + + std::lock_guard lk(db.arena_lock); + arrow_mem_pool_t pool(db.arena); + arf::FlightCallOptions options = arrow_call_options(pool); + ar::Result> maybe_stream = db.flight->DoAction(options, action); + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + } + catch (...) { + *c.error = "Collection Drop Failure"; + } } void ustore_collection_list(ustore_collection_list_t* c_ptr) { ustore_collection_list_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - rpc_client_t& db = *reinterpret_cast(c.db); - if (!(c.options & ustore_option_dont_discard_memory_k)) - db.readers.clear(); - linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); - return_if_error_m(c.error); + try { + rpc_client_t& db = *reinterpret_cast(c.db); + if (!(c.options & ustore_option_dont_discard_memory_k)) + db.readers.clear(); - ar::Status ar_status; - arrow_mem_pool_t pool(arena); - arf::FlightCallOptions options = arrow_call_options(pool); + linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); + return_if_error_m(c.error); - arf::Ticket ticket {kFlightListCols}; - if (c.transaction) - fmt::format_to(std::back_inserter(ticket.ticket), - "?{}=0x{:0>16x}", - kParamTransactionID, - std::uintptr_t(c.transaction)); + ar::Status ar_status; + arrow_mem_pool_t pool(arena); + arf::FlightCallOptions options = arrow_call_options(pool); - auto maybe_stream = db.flight->DoGet(options, ticket); - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - auto& stream_ptr = maybe_stream.ValueUnsafe(); + arf::Ticket ticket {kFlightListCols}; + if (c.transaction) + fmt::format_to(std::back_inserter(ticket.ticket), + "?{}=0x{:0>16x}", + kParamTransactionID, + std::uintptr_t(c.transaction)); - auto maybe_table = stream_ptr->ToTable(); - return_error_if_m(maybe_table.ok(), c.error, error_unknown_k, "Failed to create table"); - auto table = maybe_table.ValueUnsafe(); + auto maybe_stream = db.flight->DoGet(options, ticket); + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + auto& stream_ptr = maybe_stream.ValueUnsafe(); + + auto maybe_table = stream_ptr->ToTable(); + return_error_if_m(maybe_table.ok(), c.error, error_unknown_k, "Failed to create table"); + auto table = maybe_table.ValueUnsafe(); + + if (c.count) + *c.count = static_cast(table->num_rows()); + if (c.names) { + auto array = std::static_pointer_cast(table->column(1)->chunk(0)); + return_error_if_m(table->column(1)->num_chunks() == 1, c.error, network_k, "Expected one chunk"); + *c.names = (ustore_str_span_t)array->value_data()->data(); + if (c.offsets) + *c.offsets = (ustore_length_t*)array->value_offsets()->data(); + } + if (c.ids) { + auto array = std::static_pointer_cast>(table->column(0)->chunk(0)); + return_error_if_m(table->column(0)->num_chunks() == 1, c.error, network_k, "Expected one chunk"); + *c.ids = (ustore_collection_t*)array->raw_values(); + } - if (c.count) - *c.count = static_cast(table->num_rows()); - if (c.names) { - auto array = std::static_pointer_cast(table->column(1)->chunk(0)); - return_error_if_m(table->column(1)->num_chunks() == 1, c.error, network_k, "Expected one chunk"); - *c.names = (ustore_str_span_t)array->value_data()->data(); - if (c.offsets) - *c.offsets = (ustore_length_t*)array->value_offsets()->data(); + db.readers.push_back(std::move(stream_ptr)); } - if (c.ids) { - auto array = std::static_pointer_cast>(table->column(0)->chunk(0)); - return_error_if_m(table->column(0)->num_chunks() == 1, c.error, network_k, "Expected one chunk"); - *c.ids = (ustore_collection_t*)array->raw_values(); + catch (...) { + *c.error = "Collection List Failure"; } - - db.readers.push_back(std::move(stream_ptr)); } void ustore_database_control(ustore_database_control_t* c_ptr) { @@ -1433,63 +1449,73 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { ustore_snapshot_list_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); - return_if_error_m(c.error); + try { + linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); + return_if_error_m(c.error); - ar::Status ar_status; - arrow_mem_pool_t pool(arena); - arf::FlightCallOptions options = arrow_call_options(pool); + ar::Status ar_status; + arrow_mem_pool_t pool(arena); + arf::FlightCallOptions options = arrow_call_options(pool); - rpc_client_t& db = *reinterpret_cast(c.db); + rpc_client_t& db = *reinterpret_cast(c.db); - arf::Ticket ticket {kFlightListSnap}; - ar::Result> maybe_stream = db.flight->DoGet(options, ticket); - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + arf::Ticket ticket {kFlightListSnap}; + ar::Result> maybe_stream = db.flight->DoGet(options, ticket); + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - auto& stream_ptr = maybe_stream.ValueUnsafe(); - ar::Result> maybe_table = stream_ptr->ToTable(); + auto& stream_ptr = maybe_stream.ValueUnsafe(); + ar::Result> maybe_table = stream_ptr->ToTable(); - ArrowSchema schema_c; - ArrowArray batch_c; - ar_status = unpack_table(maybe_table, schema_c, batch_c, &pool); - return_error_if_m(ar_status.ok(), c.error, args_combo_k, "Failed to unpack list of snapshots"); + ArrowSchema schema_c; + ArrowArray batch_c; + ar_status = unpack_table(maybe_table, schema_c, batch_c, &pool); + return_error_if_m(ar_status.ok(), c.error, args_combo_k, "Failed to unpack list of snapshots"); - auto ids_column_idx = column_idx(schema_c, kArgSnaps); - return_error_if_m(ids_column_idx, c.error, args_combo_k, "Expecting one column"); + auto ids_column_idx = column_idx(schema_c, kArgSnaps); + return_error_if_m(ids_column_idx, c.error, args_combo_k, "Expecting one column"); - if (c.count) - *c.count = static_cast(batch_c.length); - if (c.ids) - *c.ids = (ustore_collection_t*)batch_c.children[*ids_column_idx]->buffers[1]; + if (c.count) + *c.count = static_cast(batch_c.length); + if (c.ids) + *c.ids = (ustore_collection_t*)batch_c.children[*ids_column_idx]->buffers[1]; + } + catch (...) { + *c.error = "Snapshot List Failure"; + } } void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { ustore_snapshot_create_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - rpc_client_t& db = *reinterpret_cast(c.db); - - arf::Action action; - fmt::format_to(std::back_inserter(action.type), "{}", kFlightSnapCreate); + try { + rpc_client_t& db = *reinterpret_cast(c.db); - ar::Result> maybe_stream; - { - std::lock_guard lk(db.arena_lock); - arrow_mem_pool_t pool(db.arena); - arf::FlightCallOptions options = arrow_call_options(pool); - maybe_stream = db.flight->DoAction(options, action); + arf::Action action; + fmt::format_to(std::back_inserter(action.type), "{}", kFlightSnapCreate); + + ar::Result> maybe_stream; + { + std::lock_guard lk(db.arena_lock); + arrow_mem_pool_t pool(db.arena); + arf::FlightCallOptions options = arrow_call_options(pool); + maybe_stream = db.flight->DoAction(options, action); + } + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + auto& stream_ptr = maybe_stream.ValueUnsafe(); + ar::Result> maybe_id = stream_ptr->Next(); + return_error_if_m(maybe_id.ok(), c.error, network_k, "No response received"); + + auto& id_ptr = maybe_id.ValueUnsafe(); + return_error_if_m(id_ptr->body->size() == sizeof(ustore_snapshot_t), + c.error, + error_unknown_k, + "Inadequate response"); + std::memcpy(c.id, id_ptr->body->data(), sizeof(ustore_snapshot_t)); + } + catch (...) { + *c.error = "Snapshot Create Failure"; } - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - auto& stream_ptr = maybe_stream.ValueUnsafe(); - ar::Result> maybe_id = stream_ptr->Next(); - return_error_if_m(maybe_id.ok(), c.error, network_k, "No response received"); - - auto& id_ptr = maybe_id.ValueUnsafe(); - return_error_if_m(id_ptr->body->size() == sizeof(ustore_snapshot_t), - c.error, - error_unknown_k, - "Inadequate response"); - std::memcpy(c.id, id_ptr->body->data(), sizeof(ustore_snapshot_t)); } void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { @@ -1523,16 +1549,21 @@ void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { ustore_snapshot_drop_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - rpc_client_t& db = *reinterpret_cast(c.db); + try { + rpc_client_t& db = *reinterpret_cast(c.db); - arf::Action action; - fmt::format_to(std::back_inserter(action.type), "{}?{}={}", kFlightSnapDrop, kParamSnapshotID, c.id); + arf::Action action; + fmt::format_to(std::back_inserter(action.type), "{}?{}={}", kFlightSnapDrop, kParamSnapshotID, c.id); - std::lock_guard lk(db.arena_lock); - arrow_mem_pool_t pool(db.arena); - arf::FlightCallOptions options = arrow_call_options(pool); - ar::Result> maybe_stream = db.flight->DoAction(options, action); - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + std::lock_guard lk(db.arena_lock); + arrow_mem_pool_t pool(db.arena); + arf::FlightCallOptions options = arrow_call_options(pool); + ar::Result> maybe_stream = db.flight->DoAction(options, action); + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + } + catch (...) { + *c.error = "Snapshot Drop Failure"; + } } /*********************************************************/ @@ -1545,35 +1576,40 @@ void ustore_transaction_init(ustore_transaction_init_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.transaction, c.error, uninitialized_state_k, "Transaction is uninitialized"); - rpc_client_t& db = *reinterpret_cast(c.db); + try { + rpc_client_t& db = *reinterpret_cast(c.db); - arf::Action action; - ustore_size_t txn_id = *reinterpret_cast(c.transaction); - fmt::format_to(std::back_inserter(action.type), "{}?", kFlightTxnBegin); - if (txn_id != 0) - fmt::format_to(std::back_inserter(action.type), "{}=0x{:0>16x}&", kParamTransactionID, txn_id); - if (c.options & ustore_option_transaction_dont_watch_k) - fmt::format_to(std::back_inserter(action.type), "{}&", kParamFlagDontWatch); + arf::Action action; + ustore_size_t txn_id = *reinterpret_cast(c.transaction); + fmt::format_to(std::back_inserter(action.type), "{}?", kFlightTxnBegin); + if (txn_id != 0) + fmt::format_to(std::back_inserter(action.type), "{}=0x{:0>16x}&", kParamTransactionID, txn_id); + if (c.options & ustore_option_transaction_dont_watch_k) + fmt::format_to(std::back_inserter(action.type), "{}&", kParamFlagDontWatch); + + ar::Result> maybe_stream; + { + std::lock_guard lk(db.arena_lock); + arrow_mem_pool_t pool(db.arena); + arf::FlightCallOptions options = arrow_call_options(pool); + maybe_stream = db.flight->DoAction(options, action); + } + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - ar::Result> maybe_stream; - { - std::lock_guard lk(db.arena_lock); - arrow_mem_pool_t pool(db.arena); - arf::FlightCallOptions options = arrow_call_options(pool); - maybe_stream = db.flight->DoAction(options, action); + auto& stream_ptr = maybe_stream.ValueUnsafe(); + ar::Result> maybe_id = stream_ptr->Next(); + return_error_if_m(maybe_id.ok(), c.error, network_k, "No response received"); + + auto& id_ptr = maybe_id.ValueUnsafe(); + return_error_if_m(id_ptr->body->size() == sizeof(ustore_transaction_t), + c.error, + error_unknown_k, + "Inadequate response"); + std::memcpy(c.transaction, id_ptr->body->data(), sizeof(ustore_transaction_t)); + } + catch (...) { + *c.error = "Transaction Init Failure"; } - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - - auto& stream_ptr = maybe_stream.ValueUnsafe(); - ar::Result> maybe_id = stream_ptr->Next(); - return_error_if_m(maybe_id.ok(), c.error, network_k, "No response received"); - - auto& id_ptr = maybe_id.ValueUnsafe(); - return_error_if_m(id_ptr->body->size() == sizeof(ustore_transaction_t), - c.error, - error_unknown_k, - "Inadequate response"); - std::memcpy(c.transaction, id_ptr->body->data(), sizeof(ustore_transaction_t)); } void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { @@ -1581,22 +1617,27 @@ void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { ustore_transaction_commit_t& c = *c_ptr; return_error_if_m(c.transaction, c.error, uninitialized_state_k, "Transaction is uninitialized"); - rpc_client_t& db = *reinterpret_cast(c.db); + try { + rpc_client_t& db = *reinterpret_cast(c.db); - arf::Action action; - fmt::format_to(std::back_inserter(action.type), - "{}?{}=0x{:0>16x}&", - kFlightTxnCommit, - kParamTransactionID, - std::uintptr_t(c.transaction)); - if (c.options & ustore_option_write_flush_k) - fmt::format_to(std::back_inserter(action.type), "{}&", kParamFlagFlushWrite); - - std::lock_guard lk(db.arena_lock); - arrow_mem_pool_t pool(db.arena); - arf::FlightCallOptions options = arrow_call_options(pool); - ar::Result> maybe_stream = db.flight->DoAction(options, action); - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + arf::Action action; + fmt::format_to(std::back_inserter(action.type), + "{}?{}=0x{:0>16x}&", + kFlightTxnCommit, + kParamTransactionID, + std::uintptr_t(c.transaction)); + if (c.options & ustore_option_write_flush_k) + fmt::format_to(std::back_inserter(action.type), "{}&", kParamFlagFlushWrite); + + std::lock_guard lk(db.arena_lock); + arrow_mem_pool_t pool(db.arena); + arf::FlightCallOptions options = arrow_call_options(pool); + ar::Result> maybe_stream = db.flight->DoAction(options, action); + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + } + catch (...) { + *c.error = "Transaction Commit Failure"; + } } /*********************************************************/ From 0d7eb9b330e6a87bbfb62797e062780dee3fc8a6 Mon Sep 17 00:00:00 2001 From: Violeta Stepanyan <82702795+VioletaStepanyan@users.noreply.github.com> Date: Wed, 17 May 2023 12:06:49 +0400 Subject: [PATCH 2/6] Add: Exception handling for collection, snapshot, transaction methods in rockdb. --- src/engine_rocksdb.cpp | 331 +++++++++++++++++++++++------------------ 1 file changed, 186 insertions(+), 145 deletions(-) diff --git a/src/engine_rocksdb.cpp b/src/engine_rocksdb.cpp index 83d6b029f..332130c7e 100644 --- a/src/engine_rocksdb.cpp +++ b/src/engine_rocksdb.cpp @@ -244,21 +244,26 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.count && c.ids, c.error, args_combo_k, "Need outputs!"); - linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); - return_if_error_m(c.error); + try { + linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); + return_if_error_m(c.error); - rocks_db_t& db = *reinterpret_cast(c.db); - std::lock_guard locker(db.mutex); - std::size_t snapshots_count = db.snapshots.size(); - *c.count = static_cast(snapshots_count); + rocks_db_t& db = *reinterpret_cast(c.db); + std::lock_guard locker(db.mutex); + std::size_t snapshots_count = db.snapshots.size(); + *c.count = static_cast(snapshots_count); - // For every snapshot we also need to export IDs - auto ids = arena.alloc_or_dummy(snapshots_count, c.error, c.ids); - return_if_error_m(c.error); + // For every snapshot we also need to export IDs + auto ids = arena.alloc_or_dummy(snapshots_count, c.error, c.ids); + return_if_error_m(c.error); - std::size_t i = 0; - for (const auto& [id, _] : db.snapshots) - ids[i++] = id; + std::size_t i = 0; + for (const auto& [id, _] : db.snapshots) + ids[i++] = id; + } + catch (...) { + *c.error = "Snapshot List Failure"; + } } void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { @@ -266,25 +271,31 @@ void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { ustore_snapshot_create_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - rocks_db_t& db = *reinterpret_cast(c.db); - std::lock_guard locker(db.mutex); - auto it = db.snapshots.find(*c.id); - if (it != db.snapshots.end()) - return_error_if_m(it->second, c.error, args_wrong_k, "Such snapshot already exists!"); + try { + rocks_db_t& db = *reinterpret_cast(c.db); + std::lock_guard locker(db.mutex); + auto it = db.snapshots.find(*c.id); + if (it != db.snapshots.end()) + return_error_if_m(it->second, c.error, args_wrong_k, "Such snapshot already exists!"); - rocks_snapshot_t* rocks_snapshot = nullptr; - safe_section("Allocating snapshot handle", c.error, [&] { rocks_snapshot = new rocks_snapshot_t(); }); - return_if_error_m(c.error); + rocks_snapshot_t* rocks_snapshot = nullptr; + safe_section("Allocating snapshot handle", c.error, [&] { rocks_snapshot = new rocks_snapshot_t(); }); + return_if_error_m(c.error); - rocks_snapshot->snapshot = db.native->GetSnapshot(); - if (!rocks_snapshot->snapshot) - *c.error = "Couldn't get a snapshot!"; + rocks_snapshot->snapshot = db.native->GetSnapshot(); + if (!rocks_snapshot->snapshot) + *c.error = "Couldn't get a snapshot!"; - *c.id = reinterpret_cast(rocks_snapshot); - db.snapshots[*c.id] = rocks_snapshot; + *c.id = reinterpret_cast(rocks_snapshot); + db.snapshots[*c.id] = rocks_snapshot; + } + catch (...) { + *c.error = "Snapshot Create Failure"; + } } void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { + ustore_snapshot_export_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); @@ -314,25 +325,27 @@ void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { - if (!c_ptr) - return; - ustore_snapshot_drop_t& c = *c_ptr; if (!c.id) return; - rocks_db_t& db = *reinterpret_cast(c.db); - rocks_snapshot_t& snap = *reinterpret_cast(c.id); - if (!snap.snapshot) - return; + try { + rocks_db_t& db = *reinterpret_cast(c.db); + rocks_snapshot_t& snap = *reinterpret_cast(c.id); + if (!snap.snapshot) + return; - db.native->ReleaseSnapshot(snap.snapshot); - snap.snapshot = nullptr; + db.native->ReleaseSnapshot(snap.snapshot); + snap.snapshot = nullptr; - auto id = reinterpret_cast(c.id); - db.mutex.lock(); - db.snapshots.erase(id); - db.mutex.unlock(); + auto id = reinterpret_cast(c.id); + db.mutex.lock(); + db.snapshots.erase(id); + db.mutex.unlock(); + } + catch (...) { + *c.error = "Snapshot Drop Failure"; + } } void write_one( // @@ -773,20 +786,28 @@ void ustore_collection_create(ustore_collection_create_t* c_ptr) { return_error_if_m(name_len, c.error, args_wrong_k, "Default collection is always present"); return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - rocks_db_t& db = *reinterpret_cast(c.db); + try { + rocks_db_t& db = *reinterpret_cast(c.db); - for (auto handle : db.columns) { - if (handle) - return_error_if_m(handle->GetName() != c.name, c.error, args_wrong_k, "Such collection already exists!"); - } + for (auto handle : db.columns) { + if (handle) + return_error_if_m(handle->GetName() != c.name, + c.error, + args_wrong_k, + "Such collection already exists!"); + } - rocks_collection_t* collection = nullptr; - auto cf_options = rocksdb::ColumnFamilyOptions(); - cf_options.comparator = &key_comparator_k; - rocks_status_t status = db.native->CreateColumnFamily(std::move(cf_options), c.name, &collection); - if (!export_error(status, c.error)) { - db.columns.push_back(collection); - *c.id = reinterpret_cast(collection); + rocks_collection_t* collection = nullptr; + auto cf_options = rocksdb::ColumnFamilyOptions(); + cf_options.comparator = &key_comparator_k; + rocks_status_t status = db.native->CreateColumnFamily(std::move(cf_options), c.name, &collection); + if (!export_error(status, c.error)) { + db.columns.push_back(collection); + *c.id = reinterpret_cast(collection); + } + } + catch (...) { + *c.error = "Collection Create Failure"; } } @@ -801,55 +822,60 @@ void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { args_combo_k, "Default collection can't be invalidated."); - rocks_db_t& db = *reinterpret_cast(c.db); - rocks_collection_t* collection_ptr = reinterpret_cast(c.id); - rocks_collection_t* collection_ptr_to_clear = nullptr; + try { + rocks_db_t& db = *reinterpret_cast(c.db); + rocks_collection_t* collection_ptr = reinterpret_cast(c.id); + rocks_collection_t* collection_ptr_to_clear = nullptr; - if (c.id == ustore_collection_main_k) - collection_ptr_to_clear = db.native->DefaultColumnFamily(); - else { - for (auto it = db.columns.begin(); it != db.columns.end(); it++) { - collection_ptr_to_clear = reinterpret_cast(*it); - if (collection_ptr_to_clear == collection_ptr) - break; + if (c.id == ustore_collection_main_k) + collection_ptr_to_clear = db.native->DefaultColumnFamily(); + else { + for (auto it = db.columns.begin(); it != db.columns.end(); it++) { + collection_ptr_to_clear = reinterpret_cast(*it); + if (collection_ptr_to_clear == collection_ptr) + break; + } } - } - rocksdb::WriteOptions options; - options.sync = true; - - if (c.mode == ustore_drop_keys_vals_handle_k) { - for (auto it = db.columns.begin(); it != db.columns.end(); it++) { - if (collection_ptr_to_clear == *it) { - rocks_status_t status = db.native->DropColumnFamily(collection_ptr_to_clear); - if (export_error(status, c.error)) - return; - db.columns.erase(it); - break; + rocksdb::WriteOptions options; + options.sync = true; + + if (c.mode == ustore_drop_keys_vals_handle_k) { + for (auto it = db.columns.begin(); it != db.columns.end(); it++) { + if (collection_ptr_to_clear == *it) { + rocks_status_t status = db.native->DropColumnFamily(collection_ptr_to_clear); + if (export_error(status, c.error)) + return; + db.columns.erase(it); + break; + } } + return; + } + else if (c.mode == ustore_drop_keys_vals_k) { + rocksdb::WriteBatch batch; + auto it = std::unique_ptr( + db.native->NewIterator(rocksdb::ReadOptions(), collection_ptr_to_clear)); + for (it->SeekToFirst(); it->Valid(); it->Next()) + batch.Delete(collection_ptr_to_clear, it->key()); + rocks_status_t status = db.native->Write(options, &batch); + export_error(status, c.error); + return; } - return; - } - else if (c.mode == ustore_drop_keys_vals_k) { - rocksdb::WriteBatch batch; - auto it = - std::unique_ptr(db.native->NewIterator(rocksdb::ReadOptions(), collection_ptr_to_clear)); - for (it->SeekToFirst(); it->Valid(); it->Next()) - batch.Delete(collection_ptr_to_clear, it->key()); - rocks_status_t status = db.native->Write(options, &batch); - export_error(status, c.error); - return; - } - else if (c.mode == ustore_drop_vals_k) { - rocksdb::WriteBatch batch; - auto it = - std::unique_ptr(db.native->NewIterator(rocksdb::ReadOptions(), collection_ptr_to_clear)); - for (it->SeekToFirst(); it->Valid(); it->Next()) - batch.Put(collection_ptr_to_clear, it->key(), rocksdb::Slice()); - rocks_status_t status = db.native->Write(options, &batch); - export_error(status, c.error); - return; + else if (c.mode == ustore_drop_vals_k) { + rocksdb::WriteBatch batch; + auto it = std::unique_ptr( + db.native->NewIterator(rocksdb::ReadOptions(), collection_ptr_to_clear)); + for (it->SeekToFirst(); it->Valid(); it->Next()) + batch.Put(collection_ptr_to_clear, it->key(), rocksdb::Slice()); + rocks_status_t status = db.native->Write(options, &batch); + export_error(status, c.error); + return; + } + } + catch (...) { + *c.error = "Collection Drop Failure"; } } @@ -859,42 +885,47 @@ void ustore_collection_list(ustore_collection_list_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.count && c.names, c.error, args_combo_k, "Need names and outputs!"); - linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); - return_if_error_m(c.error); - - rocks_db_t& db = *reinterpret_cast(c.db); - std::size_t collections_count = db.columns.size() - 1; - *c.count = static_cast(collections_count); + try { + linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); + return_if_error_m(c.error); - // Every string will be null-terminated - std::size_t strings_length = 0; - for (auto const& column : db.columns) - strings_length += column->GetName().size() + 1; + rocks_db_t& db = *reinterpret_cast(c.db); + std::size_t collections_count = db.columns.size() - 1; + *c.count = static_cast(collections_count); - auto names = arena.alloc(strings_length, c.error).begin(); - *c.names = names; - return_if_error_m(c.error); + // Every string will be null-terminated + std::size_t strings_length = 0; + for (auto const& column : db.columns) + strings_length += column->GetName().size() + 1; - // For every collection we also need to export IDs and offsets - auto ids = arena.alloc_or_dummy(collections_count, c.error, c.ids); - return_if_error_m(c.error); - auto offs = arena.alloc_or_dummy(collections_count + 1, c.error, c.offsets); - return_if_error_m(c.error); + auto names = arena.alloc(strings_length, c.error).begin(); + *c.names = names; + return_if_error_m(c.error); - std::size_t i = 0; - for (auto const& column : db.columns) { - if (column->GetName() == rocksdb::kDefaultColumnFamilyName) - continue; + // For every collection we also need to export IDs and offsets + auto ids = arena.alloc_or_dummy(collections_count, c.error, c.ids); + return_if_error_m(c.error); + auto offs = arena.alloc_or_dummy(collections_count + 1, c.error, c.offsets); + return_if_error_m(c.error); - auto len = column->GetName().size(); - std::memcpy(names, column->GetName().data(), len); - names[len] = '\0'; - ids[i] = reinterpret_cast(column); + std::size_t i = 0; + for (auto const& column : db.columns) { + if (column->GetName() == rocksdb::kDefaultColumnFamilyName) + continue; + + auto len = column->GetName().size(); + std::memcpy(names, column->GetName().data(), len); + names[len] = '\0'; + ids[i] = reinterpret_cast(column); + offs[i] = static_cast(names - *c.names); + names += len + 1; + ++i; + } offs[i] = static_cast(names - *c.names); - names += len + 1; - ++i; } - offs[i] = static_cast(names - *c.names); + catch (...) { + *c.error = "Collection List Failure"; + } } void ustore_database_control(ustore_database_control_t* c_ptr) { @@ -911,19 +942,24 @@ void ustore_transaction_init(ustore_transaction_init_t* c_ptr) { validate_transaction_begin(c.transaction, c.options, c.error); return_if_error_m(c.error); - bool const safe = c.options & ustore_option_write_flush_k; - rocks_db_t& db = *reinterpret_cast(c.db); - rocks_txn_t& txn = **reinterpret_cast(c.transaction); - rocksdb::OptimisticTransactionOptions txn_options; - txn_options.set_snapshot = false; - rocksdb::WriteOptions options; - options.sync = safe; - options.disableWAL = !safe; - auto new_txn = db.native->BeginTransaction(options, txn_options, &txn); - if (!new_txn) - *c.error = "Couldn't start a transaction!"; - else - *c.transaction = new_txn; + try { + bool const safe = c.options & ustore_option_write_flush_k; + rocks_db_t& db = *reinterpret_cast(c.db); + rocks_txn_t& txn = **reinterpret_cast(c.transaction); + rocksdb::OptimisticTransactionOptions txn_options; + txn_options.set_snapshot = false; + rocksdb::WriteOptions options; + options.sync = safe; + options.disableWAL = !safe; + auto new_txn = db.native->BeginTransaction(options, txn_options, &txn); + if (!new_txn) + *c.error = "Couldn't start a transaction!"; + else + *c.transaction = new_txn; + } + catch (...) { + *c.error = "Transaction Init Failure"; + } } void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { @@ -934,17 +970,22 @@ void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { validate_transaction_commit(c.transaction, c.options, c.error); return_if_error_m(c.error); - rocks_db_t& db = *reinterpret_cast(c.db); - rocks_txn_t& txn = *reinterpret_cast(c.transaction); + try { + rocks_db_t& db = *reinterpret_cast(c.db); + rocks_txn_t& txn = *reinterpret_cast(c.transaction); - if (c.sequence_number) - db.mutex.lock(); - rocks_status_t status = txn.Commit(); - export_error(status, c.error); - if (c.sequence_number) { - if (status.ok()) - *c.sequence_number = db.native->GetLatestSequenceNumber(); - db.mutex.unlock(); + if (c.sequence_number) + db.mutex.lock(); + rocks_status_t status = txn.Commit(); + export_error(status, c.error); + if (c.sequence_number) { + if (status.ok()) + *c.sequence_number = db.native->GetLatestSequenceNumber(); + db.mutex.unlock(); + } + } + catch (...) { + *c.error = "Transaction Commit Failure"; } } From 1f0025b4fd78606d40e83bc9154e1ad3663f6fad Mon Sep 17 00:00:00 2001 From: Violeta Stepanyan <82702795+VioletaStepanyan@users.noreply.github.com> Date: Wed, 17 May 2023 12:19:53 +0400 Subject: [PATCH 3/6] Add: Exception handling for collection, snapshot methods in leveldb. --- src/engine_leveldb.cpp | 124 +++++++++++++++++++++++------------------ 1 file changed, 71 insertions(+), 53 deletions(-) diff --git a/src/engine_leveldb.cpp b/src/engine_leveldb.cpp index 19eae136f..1c8683ce5 100644 --- a/src/engine_leveldb.cpp +++ b/src/engine_leveldb.cpp @@ -194,43 +194,53 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.count && c.ids, c.error, args_combo_k, "Need outputs!"); - linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); - return_if_error_m(c.error); + try { + linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); + return_if_error_m(c.error); - level_db_t& db = *reinterpret_cast(c.db); - std::lock_guard locker(db.mutex); - std::size_t snapshots_count = db.snapshots.size(); - *c.count = static_cast(snapshots_count); + level_db_t& db = *reinterpret_cast(c.db); + std::lock_guard locker(db.mutex); + std::size_t snapshots_count = db.snapshots.size(); + *c.count = static_cast(snapshots_count); - // For every snapshot we also need to export IDs - auto ids = arena.alloc_or_dummy(snapshots_count, c.error, c.ids); - return_if_error_m(c.error); + // For every snapshot we also need to export IDs + auto ids = arena.alloc_or_dummy(snapshots_count, c.error, c.ids); + return_if_error_m(c.error); - std::size_t i = 0; - for (const auto& [id, _] : db.snapshots) - ids[i++] = id; + std::size_t i = 0; + for (const auto& [id, _] : db.snapshots) + ids[i++] = id; + } + catch (...) { + *c.error = "Snapshot List Failure"; + } } void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { ustore_snapshot_create_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - level_db_t& db = *reinterpret_cast(c.db); - std::lock_guard locker(db.mutex); - auto it = db.snapshots.find(*c.id); - if (it != db.snapshots.end()) - return_error_if_m(it->second, c.error, args_wrong_k, "Such snapshot already exists!"); - - level_snapshot_t* level_snapshot = nullptr; - safe_section("Allocating snapshot handle", c.error, [&] { level_snapshot = new level_snapshot_t(); }); - return_if_error_m(c.error); + try { + level_db_t& db = *reinterpret_cast(c.db); + std::lock_guard locker(db.mutex); + auto it = db.snapshots.find(*c.id); + if (it != db.snapshots.end()) + return_error_if_m(it->second, c.error, args_wrong_k, "Such snapshot already exists!"); + + level_snapshot_t* level_snapshot = nullptr; + safe_section("Allocating snapshot handle", c.error, [&] { level_snapshot = new level_snapshot_t(); }); + return_if_error_m(c.error); - level_snapshot->snapshot = db.native->GetSnapshot(); - if (!level_snapshot->snapshot) - *c.error = "Couldn't get a snapshot!"; + level_snapshot->snapshot = db.native->GetSnapshot(); + if (!level_snapshot->snapshot) + *c.error = "Couldn't get a snapshot!"; - *c.id = reinterpret_cast(level_snapshot); - db.snapshots[*c.id] = level_snapshot; + *c.id = reinterpret_cast(level_snapshot); + db.snapshots[*c.id] = level_snapshot; + } + catch (...) { + *c.error = "Snapshot Create Failure"; + } } void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { @@ -272,25 +282,28 @@ void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { } void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { - if (!c_ptr) - return; ustore_snapshot_drop_t& c = *c_ptr; if (!c.id) return; - level_db_t& db = *reinterpret_cast(c.db); - level_snapshot_t& snap = *reinterpret_cast(c.id); - if (!snap.snapshot) - return; + try { + level_db_t& db = *reinterpret_cast(c.db); + level_snapshot_t& snap = *reinterpret_cast(c.id); + if (!snap.snapshot) + return; - db.native->ReleaseSnapshot(snap.snapshot); - snap.snapshot = nullptr; + db.native->ReleaseSnapshot(snap.snapshot); + snap.snapshot = nullptr; - auto id = reinterpret_cast(c.id); - db.mutex.lock(); - db.snapshots.erase(id); - db.mutex.unlock(); + auto id = reinterpret_cast(c.id); + db.mutex.lock(); + db.snapshots.erase(id); + db.mutex.unlock(); + } + catch (...) { + *c.error = "Snapshot Drop Failure"; + } } void write_one( // @@ -624,25 +637,30 @@ void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { args_combo_k, "Collections not supported by LevelDB!"); - level_db_t& db = *reinterpret_cast(c.db); + try { + level_db_t& db = *reinterpret_cast(c.db); - leveldb::WriteBatch batch; - auto it = std::unique_ptr(db.native->NewIterator(leveldb::ReadOptions())); + leveldb::WriteBatch batch; + auto it = std::unique_ptr(db.native->NewIterator(leveldb::ReadOptions())); - if (c.mode == ustore_drop_keys_vals_k) { - for (it->SeekToFirst(); it->Valid(); it->Next()) - batch.Delete(it->key()); - } + if (c.mode == ustore_drop_keys_vals_k) { + for (it->SeekToFirst(); it->Valid(); it->Next()) + batch.Delete(it->key()); + } - else if (c.mode == ustore_drop_vals_k) { - for (it->SeekToFirst(); it->Valid(); it->Next()) - batch.Put(it->key(), leveldb::Slice()); - } + else if (c.mode == ustore_drop_vals_k) { + for (it->SeekToFirst(); it->Valid(); it->Next()) + batch.Put(it->key(), leveldb::Slice()); + } - leveldb::WriteOptions options; - options.sync = true; - level_status_t status = db.native->Write(options, &batch); - export_error(status, c.error); + leveldb::WriteOptions options; + options.sync = true; + level_status_t status = db.native->Write(options, &batch); + export_error(status, c.error); + } + catch (...) { + *c.error = "Collection Drop Failure"; + } } void ustore_collection_list(ustore_collection_list_t* c_ptr) { From 0c3227ae4275605b02b1dfe39bf5c58fbda0db7d Mon Sep 17 00:00:00 2001 From: Violeta Stepanyan <82702795+VioletaStepanyan@users.noreply.github.com> Date: Tue, 23 May 2023 11:03:38 +0400 Subject: [PATCH 4/6] Refactor: Use safe_section method for exeption hendling. --- src/engine_leveldb.cpp | 28 ++++++------------- src/engine_rocksdb.cpp | 63 ++++++++++++------------------------------ src/flight_client.cpp | 63 ++++++++++++------------------------------ 3 files changed, 44 insertions(+), 110 deletions(-) diff --git a/src/engine_leveldb.cpp b/src/engine_leveldb.cpp index 1c8683ce5..70ca9ab9f 100644 --- a/src/engine_leveldb.cpp +++ b/src/engine_leveldb.cpp @@ -194,7 +194,7 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.count && c.ids, c.error, args_combo_k, "Need outputs!"); - try { + safe_section("Geting Snapshot List", c.error, [&] { linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); return_if_error_m(c.error); @@ -210,17 +210,14 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { std::size_t i = 0; for (const auto& [id, _] : db.snapshots) ids[i++] = id; - } - catch (...) { - *c.error = "Snapshot List Failure"; - } + }); } void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { ustore_snapshot_create_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - try { + safe_section("Creating Snapshot", c.error, [&] { level_db_t& db = *reinterpret_cast(c.db); std::lock_guard locker(db.mutex); auto it = db.snapshots.find(*c.id); @@ -237,10 +234,7 @@ void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { *c.id = reinterpret_cast(level_snapshot); db.snapshots[*c.id] = level_snapshot; - } - catch (...) { - *c.error = "Snapshot Create Failure"; - } + }); } void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { @@ -287,7 +281,7 @@ void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { if (!c.id) return; - try { + safe_section("Dropping Snapshot", c.error, [&] { level_db_t& db = *reinterpret_cast(c.db); level_snapshot_t& snap = *reinterpret_cast(c.id); if (!snap.snapshot) @@ -300,10 +294,7 @@ void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { db.mutex.lock(); db.snapshots.erase(id); db.mutex.unlock(); - } - catch (...) { - *c.error = "Snapshot Drop Failure"; - } + }); } void write_one( // @@ -637,7 +628,7 @@ void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { args_combo_k, "Collections not supported by LevelDB!"); - try { + safe_section("Dropping Collection", c.error, [&] { level_db_t& db = *reinterpret_cast(c.db); leveldb::WriteBatch batch; @@ -657,10 +648,7 @@ void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { options.sync = true; level_status_t status = db.native->Write(options, &batch); export_error(status, c.error); - } - catch (...) { - *c.error = "Collection Drop Failure"; - } + }); } void ustore_collection_list(ustore_collection_list_t* c_ptr) { diff --git a/src/engine_rocksdb.cpp b/src/engine_rocksdb.cpp index 332130c7e..0ce50818c 100644 --- a/src/engine_rocksdb.cpp +++ b/src/engine_rocksdb.cpp @@ -244,7 +244,7 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.count && c.ids, c.error, args_combo_k, "Need outputs!"); - try { + safe_section("Geting Snapshot List", c.error, [&] { linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); return_if_error_m(c.error); @@ -260,10 +260,7 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { std::size_t i = 0; for (const auto& [id, _] : db.snapshots) ids[i++] = id; - } - catch (...) { - *c.error = "Snapshot List Failure"; - } + }); } void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { @@ -271,7 +268,7 @@ void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { ustore_snapshot_create_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - try { + safe_section("Creating Snapshot", c.error, [&] { rocks_db_t& db = *reinterpret_cast(c.db); std::lock_guard locker(db.mutex); auto it = db.snapshots.find(*c.id); @@ -288,10 +285,7 @@ void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { *c.id = reinterpret_cast(rocks_snapshot); db.snapshots[*c.id] = rocks_snapshot; - } - catch (...) { - *c.error = "Snapshot Create Failure"; - } + }); } void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { @@ -299,7 +293,7 @@ void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { ustore_snapshot_export_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - try { + safe_section("Exporting Snapshot", c.error, [&] { rocks_db_t& db = *reinterpret_cast(c.db); rocksdb::Checkpoint* chp_ptr = nullptr; @@ -317,10 +311,7 @@ void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { rocks_status_t status = chp_ptr->CreateCheckpoint(c.path, 0, &snapshot_id); export_error(status, c.error); - } - catch (...) { - *c.error = "Snapshot Export Failure"; - } + }); } void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { @@ -329,7 +320,7 @@ void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { if (!c.id) return; - try { + safe_section("Dropping Snapshot", c.error, [&] { rocks_db_t& db = *reinterpret_cast(c.db); rocks_snapshot_t& snap = *reinterpret_cast(c.id); if (!snap.snapshot) @@ -342,10 +333,7 @@ void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { db.mutex.lock(); db.snapshots.erase(id); db.mutex.unlock(); - } - catch (...) { - *c.error = "Snapshot Drop Failure"; - } + }); } void write_one( // @@ -786,7 +774,7 @@ void ustore_collection_create(ustore_collection_create_t* c_ptr) { return_error_if_m(name_len, c.error, args_wrong_k, "Default collection is always present"); return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - try { + safe_section("Creating Collection", c.error, [&] { rocks_db_t& db = *reinterpret_cast(c.db); for (auto handle : db.columns) { @@ -805,10 +793,7 @@ void ustore_collection_create(ustore_collection_create_t* c_ptr) { db.columns.push_back(collection); *c.id = reinterpret_cast(collection); } - } - catch (...) { - *c.error = "Collection Create Failure"; - } + }); } void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { @@ -822,7 +807,7 @@ void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { args_combo_k, "Default collection can't be invalidated."); - try { + safe_section("Dropping Collection", c.error, [&] { rocks_db_t& db = *reinterpret_cast(c.db); rocks_collection_t* collection_ptr = reinterpret_cast(c.id); rocks_collection_t* collection_ptr_to_clear = nullptr; @@ -873,10 +858,7 @@ void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { export_error(status, c.error); return; } - } - catch (...) { - *c.error = "Collection Drop Failure"; - } + }); } void ustore_collection_list(ustore_collection_list_t* c_ptr) { @@ -885,7 +867,7 @@ void ustore_collection_list(ustore_collection_list_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.count && c.names, c.error, args_combo_k, "Need names and outputs!"); - try { + safe_section("Geting Collection List", c.error, [&] { linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); return_if_error_m(c.error); @@ -922,10 +904,7 @@ void ustore_collection_list(ustore_collection_list_t* c_ptr) { ++i; } offs[i] = static_cast(names - *c.names); - } - catch (...) { - *c.error = "Collection List Failure"; - } + }); } void ustore_database_control(ustore_database_control_t* c_ptr) { @@ -942,7 +921,7 @@ void ustore_transaction_init(ustore_transaction_init_t* c_ptr) { validate_transaction_begin(c.transaction, c.options, c.error); return_if_error_m(c.error); - try { + safe_section("Initializing Transaction", c.error, [&] { bool const safe = c.options & ustore_option_write_flush_k; rocks_db_t& db = *reinterpret_cast(c.db); rocks_txn_t& txn = **reinterpret_cast(c.transaction); @@ -956,10 +935,7 @@ void ustore_transaction_init(ustore_transaction_init_t* c_ptr) { *c.error = "Couldn't start a transaction!"; else *c.transaction = new_txn; - } - catch (...) { - *c.error = "Transaction Init Failure"; - } + }); } void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { @@ -970,7 +946,7 @@ void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { validate_transaction_commit(c.transaction, c.options, c.error); return_if_error_m(c.error); - try { + safe_section("Committing Transaction", c.error, [&] { rocks_db_t& db = *reinterpret_cast(c.db); rocks_txn_t& txn = *reinterpret_cast(c.transaction); @@ -983,10 +959,7 @@ void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { *c.sequence_number = db.native->GetLatestSequenceNumber(); db.mutex.unlock(); } - } - catch (...) { - *c.error = "Transaction Commit Failure"; - } + }); } void ustore_arena_free(ustore_arena_t c_arena) { diff --git a/src/flight_client.cpp b/src/flight_client.cpp index 782166302..51e28d68a 100644 --- a/src/flight_client.cpp +++ b/src/flight_client.cpp @@ -1311,7 +1311,7 @@ void ustore_collection_create(ustore_collection_create_t* c_ptr) { auto name_len = c.name ? std::strlen(c.name) : 0; return_error_if_m(name_len, c.error, args_wrong_k, "Default collection is always present"); - try { + safe_section("Creating Collection", c.error, [&] { rpc_client_t& db = *reinterpret_cast(c.db); arf::Action action; @@ -1337,10 +1337,7 @@ void ustore_collection_create(ustore_collection_create_t* c_ptr) { error_unknown_k, "Inadequate response"); std::memcpy(c.id, id_ptr->body->data(), sizeof(ustore_collection_t)); - } - catch (...) { - *c.error = "Collection Create Failure"; - } + }); } void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { @@ -1348,7 +1345,7 @@ void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { ustore_collection_drop_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - try { + safe_section("Dropping Collection", c.error, [&] { std::string_view mode; switch (c.mode) { case ustore_drop_vals_k: mode = kParamDropModeValues; break; @@ -1372,10 +1369,7 @@ void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { arf::FlightCallOptions options = arrow_call_options(pool); ar::Result> maybe_stream = db.flight->DoAction(options, action); return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - } - catch (...) { - *c.error = "Collection Drop Failure"; - } + }); } void ustore_collection_list(ustore_collection_list_t* c_ptr) { @@ -1383,7 +1377,7 @@ void ustore_collection_list(ustore_collection_list_t* c_ptr) { ustore_collection_list_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - try { + safe_section("Getting Collection List", c.error, [&] { rpc_client_t& db = *reinterpret_cast(c.db); if (!(c.options & ustore_option_dont_discard_memory_k)) db.readers.clear(); @@ -1426,10 +1420,7 @@ void ustore_collection_list(ustore_collection_list_t* c_ptr) { } db.readers.push_back(std::move(stream_ptr)); - } - catch (...) { - *c.error = "Collection List Failure"; - } + }); } void ustore_database_control(ustore_database_control_t* c_ptr) { @@ -1449,7 +1440,7 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { ustore_snapshot_list_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - try { + safe_section("Getting Snapshot List", c.error, [&] { linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); return_if_error_m(c.error); @@ -1478,17 +1469,14 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { *c.count = static_cast(batch_c.length); if (c.ids) *c.ids = (ustore_collection_t*)batch_c.children[*ids_column_idx]->buffers[1]; - } - catch (...) { - *c.error = "Snapshot List Failure"; - } + }); } void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { ustore_snapshot_create_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - try { + safe_section("Creating Snapshot", c.error, [&] { rpc_client_t& db = *reinterpret_cast(c.db); arf::Action action; @@ -1512,17 +1500,14 @@ void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { error_unknown_k, "Inadequate response"); std::memcpy(c.id, id_ptr->body->data(), sizeof(ustore_snapshot_t)); - } - catch (...) { - *c.error = "Snapshot Create Failure"; - } + }); } void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { ustore_snapshot_export_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - try { + safe_section("Exporting Snapshot", c.error, [&] { rpc_client_t& db = *reinterpret_cast(c.db); arf::Action action; @@ -1539,17 +1524,14 @@ void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { arf::FlightCallOptions options = arrow_call_options(pool); ar::Result> maybe_stream = db.flight->DoAction(options, action); return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - } - catch (...) { - *c.error = "Snapshot Export Failure"; - } + }); } void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { ustore_snapshot_drop_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - try { + safe_section("Dropping Collection", c.error, [&] { rpc_client_t& db = *reinterpret_cast(c.db); arf::Action action; @@ -1560,10 +1542,7 @@ void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { arf::FlightCallOptions options = arrow_call_options(pool); ar::Result> maybe_stream = db.flight->DoAction(options, action); return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - } - catch (...) { - *c.error = "Snapshot Drop Failure"; - } + }); } /*********************************************************/ @@ -1576,7 +1555,7 @@ void ustore_transaction_init(ustore_transaction_init_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.transaction, c.error, uninitialized_state_k, "Transaction is uninitialized"); - try { + safe_section("Initializing Transaction", c.error, [&] { rpc_client_t& db = *reinterpret_cast(c.db); arf::Action action; @@ -1606,10 +1585,7 @@ void ustore_transaction_init(ustore_transaction_init_t* c_ptr) { error_unknown_k, "Inadequate response"); std::memcpy(c.transaction, id_ptr->body->data(), sizeof(ustore_transaction_t)); - } - catch (...) { - *c.error = "Transaction Init Failure"; - } + }); } void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { @@ -1617,7 +1593,7 @@ void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { ustore_transaction_commit_t& c = *c_ptr; return_error_if_m(c.transaction, c.error, uninitialized_state_k, "Transaction is uninitialized"); - try { + safe_section("Commiting Transaction", c.error, [&] { rpc_client_t& db = *reinterpret_cast(c.db); arf::Action action; @@ -1634,10 +1610,7 @@ void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { arf::FlightCallOptions options = arrow_call_options(pool); ar::Result> maybe_stream = db.flight->DoAction(options, action); return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - } - catch (...) { - *c.error = "Transaction Commit Failure"; - } + }); } /*********************************************************/ From 04654ad9d06007985be53e33ffa98d8cea542f2f Mon Sep 17 00:00:00 2001 From: Violeta Stepanyan <82702795+VioletaStepanyan@users.noreply.github.com> Date: Mon, 29 May 2023 09:41:29 +0400 Subject: [PATCH 5/6] Refactor: Spelling. --- src/engine_leveldb.cpp | 2 +- src/engine_rocksdb.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/engine_leveldb.cpp b/src/engine_leveldb.cpp index 70ca9ab9f..9ed5ca774 100644 --- a/src/engine_leveldb.cpp +++ b/src/engine_leveldb.cpp @@ -194,7 +194,7 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.count && c.ids, c.error, args_combo_k, "Need outputs!"); - safe_section("Geting Snapshot List", c.error, [&] { + safe_section("Getting Snapshot List", c.error, [&] { linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); return_if_error_m(c.error); diff --git a/src/engine_rocksdb.cpp b/src/engine_rocksdb.cpp index 0ce50818c..b6eb23a9a 100644 --- a/src/engine_rocksdb.cpp +++ b/src/engine_rocksdb.cpp @@ -244,7 +244,7 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.count && c.ids, c.error, args_combo_k, "Need outputs!"); - safe_section("Geting Snapshot List", c.error, [&] { + safe_section("Getting Snapshot List", c.error, [&] { linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); return_if_error_m(c.error); @@ -867,7 +867,7 @@ void ustore_collection_list(ustore_collection_list_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.count && c.names, c.error, args_combo_k, "Need names and outputs!"); - safe_section("Geting Collection List", c.error, [&] { + safe_section("Getting Collection List", c.error, [&] { linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); return_if_error_m(c.error); From d990ab4c722fba07e3a7cba23a7baa831cfa9744 Mon Sep 17 00:00:00 2001 From: violeta Date: Thu, 22 Jun 2023 12:53:06 +0400 Subject: [PATCH 6/6] Refactor: Make separate methods for eh. --- src/engine_rocksdb.cpp | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/engine_rocksdb.cpp b/src/engine_rocksdb.cpp index b6eb23a9a..0e31b36c2 100644 --- a/src/engine_rocksdb.cpp +++ b/src/engine_rocksdb.cpp @@ -244,7 +244,7 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.count && c.ids, c.error, args_combo_k, "Need outputs!"); - safe_section("Getting Snapshot List", c.error, [&] { + auto list_snap = [&] { linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); return_if_error_m(c.error); @@ -260,7 +260,8 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { std::size_t i = 0; for (const auto& [id, _] : db.snapshots) ids[i++] = id; - }); + }; + safe_section("Getting Snapshot List", c.error, list_snap); } void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { @@ -268,7 +269,7 @@ void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { ustore_snapshot_create_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - safe_section("Creating Snapshot", c.error, [&] { + auto create_snap = [&] { rocks_db_t& db = *reinterpret_cast(c.db); std::lock_guard locker(db.mutex); auto it = db.snapshots.find(*c.id); @@ -285,7 +286,8 @@ void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { *c.id = reinterpret_cast(rocks_snapshot); db.snapshots[*c.id] = rocks_snapshot; - }); + }; + safe_section("Creating Snapshot", c.error, create_snap); } void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { @@ -293,7 +295,7 @@ void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { ustore_snapshot_export_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - safe_section("Exporting Snapshot", c.error, [&] { + auto export_snap = [&] { rocks_db_t& db = *reinterpret_cast(c.db); rocksdb::Checkpoint* chp_ptr = nullptr; @@ -311,7 +313,8 @@ void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { rocks_status_t status = chp_ptr->CreateCheckpoint(c.path, 0, &snapshot_id); export_error(status, c.error); - }); + }; + safe_section("Exporting Snapshot", c.error, export_snap); } void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { @@ -320,7 +323,7 @@ void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { if (!c.id) return; - safe_section("Dropping Snapshot", c.error, [&] { + auto drop_snap = [&] { rocks_db_t& db = *reinterpret_cast(c.db); rocks_snapshot_t& snap = *reinterpret_cast(c.id); if (!snap.snapshot) @@ -333,7 +336,8 @@ void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { db.mutex.lock(); db.snapshots.erase(id); db.mutex.unlock(); - }); + }; + safe_section("Dropping Snapshot", c.error, drop_snap); } void write_one( //