fix(interactive): Fix the problem of actor scope cancellation and creation (#4089)

Resolves the issue of actor scope cancellation and creation.

Fixes #4090.

When the query service is not available, the server now returns 503 (Service Unavailable) with the message `Unable to send message, the target actor has been canceled!`; other query failures still return 500.
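The handlers implement this by inspecting the exception message inside their query continuations (see the graph_db_http_handler.cc hunks below). As a rough standalone sketch of just that mapping — the helper name and the plain integer status codes here are illustrative, not part of the patch:

```cpp
#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>

// Substring the handlers look for to detect a cancelled actor scope
// (mirrors StoppableHandler::ACTOR_SCOPE_CANCEL_MESSAGE in the diff below).
constexpr const char* kActorScopeCancelMessage = "Unable to send message";

// Illustrative helper: map a query-execution exception to an HTTP status.
// 503 when the actor scope has been cancelled (service stopped or
// restarting), 500 for any other failure.
int status_for_exception(const std::exception& e) {
  if (std::string(e.what()).find(kActorScopeCancelMessage) !=
      std::string::npos) {
    return 503;
  }
  return 500;
}

int main() {
  try {
    throw std::runtime_error(
        "Unable to send message, the target actor has been canceled!");
  } catch (const std::exception& e) {
    std::cout << status_for_exception(e) << "\n";  // prints 503
  }
  return 0;
}
```

The real code sets seastar::httpd::reply::status_type::service_unavailable or ::internal_server_error on the in-flight reply object instead of returning an integer.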
zhanglei1949 authored Jul 29, 2024
1 parent 7e682ab commit 95bf6c7
Showing 7 changed files with 174 additions and 55 deletions.
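Before the per-file hunks, a note on the structure of the fix: each handler now keeps its actor references in a function-local static registry indexed by shard id (get_executors() / get_codegen_actors()), so stop() can clear exactly one shard's references and start() can rebuild them later. A simplified, hiactor-free sketch of that registry pattern — the types and function bodies below are illustrative stand-ins, not the actual handler code:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for an actor reference; the real code builds hiactor refs.
struct FakeExecutorRef {
  unsigned slot;
};

// Function-local static registry: one entry per shard, each holding the
// actor refs for that shard's handler (mirrors get_executors() below).
std::vector<std::vector<FakeExecutorRef>>& get_executors() {
  static std::vector<std::vector<FakeExecutorRef>> refs;
  return refs;
}

// Called once at construction time with the total shard count.
void init_registry(std::size_t shard_num) { get_executors().resize(shard_num); }

// start(): only (re)create refs if this shard's slot is empty.
bool start_shard(unsigned shard_id, unsigned concurrency) {
  auto& mine = get_executors()[shard_id];
  if (!mine.empty()) return false;  // actors already created
  for (unsigned i = 0; i < concurrency; ++i) mine.push_back({i});
  return true;
}

// stop(): clear only this shard's refs so its scope can be cancelled safely.
void stop_shard(unsigned shard_id) { get_executors()[shard_id].clear(); }

int main() {
  init_registry(/*shard_num=*/2);
  assert(start_shard(0, 4));
  assert(!start_shard(0, 4));  // a second start on the same shard is rejected
  stop_shard(0);
  assert(start_shard(0, 4));   // after stop, the shard can be started again
  return 0;
}
```

In the actual patch the inner vectors hold hiactor executor_ref / codegen_actor_ref objects, the outer vector is resized to the shard count in the graph_db_http_handler constructor, and stop/start are driven through seastar futures.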
3 changes: 3 additions & 0 deletions .github/workflows/interactive.yml
@@ -160,9 +160,12 @@ jobs:
GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/indices/
cd ${GITHUB_WORKSPACE}/flex/tests/hqps
sed -i 's/interactive_workspace/temp_workspace/g' ./engine_config_test.yaml
# set thread_num_per_worker to 4
sed -i 's/thread_num_per_worker: 1/thread_num_per_worker: 4/g' ./engine_config_test.yaml
bash hqps_sdk_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./engine_config_test.yaml java
bash hqps_sdk_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./engine_config_test.yaml python
sed -i 's/temp_workspace/interactive_workspace/g' ./engine_config_test.yaml
sed -i 's/thread_num_per_worker: 4/thread_num_per_worker: 1/g' ./engine_config_test.yaml
- name: Sample Query test
env:
160 changes: 114 additions & 46 deletions flex/engines/http_server/handler/graph_db_http_handler.cc
@@ -137,19 +137,29 @@ bool is_running_graph(const seastar::sstring& graph_id) {
////////////////////////////stored_proc_handler////////////////////////////
class stored_proc_handler : public StoppableHandler {
public:
static std::vector<std::vector<executor_ref>>& get_executors() {
static std::vector<std::vector<executor_ref>> executor_refs;
return executor_refs;
}

stored_proc_handler(uint32_t init_group_id, uint32_t max_group_id,
uint32_t group_inc_step, uint32_t shard_concurrency)
: StoppableHandler(init_group_id, max_group_id, group_inc_step,
shard_concurrency),
dispatcher_(shard_concurrency) {
executor_refs_.reserve(shard_concurrency);
auto& executors = get_executors();
CHECK(executors.size() >= StoppableHandler::shard_id());
executors[StoppableHandler::shard_id()].reserve(shard_concurrency);
hiactor::scope_builder builder;
builder.set_shard(hiactor::local_shard_id())
LOG(INFO) << "Creating stored proc handler on shard id: "
<< StoppableHandler::shard_id();
builder.set_shard(StoppableHandler::shard_id())
.enter_sub_scope(hiactor::scope<executor_group>(0))
.enter_sub_scope(hiactor::scope<hiactor::actor_group>(
StoppableHandler::cur_group_id_));
for (unsigned i = 0; i < StoppableHandler::shard_concurrency_; ++i) {
executor_refs_.emplace_back(builder.build_ref<executor_ref>(i));
executors[StoppableHandler::shard_id()].emplace_back(
builder.build_ref<executor_ref>(i));
}
#ifdef HAVE_OPENTELEMETRY_CPP
total_counter_ = otel::create_int_counter("hqps_procedure_query_total");
@@ -160,18 +170,20 @@ class stored_proc_handler : public StoppableHandler {
~stored_proc_handler() override = default;

seastar::future<> stop() override {
return StoppableHandler::cancel_scope([this] { executor_refs_.clear(); });
return StoppableHandler::cancel_scope(
[this] { get_executors()[StoppableHandler::shard_id()].clear(); });
}

bool start() override {
if (executor_refs_.size() > 0) {
if (get_executors()[StoppableHandler::shard_id()].size() > 0) {
LOG(ERROR) << "The actors have been already created!";
return false;
}
return StoppableHandler::start_scope(
[this](hiactor::scope_builder& builder) {
for (unsigned i = 0; i < StoppableHandler::shard_concurrency_; ++i) {
executor_refs_.emplace_back(builder.build_ref<executor_ref>(i));
get_executors()[StoppableHandler::shard_id()].emplace_back(
builder.build_ref<executor_ref>(i));
}
});
}
@@ -233,7 +245,7 @@ class stored_proc_handler : public StoppableHandler {
auto start_ts = gs::GetCurrentTimeStamp();
#endif // HAVE_OPENTELEMETRY_CPP

return executor_refs_[dst_executor]
return get_executors()[StoppableHandler::shard_id()][dst_executor]
.run_graph_db_query(query_param{std::move(req->content)})
.then([last_byte
#ifdef HAVE_OPENTELEMETRY_CPP
@@ -273,11 +285,20 @@
#endif // HAVE_OPENTELEMETRY_CPP
](seastar::future<query_result>&& fut) mutable {
if (__builtin_expect(fut.failed(), false)) {
rep->set_status(
seastar::httpd::reply::status_type::internal_server_error);
try {
std::rethrow_exception(fut.get_exception());
} catch (std::exception& e) {
// if the exception's message contains "Unable to send message",
// then set the status to 503, otherwise set the status to 500
if (std::string(e.what()).find(
StoppableHandler::ACTOR_SCOPE_CANCEL_MESSAGE) !=
std::string::npos) {
rep->set_status(
seastar::httpd::reply::status_type::service_unavailable);
} else {
rep->set_status(
seastar::httpd::reply::status_type::internal_server_error);
}
rep->write_body("bin", seastar::sstring(e.what()));
}
#ifdef HAVE_OPENTELEMETRY_CPP
@@ -313,7 +334,6 @@ class stored_proc_handler : public StoppableHandler {

private:
query_dispatcher dispatcher_;
std::vector<executor_ref> executor_refs_;
#ifdef HAVE_OPENTELEMETRY_CPP
opentelemetry::nostd::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>
total_counter_;
@@ -324,27 +344,43 @@

class adhoc_query_handler : public StoppableHandler {
public:
static std::vector<std::vector<executor_ref>>& get_executors() {
static std::vector<std::vector<executor_ref>> executor_refs;
return executor_refs;
}

static std::vector<std::vector<codegen_actor_ref>>& get_codegen_actors() {
static std::vector<std::vector<codegen_actor_ref>> codegen_actor_refs;
return codegen_actor_refs;
}

adhoc_query_handler(uint32_t init_group_id, uint32_t max_group_id,
uint32_t group_inc_step, uint32_t shard_concurrency)
: StoppableHandler(init_group_id, max_group_id, group_inc_step,
shard_concurrency),
executor_idx_(0) {
executor_refs_.reserve(shard_concurrency_);
auto& executor_refs = get_executors();
CHECK(executor_refs.size() >= StoppableHandler::shard_id());
executor_refs[StoppableHandler::shard_id()].reserve(shard_concurrency_);
{
hiactor::scope_builder builder;
builder.set_shard(hiactor::local_shard_id())
builder.set_shard(StoppableHandler::shard_id())
.enter_sub_scope(hiactor::scope<executor_group>(0))
.enter_sub_scope(hiactor::scope<hiactor::actor_group>(init_group_id));
for (unsigned i = 0; i < shard_concurrency_; ++i) {
executor_refs_.emplace_back(builder.build_ref<executor_ref>(i));
executor_refs[StoppableHandler::shard_id()].emplace_back(
builder.build_ref<executor_ref>(i));
}
}
auto& codegen_actor_refs = get_codegen_actors();
CHECK(codegen_actor_refs.size() >= StoppableHandler::shard_id());
{
hiactor::scope_builder builder;
builder.set_shard(hiactor::local_shard_id())
builder.set_shard(StoppableHandler::shard_id())
.enter_sub_scope(hiactor::scope<executor_group>(0))
.enter_sub_scope(hiactor::scope<hiactor::actor_group>(init_group_id));
codegen_actor_refs_.emplace_back(builder.build_ref<codegen_actor_ref>(0));
codegen_actor_refs[StoppableHandler::shard_id()].emplace_back(
builder.build_ref<codegen_actor_ref>(0));
}
#ifdef HAVE_OPENTELEMETRY_CPP
total_counter_ = otel::create_int_counter("hqps_adhoc_query_total");
@@ -357,23 +393,28 @@ class adhoc_query_handler : public StoppableHandler {

seastar::future<> stop() override {
return StoppableHandler::cancel_scope([this] {
executor_refs_.clear();
codegen_actor_refs_.clear();
LOG(INFO) << "Stopping adhoc actors on shard id: "
<< StoppableHandler::shard_id();
get_executors()[StoppableHandler::shard_id()].clear();
get_codegen_actors()[StoppableHandler::shard_id()].clear();
});
}

bool start() override {
if (executor_refs_.size() > 0 || codegen_actor_refs_.size() > 0) {
if (get_executors()[StoppableHandler::shard_id()].size() > 0 ||
get_codegen_actors()[StoppableHandler::shard_id()].size() > 0) {
LOG(ERROR) << "The actors have been already created!";
return false;
}
return StoppableHandler::start_scope([this](
hiactor::scope_builder& builder) {
for (unsigned i = 0; i < StoppableHandler::shard_concurrency_; ++i) {
executor_refs_.emplace_back(builder.build_ref<executor_ref>(i));
}
codegen_actor_refs_.emplace_back(builder.build_ref<codegen_actor_ref>(0));
});
return StoppableHandler::start_scope(
[this](hiactor::scope_builder& builder) {
for (unsigned i = 0; i < StoppableHandler::shard_concurrency_; ++i) {
get_executors()[StoppableHandler::shard_id()].emplace_back(
builder.build_ref<executor_ref>(i));
}
get_codegen_actors()[StoppableHandler::shard_id()].emplace_back(
builder.build_ref<codegen_actor_ref>(0));
});
}

seastar::future<std::unique_ptr<seastar::httpd::reply>> handle(
@@ -416,7 +457,7 @@
std::chrono::system_clock::now().time_since_epoch())
.count();
#endif // HAVE_OPENTELEMETRY_CPP
return codegen_actor_refs_[0]
return get_codegen_actors()[StoppableHandler::shard_id()][0]
.do_codegen(query_param{std::move(req->content)})
.then([this, dst_executor
#ifdef HAVE_OPENTELEMETRY_CPP
@@ -436,7 +477,7 @@
// The content contains the path to dynamic library
param.content.append(gs::Schema::HQPS_ADHOC_WRITE_PLUGIN_ID_STR, 1);
param.content.append(gs::GraphDBSession::kCypherProtoAdhocStr, 1);
return executor_refs_[dst_executor]
return get_executors()[StoppableHandler::shard_id()][dst_executor]
.run_graph_db_query(query_param{std::move(param.content)})
.then([
#ifdef HAVE_OPENTELEMETRY_CPP
@@ -477,11 +518,20 @@
#endif // HAVE_OPENTELEMETRY_CPP
](seastar::future<query_result>&& fut) mutable {
if (__builtin_expect(fut.failed(), false)) {
rep->set_status(
seastar::httpd::reply::status_type::internal_server_error);
try {
std::rethrow_exception(fut.get_exception());
} catch (std::exception& e) {
// if the exception's message contains "Unable to send message",
// then set the status to 503, otherwise set the status to 500
if (std::string(e.what()).find(
StoppableHandler::ACTOR_SCOPE_CANCEL_MESSAGE) !=
std::string::npos) {
rep->set_status(
seastar::httpd::reply::status_type::service_unavailable);
} else {
rep->set_status(
seastar::httpd::reply::status_type::internal_server_error);
}
rep->write_body("bin", seastar::sstring(e.what()));
#ifdef HAVE_OPENTELEMETRY_CPP
std::map<std::string, std::string> labels = {
@@ -521,8 +571,7 @@ class adhoc_query_handler : public StoppableHandler {

private:
uint32_t executor_idx_;
std::vector<executor_ref> executor_refs_;
std::vector<codegen_actor_ref> codegen_actor_refs_;

#ifdef HAVE_OPENTELEMETRY_CPP
opentelemetry::nostd::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>
total_counter_;
@@ -544,7 +593,10 @@ graph_db_http_handler::graph_db_http_handler(uint16_t http_port,
all_graph_query_handlers_.resize(shard_num);
if (enable_adhoc_handlers_) {
adhoc_query_handlers_.resize(shard_num);
adhoc_query_handler::get_executors().resize(shard_num);
adhoc_query_handler::get_codegen_actors().resize(shard_num);
}
stored_proc_handler::get_executors().resize(shard_num);
}

graph_db_http_handler::~graph_db_http_handler() {
@@ -563,31 +615,47 @@ bool graph_db_http_handler::is_actors_running() const {
return actors_running_.load();
}

seastar::future<> graph_db_http_handler::stop_query_actors() {
return current_graph_query_handlers_[hiactor::local_shard_id()]
seastar::future<> graph_db_http_handler::stop_query_actors(size_t index) {
if (index >= current_graph_query_handlers_.size()) {
return seastar::make_ready_future<>();
}
return current_graph_query_handlers_[index]
->stop()
.then([this] {
return all_graph_query_handlers_[hiactor::local_shard_id()]->stop();
.then([this, index] {
LOG(INFO) << "Stopped current query actors on shard id: " << index;
return all_graph_query_handlers_[index]->stop();
})
.then([this] {
.then([this, index] {
LOG(INFO) << "Stopped all query actors on shard id: " << index;
if (enable_adhoc_handlers_.load()) {
return adhoc_query_handlers_[hiactor::local_shard_id()]->stop();
} else {
return seastar::make_ready_future<>();
return adhoc_query_handlers_[index]->stop();
}
})
.then([this] {
actors_running_.store(false);
return seastar::make_ready_future<>();
})
.then([this, index] {
if (index + 1 == current_graph_query_handlers_.size()) {
actors_running_.store(false);
return seastar::make_ready_future<>();
} else {
return stop_query_actors(index + 1);
}
});
}

seastar::future<> graph_db_http_handler::stop_query_actors() {
return stop_query_actors(0);
}

void graph_db_http_handler::start_query_actors() {
current_graph_query_handlers_[hiactor::local_shard_id()]->start();
all_graph_query_handlers_[hiactor::local_shard_id()]->start();
if (enable_adhoc_handlers_.load()) {
adhoc_query_handlers_[hiactor::local_shard_id()]->start();
// to start the actors, call start() on the handlers of every shard
for (size_t i = 0; i < current_graph_query_handlers_.size(); ++i) {
current_graph_query_handlers_[i]->start();
all_graph_query_handlers_[i]->start();
if (enable_adhoc_handlers_.load()) {
adhoc_query_handlers_[i]->start();
}
}

actors_running_.store(true);
}

16 changes: 10 additions & 6 deletions flex/engines/http_server/handler/graph_db_http_handler.h
@@ -26,16 +26,21 @@ namespace server {

class StoppableHandler : public seastar::httpd::handler_base {
public:
static constexpr const char* ACTOR_SCOPE_CANCEL_MESSAGE =
"Unable to send message";
StoppableHandler(uint32_t init_group_id, uint32_t max_group_id,
uint32_t group_inc_step, uint32_t shard_concurrency)
: is_cancelled_(false),
cur_group_id_(init_group_id),
max_group_id_(max_group_id),
group_inc_step_(group_inc_step),
shard_concurrency_(shard_concurrency) {}
shard_concurrency_(shard_concurrency),
shard_id_(hiactor::local_shard_id()) {}

inline bool is_stopped() const { return is_cancelled_; }

inline uint32_t shard_id() const { return shard_id_; }

virtual seastar::future<> stop() = 0;
virtual bool start() = 0;

@@ -47,7 +52,7 @@ class StoppableHandler : public seastar::httpd::handler_base {
return seastar::make_ready_future<>();
}
hiactor::scope_builder builder;
builder.set_shard(hiactor::local_shard_id())
builder.set_shard(shard_id_)
.enter_sub_scope(hiactor::scope<executor_group>(0))
.enter_sub_scope(hiactor::scope<hiactor::actor_group>(cur_group_id_));
return hiactor::actor_engine()
@@ -83,12 +88,9 @@ }
}
cur_group_id_ += group_inc_step_;
hiactor::scope_builder builder;
builder.set_shard(hiactor::local_shard_id())
builder.set_shard(shard_id_)
.enter_sub_scope(hiactor::scope<executor_group>(0))
.enter_sub_scope(hiactor::scope<hiactor::actor_group>(cur_group_id_));
// for (unsigned i = 0; i < shard_concurrency_; ++i) {
// executor_refs_.emplace_back(builder.build_ref<executor_ref>(i));
// }
func(builder);
is_cancelled_ = false; // locked outside
return true;
@@ -98,6 +100,7 @@
uint32_t cur_group_id_;
const uint32_t max_group_id_, group_inc_step_;
const uint32_t shard_concurrency_;
const uint32_t shard_id_;
};

class graph_db_http_handler {
@@ -122,6 +125,7 @@ class graph_db_http_handler {

private:
seastar::future<> set_routes();
seastar::future<> stop_query_actors(size_t index);

private:
const uint16_t http_port_;
@@ -415,8 +415,11 @@ public void test9CallCppProcedureJson() {
.SIGNED_INT32))));
{
// 1. call cpp procedure with graph id and procedure id, sync
Result<IrResult.CollectiveResults> resp = session.callProcedure(graphId, request);
assertOk(resp);
// call 10 times to make sure all shards are working.
for (int i = 0; i < 10; i++) {
Result<IrResult.CollectiveResults> resp = session.callProcedure(graphId, request);
assertOk(resp);
}
}
{
// 2. call cpp procedure with graph id and procedure id, async