Skip to content

Commit

Permalink
feat(interactive): Add current running graph meta info in `GetService…
Browse files Browse the repository at this point in the history
…Status` API's response. (#3793)

Interactive service now returns metadata of the currently running graph
through the `GetServiceStatus` API. This enables the Compiler to
retrieve the running graph's schema for query optimization and plan
generation, as requested by @shirly121.

@lidongze0629, kindly update the coordinator to accommodate the changes
in the `GetServiceStatus` API.

@lidongze0629 Please note that the `graph` field in the schema mapping
has been removed since it was a dummy field.

Fix #3787
  • Loading branch information
zhanglei1949 authored May 14, 2024
1 parent 348a2b7 commit e767a28
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 2,196 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ flex/interactive/sdk/java/gradlew
flex/interactive/sdk/java/.travis.yml
flex/interactive/sdk/java/git_push.sh
flex/interactive/sdk/java/gradle/
flex/interactive/sdk/java/api/
flex/interactive/sdk/java/.openapi-generator/*
flex/interactive/sdk/java/src/main/AndroidManifest.xml
flex/interactive/sdk/java/.openapi-generate/
Expand Down
71 changes: 55 additions & 16 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,20 @@ seastar::future<admin_query_result> admin_actor::run_delete_graph(
query_param&& query_param) {
LOG(INFO) << "Delete graph: " << query_param.content;

auto lock_info = metadata_store_->GetGraphIndicesLocked(query_param.content);
if (!lock_info.ok()) {
LOG(ERROR) << "Fail to get lock info for graph: " << query_param.content;
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(lock_info.status()));
}
if (lock_info.value()) {
LOG(ERROR) << "Graph is running, cannot delete: " << query_param.content;
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::AlreadyLocked,
"Graph is running, cannot delete: " + query_param.content)));
}

auto get_res = metadata_store_->GetGraphMeta(query_param.content);
if (!get_res.ok()) {
LOG(ERROR) << "Graph not exists: " << query_param.content;
Expand Down Expand Up @@ -779,10 +793,7 @@ seastar::future<admin_query_result> admin_actor::start_service(

auto cur_running_graph_res = metadata_store_->GetRunningGraph();
if (!cur_running_graph_res.ok()) {
LOG(ERROR) << "Fail to get running graph: "
<< cur_running_graph_res.status().error_message();
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(cur_running_graph_res.status()));
LOG(INFO) << "No running graph, will start on the graph in request";
}
auto cur_running_graph = cur_running_graph_res.value();
LOG(INFO) << "Current running graph: " << cur_running_graph;
Expand Down Expand Up @@ -973,17 +984,41 @@ seastar::future<admin_query_result> admin_actor::start_service(
seastar::future<admin_query_result> admin_actor::stop_service(
query_param&& query_param) {
auto& hqps_service = HQPSService::get();
return hqps_service.stop_query_actors().then([&hqps_service] {
return hqps_service.stop_query_actors().then([this, &hqps_service] {
LOG(INFO) << "Successfully stopped query handler";
if (hqps_service.stop_compiler_subprocess()) {
LOG(INFO) << "Successfully stop compiler";
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>("Successfully stop service"));
} else {
LOG(ERROR) << "Fail to stop compiler";
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(gs::StatusCode::InternalError,
"Fail to stop compiler")));
// Add also remove current running graph
{
std::lock_guard<std::mutex> lock(mtx_);
// unlock the graph
auto cur_running_graph_res = metadata_store_->GetRunningGraph();
if (cur_running_graph_res.ok()) {
auto unlock_res =
metadata_store_->UnlockGraphIndices(cur_running_graph_res.value());
if (!unlock_res.ok()) {
LOG(ERROR) << "Fail to unlock graph: "
<< cur_running_graph_res.value();
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(unlock_res.status()));
}
if (!metadata_store_->ClearRunningGraph().ok()) {
LOG(ERROR) << "Fail to clear running graph";
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(
gs::Status(gs::StatusCode::InternalError,
"Fail to clear running graph")));
}
}

if (hqps_service.stop_compiler_subprocess()) {
LOG(INFO) << "Successfully stop compiler";
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>("Successfully stop service"));
} else {
LOG(ERROR) << "Fail to stop compiler";
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::InternalError, "Fail to stop compiler")));
}
}
});
}
Expand All @@ -1001,9 +1036,13 @@ seastar::future<admin_query_result> admin_actor::service_status(
res["bolt_port"] = hqps_service.get_service_config().bolt_port;
res["gremlin_port"] = hqps_service.get_service_config().gremlin_port;
if (running_graph_res.ok()) {
res["graph_id"] = running_graph_res.value();
auto graph_meta =
metadata_store_->GetGraphMeta(running_graph_res.value());
if (graph_meta.ok()) {
res["graph"] = nlohmann::json::parse(graph_meta.value().ToJson());
}
} else {
res["graph_id"] = "UNKNOWN";
res["graph"] = {};
}
} else {
LOG(INFO) << "Query service has not been inited!";
Expand Down
2 changes: 2 additions & 0 deletions flex/engines/http_server/service/hqps_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ bool HQPSService::start_compiler_subprocess(
// check query server port is ready
if (check_compiler_ready()) {
LOG(INFO) << "Compiler server is ready!";
// sleep another 2 seconds to make sure the server is ready
std::this_thread::sleep_for(std::chrono::seconds(2));
return true;
}
sleep_time += sleep_interval;
Expand Down
Loading

0 comments on commit e767a28

Please sign in to comment.