Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
liulx20 committed Jan 17, 2025
1 parent a7ea666 commit bec7146
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 45 deletions.
96 changes: 58 additions & 38 deletions flex/engines/graph_db/app/cypher_read_app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,52 +10,72 @@ namespace gs {
bool CypherReadApp::Query(const GraphDBSession& graph, Decoder& input,
Encoder& output) {
auto txn = graph.GetReadTransaction();
std::string_view bytes = input.get_bytes();
std::string_view _bytes = input.get_bytes();
uint8_t type = static_cast<uint8_t>(_bytes.back());
std::string_view bytes = std::string_view(_bytes.data(), _bytes.size() - 1);
if (type == Schema::ADHOC_READ_PLUGIN_ID) {
auto txn = graph.GetReadTransaction();

size_t sep = bytes.find_first_of("&?");
auto query_str = bytes.substr(0, sep);
auto params_str = bytes.substr(sep + 2);
std::map<std::string, std::string> params;
parse_params(params_str, params);
auto query = std::string(query_str.data(), query_str.size());
if (!pipeline_cache_.count(query)) {
if (plan_cache_.count(query)) {
// LOG(INFO) << "Hit cache for query ";
} else {
auto& query_cache = db_.getQueryCache();
std::string_view plan_str;
if (query_cache.get(query, plan_str)) {
physical::PhysicalPlan plan;
if (!plan.ParseFromString(std::string(plan_str))) {
return false;
}
plan_cache_[query] = plan;
physical::PhysicalPlan plan;
if (!plan.ParseFromString(std::string(bytes))) {
LOG(ERROR) << "Parse plan failed...";
return false;
}

LOG(INFO) << "plan: " << plan.DebugString();
gs::runtime::GraphReadInterface gri(txn);
auto ctx = runtime::PlanParser::get()
.parse_read_pipeline(graph.schema(),
gs::runtime::ContextMeta(), plan)
.Execute(gri, runtime::Context(), {}, timer_);

runtime::Sink::sink(ctx, txn, output);
} else {
size_t sep = bytes.find_first_of("&?");
auto query_str = bytes.substr(0, sep);
auto params_str = bytes.substr(sep + 2);
std::map<std::string, std::string> params;
parse_params(params_str, params);
auto query = std::string(query_str.data(), query_str.size());
if (!pipeline_cache_.count(query)) {
if (plan_cache_.count(query)) {
// LOG(INFO) << "Hit cache for query ";
} else {
const std::string statistics = db_.work_dir() + "/statistics.json";
const std::string& compiler_yaml = db_.work_dir() + "/graph.yaml";
const std::string& tmp_dir = db_.work_dir() + "/runtime/tmp/";
for (int i = 0; i < 3; ++i) {
if (!generate_plan(query, statistics, compiler_yaml, tmp_dir,
plan_cache_)) {
LOG(ERROR) << "Generate plan failed for query: " << query;
} else {
query_cache.put(query, plan_cache_[query].SerializeAsString());
break;
auto& query_cache = db_.getQueryCache();
std::string_view plan_str;
if (query_cache.get(query, plan_str)) {
physical::PhysicalPlan plan;
if (!plan.ParseFromString(std::string(plan_str))) {
return false;
}
plan_cache_[query] = plan;
} else {
const std::string statistics = db_.work_dir() + "/statistics.json";
const std::string& compiler_yaml = db_.work_dir() + "/graph.yaml";
const std::string& tmp_dir = db_.work_dir() + "/runtime/tmp/";
for (int i = 0; i < 3; ++i) {
if (!generate_plan(query, statistics, compiler_yaml, tmp_dir,
plan_cache_)) {
LOG(ERROR) << "Generate plan failed for query: " << query;
} else {
query_cache.put(query, plan_cache_[query].SerializeAsString());
break;
}
}
}
}
const auto& plan = plan_cache_[query];
pipeline_cache_.emplace(
query, runtime::PlanParser::get().parse_read_pipeline(
db_.schema(), gs::runtime::ContextMeta(), plan));
}
const auto& plan = plan_cache_[query];
pipeline_cache_.emplace(
query, runtime::PlanParser::get().parse_read_pipeline(
db_.schema(), gs::runtime::ContextMeta(), plan));
}

gs::runtime::GraphReadInterface gri(txn);
auto ctx = pipeline_cache_.at(query).Execute(gri, runtime::Context(), params,
timer_);
gs::runtime::GraphReadInterface gri(txn);
auto ctx = pipeline_cache_.at(query).Execute(gri, runtime::Context(),
params, timer_);

runtime::Sink::sink_encoder(ctx, gri, output);
runtime::Sink::sink_encoder(ctx, gri, output);
}
return true;
}
AppWrapper CypherReadAppFactory::CreateApp(const GraphDB& db) {
Expand Down
3 changes: 2 additions & 1 deletion flex/engines/graph_db/app/cypher_write_app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ namespace gs {
bool CypherWriteApp::Query(GraphDBSession& graph, Decoder& input,
Encoder& output) {
auto txn = graph.GetInsertTransaction();
std::string_view bytes = input.get_bytes();
std::string_view r_bytes = input.get_bytes();
std::string_view bytes = std::string_view(r_bytes.data(), r_bytes.size() - 1);

size_t sep = bytes.find_first_of("&?");
auto query_str = bytes.substr(0, sep);
Expand Down
4 changes: 3 additions & 1 deletion flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -420,10 +420,12 @@ void GraphDB::initApps(
app_factories_[Schema::HQPS_ADHOC_WRITE_PLUGIN_ID] =
std::make_shared<HQPSAdhocWriteAppFactory>();
app_factories_[Schema::ADHOC_READ_PLUGIN_ID] =
std::make_shared<AdhocReadAppFactory>();
std::make_shared<CypherReadAppFactory>();

auto& parser = gs::runtime::PlanParser::get();
parser.init();
app_factories_[Schema::ADHOC_READ_PLUGIN_ID] =
std::make_shared<CypherReadAppFactory>();

app_factories_[Schema::CYPHER_READ_PLUGIN_ID] =
std::make_shared<CypherReadAppFactory>();
Expand Down
6 changes: 5 additions & 1 deletion flex/engines/graph_db/database/graph_db_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class GraphDBSession {
kCypherJson = 1, // Json format for cypher query
kCypherProtoAdhoc = 2, // Protobuf format for adhoc query
kCypherProtoProcedure = 3, // Protobuf format for procedure query
kCypherString = 4,
};

static constexpr int32_t MAX_RETRY = 3;
Expand Down Expand Up @@ -156,7 +157,7 @@ class GraphDBSession {
// second last byte,which is fixed to 255, and other bytes are a string
// representing the path to generated dynamic lib.
return std::make_pair((uint8_t) input[len - 2],
std::string_view(str_data, len - 2));
std::string_view(str_data, len - 1));
} else if (input_tag == static_cast<uint8_t>(InputFormat::kCypherJson)) {
// For cypherJson there is no query-id provided. The query name is
// provided in the json string.
Expand All @@ -171,6 +172,9 @@ class GraphDBSession {
// Same as cypherJson, we don't discard the last byte.
std::string_view str_view(input.data(), len);
return parse_query_type_from_cypher_internal(str_view);
} else if (input_tag == static_cast<uint8_t>(InputFormat::kCypherString)) {
return std::make_pair((uint8_t) input[len - 2],
std::string_view(str_data, len - 1));
} else {
return Result<std::pair<uint8_t, std::string_view>>(
gs::Status(StatusCode::INVALID_ARGUMENT,
Expand Down
10 changes: 6 additions & 4 deletions flex/engines/http_server/handler/graph_db_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,8 @@ class stored_proc_handler : public StoppableHandler {
if (req->content.size() > 0) {
// read last byte and get the format info from the byte.
last_byte = req->content.back();
if (last_byte >
static_cast<uint8_t>(
gs::GraphDBSession::InputFormat::kCypherProtoProcedure)) {
if (last_byte > static_cast<uint8_t>(
gs::GraphDBSession::InputFormat::kCypherString)) {
LOG(ERROR) << "Unsupported request format: " << (int) last_byte;
rep->set_status(
seastar::httpd::reply::status_type::internal_server_error);
Expand Down Expand Up @@ -339,7 +338,10 @@ class stored_proc_handler : public StoppableHandler {
#endif // HAVE_OPENTELEMETRY_CPP
](auto&& output) {
if (last_byte == static_cast<uint8_t>(
gs::GraphDBSession::InputFormat::kCppEncoder)) {
gs::GraphDBSession::InputFormat::kCppEncoder) ||
last_byte ==
static_cast<uint8_t>(
gs::GraphDBSession::InputFormat::kCypherString)) {
return seastar::make_ready_future<query_param>(
std::move(output.content));
} else {
Expand Down

0 comments on commit bec7146

Please sign in to comment.