Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(interactive): refactor runtime #4418

Merged
merged 35 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
19fa341
refactor runtime
liulx20 Jan 10, 2025
cfa0bd4
pass validation
liulx20 Jan 11, 2025
c5ddd34
revert mimalloc temporarily
liulx20 Jan 11, 2025
19e2c90
add top_n_generator
liulx20 Jan 11, 2025
d514b16
fix graph db
liulx20 Jan 11, 2025
f72570b
fix
liulx20 Jan 12, 2025
81e2649
refactor
liulx20 Jan 14, 2025
80dcbf6
fix cmake
liulx20 Jan 15, 2025
e6cff63
fix
liulx20 Jan 15, 2025
38d009f
fix orderby
liulx20 Jan 15, 2025
f7cd984
remove compress
liulx20 Jan 15, 2025
5ec8eaa
format
liulx20 Jan 15, 2025
cd16035
fix ci
liulx20 Jan 16, 2025
9cde590
add mimalloc deps
liulx20 Jan 16, 2025
43fb5a8
add parallel-hashmap as submodule
liulx20 Jan 16, 2025
db72eee
copy graph.yaml to data_dir when bulk_loading
liulx20 Jan 16, 2025
34a8393
fix procedure call
liulx20 Jan 16, 2025
a0f9351
fix
liulx20 Jan 16, 2025
81c0fae
test
liulx20 Jan 16, 2025
6843a29
Merge branch 'main' into refactor
liulx20 Jan 16, 2025
7e0d003
fix pk
liulx20 Jan 17, 2025
b91d856
fix
liulx20 Jan 17, 2025
bbe27bc
format code
liulx20 Jan 17, 2025
c069b44
add more edge expand branch
liulx20 Jan 17, 2025
3516ef5
fix edge
liulx20 Jan 17, 2025
667e641
fix
liulx20 Jan 17, 2025
2efa361
fix intersect
liulx20 Jan 17, 2025
918a79a
fix
liulx20 Jan 17, 2025
a7ea666
fix
liulx20 Jan 17, 2025
bec7146
test
liulx20 Jan 17, 2025
297fce5
delete adhoc app
liulx20 Jan 17, 2025
b67c292
try to fix
liulx20 Jan 17, 2025
ee70ec1
simplify group by
liulx20 Jan 19, 2025
9f078e8
remove redundant comments
liulx20 Jan 20, 2025
ee2c7a7
fix project
liulx20 Jan 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@
path = learning_engine/graphlearn-for-pytorch
url = https://github.com/alibaba/graphlearn-for-pytorch.git

[submodule "flex/third_party/parallel-hashmap"]
path = flex/third_party/parallel-hashmap
url = https://github.com/greg7mdp/parallel-hashmap.git
1 change: 1 addition & 0 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ add_definitions(-DRAPIDJSON_HAS_CXX11_RANGE_FOR=1)
if (BUILD_ODPS_FRAGMENT_LOADER)
include_directories(SYSTEM ${CMAKE_SOURCE_DIR}/third_party/odps/include)
endif()
include_directories(SYSTEM ${CMAKE_SOURCE_DIR}/third_party/parallel-hashmap)

