From 95bf6c7612facfe9955dce7eb201c3ddf5581b06 Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Mon, 29 Jul 2024 19:31:37 +0800 Subject: [PATCH] fix(interactive): Fix the problem of actor scope cancellation and creation (#4089) Resolve the issue of actor scope cancellation and creation. Solve the issue #4090 When the query service is not available, we will return 503. `Unable to send message, the target actor has been canceled!` --- .github/workflows/interactive.yml | 3 + .../handler/graph_db_http_handler.cc | 160 +++++++++++++----- .../handler/graph_db_http_handler.h | 16 +- .../interactive/client/DriverTest.java | 7 +- .../python/gs_interactive/client/result.py | 3 + .../python/gs_interactive/client/status.py | 8 +- .../sdk/python/test/test_driver.py | 32 ++++ 7 files changed, 174 insertions(+), 55 deletions(-) diff --git a/.github/workflows/interactive.yml b/.github/workflows/interactive.yml index 37408f956d9c..81491b240b75 100644 --- a/.github/workflows/interactive.yml +++ b/.github/workflows/interactive.yml @@ -160,9 +160,12 @@ jobs: GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/indices/ cd ${GITHUB_WORKSPACE}/flex/tests/hqps sed -i 's/interactive_workspace/temp_workspace/g' ./engine_config_test.yaml + # set thread_num_per_worker to 4 + sed -i 's/thread_num_per_worker: 1/thread_num_per_worker: 4/g' ./engine_config_test.yaml bash hqps_sdk_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./engine_config_test.yaml java bash hqps_sdk_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./engine_config_test.yaml python sed -i 's/temp_workspace/interactive_workspace/g' ./engine_config_test.yaml + sed -i 's/thread_num_per_worker: 4/thread_num_per_worker: 1/g' ./engine_config_test.yaml - name: Sample Query test env: diff --git a/flex/engines/http_server/handler/graph_db_http_handler.cc b/flex/engines/http_server/handler/graph_db_http_handler.cc index 88aed71f6703..15ab11e703de 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.cc +++ b/flex/engines/http_server/handler/graph_db_http_handler.cc @@ -137,19 +137,29 @@ bool is_running_graph(const seastar::sstring& graph_id) { ////////////////////////////stored_proc_handler//////////////////////////// class stored_proc_handler : public StoppableHandler { public: + static std::vector>& get_executors() { + static std::vector> executor_refs; + return executor_refs; + } + stored_proc_handler(uint32_t init_group_id, uint32_t max_group_id, uint32_t group_inc_step, uint32_t shard_concurrency) : StoppableHandler(init_group_id, max_group_id, group_inc_step, shard_concurrency), dispatcher_(shard_concurrency) { - executor_refs_.reserve(shard_concurrency); + auto& executors = get_executors(); + CHECK(executors.size() >= StoppableHandler::shard_id()); + executors[StoppableHandler::shard_id()].reserve(shard_concurrency); hiactor::scope_builder builder; - builder.set_shard(hiactor::local_shard_id()) + LOG(INFO) << "Creating stored proc handler on shard id: " + << StoppableHandler::shard_id(); + builder.set_shard(StoppableHandler::shard_id()) .enter_sub_scope(hiactor::scope(0)) .enter_sub_scope(hiactor::scope( StoppableHandler::cur_group_id_)); for (unsigned i = 0; i < StoppableHandler::shard_concurrency_; ++i) { - executor_refs_.emplace_back(builder.build_ref(i)); + executors[StoppableHandler::shard_id()].emplace_back( + builder.build_ref(i)); } #ifdef HAVE_OPENTELEMETRY_CPP total_counter_ = otel::create_int_counter("hqps_procedure_query_total"); @@ -160,18 +170,20 @@ class stored_proc_handler : public StoppableHandler { ~stored_proc_handler() override = default; seastar::future<> stop() override { - return StoppableHandler::cancel_scope([this] { executor_refs_.clear(); }); + return StoppableHandler::cancel_scope( + [this] { get_executors()[StoppableHandler::shard_id()].clear(); }); } bool start() override { - if (executor_refs_.size() > 0) { + if (get_executors()[StoppableHandler::shard_id()].size() > 0) { LOG(ERROR) << "The actors have been already created!"; return false; } return StoppableHandler::start_scope( [this](hiactor::scope_builder& builder) { for (unsigned i = 0; i < StoppableHandler::shard_concurrency_; ++i) { - executor_refs_.emplace_back(builder.build_ref(i)); + get_executors()[StoppableHandler::shard_id()].emplace_back( + builder.build_ref(i)); } }); } @@ -233,7 +245,7 @@ class stored_proc_handler : public StoppableHandler { auto start_ts = gs::GetCurrentTimeStamp(); #endif // HAVE_OPENTELEMETRY_CPP - return executor_refs_[dst_executor] + return get_executors()[StoppableHandler::shard_id()][dst_executor] .run_graph_db_query(query_param{std::move(req->content)}) .then([last_byte #ifdef HAVE_OPENTELEMETRY_CPP @@ -273,11 +285,20 @@ class stored_proc_handler : public StoppableHandler { #endif // HAVE_OPENTELEMETRY_CPP ](seastar::future&& fut) mutable { if (__builtin_expect(fut.failed(), false)) { - rep->set_status( - seastar::httpd::reply::status_type::internal_server_error); try { std::rethrow_exception(fut.get_exception()); } catch (std::exception& e) { + // if the exception's message contains "Unable to send message", + // then set the status to 503, otherwise set the status to 500 + if (std::string(e.what()).find( + StoppableHandler::ACTOR_SCOPE_CANCEL_MESSAGE) != + std::string::npos) { + rep->set_status( + seastar::httpd::reply::status_type::service_unavailable); + } else { + rep->set_status( + seastar::httpd::reply::status_type::internal_server_error); + } rep->write_body("bin", seastar::sstring(e.what())); } #ifdef HAVE_OPENTELEMETRY_CPP @@ -313,7 +334,6 @@ class stored_proc_handler : public StoppableHandler { private: query_dispatcher dispatcher_; - std::vector executor_refs_; #ifdef HAVE_OPENTELEMETRY_CPP opentelemetry::nostd::unique_ptr> total_counter_; @@ -324,27 +344,43 @@ class stored_proc_handler : public StoppableHandler { class adhoc_query_handler : public StoppableHandler { public: + static std::vector>& get_executors() { + static std::vector> executor_refs; + return executor_refs; + } + + static std::vector>& get_codegen_actors() { + static std::vector> codegen_actor_refs; + return codegen_actor_refs; + } + adhoc_query_handler(uint32_t init_group_id, uint32_t max_group_id, uint32_t group_inc_step, uint32_t shard_concurrency) : StoppableHandler(init_group_id, max_group_id, group_inc_step, shard_concurrency), executor_idx_(0) { - executor_refs_.reserve(shard_concurrency_); + auto& executor_refs = get_executors(); + CHECK(executor_refs.size() >= StoppableHandler::shard_id()); + executor_refs[StoppableHandler::shard_id()].reserve(shard_concurrency_); { hiactor::scope_builder builder; - builder.set_shard(hiactor::local_shard_id()) + builder.set_shard(StoppableHandler::shard_id()) .enter_sub_scope(hiactor::scope(0)) .enter_sub_scope(hiactor::scope(init_group_id)); for (unsigned i = 0; i < shard_concurrency_; ++i) { - executor_refs_.emplace_back(builder.build_ref(i)); + executor_refs[StoppableHandler::shard_id()].emplace_back( + builder.build_ref(i)); } } + auto& codegen_actor_refs = get_codegen_actors(); + CHECK(codegen_actor_refs.size() >= StoppableHandler::shard_id()); { hiactor::scope_builder builder; - builder.set_shard(hiactor::local_shard_id()) + builder.set_shard(StoppableHandler::shard_id()) .enter_sub_scope(hiactor::scope(0)) .enter_sub_scope(hiactor::scope(init_group_id)); - codegen_actor_refs_.emplace_back(builder.build_ref(0)); + codegen_actor_refs[StoppableHandler::shard_id()].emplace_back( + builder.build_ref(0)); } #ifdef HAVE_OPENTELEMETRY_CPP total_counter_ = otel::create_int_counter("hqps_adhoc_query_total"); @@ -357,23 +393,28 @@ class adhoc_query_handler : public StoppableHandler { seastar::future<> stop() override { return StoppableHandler::cancel_scope([this] { - executor_refs_.clear(); - codegen_actor_refs_.clear(); + LOG(INFO) << "Stopping adhoc actors on shard id: " + << StoppableHandler::shard_id(); + get_executors()[StoppableHandler::shard_id()].clear(); + get_codegen_actors()[StoppableHandler::shard_id()].clear(); }); } bool start() override { - if (executor_refs_.size() > 0 || codegen_actor_refs_.size() > 0) { + if (get_executors()[StoppableHandler::shard_id()].size() > 0 || + get_codegen_actors()[StoppableHandler::shard_id()].size() > 0) { LOG(ERROR) << "The actors have been already created!"; return false; } - return StoppableHandler::start_scope([this]( - hiactor::scope_builder& builder) { - for (unsigned i = 0; i < StoppableHandler::shard_concurrency_; ++i) { - executor_refs_.emplace_back(builder.build_ref(i)); - } - codegen_actor_refs_.emplace_back(builder.build_ref(0)); - }); + return StoppableHandler::start_scope( + [this](hiactor::scope_builder& builder) { + for (unsigned i = 0; i < StoppableHandler::shard_concurrency_; ++i) { + get_executors()[StoppableHandler::shard_id()].emplace_back( + builder.build_ref(i)); + } + get_codegen_actors()[StoppableHandler::shard_id()].emplace_back( + builder.build_ref(0)); + }); } seastar::future> handle( @@ -416,7 +457,7 @@ class adhoc_query_handler : public StoppableHandler { std::chrono::system_clock::now().time_since_epoch()) .count(); #endif // HAVE_OPENTELEMETRY_CPP - return codegen_actor_refs_[0] + return get_codegen_actors()[StoppableHandler::shard_id()][0] .do_codegen(query_param{std::move(req->content)}) .then([this, dst_executor #ifdef HAVE_OPENTELEMETRY_CPP @@ -436,7 +477,7 @@ class adhoc_query_handler : public StoppableHandler { // The content contains the path to dynamic library param.content.append(gs::Schema::HQPS_ADHOC_WRITE_PLUGIN_ID_STR, 1); param.content.append(gs::GraphDBSession::kCypherProtoAdhocStr, 1); - return executor_refs_[dst_executor] + return get_executors()[StoppableHandler::shard_id()][dst_executor] .run_graph_db_query(query_param{std::move(param.content)}) .then([ #ifdef HAVE_OPENTELEMETRY_CPP @@ -477,11 +518,20 @@ class adhoc_query_handler : public StoppableHandler { #endif // HAVE_OPENTELEMETRY_CPP ](seastar::future&& fut) mutable { if (__builtin_expect(fut.failed(), false)) { - rep->set_status( - seastar::httpd::reply::status_type::internal_server_error); try { std::rethrow_exception(fut.get_exception()); } catch (std::exception& e) { + // if the exception's message contains "Unable to send message", + // then set the status to 503, otherwise set the status to 500 + if (std::string(e.what()).find( + StoppableHandler::ACTOR_SCOPE_CANCEL_MESSAGE) != + std::string::npos) { + rep->set_status( + seastar::httpd::reply::status_type::service_unavailable); + } else { + rep->set_status( + seastar::httpd::reply::status_type::internal_server_error); + } rep->write_body("bin", seastar::sstring(e.what())); #ifdef HAVE_OPENTELEMETRY_CPP std::map labels = { @@ -521,8 +571,7 @@ class adhoc_query_handler : public StoppableHandler { private: uint32_t executor_idx_; - std::vector executor_refs_; - std::vector codegen_actor_refs_; + #ifdef HAVE_OPENTELEMETRY_CPP opentelemetry::nostd::unique_ptr> total_counter_; @@ -544,7 +593,10 @@ graph_db_http_handler::graph_db_http_handler(uint16_t http_port, all_graph_query_handlers_.resize(shard_num); if (enable_adhoc_handlers_) { adhoc_query_handlers_.resize(shard_num); + adhoc_query_handler::get_executors().resize(shard_num); + adhoc_query_handler::get_codegen_actors().resize(shard_num); } + stored_proc_handler::get_executors().resize(shard_num); } graph_db_http_handler::~graph_db_http_handler() { @@ -563,31 +615,47 @@ bool graph_db_http_handler::is_actors_running() const { return actors_running_.load(); } -seastar::future<> graph_db_http_handler::stop_query_actors() { - return current_graph_query_handlers_[hiactor::local_shard_id()] +seastar::future<> graph_db_http_handler::stop_query_actors(size_t index) { + if (index >= current_graph_query_handlers_.size()) { + return seastar::make_ready_future<>(); + } + return current_graph_query_handlers_[index] ->stop() - .then([this] { - return all_graph_query_handlers_[hiactor::local_shard_id()]->stop(); + .then([this, index] { + LOG(INFO) << "Stopped current query actors on shard id: " << index; + return all_graph_query_handlers_[index]->stop(); }) - .then([this] { + .then([this, index] { + LOG(INFO) << "Stopped all query actors on shard id: " << index; if (enable_adhoc_handlers_.load()) { - return adhoc_query_handlers_[hiactor::local_shard_id()]->stop(); - } else { - return seastar::make_ready_future<>(); + return adhoc_query_handlers_[index]->stop(); } - }) - .then([this] { - actors_running_.store(false); return seastar::make_ready_future<>(); + }) + .then([this, index] { + if (index + 1 == current_graph_query_handlers_.size()) { + actors_running_.store(false); + return seastar::make_ready_future<>(); + } else { + return stop_query_actors(index + 1); + } }); } +seastar::future<> graph_db_http_handler::stop_query_actors() { + return stop_query_actors(0); +} + void graph_db_http_handler::start_query_actors() { - current_graph_query_handlers_[hiactor::local_shard_id()]->start(); - all_graph_query_handlers_[hiactor::local_shard_id()]->start(); - if (enable_adhoc_handlers_.load()) { - adhoc_query_handlers_[hiactor::local_shard_id()]->start(); + // to start actors, call method on each handler + for (size_t i = 0; i < current_graph_query_handlers_.size(); ++i) { + current_graph_query_handlers_[i]->start(); + all_graph_query_handlers_[i]->start(); + if (enable_adhoc_handlers_.load()) { + adhoc_query_handlers_[i]->start(); + } } + actors_running_.store(true); } diff --git a/flex/engines/http_server/handler/graph_db_http_handler.h b/flex/engines/http_server/handler/graph_db_http_handler.h index a37df668b8c3..b5ecbc2a5913 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.h +++ b/flex/engines/http_server/handler/graph_db_http_handler.h @@ -26,16 +26,21 @@ namespace server { class StoppableHandler : public seastar::httpd::handler_base { public: + static constexpr const char* ACTOR_SCOPE_CANCEL_MESSAGE = + "Unable to send message"; StoppableHandler(uint32_t init_group_id, uint32_t max_group_id, uint32_t group_inc_step, uint32_t shard_concurrency) : is_cancelled_(false), cur_group_id_(init_group_id), max_group_id_(max_group_id), group_inc_step_(group_inc_step), - shard_concurrency_(shard_concurrency) {} + shard_concurrency_(shard_concurrency), + shard_id_(hiactor::local_shard_id()) {} inline bool is_stopped() const { return is_cancelled_; } + inline uint32_t shard_id() const { return shard_id_; } + virtual seastar::future<> stop() = 0; virtual bool start() = 0; @@ -47,7 +52,7 @@ class StoppableHandler : public seastar::httpd::handler_base { return seastar::make_ready_future<>(); } hiactor::scope_builder builder; - builder.set_shard(hiactor::local_shard_id()) + builder.set_shard(shard_id_) .enter_sub_scope(hiactor::scope(0)) .enter_sub_scope(hiactor::scope(cur_group_id_)); return hiactor::actor_engine() @@ -83,12 +88,9 @@ class StoppableHandler : public seastar::httpd::handler_base { } cur_group_id_ += group_inc_step_; hiactor::scope_builder builder; - builder.set_shard(hiactor::local_shard_id()) + builder.set_shard(shard_id_) .enter_sub_scope(hiactor::scope(0)) .enter_sub_scope(hiactor::scope(cur_group_id_)); - // for (unsigned i = 0; i < shard_concurrency_; ++i) { - // executor_refs_.emplace_back(builder.build_ref(i)); - // } func(builder); is_cancelled_ = false; // locked outside return true; @@ -98,6 +100,7 @@ class StoppableHandler : public seastar::httpd::handler_base { uint32_t cur_group_id_; const uint32_t max_group_id_, group_inc_step_; const uint32_t shard_concurrency_; + const uint32_t shard_id_; }; class graph_db_http_handler { @@ -122,6 +125,7 @@ class graph_db_http_handler { private: seastar::future<> set_routes(); + seastar::future<> stop_query_actors(size_t index); private: const uint16_t http_port_; diff --git a/flex/interactive/sdk/java/src/test/java/com/alibaba/graphscope/interactive/client/DriverTest.java b/flex/interactive/sdk/java/src/test/java/com/alibaba/graphscope/interactive/client/DriverTest.java index cd3199927b9f..28b4df3f70e7 100644 --- a/flex/interactive/sdk/java/src/test/java/com/alibaba/graphscope/interactive/client/DriverTest.java +++ b/flex/interactive/sdk/java/src/test/java/com/alibaba/graphscope/interactive/client/DriverTest.java @@ -415,8 +415,11 @@ public void test9CallCppProcedureJson() { .SIGNED_INT32)))); { // 1. call cpp procedure with graph id and procedure id, sync - Result resp = session.callProcedure(graphId, request); - assertOk(resp); + // call 10 times to make sure all shard is working. + for (int i = 0; i < 10; i++) { + Result resp = session.callProcedure(graphId, request); + assertOk(resp); + } } { // 2. call cpp procedure with graph id and procedure id, async diff --git a/flex/interactive/sdk/python/gs_interactive/client/result.py b/flex/interactive/sdk/python/gs_interactive/client/result.py index 4aa57cfc6066..eee6405dd790 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/result.py +++ b/flex/interactive/sdk/python/gs_interactive/client/result.py @@ -54,6 +54,9 @@ def get_status(self): def get_status_message(self): return self.status.message + + def get_status(self): + return self.status @staticmethod def ok(value): diff --git a/flex/interactive/sdk/python/gs_interactive/client/status.py b/flex/interactive/sdk/python/gs_interactive/client/status.py index be7fcf162f7f..357639a2f2c5 100644 --- a/flex/interactive/sdk/python/gs_interactive/client/status.py +++ b/flex/interactive/sdk/python/gs_interactive/client/status.py @@ -56,6 +56,9 @@ def is_ok(self) -> bool: def is_error(self) -> bool: return self.status != StatusCode.OK + + def get_code(self): + return self.status @property def get_message(self): @@ -78,7 +81,10 @@ def from_exception(exception: ApiException): elif isinstance(exception, UnauthorizedException): return Status(StatusCode.BAD_REQUEST, str(exception)) elif isinstance(exception, ServiceException): - return Status(StatusCode.SERVER_INTERNAL_ERROR, str(exception)) + if (exception.status == 503): + return Status(StatusCode.SERVICE_UNAVAILABLE, str(exception)) + else: + return Status(StatusCode.SERVER_INTERNAL_ERROR, str(exception)) return Status( StatusCode.UNKNOWN, "Unknown Error from exception " + str(exception) ) diff --git a/flex/interactive/sdk/python/test/test_driver.py b/flex/interactive/sdk/python/test/test_driver.py index e5afde4f0670..36f6e1ad04e3 100644 --- a/flex/interactive/sdk/python/test/test_driver.py +++ b/flex/interactive/sdk/python/test/test_driver.py @@ -25,6 +25,7 @@ from gs_interactive.client.driver import Driver from gs_interactive.models import * +from gs_interactive.client.status import StatusCode class TestDriver(unittest.TestCase): @@ -77,6 +78,8 @@ def test_example(self): self.callProcedure() self.callProcedureWithHttp() self.callProcedureWithHttpCurrent() + # test stop the service, and submit queries + self.queryWithServiceStop() self.createDriver() def createGraph(self): @@ -328,6 +331,35 @@ def restart(self): assert resp.is_ok() print("get service status: ", resp.get_value()) + def queryWithServiceStop(self): + # stop service + print("stop service: ") + stop_res = self._sess.stop_service() + assert stop_res.is_ok() + # submit query on stopped service should raise exception + req = QueryRequest( + query_name=self._cpp_proc_name, + arguments=[ + TypedValue( + type=GSDataType(PrimitiveType(primitive_type="DT_SIGNED_INT32")), + value=1, + ) + ], + ) + resp = self._sess.call_procedure_current(params=req) + assert not resp.is_ok() + print("call procedure failed: ", resp.get_status_message()) + assert resp.get_status().get_code() == StatusCode.SERVICE_UNAVAILABLE + + # start service + print("start service: ") + start_res = self._sess.start_service( + start_service_request=StartServiceRequest(graph_id=self._graph_id) + ) + assert start_res.is_ok() + # wait 5 seconds + time + def restartOnNewGraph(self): original_graph_id = None status_res = self._sess.get_service_status()