Skip to content

Commit

Permalink
add cypher client
Browse files Browse the repository at this point in the history
  • Loading branch information
liulx20 committed Jan 24, 2025
1 parent 52cd8b5 commit 44e8912
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 179 deletions.
6 changes: 3 additions & 3 deletions flex/bin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ add_executable(adhoc_runner adhoc_runner.cc)
target_link_libraries(adhoc_runner flex_utils flex_graph_db)
install_without_export_flex_target(adhoc_runner)

add_executable(cypher_runner cypher_runner.cc)
target_link_libraries(cypher_runner flex_utils flex_graph_db)
install_without_export_flex_target(cypher_runner)
add_executable(cypher_client cypher_client.cc)
target_link_libraries(cypher_client flex_utils)
install_without_export_flex_target(cypher_client)

add_executable(flex_analytical_engine flex_analytical_engine.cc)
target_link_libraries(flex_analytical_engine flex_immutable_graph flex_bsp ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})
Expand Down
72 changes: 72 additions & 0 deletions flex/bin/cypher_client.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "grape/util.h"

#include <boost/program_options.hpp>
#include <fstream>
#include <iostream>
#include <vector>
#include "flex/third_party/httplib.h"

namespace bpo = boost::program_options;

int main(int argc, char** argv) {
bpo::options_description desc("Usage:");
desc.add_options()("help", "Display help message")("version,v",
"Display version")(
"uri,u", bpo::value<std::string>()->default_value("127.0.0.1"),
"uri of the db")("port,p", bpo::value<int>()->default_value(10000),
"port number");
google::InitGoogleLogging(argv[0]);
FLAGS_logtostderr = true;

bpo::variables_map vm;
bpo::store(bpo::command_line_parser(argc, argv).options(desc).run(), vm);
bpo::notify(vm);

if (vm.count("help")) {
std::cout << desc << std::endl;
return 0;
}
if (vm.count("version")) {
std::cout << "GraphScope/Flex version " << FLEX_VERSION << std::endl;
return 0;
}

std::string uri = vm["uri"].as<std::string>();
int port = vm["port"].as<int>();
httplib::Client cli(uri, port);
setenv("TZ", "Asia/Shanghai", 1);
tzset();

while (true) {
std::cout << ">>> ";
std::string query;
getline(std::cin, query);
if (query == "exit") {
break;
}
if (query == "") {
continue;
}
query.append(1, '\xF6');
query.append(1, 4);
auto res = cli.Post("/v1/graph/current/query", query, "text/plain");
std::string ret = res->body;
std::cout << ret << std::endl;
}
return 0;
}
154 changes: 0 additions & 154 deletions flex/bin/cypher_runner.cc

This file was deleted.

5 changes: 3 additions & 2 deletions flex/engines/graph_db/app/cypher_app_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/

#include "flex/engines/graph_db/app/cypher_app_utils.h"
#include <glog/logging.h>

