Skip to content

Commit

Permalink
fix intersect
Browse files Browse the repository at this point in the history
  • Loading branch information
liulx20 committed Jan 16, 2025
1 parent 6843a29 commit 4c7fed8
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 95 deletions.
17 changes: 8 additions & 9 deletions flex/engines/graph_db/app/cypher_app_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,13 @@ bool generate_plan(
const std::string& compiler_yaml, const std::string& tmp_dir,
std::unordered_map<std::string, physical::PhysicalPlan>& plan_cache) {
// dump query to file
const char* graphscope_dir = getenv("GRAPHSCOPE_DIR");
if (graphscope_dir == nullptr) {
std::cerr << "GRAPHSCOPE_DIR is not set!" << std::endl;
graphscope_dir = "../../../GraphScope/";
const char* compiler_jar = getenv("COMPILER_JAR");
if (compiler_jar == nullptr) {
std::cerr << "COMPILER_JAR is not set!" << std::endl;
compiler_jar =
"../../interactive_engine/compiler/target/"
"compiler-0.0.1-SNAPSHOT.jar:../../interactive_engine/compiler/target/"
"libs/*";
}

auto id = std::this_thread::get_id();
Expand All @@ -94,11 +97,7 @@ bool generate_plan(
tmp_dir + "/compiler_config_" + thread_id + ".yaml";
const std::string query_file = tmp_dir + "/temp" + thread_id + ".cypher";
const std::string output_file = tmp_dir + "/temp" + thread_id + ".pb";
const std::string jar_path = std::string(graphscope_dir) +
"/interactive_engine/compiler/target/"
"compiler-0.0.1-SNAPSHOT.jar:" +
std::string(graphscope_dir) +
"/interactive_engine/compiler/target/libs/*";
const std::string jar_path = compiler_jar;
const std::string schema_path = "-Dgraph.schema=" + compiler_yaml;
auto raw_query = query;
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,86 +23,6 @@ namespace gs {

namespace runtime {

static void ensure_sorted(std::shared_ptr<ValueColumn<size_t>> idx_col,
std::shared_ptr<IContextColumn> val_col) {
auto& idx_col_ref = *idx_col;
size_t row_num = idx_col_ref.size();
for (size_t k = 1; k < row_num; ++k) {
CHECK_GE(idx_col_ref.get_value(k), idx_col_ref.get_value(k - 1));
}
}

Context Intersect::intersect(Context&& ctx,
std::vector<std::tuple<Context, int, int>>&& ctxs,
int alias) {
std::vector<std::pair<std::shared_ptr<ValueColumn<size_t>>,
std::shared_ptr<IContextColumn>>>
cols;
for (auto& c : ctxs) {
auto& this_ctx = std::get<0>(c);
int idx_col = std::get<1>(c);
int value_col = std::get<2>(c);
cols.emplace_back(
std::dynamic_pointer_cast<ValueColumn<size_t>>(this_ctx.get(idx_col)),
this_ctx.get(value_col));
}
for (auto& pair : cols) {
ensure_sorted(pair.first, pair.second);
}
if (cols.size() == 2) {
auto& idx_col0 = *cols[0].first;
auto& idx_col1 = *cols[1].first;

size_t rn0 = idx_col0.size();
size_t rn1 = idx_col1.size();

CHECK(cols[0].second->column_type() == cols[1].second->column_type());
if (cols[0].second->column_type() == ContextColumnType::kVertex) {
auto vlist0_ptr =
std::dynamic_pointer_cast<IVertexColumn>(cols[0].second);
auto vlist1_ptr =
std::dynamic_pointer_cast<IVertexColumn>(cols[1].second);
if (vlist0_ptr->vertex_column_type() == VertexColumnType::kSingle &&
vlist1_ptr->vertex_column_type() == VertexColumnType::kSingle) {
auto& vlist0 = *std::dynamic_pointer_cast<SLVertexColumn>(vlist0_ptr);
auto& vlist1 = *std::dynamic_pointer_cast<SLVertexColumn>(vlist1_ptr);

std::vector<size_t> shuffle_offsets;
SLVertexColumnBuilder builder(*vlist0.get_labels_set().begin());

size_t idx0 = 0, idx1 = 0;
std::set<vid_t> lhs_set;
while (idx0 < rn0 && idx1 < rn1) {
if (idx_col0.get_value(idx0) < idx_col1.get_value(idx1)) {
++idx0;
} else if (idx_col0.get_value(idx0) > idx_col1.get_value(idx1)) {
++idx1;
} else {
lhs_set.clear();
size_t common_index = idx_col0.get_value(idx0);
while (idx_col0.get_value(idx0) == common_index) {
lhs_set.insert(vlist0.get_vertex(idx0).vid_);
++idx0;
}
while (idx_col1.get_value(idx1) == common_index) {
vid_t cur_v = vlist1.get_vertex(idx1).vid_;
if (lhs_set.find(cur_v) != lhs_set.end()) {
shuffle_offsets.push_back(common_index);
builder.push_back_opt(cur_v);
}
++idx1;
}
}
}

ctx.set_with_reshuffle(alias, builder.finish(), shuffle_offsets);
return ctx;
}
}
}

LOG(FATAL) << "not support";
}
static Context left_outer_intersect(Context&& ctx, Context&& ctx0,
Context&& ctx1, int key) {
// specifically, this function is called when the first context is not
Expand Down Expand Up @@ -323,7 +243,7 @@ static Context intersect_impl(Context&& ctx, std::vector<Context>&& ctxs,
}
}
if (i < ctxs[1].col_num()) {
if (ctxs[1].columns[i] != nullptr) {
if (ctxs[1].columns[i] != nullptr && ctx.get(i) == nullptr) {
ctx.set(i, ctxs[1].get(i));
}
} else if (i > ctx.col_num()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,7 @@ namespace runtime {

class Intersect {
public:
static Context intersect(Context&& ctx,
std::vector<std::tuple<Context, int, int>>&& ctxs,
int alias);

static Context intersect(Context&& ctx, std::vector<Context>&& ctxs, int key);
static Context intersect(Context&& ctx, std::vector<Context>&& ctxs, int key);
};

} // namespace runtime
Expand Down

0 comments on commit 4c7fed8

Please sign in to comment.