diff --git a/flex/bin/CMakeLists.txt b/flex/bin/CMakeLists.txt index 4a007d692316..34f92b96bd2e 100644 --- a/flex/bin/CMakeLists.txt +++ b/flex/bin/CMakeLists.txt @@ -15,6 +15,11 @@ add_executable(rt_admin rt_admin.cc) target_link_libraries(rt_admin flex_utils flex_rt_mutable_graph flex_graph_db) install_without_export_flex_target(rt_admin) + +add_executable(adhoc_runner adhoc_runner.cc) +target_link_libraries(adhoc_runner flex_utils flex_graph_db) +install_without_export_flex_target(adhoc_runner) + add_executable(flex_analytical_engine flex_analytical_engine.cc) target_link_libraries(flex_analytical_engine flex_immutable_graph flex_bsp ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES}) install_without_export_flex_target(flex_analytical_engine) @@ -36,3 +41,7 @@ include_directories(${Boost_INCLUDE_DIRS}) add_executable(bulk_loader bulk_loader.cc) target_link_libraries(bulk_loader flex_rt_mutable_graph flex_utils ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES} ${Boost_LIBRARIES}) install_without_export_flex_target(bulk_loader) + +add_executable(stored_procedure_runner stored_procedure_runner.cc) +target_link_libraries(stored_procedure_runner flex_rt_mutable_graph flex_utils flex_graph_db ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES} ${Boost_LIBRARIES}) +install_without_export_flex_target(stored_procedure_runner) \ No newline at end of file diff --git a/flex/bin/adhoc_runner.cc b/flex/bin/adhoc_runner.cc new file mode 100644 index 000000000000..88f49b3f20fb --- /dev/null +++ b/flex/bin/adhoc_runner.cc @@ -0,0 +1,207 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "grape/util.h" + +#include +#include +#include +#include +#include "flex/engines/graph_db/database/graph_db.h" +#include "flex/engines/graph_db/runtime/adhoc/runtime.h" + +namespace bpo = boost::program_options; + +std::string read_pb(const std::string& filename) { + std::ifstream file(filename, std::ios::binary); + + if (!file.is_open()) { + LOG(FATAL) << "open pb file: " << filename << " failed..."; + return ""; + } + + file.seekg(0, std::ios::end); + size_t size = file.tellg(); + file.seekg(0, std::ios::beg); + + std::string buffer; + buffer.resize(size); + + file.read(&buffer[0], size); + + file.close(); + + return buffer; +} + +void load_params(const std::string& filename, + std::vector>& map) { + std::ifstream in(filename); + if (!in.is_open()) { + LOG(FATAL) << "open params file: " << filename << " failed..."; + return; + } + std::string line; + std::vector keys; + std::getline(in, line); + std::stringstream ss(line); + std::string key; + while (std::getline(ss, key, '|')) { + keys.push_back(key); + LOG(INFO) << key; + } + while (std::getline(in, line)) { + std::map m; + std::stringstream ss(line); + std::string value; + for (auto& key : keys) { + std::getline(ss, value, '|'); + m[key] = value; + } + map.push_back(m); + } +} + +int main(int argc, char** argv) { + bpo::options_description desc("Usage:"); + desc.add_options()("help", "Display help message")( + "version,v", "Display version")("shard-num,s", + bpo::value()->default_value(1), + "shard number of actor system")( + "data-path,d", bpo::value(), "data directory path")( + "graph-config,g", bpo::value(), "graph schema config file")( + "query-file,q", bpo::value(), "query file")( + "params_file,p", bpo::value(), "params file")( + "query-num,n", bpo::value()->default_value(0))( + "output-file,o", bpo::value(), "output file"); + + google::InitGoogleLogging(argv[0]); + FLAGS_logtostderr = true; + + bpo::variables_map vm; + bpo::store(bpo::command_line_parser(argc, argv).options(desc).run(), vm); + bpo::notify(vm); + + if (vm.count("help")) { + std::cout << desc << std::endl; + return 0; + } + if (vm.count("version")) { + std::cout << "GraphScope/Flex version " << FLEX_VERSION << std::endl; + return 0; + } + + uint32_t shard_num = vm["shard-num"].as(); + + std::string graph_schema_path = ""; + std::string data_path = ""; + std::string output_path = ""; + int query_num = vm["query-num"].as(); + + if (!vm.count("graph-config")) { + LOG(ERROR) << "graph-config is required"; + return -1; + } + graph_schema_path = vm["graph-config"].as(); + if (!vm.count("data-path")) { + LOG(ERROR) << "data-path is required"; + return -1; + } + data_path = vm["data-path"].as(); + if (vm.count("output-file")) { + output_path = vm["output-file"].as(); + } + + setenv("TZ", "Asia/Shanghai", 1); + tzset(); + + double t0 = -grape::GetCurrentTime(); + auto& db = gs::GraphDB::get(); + + auto schema = gs::Schema::LoadFromYaml(graph_schema_path); + if (!schema.ok()) { + LOG(ERROR) << "Failed to load graph schema from " << graph_schema_path; + return -1; + } + db.Open(schema.value(), data_path, shard_num); + + t0 += grape::GetCurrentTime(); + + LOG(INFO) << "Finished loading graph, elapsed " << t0 << " s"; + std::string req_file = vm["query-file"].as(); + std::string query = read_pb(req_file); + auto txn = db.GetReadTransaction(); + std::vector> map; + load_params(vm["params_file"].as(), map); + size_t params_num = map.size(); + + physical::PhysicalPlan pb; + pb.ParseFromString(query); + + if (query_num == 0) { + query_num = params_num; + } + std::vector> outputs(query_num); + + double t1 = -grape::GetCurrentTime(); + for (int i = 0; i < query_num; ++i) { + auto& m = map[i % params_num]; + auto ctx = gs::runtime::runtime_eval(pb, txn, m); + gs::Encoder output(outputs[i]); + gs::runtime::eval_sink(ctx, txn, output); + } + t1 += grape::GetCurrentTime(); + + double t2 = -grape::GetCurrentTime(); + for (int i = 0; i < query_num; ++i) { + auto& m = map[i % params_num]; + auto ctx = gs::runtime::runtime_eval(pb, txn, m); + outputs[i].clear(); + gs::Encoder output(outputs[i]); + gs::runtime::eval_sink(ctx, txn, output); + } + t2 += grape::GetCurrentTime(); + + double t3 = -grape::GetCurrentTime(); + for (int i = 0; i < query_num; ++i) { + auto& m = map[i % params_num]; + auto ctx = gs::runtime::runtime_eval(pb, txn, m); + outputs[i].clear(); + gs::Encoder output(outputs[i]); + gs::runtime::eval_sink(ctx, txn, output); + } + t3 += grape::GetCurrentTime(); + + LOG(INFO) << "Finished run " << query_num << " queries, elapsed " << t1 + << " s, avg " << t1 / static_cast(query_num) * 1000000 + << " us"; + LOG(INFO) << "Finished run " << query_num << " queries, elapsed " << t2 + << " s, avg " << t2 / static_cast(query_num) * 1000000 + << " us"; + LOG(INFO) << "Finished run " << query_num << " queries, elapsed " << t3 + << " s, avg " << t3 / static_cast(query_num) * 1000000 + << " us"; + + if (!output_path.empty()) { + FILE* fout = fopen(output_path.c_str(), "a"); + for (auto& output : outputs) { + fwrite(output.data(), sizeof(char), output.size(), fout); + } + fflush(fout); + fclose(fout); + } + + return 0; +} diff --git a/flex/bin/stored_procedure_runner.cc b/flex/bin/stored_procedure_runner.cc new file mode 100644 index 000000000000..a3315ac05f17 --- /dev/null +++ b/flex/bin/stored_procedure_runner.cc @@ -0,0 +1,212 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "grape/util.h" + +#include "flex/engines/graph_db/database/graph_db.h" +#include "flex/engines/http_server/options.h" + +#include +#include + +#include + +using namespace server; +namespace bpo = boost::program_options; + +std::vector> parse_query_file(const std::string& fname) { + std::vector> ret; + std::ifstream fin(fname); + std::string line; + + std::vector types; + + while (std::getline(fin, line)) { + std::vector tokens; + std::stringstream ss(line); + std::string token; + while (std::getline(ss, token, '|')) { + tokens.push_back(token); + } + if (types.empty()) { + types = tokens; + } else { + CHECK_EQ(tokens.size(), types.size()); + std::vector buf; + gs::Encoder encoder(buf); + size_t n = tokens.size(); + for (size_t k = 0; k < n; ++k) { + if (types[k] == "INT") { + int val = std::stoi(tokens[k]); + encoder.put_int(val); + } else if (types[k] == "LONG") { + int64_t val = std::stoll(tokens[k]); + encoder.put_long(val); + } else if (types[k] == "STRING") { + encoder.put_string(tokens[k]); + } else { + LOG(FATAL) << "unrecognize type: " << types[k]; + } + } + ret.emplace_back(std::move(buf)); + } + } + return ret; +} + +int main(int argc, char** argv) { + bpo::options_description desc("Usage:"); + desc.add_options()("help", "Display help message")("version,v", + "Display version")( + "graph-config,g", bpo::value(), "graph schema config file")( + "data-path,d", bpo::value(), "data directory path")( + "memory-level,m", bpo::value()->default_value(1))( + "stored-procedure-lib,l", bpo::value(), + "stored procedure library path")( + "query-file,q", bpo::value(), "query parameters file")( + "query-num,n", bpo::value()->default_value(0))( + "output-file,o", bpo::value(), "output file"); + google::InitGoogleLogging(argv[0]); + FLAGS_logtostderr = true; + + bpo::variables_map vm; + bpo::store(bpo::command_line_parser(argc, argv).options(desc).run(), vm); + bpo::notify(vm); + + if (vm.count("help")) { + std::cout << desc << std::endl; + return 0; + } + if (vm.count("version")) { + std::cout << "GraphScope/Flex version " << FLEX_VERSION << std::endl; + return 0; + } + + int memory_level = vm["memory-level"].as(); + + std::string graph_schema_path = ""; + std::string data_path = ""; + std::string plugin_path = ""; + std::string query_file_path = ""; + std::string output_path = ""; + int query_num = vm["query-num"].as(); + + if (!vm.count("graph-config")) { + LOG(ERROR) << "graph-config is required"; + return -1; + } + graph_schema_path = vm["graph-config"].as(); + if (!vm.count("data-path")) { + LOG(ERROR) << "data-path is required"; + return -1; + } + data_path = vm["data-path"].as(); + if (!vm.count("stored-procedure-lib")) { + LOG(ERROR) << "stored-procedure-lib is required"; + return -1; + } + plugin_path = vm["stored-procedure-lib"].as(); + if (!vm.count("query-file")) { + LOG(ERROR) << "query-file is required"; + return -1; + } + query_file_path = vm["query-file"].as(); + if (vm.count("output-file")) { + output_path = vm["output-file"].as(); + } + + setenv("TZ", "Asia/Shanghai", 1); + tzset(); + + double t0 = -grape::GetCurrentTime(); + auto& db = gs::GraphDB::get(); + + auto schema = gs::Schema::LoadFromYaml(graph_schema_path); + if (!schema.ok()) { + LOG(FATAL) << "Failed to load schema: " << schema.status().error_message(); + } + gs::GraphDBConfig config(schema.value(), data_path, 1); + config.memory_level = memory_level; + if (config.memory_level >= 2) { + config.enable_auto_compaction = true; + } + db.Open(config); + + t0 += grape::GetCurrentTime(); + + LOG(INFO) << "Finished loading graph, elapsed " << t0 << " s"; + + auto app_factory = std::make_shared(plugin_path); + auto app_wrapper = app_factory->CreateApp(db); + auto app = app_wrapper.app(); + + auto parameters = parse_query_file(query_file_path); + + auto& session = db.GetSession(0); + if (query_num == 0) { + query_num = parameters.size(); + } + std::vector> outputs(query_num); + + double t1 = -grape::GetCurrentTime(); + for (int i = 0; i < query_num; ++i) { + auto& parameter = parameters[i % parameters.size()]; + gs::Decoder input(parameter.data(), parameter.size()); + gs::Encoder output(outputs[i]); + app->run(session, input, output); + } + t1 += grape::GetCurrentTime(); + + double t2 = -grape::GetCurrentTime(); + for (int i = 0; i < query_num; ++i) { + auto& parameter = parameters[i % parameters.size()]; + gs::Decoder input(parameter.data(), parameter.size()); + outputs[i].clear(); + gs::Encoder output(outputs[i]); + app->run(session, input, output); + } + t2 += grape::GetCurrentTime(); + + double t3 = -grape::GetCurrentTime(); + for (int i = 0; i < query_num; ++i) { + auto& parameter = parameters[i % parameters.size()]; + gs::Decoder input(parameter.data(), parameter.size()); + outputs[i].clear(); + gs::Encoder output(outputs[i]); + app->run(session, input, output); + } + t3 += grape::GetCurrentTime(); + + LOG(INFO) << "Finished run " << query_num << " queries, elapsed " << t1 + << " s, avg " << t1 / static_cast(query_num) * 1000000 + << " us"; + LOG(INFO) << "Finished run " << query_num << " queries, elapsed " << t2 + << " s, avg " << t2 / static_cast(query_num) * 1000000 + << " us"; + LOG(INFO) << "Finished run " << query_num << " queries, elapsed " << t3 + << " s, avg " << t3 / static_cast(query_num) * 1000000 + << " us"; + + if (!output_path.empty()) { + FILE* fout = fopen(output_path.c_str(), "a"); + for (auto& output : outputs) { + fwrite(output.data(), sizeof(char), output.size(), fout); + } + fflush(fout); + fclose(fout); + } + + return 0; +} \ No newline at end of file diff --git a/flex/engines/graph_db/runtime/adhoc/expr_impl.cc b/flex/engines/graph_db/runtime/adhoc/expr_impl.cc index 47bef9a153e0..7df4cdbeed15 100644 --- a/flex/engines/graph_db/runtime/adhoc/expr_impl.cc +++ b/flex/engines/graph_db/runtime/adhoc/expr_impl.cc @@ -450,6 +450,9 @@ static RTAny parse_param(const common::DynamicParam& param, } else if (dt == common::DataType::INT32) { int val = std::stoi(input.at(name)); return RTAny::from_int32(val); + } else if (dt == common::DataType::INT64) { + int64_t val = std::stoll(input.at(name)); + return RTAny::from_int64(val); } LOG(FATAL) << "not support type: " << common::DataType_Name(dt); diff --git a/flex/engines/graph_db/runtime/adhoc/operators/edge_expand.cc b/flex/engines/graph_db/runtime/adhoc/operators/edge_expand.cc index 1f671b826c7c..d8d82d5e7302 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/edge_expand.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/edge_expand.cc @@ -27,9 +27,6 @@ Context eval_edge_expand(const physical::EdgeExpand& opr, const ReadTransaction& txn, Context&& ctx, const std::map& params, const physical::PhysicalOpr_MetaData& meta) { - if (ctx.row_num() == 0) { - return ctx; - } int v_tag; if (!opr.has_v_tag()) { v_tag = -1; diff --git a/flex/engines/graph_db/runtime/adhoc/operators/get_v.cc b/flex/engines/graph_db/runtime/adhoc/operators/get_v.cc index c964099e5101..aec1fd39d6ff 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/get_v.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/get_v.cc @@ -42,9 +42,6 @@ VOpt parse_opt(const physical::GetV_VOpt& opt) { Context eval_get_v(const physical::GetV& opr, const ReadTransaction& txn, Context&& ctx, const std::map& params) { - if (ctx.row_num() == 0) { - return ctx; - } int tag = -1; if (opr.has_tag()) { tag = opr.tag().value(); diff --git a/flex/engines/graph_db/runtime/adhoc/operators/group_by.cc b/flex/engines/graph_db/runtime/adhoc/operators/group_by.cc index aedf96058238..aa12480e3e92 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/group_by.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/group_by.cc @@ -493,9 +493,6 @@ std::shared_ptr apply_reduce( Context eval_group_by(const physical::GroupBy& opr, const ReadTransaction& txn, Context&& ctx) { - if (ctx.row_num() == 0) { - return ctx; - } std::vector functions; std::vector mappings; int func_num = opr.functions_size(); diff --git a/flex/engines/graph_db/runtime/adhoc/operators/limit.cc b/flex/engines/graph_db/runtime/adhoc/operators/limit.cc index 1b7fc7300e5a..0fe37a290229 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/limit.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/limit.cc @@ -28,9 +28,6 @@ Context eval_limit(const algebra::Limit& opr, Context&& ctx) { upper = std::min(upper, static_cast(opr.range().upper())); } - if (lower >= upper) { - return Context(); - } if (lower == 0 && static_cast(upper) == ctx.row_num()) { return std::move(ctx); } diff --git a/flex/engines/graph_db/runtime/adhoc/operators/order_by.cc b/flex/engines/graph_db/runtime/adhoc/operators/order_by.cc index 424707c6054c..88e042ec55b9 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/order_by.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/order_by.cc @@ -59,9 +59,6 @@ class GeneralComparer { Context eval_order_by(const algebra::OrderBy& opr, const ReadTransaction& txn, Context&& ctx) { - if (ctx.row_num() == 0) { - return ctx; - } int lower = 0; int upper = std::numeric_limits::max(); if (opr.has_limit()) { diff --git a/flex/engines/graph_db/runtime/adhoc/operators/path_expand.cc b/flex/engines/graph_db/runtime/adhoc/operators/path_expand.cc index 4341e556b4c9..f374c0261f1a 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/path_expand.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/path_expand.cc @@ -26,9 +26,6 @@ Context eval_path_expand_v(const physical::PathExpand& opr, const std::map& params, const physical::PhysicalOpr_MetaData& meta, int alias) { - if (ctx.row_num() == 0) { - return ctx; - } CHECK(opr.has_start_tag()); int start_tag = opr.start_tag().value(); CHECK(opr.path_opt() == diff --git a/flex/engines/graph_db/runtime/adhoc/operators/project.cc b/flex/engines/graph_db/runtime/adhoc/operators/project.cc index a56d1c52d217..371faa2531fe 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/project.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/project.cc @@ -46,9 +46,6 @@ Context eval_project(const physical::Project& opr, const ReadTransaction& txn, Context&& ctx, const std::map& params, const std::vector& data_types) { - if (ctx.row_num() == 0) { - return ctx; - } bool is_append = opr.is_append(); Context ret; if (is_append) { diff --git a/flex/engines/graph_db/runtime/adhoc/operators/sink.cc b/flex/engines/graph_db/runtime/adhoc/operators/sink.cc index ead64bbd39dd..64d97c57cd00 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/sink.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/sink.cc @@ -36,7 +36,7 @@ void eval_sink(const Context& ctx, const ReadTransaction& txn, val.sink(txn, j, column); } } - LOG(INFO) << "sink: " << results.DebugString(); + // LOG(INFO) << "sink: " << results.DebugString(); auto res = results.SerializeAsString(); output.put_bytes(res.data(), res.size()); } diff --git a/flex/engines/graph_db/runtime/adhoc/utils.cc b/flex/engines/graph_db/runtime/adhoc/utils.cc index 14799f55d556..0db740607b16 100644 --- a/flex/engines/graph_db/runtime/adhoc/utils.cc +++ b/flex/engines/graph_db/runtime/adhoc/utils.cc @@ -215,9 +215,24 @@ std::shared_ptr build_optional_column( return builder.finish(); } break; + case common::DataType::TIMESTAMP: { + OptionalValueColumnBuilder builder; + builder.reserve(row_num); + for (size_t i = 0; i < row_num; ++i) { + auto v = expr.eval_path(i, 0); + if (v.is_null()) { + builder.push_back_null(); + } else { + builder.push_back_opt(v.as_date32(), true); + } + } + + return builder.finish(); + } break; default: { - LOG(FATAL) << "not support"; + LOG(FATAL) << "not support" + << common::DataType_Name(data_type.data_type()); break; } } diff --git a/flex/engines/graph_db/runtime/adhoc/var.cc b/flex/engines/graph_db/runtime/adhoc/var.cc index f2fa916109a2..bb38d3fab61e 100644 --- a/flex/engines/graph_db/runtime/adhoc/var.cc +++ b/flex/engines/graph_db/runtime/adhoc/var.cc @@ -48,6 +48,7 @@ Var::Var(const ReadTransaction& txn, const Context& ctx, } if (pb.has_tag() || var_type == VarType::kPathVar) { + CHECK(ctx.get(tag) != nullptr) << "tag not found - " << tag; if (ctx.get(tag)->column_type() == ContextColumnType::kVertex) { if (pb.has_property()) { auto& pt = pb.property(); diff --git a/flex/engines/graph_db/runtime/common/context.cc b/flex/engines/graph_db/runtime/common/context.cc index 65cb7050633a..5a01d79078c3 100644 --- a/flex/engines/graph_db/runtime/common/context.cc +++ b/flex/engines/graph_db/runtime/common/context.cc @@ -148,20 +148,20 @@ void Context::reshuffle(const std::vector& offsets) { } std::shared_ptr Context::get(int alias) { - assert(static_cast(alias) < columns.size()); if (alias == -1) { return head; - } else { - return columns[alias]; } + assert(static_cast(alias) < columns.size()); + return columns[alias]; } const std::shared_ptr Context::get(int alias) const { - assert(static_cast(alias) < columns.size()); if (alias == -1) { assert(head != nullptr); return head; - } else { + } + assert(static_cast(alias) < columns.size()); + { assert(columns[alias] != nullptr); return columns[alias]; } diff --git a/flex/engines/graph_db/runtime/common/operators/get_v.h b/flex/engines/graph_db/runtime/common/operators/get_v.h index 499616b57048..6d9c2f05c6e1 100644 --- a/flex/engines/graph_db/runtime/common/operators/get_v.h +++ b/flex/engines/graph_db/runtime/common/operators/get_v.h @@ -142,6 +142,8 @@ class GetV { auto labels = extract_labels(input_edge_list.get_labels(), params.tables, opt); if (labels.size() == 0) { + MLVertexColumnBuilder builder; + ctx.set_with_reshuffle(params.alias, builder.finish(), {}); return ctx; } if (labels.size() > 1) { diff --git a/flex/engines/graph_db/runtime/common/operators/join.cc b/flex/engines/graph_db/runtime/common/operators/join.cc index 0441c5b51772..04e9dcd6cc41 100644 --- a/flex/engines/graph_db/runtime/common/operators/join.cc +++ b/flex/engines/graph_db/runtime/common/operators/join.cc @@ -111,7 +111,7 @@ Context Join::join(Context&& ctx, Context&& ctx2, const JoinParams& params) { } else if (params.join_type == JoinKind::kLeftOuterJoin) { size_t right_size = ctx2.row_num(); auto right_col = ctx2.get(params.right_columns[0]); - CHECK(right_col->column_type() == ContextColumnType::kVertex); + // CHECK(right_col->column_type() == ContextColumnType::kVertex); std::map> right_map; for (size_t r_i = 0; r_i < right_size; r_i++) { diff --git a/flex/engines/graph_db/runtime/common/rt_any.cc b/flex/engines/graph_db/runtime/common/rt_any.cc index 7d54e1057792..bf26fc7bec74 100644 --- a/flex/engines/graph_db/runtime/common/rt_any.cc +++ b/flex/engines/graph_db/runtime/common/rt_any.cc @@ -488,37 +488,39 @@ bool RTAny::operator==(const RTAny& other) const { } RTAny RTAny::operator+(const RTAny& other) const { - // CHECK(type_ == other.type_); - if (type_ == RTAnyType::kI64Value) { - return RTAny::from_int64(value_.i64_val + other.value_.i64_val); - } - if (type_ == RTAnyType::kI64Value && other.type_ == RTAnyType::kI32Value) { return RTAny::from_int64(value_.i64_val + other.value_.i32_val); } else if (type_ == RTAnyType::kI32Value && other.type_ == RTAnyType::kI64Value) { return RTAny::from_int64(value_.i32_val * 1l + other.value_.i64_val); - } else if (type_ == RTAnyType::kF64Value) { + } + if (type_ == RTAnyType::kF64Value) { return RTAny::from_double(value_.f64_val + other.value_.f64_val); + } else if (type_ == RTAnyType::kI64Value) { + return RTAny::from_int64(value_.i64_val + other.value_.i64_val); + } else if (type_ == RTAnyType::kI32Value) { + return RTAny::from_int32(value_.i32_val + other.value_.i32_val); } - LOG(FATAL) << "not support"; + LOG(FATAL) << "not support" << static_cast(type_.type_enum_); return RTAny(); } RTAny RTAny::operator-(const RTAny& other) const { // CHECK(type_ == other.type_); - if (type_ == RTAnyType::kI64Value) { - return RTAny::from_int64(value_.i64_val - other.value_.i64_val); - } if (type_ == RTAnyType::kI64Value && other.type_ == RTAnyType::kI32Value) { return RTAny::from_int64(value_.i64_val - other.value_.i32_val); } else if (type_ == RTAnyType::kI32Value && other.type_ == RTAnyType::kI64Value) { return RTAny::from_int64(value_.i32_val * 1l - other.value_.i64_val); - } else if (type_ == RTAnyType::kF64Value) { + } + if (type_ == RTAnyType::kF64Value) { return RTAny::from_double(value_.f64_val - other.value_.f64_val); + } else if (type_ == RTAnyType::kI64Value) { + return RTAny::from_int64(value_.i64_val - other.value_.i64_val); + } else if (type_ == RTAnyType::kI32Value) { + return RTAny::from_int32(value_.i32_val - other.value_.i32_val); } LOG(FATAL) << "not support"; return RTAny(); @@ -526,9 +528,6 @@ RTAny RTAny::operator-(const RTAny& other) const { RTAny RTAny::operator/(const RTAny& other) const { // CHECK(type_ == other.type_); - if (type_ == RTAnyType::kI64Value) { - return RTAny::from_int64(value_.i64_val / other.value_.i64_val); - } if (type_ == RTAnyType::kI64Value && other.type_ == RTAnyType::kI32Value) { return RTAny::from_int64(value_.i64_val / other.value_.i32_val); @@ -536,6 +535,14 @@ RTAny RTAny::operator/(const RTAny& other) const { other.type_ == RTAnyType::kI64Value) { return RTAny::from_int64(value_.i32_val * 1l / other.value_.i64_val); } + + if (type_ == RTAnyType::kI64Value) { + return RTAny::from_int64(value_.i64_val / other.value_.i64_val); + } else if (type_ == RTAnyType::kF64Value) { + return RTAny::from_double(value_.f64_val / other.value_.f64_val); + } else if (type_ == RTAnyType::kI32Value) { + return RTAny::from_int32(value_.i32_val / other.value_.i32_val); + } LOG(FATAL) << "not support"; return RTAny(); } @@ -557,6 +564,12 @@ void RTAny::sink_impl(common::Value* value) const { value->set_f64(value_.f64_val); } else if (type_ == RTAnyType::kList) { LOG(FATAL) << "not support list sink"; + } else if (type_ == RTAnyType::kTuple) { + auto tup = value_.t; + for (size_t i = 0; i < value_.t.size(); ++i) { + std::string s = tup.get(i).to_string(); + value->mutable_str_array()->add_item(s.data(), s.size()); + } } else { LOG(FATAL) << "not implemented for " << static_cast(type_.type_enum_); }