Skip to content

Commit

Permalink
feat(interactive): Make use of all available cpus at startup (#4343)
Browse files Browse the repository at this point in the history
At startup, try to make use of all available CPUs on the host.
Also fix a deep copy bug in `test_robustness.py`.

The CI failures are due to the Rust version and will be fixed in PR #4373.
  • Loading branch information
zhanglei1949 authored Dec 19, 2024
1 parent bbe9295 commit ba952b0
Show file tree
Hide file tree
Showing 13 changed files with 142 additions and 45 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,15 @@ jobs:
python3 -m black --check --diff .
python3 -m flake8 .
popd
pushd flex/interactive/sdk/python
# we need to generate the code first
pushd flex/interactive/sdk
bash generate_sdk.sh -g python
pushd python
python3 -m isort --check --diff .
python3 -m black --check --diff .
python3 -m flake8 .
popd
popd
- name: Generate Docs
shell: bash
Expand Down
28 changes: 17 additions & 11 deletions flex/engines/graph_db/runtime/adhoc/expr_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,23 @@ class WithInExpr : public ExprBase {
WithInExpr(const ReadTransaction& txn, const Context& ctx,
std::unique_ptr<ExprBase>&& key, const common::Value& array)
: key_(std::move(key)) {
if constexpr (std::is_same_v<T, int64_t>) {
CHECK(array.item_case() == common::Value::kI64Array);
size_t len = array.i64_array().item_size();
for (size_t idx = 0; idx < len; ++idx) {
container_.push_back(array.i64_array().item(idx));
}
} else if constexpr (std::is_same_v<T, int32_t>) {
CHECK(array.item_case() == common::Value::kI32Array);
size_t len = array.i32_array().item_size();
for (size_t idx = 0; idx < len; ++idx) {
container_.push_back(array.i32_array().item(idx));
if constexpr ((std::is_same_v<T, int64_t>) ||
(std::is_same_v<T, int32_t>) ) {
// Implicitly convert to T
if (array.item_case() == common::Value::kI64Array) {
size_t len = array.i64_array().item_size();
for (size_t idx = 0; idx < len; ++idx) {
container_.push_back(array.i64_array().item(idx));
}
} else if (array.item_case() == common::Value::kI32Array) {
size_t len = array.i32_array().item_size();
for (size_t idx = 0; idx < len; ++idx) {
container_.push_back(array.i32_array().item(idx));
}
} else {
LOG(FATAL) << "Fail to construct WithInExpr of type "
<< typeid(T).name() << " with array of type "
<< array.item_case();
}
} else if constexpr (std::is_same_v<T, std::string>) {
CHECK(array.item_case() == common::Value::kStrArray);
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/http_server/handler/graph_db_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class stored_proc_handler : public StoppableHandler {

bool start() override {
if (get_executors()[StoppableHandler::shard_id()].size() > 0) {
LOG(ERROR) << "The actors have been already created!";
VLOG(10) << "The actors have been already created!";
return false;
}
return StoppableHandler::start_scope(
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/http_server/handler/graph_db_http_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class StoppableHandler : public seastar::httpd::handler_base {
} catch (const std::exception& e) {
// In case the scope is already cancelled, we should ignore the
// exception.
LOG(INFO) << "Failed to cancel IC scope: " << e.what();
VLOG(1) << "Failed to cancel IC scope: " << e.what();
}
func();
return seastar::make_ready_future<>();
Expand Down
3 changes: 3 additions & 0 deletions flex/interactive/sdk/python/gs_interactive/client/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from gs_interactive.exceptions import NotFoundException
from gs_interactive.exceptions import ServiceException
from urllib3.exceptions import MaxRetryError
from urllib3.exceptions import ProtocolError

from gs_interactive.client.generated.interactive_pb2 import Code as StatusCode
from gs_interactive.models.api_response_with_code import APIResponseWithCode
Expand Down Expand Up @@ -108,6 +109,8 @@ def from_exception(exception: ApiException):
return Status(StatusCode.INTERNAL_ERROR, exception.body)
elif isinstance(exception, MaxRetryError):
return Status(StatusCode.INTERNAL_ERROR, exception)
elif isinstance(exception, ProtocolError):
return Status(StatusCode.INTERNAL_ERROR, exception)
return Status(
StatusCode.UNKNOWN, "Unknown Error from exception " + exception.body
)
Expand Down
31 changes: 27 additions & 4 deletions flex/interactive/sdk/python/gs_interactive/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#

# get the directory of the current file
import copy
import os
import time

Expand All @@ -27,6 +28,7 @@
from gs_interactive.client.session import Session
from gs_interactive.models import CreateGraphRequest
from gs_interactive.models import CreateProcedureRequest
from gs_interactive.models import GetGraphSchemaResponse
from gs_interactive.models import SchemaMapping
from gs_interactive.models import StartServiceRequest
from gs_interactive.models import UpdateProcedureRequest
Expand All @@ -39,7 +41,7 @@


modern_graph_full = {
"name": "modern_graph",
"name": "full_graph",
"description": "This is a test graph",
"schema": {
"vertex_types": [
Expand Down Expand Up @@ -120,7 +122,7 @@
}

modern_graph_vertex_only = {
"name": "modern_graph",
"name": "vertex_only",
"description": "This is a test graph, only contains vertex",
"schema": {
"vertex_types": [
Expand Down Expand Up @@ -148,7 +150,7 @@
}

modern_graph_partial = {
"name": "modern_graph",
"name": "partial_graph",
"description": "This is a test graph",
"schema": {
"vertex_types": [
Expand Down Expand Up @@ -336,7 +338,7 @@ def create_partial_modern_graph(interactive_session):

@pytest.fixture(scope="function")
def create_graph_with_custom_pk_name(interactive_session):
modern_graph_custom_pk_name = modern_graph_full.copy()
modern_graph_custom_pk_name = copy.deepcopy(modern_graph_full)
for vertex_type in modern_graph_custom_pk_name["schema"]["vertex_types"]:
vertex_type["properties"][0]["property_name"] = "custom_id"
vertex_type["primary_keys"] = ["custom_id"]
Expand Down Expand Up @@ -479,3 +481,24 @@ def start_service_on_graph(interactive_session, graph_id: str):
assert resp.is_ok()
# wait three second to let compiler get the new graph
time.sleep(3)


def ensure_compiler_schema_ready(
    interactive_session, neo4j_session: Neo4jSession, graph_id: str
):
    """Wait until the compiler reports the same schema as the interactive service.

    The schema of ``graph_id`` is fetched once from ``interactive_session``.
    The compiler is then queried through ``neo4j_session`` with
    ``CALL gs.procedure.meta.schema();`` and its answer is parsed into a
    ``GetGraphSchemaResponse`` for comparison.

    At most 10 attempts are made, with a one-second sleep after every
    mismatch. If none of the attempts match, an ``Exception`` is raised.
    """
    expected_schema = interactive_session.get_graph_schema(graph_id).get_value()
    attempts = 10
    for _ in range(attempts):
        record = neo4j_session.run("CALL gs.procedure.meta.schema();").single()
        reported_schema = GetGraphSchemaResponse.from_json(record.value())
        if reported_schema == expected_schema:
            # Compiler has caught up with the service; stop polling.
            break
        print("compiler schema is not ready, wait for 1 second")
        time.sleep(1)
    else:
        # Every attempt saw a mismatching schema.
        raise Exception("compiler schema is not ready")
    print("compiler schema is ready")
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from gs_interactive.tests.conftest import call_procedure # noqa: E402
from gs_interactive.tests.conftest import create_procedure
from gs_interactive.tests.conftest import delete_procedure
from gs_interactive.tests.conftest import ensure_compiler_schema_ready
from gs_interactive.tests.conftest import import_data_to_full_modern_graph
from gs_interactive.tests.conftest import import_data_to_partial_modern_graph
from gs_interactive.tests.conftest import import_data_to_vertex_only_modern_graph
Expand Down Expand Up @@ -61,6 +62,9 @@ def test_query_on_vertex_only_graph(
"""
print("[Query on vertex only graph]")
start_service_on_graph(interactive_session, create_vertex_only_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_vertex_only_modern_graph
)
run_cypher_test_suite(
neo4j_session, create_vertex_only_modern_graph, vertex_only_cypher_queries
)
Expand All @@ -69,6 +73,10 @@ def test_query_on_vertex_only_graph(
import_data_to_vertex_only_modern_graph(
interactive_session, create_vertex_only_modern_graph
)
start_service_on_graph(interactive_session, create_vertex_only_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_vertex_only_modern_graph
)
run_cypher_test_suite(
neo4j_session, create_vertex_only_modern_graph, vertex_only_cypher_queries
)
Expand All @@ -83,12 +91,19 @@ def test_query_on_partial_graph(
print("[Query on partial graph]")
# start service on new graph
start_service_on_graph(interactive_session, create_partial_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_partial_modern_graph
)
# try to query on the graph
run_cypher_test_suite(neo4j_session, create_partial_modern_graph, cypher_queries)
start_service_on_graph(interactive_session, "1")
import_data_to_partial_modern_graph(
interactive_session, create_partial_modern_graph
)
start_service_on_graph(interactive_session, create_partial_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_partial_modern_graph
)
run_cypher_test_suite(neo4j_session, create_partial_modern_graph, cypher_queries)


Expand All @@ -100,10 +115,17 @@ def test_query_on_full_modern_graph(
"""
print("[Query on full modern graph]")
start_service_on_graph(interactive_session, create_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_modern_graph
)
# try to query on the graph
run_cypher_test_suite(neo4j_session, create_modern_graph, cypher_queries)
start_service_on_graph(interactive_session, "1")
import_data_to_full_modern_graph(interactive_session, create_modern_graph)
start_service_on_graph(interactive_session, create_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_modern_graph
)
run_cypher_test_suite(neo4j_session, create_modern_graph, cypher_queries)


Expand All @@ -129,6 +151,9 @@ def test_service_switching(
)
print("Procedure id: ", a_proc_id)
start_service_on_graph(interactive_session, create_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_modern_graph
)
call_procedure(neo4j_session, create_modern_graph, a_proc_id)

# create procedure on graph_b_id
Expand All @@ -139,6 +164,9 @@ def test_service_switching(
"MATCH(n: person) return count(n);",
)
start_service_on_graph(interactive_session, create_vertex_only_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_vertex_only_modern_graph
)
call_procedure(neo4j_session, create_vertex_only_modern_graph, b_proc_id)


Expand All @@ -156,6 +184,9 @@ def test_procedure_creation(interactive_session, neo4j_session, create_modern_gr
)
print("Procedure id: ", a_proc_id)
start_service_on_graph(interactive_session, create_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_modern_graph
)
call_procedure(neo4j_session, create_modern_graph, a_proc_id)

# create procedure with name containing space,
Expand Down Expand Up @@ -202,6 +233,9 @@ def test_builtin_procedure(interactive_session, neo4j_session, create_modern_gra
)
# Call the builtin procedure
start_service_on_graph(interactive_session, create_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_modern_graph
)
call_procedure(
neo4j_session,
create_modern_graph,
Expand Down Expand Up @@ -259,6 +293,10 @@ def test_list_jobs(interactive_session, create_vertex_only_modern_graph):
def test_call_proc_in_cypher(interactive_session, neo4j_session, create_modern_graph):
print("[Test call procedure in cypher]")
import_data_to_full_modern_graph(interactive_session, create_modern_graph)
start_service_on_graph(interactive_session, create_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_modern_graph
)
result = neo4j_session.run(
'MATCH(p: person) with p.id as oid CALL k_neighbors("person", oid, 1) return label_name, vertex_oid;'
)
Expand All @@ -276,6 +314,9 @@ def test_custom_pk_name(
interactive_session, create_graph_with_custom_pk_name
)
start_service_on_graph(interactive_session, create_graph_with_custom_pk_name)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_graph_with_custom_pk_name
)
result = neo4j_session.run(
"MATCH (n: person) where n.custom_id = 4 return n.custom_id;"
)
Expand All @@ -289,4 +330,3 @@ def test_custom_pk_name(
)
records = result.fetch(1)
assert len(records) == 1 and records[0]["$f0"] == 2
start_service_on_graph(interactive_session, "1")
2 changes: 1 addition & 1 deletion flex/interactive/sdk/python/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ profile = black
ensure_newline_before_comments = True
line_length = 88
force_single_line = True
skip = build/,dist/,gs_interactive/api/,gs_interactive/api_response.py,gs_interactive/configuration.py,gs_interactive/exceptions.py,gs_interactive/models/,gs_interactiverest.py,
skip = build/,dist/,gs_interactive/api/,gs_interactive/api_response.py,gs_interactive/configuration.py,gs_interactive/exceptions.py,gs_interactive/models/,gs_interactiverest.py,gs_interactive/api_client.py,gs_interactive/__init__.py,gs_interactive/rest.py
skip_glob = *_pb2.py,*_pb2_grpc.py,build/*

[flake8]
Expand Down
53 changes: 31 additions & 22 deletions flex/storages/metadata/local_file_metadata_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ Result<bool> LocalFileMetadataStore::UpdateMeta(const meta_kind_t& meta_kind,

Result<LocalFileMetadataStore::meta_key_t>
LocalFileMetadataStore::get_next_meta_key(
const LocalFileMetadataStore::meta_kind_t& meta_kind) const {
return std::to_string(get_max_id(meta_kind) + 1);
const LocalFileMetadataStore::meta_kind_t& meta_kind) {
return std::to_string(increase_and_get_id(meta_kind));
}

std::string LocalFileMetadataStore::get_root_meta_dir() const {
Expand All @@ -208,29 +208,38 @@ std::string LocalFileMetadataStore::get_meta_file(const meta_kind_t& meta_kind,
return ret;
}

int32_t LocalFileMetadataStore::get_max_id(const meta_kind_t& meta_kind) const {
// iterate all files in the directory, get the max id.
int max_id_ = 0;
// Guarded by meta_mutex_ outside.
int32_t LocalFileMetadataStore::increase_and_get_id(
const meta_kind_t& meta_kind) {
auto dir = get_meta_kind_dir(meta_kind);
for (auto& p : std::filesystem::directory_iterator(dir)) {
if (std::filesystem::is_directory(p)) {
continue;
}
auto file_name = p.path().filename().string();
if (file_name.find(META_FILE_PREFIX) != std::string::npos) {
auto id_str = file_name.substr(strlen(META_FILE_PREFIX));
int32_t id;
try {
id = std::stoi(id_str);
} catch (std::invalid_argument& e) {
LOG(ERROR) << "Invalid id: " << id_str;
continue;
}
if (id > max_id_) {
max_id_ = id;
}
int max_id_ = 0;
// In the directory, we expect a file with name CUR_ID_FILE_NAME.
// If the file does not exist, we will create one with content "0".
auto cur_id_file = dir + "/" + CUR_ID_FILE_NAME;
if (!std::filesystem::exists(cur_id_file)) {
std::ofstream out_file(cur_id_file);
if (!out_file.is_open()) {
LOG(ERROR) << "Failed to create file: " << cur_id_file;
return -1;
}
out_file << "0";
out_file.close();
}
std::ifstream in_file(cur_id_file);
if (!in_file.is_open()) {
LOG(ERROR) << "Failed to open file: " << cur_id_file;
return -1;
}
in_file >> max_id_;
in_file.close();
max_id_++;
std::ofstream out_file(cur_id_file);
if (!out_file.is_open()) {
LOG(ERROR) << "Failed to open file: " << cur_id_file;
return -1;
}
out_file << max_id_;
out_file.close();
return max_id_;
}

Expand Down
Loading

0 comments on commit ba952b0

Please sign in to comment.