Skip to content

Commit

Permalink
fix(interactive): Support Index predicate for dynamic params in IR-Co…
Browse files Browse the repository at this point in the history
…re (#3282)

<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

<!-- Please give a short brief about these changes. -->

As titled.

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

#3277

---------

Co-authored-by: zhanglei1949 <[email protected]>
Co-authored-by: Longbin Lai <[email protected]>
  • Loading branch information
3 people authored Oct 19, 2023
1 parent 7cb43e9 commit f813f9b
Show file tree
Hide file tree
Showing 9 changed files with 375 additions and 187 deletions.
270 changes: 134 additions & 136 deletions flex/codegen/src/hqps/hqps_scan_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,18 @@ static constexpr const char* SCAN_OP_TEMPLATE_NO_EXPR_STR =
/// 3. graph name
/// 4. vertex label
/// 5. oid
static constexpr const char* SCAN_OP_WITH_OID_TEMPLATE_STR =
"auto %1% = Engine::template ScanVertex<%2%>(%3%, %4%, %5%));\n";
static constexpr const char* SCAN_OP_WITH_OID_ONE_LABEL_TEMPLATE_STR =
"auto %1% = Engine::template ScanVertexWithOid<%2%>(%3%, %4%, %5%);\n";

/// Args
/// 1. res_ctx_name
/// 2. AppendOpt,
/// 3. graph name
/// 4. vertex label
/// 5. oid
static constexpr const char* SCAN_OP_WITH_OID_MUL_LABEL_TEMPLATE_STR =
"auto %1% = Engine::template ScanVertexWithOid<%2%>(%3%, "
"std::array<label_id_t, %4%>{%5%}, %6%);\n";

/**
* @brief When building scanOp, we ignore the data type provided in the pb.
Expand Down Expand Up @@ -85,16 +95,67 @@ class ScanOpBuilder {
if (!query_params.has_predicate()) {
VLOG(10) << "No expr in params";
}
query_params_ = query_params;
CHECK(labels_ids_.empty()) << "label ids should be empty";
if (!try_to_get_label_id_from_query_params(query_params, labels_ids_)) {
LOG(FATAL) << "fail to label id from expr";
}

// the user provide oid can be a const or a param const
if (query_params.has_predicate()) {
auto& predicate = query_params.predicate();
VLOG(10) << "predicate: " << predicate.DebugString();
// We first scan the predicate to find whether there is conditions on
// labels.
std::vector<int32_t> expr_label_ids;
if (try_to_get_label_ids_from_expr(predicate, expr_label_ids)) {
// join expr_label_ids with table_lable_ids;
VLOG(10) << "Found label ids in expr: "
<< gs::to_string(expr_label_ids);
intersection(labels_ids_, expr_label_ids);
}

auto expr_builder = ExprBuilder(ctx_);
expr_builder.set_return_type(common::DataType::BOOLEAN);
expr_builder.AddAllExprOpr(query_params.predicate().operators());

std::string expr_code;
std::vector<codegen::ParamConst> func_call_param_const;
std::vector<std::pair<int32_t, std::string>> expr_tag_props;
std::vector<common::DataType> unused_expr_ret_type;
std::tie(expr_func_name_, func_call_param_const, expr_tag_props,
expr_code, unused_expr_ret_type) = expr_builder.Build();
VLOG(10) << "Found expr in scan: " << expr_func_name_;
// generate code.
ctx_.AddExprCode(expr_code);
expr_var_name_ = ctx_.GetNextExprVarName();
{
std::stringstream ss;
for (auto i = 0; i < func_call_param_const.size(); ++i) {
ss << func_call_param_const[i].var_name;
if (i != func_call_param_const.size() - 1) {
ss << ",";
}
}
expr_construct_params_ = ss.str();
}
{
std::stringstream ss;
if (expr_tag_props.size() > 0) {
ss << ",";
for (auto i = 0; i + 1 < expr_tag_props.size(); ++i) {
ss << expr_tag_props[i].second << ", ";
}
ss << expr_tag_props[expr_tag_props.size() - 1].second;
}
selectors_str_ = ss.str();
}
}
return *this;
}

ScanOpBuilder& idx_predicate(const algebra::IndexPredicate& predicate) {
// check query_params not has predicate.
if (query_params_.has_predicate()) {
VLOG(10) << "query params already has predicate";
return *this;
}

// Currently we only support one predicate.
if (predicate.or_predicates_size() < 1) {
VLOG(10) << "No predicate in index predicate";
Expand All @@ -104,151 +165,82 @@ class ScanOpBuilder {
throw std::runtime_error(
std::string("Currently only support one predicate"));
}
CHECK(expr_func_name_.empty()) << "Predicate is already given by expr";
auto or_predicate = predicate.or_predicates(0);
if (or_predicate.predicates_size() != 1) {
throw std::runtime_error(
std::string("Currently only support one and predicate"));
}
auto triplet = or_predicate.predicates(0);
// add index predicate to query params
auto* new_predicate = query_params_.mutable_predicate();
{
auto first_op = new_predicate->add_operators();
common::Variable variable;
auto& property = triplet.key();
*(variable.mutable_property()) = property;
variable.mutable_node_type()->set_data_type(common::DataType::INT64);
*(first_op->mutable_var()) = variable;
}
{
auto second = new_predicate->add_operators();
second->set_logical(common::Logical::EQ);
second->mutable_node_type()->set_data_type(common::DataType::BOOLEAN);
}
{
auto third = new_predicate->add_operators();
auto& value = triplet.value();
third->mutable_node_type()->set_data_type(common::DataType::INT64);
*(third->mutable_const_()) = value;
auto& property = triplet.key();
if (triplet.value_case() == algebra::IndexPredicate::Triplet::kConst) {
// FUTURE: check property is really the primary key.
auto const_value = triplet.const_();
switch (const_value.item_case()) {
case common::Value::kI32:
oid_ = std::to_string(const_value.i32());
break;
case common::Value::kI64:
oid_ = std::to_string(const_value.i64());
break;
default:
LOG(FATAL) << "Currently only support int, long as primary key";
}
VLOG(1) << "Found oid: " << oid_ << " in index scan";
} else {
// dynamic param
auto dyn_param_pb = triplet.param();
auto param_const = param_const_pb_to_param_const(dyn_param_pb);
VLOG(10) << "receive param const in index predicate: "
<< dyn_param_pb.DebugString();
ctx_.AddParameterVar(param_const);
}
VLOG(10) << "Add index predicate to query params: "
<< query_params_.DebugString();

return *this;
}

std::string Build() const {
std::string label_name;
std::vector<int32_t> labels_ids;
if (!try_to_get_label_name_from_query_params(query_params_, label_name)) {
LOG(WARNING) << "fail to label name from expr";
if (!try_to_get_label_id_from_query_params(query_params_, labels_ids)) {
LOG(FATAL) << "fail to label id from expr";
}
}

// the user provide oid can be a const or a param const
if (query_params_.has_predicate()) {
auto& predicate = query_params_.predicate();
VLOG(10) << "predicate: " << predicate.DebugString();
// We first scan the predicate to find whether there is conditions on
// labels.
std::vector<int32_t> expr_label_ids;
if (try_to_get_label_ids_from_expr(predicate, expr_label_ids)) {
// join expr_label_ids with table_lable_ids;
VLOG(10) << "Found label ids in expr: "
<< gs::to_string(expr_label_ids);
intersection(labels_ids, expr_label_ids);
}
}
// CHECK(labels_ids.size() == 1) << "only support one label in scan";

#ifdef FAST_SCAN
gs::codegen::oid_t oid;
gs::codegen::ParamConst oid_param;
if (try_to_get_oid_from_expr(predicate, oid)) {
VLOG(10) << "Parse oid: " << oid << "from expr";
return scan_with_oid(label_name, label_id, oid);
} else if (try_to_get_oid_param_from_expr(predicate, oid_param)) {
VLOG(10) << "Parse oid param: " << oid_param.var_name << "from expr";
return scan_with_oid(label_name, label_id, oid_param.var_name);
// 1. If common expression predicate presents, scan with expression
if (!expr_func_name_.empty()) {
VLOG(1) << "Scan with expression";
return scan_with_expr(labels_ids_, expr_var_name_, expr_func_name_,
expr_construct_params_, selectors_str_);
} else {
VLOG(10) << "Fail to parse oid from expr";
{
#endif
if (query_params_.has_predicate()) {
auto expr_builder = ExprBuilder(ctx_);
expr_builder.set_return_type(common::DataType::BOOLEAN);
expr_builder.AddAllExprOpr(query_params_.predicate().operators());

std::string expr_func_name, expr_code;
std::vector<codegen::ParamConst> func_call_param_const;
std::vector<std::pair<int32_t, std::string>> expr_tag_props;
std::vector<common::DataType> unused_expr_ret_type;
std::tie(expr_func_name, func_call_param_const, expr_tag_props,
expr_code, unused_expr_ret_type) = expr_builder.Build();
VLOG(10) << "Found expr in scan: " << expr_func_name;
// generate code.
ctx_.AddExprCode(expr_code);
std::string expr_var_name = ctx_.GetNextExprVarName();
std::string
expr_construct_params; // function construction params and
std::string selectors_str; // selectors str, concatenated
{
std::stringstream ss;
for (auto i = 0; i < func_call_param_const.size(); ++i) {
ss << func_call_param_const[i].var_name;
if (i != func_call_param_const.size() - 1) {
ss << ",";
}
}
expr_construct_params = ss.str();
}
{
std::stringstream ss;
if (expr_tag_props.size() > 0) {
ss << ",";
for (auto i = 0; i + 1 < expr_tag_props.size(); ++i) {
ss << expr_tag_props[i].second << ", ";
}
ss << expr_tag_props[expr_tag_props.size() - 1].second;
}
selectors_str = ss.str();
}

// use expression to filter.
return scan_with_expr(labels_ids, expr_var_name, expr_func_name,
expr_construct_params, selectors_str);
} else {
return scan_without_expr(labels_ids);
}

#ifdef FAST_SCAN
// If oid_ not empty, scan with oid
if (!oid_.empty()) {
VLOG(1) << "Scan with oid: " << oid_;
return scan_with_oid(labels_ids_, oid_);
} else {
// If no oid, scan without expression
VLOG(1) << "Scan without expression";
return scan_without_expr(labels_ids_);
}
}
#endif
}

private:
std::string scan_with_oid(const std::string& label_name,
const int32_t& label_id, codegen::oid_t oid) const {
VLOG(10) << "Scan with fixed oid" << oid;
std::string next_ctx_name = ctx_.GetCurCtxName();
auto append_opt = res_alias_to_append_opt(res_alias_);

boost::format formater(SCAN_OP_WITH_OID_TEMPLATE_STR);
formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_id % oid;
return formater.str();
}
std::string scan_with_oid(const std::string& label_name,
const int32_t& label_id,
std::string scan_with_oid(const std::vector<int32_t>& label_ids,
const std::string& oid) const {
VLOG(10) << "Scan with dynamic param oid";
VLOG(10) << "Scan with oid: " << oid;
std::string next_ctx_name = ctx_.GetCurCtxName();
auto append_opt = res_alias_to_append_opt(res_alias_);

boost::format formater(SCAN_OP_WITH_OID_TEMPLATE_STR);
formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_id % oid;
return formater.str();
if (label_ids.size() == 1) {
boost::format formater(SCAN_OP_WITH_OID_ONE_LABEL_TEMPLATE_STR);
formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_ids[0] %
oid;
return formater.str();
} else {
boost::format formater(SCAN_OP_WITH_OID_MUL_LABEL_TEMPLATE_STR);
std::stringstream ss;
for (auto i = 0; i + 1 < label_ids.size(); ++i) {
ss << std::to_string(label_ids[i]) << ", ";
}
ss << std::to_string(label_ids[label_ids.size() - 1]);
formater % next_ctx_name % append_opt % ctx_.GraphVar() %
label_ids.size() % ss.str() % oid;
return formater.str();
}
}

std::string scan_without_expr(const std::vector<int32_t>& label_ids) const {
Expand Down Expand Up @@ -303,9 +295,13 @@ class ScanOpBuilder {
ctx_.GraphVar() % label_ids_str;
return formater.str();
}

BuildingContext& ctx_;
physical::Scan::ScanOpt scan_opt_;
algebra::QueryParams query_params_;
std::vector<int32_t> labels_ids_;
std::string expr_var_name_, expr_func_name_, expr_construct_params_,
selectors_str_; // The expression decode from params.
std::string oid_; // the oid decode from idx predicate, or param name.
int res_alias_;
};

Expand All @@ -322,11 +318,13 @@ static std::string BuildScanOp(
} else {
builder.resAlias(-1);
}
return builder.queryParams(scan_pb.params())
.idx_predicate(scan_pb.idx_predicate())
.Build();
builder.queryParams(scan_pb.params());
if (scan_pb.has_idx_predicate()) {
builder.idx_predicate(scan_pb.idx_predicate());
}
return builder.Build();
}

} // namespace gs

#endif // CODEGEN_SRC_HQPS_HQPS_SCAN_BUILDER_H_
#endif // CODEGEN_SRC_HQPS_HQPS_SCAN_BUILDER_H_
34 changes: 33 additions & 1 deletion flex/engines/hqps_db/core/operator/scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,42 @@ class Scan {
const label_id_t& v_label_id,
int64_t oid) {
std::vector<vertex_id_t> gids;
gids.emplace_back(graph.ScanVerticesWithOid(v_label_id, oid));
vertex_id_t vid;
if (graph.ScanVerticesWithOid(v_label_id, oid, vid)) {
gids.emplace_back(vid);
}
return make_default_row_vertex_set(std::move(gids), v_label_id);
}

/// @brief Scan vertex with oid
/// @param graph
/// @param v_label_ids
/// @param oid
/// @return
template <size_t num_labels>
static auto ScanVertexWithOid(
const GRAPH_INTERFACE& graph,
const std::array<label_id_t, num_labels>& v_label_ids, int64_t oid) {
std::vector<vertex_id_t> gids;
std::vector<label_id_t> labels_vec;
std::vector<grape::Bitset> bitsets;
vertex_id_t vid;
for (auto i = 0; i < num_labels; ++i) {
if (graph.ScanVerticesWithOid(v_label_ids[i], oid, vid)) {
labels_vec.emplace_back(v_label_ids[i]);
gids.emplace_back(vid);
}
}
bitsets.resize(labels_vec.size());
for (auto i = 0; i < bitsets.size(); ++i) {
bitsets[i].init(gids.size());
bitsets[i].set_bit(i);
}

return make_general_set(std::move(gids), std::move(labels_vec),
std::move(bitsets));
}

private:
template <typename EXPR, typename... SELECTOR, size_t num_labels>
static GeneralVertexSet<vertex_id_t, label_id_t>
Expand Down
Loading

0 comments on commit f813f9b

Please sign in to comment.