macro(install_flex_target target)
install(TARGETS ${target}
Expand Down
40 changes: 13 additions & 27 deletions flex/bin/adhoc_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
#include <iostream>
#include <vector>
#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/graph_db/runtime/adhoc/runtime.h"
#include "flex/engines/graph_db/runtime/common/operators/retrieve/sink.h"
#include "flex/engines/graph_db/runtime/execute/plan_parser.h"
#include "flex/engines/graph_db/runtime/utils/opr_timer.h"
#include "flex/proto_generated_gie/physical.pb.h"

namespace bpo = boost::program_options;
namespace bl = boost::leaf;

std::string read_pb(const std::string& filename) {
std::ifstream file(filename, std::ios::binary);
Expand Down Expand Up @@ -78,27 +80,11 @@ void load_params(const std::string& filename,
gs::runtime::Context eval_plan(
const physical::PhysicalPlan& plan, gs::ReadTransaction& txn,
const std::map<std::string, std::string>& params) {
gs::runtime::Context ctx;
{
ctx = bl::try_handle_all(
[&plan, &txn, &params]() {
return gs::runtime::runtime_eval(plan, txn, params);
},
[&ctx](const gs::Status& err) {
LOG(FATAL) << "Error in execution: " << err.error_message();
return ctx;
},
[&](const bl::error_info& err) {
LOG(FATAL) << "boost leaf error: " << err.error().value() << ", "
<< err.exception()->what();
return ctx;
},
[&]() {
LOG(FATAL) << "Unknown error in execution";
return ctx;
});
}
return ctx;
gs::runtime::GraphReadInterface gri(txn);
gs::runtime::OprTimer timer;
return gs::runtime::PlanParser::get()
.parse_read_pipeline(gri.schema(), gs::runtime::ContextMeta(), plan)
.Execute(gri, gs::runtime::Context(), params, timer);
}

int main(int argc, char** argv) {
Expand Down Expand Up @@ -187,7 +173,7 @@ int main(int argc, char** argv) {
auto& m = map[i % params_num];
auto ctx = eval_plan(pb, txn, m);
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
gs::runtime::Sink::sink(ctx, txn, output);
}
t1 += grape::GetCurrentTime();

Expand All @@ -197,7 +183,7 @@ int main(int argc, char** argv) {
auto ctx = eval_plan(pb, txn, m);
outputs[i].clear();
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
gs::runtime::Sink::sink(ctx, txn, output);
}
t2 += grape::GetCurrentTime();

Expand All @@ -207,7 +193,7 @@ int main(int argc, char** argv) {
auto ctx = eval_plan(pb, txn, m);
outputs[i].clear();
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
gs::runtime::Sink::sink(ctx, txn, output);
}
t3 += grape::GetCurrentTime();

Expand All @@ -231,4 +217,4 @@ int main(int argc, char** argv) {
}

return 0;
}
}
10 changes: 10 additions & 0 deletions flex/bin/bulk_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ int main(int argc, char** argv) {
return -1;
}

{
std::error_code ec;
std::filesystem::copy(graph_schema_path, data_dir_path / "graph.yaml",
std::filesystem::copy_options::overwrite_existing,
ec);
if (ec) {
LOG(FATAL) << "Failed to copy graph schema file: " << ec.message();
}
}

work_dir = data_dir_path.string();

// Register handlers for SIGKILL, SIGINT, SIGTERM, SIGSEGV, SIGABRT
Expand Down
13 changes: 3 additions & 10 deletions flex/bin/rt_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ int main(int argc, char** argv) {
bpo::value<uint32_t>()->default_value(1),
"shard number of actor system")(
"http-port,p", bpo::value<uint16_t>()->default_value(10000),
"http port of query handler")("graph-config,g", bpo::value<std::string>(),
"graph schema config file")(
"data-path,d", bpo::value<std::string>(), "data directory path")(
"http port of query handler")("data-path,d", bpo::value<std::string>(),
"data directory path")(
"warmup,w", bpo::value<bool>()->default_value(false),
"warmup graph data")("memory-level,m",
bpo::value<int>()->default_value(1));
Expand All @@ -62,14 +61,8 @@ int main(int argc, char** argv) {
uint32_t shard_num = vm["shard-num"].as<uint32_t>();
uint16_t http_port = vm["http-port"].as<uint16_t>();

std::string graph_schema_path = "";
std::string data_path = "";

if (!vm.count("graph-config")) {
LOG(ERROR) << "graph-config is required";
return -1;
}
graph_schema_path = vm["graph-config"].as<std::string>();
if (!vm.count("data-path")) {
LOG(ERROR) << "data-path is required";
return -1;
Expand All @@ -81,7 +74,7 @@ int main(int argc, char** argv) {

double t0 = -grape::GetCurrentTime();
auto& db = gs::GraphDB::get();

std::string graph_schema_path = data_path + "/graph.yaml";
auto schema = gs::Schema::LoadFromYaml(graph_schema_path);
if (!schema.ok()) {
LOG(FATAL) << "Failed to load schema: " << schema.status().error_message();
Expand Down
4 changes: 1 addition & 3 deletions flex/engines/graph_db/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES})

target_include_directories(flex_graph_db PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>)
target_link_libraries(flex_graph_db flex_rt_mutable_graph flex_utils ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(flex_graph_db flex_plan_proto runtime_adhoc)
target_link_libraries(flex_graph_db runtime_execute)
install_flex_target(flex_graph_db)

install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
Expand All @@ -28,7 +28,5 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/app_base.h
DESTINATION include/flex/engines/graph_db/app)
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/hqps_app.h
DESTINATION include/flex/engines/graph_db/app)
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/adhoc_app.h
DESTINATION include/flex/engines/graph_db/app)


77 changes: 0 additions & 77 deletions flex/engines/graph_db/app/adhoc_app.cc

This file was deleted.

Loading
Loading