Skip to content

Commit

Permalink
add more edge expand branch
Browse files Browse the repository at this point in the history
  • Loading branch information
liulx20 committed Jan 17, 2025
1 parent bbe27bc commit c069b44
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 167 deletions.
2 changes: 0 additions & 2 deletions flex/engines/graph_db/app/cypher_app_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,6 @@ bool generate_plan(
unlink(output_file.c_str());
unlink(query_file.c_str());
unlink(compiler_config_path.c_str());
// unlink("/tmp/temp.cypher.yaml");
// unlink("/tmp/temp.cypher.yaml_extra_config.yaml");
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions flex/engines/graph_db/app/cypher_write_app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ bool CypherWriteApp::Query(GraphDBSession& graph, Decoder& input,
runtime::PlanParser::get().parse_write_pipeline(db_.schema(), plan));
} else {
}
// const auto& plan = plan_cache_[query];

// LOG(INFO) << "plan: " << plan.DebugString();

gs::runtime::GraphInsertInterface gri(txn);
auto ctx = pipeline_cache_.at(query).Execute(gri, runtime::WriteContext(),
Expand Down
293 changes: 170 additions & 123 deletions flex/engines/graph_db/runtime/common/operators/retrieve/edge_expand.cc
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,6 @@ Context EdgeExpand::expand_vertex_without_predicate(
VertexColumnType input_vertex_list_type =
input_vertex_list->vertex_column_type();
if (input_vertex_list_type == VertexColumnType::kSingle) {
// LOG(INFO) << "input vertex size: " << input_vertex_list->size();
if (input_vertex_list->is_optional()) {
auto casted_input_vertex_list =
std::dynamic_pointer_cast<SLVertexColumnBase>(input_vertex_list);
Expand Down Expand Up @@ -527,8 +526,8 @@ Context EdgeExpand::expand_vertex_without_predicate(
ctx.set_with_reshuffle(params.alias, pair.first, pair.second);
return ctx;
} else {
LOG(FATAL) << "unexpected to reach here";
return ctx;
LOG(FATAL) << "not support vertex column type "
<< static_cast<int>(input_vertex_list_type);
}
}

Expand Down Expand Up @@ -597,12 +596,53 @@ Context EdgeExpand::expand_edge_with_special_edge_predicate(
return Context();
}

Context EdgeExpand::expand_vertex_ep_lt(const GraphReadInterface& graph,
Context&& ctx,
const EdgeExpandParams& params,
const std::string& ep_val) {
template <typename T, typename VERTEX_COL_T>
Context expand_vertex_ep_lt_ml_impl(
const GraphReadInterface& graph, Context&& ctx,
const std::vector<std::tuple<label_t, label_t, Direction>>& label_dirs,
label_t input_label, const std::string& ep_val,
VERTEX_COL_T* casted_input_vertex_list, int alias) {
T max_value(TypedConverter<T>::typed_from_string(ep_val));
std::vector<GraphReadInterface::graph_view_t<T>> views;
for (auto& t : label_dirs) {
label_t nbr_label = std::get<0>(t);
label_t edge_label = std::get<1>(t);
Direction dir = std::get<2>(t);
if (dir == Direction::kOut) {
views.emplace_back(
graph.GetOutgoingGraphView<T>(input_label, nbr_label, edge_label));
} else {
CHECK(dir == Direction::kIn);
views.emplace_back(
graph.GetIncomingGraphView<T>(input_label, nbr_label, edge_label));
}
}
MSVertexColumnBuilder builder;
size_t csr_idx = 0;
std::vector<size_t> offsets;
for (auto& csr : views) {
label_t nbr_label = std::get<0>(label_dirs[csr_idx]);
size_t idx = 0;
builder.start_label(nbr_label);
for (auto v : casted_input_vertex_list->vertices()) {
csr.foreach_edges_lt(v, max_value, [&](vid_t nbr, const T& e) {
builder.push_back_opt(nbr);
offsets.push_back(idx);
});
++idx;
}
++csr_idx;
}
std::shared_ptr<IContextColumn> col = builder.finish();
ctx.set_with_reshuffle(alias, col, offsets);
return ctx;
}
std::optional<Context> EdgeExpand::expand_vertex_ep_lt(
const GraphReadInterface& graph, Context&& ctx,
const EdgeExpandParams& params, const std::string& ep_val) {
if (params.is_optional) {
LOG(FATAL) << "not support optional edge expand";
LOG(ERROR) << "not support optional edge expand";
return std::nullopt;
}
std::shared_ptr<IVertexColumn> input_vertex_list =
std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.v_tag));
Expand Down Expand Up @@ -661,61 +701,116 @@ Context EdgeExpand::expand_vertex_ep_lt(const GraphReadInterface& graph,
}
}
if (!sp) {
LOG(FATAL) << "not support multiple edge types";
LOG(ERROR) << "not support edge type";
return std::nullopt;
}
const PropertyType& ed_type = ed_types[0];
if (ed_type == PropertyType::Date()) {
Date max_value(std::stoll(ep_val));
std::vector<GraphReadInterface::graph_view_t<Date>> views;
for (auto& t : label_dirs) {
label_t nbr_label = std::get<0>(t);
label_t edge_label = std::get<1>(t);
Direction dir = std::get<2>(t);
if (dir == Direction::kOut) {
views.emplace_back(graph.GetOutgoingGraphView<Date>(
input_label, nbr_label, edge_label));
} else {
CHECK(dir == Direction::kIn);
views.emplace_back(graph.GetIncomingGraphView<Date>(
input_label, nbr_label, edge_label));
}
}
MSVertexColumnBuilder builder;
size_t csr_idx = 0;
std::vector<size_t> offsets;
for (auto& csr : views) {
label_t nbr_label = std::get<0>(label_dirs[csr_idx]);
// label_t edge_label = std::get<1>(label_dirs[csr_idx]);
// Direction dir = std::get<2>(label_dirs[csr_idx]);
size_t idx = 0;
builder.start_label(nbr_label);
for (auto v : casted_input_vertex_list->vertices()) {
csr.foreach_edges_lt(v, max_value, [&](vid_t nbr, const Date& e) {
builder.push_back_opt(nbr);
offsets.push_back(idx);
});
++idx;
}
++csr_idx;
}
std::shared_ptr<IContextColumn> col = builder.finish();
ctx.set_with_reshuffle(params.alias, col, offsets);
return ctx;
return expand_vertex_ep_lt_ml_impl<Date>(
graph, std::move(ctx), label_dirs, input_label, ep_val,
casted_input_vertex_list.get(), params.alias);
} else if (ed_type == PropertyType::Int64()) {

Check notice on line 712 in flex/engines/graph_db/runtime/common/operators/retrieve/edge_expand.cc

View check run for this annotation

codefactor.io / CodeFactor

flex/engines/graph_db/runtime/common/operators/retrieve/edge_expand.cc#L600-L712

Complex Method
return expand_vertex_ep_lt_ml_impl<int64_t>(
graph, std::move(ctx), label_dirs, input_label, ep_val,
casted_input_vertex_list.get(), params.alias);
} else {
LOG(FATAL) << "not support edge type";
LOG(ERROR) << "not support edge type";
return std::nullopt;
}
} else {
LOG(FATAL) << "unexpected to reach here...";
return ctx;
LOG(ERROR) << "not support vertex column type";
return std::nullopt;
}
}

