From ba952b067d3c81f62ac734b219eb3f5fed529cc3 Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Thu, 19 Dec 2024 11:41:04 +0800 Subject: [PATCH] feat(interactive): Make use of all available cpus at startup (#4343) At startup, try to make use of all available cpus on hosts. Also fix a deep copy bug in `test_robustness.py`. The CI failures are due to rust version, and will be fixed in PR: https://github.com/alibaba/GraphScope/pull/4373 --- .github/workflows/pr-check.yml | 6 ++- .../graph_db/runtime/adhoc/expr_impl.h | 28 ++++++---- .../handler/graph_db_http_handler.cc | 2 +- .../handler/graph_db_http_handler.h | 2 +- .../python/gs_interactive/client/status.py | 3 ++ .../python/gs_interactive/tests/conftest.py | 31 +++++++++-- .../gs_interactive/tests/test_robustness.py | 42 ++++++++++++++- flex/interactive/sdk/python/setup.cfg | 2 +- .../metadata/local_file_metadata_store.cc | 53 +++++++++++-------- .../metadata/local_file_metadata_store.h | 8 ++- flex/tests/hqps/hqps_robust_test.sh | 2 +- .../graphscope/common/ir/meta/GraphId.java | 5 ++ k8s/dockerfiles/interactive-entrypoint.sh | 3 ++ 13 files changed, 142 insertions(+), 45 deletions(-) diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index 7f59f2182263..a66be7b6d96c 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -213,11 +213,15 @@ jobs: python3 -m black --check --diff . python3 -m flake8 . popd - pushd flex/interactive/sdk/python + # we need to generate the code first + pushd flex/interactive/sdk + bash generate_sdk.sh -g python + pushd python python3 -m isort --check --diff . python3 -m black --check --diff . python3 -m flake8 . 
popd + popd - name: Generate Docs shell: bash diff --git a/flex/engines/graph_db/runtime/adhoc/expr_impl.h b/flex/engines/graph_db/runtime/adhoc/expr_impl.h index 20a764de0696..91046e31e19c 100644 --- a/flex/engines/graph_db/runtime/adhoc/expr_impl.h +++ b/flex/engines/graph_db/runtime/adhoc/expr_impl.h @@ -85,17 +85,23 @@ class WithInExpr : public ExprBase { WithInExpr(const ReadTransaction& txn, const Context& ctx, std::unique_ptr&& key, const common::Value& array) : key_(std::move(key)) { - if constexpr (std::is_same_v) { - CHECK(array.item_case() == common::Value::kI64Array); - size_t len = array.i64_array().item_size(); - for (size_t idx = 0; idx < len; ++idx) { - container_.push_back(array.i64_array().item(idx)); - } - } else if constexpr (std::is_same_v) { - CHECK(array.item_case() == common::Value::kI32Array); - size_t len = array.i32_array().item_size(); - for (size_t idx = 0; idx < len; ++idx) { - container_.push_back(array.i32_array().item(idx)); + if constexpr ((std::is_same_v) || + (std::is_same_v) ) { + // Implicitly convert to T + if (array.item_case() == common::Value::kI64Array) { + size_t len = array.i64_array().item_size(); + for (size_t idx = 0; idx < len; ++idx) { + container_.push_back(array.i64_array().item(idx)); + } + } else if (array.item_case() == common::Value::kI32Array) { + size_t len = array.i32_array().item_size(); + for (size_t idx = 0; idx < len; ++idx) { + container_.push_back(array.i32_array().item(idx)); + } + } else { + LOG(FATAL) << "Fail to construct WithInExpr of type " + << typeid(T).name() << " with array of type " + << array.item_case(); } } else if constexpr (std::is_same_v) { CHECK(array.item_case() == common::Value::kStrArray); diff --git a/flex/engines/http_server/handler/graph_db_http_handler.cc b/flex/engines/http_server/handler/graph_db_http_handler.cc index 319920459058..4ff8c9e540ff 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.cc +++ 
b/flex/engines/http_server/handler/graph_db_http_handler.cc @@ -176,7 +176,7 @@ class stored_proc_handler : public StoppableHandler { bool start() override { if (get_executors()[StoppableHandler::shard_id()].size() > 0) { - LOG(ERROR) << "The actors have been already created!"; + VLOG(10) << "The actors have been already created!"; return false; } return StoppableHandler::start_scope( diff --git a/flex/engines/http_server/handler/graph_db_http_handler.h b/flex/engines/http_server/handler/graph_db_http_handler.h index 6bc5c906910e..22090e66dc41 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.h +++ b/flex/engines/http_server/handler/graph_db_http_handler.h @@ -69,7 +69,7 @@ class StoppableHandler : public seastar::httpd::handler_base { } catch (const std::exception& e) { // In case the scope is already cancelled, we should ignore the // exception. - LOG(INFO) << "Failed to cancel IC scope: " << e.what(); + VLOG(1) << "Failed to cancel IC scope: " << e.what(); } func(); return seastar::make_ready_future<>(); diff --git a/flex/interactive/sdk/python/gs_interactive/client/status.py b/flex/interactive/sdk/python/gs_interactive/client/status.py index 13af0e0ed660..21564752ecdf 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/status.py +++ b/flex/interactive/sdk/python/gs_interactive/client/status.py @@ -23,6 +23,7 @@ from gs_interactive.exceptions import NotFoundException from gs_interactive.exceptions import ServiceException from urllib3.exceptions import MaxRetryError +from urllib3.exceptions import ProtocolError from gs_interactive.client.generated.interactive_pb2 import Code as StatusCode from gs_interactive.models.api_response_with_code import APIResponseWithCode @@ -108,6 +109,8 @@ def from_exception(exception: ApiException): return Status(StatusCode.INTERNAL_ERROR, exception.body) elif isinstance(exception, MaxRetryError): return Status(StatusCode.INTERNAL_ERROR, exception) + elif isinstance(exception, ProtocolError): + return 
Status(StatusCode.INTERNAL_ERROR, exception) return Status( StatusCode.UNKNOWN, "Unknown Error from exception " + exception.body ) diff --git a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py index 94f0b34a25f2..3617115b4740 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py @@ -17,6 +17,7 @@ # # get the directory of the current file +import copy import os import time @@ -27,6 +28,7 @@ from gs_interactive.client.session import Session from gs_interactive.models import CreateGraphRequest from gs_interactive.models import CreateProcedureRequest +from gs_interactive.models import GetGraphSchemaResponse from gs_interactive.models import SchemaMapping from gs_interactive.models import StartServiceRequest from gs_interactive.models import UpdateProcedureRequest @@ -39,7 +41,7 @@ modern_graph_full = { - "name": "modern_graph", + "name": "full_graph", "description": "This is a test graph", "schema": { "vertex_types": [ @@ -120,7 +122,7 @@ } modern_graph_vertex_only = { - "name": "modern_graph", + "name": "vertex_only", "description": "This is a test graph, only contains vertex", "schema": { "vertex_types": [ @@ -148,7 +150,7 @@ } modern_graph_partial = { - "name": "modern_graph", + "name": "partial_graph", "description": "This is a test graph", "schema": { "vertex_types": [ @@ -336,7 +338,7 @@ def create_partial_modern_graph(interactive_session): @pytest.fixture(scope="function") def create_graph_with_custom_pk_name(interactive_session): - modern_graph_custom_pk_name = modern_graph_full.copy() + modern_graph_custom_pk_name = copy.deepcopy(modern_graph_full) for vertex_type in modern_graph_custom_pk_name["schema"]["vertex_types"]: vertex_type["properties"][0]["property_name"] = "custom_id" vertex_type["primary_keys"] = ["custom_id"] @@ -479,3 +481,24 @@ def start_service_on_graph(interactive_session, graph_id: 
str): assert resp.is_ok() # wait three second to let compiler get the new graph time.sleep(3) + + +def ensure_compiler_schema_ready( + interactive_session, neo4j_session: Neo4jSession, graph_id: str +): + rel_graph_meta = interactive_session.get_graph_schema(graph_id).get_value() + max_times = 10 + while True: + if max_times == 0: + raise Exception("compiler schema is not ready") + res = neo4j_session.run("CALL gs.procedure.meta.schema();") + val = res.single().value() + compiler_graph_schema = GetGraphSchemaResponse.from_json(val) + # print("compiler_graph_schema: ", compiler_graph_schema) + # print("rel_graph_meta: ", rel_graph_meta) + if compiler_graph_schema == rel_graph_meta: + break + print("compiler schema is not ready, wait for 1 second") + time.sleep(1) + max_times -= 1 + print("compiler schema is ready") diff --git a/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py b/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py index a144a1e07948..2fb50d138f4e 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py @@ -27,6 +27,7 @@ from gs_interactive.tests.conftest import call_procedure # noqa: E402 from gs_interactive.tests.conftest import create_procedure from gs_interactive.tests.conftest import delete_procedure +from gs_interactive.tests.conftest import ensure_compiler_schema_ready from gs_interactive.tests.conftest import import_data_to_full_modern_graph from gs_interactive.tests.conftest import import_data_to_partial_modern_graph from gs_interactive.tests.conftest import import_data_to_vertex_only_modern_graph @@ -61,6 +62,9 @@ def test_query_on_vertex_only_graph( """ print("[Query on vertex only graph]") start_service_on_graph(interactive_session, create_vertex_only_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_vertex_only_modern_graph + ) run_cypher_test_suite( neo4j_session, 
create_vertex_only_modern_graph, vertex_only_cypher_queries ) @@ -69,6 +73,10 @@ def test_query_on_vertex_only_graph( import_data_to_vertex_only_modern_graph( interactive_session, create_vertex_only_modern_graph ) + start_service_on_graph(interactive_session, create_vertex_only_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_vertex_only_modern_graph + ) run_cypher_test_suite( neo4j_session, create_vertex_only_modern_graph, vertex_only_cypher_queries ) @@ -83,12 +91,19 @@ def test_query_on_partial_graph( print("[Query on partial graph]") # start service on new graph start_service_on_graph(interactive_session, create_partial_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_partial_modern_graph + ) # try to query on the graph run_cypher_test_suite(neo4j_session, create_partial_modern_graph, cypher_queries) start_service_on_graph(interactive_session, "1") import_data_to_partial_modern_graph( interactive_session, create_partial_modern_graph ) + start_service_on_graph(interactive_session, create_partial_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_partial_modern_graph + ) run_cypher_test_suite(neo4j_session, create_partial_modern_graph, cypher_queries) @@ -100,10 +115,17 @@ def test_query_on_full_modern_graph( """ print("[Query on full modern graph]") start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) # try to query on the graph run_cypher_test_suite(neo4j_session, create_modern_graph, cypher_queries) start_service_on_graph(interactive_session, "1") import_data_to_full_modern_graph(interactive_session, create_modern_graph) + start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) run_cypher_test_suite(neo4j_session, 
create_modern_graph, cypher_queries) @@ -129,6 +151,9 @@ def test_service_switching( ) print("Procedure id: ", a_proc_id) start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) call_procedure(neo4j_session, create_modern_graph, a_proc_id) # create procedure on graph_b_id @@ -139,6 +164,9 @@ def test_service_switching( "MATCH(n: person) return count(n);", ) start_service_on_graph(interactive_session, create_vertex_only_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_vertex_only_modern_graph + ) call_procedure(neo4j_session, create_vertex_only_modern_graph, b_proc_id) @@ -156,6 +184,9 @@ def test_procedure_creation(interactive_session, neo4j_session, create_modern_gr ) print("Procedure id: ", a_proc_id) start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) call_procedure(neo4j_session, create_modern_graph, a_proc_id) # create procedure with name containing space, @@ -202,6 +233,9 @@ def test_builtin_procedure(interactive_session, neo4j_session, create_modern_gra ) # Call the builtin procedure start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) call_procedure( neo4j_session, create_modern_graph, @@ -259,6 +293,10 @@ def test_list_jobs(interactive_session, create_vertex_only_modern_graph): def test_call_proc_in_cypher(interactive_session, neo4j_session, create_modern_graph): print("[Test call procedure in cypher]") import_data_to_full_modern_graph(interactive_session, create_modern_graph) + start_service_on_graph(interactive_session, create_modern_graph) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_modern_graph + ) result = neo4j_session.run( 'MATCH(p: person) with p.id as oid CALL 
k_neighbors("person", oid, 1) return label_name, vertex_oid;' ) @@ -276,6 +314,9 @@ def test_custom_pk_name( interactive_session, create_graph_with_custom_pk_name ) start_service_on_graph(interactive_session, create_graph_with_custom_pk_name) + ensure_compiler_schema_ready( + interactive_session, neo4j_session, create_graph_with_custom_pk_name + ) result = neo4j_session.run( "MATCH (n: person) where n.custom_id = 4 return n.custom_id;" ) @@ -289,4 +330,3 @@ def test_custom_pk_name( ) records = result.fetch(1) assert len(records) == 1 and records[0]["$f0"] == 2 - start_service_on_graph(interactive_session, "1") diff --git a/flex/interactive/sdk/python/setup.cfg b/flex/interactive/sdk/python/setup.cfg index 1e1839250158..255781291114 100644 --- a/flex/interactive/sdk/python/setup.cfg +++ b/flex/interactive/sdk/python/setup.cfg @@ -3,7 +3,7 @@ profile = black ensure_newline_before_comments = True line_length = 88 force_single_line = True -skip = build/,dist/,gs_interactive/api/,gs_interactive/api_response.py,gs_interactive/configuration.py,gs_interactive/exceptions.py,gs_interactive/models/,gs_interactiverest.py, +skip = build/,dist/,gs_interactive/api/,gs_interactive/api_response.py,gs_interactive/configuration.py,gs_interactive/exceptions.py,gs_interactive/models/,gs_interactiverest.py,gs_interactive/api_client.py,gs_interactive/__init__.py,gs_interactive/rest.py skip_glob = *_pb2.py,*_pb2_grpc.py,build/* [flake8] diff --git a/flex/storages/metadata/local_file_metadata_store.cc b/flex/storages/metadata/local_file_metadata_store.cc index 5f77d0cc2e7a..bcff4bf4ed38 100644 --- a/flex/storages/metadata/local_file_metadata_store.cc +++ b/flex/storages/metadata/local_file_metadata_store.cc @@ -181,8 +181,8 @@ Result LocalFileMetadataStore::UpdateMeta(const meta_kind_t& meta_kind, Result LocalFileMetadataStore::get_next_meta_key( - const LocalFileMetadataStore::meta_kind_t& meta_kind) const { - return std::to_string(get_max_id(meta_kind) + 1); + const 
LocalFileMetadataStore::meta_kind_t& meta_kind) { + return std::to_string(increase_and_get_id(meta_kind)); } std::string LocalFileMetadataStore::get_root_meta_dir() const { @@ -208,29 +208,38 @@ std::string LocalFileMetadataStore::get_meta_file(const meta_kind_t& meta_kind, return ret; } -int32_t LocalFileMetadataStore::get_max_id(const meta_kind_t& meta_kind) const { - // iterate all files in the directory, get the max id. - int max_id_ = 0; +// Guarded by meta_mutex_ outside. +int32_t LocalFileMetadataStore::increase_and_get_id( + const meta_kind_t& meta_kind) { auto dir = get_meta_kind_dir(meta_kind); - for (auto& p : std::filesystem::directory_iterator(dir)) { - if (std::filesystem::is_directory(p)) { - continue; - } - auto file_name = p.path().filename().string(); - if (file_name.find(META_FILE_PREFIX) != std::string::npos) { - auto id_str = file_name.substr(strlen(META_FILE_PREFIX)); - int32_t id; - try { - id = std::stoi(id_str); - } catch (std::invalid_argument& e) { - LOG(ERROR) << "Invalid id: " << id_str; - continue; - } - if (id > max_id_) { - max_id_ = id; - } + int max_id_ = 0; + // In the directory, we expect a file with name CUR_ID_FILE_NAME. + // If the file does not exist, we will create one with content "0". 
+ auto cur_id_file = dir + "/" + CUR_ID_FILE_NAME; + if (!std::filesystem::exists(cur_id_file)) { + std::ofstream out_file(cur_id_file); + if (!out_file.is_open()) { + LOG(ERROR) << "Failed to create file: " << cur_id_file; + return -1; } + out_file << "0"; + out_file.close(); + } + std::ifstream in_file(cur_id_file); + if (!in_file.is_open()) { + LOG(ERROR) << "Failed to open file: " << cur_id_file; + return -1; } + in_file >> max_id_; + in_file.close(); + max_id_++; + std::ofstream out_file(cur_id_file); + if (!out_file.is_open()) { + LOG(ERROR) << "Failed to open file: " << cur_id_file; + return -1; + } + out_file << max_id_; + out_file.close(); return max_id_; } diff --git a/flex/storages/metadata/local_file_metadata_store.h b/flex/storages/metadata/local_file_metadata_store.h index 186aa68e7efd..bada3778d535 100644 --- a/flex/storages/metadata/local_file_metadata_store.h +++ b/flex/storages/metadata/local_file_metadata_store.h @@ -48,6 +48,7 @@ class LocalFileMetadataStore : public IMetaStore { static constexpr const char* METADATA_DIR = "METADATA"; static constexpr const char* META_FILE_PREFIX = "META_"; + static constexpr const char* CUR_ID_FILE_NAME = "CUR_ID"; LocalFileMetadataStore(const std::string& path); @@ -110,12 +111,15 @@ class LocalFileMetadataStore : public IMetaStore { update_func_t update_func) override; private: - Result get_next_meta_key(const meta_kind_t& meta_kind) const; + Result get_next_meta_key(const meta_kind_t& meta_kind); std::string get_root_meta_dir() const; std::string get_meta_kind_dir(const meta_kind_t& meta_kind) const; std::string get_meta_file(const meta_kind_t& meta_kind, const meta_key_t& meta_key) const; - int32_t get_max_id(const meta_kind_t& meta_kind) const; + /** + * For the specified meta_kind, increase the id and return the new id. 
+ */ + int32_t increase_and_get_id(const meta_kind_t& meta_kind); bool is_key_exist(const meta_kind_t& meta_kind, const meta_key_t& meta_key) const; diff --git a/flex/tests/hqps/hqps_robust_test.sh b/flex/tests/hqps/hqps_robust_test.sh index 8090d8be8164..af21a57459ac 100644 --- a/flex/tests/hqps/hqps_robust_test.sh +++ b/flex/tests/hqps/hqps_robust_test.sh @@ -82,7 +82,7 @@ start_engine_service(){ fi cmd="${SERVER_BIN} -c ${config_path} --enable-admin-service true " - cmd="${cmd} -w ${INTERACTIVE_WORKSPACE} --start-compiler true &" + cmd="${cmd} -w ${INTERACTIVE_WORKSPACE} --start-compiler true > /tmp/engine.log 2>&1 & " echo "Start engine service with command: ${cmd}" eval ${cmd} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java index 8ddf0d75a511..4c63425263cd 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/GraphId.java @@ -51,4 +51,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hashCode(id); } + + @Override + public String toString() { + return "GraphId{" + "id=" + id + '}'; + } } diff --git a/k8s/dockerfiles/interactive-entrypoint.sh b/k8s/dockerfiles/interactive-entrypoint.sh index 009323f6c31f..76715ec609d6 100644 --- a/k8s/dockerfiles/interactive-entrypoint.sh +++ b/k8s/dockerfiles/interactive-entrypoint.sh @@ -57,6 +57,9 @@ function prepare_workspace() { cp /opt/flex/share/interactive_config.yaml $engine_config_path #make sure the line which start with default_graph is changed to default_graph: ${DEFAULT_GRAPH_NAME} sed -i "s/default_graph:.*/default_graph: ${DEFAULT_GRAPH_NAME}/" $engine_config_path + # By default, use all available CPUs + cpus=$(grep -c ^processor /proc/cpuinfo) + sed -i 
"s/thread_num_per_worker:.*/thread_num_per_worker: ${cpus}/" $engine_config_path echo "Using default graph: ${DEFAULT_GRAPH_NAME} to start the service" # copy the builtin graph