From 01e59783cd145fe175fa6220bb3447a7c42cc342 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Sat, 21 Oct 2023 15:40:35 +0800 Subject: [PATCH] Fix query hang when two same queries are submitted --- flex/bin/sync_server.cc | 10 +- flex/engines/http_server/actor_system.cc | 8 + flex/engines/http_server/actor_system.h | 11 +- flex/engines/http_server/codegen_actor.act.cc | 89 ++++++++ flex/engines/http_server/codegen_actor.act.h | 43 ++++ flex/engines/http_server/codegen_proxy.cc | 199 ++++++++++-------- flex/engines/http_server/codegen_proxy.h | 22 +- flex/engines/http_server/executor.act.cc | 54 +---- flex/engines/http_server/executor.act.h | 2 +- flex/engines/http_server/hqps_http_handler.cc | 36 +++- flex/engines/http_server/hqps_service.cc | 8 +- flex/engines/http_server/hqps_service.h | 3 +- flex/engines/http_server/options.h | 1 + flex/engines/http_server/types.h | 1 + 14 files changed, 333 insertions(+), 154 deletions(-) create mode 100644 flex/engines/http_server/codegen_actor.act.cc create mode 100644 flex/engines/http_server/codegen_actor.act.h diff --git a/flex/bin/sync_server.cc b/flex/bin/sync_server.cc index 739d740bb26a..5d9ca2adf32c 100644 --- a/flex/bin/sync_server.cc +++ b/flex/bin/sync_server.cc @@ -182,7 +182,11 @@ int main(int argc, char** argv) { "codegen binary path")( "graph-config,g", bpo::value(), "graph schema config file")( "data-path,a", bpo::value(), "data directory path")( - "bulk-load,l", bpo::value(), "bulk-load config file"); + "bulk-load,l", bpo::value(), "bulk-load config file")( + "open-thread-resource-pool", bpo::value()->default_value(true), + "open thread resource pool")("worker-thread-number", + bpo::value()->default_value(2), + "worker thread number"); setenv("TZ", "Asia/Shanghai", 1); tzset(); @@ -253,7 +257,9 @@ int main(int argc, char** argv) { gs::init_codegen_proxy(vm, graph_schema_path, server_config_path); - server::HQPSService::get().init(shard_num, http_port, false); + server::HQPSService::get().init(shard_num, http_port, false, + vm["open-thread-resource-pool"].as(), + vm["worker-thread-number"].as()); server::HQPSService::get().run_and_wait_for_exit(); return 0; diff --git a/flex/engines/http_server/actor_system.cc b/flex/engines/http_server/actor_system.cc index 8196b8b129ef..06bbcc05188c 100644 --- a/flex/engines/http_server/actor_system.cc +++ b/flex/engines/http_server/actor_system.cc @@ -34,10 +34,18 @@ void actor_system::launch_worker() { char gateway[] = "--gw-ipv4-addr=172.24.255.253"; char net_mask[] = "--netmask-ipv4-addr=255.255.240.0"; char enable_dpdk[] = "--dpdk-pmd"; + char enable_thread_resource_pool[] = "--open-thread-resource-pool=true"; + char external_thread_num[32]; char shards[16]; snprintf(shards, sizeof(shards), "-c%d", num_shards_); + snprintf(external_thread_num, sizeof(external_thread_num), + "--worker-thread-number=%d", external_thread_num_); std::vector argv = {prog_name, shards}; + if (enable_thread_resource_pool_) { + argv.push_back(enable_thread_resource_pool); + argv.push_back(external_thread_num); + } if (enable_dpdk_) { argv.push_back(enable_native_stack); argv.push_back(close_dhcp); diff --git a/flex/engines/http_server/actor_system.h b/flex/engines/http_server/actor_system.h index 83858d8e6c5d..d96b87f399ed 100644 --- a/flex/engines/http_server/actor_system.h +++ b/flex/engines/http_server/actor_system.h @@ -26,8 +26,13 @@ namespace server { class actor_system { public: - actor_system(uint32_t num_shards, bool enable_dpdk) - : num_shards_(num_shards), enable_dpdk_(enable_dpdk) {} + actor_system(uint32_t num_shards, bool enable_dpdk, + bool enable_thread_resource_pool = false, + unsigned external_thread_num = 1) + : num_shards_(num_shards), + enable_dpdk_(enable_dpdk), + enable_thread_resource_pool_(enable_thread_resource_pool), + external_thread_num_(external_thread_num) {} ~actor_system(); void launch(); @@ -39,6 +44,8 @@ class actor_system { private: const uint32_t num_shards_; const bool enable_dpdk_; + const bool enable_thread_resource_pool_; + const unsigned external_thread_num_; std::unique_ptr main_thread_; std::atomic running_{false}; sem_t ready_; diff --git a/flex/engines/http_server/codegen_actor.act.cc b/flex/engines/http_server/codegen_actor.act.cc new file mode 100644 index 000000000000..f39a151606cf --- /dev/null +++ b/flex/engines/http_server/codegen_actor.act.cc @@ -0,0 +1,89 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flex/engines/http_server/codegen_actor.act.h" + +#include "flex/engines/graph_db/database/graph_db.h" +#include "flex/engines/graph_db/database/graph_db_session.h" +#include "flex/engines/http_server/codegen_proxy.h" +#include "flex/engines/http_server/stored_procedure.h" + +#include + +namespace server { + +codegen_actor::~codegen_actor() { + // finalization + // ... +} + +codegen_actor::codegen_actor(hiactor::actor_base* exec_ctx, + const hiactor::byte_t* addr) + : hiactor::actor(exec_ctx, addr) { + set_max_concurrency(1); // set max concurrency for task reentrancy + // (stateful) initialization + // ... +} + +seastar::future codegen_actor::do_codegen(query_param&& param) { + LOG(INFO) << "Running codegen for " << param.content.size(); + // The received query's pay load shoud be able to deserialze to physical plan + auto& str = param.content; + if (str.size() <= 0) { + LOG(INFO) << "Empty query"; + return seastar::make_exception_future( + std::runtime_error("Empty query string")); + } + + const char* str_data = str.data(); + size_t str_length = str.size(); + LOG(INFO) << "Deserialize physical job request" << str_length; + + physical::PhysicalPlan plan; + bool ret = plan.ParseFromArray(str_data, str_length); + if (ret) { + VLOG(10) << "Parse physical plan: " << plan.DebugString(); + } else { + LOG(ERROR) << "Fail to parse physical plan"; + return seastar::make_exception_future( + std::runtime_error("Fail to parse physical plan")); + } + + // 0. do codegen gen. + std::string lib_path = ""; + int32_t job_id = -1; + auto& codegen_proxy = server::CodegenProxy::get(); + if (codegen_proxy.Initialized()) { + return codegen_proxy.DoGen(plan).then( + [](std::pair&& job_id_and_lib_path) { + if (job_id_and_lib_path.first == -1) { + return seastar::make_exception_future( + std::runtime_error("Fail to parse job id from codegen proxy")); + } + // 1. load and run. + LOG(INFO) << "Okay, try to run the query of lib path: " + << job_id_and_lib_path.second + << ", job id: " << job_id_and_lib_path.first + << "local shard id: " << hiactor::local_shard_id(); + return seastar::make_ready_future( + std::move(job_id_and_lib_path)); + }); + } else { + return seastar::make_exception_future( + std::runtime_error("Codegen proxy not initialized")); + } +} + +} // namespace server diff --git a/flex/engines/http_server/codegen_actor.act.h b/flex/engines/http_server/codegen_actor.act.h new file mode 100644 index 000000000000..d2b45652888c --- /dev/null +++ b/flex/engines/http_server/codegen_actor.act.h @@ -0,0 +1,43 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ENGINES_HTTP_SERVER_CODEGEN_ACTOR_ACT_H_ +#define ENGINES_HTTP_SERVER_CODEGEN_ACTOR_ACT_H_ + +#include "flex/engines/http_server/types.h" + +#include +#include + +namespace server { + +class ANNOTATION(actor:impl) codegen_actor : public hiactor::actor { + public: + codegen_actor(hiactor::actor_base* exec_ctx, const hiactor::byte_t* addr); + ~codegen_actor() override; + + seastar::future ANNOTATION(actor:method) do_codegen(query_param&& param); + + // DECLARE_RUN_QUERYS; + /// Declare `do_work` func here, no need to implement. + ACTOR_DO_WORK() + + private: + int32_t your_private_members_ = 0; +}; + +} // namespace serverP + +#endif // ENGINES_HTTP_SERVER_CODEGEN_ACTOR_ACT_H_ diff --git a/flex/engines/http_server/codegen_proxy.cc b/flex/engines/http_server/codegen_proxy.cc index 324c14bb01b3..3029d5d7dd0d 100644 --- a/flex/engines/http_server/codegen_proxy.cc +++ b/flex/engines/http_server/codegen_proxy.cc @@ -30,6 +30,11 @@ StoredProcedureLibMeta::StoredProcedureLibMeta(CodegenStatus status, std::string res_lib_path) : status(status), res_lib_path(res_lib_path) {} +std::string StoredProcedureLibMeta::to_string() const { + return "status: " + std::to_string(status) + + ", res_lib_path: " + res_lib_path; +} + CodegenProxy::CodegenProxy() : initialized_(false){}; CodegenProxy::~CodegenProxy() {} @@ -54,110 +59,122 @@ seastar::future> CodegenProxy::DoGen( const physical::PhysicalPlan& plan) { LOG(INFO) << "Start generating for query: "; auto next_job_id = plan.plan_id(); - auto work_dir = get_work_directory(next_job_id); - auto query_name = "query_" + std::to_string(next_job_id); - std::string plan_path = prepare_next_job_dir(work_dir, query_name, plan); - if (plan_path.empty()) { - return seastar::make_exception_future>( - std::runtime_error("Fail to prepare next job dir")); + + { + std::unique_lock lock(mutex_); + cv_.wait(lock, + [this, next_job_id] { return !check_job_running(next_job_id); }); } - if (job_id_2_procedures_.find(next_job_id) == job_id_2_procedures_.end() || - job_id_2_procedures_[next_job_id].status == CodegenStatus::FAILED) { - // Do gen. - { - // First lock - std::lock_guard lock(mutex_); - job_id_2_procedures_[next_job_id] = - StoredProcedureLibMeta{CodegenStatus::RUNNING, ""}; + return call_codegen_cmd(plan).then_wrapped([this, + next_job_id](auto&& future) { + int return_code; + try { + return_code = future.get(); + } catch (std::exception& e) { + LOG(ERROR) << "Compilation failed: " << e.what(); + return seastar::make_ready_future>( + std::make_pair(next_job_id, + std::string("Compilation failed: ") + e.what())); } - std::string res_lib_path = - call_codegen_cmd(plan_path, query_name, work_dir); - if (!std::filesystem::exists(res_lib_path)) { - LOG(ERROR) << "Res lib path " << res_lib_path - << " not exists, compilation failed"; - { - std::lock_guard lock(mutex_); - if (job_id_2_procedures_.find(next_job_id) != - job_id_2_procedures_.end()) { - job_id_2_procedures_[next_job_id].status = CodegenStatus::FAILED; - } else { - job_id_2_procedures_.emplace( - next_job_id, StoredProcedureLibMeta{CodegenStatus::FAILED}); - } - } + if (return_code != 0) { + LOG(ERROR) << "Codegen failed"; return seastar::make_exception_future>( std::runtime_error("Codegen failed")); - } else { - // Add res_lib_path to query_cache. - { - std::lock_guard lock(mutex_); - if (job_id_2_procedures_.find(next_job_id) != - job_id_2_procedures_.end()) { - job_id_2_procedures_[next_job_id].status = CodegenStatus::SUCCESS; - job_id_2_procedures_[next_job_id].res_lib_path = res_lib_path; - } else { - job_id_2_procedures_.emplace( - next_job_id, - StoredProcedureLibMeta{CodegenStatus::SUCCESS, res_lib_path}); - } - } } + return get_res_lib_path_from_cache(next_job_id); + }); +} + +seastar::future CodegenProxy::call_codegen_cmd( + const physical::PhysicalPlan& plan) { + // if the desired query lib for next_job_id is in cache, just return 0 + // otherwise, call codegen cmd + auto next_job_id = plan.plan_id(); + auto query_name = "query_" + std::to_string(next_job_id); + auto work_dir = get_work_directory(next_job_id); + std::string plan_path; + + if (job_id_2_procedures_.find(next_job_id) != job_id_2_procedures_.end() && + job_id_2_procedures_[next_job_id].status == CodegenStatus::SUCCESS) { + return seastar::make_ready_future(0); + } + + insert_or_update(next_job_id, CodegenStatus::RUNNING, ""); + + plan_path = prepare_next_job_dir(work_dir, query_name, plan); + if (plan_path.empty()) { + insert_or_update(next_job_id, CodegenStatus::FAILED, ""); + return seastar::make_exception_future(std::runtime_error( + "Fail to prepare next job dir for " + query_name + ", job id: " + + std::to_string(next_job_id) + ", plan path: " + plan_path)); } - return get_res_lib_path_from_cache(next_job_id); + std::string expected_res_lib_path = work_dir + "/lib" + query_name + ".so"; + return call_codegen_cmd_impl(plan_path, query_name, work_dir) + .then([this, next_job_id, expected_res_lib_path](int codegen_res) { + if (codegen_res != 0 || + !std::filesystem::exists(expected_res_lib_path)) { + LOG(ERROR) << "Expected lib path " << expected_res_lib_path + << " not exists, or compilation failure: " << codegen_res + << " compilation failed"; + + insert_or_update(next_job_id, CodegenStatus::FAILED, ""); + VLOG(10) << "Compilation failed, job id: " << next_job_id; + } else { + VLOG(10) << "Compilation success, job id: " << next_job_id; + insert_or_update(next_job_id, CodegenStatus::SUCCESS, + expected_res_lib_path); + } + { + std::lock_guard lock(mutex_); + cv_.notify_all(); + } + return seastar::make_ready_future(codegen_res); + }); } seastar::future> CodegenProxy::get_res_lib_path_from_cache(int32_t next_job_id) { - // status could be running, - int retry_times = 0; - volatile CodegenStatus codegen_status = - job_id_2_procedures_[next_job_id].status; - while (codegen_status == CodegenStatus::RUNNING && - retry_times < MAX_RETRY_TIMES) { - // wait for codegen to finish - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - VLOG(10) << "Waiting for codegen to finish: retry times:" << retry_times; - retry_times += 1; - codegen_status = job_id_2_procedures_[next_job_id].status; - } + // the entry must exists + StoredProcedureLibMeta meta = job_id_2_procedures_[next_job_id]; - if (retry_times >= MAX_RETRY_TIMES) { - LOG(ERROR) << "Codegen timeout"; - return seastar::make_exception_future>( - std::runtime_error("Codegen timeout")); - } - - if (job_id_2_procedures_[next_job_id].status != CodegenStatus::SUCCESS) { - LOG(ERROR) << "Invalid state: " - << std::to_string(job_id_2_procedures_[next_job_id].status) - << ", " << job_id_2_procedures_[next_job_id].res_lib_path + if (meta.status == CodegenStatus::SUCCESS) { + return seastar::make_ready_future>( + std::make_pair(next_job_id, meta.res_lib_path)); + } else { + LOG(ERROR) << "Invalid state: " << meta.to_string() << ", " << ", compilation failure"; return seastar::make_exception_future>( - std::runtime_error("Invalid state")); + std::runtime_error("Compilation failed, invalid state: " + + meta.to_string())); } - - return seastar::make_ready_future>( - std::make_pair(next_job_id, - job_id_2_procedures_[next_job_id].res_lib_path)); } -std::string CodegenProxy::call_codegen_cmd(const std::string& plan_path, - const std::string& query_name, - const std::string& work_dir) { +seastar::future CodegenProxy::call_codegen_cmd_impl( + const std::string& plan_path, const std::string& query_name, + const std::string& work_dir) { // TODO: different suffix for different platform - std::string res_lib_path = work_dir + "/lib" + query_name + ".so"; std::string cmd = codegen_bin_ + " -e=hqps " + " -i=" + plan_path + " -w=" + work_dir + " --ir_conf=" + ir_compiler_prop_ + " --graph_schema_path=" + compiler_graph_schema_; LOG(INFO) << "Start call codegen cmd: [" << cmd << "]"; - auto res = std::system(cmd.c_str()); - if (res != 0) { - LOG(ERROR) << "call codegen cmd failed: " << cmd; - return ""; - } - return res_lib_path; + + return hiactor::thread_resource_pool::submit_work([this, cmd] { + auto res = std::system(cmd.c_str()); + LOG(INFO) << "Codegen cmd: [" << cmd << "] return: " << res; + return res; + }) + .then_wrapped([](auto fut) { + VLOG(10) << "try"; + try { + VLOG(10) << "Got future "; + return seastar::make_ready_future(fut.get0()); + } catch (std::exception& e) { + LOG(ERROR) << "Compilation failed: " << e.what(); + return seastar::make_ready_future(-1); + } + }); } std::string CodegenProxy::get_work_directory(int32_t job_id) { @@ -166,6 +183,24 @@ std::string CodegenProxy::get_work_directory(int32_t job_id) { return work_dir; } +void CodegenProxy::insert_or_update(int32_t job_id, CodegenStatus status, + std::string path) { + if (job_id_2_procedures_.find(job_id) != job_id_2_procedures_.end()) { + job_id_2_procedures_[job_id].status = status; + job_id_2_procedures_[job_id].res_lib_path = path; + } else { + job_id_2_procedures_.emplace(job_id, StoredProcedureLibMeta{status, path}); + } +} + +bool CodegenProxy::check_job_running(int32_t job_id) { + if (job_id_2_procedures_.find(job_id) != job_id_2_procedures_.end()) { + return job_id_2_procedures_[job_id].status == CodegenStatus::RUNNING; + } else { + return false; + } +} + void CodegenProxy::ensure_dir_exists(const std::string& working_dir) { LOG(INFO) << "Ensuring [" << working_dir << "] exists "; std::filesystem::path path = working_dir; @@ -214,6 +249,4 @@ std::string CodegenProxy::prepare_next_job_dir( return plan_path; } -const int32_t CodegenProxy::MAX_RETRY_TIMES = 10; - } // namespace server diff --git a/flex/engines/http_server/codegen_proxy.h b/flex/engines/http_server/codegen_proxy.h index 6c356d3c6298..b738b2611a60 100644 --- a/flex/engines/http_server/codegen_proxy.h +++ b/flex/engines/http_server/codegen_proxy.h @@ -15,6 +15,7 @@ #ifndef ENGINES_HQPS_SERVER_CODEGEN_PROXY_H_ #define ENGINES_HQPS_SERVER_CODEGEN_PROXY_H_ +#include #include #include #include @@ -27,6 +28,8 @@ #include "flex/proto_generated_gie/job_service.pb.h" #include "flex/proto_generated_gie/physical.pb.h" +#include +#include #include namespace server { @@ -45,6 +48,8 @@ struct StoredProcedureLibMeta { StoredProcedureLibMeta(); StoredProcedureLibMeta(CodegenStatus status); StoredProcedureLibMeta(CodegenStatus status, std::string res_lib_path); + + std::string to_string() const; }; // Manages the codegen runner, process the incoming adhoc query, and output to @@ -77,15 +82,21 @@ class CodegenProxy { const physical::PhysicalPlan& plan); private: + seastar::future call_codegen_cmd(const physical::PhysicalPlan& plan); + + seastar::future call_codegen_cmd_impl(const std::string& plan_path, + const std::string& query_name, + const std::string& work_dir); + seastar::future> get_res_lib_path_from_cache( int32_t job_id); - std::string call_codegen_cmd(const std::string& plan_path, - const std::string& query_name, - const std::string& work_dir); - std::string get_work_directory(int32_t job_id); + void insert_or_update(int32_t job_id, CodegenStatus status, std::string path); + + bool check_job_running(int32_t job_id); + void ensure_dir_exists(const std::string& working_dir); void clear_dir(const std::string& working_dir); @@ -99,10 +110,9 @@ class CodegenProxy { std::string ir_compiler_prop_; std::string compiler_graph_schema_; std::mutex mutex_; + std::condition_variable cv_; std::unordered_map job_id_2_procedures_; bool initialized_; - - static const int32_t MAX_RETRY_TIMES; }; } // namespace server diff --git a/flex/engines/http_server/executor.act.cc b/flex/engines/http_server/executor.act.cc index 17c22e387634..917fd0270b20 100644 --- a/flex/engines/http_server/executor.act.cc +++ b/flex/engines/http_server/executor.act.cc @@ -72,54 +72,16 @@ seastar::future executor::run_hqps_procedure_query( } seastar::future executor::run_hqps_adhoc_query( - query_param&& param) { + adhoc_result&& param) { LOG(INFO) << "Run adhoc query"; // The received query's pay load shoud be able to deserialze to physical plan - auto& str = param.content; - if (str.size() <= 0) { - LOG(INFO) << "Empty query"; - return seastar::make_exception_future( - std::runtime_error("Empty query string")); - } - - const char* str_data = str.data(); - size_t str_length = str.size(); - LOG(INFO) << "Deserialize physical job request" << str_length; - - physical::PhysicalPlan plan; - bool ret = plan.ParseFromArray(str_data, str_length); - if (ret) { - VLOG(10) << "Parse physical plan: " << plan.DebugString(); - } else { - LOG(ERROR) << "Fail to parse physical plan"; - return seastar::make_exception_future( - std::runtime_error("Fail to parse physical plan")); - } - - // 0. do codegen gen. - std::string lib_path = ""; - int32_t job_id = -1; - auto& codegen_proxy = server::CodegenProxy::get(); - if (codegen_proxy.Initialized()) { - return codegen_proxy.DoGen(plan).then( - [](std::pair&& job_id_and_lib_path) { - if (job_id_and_lib_path.first == -1) { - return seastar::make_exception_future( - std::runtime_error("Fail to parse job id from codegen proxy")); - } - // 1. load and run. - LOG(INFO) << "Okay, try to run the query of lib path: " - << job_id_and_lib_path.second - << ", job id: " << job_id_and_lib_path.first - << "local shard id: " << hiactor::local_shard_id(); - seastar::sstring content = server::load_and_run( - job_id_and_lib_path.first, job_id_and_lib_path.second); - return seastar::make_ready_future(std::move(content)); - }); - } else { - return seastar::make_exception_future( - std::runtime_error("Codegen proxy not initialized")); - } + // 1. load and run. + auto& content = param.content; + LOG(INFO) << "Okay, try to run the query of lib path: " << content.second + << ", job id: " << content.first + << "local shard id: " << hiactor::local_shard_id(); + seastar::sstring result = server::load_and_run(content.first, content.second); + return seastar::make_ready_future(std::move(result)); } } // namespace server diff --git a/flex/engines/http_server/executor.act.h b/flex/engines/http_server/executor.act.h index cf722586f2cb..0d67e3033866 100644 --- a/flex/engines/http_server/executor.act.h +++ b/flex/engines/http_server/executor.act.h @@ -32,7 +32,7 @@ class ANNOTATION(actor:impl) executor : public hiactor::actor { seastar::future ANNOTATION(actor:method) run_hqps_procedure_query(query_param&& param); - seastar::future ANNOTATION(actor:method) run_hqps_adhoc_query(query_param&& param); + seastar::future ANNOTATION(actor:method) run_hqps_adhoc_query(adhoc_result&& param); // DECLARE_RUN_QUERYS; /// Declare `do_work` func here, no need to implement. diff --git a/flex/engines/http_server/hqps_http_handler.cc b/flex/engines/http_server/hqps_http_handler.cc index 9a6f375d2fed..40a4897117e4 100644 --- a/flex/engines/http_server/hqps_http_handler.cc +++ b/flex/engines/http_server/hqps_http_handler.cc @@ -19,6 +19,7 @@ #include #include #include +#include "flex/engines/http_server/generated/codegen_actor_ref.act.autogen.h" #include "flex/engines/http_server/generated/executor_ref.act.autogen.h" #include "flex/engines/http_server/types.h" @@ -79,15 +80,26 @@ class hqps_ic_handler : public seastar::httpd::handler_base { // a handler for handl adhoc query. class hqps_adhoc_query_handler : public seastar::httpd::handler_base { public: - hqps_adhoc_query_handler(uint32_t group_id, uint32_t shard_concurrency) + hqps_adhoc_query_handler(uint32_t group_id, uint32_t codegen_actor_group_id, + uint32_t shard_concurrency) : shard_concurrency_(shard_concurrency), executor_idx_(0) { executor_refs_.reserve(shard_concurrency_); - hiactor::scope_builder builder; - builder.set_shard(hiactor::local_shard_id()) - .enter_sub_scope(hiactor::scope(0)) - .enter_sub_scope(hiactor::scope(group_id)); - for (unsigned i = 0; i < shard_concurrency_; ++i) { - executor_refs_.emplace_back(builder.build_ref(i)); + { + hiactor::scope_builder builder; + builder.set_shard(hiactor::local_shard_id()) + .enter_sub_scope(hiactor::scope(0)) + .enter_sub_scope(hiactor::scope(group_id)); + for (unsigned i = 0; i < shard_concurrency_; ++i) { + executor_refs_.emplace_back(builder.build_ref(i)); + } + } + { + hiactor::scope_builder builder; + builder.set_shard(hiactor::local_shard_id()) + .enter_sub_scope(hiactor::scope(0)) + .enter_sub_scope( + hiactor::scope(codegen_actor_group_id)); + codegen_actor_ref_ = builder.build_ref(0); } } ~hqps_adhoc_query_handler() override = default; @@ -99,8 +111,11 @@ class hqps_adhoc_query_handler : public seastar::httpd::handler_base { auto dst_executor = executor_idx_; executor_idx_ = (executor_idx_ + 1) % shard_concurrency_; - return executor_refs_[dst_executor] - .run_hqps_adhoc_query(query_param{std::move(req->content)}) + return codegen_actor_ref_.do_codegen(query_param{std::move(req->content)}) + .then([this, dst_executor](auto&& param) { + return executor_refs_[dst_executor].run_hqps_adhoc_query( + std::move(param)); + }) .then_wrapped([rep = std::move(rep)]( seastar::future&& fut) mutable { if (__builtin_expect(fut.failed(), false)) { @@ -127,6 +142,7 @@ class hqps_adhoc_query_handler : public seastar::httpd::handler_base { const uint32_t shard_concurrency_; uint32_t executor_idx_; std::vector executor_refs_; + codegen_actor_ref codegen_actor_ref_; }; class hqps_exit_handler : public seastar::httpd::handler_base { @@ -177,7 +193,7 @@ seastar::future<> hqps_http_handler::set_routes() { new hqps_ic_handler(ic_query_group_id, shard_query_concurrency)); r.add(seastar::httpd::operation_type::POST, seastar::httpd::url("/interactive/adhoc_query"), - new hqps_adhoc_query_handler(ic_adhoc_group_id, + new hqps_adhoc_query_handler(ic_adhoc_group_id, codegen_group_id, shard_adhoc_concurrency)); r.add(seastar::httpd::operation_type::POST, seastar::httpd::url("/interactive/exit"), new hqps_exit_handler()); diff --git a/flex/engines/http_server/hqps_service.cc b/flex/engines/http_server/hqps_service.cc index f09b3dc56220..e503eea293a0 100644 --- a/flex/engines/http_server/hqps_service.cc +++ b/flex/engines/http_server/hqps_service.cc @@ -16,9 +16,11 @@ #include "flex/engines/http_server/options.h" namespace server { -void HQPSService::init(uint32_t num_shards, uint16_t http_port, - bool dpdk_mode) { - actor_sys_ = std::make_unique(num_shards, dpdk_mode); +void HQPSService::init(uint32_t num_shards, uint16_t http_port, bool dpdk_mode, + bool enable_thread_resource_pool, + unsigned external_thread_num) { + actor_sys_ = std::make_unique( + num_shards, dpdk_mode, enable_thread_resource_pool, external_thread_num); http_hdl_ = std::make_unique(http_port); } diff --git a/flex/engines/http_server/hqps_service.h b/flex/engines/http_server/hqps_service.h index 4a7987e1e32c..49e2776294bc 100644 --- a/flex/engines/http_server/hqps_service.h +++ b/flex/engines/http_server/hqps_service.h @@ -31,7 +31,8 @@ class HQPSService { ~HQPSService(); // the store procedure contains - void init(uint32_t num_shards, uint16_t http_port, bool dpdk_mode); + void init(uint32_t num_shards, uint16_t http_port, bool dpdk_mode, + bool enable_thread_resource_pool, unsigned external_thread_num); void run_and_wait_for_exit(); void set_exit_state(); diff --git a/flex/engines/http_server/options.h b/flex/engines/http_server/options.h index 642018eedbc9..7fbdde48a700 100644 --- a/flex/engines/http_server/options.h +++ b/flex/engines/http_server/options.h @@ -24,6 +24,7 @@ namespace server { const uint32_t ic_query_group_id = 1; const uint32_t ic_update_group_id = 2; const uint32_t ic_adhoc_group_id = 3; +const uint32_t codegen_group_id = 4; extern uint32_t shard_query_concurrency; extern uint32_t shard_update_concurrency; diff --git a/flex/engines/http_server/types.h b/flex/engines/http_server/types.h index 8c53a5f39337..692670b00291 100644 --- a/flex/engines/http_server/types.h +++ b/flex/engines/http_server/types.h @@ -52,6 +52,7 @@ struct payload { using query_param = payload; using query_result = payload; +using adhoc_result = payload>; } // namespace server