Context EdgeExpand::expand_vertex_ep_gt(const GraphReadInterface& graph,
Context&& ctx,
const EdgeExpandParams& params,
const std::string& ep_val) {
template <typename T, typename VERTEX_COL_T>
Context expand_vertex_ep_gt_sl_impl(
const GraphReadInterface& graph, Context&& ctx,
const std::vector<std::tuple<label_t, label_t, Direction>>& label_dirs,
label_t input_label, const std::string& ep_val,
VERTEX_COL_T* casted_input_vertex_list, int alias) {
T max_value(TypedConverter<T>::typed_from_string(ep_val));
std::vector<GraphReadInterface::graph_view_t<T>> views;
for (auto& t : label_dirs) {
label_t nbr_label = std::get<0>(t);
label_t edge_label = std::get<1>(t);
Direction dir = std::get<2>(t);
if (dir == Direction::kOut) {
views.emplace_back(
graph.GetOutgoingGraphView<T>(input_label, nbr_label, edge_label));
} else {
CHECK(dir == Direction::kIn);
views.emplace_back(
graph.GetIncomingGraphView<T>(input_label, nbr_label, edge_label));
}
}
SLVertexColumnBuilder builder(std::get<0>(label_dirs[0]));
std::vector<size_t> offsets;
for (auto& csr : views) {
size_t idx = 0;
for (auto v : casted_input_vertex_list->vertices()) {
csr.foreach_edges_gt(v, max_value, [&](vid_t nbr, const T& val) {
builder.push_back_opt(nbr);
offsets.push_back(idx);
});
++idx;
}
}
std::shared_ptr<IContextColumn> col = builder.finish();
ctx.set_with_reshuffle(alias, col, offsets);
return ctx;
}

template <typename T, typename VERTEX_COL_T>
Context expand_vertex_ep_gt_ml_impl(
const GraphReadInterface& graph, Context&& ctx,
const std::vector<std::tuple<label_t, label_t, Direction>>& label_dirs,
label_t input_label, const EdgeExpandParams& params,
const std::string& ep_val, VERTEX_COL_T* casted_input_vertex_list,
int alias) {
T max_value = TypedConverter<T>::typed_from_string(ep_val);
std::vector<GraphReadInterface::graph_view_t<T>> views;
for (auto& t : label_dirs) {
label_t nbr_label = std::get<0>(t);
label_t edge_label = std::get<1>(t);
Direction dir = std::get<2>(t);
if (dir == Direction::kOut) {
views.emplace_back(
graph.GetOutgoingGraphView<T>(input_label, nbr_label, edge_label));
} else {
CHECK(dir == Direction::kIn);
views.emplace_back(
graph.GetIncomingGraphView<T>(input_label, nbr_label, edge_label));
}
}
MSVertexColumnBuilder builder;
size_t csr_idx = 0;
std::vector<size_t> offsets;
for (auto& csr : views) {
label_t nbr_label = std::get<0>(label_dirs[csr_idx]);
size_t idx = 0;
builder.start_label(nbr_label);
LOG(INFO) << "start label: " << static_cast<int>(nbr_label);
for (auto v : casted_input_vertex_list->vertices()) {
csr.foreach_edges_gt(v, max_value, [&](vid_t nbr, const T& val) {
builder.push_back_opt(nbr);
offsets.push_back(idx);
});
++idx;
}
++csr_idx;
}
std::shared_ptr<IContextColumn> col = builder.finish();
ctx.set_with_reshuffle(alias, col, offsets);
return ctx;
}

std::optional<Context> EdgeExpand::expand_vertex_ep_gt(
const GraphReadInterface& graph, Context&& ctx,
const EdgeExpandParams& params, const std::string& ep_val) {
if (params.is_optional) {
LOG(FATAL) << "not support optional edge expand";
LOG(ERROR) << "not support optional edge expand";
return std::nullopt;
}
std::shared_ptr<IVertexColumn> input_vertex_list =
std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.v_tag));
Expand Down Expand Up @@ -774,88 +869,40 @@ Context EdgeExpand::expand_vertex_ep_gt(const GraphReadInterface& graph,
}
}
if (!sp) {
LOG(FATAL) << "not support multiple edge types";
LOG(ERROR) << "not support multiple edge types";
return std::nullopt;
}
const PropertyType& ed_type = ed_types[0];
if (se) {
if (ed_type == PropertyType::Date()) {
Date max_value(std::stoll(ep_val));
std::vector<GraphReadInterface::graph_view_t<Date>> views;
for (auto& t : label_dirs) {
label_t nbr_label = std::get<0>(t);
label_t edge_label = std::get<1>(t);
Direction dir = std::get<2>(t);
if (dir == Direction::kOut) {
views.emplace_back(graph.GetOutgoingGraphView<Date>(
input_label, nbr_label, edge_label));
} else {
CHECK(dir == Direction::kIn);
views.emplace_back(graph.GetIncomingGraphView<Date>(
input_label, nbr_label, edge_label));
}
}
SLVertexColumnBuilder builder(std::get<0>(label_dirs[0]));
std::vector<size_t> offsets;
for (auto& csr : views) {
size_t idx = 0;
for (auto v : casted_input_vertex_list->vertices()) {
csr.foreach_edges_gt(v, max_value, [&](vid_t nbr, const Date& val) {
builder.push_back_opt(nbr);
offsets.push_back(idx);
});
++idx;
}
}
std::shared_ptr<IContextColumn> col = builder.finish();
ctx.set_with_reshuffle(params.alias, col, offsets);
return ctx;
return expand_vertex_ep_gt_sl_impl<Date>(
graph, std::move(ctx), label_dirs, input_label, ep_val,
casted_input_vertex_list.get(), params.alias);
} else if (ed_type == PropertyType::Int64()) {
return expand_vertex_ep_gt_sl_impl<int64_t>(
graph, std::move(ctx), label_dirs, input_label, ep_val,
casted_input_vertex_list.get(), params.alias);
} else {
LOG(FATAL) << "not support edge type";
LOG(ERROR) << "not support edge type" << ed_type.ToString();
return std::nullopt;
}
} else {
if (ed_type == PropertyType::Date()) {
Date max_value(std::stoll(ep_val));
std::vector<GraphReadInterface::graph_view_t<Date>> views;
for (auto& t : label_dirs) {
label_t nbr_label = std::get<0>(t);
label_t edge_label = std::get<1>(t);
Direction dir = std::get<2>(t);
if (dir == Direction::kOut) {
views.emplace_back(graph.GetOutgoingGraphView<Date>(
input_label, nbr_label, edge_label));
} else {
CHECK(dir == Direction::kIn);
views.emplace_back(graph.GetIncomingGraphView<Date>(
input_label, nbr_label, edge_label));
}
}
MSVertexColumnBuilder builder;
size_t csr_idx = 0;
std::vector<size_t> offsets;
for (auto& csr : views) {
label_t nbr_label = std::get<0>(label_dirs[csr_idx]);
size_t idx = 0;
builder.start_label(nbr_label);
LOG(INFO) << "start label: " << static_cast<int>(nbr_label);
for (auto v : casted_input_vertex_list->vertices()) {
csr.foreach_edges_gt(v, max_value, [&](vid_t nbr, const Date& val) {
builder.push_back_opt(nbr);
offsets.push_back(idx);
});
++idx;
}
++csr_idx;
}
std::shared_ptr<IContextColumn> col = builder.finish();
ctx.set_with_reshuffle(params.alias, col, offsets);
return ctx;
return expand_vertex_ep_gt_ml_impl<Date>(
graph, std::move(ctx), label_dirs, input_label, params, ep_val,
casted_input_vertex_list.get(), params.alias);
} else if (ed_type == PropertyType::Int64()) {
return expand_vertex_ep_gt_ml_impl<int64_t>(
graph, std::move(ctx), label_dirs, input_label, params, ep_val,
casted_input_vertex_list.get(), params.alias);
} else {
LOG(FATAL) << "not support edge type";
LOG(ERROR) << "not support edge type" << ed_type.ToString();
return std::nullopt;
}
}
} else {
LOG(FATAL) << "unexpected to reach here...";
return ctx;
LOG(ERROR) << "unexpected to reach here...";
return std::nullopt;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,19 +289,17 @@ class EdgeExpand {
ctx.set_with_reshuffle(params.alias, pair.first, pair.second);
return ctx;
} else {
LOG(FATAL) << "unexpected to reach here...";
return ctx;
LOG(FATAL) << "not support vertex column type "
<< static_cast<int>(input_vertex_list_type);
}
}

static Context expand_vertex_ep_lt(const GraphReadInterface& graph,
Context&& ctx,
const EdgeExpandParams& params,
const std::string& ep_val);
static Context expand_vertex_ep_gt(const GraphReadInterface& graph,
Context&& ctx,
const EdgeExpandParams& params,
const std::string& ep_val);
static std::optional<Context> expand_vertex_ep_lt(
const GraphReadInterface& graph, Context&& ctx,
const EdgeExpandParams& params, const std::string& ep_val);
static std::optional<Context> expand_vertex_ep_gt(
const GraphReadInterface& graph, Context&& ctx,
const EdgeExpandParams& params, const std::string& ep_val);
template <typename PRED_T>
struct SPVPWrapper {
SPVPWrapper(const PRED_T& pred) : pred_(pred) {}
Expand Down
Loading

0 comments on commit c069b44

Please sign in to comment.