Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): Support Index predicate for dynamic params in IR-Core #3282

Merged
merged 8 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 134 additions & 136 deletions flex/codegen/src/hqps/hqps_scan_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,18 @@ static constexpr const char* SCAN_OP_TEMPLATE_NO_EXPR_STR =
/// 3. graph name
/// 4. vertex label
/// 5. oid
static constexpr const char* SCAN_OP_WITH_OID_TEMPLATE_STR =
"auto %1% = Engine::template ScanVertex<%2%>(%3%, %4%, %5%));\n";
static constexpr const char* SCAN_OP_WITH_OID_ONE_LABEL_TEMPLATE_STR =
"auto %1% = Engine::template ScanVertexWithOid<%2%>(%3%, %4%, %5%);\n";

/// Args
/// 1. res_ctx_name
/// 2. AppendOpt,
/// 3. graph name
/// 4. vertex label
/// 5. oid
static constexpr const char* SCAN_OP_WITH_OID_MUL_LABEL_TEMPLATE_STR =
"auto %1% = Engine::template ScanVertexWithOid<%2%>(%3%, "
"std::array<label_id_t, %4%>{%5%}, %6%);\n";

/**
* @brief When building scanOp, we ignore the data type provided in the pb.
Expand Down Expand Up @@ -85,16 +95,67 @@ class ScanOpBuilder {
if (!query_params.has_predicate()) {
VLOG(10) << "No expr in params";
}
query_params_ = query_params;
CHECK(labels_ids_.empty()) << "label ids should be empty";
if (!try_to_get_label_id_from_query_params(query_params, labels_ids_)) {
LOG(FATAL) << "fail to label id from expr";
}

// the user provide oid can be a const or a param const
if (query_params.has_predicate()) {
auto& predicate = query_params.predicate();
VLOG(10) << "predicate: " << predicate.DebugString();
// We first scan the predicate to find whether there is conditions on
// labels.
std::vector<int32_t> expr_label_ids;
if (try_to_get_label_ids_from_expr(predicate, expr_label_ids)) {
// join expr_label_ids with table_lable_ids;
VLOG(10) << "Found label ids in expr: "
<< gs::to_string(expr_label_ids);
intersection(labels_ids_, expr_label_ids);
}

auto expr_builder = ExprBuilder(ctx_);
expr_builder.set_return_type(common::DataType::BOOLEAN);
expr_builder.AddAllExprOpr(query_params.predicate().operators());

std::string expr_code;
std::vector<codegen::ParamConst> func_call_param_const;
std::vector<std::pair<int32_t, std::string>> expr_tag_props;
std::vector<common::DataType> unused_expr_ret_type;
std::tie(expr_func_name_, func_call_param_const, expr_tag_props,
expr_code, unused_expr_ret_type) = expr_builder.Build();
VLOG(10) << "Found expr in scan: " << expr_func_name_;
// generate code.
ctx_.AddExprCode(expr_code);
expr_var_name_ = ctx_.GetNextExprVarName();
{
std::stringstream ss;
for (auto i = 0; i < func_call_param_const.size(); ++i) {
ss << func_call_param_const[i].var_name;
if (i != func_call_param_const.size() - 1) {
ss << ",";
}
}
expr_construct_params_ = ss.str();
}
{
std::stringstream ss;
if (expr_tag_props.size() > 0) {
ss << ",";
for (auto i = 0; i + 1 < expr_tag_props.size(); ++i) {
ss << expr_tag_props[i].second << ", ";
}
ss << expr_tag_props[expr_tag_props.size() - 1].second;
}
selectors_str_ = ss.str();
}
}
return *this;
}

ScanOpBuilder& idx_predicate(const algebra::IndexPredicate& predicate) {
// check query_params not has predicate.
if (query_params_.has_predicate()) {
VLOG(10) << "query params already has predicate";
return *this;
}

// Currently we only support one predicate.
if (predicate.or_predicates_size() < 1) {
VLOG(10) << "No predicate in index predicate";
Expand All @@ -104,151 +165,82 @@ class ScanOpBuilder {
throw std::runtime_error(
std::string("Currently only support one predicate"));
}
CHECK(expr_func_name_.empty()) << "Predicate is already given by expr";
auto or_predicate = predicate.or_predicates(0);
if (or_predicate.predicates_size() != 1) {
throw std::runtime_error(
std::string("Currently only support one and predicate"));
}
auto triplet = or_predicate.predicates(0);
// add index predicate to query params
auto* new_predicate = query_params_.mutable_predicate();
{
auto first_op = new_predicate->add_operators();
common::Variable variable;
auto& property = triplet.key();
*(variable.mutable_property()) = property;
variable.mutable_node_type()->set_data_type(common::DataType::INT64);
*(first_op->mutable_var()) = variable;
}
{
auto second = new_predicate->add_operators();
second->set_logical(common::Logical::EQ);
second->mutable_node_type()->set_data_type(common::DataType::BOOLEAN);
}
{
auto third = new_predicate->add_operators();
auto& value = triplet.value();
third->mutable_node_type()->set_data_type(common::DataType::INT64);
*(third->mutable_const_()) = value;
auto& property = triplet.key();
if (triplet.value_case() == algebra::IndexPredicate::Triplet::kConst) {
// FUTURE: check property is really the primary key.
auto const_value = triplet.const_();
switch (const_value.item_case()) {
case common::Value::kI32:
oid_ = std::to_string(const_value.i32());
break;
case common::Value::kI64:
oid_ = std::to_string(const_value.i64());
break;
default:
LOG(FATAL) << "Currently only support int, long as primary key";
}
VLOG(1) << "Found oid: " << oid_ << " in index scan";
} else {
// dynamic param
auto dyn_param_pb = triplet.param();
auto param_const = param_const_pb_to_param_const(dyn_param_pb);
VLOG(10) << "receive param const in index predicate: "
<< dyn_param_pb.DebugString();
ctx_.AddParameterVar(param_const);
}
VLOG(10) << "Add index predicate to query params: "
<< query_params_.DebugString();

return *this;
}

std::string Build() const {
std::string label_name;
std::vector<int32_t> labels_ids;
if (!try_to_get_label_name_from_query_params(query_params_, label_name)) {
LOG(WARNING) << "fail to label name from expr";
if (!try_to_get_label_id_from_query_params(query_params_, labels_ids)) {
LOG(FATAL) << "fail to label id from expr";
}
}

// the user provide oid can be a const or a param const
if (query_params_.has_predicate()) {
auto& predicate = query_params_.predicate();
VLOG(10) << "predicate: " << predicate.DebugString();
// We first scan the predicate to find whether there is conditions on
// labels.
std::vector<int32_t> expr_label_ids;
if (try_to_get_label_ids_from_expr(predicate, expr_label_ids)) {
// join expr_label_ids with table_lable_ids;
VLOG(10) << "Found label ids in expr: "
<< gs::to_string(expr_label_ids);
intersection(labels_ids, expr_label_ids);
}
}
// CHECK(labels_ids.size() == 1) << "only support one label in scan";

#ifdef FAST_SCAN
gs::codegen::oid_t oid;
gs::codegen::ParamConst oid_param;
if (try_to_get_oid_from_expr(predicate, oid)) {
VLOG(10) << "Parse oid: " << oid << "from expr";
return scan_with_oid(label_name, label_id, oid);
} else if (try_to_get_oid_param_from_expr(predicate, oid_param)) {
VLOG(10) << "Parse oid param: " << oid_param.var_name << "from expr";
return scan_with_oid(label_name, label_id, oid_param.var_name);
// 1. If common expression predicate presents, scan with expression
if (!expr_func_name_.empty()) {
VLOG(1) << "Scan with expression";
return scan_with_expr(labels_ids_, expr_var_name_, expr_func_name_,
expr_construct_params_, selectors_str_);
} else {
VLOG(10) << "Fail to parse oid from expr";
{
#endif
if (query_params_.has_predicate()) {
auto expr_builder = ExprBuilder(ctx_);
expr_builder.set_return_type(common::DataType::BOOLEAN);
expr_builder.AddAllExprOpr(query_params_.predicate().operators());

std::string expr_func_name, expr_code;
std::vector<codegen::ParamConst> func_call_param_const;
std::vector<std::pair<int32_t, std::string>> expr_tag_props;
std::vector<common::DataType> unused_expr_ret_type;
std::tie(expr_func_name, func_call_param_const, expr_tag_props,
expr_code, unused_expr_ret_type) = expr_builder.Build();
VLOG(10) << "Found expr in scan: " << expr_func_name;
// generate code.
ctx_.AddExprCode(expr_code);
std::string expr_var_name = ctx_.GetNextExprVarName();
std::string
expr_construct_params; // function construction params and
std::string selectors_str; // selectors str, concatenated
{
std::stringstream ss;
for (auto i = 0; i < func_call_param_const.size(); ++i) {
ss << func_call_param_const[i].var_name;
if (i != func_call_param_const.size() - 1) {
ss << ",";
}
}
expr_construct_params = ss.str();
}
{
std::stringstream ss;
if (expr_tag_props.size() > 0) {
ss << ",";
for (auto i = 0; i + 1 < expr_tag_props.size(); ++i) {
ss << expr_tag_props[i].second << ", ";
}
ss << expr_tag_props[expr_tag_props.size() - 1].second;
}
selectors_str = ss.str();
}

// use expression to filter.
return scan_with_expr(labels_ids, expr_var_name, expr_func_name,
expr_construct_params, selectors_str);
} else {
return scan_without_expr(labels_ids);
}

#ifdef FAST_SCAN
// If oid_ not empty, scan with oid
if (!oid_.empty()) {
VLOG(1) << "Scan with oid: " << oid_;
return scan_with_oid(labels_ids_, oid_);
} else {
// If no oid, scan without expression
VLOG(1) << "Scan without expression";
return scan_without_expr(labels_ids_);
}
}
#endif
}

private:
std::string scan_with_oid(const std::string& label_name,
const int32_t& label_id, codegen::oid_t oid) const {
VLOG(10) << "Scan with fixed oid" << oid;
std::string next_ctx_name = ctx_.GetCurCtxName();
auto append_opt = res_alias_to_append_opt(res_alias_);

boost::format formater(SCAN_OP_WITH_OID_TEMPLATE_STR);
formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_id % oid;
return formater.str();
}
std::string scan_with_oid(const std::string& label_name,
const int32_t& label_id,
std::string scan_with_oid(const std::vector<int32_t>& label_ids,
const std::string& oid) const {
VLOG(10) << "Scan with dynamic param oid";
VLOG(10) << "Scan with oid: " << oid;
std::string next_ctx_name = ctx_.GetCurCtxName();
auto append_opt = res_alias_to_append_opt(res_alias_);

boost::format formater(SCAN_OP_WITH_OID_TEMPLATE_STR);
formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_id % oid;
return formater.str();
if (label_ids.size() == 1) {
boost::format formater(SCAN_OP_WITH_OID_ONE_LABEL_TEMPLATE_STR);
formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_ids[0] %
oid;
return formater.str();
} else {
boost::format formater(SCAN_OP_WITH_OID_MUL_LABEL_TEMPLATE_STR);
std::stringstream ss;
for (auto i = 0; i + 1 < label_ids.size(); ++i) {
ss << std::to_string(label_ids[i]) << ", ";
}
ss << std::to_string(label_ids[label_ids.size() - 1]);
formater % next_ctx_name % append_opt % ctx_.GraphVar() %
label_ids.size() % ss.str() % oid;
return formater.str();
}
}

std::string scan_without_expr(const std::vector<int32_t>& label_ids) const {
Expand Down Expand Up @@ -303,9 +295,13 @@ class ScanOpBuilder {
ctx_.GraphVar() % label_ids_str;
return formater.str();
}

BuildingContext& ctx_;
physical::Scan::ScanOpt scan_opt_;
algebra::QueryParams query_params_;
std::vector<int32_t> labels_ids_;
std::string expr_var_name_, expr_func_name_, expr_construct_params_,
selectors_str_; // The expression decode from params.
std::string oid_; // the oid decode from idx predicate, or param name.
int res_alias_;
};

Expand All @@ -322,11 +318,13 @@ static std::string BuildScanOp(
} else {
builder.resAlias(-1);
}
return builder.queryParams(scan_pb.params())
.idx_predicate(scan_pb.idx_predicate())
.Build();
builder.queryParams(scan_pb.params());
if (scan_pb.has_idx_predicate()) {
builder.idx_predicate(scan_pb.idx_predicate());
}
return builder.Build();
}

} // namespace gs

#endif // CODEGEN_SRC_HQPS_HQPS_SCAN_BUILDER_H_
#endif // CODEGEN_SRC_HQPS_HQPS_SCAN_BUILDER_H_
34 changes: 33 additions & 1 deletion flex/engines/hqps_db/core/operator/scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,42 @@ class Scan {
const label_id_t& v_label_id,
int64_t oid) {
std::vector<vertex_id_t> gids;
gids.emplace_back(graph.ScanVerticesWithOid(v_label_id, oid));
vertex_id_t vid;
if (graph.ScanVerticesWithOid(v_label_id, oid, vid)) {
gids.emplace_back(vid);
}
return make_default_row_vertex_set(std::move(gids), v_label_id);
}

/// @brief Scan vertex with oid
/// @param graph
/// @param v_label_ids
/// @param oid
/// @return
template <size_t num_labels>
static auto ScanVertexWithOid(
const GRAPH_INTERFACE& graph,
const std::array<label_id_t, num_labels>& v_label_ids, int64_t oid) {
std::vector<vertex_id_t> gids;
std::vector<label_id_t> labels_vec;
std::vector<grape::Bitset> bitsets;
vertex_id_t vid;
for (auto i = 0; i < num_labels; ++i) {
if (graph.ScanVerticesWithOid(v_label_ids[i], oid, vid)) {
labels_vec.emplace_back(v_label_ids[i]);
gids.emplace_back(vid);
}
}
bitsets.resize(labels_vec.size());
for (auto i = 0; i < bitsets.size(); ++i) {
bitsets[i].init(gids.size());
bitsets[i].set_bit(i);
}

return make_general_set(std::move(gids), std::move(labels_vec),
std::move(bitsets));
}

private:
template <typename EXPR, typename... SELECTOR, size_t num_labels>
static GeneralVertexSet<vertex_id_t, label_id_t>
Expand Down
Loading
Loading