Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(interactive): Add error handling for adhoc queries #4188

Merged
merged 14 commits into from
Sep 6, 2024
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{
"name": "GraphScope",
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:latest",
"image": "registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64",

// Features to add to the dev container. More info: https://containers.dev/features.
"features": {
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
runs-on: ubuntu-20.04
if: ${{ github.repository == 'alibaba/GraphScope' }}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.23.0
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64
steps:
- uses: actions/checkout@v4

Expand Down
33 changes: 30 additions & 3 deletions flex/bin/adhoc_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "flex/engines/graph_db/runtime/adhoc/runtime.h"

namespace bpo = boost::program_options;
namespace bl = boost::leaf;

std::string read_pb(const std::string& filename) {
std::ifstream file(filename, std::ios::binary);
Expand Down Expand Up @@ -74,6 +75,32 @@ void load_params(const std::string& filename,
}
}

gs::runtime::Context eval_plan(
const physical::PhysicalPlan& plan, gs::ReadTransaction& txn,
const std::map<std::string, std::string>& params) {
gs::runtime::Context ctx;
{
ctx = bl::try_handle_all(
[&plan, &txn, &params]() {
return gs::runtime::runtime_eval(plan, txn, params);
},
[&ctx](const gs::Status& err) {
LOG(FATAL) << "Error in execution: " << err.error_message();
return ctx;
},
[&](const bl::error_info& err) {
LOG(FATAL) << "boost leaf error: " << err.error().value() << ", "
<< err.exception()->what();
return ctx;
},
[&]() {
LOG(FATAL) << "Unknown error in execution";
return ctx;
});
}
return ctx;
}

int main(int argc, char** argv) {
bpo::options_description desc("Usage:");
desc.add_options()("help", "Display help message")(
Expand Down Expand Up @@ -158,7 +185,7 @@ int main(int argc, char** argv) {
double t1 = -grape::GetCurrentTime();
for (int i = 0; i < query_num; ++i) {
auto& m = map[i % params_num];
auto ctx = gs::runtime::runtime_eval(pb, txn, m);
auto ctx = eval_plan(pb, txn, m);
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
}
Expand All @@ -167,7 +194,7 @@ int main(int argc, char** argv) {
double t2 = -grape::GetCurrentTime();
for (int i = 0; i < query_num; ++i) {
auto& m = map[i % params_num];
auto ctx = gs::runtime::runtime_eval(pb, txn, m);
auto ctx = eval_plan(pb, txn, m);
outputs[i].clear();
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
Expand All @@ -177,7 +204,7 @@ int main(int argc, char** argv) {
double t3 = -grape::GetCurrentTime();
for (int i = 0; i < query_num; ++i) {
auto& m = map[i % params_num];
auto ctx = gs::runtime::runtime_eval(pb, txn, m);
auto ctx = eval_plan(pb, txn, m);
outputs[i].clear();
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
Expand Down
51 changes: 48 additions & 3 deletions flex/engines/graph_db/app/adhoc_app.cc
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
#include "flex/engines/graph_db/app/adhoc_app.h"
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "flex/engines/graph_db/app/adhoc_app.h"
#include "flex/engines/graph_db/runtime/adhoc/operators/operators.h"
#include "flex/engines/graph_db/runtime/adhoc/runtime.h"

#include "flex/proto_generated_gie/physical.pb.h"

#include <string>

namespace bl = boost::leaf;

namespace gs {

bool AdhocReadApp::Query(const GraphDBSession& graph, Decoder& input,
Expand All @@ -20,7 +37,35 @@ bool AdhocReadApp::Query(const GraphDBSession& graph, Decoder& input,

LOG(INFO) << "plan: " << plan.DebugString();

auto ctx = runtime::runtime_eval(plan, txn, {});
gs::runtime::Context ctx;
gs::Status status = gs::Status::OK();
{
ctx = bl::try_handle_all(
[&plan, &txn]() { return runtime::runtime_eval(plan, txn, {}); },
[&ctx, &status](const gs::Status& err) {
status = err;
return ctx;
},
[&](const bl::error_info& err) {
status = gs::Status(
gs::StatusCode::INTERNAL_ERROR,
"BOOST LEAF Error: " + std::to_string(err.error().value()) +
", Exception: " + err.exception()->what());
return ctx;
},
[&]() {
status = gs::Status(gs::StatusCode::UNKNOWN, "Unknown error");
return ctx;
});
}

if (!status.ok()) {
LOG(ERROR) << "Error: " << status.ToString();
// We encode the error message to the output, so that the client can
// get the error message.
output.put_string(status.ToString());
return false;
}

runtime::eval_sink(ctx, txn, output);

Expand Down
18 changes: 14 additions & 4 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,20 @@ Result<std::vector<char>> GraphDBSession::Eval(const std::string& input) {
std::chrono::duration_cast<std::chrono::microseconds>(end - start)
.count());
++query_num_;
return Result<std::vector<char>>(
StatusCode::QUERY_FAILED,
"Query failed for procedure id:" + std::to_string((int) type),
result_buffer);
// When query failed, we assume the user may put the error message in the
// output buffer.
// For example, for adhoc_app.cc, if the query failed, the error info will
// be put in the output buffer.
if (result_buffer.size() > 0) {
return Result<std::vector<char>>(
StatusCode::QUERY_FAILED,
std::string{result_buffer.data(), result_buffer.size()}, result_buffer);
} else {
return Result<std::vector<char>>(
StatusCode::QUERY_FAILED,
"Query failed for procedure id:" + std::to_string((int) type),
result_buffer);
}
}

void GraphDBSession::GetAppInfo(Encoder& result) { db_.GetAppInfo(result); }
Expand Down
1 change: 1 addition & 0 deletions flex/engines/graph_db/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ install_flex_target(runtime_common)
file(GLOB_RECURSE ADHOC_SOURCES "adhoc/*.cc")
add_library(runtime_adhoc SHARED ${ADHOC_SOURCES})
target_link_libraries(runtime_adhoc runtime_common)
target_link_libraries(runtime_adhoc Boost::headers)
install_flex_target(runtime_adhoc)


4 changes: 2 additions & 2 deletions flex/engines/graph_db/runtime/adhoc/operators/dedup.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace gs {

namespace runtime {

Context eval_dedup(const algebra::Dedup& opr, const ReadTransaction& txn,
Context&& ctx) {
bl::result<Context> eval_dedup(const algebra::Dedup& opr,
const ReadTransaction& txn, Context&& ctx) {
std::vector<size_t> keys;
std::vector<std::function<RTAny(size_t)>> vars;
int keys_num = opr.keys_size();
Expand Down
19 changes: 13 additions & 6 deletions flex/engines/graph_db/runtime/adhoc/operators/edge_expand.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ namespace gs {

namespace runtime {

Context eval_edge_expand(const physical::EdgeExpand& opr,
const ReadTransaction& txn, Context&& ctx,
const std::map<std::string, std::string>& params,
const physical::PhysicalOpr_MetaData& meta) {
bl::result<Context> eval_edge_expand(
const physical::EdgeExpand& opr, const ReadTransaction& txn, Context&& ctx,
const std::map<std::string, std::string>& params,
const physical::PhysicalOpr_MetaData& meta) {
int v_tag;
if (!opr.has_v_tag()) {
v_tag = -1;
Expand All @@ -49,7 +49,9 @@ Context eval_edge_expand(const physical::EdgeExpand& opr,
if (opr.expand_opt() ==
physical::EdgeExpand_ExpandOpt::EdgeExpand_ExpandOpt_VERTEX) {
if (query_params.has_predicate()) {
LOG(FATAL) << "not support";
LOG(ERROR) << "edge expand vertex with predicate is not supported";
RETURN_UNSUPPORTED_ERROR(
"edge expand vertex with predicate is not supported");
} else {
EdgeExpandParams eep;
eep.v_tag = v_tag;
Expand Down Expand Up @@ -82,7 +84,12 @@ Context eval_edge_expand(const physical::EdgeExpand& opr,
eep);
}
} else {
LOG(FATAL) << "not support";
LOG(ERROR) << "EdgeExpand with expand_opt: " << opr.expand_opt()
<< " is "
"not supported";
RETURN_UNSUPPORTED_ERROR(
"EdgeExpand with expand_opt is not supported: " +
std::to_string(static_cast<int>(opr.expand_opt())));
}
return ctx;
}
Expand Down
9 changes: 5 additions & 4 deletions flex/engines/graph_db/runtime/adhoc/operators/get_v.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ VOpt parse_opt(const physical::GetV_VOpt& opt) {
}
}

Context eval_get_v(const physical::GetV& opr, const ReadTransaction& txn,
Context&& ctx,
const std::map<std::string, std::string>& params) {
bl::result<Context> eval_get_v(
const physical::GetV& opr, const ReadTransaction& txn, Context&& ctx,
const std::map<std::string, std::string>& params) {
int tag = -1;
if (opr.has_tag()) {
tag = opr.tag().value();
Expand Down Expand Up @@ -76,7 +76,8 @@ Context eval_get_v(const physical::GetV& opr, const ReadTransaction& txn,
}
}

LOG(FATAL) << "not support";
LOG(ERROR) << "Unsupported GetV operation: " << opr.DebugString();
RETURN_UNSUPPORTED_ERROR("Unsupported GetV operation: " + opr.DebugString());
return ctx;
}

Expand Down
29 changes: 21 additions & 8 deletions flex/engines/graph_db/runtime/adhoc/operators/group_by.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "flex/engines/graph_db/runtime/adhoc/var.h"
#include "flex/engines/graph_db/runtime/common/columns/value_columns.h"
#include "flex/engines/graph_db/runtime/common/columns/vertex_columns.h"
#include "flex/engines/graph_db/runtime/common/leaf_utils.h"

namespace gs {

Expand Down Expand Up @@ -412,7 +413,7 @@ std::shared_ptr<IContextColumn> string_to_list(
return builder.finish();
}

std::shared_ptr<IContextColumn> apply_reduce(
bl::result<std::shared_ptr<IContextColumn>> apply_reduce(
const AggFunc& func, const std::vector<std::vector<size_t>>& to_aggregate) {
if (func.aggregate == AggrKind::kSum) {
if (func.vars.size() != 1) {
Expand All @@ -422,8 +423,12 @@ std::shared_ptr<IContextColumn> apply_reduce(
if (var.type() == RTAnyType::kI32Value) {
return numeric_sum<int>(var, to_aggregate);
} else {
LOG(FATAL) << "reduce on " << static_cast<int>(var.type().type_enum_)
LOG(ERROR) << "reduce on " << static_cast<int>(var.type().type_enum_)
<< " is not supported...";
RETURN_UNSUPPORTED_ERROR(
"reduce on " +
std::to_string(static_cast<int>(var.type().type_enum_)) +
" is not supported...");
}
} else if (func.aggregate == AggrKind::kToSet) {
if (func.vars.size() != 1) {
Expand All @@ -433,7 +438,12 @@ std::shared_ptr<IContextColumn> apply_reduce(
if (var.type() == RTAnyType::kStringValue) {
return string_to_set(var, to_aggregate);
} else {
LOG(FATAL) << "not support";
LOG(ERROR) << "reduce on " << static_cast<int>(var.type().type_enum_)
<< " is not supported...";
RETURN_UNSUPPORTED_ERROR(
"reduce on " +
std::to_string(static_cast<int>(var.type().type_enum_)) +
" is not supported...");
}
} else if (func.aggregate == AggrKind::kCountDistinct) {
if (func.vars.size() == 1 && func.vars[0].type() == RTAnyType::kVertex) {
Expand Down Expand Up @@ -513,12 +523,15 @@ std::shared_ptr<IContextColumn> apply_reduce(
}
}

LOG(FATAL) << "unsupport " << static_cast<int>(func.aggregate);
LOG(ERROR) << "Unsupported aggregate function "
<< static_cast<int>(func.aggregate);
RETURN_UNSUPPORTED_ERROR("Unsupported aggregate function " +
std::to_string(static_cast<int>(func.aggregate)));
return nullptr;
}

Context eval_group_by(const physical::GroupBy& opr, const ReadTransaction& txn,
Context&& ctx) {
bl::result<Context> eval_group_by(const physical::GroupBy& opr,
const ReadTransaction& txn, Context&& ctx) {
std::vector<AggFunc> functions;
std::vector<AggKey> mappings;
int func_num = opr.functions_size();
Expand All @@ -535,7 +548,7 @@ Context eval_group_by(const physical::GroupBy& opr, const ReadTransaction& txn,
for (size_t _i = 0; _i < ctx.row_num(); ++_i) {
tmp.emplace_back(_i);
}
auto new_col = apply_reduce(functions[i], {tmp});
BOOST_LEAF_AUTO(new_col, apply_reduce(functions[i], {tmp}));
ret.set(functions[i].alias, new_col);
ret.append_tag_id(functions[i].alias);
}
Expand Down Expand Up @@ -569,7 +582,7 @@ Context eval_group_by(const physical::GroupBy& opr, const ReadTransaction& txn,
}

for (int i = 0; i < func_num; ++i) {
auto new_col = apply_reduce(functions[i], to_aggregate);
BOOST_LEAF_AUTO(new_col, apply_reduce(functions[i], to_aggregate));
ret.set(functions[i].alias, new_col);
ret.append_tag_id(functions[i].alias);
}
Expand Down
6 changes: 3 additions & 3 deletions flex/engines/graph_db/runtime/adhoc/operators/intersect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
namespace gs {
namespace runtime {

Context eval_intersect(const ReadTransaction& txn,
const physical::Intersect& opr,
std::vector<Context>&& ctxs) {
bl::result<Context> eval_intersect(const ReadTransaction& txn,
const physical::Intersect& opr,
std::vector<Context>&& ctxs) {
int32_t key = opr.key();
if (ctxs.size() == 1) {
return std::move(ctxs[0]);
Expand Down
12 changes: 8 additions & 4 deletions flex/engines/graph_db/runtime/adhoc/operators/join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@

namespace gs {
namespace runtime {
Context eval_join(const physical::Join& opr, Context&& ctx, Context&& ctx2) {
bl::result<Context> eval_join(const physical::Join& opr, Context&& ctx,
Context&& ctx2) {
JoinParams p;
auto left_keys = opr.left_keys();
for (int i = 0; i < left_keys.size(); i++) {
if (!left_keys.Get(i).has_tag()) {
LOG(FATAL) << "left_keys should have tag";
LOG(ERROR) << "left_keys should have tag";
RETURN_BAD_REQUEST_ERROR("left_keys should have tag");
}
p.left_columns.push_back(left_keys.Get(i).tag().id());
}
auto right_keys = opr.right_keys();
for (int i = 0; i < right_keys.size(); i++) {
if (!right_keys.Get(i).has_tag()) {
LOG(FATAL) << "right_keys should have tag";
LOG(ERROR) << "right_keys should have tag";
RETURN_BAD_REQUEST_ERROR("right_keys should have tag");
}
p.right_columns.push_back(right_keys.Get(i).tag().id());
}
Expand All @@ -48,7 +51,8 @@ Context eval_join(const physical::Join& opr, Context&& ctx, Context&& ctx2) {
p.join_type = JoinKind::kLeftOuterJoin;
break;
default:
LOG(FATAL) << "unsupported join kind" << opr.join_kind();
RETURN_UNSUPPORTED_ERROR("Unsupported join kind: " +
std::to_string(static_cast<int>(opr.join_kind())));
}
return Join::join(std::move(ctx), std::move(ctx2), p);
}
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/graph_db/runtime/adhoc/operators/limit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
namespace gs {
namespace runtime {

Context eval_limit(const algebra::Limit& opr, Context&& ctx) {
bl::result<Context> eval_limit(const algebra::Limit& opr, Context&& ctx) {
int lower = 0;
int upper = ctx.row_num();

Expand Down
Loading
Loading