Skip to content

Commit

Permalink
feat(interactive): Update graph graph_algo (#3486)
Browse files Browse the repository at this point in the history
- Update the sample graph `graph_algo` and add some example`cypher`
queries.
- Fix some bugs.
  • Loading branch information
zhanglei1949 authored Feb 2, 2024
1 parent cdc4d02 commit 3e85935
Show file tree
Hide file tree
Showing 36 changed files with 1,404 additions and 861 deletions.
2 changes: 1 addition & 1 deletion docs/flex/interactive/custom_graph_data.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,4 @@ Now you can move to [Stored Procedure](./stored_procedures) to explore querying

## Try other graphs

In addition to `movies` graph, we have also prepared the `graph_algo` graph. You can find the raw CSV files, graph.yaml, and import.yaml in the `./examples/graph_algo/` directory. You can import the `graph_algo` graph just like importing the `movies` graph.
In addition to `movies` graph, we have also prepared the `graph_algo` graph. You can find the raw CSV files, graph.yaml, and import.yaml in the `./examples/graph_algo/` directory. You can import the `graph_algo` graph just like importing the `movies` graph. There are also some sample cypher queries, you can find them at [GraphScope/flex/interactive/examples/graph_algo](https://github.com/alibaba/GraphScope/tree/main/flex/interactive/examples/graph_algo).
253 changes: 152 additions & 101 deletions flex/engines/hqps_db/core/operator/edge_expand.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,73 +159,6 @@ class EdgeExpand {
return EdgeExpandVFromSingleLabel(state);
}

/// @brief Directly obtain vertices from edge, without property and apply from
/// multi label set, Activation: MultiLabelVertexSet, TruePredicate.
/// @tparam EDATA_T
/// @tparam VERTEX_SET_T
/// @param frag
/// @param v_sets
/// @param edge_expand_opt
/// @return
template <
typename VERTEX_SET_T, typename... SELECTOR,
typename RES_T = std::pair<vertex_set_t, std::vector<offset_t>>,
typename std::enable_if<VERTEX_SET_T::is_multi_label>::type* = nullptr>
static RES_T EdgeExpandV(const GRAPH_INTERFACE& graph,
const VERTEX_SET_T& cur_vertex_set,
Direction direction, label_id_t edge_label,
label_id_t other_label,
Filter<TruePredicate, SELECTOR...>&& edge_filter,
size_t limit = INT_MAX) {
auto state = EdgeExpandVState(graph, cur_vertex_set, direction, edge_label,
other_label, std::move(edge_filter), limit);

std::vector<vertex_id_t> vids;
std::vector<offset_t> offset;
static constexpr size_t num_src_labels = VERTEX_SET_T::num_labels;
using nbr_list_array_t = typename GRAPH_INTERFACE::nbr_list_array_t;
std::vector<nbr_list_array_t> nbr_lists;
for (size_t i = 0; i < num_src_labels; ++i) {
auto& cur_set = state.cur_vertex_set_.GetSet(i);
label_id_t src_label, dst_label;
std::tie(src_label, dst_label) = get_graph_label_pair(
direction, cur_set.GetLabel(), state.other_label_);
VLOG(10) << "[EdgeExpandVMultiSrcLabel: from label: "
<< cur_set.GetLabel() << ", other label: " << state.other_label_
<< ",edge label: " << state.edge_label_ << "src: " << src_label
<< ",dst: " << dst_label << ",dire: " << state.direction_;
auto nbr_list_array = state.graph_.GetOtherVertices(
src_label, dst_label, state.edge_label_, cur_set.GetVertices(),
gs::to_string(state.direction_), state.limit_);
nbr_lists.emplace_back(std::move(nbr_list_array));
}

offset.reserve(state.cur_vertex_set_.Size() + 1);
// first gather size.
offset.emplace_back(vids.size());
for (auto iter : state.cur_vertex_set_) {
auto vid = iter.GetVertex();
auto cur_set_ind = iter.GetCurInd();
auto set_inner_ind = iter.GetCurSetInnerInd();
CHECK(nbr_lists.size() > cur_set_ind);
CHECK(nbr_lists[cur_set_ind].size() > set_inner_ind);
auto& cur_array = nbr_lists[cur_set_ind];
auto cur_nbr_list = cur_array.get(set_inner_ind);
// VLOG(10) << "vertex: " << vid << ", num nbrs: " << cur_nbr_list.size();

for (auto nbr : cur_nbr_list) {
// TODO: use edge_filter to filter.
vids.emplace_back(nbr.neighbor());
}
offset.emplace_back(vids.size());
}
VLOG(10) << "vids size: " << vids.size();
VLOG(10) << "offset: " << gs::to_string(offset);
vertex_set_t result_set(std::move(vids), state.other_label_);
auto pair = std::make_pair(std::move(result_set), std::move(offset));
return pair;
}

/// @brief Directly obtain vertices from two label vertex set.
/// multi label set
/// Activation: From two label set, TruePredicate.
Expand Down Expand Up @@ -504,6 +437,112 @@ class EdgeExpand {
return std::make_pair(std::move(res_set), std::move(res_offset));
}

/// @brief Directly obtain multiple label triplets.
/// @tparam EDATA_T
/// @tparam VERTEX_SET_T
/// @param frag
/// @param v_sets
/// @param edge_expand_opt
/// @return
template <typename VERTEX_SET_T, typename EDGE_FILTER_T, typename... SET_T>
static auto EdgeExpandV(
const GRAPH_INTERFACE& graph, const VERTEX_SET_T& cur_vertex_set,
Direction direction,
const std::vector<std::array<label_id_t, 3>>& edge_triplets,
const EDGE_FILTER_T& edge_filter) {
CHECK(edge_triplets.size() > 0);
using result_pair_t = std::pair<vertex_set_t, std::vector<offset_t>>;
std::vector<result_pair_t> result_pairs;
for (auto i = 0; i < edge_triplets.size(); ++i) {
auto copied_filter = edge_filter;
result_pairs.emplace_back(
EdgeExpandV(graph, cur_vertex_set, direction, edge_triplets[i][2],
edge_triplets[i][1], std::move(copied_filter)));
}

size_t offset_array_size = result_pairs[0].second.size();

VLOG(10) << "prev set size: " << cur_vertex_set.Size()
<< ", new offset size: " << offset_array_size;
CHECK(offset_array_size == cur_vertex_set.Size() + 1);
size_t prev_set_size = cur_vertex_set.Size();

std::vector<vertex_id_t> res_vids;
std::vector<grape::Bitset> res_bitset;
std::unordered_map<label_id_t, int32_t> label_to_ind;
std::vector<offset_t> res_offset;

size_t num_labels = 0;
{
for (auto i = 0; i < edge_triplets.size(); ++i) {
auto& triplet = edge_triplets[i];
if (direction == Direction::In || direction == Direction::Both) {
if (label_to_ind.find(triplet[0]) == label_to_ind.end()) {
label_to_ind[triplet[0]] = num_labels++;
}
}
if (direction == Direction::Out || direction == Direction::Both) {
if (label_to_ind.find(triplet[1]) == label_to_ind.end()) {
label_to_ind[triplet[1]] = num_labels++;
}
}
}
VLOG(10) << "num labels: " << num_labels;
}
res_bitset.resize(num_labels);

size_t total_size = 0;
for (size_t i = 0; i < result_pairs.size(); ++i) {
total_size += result_pairs[i].first.Size();
}
VLOG(10) << "total size: " << total_size;
res_vids.reserve(total_size);
res_offset.reserve(prev_set_size + 1);
for (size_t i = 0; i < num_labels; ++i) {
res_bitset[i].init(total_size);
}

size_t cur_ind = 0;
res_offset.emplace_back(0);
for (size_t i = 0; i < prev_set_size; ++i) {
for (size_t j = 0; j < result_pairs.size(); ++j) {
auto& vertex_set = result_pairs[j].first;
auto& vertex_set_label = vertex_set.GetLabel();
CHECK(label_to_ind.find(vertex_set_label) != label_to_ind.end())
<< "label " << vertex_set_label << " not found";
auto res_label_ind = label_to_ind[vertex_set_label];
auto& offset_array = result_pairs[j].second;
auto& vec = vertex_set.GetVertices();
auto start_off = offset_array[i];
auto end_off = offset_array[i + 1];
for (auto k = start_off; k < end_off; ++k) {
res_vids.emplace_back(vec[k]);
res_bitset[res_label_ind].set_bit(cur_ind);
// res_bitset[j].set_bit(cur_ind);
cur_ind += 1;
}
}
res_offset.emplace_back(cur_ind);
}
CHECK(cur_ind == total_size);
std::vector<label_id_t> copied_labels;
for (auto pair : label_to_ind) {
copied_labels.emplace_back(pair.first);
}
CHECK(copied_labels.size() == num_labels &&
res_bitset.size() == num_labels);
if constexpr (sizeof...(SET_T) > 0) {
GeneralVertexSet<vertex_id_t, label_id_t, SET_T...> res_set(
std::move(res_vids), std::move(copied_labels), std::move(res_bitset));

return std::make_pair(std::move(res_set), std::move(res_offset));
} else {
GeneralVertexSet<vertex_id_t, label_id_t, grape::EmptyType> res_set(
std::move(res_vids), std::move(copied_labels), std::move(res_bitset));
return std::make_pair(std::move(res_set), std::move(res_offset));
}
}

/// @brief Directly obtain multiple label vertices from edge. specialization
/// for two label.
/// @tparam EDATA_T
Expand Down Expand Up @@ -1103,8 +1142,8 @@ class EdgeExpand {
T...>& state) {
auto prop_names = state.prop_names_;
static constexpr size_t num_labels = VERTEX_SET_T::num_labels;
auto& general_set = state.cur_vertex_set_;
auto total_vertices_num = general_set.Size();
auto& two_label_set = state.cur_vertex_set_;
auto total_vertices_num = two_label_set.Size();
VLOG(10) << "[EdgeExpandETwoLabelSetImpl]" << prop_names.size()
<< ", total vnum: " << total_vertices_num;

Expand All @@ -1113,7 +1152,7 @@ class EdgeExpand {
adj_list_array_t res_adj_list_arrays;
res_adj_list_arrays.resize(total_vertices_num);
// overall vid array.
std::vector<vertex_id_t> vids_arrays(general_set.GetVertices());
std::vector<vertex_id_t> vids_arrays(two_label_set.GetVertices());
std::array<std::vector<offset_t>, num_labels> offset_arrays;

label_id_t src_label, dst_label;
Expand All @@ -1122,35 +1161,41 @@ class EdgeExpand {
for (size_t i = 0; i < num_labels; ++i) {
if (state.direction_ == Direction::In) {
src_label = state.other_label_;
dst_label = general_set.GetLabel(i);
dst_label = two_label_set.GetLabel(i);
} else if (state.direction_ == Direction::Out) {
src_label = two_label_set.GetLabel(i);
dst_label = state.other_label_;
} else {
src_label = general_set.GetLabel(i);
// If direction is both, we need to make sure what is src and what is
// dst.
src_label = two_label_set.GetLabel(i);
dst_label = state.other_label_;
auto& schema = state.graph_.schema();
if (!schema.exist(src_label, dst_label, state.edge_label_)) {
std::swap(src_label, dst_label);
}
}
VLOG(1) << "src label: " << (int) src_label
<< ", dst label: " << (int) dst_label;
std::vector<vertex_id_t> cur_vids;
std::vector<int32_t> cur_active_inds;
std::tie(cur_vids, cur_active_inds) = general_set.GetVertices(i);
std::tie(cur_vids, cur_active_inds) = two_label_set.GetVertices(i);
auto tmp = state.graph_.template GetEdges<T...>(
src_label, dst_label, state.edge_label_, cur_vids, direction_str,
state.limit_, prop_names);
CHECK(tmp.size() == cur_active_inds.size());
if constexpr (GRAPH_INTERFACE::is_grape) {
// for grape graph, we can use operator =, since all data is already in
// memory
for (size_t j = 0; j < cur_active_inds.size(); ++j) {
// res_adj_list_arrays[cur_active_inds[j]] = tmp.get(j);
res_adj_list_arrays.set(cur_active_inds[j], tmp.get(j));
}
} else {
for (size_t j = 0; j < cur_active_inds.size(); ++j) {
res_adj_list_arrays.get_vector(cur_active_inds[j])
.swap(tmp.get_vector(j));
}
if (i == 0) {
// first time, update flag field.
res_adj_list_arrays.set_flag(tmp.get_flag());
}

for (size_t j = 0; j < cur_active_inds.size(); ++j) {
res_adj_list_arrays.set(cur_active_inds[j], tmp.get(j));
}
}

std::vector<size_t> offset;
offset.reserve(general_set.Size() + 1);
offset.reserve(two_label_set.Size() + 1);
size_t size = 0;
offset.emplace_back(size);
// Construct offset from adj_list.
Expand All @@ -1161,18 +1206,20 @@ class EdgeExpand {
}
VLOG(10) << "num edges: " << size;
VLOG(10) << "offset: array: " << gs::to_string(offset);
auto copied_labels(general_set.GetLabels());
auto& old_bitset = general_set.GetBitset();
auto copied_labels(two_label_set.GetLabels());
auto& old_bitset = two_label_set.GetBitset();
grape::Bitset new_bitset;
new_bitset.init(old_bitset.cardinality());
for (size_t i = 0; i < old_bitset.cardinality(); ++i) {
new_bitset.set_bit(i);
}

auto prop_names_vec = array_to_vec(prop_names);

GeneralEdgeSet<num_labels, GRAPH_INTERFACE, vertex_id_t, label_id_t,
std::tuple<T...>, std::tuple<T...>>
edge_set(std::move(vids_arrays), std::move(res_adj_list_arrays),
std::move(new_bitset), prop_names, state.edge_label_,
std::move(new_bitset), prop_names_vec, state.edge_label_,
copied_labels, state.other_label_, state.direction_);
CHECK(offset.back() == edge_set.Size())
<< "offset: " << offset.back() << ", " << edge_set.Size();
Expand Down Expand Up @@ -1209,9 +1256,18 @@ class EdgeExpand {
if (state.direction_ == Direction::In) {
src_label = state.other_label_;
dst_label = two_label_set.GetLabel(i);
} else if (state.direction_ == Direction::Out) {
src_label = two_label_set.GetLabel(i);
dst_label = state.other_label_;
} else {
// If direction is both, we need to make sure what is src and what is
// dst.
src_label = two_label_set.GetLabel(i);
dst_label = state.other_label_;
auto& schema = state.graph_.schema();
if (!schema.exist(src_label, dst_label, state.edge_label_)) {
std::swap(src_label, dst_label);
}
}
std::vector<vertex_id_t> cur_vids;
std::vector<int32_t> cur_active_inds;
Expand All @@ -1220,18 +1276,13 @@ class EdgeExpand {
src_label, dst_label, state.edge_label_, cur_vids, direction_str,
state.limit_, prop_names);
CHECK(tmp.size() == cur_active_inds.size());
if constexpr (GRAPH_INTERFACE::is_grape) {
// for grape graph, we can use operator =, since all data is already in
// memory
for (size_t j = 0; j < cur_active_inds.size(); ++j) {
// res_adj_list_arrays[cur_active_inds[j]] = tmp.get(j);
res_adj_list_arrays.set(cur_active_inds[j], tmp.get(j));
}
} else {
for (size_t j = 0; j < cur_active_inds.size(); ++j) {
res_adj_list_arrays.get_vector(cur_active_inds[j])
.swap(tmp.get_vector(j));
}
if (i == 0) {
// first time, update flag field.
res_adj_list_arrays.set_flag(tmp.get_flag());
}
for (size_t j = 0; j < cur_active_inds.size(); ++j) {
// res_adj_list_arrays[cur_active_inds[j]] = tmp.get(j);
res_adj_list_arrays.set(cur_active_inds[j], tmp.get(j));
}
}
using edge_tuple_t = std::tuple<vertex_id_t, vertex_id_t, std::tuple<T...>>;
Expand Down
21 changes: 21 additions & 0 deletions flex/engines/hqps_db/core/sync_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,27 @@ class SyncEngine : public BaseEngine {
// old context will be abondon here.
}

template <AppendOpt opt, int alias_to_use, typename CTX_HEAD_T, int cur_alias,
int base_tag, typename... CTX_PREV, typename LabelT,
typename EDGE_FILTER_T>
static auto EdgeExpandV(
const GRAPH_INTERFACE& graph,
Context<CTX_HEAD_T, cur_alias, base_tag, CTX_PREV...>&& ctx,
EdgeExpandVMultiTripletOpt<LabelT, EDGE_FILTER_T>&& edge_expand_opt) {
// Unwrap params here.
auto& select_node = gs::Get<alias_to_use>(ctx);
// Modifiy offsets.
// pass select node by reference.
auto pair = EdgeExpand<GRAPH_INTERFACE>::EdgeExpandV(
graph, select_node, edge_expand_opt.direction_,
edge_expand_opt.edge_label_triplets_,
std::move(edge_expand_opt.edge_filter_));
// create new context node, update offsets.
return ctx.template AddNode<opt>(std::move(pair.first),
std::move(pair.second), alias_to_use);
// old context will be abondon here.
}

//////////////////////////////////////Path Expand/////////////////////////
// Path Expand to vertices with columns
template <AppendOpt opt, int alias_to_use, typename VERTEX_FILTER_T,
Expand Down
Loading

0 comments on commit 3e85935

Please sign in to comment.