Skip to content

Commit

Permalink
fix(interactive): fix some bug with GroupBy (#4165)
Browse files Browse the repository at this point in the history
## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

#4162
  • Loading branch information
liulx20 authored Aug 21, 2024
1 parent ae51397 commit 54ad17f
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 1 deletion.
26 changes: 26 additions & 0 deletions flex/engines/graph_db/runtime/adhoc/operators/group_by.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,28 @@ std::shared_ptr<IContextColumn> tuple_to_list(
return builder.finish();
}

std::shared_ptr<IContextColumn> string_to_list(
const Var& var, const std::vector<std::vector<size_t>>& to_aggregate) {
ListValueColumnBuilder<std::string> builder;
size_t col_size = to_aggregate.size();
builder.reserve(col_size);
std::vector<std::shared_ptr<ListImplBase>> impls;
for (size_t k = 0; k < col_size; ++k) {
auto& vec = to_aggregate[k];

std::vector<std::string> elem;
for (auto idx : vec) {
elem.push_back(std::string(var.get(idx).as_string()));
}
auto impl = ListImpl<std::string_view>::make_list_impl(std::move(elem));
auto list = List::make_list(impl);
impls.emplace_back(impl);
builder.push_back_opt(list);
}
builder.set_list_impls(impls);
return builder.finish();
}

std::shared_ptr<IContextColumn> apply_reduce(
const AggFunc& func, const std::vector<std::vector<size_t>>& to_aggregate) {
if (func.aggregate == AggrKind::kSum) {
Expand Down Expand Up @@ -463,6 +485,10 @@ std::shared_ptr<IContextColumn> apply_reduce(
}
if (var.type() == RTAnyType::kTuple) {
return tuple_to_list(var, to_aggregate);
} else if (var.type() == RTAnyType::kStringValue) {
return string_to_list(var, to_aggregate);
} else {
LOG(FATAL) << "not support" << static_cast<int>(var.type().type_enum_);
}
} else if (func.aggregate == AggrKind::kAvg) {
if (func.vars.size() != 1) {
Expand Down
4 changes: 4 additions & 0 deletions flex/engines/graph_db/runtime/adhoc/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ std::shared_ptr<IContextColumnBuilder> create_column_builder(RTAnyType type) {
case RTAnyType::RTAnyTypeImpl::kBoolValue:
// fix me
return std::make_shared<ValueColumnBuilder<bool>>();
case RTAnyType::RTAnyTypeImpl::kEdge:
return std::make_shared<BDMLEdgeColumnBuilder>();
case RTAnyType::RTAnyTypeImpl::kStringSetValue:
return std::make_shared<ValueColumnBuilder<std::set<std::string>>>();
default:
LOG(FATAL) << "unsupport type: " << static_cast<int>(type.type_enum_);
break;
Expand Down
4 changes: 3 additions & 1 deletion flex/engines/graph_db/runtime/common/rt_any.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ RTAnyType parse_from_ir_data_type(const ::common::IrDataType& dt) {
case ::common::DataType::DATE32:
return RTAnyType::kDate32;
case ::common::DataType::STRING_ARRAY:
return RTAnyType::kStringSetValue;
return RTAnyType::kList;
case ::common::DataType::TIMESTAMP:
return RTAnyType::kDate32;
case ::common::DataType::DOUBLE:
Expand Down Expand Up @@ -175,6 +175,8 @@ RTAny& RTAny::operator=(const RTAny& rhs) {
value_.f64_val = rhs.value_.f64_val;
} else if (type_ == RTAnyType::kMap) {
value_.map = rhs.value_.map;
} else if (type_ == RTAnyType::kEdge) {
value_.edge = rhs.value_.edge;
} else {
LOG(FATAL) << "unexpected type: " << static_cast<int>(type_.type_enum_);
}
Expand Down
46 changes: 46 additions & 0 deletions flex/engines/graph_db/runtime/common/rt_any.h
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,53 @@ class ListImpl : ListImplBase {
std::vector<T> list_;
std::vector<bool> is_valid_;
};
template <>
class ListImpl<std::string_view> : public ListImplBase {
public:
ListImpl() = default;
static std::shared_ptr<ListImplBase> make_list_impl(
std::vector<std::string>&& vals) {
auto new_list = new ListImpl<std::string_view>();
new_list->list_ = std::move(vals);
new_list->is_valid_.resize(new_list->list_.size(), true);
return std::shared_ptr<ListImplBase>(static_cast<ListImplBase*>(new_list));
}

static std::shared_ptr<ListImplBase> make_list_impl(
const std::vector<RTAny>& vals) {
auto new_list = new ListImpl<std::string_view>();
for (auto& val : vals) {
if (val.is_null()) {
new_list->is_valid_.push_back(false);
new_list->list_.push_back("");
} else {
new_list->list_.push_back(
std::string(TypedConverter<std::string_view>::to_typed(val)));
new_list->is_valid_.push_back(true);
}
}
return std::shared_ptr<ListImplBase>(static_cast<ListImplBase*>(new_list));
}

bool operator<(const ListImplBase& p) const {
return list_ < (dynamic_cast<const ListImpl<std::string_view>&>(p)).list_;
}
bool operator==(const ListImplBase& p) const {
return list_ == (dynamic_cast<const ListImpl<std::string_view>&>(p)).list_;
}
size_t size() const { return list_.size(); }
RTAny get(size_t idx) const {
if (is_valid_[idx]) {
return TypedConverter<std::string_view>::from_typed(
std::string_view(list_[idx].data(), list_[idx].size()));
} else {
return RTAny(RTAnyType::kNull);
}
}

std::vector<std::string> list_;
std::vector<bool> is_valid_;
};
} // namespace runtime

} // namespace gs
Expand Down

0 comments on commit 54ad17f

Please sign in to comment.