diff --git a/flex/engines/graph_db/runtime/adhoc/operators/group_by.cc b/flex/engines/graph_db/runtime/adhoc/operators/group_by.cc index 14f070759c1e..20cff565f700 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/group_by.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/group_by.cc @@ -413,6 +413,29 @@ std::shared_ptr string_to_list( return builder.finish(); } +template +std::shared_ptr scalar_to_list( + const Var& var, const std::vector>& to_aggregate) { + ListValueColumnBuilder builder; + size_t col_size = to_aggregate.size(); + builder.reserve(col_size); + std::vector> impls; + for (size_t k = 0; k < col_size; ++k) { + auto& vec = to_aggregate[k]; + + std::vector elem; + for (auto idx : vec) { + elem.push_back(TypedConverter::to_typed(var.get(idx))); + } + auto impl = ListImpl::make_list_impl(std::move(elem)); + auto list = List::make_list(impl); + impls.emplace_back(impl); + builder.push_back_opt(list); + } + builder.set_list_impls(impls); + return builder.finish(); +} + bl::result> apply_reduce( const AggFunc& func, const std::vector>& to_aggregate) { if (func.aggregate == AggrKind::kSum) { @@ -497,6 +520,14 @@ bl::result> apply_reduce( return tuple_to_list(var, to_aggregate); } else if (var.type() == RTAnyType::kStringValue) { return string_to_list(var, to_aggregate); + } else if (var.type() == RTAnyType::kI32Value) { + return scalar_to_list(var, to_aggregate); + } else if (var.type() == RTAnyType::kI64Value) { + return scalar_to_list(var, to_aggregate); + } else if (var.type() == RTAnyType::kU64Value) { + return scalar_to_list(var, to_aggregate); + } else if (var.type() == RTAnyType::kF64Value) { + return scalar_to_list(var, to_aggregate); } else { LOG(FATAL) << "not support" << static_cast(var.type().type_enum_); } diff --git a/flex/engines/graph_db/runtime/common/operators/scan.cc b/flex/engines/graph_db/runtime/common/operators/scan.cc index 9b260008cd71..d61e3fd776ae 100644 --- a/flex/engines/graph_db/runtime/common/operators/scan.cc +++ b/flex/engines/graph_db/runtime/common/operators/scan.cc @@ -47,11 +47,10 @@ bl::result Scan::find_vertex_with_id(const ReadTransaction& txn, } if (GlobalId::get_label_id(gid) == label) { vid = GlobalId::get_vid(gid); + builder.push_back_opt(vid); } else { LOG(ERROR) << "Global id " << gid << " does not match label " << label; - return Context(); } - builder.push_back_opt(vid); Context ctx; ctx.set(alias, builder.finish()); return ctx; diff --git a/flex/engines/graph_db/runtime/common/rt_any.h b/flex/engines/graph_db/runtime/common/rt_any.h index cc5371e4005f..3b73f06e3998 100644 --- a/flex/engines/graph_db/runtime/common/rt_any.h +++ b/flex/engines/graph_db/runtime/common/rt_any.h @@ -420,6 +420,7 @@ class ListImpl : ListImplBase { static std::shared_ptr make_list_impl(std::vector&& vals) { auto new_list = new ListImpl(); new_list->list_ = std::move(vals); + new_list->is_valid_.resize(new_list->list_.size(), true); return std::shared_ptr(static_cast(new_list)); }