#include <sys/wait.h> // for waitpid()
#include <unistd.h> // for fork() and execvp()
Expand Down Expand Up @@ -135,8 +136,8 @@ bool generate_plan(
int status;
waitpid(pid, &status, 0);
if (WIFEXITED(status)) {
std::cout << "Child exited with status " << WEXITSTATUS(status)
<< std::endl;
VLOG(1) << "Child exited with status " << WEXITSTATUS(status)
<< std::endl;
}

{
Expand Down
24 changes: 14 additions & 10 deletions flex/engines/graph_db/app/cypher_read_app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,15 @@ bool CypherReadApp::Query(const GraphDBSession& graph, Decoder& input,
const std::string& compiler_yaml = db_.work_dir() + "/graph.yaml";
const std::string& tmp_dir = db_.work_dir() + "/runtime/tmp/";
const auto& compiler_path = db_.schema().get_compiler_path();
for (int i = 0; i < 3; ++i) {
if (!generate_plan(query, statistics, compiler_path, compiler_yaml,
tmp_dir, plan_cache_)) {
LOG(ERROR) << "Generate plan failed for query: " << query;
} else {
query_cache.put(query, plan_cache_[query].SerializeAsString());
break;
}
if (!generate_plan(query, statistics, compiler_path, compiler_yaml,
tmp_dir, plan_cache_)) {
LOG(ERROR) << "Generate plan failed for query: " << query;
std::string error =
" Compiler failed to generate physical plan: " + query;
output.put_bytes(error.data(), error.size());
return false;
} else {
query_cache.put(query, plan_cache_[query].SerializeAsString());
}
}
}
Expand All @@ -107,8 +108,11 @@ bool CypherReadApp::Query(const GraphDBSession& graph, Decoder& input,
gs::runtime::GraphReadInterface gri(txn);
auto ctx = pipeline_cache_.at(query).Execute(gri, runtime::Context(),
params, timer_);

runtime::Sink::sink_encoder(ctx.value(), gri, output);
if (type == Schema::CYPHER_READ_PLUGIN_ID) {
runtime::Sink::sink_encoder(ctx.value(), gri, output);
} else {
runtime::Sink::sink_beta(ctx.value(), gri, output);
}
}
return true;
}
Expand Down
2 changes: 2 additions & 0 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ void GraphDB::initApps(
std::make_shared<HQPSAdhocWriteAppFactory>();
app_factories_[Schema::ADHOC_READ_PLUGIN_ID] =
std::make_shared<CypherReadAppFactory>();
app_factories_[Schema::CYPHER_READ_DEBUG_PLUGIN_ID] =
std::make_shared<CypherReadAppFactory>();

auto& parser = gs::runtime::PlanParser::get();
parser.init();
Expand Down
16 changes: 7 additions & 9 deletions flex/engines/graph_db/runtime/common/operators/retrieve/sink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,23 @@ void Sink::sink_encoder(const Context& ctx, const GraphReadInterface& graph,
void Sink::sink_beta(const Context& ctx, const GraphReadInterface& graph,
Encoder& output) {
size_t row_num = ctx.row_num();
results::CollectiveResults results;
std::stringstream ss;

for (size_t i = 0; i < row_num; ++i) {
auto result = results.add_results();
std::stringstream ss;
for (size_t j : ctx.tag_ids) {
auto col = ctx.get(j);
if (col == nullptr) {
continue;
}
auto column = result->mutable_record()->add_columns();
auto val = col->get_elem(i);
ss << val.to_string() << "|";
val.sink(graph, j, column);
}
std::cout << ss.str() << std::endl;
ss << std::endl;
}
std::cout << "========================================================="
<< std::endl;
auto res = results.SerializeAsString();
ss << "========================================================="
<< std::endl;
// auto res = results.SerializeAsString();
auto res = ss.str();
output.put_bytes(res.data(), res.size());
}

Expand Down
2 changes: 2 additions & 0 deletions flex/engines/graph_db/runtime/common/rt_any.cc
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,8 @@ static void sink_any(const Any& any, common::Value* value) {
value->set_f64(any.AsDouble());
} else if (any.type == PropertyType::Empty()) {
value->mutable_none();
} else if (any.type == PropertyType::Day()) {
value->set_i64(any.AsDay().to_timestamp());
} else {
LOG(FATAL) << "Any value: " << any.to_string()
<< ", type = " << any.type.type_enum;
Expand Down
3 changes: 2 additions & 1 deletion flex/storages/rt_mutable_graph/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ class Schema {
// How many built-in plugins are there.
// Currently only one builtin plugin, SERVER_APP is supported.
static constexpr uint8_t RESERVED_PLUGIN_NUM = 1;
static constexpr uint8_t MAX_PLUGIN_ID = 246;
static constexpr uint8_t MAX_PLUGIN_ID = 245;
static constexpr uint8_t ADHOC_READ_PLUGIN_ID = 253;
static constexpr uint8_t HQPS_ADHOC_READ_PLUGIN_ID = 254;
static constexpr uint8_t HQPS_ADHOC_WRITE_PLUGIN_ID = 255;

static constexpr uint8_t CYPHER_READ_PLUGIN_ID = 248;
static constexpr uint8_t CYPHER_WRITE_PLUGIN_ID = 247;
static constexpr uint8_t CYPHER_READ_DEBUG_PLUGIN_ID = 246;
static constexpr const char* HQPS_ADHOC_READ_PLUGIN_ID_STR = "\xFE";
static constexpr const char* HQPS_ADHOC_WRITE_PLUGIN_ID_STR = "\xFF";
static constexpr const char* ADHOC_READ_PLUGIN_ID_STR = "\xFD";
Expand Down

0 comments on commit 44e8912

Please sign in to comment.