diff --git a/.github/workflows/interactive.yml b/.github/workflows/interactive.yml
index 28b495de5147..37408f956d9c 100644
--- a/.github/workflows/interactive.yml
+++ b/.github/workflows/interactive.yml
@@ -291,6 +291,18 @@ jobs:
./modern_graph_schema_v0_0.yaml ./modern_graph_schema_v0_1.yaml
sed -i 's/temp_workspace/interactive_workspace/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml
sed -i 's/default_graph: modern_graph/default_graph: movies/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml
+
+ - name: Let compiler use latest interactive java sdk
+ env:
+ HOME: /home/graphscope/
+ run: |
+ . /home/graphscope/.graphscope_env
+ . /home/graphscope/.cargo/env
+ # replace the 0.4 with the latest version in flex/interactive/sdk/java/pom.xml
+ sdk_version=$(grep -oPm1 "(?<=)[^<]+" ${GITHUB_WORKSPACE}/flex/interactive/sdk/java/pom.xml)
+ sed -i "s/.*<\/interactive.sdk.version>/${sdk_version}<\/interactive.sdk.version>/" ${GITHUB_WORKSPACE}/interactive_engine/pom.xml
+ cd ${GITHUB_WORKSPACE}/interactive_engine/
+ mvn clean install -Pexperimental -DskipTests
- name: Run End-to-End cypher adhoc ldbc query test
env:
@@ -375,14 +387,13 @@ jobs:
mkdir build && cd build # only test default build
cmake .. -DCMAKE_BUILD_TYPE=DEBUG -DBUILD_DOC=OFF && sudo make -j 4
- # test the different combination of cmake options: -DBUILD_HQPS=ON/OFF -DBUILD_TEST=ON/OFF, -DBUILD_ODPS_FRAGMENT_LOADER=ON/OFF
+ # test the different combination of cmake options: -DBUILD_TEST=ON/OFF, -DBUILD_ODPS_FRAGMENT_LOADER=ON/OFF
test-cmake-options:
runs-on: ubuntu-20.04
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.23.0
strategy:
matrix:
- BUILD_HQPS: [ON, OFF]
BUILD_TEST: [ON, OFF]
BUILD_ODPS_FRAGMENT_LOADER: [ON, OFF]
steps:
@@ -393,7 +404,7 @@ jobs:
cd ${GITHUB_WORKSPACE}/flex
git submodule update --init
mkdir build && cd build
- cmake .. -DBUILD_HQPS=${{ matrix.BUILD_HQPS }} -DBUILD_TEST=${{ matrix.BUILD_TEST }} \
+ cmake .. -DBUILD_TEST=${{ matrix.BUILD_TEST }} \
-DBUILD_ODPS_FRAGMENT_LOADER=${{ matrix.BUILD_ODPS_FRAGMENT_LOADER }}
sudo make -j4
diff --git a/flex/CMakeLists.txt b/flex/CMakeLists.txt
index 36f4228931d5..dd906d667f71 100644
--- a/flex/CMakeLists.txt
+++ b/flex/CMakeLists.txt
@@ -10,14 +10,12 @@ project (
LANGUAGES CXX)
-option(BUILD_HQPS "Whether to build HighQPS Engine" ON)
option(BUILD_TEST "Whether to build test" ON)
option(BUILD_DOC "Whether to build doc" OFF)
option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" OFF)
option(USE_PTHASH "Whether to use pthash" OFF)
#print options
-message(STATUS "Build HighQPS Engine: ${BUILD_HQPS}")
message(STATUS "Build test: ${BUILD_TEST}")
message(STATUS "Build doc: ${BUILD_DOC}")
message(STATUS "Build odps fragment loader: ${BUILD_ODPS_FRAGMENT_LOADER}")
@@ -37,11 +35,6 @@ set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE)
set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib")
set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
-if (BUILD_HQPS)
- message("Build HighQPS Engine")
- add_definitions(-DBUILD_HQPS)
-endif ()
-
if(USE_PTHASH)
message("Use PTHash")
add_definitions(-DUSE_PTHASH)
diff --git a/flex/bin/CMakeLists.txt b/flex/bin/CMakeLists.txt
index cb3b1a271188..4a007d692316 100644
--- a/flex/bin/CMakeLists.txt
+++ b/flex/bin/CMakeLists.txt
@@ -19,18 +19,18 @@ add_executable(flex_analytical_engine flex_analytical_engine.cc)
target_link_libraries(flex_analytical_engine flex_immutable_graph flex_bsp ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})
install_without_export_flex_target(flex_analytical_engine)
-if(BUILD_HQPS)
- if(Hiactor_FOUND)
- add_executable(interactive_server interactive_server.cc)
- target_link_libraries(interactive_server flex_utils flex_graph_db flex_server hqps_plan_proto flex_utils ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})
- if (OPENTELEMETRY_CPP_FOUND)
- target_link_libraries(interactive_server otel)
- endif()
- install_without_export_flex_target(interactive_server)
+
+if(Hiactor_FOUND)
+ add_executable(interactive_server interactive_server.cc)
+ target_link_libraries(interactive_server flex_utils flex_graph_db flex_server hqps_plan_proto flex_utils ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})
+ if (OPENTELEMETRY_CPP_FOUND)
+ target_link_libraries(interactive_server otel)
endif()
- # install the script
- install(PROGRAMS load_plan_and_gen.sh DESTINATION bin)
+ install_without_export_flex_target(interactive_server)
endif()
+# install the script
+install(PROGRAMS load_plan_and_gen.sh DESTINATION bin)
+
include_directories(${Boost_INCLUDE_DIRS})
add_executable(bulk_loader bulk_loader.cc)
diff --git a/flex/bin/interactive_server.cc b/flex/bin/interactive_server.cc
index 80132adb1d57..32fc5e67e13a 100644
--- a/flex/bin/interactive_server.cc
+++ b/flex/bin/interactive_server.cc
@@ -18,7 +18,7 @@
#include "stdlib.h"
#include "flex/engines/http_server/codegen_proxy.h"
-#include "flex/engines/http_server/service/hqps_service.h"
+#include "flex/engines/http_server/graph_db_service.h"
#include "flex/engines/http_server/workdir_manipulator.h"
#include "flex/otel/otel.h"
#include "flex/storages/rt_mutable_graph/loading_config.h"
@@ -64,7 +64,7 @@ void blockSignal(int sig) {
}
// When graph_schema is not specified, codegen proxy will use the running graph
-// schema in hqps_service
+// schema in graph_db_service
void init_codegen_proxy(const bpo::variables_map& vm,
const std::string& engine_config_file,
const std::string& graph_schema_file = "") {
@@ -203,6 +203,7 @@ int main(int argc, char** argv) {
service_config.start_admin_service = vm["enable-admin-service"].as();
service_config.start_compiler = vm["start-compiler"].as();
service_config.memory_level = vm["memory-level"].as();
+ service_config.enable_adhoc_handler = true;
auto& db = gs::GraphDB::get();
@@ -266,8 +267,8 @@ int main(int argc, char** argv) {
}
}
- server::HQPSService::get().init(service_config);
- server::HQPSService::get().run_and_wait_for_exit();
+ server::GraphDBService::get().init(service_config);
+ server::GraphDBService::get().run_and_wait_for_exit();
#ifdef HAVE_OPENTELEMETRY_CPP
otel::cleanUpTracer();
diff --git a/flex/bin/rt_bench.cc b/flex/bin/rt_bench.cc
index 85421c12bdcd..de47b3e3570f 100644
--- a/flex/bin/rt_bench.cc
+++ b/flex/bin/rt_bench.cc
@@ -24,7 +24,7 @@
#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/http_server/executor_group.actg.h"
#include "flex/engines/http_server/generated/actor/executor_ref.act.autogen.h"
-#include "flex/engines/http_server/service/graph_db_service.h"
+#include "flex/engines/http_server/graph_db_service.h"
namespace bpo = boost::program_options;
using namespace std::chrono_literals;
diff --git a/flex/bin/rt_server.cc b/flex/bin/rt_server.cc
index 608bc1f84306..cfa17f092a9c 100644
--- a/flex/bin/rt_server.cc
+++ b/flex/bin/rt_server.cc
@@ -16,8 +16,8 @@
#include "grape/util.h"
#include "flex/engines/graph_db/database/graph_db.h"
+#include "flex/engines/http_server/graph_db_service.h"
#include "flex/engines/http_server/options.h"
-#include "flex/engines/http_server/service/graph_db_service.h"
#include
#include
@@ -99,7 +99,14 @@ int main(int argc, char** argv) {
// start service
LOG(INFO) << "GraphScope http server start to listen on port " << http_port;
- server::GraphDBService::get().init(shard_num, http_port, enable_dpdk);
+
+ server::ServiceConfig service_config;
+ service_config.shard_num = shard_num;
+ service_config.dpdk_mode = enable_dpdk;
+ service_config.query_port = http_port;
+ service_config.start_admin_service = false;
+ service_config.start_compiler = false;
+ server::GraphDBService::get().init(service_config);
server::GraphDBService::get().run_and_wait_for_exit();
return 0;
diff --git a/flex/codegen/CMakeLists.txt b/flex/codegen/CMakeLists.txt
index 875e0cca5906..02b5d0b6f0ea 100644
--- a/flex/codegen/CMakeLists.txt
+++ b/flex/codegen/CMakeLists.txt
@@ -3,8 +3,6 @@ find_package(Boost REQUIRED COMPONENTS system filesystem
context program_options regex thread)
include_directories(SYSTEM ${Boost_INCLUDE_DIRS})
-if (BUILD_HQPS)
- add_executable(gen_code_from_plan gen_code_from_plan.cc)
- target_link_libraries(gen_code_from_plan flex_rt_mutable_graph flex_utils hqps_plan_proto ${GLOG_LIBRARIES} ${Boost_LIBRARIES})
- install_flex_target(gen_code_from_plan)
-endif()
+add_executable(gen_code_from_plan gen_code_from_plan.cc)
+target_link_libraries(gen_code_from_plan flex_rt_mutable_graph flex_utils hqps_plan_proto ${GLOG_LIBRARIES} ${Boost_LIBRARIES})
+install_flex_target(gen_code_from_plan)
diff --git a/flex/engines/CMakeLists.txt b/flex/engines/CMakeLists.txt
index 2438f1d51548..e14100c8c29e 100644
--- a/flex/engines/CMakeLists.txt
+++ b/flex/engines/CMakeLists.txt
@@ -2,8 +2,5 @@
add_subdirectory(graph_db)
add_subdirectory(bsp)
add_subdirectory(http_server)
-message(STATUS "BUILD_HQPS: ${BUILD_HQPS}")
-if (BUILD_HQPS)
- add_subdirectory(hqps_db)
-endif()
+add_subdirectory(hqps_db)
diff --git a/flex/engines/graph_db/CMakeLists.txt b/flex/engines/graph_db/CMakeLists.txt
index 852af87019b8..9515597d94a0 100644
--- a/flex/engines/graph_db/CMakeLists.txt
+++ b/flex/engines/graph_db/CMakeLists.txt
@@ -1,16 +1,11 @@
file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/database/*.cc")
-if (NOT BUILD_HQPS)
- list(FILTER GRAPH_DB_SRC_FILES EXCLUDE REGEX ".*hqps_app*.")
-endif()
add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES})
target_include_directories(flex_graph_db PUBLIC $)
target_link_libraries(flex_graph_db flex_rt_mutable_graph flex_utils ${GLOG_LIBRARIES} ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
-if (BUILD_HQPS)
- target_link_libraries(flex_graph_db hqps_plan_proto)
-endif()
+target_link_libraries(flex_graph_db hqps_plan_proto)
install_flex_target(flex_graph_db)
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
@@ -28,8 +23,7 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/app_base.h
DESTINATION include/flex/engines/graph_db/app)
-if (BUILD_HQPS)
- install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/hqps_app.h
- DESTINATION include/flex/engines/graph_db/app)
-endif()
+install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/hqps_app.h
+ DESTINATION include/flex/engines/graph_db/app)
+
diff --git a/flex/engines/graph_db/database/graph_db.cc b/flex/engines/graph_db/database/graph_db.cc
index e761a83c3551..16ad5826dffd 100644
--- a/flex/engines/graph_db/database/graph_db.cc
+++ b/flex/engines/graph_db/database/graph_db.cc
@@ -404,12 +404,10 @@ void GraphDB::initApps(
}
// Builtin apps
app_factories_[0] = std::make_shared();
-#ifdef BUILD_HQPS
app_factories_[Schema::HQPS_ADHOC_READ_PLUGIN_ID] =
std::make_shared();
app_factories_[Schema::HQPS_ADHOC_WRITE_PLUGIN_ID] =
std::make_shared();
-#endif // BUILD_HQPS
size_t valid_plugins = 0;
for (auto& path_and_index : plugins) {
diff --git a/flex/engines/graph_db/database/graph_db_session.cc b/flex/engines/graph_db/database/graph_db_session.cc
index 979b14ec55a4..1e99cd03b333 100644
--- a/flex/engines/graph_db/database/graph_db_session.cc
+++ b/flex/engines/graph_db/database/graph_db_session.cc
@@ -20,10 +20,8 @@
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/utils/app_utils.h"
-#ifdef BUILD_HQPS
#include "flex/proto_generated_gie/stored_procedure.pb.h"
#include "nlohmann/json.hpp"
-#endif // BUILD_HQPS
namespace gs {
@@ -229,7 +227,6 @@ AppBase* GraphDBSession::GetApp(int type) {
#undef likely // likely
-#ifdef BUILD_HQPS
Result>
GraphDBSession::parse_query_type_from_cypher_json(
const std::string_view& str_view) {
@@ -282,7 +279,6 @@ GraphDBSession::parse_query_type_from_cypher_internal(
}
return std::make_pair(app_name_to_path_index.at(query_name).second, str_view);
}
-#endif
const AppMetric& GraphDBSession::GetAppMetric(int idx) const {
return app_metrics_[idx];
diff --git a/flex/engines/graph_db/database/graph_db_session.h b/flex/engines/graph_db/database/graph_db_session.h
index 5d21f13ec28c..e333335c0cec 100644
--- a/flex/engines/graph_db/database/graph_db_session.h
+++ b/flex/engines/graph_db/database/graph_db_session.h
@@ -37,21 +37,17 @@ class GraphDBSession {
public:
enum class InputFormat : uint8_t {
kCppEncoder = 0,
-#ifdef BUILD_HQPS
- kCypherJson = 1, // External usage format
- kCypherInternalAdhoc = 2, // Internal format for adhoc query
- kCypherInternalProcedure = 3, // Internal format for procedure
-#endif // BUILD_HQPS
+ kCypherJson = 1, // Json format for cypher query
+ kCypherProtoAdhoc = 2, // Protobuf format for adhoc query
+ kCypherProtoProcedure = 3, // Protobuf format for procedure query
};
static constexpr int32_t MAX_RETRY = 3;
static constexpr int32_t MAX_PLUGIN_NUM = 256; // 2^(sizeof(uint8_t)*8)
-#ifdef BUILD_HQPS
- static constexpr const char* kCppEncoder = "\x00";
- static constexpr const char* kCypherJson = "\x01";
- static constexpr const char* kCypherInternalAdhoc = "\x02";
- static constexpr const char* kCypherInternalProcedure = "\x03";
-#endif // BUILD_HQPS
+ static constexpr const char* kCppEncoderStr = "\x00";
+ static constexpr const char* kCypherJsonStr = "\x01";
+ static constexpr const char* kCypherProtoAdhocStr = "\x02";
+ static constexpr const char* kCypherProtoProcedureStr = "\x03";
GraphDBSession(GraphDB& db, Allocator& alloc, WalWriter& logger,
const std::string& work_dir, int thread_id)
: db_(db),
@@ -110,12 +106,10 @@ class GraphDBSession {
AppBase* GetApp(int idx);
private:
-#ifdef BUILD_HQPS
Result>
parse_query_type_from_cypher_json(const std::string_view& input);
Result>
parse_query_type_from_cypher_internal(const std::string_view& input);
-#endif // BUILD_HQPS
/**
* @brief Parse the input format of the query.
* There are four formats:
@@ -154,10 +148,8 @@ class GraphDBSession {
// user-defined payload,
return std::make_pair((uint8_t) input[len - 2],
std::string_view(str_data, len - 2));
- }
-#ifdef BUILD_HQPS
- else if (input_tag ==
- static_cast(InputFormat::kCypherInternalAdhoc)) {
+ } else if (input_tag ==
+ static_cast(InputFormat::kCypherProtoAdhoc)) {
// For cypher internal adhoc, the query id is the
// second last byte,which is fixed to 255, and other bytes are a string
// representing the path to generated dynamic lib.
@@ -169,15 +161,13 @@ class GraphDBSession {
std::string_view str_view(input.data(), len - 1);
return parse_query_type_from_cypher_json(str_view);
} else if (input_tag ==
- static_cast(InputFormat::kCypherInternalProcedure)) {
+ static_cast(InputFormat::kCypherProtoProcedure)) {
// For cypher internal procedure, the query_name is
// provided in the protobuf message.
std::string_view str_view(input.data(), len - 1);
return parse_query_type_from_cypher_internal(str_view);
- }
-#endif // BUILD_HQPS
- else {
+ } else {
return Result>(
gs::Status(StatusCode::InValidArgument,
"Invalid input tag: " + std::to_string(input_tag)));
diff --git a/flex/engines/hqps_db/app/interactive_app_base.h b/flex/engines/hqps_db/app/interactive_app_base.h
index 3265368c4637..e9465145e4ca 100644
--- a/flex/engines/hqps_db/app/interactive_app_base.h
+++ b/flex/engines/hqps_db/app/interactive_app_base.h
@@ -16,8 +16,6 @@
#ifndef ENGINES_HQPS_DB_APP_INTERACTIVE_APP_BASE_H_
#define ENGINES_HQPS_DB_APP_INTERACTIVE_APP_BASE_H_
-#ifdef BUILD_HQPS
-
#include "flex/engines/graph_db/app/app_base.h"
#include "flex/proto_generated_gie/results.pb.h"
#include "flex/proto_generated_gie/stored_procedure.pb.h"
@@ -234,6 +232,4 @@ class CypherInternalPbWriteAppBase : public WriteAppBase {
} // namespace gs
-#endif // BUILD_HQPS
-
#endif // ENGINES_HQPS_DB_APP_INTERACTIVE_APP_BASE_H_
diff --git a/flex/engines/http_server/CMakeLists.txt b/flex/engines/http_server/CMakeLists.txt
index 07cd7034d813..315e4aed4f0a 100644
--- a/flex/engines/http_server/CMakeLists.txt
+++ b/flex/engines/http_server/CMakeLists.txt
@@ -6,22 +6,10 @@ if (Hiactor_FOUND)
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/
INCLUDE_PATHS ${Hiactor_INCLUDE_DIR},${CMAKE_CURRENT_SOURCE_DIR}/../../../,${CMAKE_CURRENT_SOURCE_DIR}/../../third_party/nlohmann-json/single_include/)
- if (NOT BUILD_HQPS)
- list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*admin.*")
- list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*codegen.*")
- endif ()
-
# get all .cc files in current directory, except for generated/
file(GLOB_RECURSE SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
list(FILTER SERVER_FILES EXCLUDE REGEX ".*generated.*")
- if (NOT BUILD_HQPS)
- list(FILTER SERVER_FILES EXCLUDE REGEX ".*admin*")
- list(FILTER SERVER_FILES EXCLUDE REGEX ".*hqps*")
- list(FILTER SERVER_FILES EXCLUDE REGEX ".*codegen*")
- list(FILTER SERVER_FILES EXCLUDE REGEX ".*workdir_manipulator*")
- endif ()
-
add_library(flex_server STATIC ${SERVER_FILES} ${server_actor_autogen_files})
add_dependencies(flex_server server_actor_autogen)
target_compile_options (flex_server
@@ -42,10 +30,8 @@ if (Hiactor_FOUND)
endif ()
set_target_properties(Hiactor::seastar PROPERTIES INTERFACE_COMPILE_OPTIONS "${seastar_options}")
- if (BUILD_HQPS)
- target_include_directories(flex_server PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/../hqps/)
- target_link_libraries(flex_server hqps_plan_proto)
- endif ()
+ target_include_directories(flex_server PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/../hqps/)
+ target_link_libraries(flex_server hqps_plan_proto)
if (OPENTELEMETRY_CPP_FOUND)
target_link_libraries(flex_server otel)
endif()
diff --git a/flex/engines/http_server/actor/admin_actor.act.cc b/flex/engines/http_server/actor/admin_actor.act.cc
index b6832289288a..f93d84ff1664 100644
--- a/flex/engines/http_server/actor/admin_actor.act.cc
+++ b/flex/engines/http_server/actor/admin_actor.act.cc
@@ -20,7 +20,7 @@
#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/engines/http_server/codegen_proxy.h"
-#include "flex/engines/http_server/service/hqps_service.h"
+#include "flex/engines/http_server/graph_db_service.h"
#include "flex/engines/http_server/workdir_manipulator.h"
#include "flex/utils/service_utils.h"
#include "nlohmann/json.hpp"
@@ -211,7 +211,7 @@ gs::Result invoke_loading_graph(
seastar::future invoke_creating_procedure(
std::shared_ptr metadata_store,
const std::string& graph_id, const std::string& plugin_creation_parameter) {
- auto& hqps_service = HQPSService::get();
+ auto& graph_db_service = GraphDBService::get();
// First create a plugin meta to get the plugin id, then do the real
// creation.
nlohmann::json json;
@@ -244,7 +244,7 @@ seastar::future invoke_creating_procedure(
return server::WorkDirManipulator::CreateProcedure(
graph_id, plugin_id, json,
- hqps_service.get_service_config().engine_config_path)
+ graph_db_service.get_service_config().engine_config_path)
.then_wrapped([graph_id = graph_id, old_plugin_id = plugin_id,
json = json, metadata_store = metadata_store](auto&& f) {
std::string proc_id;
@@ -375,9 +375,9 @@ admin_actor::admin_actor(hiactor::actor_base* exec_ctx,
set_max_concurrency(1); // set max concurrency for task reentrancy (stateful)
// initialization
// ...
- auto& hqps_service = HQPSService::get();
+ auto& graph_db_service = GraphDBService::get();
// meta_data_ should be thread safe.
- metadata_store_ = hqps_service.get_metadata_store();
+ metadata_store_ = graph_db_service.get_metadata_store();
}
// Create a new Graph with the passed graph config.
@@ -1009,67 +1009,68 @@ seastar::future admin_actor::start_service(
// First Stop query_handler's actors.
- auto& hqps_service = HQPSService::get();
- return hqps_service.stop_query_actors().then([this, prev_lock, graph_name,
- schema_value, cur_running_graph,
- data_dir_value, &hqps_service] {
- LOG(INFO) << "Successfully stopped query handler";
-
- {
- std::lock_guard lock(mtx_);
- auto& db = gs::GraphDB::get();
- LOG(INFO) << "Update service running on graph:" << graph_name;
-
- // use the previous thread num
- auto thread_num = db.SessionNum();
- db.Close();
- VLOG(10) << "Closed the previous graph db";
- if (!db.Open(schema_value, data_dir_value, thread_num).ok()) {
- LOG(ERROR) << "Fail to load graph from data directory: "
- << data_dir_value;
- if (!prev_lock) { // If the graph is not locked before, and we
- // fail at some steps after locking, we should
- // unlock it.
- metadata_store_->UnlockGraphIndices(graph_name);
- }
- return seastar::make_ready_future(
- gs::Result(gs::Status(
- gs::StatusCode::InternalError,
- "Fail to load graph from data directory: " + data_dir_value)));
- }
- LOG(INFO) << "Successfully load graph from data directory: "
- << data_dir_value;
- // unlock the previous graph
- if (graph_name != cur_running_graph) {
- auto unlock_res =
- metadata_store_->UnlockGraphIndices(cur_running_graph);
- if (!unlock_res.ok()) {
- LOG(ERROR) << "Fail to unlock graph: " << cur_running_graph;
- if (!prev_lock) {
- metadata_store_->UnlockGraphIndices(graph_name);
+ auto& graph_db_service = GraphDBService::get();
+ return graph_db_service.stop_query_actors().then(
+ [this, prev_lock, graph_name, schema_value, cur_running_graph,
+ data_dir_value, &graph_db_service] {
+ LOG(INFO) << "Successfully stopped query handler";
+
+ {
+ std::lock_guard lock(mtx_);
+ auto& db = gs::GraphDB::get();
+ LOG(INFO) << "Update service running on graph:" << graph_name;
+
+ // use the previous thread num
+ auto thread_num = db.SessionNum();
+ db.Close();
+ VLOG(10) << "Closed the previous graph db";
+ if (!db.Open(schema_value, data_dir_value, thread_num).ok()) {
+ LOG(ERROR) << "Fail to load graph from data directory: "
+ << data_dir_value;
+ if (!prev_lock) { // If the graph is not locked before, and we
+ // fail at some steps after locking, we should
+ // unlock it.
+ metadata_store_->UnlockGraphIndices(graph_name);
+ }
+ return seastar::make_ready_future(
+ gs::Result(
+ gs::Status(gs::StatusCode::InternalError,
+ "Fail to load graph from data directory: " +
+ data_dir_value)));
+ }
+ LOG(INFO) << "Successfully load graph from data directory: "
+ << data_dir_value;
+ // unlock the previous graph
+ if (graph_name != cur_running_graph) {
+ auto unlock_res =
+ metadata_store_->UnlockGraphIndices(cur_running_graph);
+ if (!unlock_res.ok()) {
+ LOG(ERROR) << "Fail to unlock graph: " << cur_running_graph;
+ if (!prev_lock) {
+ metadata_store_->UnlockGraphIndices(graph_name);
+ }
+ return seastar::make_ready_future(
+ gs::Result(unlock_res.status()));
+ }
+ }
+ LOG(INFO) << "Update running graph to: " << graph_name;
+ auto set_res = metadata_store_->SetRunningGraph(graph_name);
+ if (!set_res.ok()) {
+ LOG(ERROR) << "Fail to set running graph: " << graph_name;
+ if (!prev_lock) {
+ metadata_store_->UnlockGraphIndices(graph_name);
+ }
+ return seastar::make_ready_future(
+ gs::Result(set_res.status()));
}
- return seastar::make_ready_future(
- gs::Result(unlock_res.status()));
- }
- }
- LOG(INFO) << "Update running graph to: " << graph_name;
- auto set_res = metadata_store_->SetRunningGraph(graph_name);
- if (!set_res.ok()) {
- LOG(ERROR) << "Fail to set running graph: " << graph_name;
- if (!prev_lock) {
- metadata_store_->UnlockGraphIndices(graph_name);
}
+ graph_db_service.start_query_actors(); // start on a new scope.
+ LOG(INFO) << "Successfully started service with graph: " << graph_name;
+ graph_db_service.reset_start_time();
return seastar::make_ready_future(
- gs::Result(set_res.status()));
- }
- }
- hqps_service.start_query_actors(); // start on a new scope.
- LOG(INFO) << "Successfully started service with graph: " << graph_name;
- hqps_service.reset_start_time();
- return seastar::make_ready_future(
- gs::Result(
- to_message_json("Successfully start service")));
- });
+ gs::Result(
+ to_message_json("Successfully start service")));
+ });
}
// Stop service.
@@ -1077,8 +1078,8 @@ seastar::future admin_actor::start_service(
// The port is still connectable.
seastar::future admin_actor::stop_service(
query_param&& query_param) {
- auto& hqps_service = HQPSService::get();
- return hqps_service.stop_query_actors().then([this] {
+ auto& graph_db_service = GraphDBService::get();
+ return graph_db_service.stop_query_actors().then([this] {
LOG(INFO) << "Successfully stopped query handler";
// Add also remove current running graph
{
@@ -1112,15 +1113,16 @@ seastar::future admin_actor::stop_service(
// get service status
seastar::future admin_actor::service_status(
query_param&& query_param) {
- auto& hqps_service = HQPSService::get();
- auto query_port = hqps_service.get_query_port();
+ auto& graph_db_service = GraphDBService::get();
+ auto query_port = graph_db_service.get_query_port();
auto running_graph_res = metadata_store_->GetRunningGraph();
nlohmann::json res;
if (query_port != 0) {
- res["status"] = hqps_service.is_actors_running() ? "Running" : "Stopped";
+ res["status"] =
+ graph_db_service.is_actors_running() ? "Running" : "Stopped";
res["hqps_port"] = query_port;
- res["bolt_port"] = hqps_service.get_service_config().bolt_port;
- res["gremlin_port"] = hqps_service.get_service_config().gremlin_port;
+ res["bolt_port"] = graph_db_service.get_service_config().bolt_port;
+ res["gremlin_port"] = graph_db_service.get_service_config().gremlin_port;
if (running_graph_res.ok()) {
auto graph_meta_res =
metadata_store_->GetGraphMeta(running_graph_res.value());
@@ -1161,7 +1163,7 @@ seastar::future admin_actor::service_status(
res["graph"] = {};
LOG(INFO) << "No graph is running";
}
- res["start_time"] = hqps_service.get_start_time();
+ res["start_time"] = graph_db_service.get_start_time();
} else {
LOG(INFO) << "Query service has not been inited!";
res["status"] = "Query service has not been inited!";
diff --git a/flex/engines/http_server/codegen_proxy.cc b/flex/engines/http_server/codegen_proxy.cc
index 066dca3f4d65..7228c4440afe 100644
--- a/flex/engines/http_server/codegen_proxy.cc
+++ b/flex/engines/http_server/codegen_proxy.cc
@@ -13,7 +13,7 @@
* limitations under the License.
*/
#include "flex/engines/http_server/codegen_proxy.h"
-#include "flex/engines/http_server/service/hqps_service.h"
+#include "flex/engines/http_server/graph_db_service.h"
#include "flex/engines/http_server/workdir_manipulator.h"
namespace server {
@@ -70,10 +70,10 @@ seastar::future> CodegenProxy::DoGen(
auto cur_graph_schema_path = default_graph_schema_path_;
if (cur_graph_schema_path.empty()) {
- auto& hqps_service = server::HQPSService::get();
- if (hqps_service.get_metadata_store()) {
+ auto& graph_db_service = server::GraphDBService::get();
+ if (graph_db_service.get_metadata_store()) {
auto running_graph_res =
- hqps_service.get_metadata_store()->GetRunningGraph();
+ graph_db_service.get_metadata_store()->GetRunningGraph();
if (!running_graph_res.ok()) {
return seastar::make_exception_future>(
std::runtime_error("Get running graph failed"));
diff --git a/flex/engines/http_server/service/hqps_service.cc b/flex/engines/http_server/graph_db_service.cc
similarity index 88%
rename from flex/engines/http_server/service/hqps_service.cc
rename to flex/engines/http_server/graph_db_service.cc
index f94751f75fa6..21442759028b 100644
--- a/flex/engines/http_server/service/hqps_service.cc
+++ b/flex/engines/http_server/graph_db_service.cc
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "flex/engines/http_server/service/hqps_service.h"
+#include "flex/engines/http_server/graph_db_service.h"
#include "flex/engines/http_server/options.h"
#include "flex/engines/http_server/workdir_manipulator.h"
namespace server {
@@ -37,26 +37,27 @@ ServiceConfig::ServiceConfig()
admin_port(DEFAULT_ADMIN_PORT),
query_port(DEFAULT_QUERY_PORT),
shard_num(DEFAULT_SHARD_NUM),
+ enable_adhoc_handler(false),
dpdk_mode(false),
enable_thread_resource_pool(true),
external_thread_num(2),
- start_admin_service(true),
+ start_admin_service(false),
start_compiler(false),
- enable_gremlin(true),
- enable_bolt(true),
+ enable_gremlin(false),
+ enable_bolt(false),
metadata_store_type_(gs::MetadataStoreType::kLocalFile) {}
-const std::string HQPSService::DEFAULT_GRAPH_NAME = "modern_graph";
-const std::string HQPSService::DEFAULT_INTERACTIVE_HOME = "/opt/flex/";
-const std::string HQPSService::COMPILER_SERVER_CLASS_NAME =
+const std::string GraphDBService::DEFAULT_GRAPH_NAME = "modern_graph";
+const std::string GraphDBService::DEFAULT_INTERACTIVE_HOME = "/opt/flex/";
+const std::string GraphDBService::COMPILER_SERVER_CLASS_NAME =
"com.alibaba.graphscope.GraphServer";
-HQPSService& HQPSService::get() {
- static HQPSService instance;
+GraphDBService& GraphDBService::get() {
+ static GraphDBService instance;
return instance;
}
-void HQPSService::init(const ServiceConfig& config) {
+void GraphDBService::init(const ServiceConfig& config) {
if (initialized_.load(std::memory_order_relaxed)) {
std::cerr << "High QPS service has been already initialized!" << std::endl;
return;
@@ -64,8 +65,8 @@ void HQPSService::init(const ServiceConfig& config) {
actor_sys_ = std::make_unique(
config.shard_num, config.dpdk_mode, config.enable_thread_resource_pool,
config.external_thread_num, [this]() { set_exit_state(); });
- query_hdl_ =
- std::make_unique(config.query_port, config.shard_num);
+ query_hdl_ = std::make_unique(
+ config.query_port, config.shard_num, config.enable_adhoc_handler);
if (config.start_admin_service) {
admin_hdl_ = std::make_unique(config.admin_port);
}
@@ -100,7 +101,7 @@ void HQPSService::init(const ServiceConfig& config) {
}
}
-HQPSService::~HQPSService() {
+GraphDBService::~GraphDBService() {
if (actor_sys_) {
actor_sys_->terminate();
}
@@ -110,38 +111,39 @@ HQPSService::~HQPSService() {
}
}
-const ServiceConfig& HQPSService::get_service_config() const {
+const ServiceConfig& GraphDBService::get_service_config() const {
return service_config_;
}
-bool HQPSService::is_initialized() const {
+bool GraphDBService::is_initialized() const {
return initialized_.load(std::memory_order_relaxed);
}
-bool HQPSService::is_running() const {
+bool GraphDBService::is_running() const {
return running_.load(std::memory_order_relaxed);
}
-uint16_t HQPSService::get_query_port() const {
+uint16_t GraphDBService::get_query_port() const {
if (query_hdl_) {
return query_hdl_->get_port();
}
return 0;
}
-uint64_t HQPSService::get_start_time() const {
+uint64_t GraphDBService::get_start_time() const {
return start_time_.load(std::memory_order_relaxed);
}
-void HQPSService::reset_start_time() {
+void GraphDBService::reset_start_time() {
start_time_.store(gs::GetCurrentTimeStamp());
}
-std::shared_ptr HQPSService::get_metadata_store() const {
+std::shared_ptr GraphDBService::get_metadata_store()
+ const {
return metadata_store_;
}
-gs::Result HQPSService::service_status() {
+gs::Result GraphDBService::service_status() {
if (!is_initialized()) {
return gs::Result(
gs::StatusCode::OK, "High QPS service has not been inited!", "");
@@ -154,7 +156,7 @@ gs::Result HQPSService::service_status() {
seastar::sstring("High QPS service is running ..."));
}
-void HQPSService::run_and_wait_for_exit() {
+void GraphDBService::run_and_wait_for_exit() {
if (!is_initialized()) {
std::cerr << "High QPS service has not been inited!" << std::endl;
return;
@@ -181,16 +183,16 @@ void HQPSService::run_and_wait_for_exit() {
actor_sys_->terminate();
}
-void HQPSService::set_exit_state() { running_.store(false); }
+void GraphDBService::set_exit_state() { running_.store(false); }
-bool HQPSService::is_actors_running() const {
+bool GraphDBService::is_actors_running() const {
if (query_hdl_) {
return query_hdl_->is_actors_running();
} else
return false;
}
-seastar::future<> HQPSService::stop_query_actors() {
+seastar::future<> GraphDBService::stop_query_actors() {
std::unique_lock lock(mtx_);
if (query_hdl_) {
return query_hdl_->stop_query_actors();
@@ -201,7 +203,7 @@ seastar::future<> HQPSService::stop_query_actors() {
}
}
-void HQPSService::start_query_actors() {
+void GraphDBService::start_query_actors() {
std::unique_lock lock(mtx_);
if (query_hdl_) {
query_hdl_->start_query_actors();
@@ -211,7 +213,7 @@ void HQPSService::start_query_actors() {
}
}
-bool HQPSService::check_compiler_ready() const {
+bool GraphDBService::check_compiler_ready() const {
if (service_config_.start_compiler) {
if (service_config_.enable_gremlin) {
if (check_port_occupied(service_config_.gremlin_port)) {
@@ -233,7 +235,7 @@ bool HQPSService::check_compiler_ready() const {
return true;
}
-bool HQPSService::start_compiler_subprocess(
+bool GraphDBService::start_compiler_subprocess(
const std::string& graph_schema_path) {
if (!service_config_.start_compiler) {
return true;
@@ -292,7 +294,7 @@ bool HQPSService::start_compiler_subprocess(
return false;
}
-bool HQPSService::stop_compiler_subprocess() {
+bool GraphDBService::stop_compiler_subprocess() {
if (compiler_process_.running()) {
LOG(INFO) << "Terminate previous compiler process with pid: "
<< compiler_process_.id();
@@ -325,7 +327,7 @@ bool HQPSService::stop_compiler_subprocess() {
return true;
}
-std::string HQPSService::find_interactive_class_path() {
+std::string GraphDBService::find_interactive_class_path() {
std::string interactive_home = DEFAULT_INTERACTIVE_HOME;
if (std::getenv("INTERACTIVE_HOME")) {
// try to use DEFAULT_INTERACTIVE_HOME
@@ -375,7 +377,7 @@ std::string HQPSService::find_interactive_class_path() {
return "";
}
-gs::GraphId HQPSService::insert_default_graph_meta() {
+gs::GraphId GraphDBService::insert_default_graph_meta() {
if (!metadata_store_) {
LOG(FATAL) << "Metadata store has not been inited!" << std::endl;
}
diff --git a/flex/engines/http_server/service/hqps_service.h b/flex/engines/http_server/graph_db_service.h
similarity index 96%
rename from flex/engines/http_server/service/hqps_service.h
rename to flex/engines/http_server/graph_db_service.h
index 3193ff0b5bcb..fe329a7ce81b 100644
--- a/flex/engines/http_server/service/hqps_service.h
+++ b/flex/engines/http_server/graph_db_service.h
@@ -21,7 +21,7 @@
#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/http_server/actor_system.h"
#include "flex/engines/http_server/handler/admin_http_handler.h"
-#include "flex/engines/http_server/handler/hqps_http_handler.h"
+#include "flex/engines/http_server/handler/graph_db_http_handler.h"
#include "flex/engines/http_server/workdir_manipulator.h"
#include "flex/storages/metadata/graph_meta_store.h"
#include "flex/storages/metadata/metadata_store_factory.h"
@@ -48,6 +48,7 @@ struct ServiceConfig {
uint32_t query_port;
uint32_t shard_num;
uint32_t memory_level;
+ bool enable_adhoc_handler; // Whether to enable adhoc handler.
bool dpdk_mode;
bool enable_thread_resource_pool;
unsigned external_thread_num;
@@ -65,15 +66,14 @@ struct ServiceConfig {
ServiceConfig();
};
-class HQPSService {
+class GraphDBService {
public:
static const std::string DEFAULT_GRAPH_NAME;
static const std::string DEFAULT_INTERACTIVE_HOME;
static const std::string COMPILER_SERVER_CLASS_NAME;
- static HQPSService& get();
- ~HQPSService();
+ static GraphDBService& get();
+ ~GraphDBService();
- // only start the query service.
void init(const ServiceConfig& config);
const ServiceConfig& get_service_config() const;
@@ -113,7 +113,7 @@ class HQPSService {
bool check_compiler_ready() const;
private:
- HQPSService() = default;
+ GraphDBService() = default;
std::string find_interactive_class_path();
// Insert graph meta into metadata store.
@@ -124,7 +124,7 @@ class HQPSService {
private:
std::unique_ptr actor_sys_;
std::unique_ptr admin_hdl_;
- std::unique_ptr query_hdl_;
+ std::unique_ptr query_hdl_;
std::atomic running_{false};
std::atomic initialized_{false};
std::atomic start_time_{0};
diff --git a/flex/engines/http_server/handler/admin_http_handler.cc b/flex/engines/http_server/handler/admin_http_handler.cc
index 509c64e60b25..f7873f647454 100644
--- a/flex/engines/http_server/handler/admin_http_handler.cc
+++ b/flex/engines/http_server/handler/admin_http_handler.cc
@@ -691,7 +691,7 @@ seastar::future<> admin_http_handler::set_routes() {
{
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
- interactive_admin_group_id, shard_admin_procedure_concurrency));
+ interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure");
@@ -701,7 +701,7 @@ seastar::future<> admin_http_handler::set_routes() {
{
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
- interactive_admin_group_id, shard_admin_procedure_concurrency));
+ interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure");
@@ -712,7 +712,7 @@ seastar::future<> admin_http_handler::set_routes() {
// Each procedure's handling
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
- interactive_admin_group_id, shard_admin_procedure_concurrency));
+ interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure")
@@ -725,7 +725,7 @@ seastar::future<> admin_http_handler::set_routes() {
// Each procedure's handling
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
- interactive_admin_group_id, shard_admin_procedure_concurrency));
+ interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure")
@@ -737,7 +737,7 @@ seastar::future<> admin_http_handler::set_routes() {
// Each procedure's handling
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
- interactive_admin_group_id, shard_admin_procedure_concurrency));
+ interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure")
@@ -750,24 +750,24 @@ seastar::future<> admin_http_handler::set_routes() {
// List all graphs.
r.add(seastar::httpd::operation_type::GET, seastar::httpd::url("/v1/graph"),
new admin_http_graph_handler_impl(interactive_admin_group_id,
- shard_admin_graph_concurrency));
+ shard_admin_concurrency));
// Create a new Graph
r.add(seastar::httpd::operation_type::POST,
seastar::httpd::url("/v1/graph"),
new admin_http_graph_handler_impl(interactive_admin_group_id,
- shard_admin_graph_concurrency));
+ shard_admin_concurrency));
// Delete a graph
r.add(SEASTAR_DELETE,
seastar::httpd::url("/v1/graph").remainder("graph_id"),
new admin_http_graph_handler_impl(interactive_admin_group_id,
- shard_admin_graph_concurrency));
+ shard_admin_concurrency));
{
// uploading file to server
r.add(seastar::httpd::operation_type::POST,
seastar::httpd::url("/v1/file/upload"),
new admin_file_upload_handler_impl(interactive_admin_group_id,
- shard_admin_graph_concurrency));
+ shard_admin_concurrency));
}
// Get graph metadata
@@ -776,7 +776,7 @@ seastar::future<> admin_http_handler::set_routes() {
// /v1/graph/{graph_id}/schema
auto match_rule =
new seastar::httpd::match_rule(new admin_http_graph_handler_impl(
- interactive_admin_group_id, shard_admin_graph_concurrency));
+ interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph").add_param("graph_id", false);
// Get graph schema
r.add(match_rule, seastar::httpd::operation_type::GET);
@@ -785,7 +785,7 @@ seastar::future<> admin_http_handler::set_routes() {
{ // load data to graph
auto match_rule =
new seastar::httpd::match_rule(new admin_http_graph_handler_impl(
- interactive_admin_group_id, shard_admin_graph_concurrency));
+ interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/dataloading");
@@ -794,7 +794,7 @@ seastar::future<> admin_http_handler::set_routes() {
{ // Get Graph Schema
auto match_rule =
new seastar::httpd::match_rule(new admin_http_graph_handler_impl(
- interactive_admin_group_id, shard_admin_graph_concurrency));
+ interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph").add_param("graph_id").add_str("/schema");
r.add(match_rule, seastar::httpd::operation_type::GET);
}
@@ -802,7 +802,7 @@ seastar::future<> admin_http_handler::set_routes() {
// Get running graph statistics
auto match_rule =
new seastar::httpd::match_rule(new admin_http_graph_handler_impl(
- interactive_admin_group_id, shard_admin_graph_concurrency));
+ interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/statistics");
@@ -814,18 +814,18 @@ seastar::future<> admin_http_handler::set_routes() {
r.add(seastar::httpd::operation_type::GET,
seastar::httpd::url("/v1/node/status"),
new admin_http_node_handler_impl(interactive_admin_group_id,
- shard_admin_node_concurrency));
+ shard_admin_concurrency));
auto match_rule =
new seastar::httpd::match_rule(new admin_http_service_handler_impl(
- interactive_admin_group_id, shard_admin_service_concurrency));
+ interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/service").add_param("action");
r.add(match_rule, seastar::httpd::operation_type::POST);
r.add(seastar::httpd::operation_type::GET,
seastar::httpd::url("/v1/service/status"),
- new admin_http_service_handler_impl(
- interactive_admin_group_id, shard_admin_service_concurrency));
+ new admin_http_service_handler_impl(interactive_admin_group_id,
+ shard_admin_concurrency));
}
{
@@ -897,17 +897,17 @@ seastar::future<> admin_http_handler::set_routes() {
// job request handling.
r.add(seastar::httpd::operation_type::GET, seastar::httpd::url("/v1/job"),
new admin_http_job_handler_impl(interactive_admin_group_id,
- shard_admin_job_concurrency));
+ shard_admin_concurrency));
auto match_rule =
new seastar::httpd::match_rule(new admin_http_job_handler_impl(
- interactive_admin_group_id, shard_admin_job_concurrency));
+ interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/job").add_param("job_id");
r.add(match_rule, seastar::httpd::operation_type::GET);
r.add(SEASTAR_DELETE, seastar::httpd::url("/v1/job").remainder("job_id"),
new admin_http_job_handler_impl(interactive_admin_group_id,
- shard_admin_job_concurrency));
+ shard_admin_concurrency));
}
return seastar::make_ready_future<>();
diff --git a/flex/engines/http_server/handler/graph_db_http_handler.cc b/flex/engines/http_server/handler/graph_db_http_handler.cc
index 077d32975573..88aed71f6703 100644
--- a/flex/engines/http_server/handler/graph_db_http_handler.cc
+++ b/flex/engines/http_server/handler/graph_db_http_handler.cc
@@ -13,81 +13,298 @@
* limitations under the License.
*/
+#include "flex/engines/http_server/handler/graph_db_http_handler.h"
+#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/engines/http_server/executor_group.actg.h"
+#include "flex/engines/http_server/graph_db_service.h"
#include "flex/engines/http_server/options.h"
-#include "flex/engines/http_server/service/graph_db_service.h"
+#include "flex/engines/http_server/types.h"
+#include "flex/otel/otel.h"
#include
#include
+#include
#include
-#include "flex/engines/http_server/generated/actor/executor_ref.act.autogen.h"
-#include "flex/engines/http_server/types.h"
-#if 0
+#ifdef HAVE_OPENTELEMETRY_CPP
+#include "opentelemetry/context/context.h"
+#include "opentelemetry/trace/span_metadata.h"
+#include "opentelemetry/trace/span_startoptions.h"
+#endif // HAVE_OPENTELEMETRY_CPP
+
+#define RANDOM_DISPATCHER 1
+// when RANDOM_DISPATCHER is false, the dispatcher will use round-robin
+// algorithm to dispatch the query to different executors
+
+#if RANDOM_DISPATCHER
+#include
+#endif
+
class query_dispatcher {
public:
query_dispatcher(uint32_t shard_concurrency)
- : shard_concurrency_(shard_concurrency), executor_idx_(0) {}
+ :
+#if RANDOM_DISPATCHER
+ rd_(),
+ gen_(rd_()),
+ dis_(0, shard_concurrency - 1)
+#else
+ shard_concurrency_(shard_concurrency),
+ executor_idx_(0)
+#endif // RANDOM_DISPATCHER
+ {
+ }
- int get_executor_idx() {
+ inline int get_executor_idx() {
+#if RANDOM_DISPATCHER
+ return dis_(gen_);
+#else
auto idx = executor_idx_;
executor_idx_ = (executor_idx_ + 1) % shard_concurrency_;
return idx;
+#endif // RANDOM_DISPATCHER
}
private:
+#if RANDOM_DISPATCHER
+ std::random_device rd_;
+ std::mt19937 gen_;
+ std::uniform_int_distribution<> dis_;
+#else
int shard_concurrency_;
int executor_idx_;
+#endif
};
-#else
-#include
-class query_dispatcher {
+
+#undef RANDOM_DISPATCHER
+
+//////////////////////////////////////////////////////////////////////////
+namespace seastar {
+namespace httpd {
+// The seastar::httpd::param_matcher will fail to match if param is not
+// specified.
+class optional_param_matcher : public matcher {
public:
- query_dispatcher(uint32_t shard_concurrency)
- : rd_(), gen_(rd_()), dis_(0, shard_concurrency - 1) {}
+ /**
+ * Constructor
+ * @param name the name of the parameter, will be used as the key
+ * in the parameters object
+ * @param entire_path when set to true, the matched parameters will
+ * include all the remaining url until the end of it.
+ * when set to false the match will terminate at the next slash
+ */
+ explicit optional_param_matcher(const sstring& name) : _name(name) {}
- int get_executor_idx() { return dis_(gen_); }
+ size_t match(const sstring& url, size_t ind, parameters& param) override {
+ size_t last = find_end_param(url, ind);
+ if (last == url.size()) {
+ // Means we didn't find the parameter, but we still return true,
+ // and set the value to empty string.
+ param.set(_name, "");
+ return ind;
+ }
+ param.set(_name, url.substr(ind, last - ind));
+ return last;
+ }
private:
- std::random_device rd_;
- std::mt19937 gen_;
- std::uniform_int_distribution<> dis_;
+ size_t find_end_param(const sstring& url, size_t ind) {
+ size_t pos = url.find('/', ind + 1);
+ if (pos == sstring::npos) {
+ return url.length();
+ }
+ return pos;
+ }
+ sstring _name;
};
-#endif
+} // namespace httpd
+} // namespace seastar
namespace server {
-class graph_db_ic_handler : public seastar::httpd::handler_base {
+bool is_running_graph(const seastar::sstring& graph_id) {
+ std::string graph_id_str(graph_id.data(), graph_id.size());
+ auto running_graph_res =
+ GraphDBService::get().get_metadata_store()->GetRunningGraph();
+ if (!running_graph_res.ok()) {
+ LOG(ERROR) << "Failed to get running graph: "
+ << running_graph_res.status().error_message();
+ return false;
+ }
+ return running_graph_res.value() == graph_id_str;
+}
+
+////////////////////////////stored_proc_handler////////////////////////////
+class stored_proc_handler : public StoppableHandler {
public:
- graph_db_ic_handler(uint32_t group_id, uint32_t shard_concurrency)
- : shard_concurrency_(shard_concurrency), dispatcher_(shard_concurrency) {
- executor_refs_.reserve(shard_concurrency_);
+ stored_proc_handler(uint32_t init_group_id, uint32_t max_group_id,
+ uint32_t group_inc_step, uint32_t shard_concurrency)
+ : StoppableHandler(init_group_id, max_group_id, group_inc_step,
+ shard_concurrency),
+ dispatcher_(shard_concurrency) {
+ executor_refs_.reserve(shard_concurrency);
hiactor::scope_builder builder;
builder.set_shard(hiactor::local_shard_id())
.enter_sub_scope(hiactor::scope(0))
- .enter_sub_scope(hiactor::scope(group_id));
- for (unsigned i = 0; i < shard_concurrency_; ++i) {
+ .enter_sub_scope(hiactor::scope(
+ StoppableHandler::cur_group_id_));
+ for (unsigned i = 0; i < StoppableHandler::shard_concurrency_; ++i) {
executor_refs_.emplace_back(builder.build_ref(i));
}
+#ifdef HAVE_OPENTELEMETRY_CPP
+ total_counter_ = otel::create_int_counter("hqps_procedure_query_total");
+ latency_histogram_ =
+ otel::create_double_histogram("hqps_procedure_query_latency");
+#endif
+ }
+ ~stored_proc_handler() override = default;
+
+ seastar::future<> stop() override {
+ return StoppableHandler::cancel_scope([this] { executor_refs_.clear(); });
+ }
+
+ bool start() override {
+ if (executor_refs_.size() > 0) {
+ LOG(ERROR) << "The actors have been already created!";
+ return false;
+ }
+ return StoppableHandler::start_scope(
+ [this](hiactor::scope_builder& builder) {
+ for (unsigned i = 0; i < StoppableHandler::shard_concurrency_; ++i) {
+ executor_refs_.emplace_back(builder.build_ref(i));
+ }
+ });
}
- ~graph_db_ic_handler() override = default;
seastar::future> handle(
const seastar::sstring& path,
std::unique_ptr req,
std::unique_ptr rep) override {
auto dst_executor = dispatcher_.get_executor_idx();
- req->content.append("\0", 1);
+ // TODO(zhanglei): choose read or write based on the request, after the
+ // read/write info is supported in physical plan
+ uint8_t last_byte;
+ if (req->content.size() > 0) {
+ // read last byte and get the format info from the byte.
+ last_byte = req->content.back();
+ if (last_byte >
+ static_cast(
+ gs::GraphDBSession::InputFormat::kCypherProtoProcedure)) {
+ LOG(ERROR) << "Unsupported request format: " << (int) last_byte;
+ rep->set_status(
+ seastar::httpd::reply::status_type::internal_server_error);
+ rep->write_body("bin", seastar::sstring("Unsupported request format!"));
+ rep->done();
+ return seastar::make_ready_future<
+ std::unique_ptr>(std::move(rep));
+ }
+ } else {
+ LOG(ERROR) << "Empty request content!";
+ rep->set_status(
+ seastar::httpd::reply::status_type::internal_server_error);
+ rep->write_body("bin", seastar::sstring("Empty request content!"));
+ rep->done();
+ return seastar::make_ready_future>(
+ std::move(rep));
+ }
+ if (path != "/v1/graph/current/query" && req->param.exists("graph_id")) {
+ // TODO(zhanglei): get from graph_db.
+ if (!is_running_graph(req->param["graph_id"])) {
+ rep->set_status(
+ seastar::httpd::reply::status_type::internal_server_error);
+ rep->write_body("bin",
+ seastar::sstring("The querying query is not running:" +
+ req->param["graph_id"]));
+ rep->done();
+ return seastar::make_ready_future<
+ std::unique_ptr>(std::move(rep));
+ }
+ }
+#ifdef HAVE_OPENTELEMETRY_CPP
+ auto tracer = otel::get_tracer("hqps_procedure_query_handler");
+ // Extract context from headers. This copy is necessary to avoid access
+ // after header content been freed
+ std::map headers(req->_headers.begin(),
+ req->_headers.end());
+ auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
+ auto options = otel::get_parent_ctx(current_ctx, headers);
+ auto outer_span = tracer->StartSpan("procedure_query_handling", options);
+ auto scope = tracer->WithActiveSpan(outer_span);
+ auto start_ts = gs::GetCurrentTimeStamp();
+#endif // HAVE_OPENTELEMETRY_CPP
+
return executor_refs_[dst_executor]
.run_graph_db_query(query_param{std::move(req->content)})
- .then_wrapped([rep = std::move(rep)](
- seastar::future&& fut) mutable {
+ .then([last_byte
+#ifdef HAVE_OPENTELEMETRY_CPP
+ ,
+ this, outer_span = outer_span
+#endif // HAVE_OPENTELEMETRY_CPP
+ ](auto&& output) {
+ if (last_byte == static_cast(
+ gs::GraphDBSession::InputFormat::kCppEncoder)) {
+ return seastar::make_ready_future(
+ std::move(output.content));
+ } else {
+ // For cypher input format, the results are written with
+ // output.put_string(), which will add extra 4 bytes. So we need
+ // to remove the first 4 bytes here.
+ if (output.content.size() < 4) {
+ LOG(ERROR) << "Invalid output size: " << output.content.size();
+#ifdef HAVE_OPENTELEMETRY_CPP
+ outer_span->SetStatus(opentelemetry::trace::StatusCode::kError,
+ "Invalid output size");
+ outer_span->End();
+ std::map labels = {
+ { "status",
+ "fail" }};
+ total_counter_->Add(1, labels);
+#endif // HAVE_OPENTELEMETRY_CPP
+ return seastar::make_ready_future(std::move(output));
+ }
+ return seastar::make_ready_future(
+ std::move(output.content.substr(4)));
+ }
+ })
+ .then_wrapped([rep = std::move(rep)
+#ifdef HAVE_OPENTELEMETRY_CPP
+ ,
+ this, outer_span, start_ts
+#endif // HAVE_OPENTELEMETRY_CPP
+ ](seastar::future&& fut) mutable {
if (__builtin_expect(fut.failed(), false)) {
- return seastar::make_exception_future<
- std::unique_ptr>(fut.get_exception());
+ rep->set_status(
+ seastar::httpd::reply::status_type::internal_server_error);
+ try {
+ std::rethrow_exception(fut.get_exception());
+ } catch (std::exception& e) {
+ rep->write_body("bin", seastar::sstring(e.what()));
+ }
+#ifdef HAVE_OPENTELEMETRY_CPP
+ outer_span->SetStatus(opentelemetry::trace::StatusCode::kError,
+ "Internal Server Error");
+ outer_span->End();
+ std::map labels = {{ "status", "fail" }};
+ total_counter_->Add(1, labels);
+#endif // HAVE_OPENTELEMETRY_CPP
+ rep->done();
+ return seastar::make_ready_future<
+ std::unique_ptr>(std::move(rep));
}
auto result = fut.get0();
rep->write_body("bin", std::move(result.content));
+#ifdef HAVE_OPENTELEMETRY_CPP
+ outer_span->End();
+ std::map labels = {{ "status", "success" }};
+ total_counter_->Add(1, labels);
+ auto end_ts = gs::GetCurrentTimeStamp();
+#if OPENTELEMETRY_ABI_VERSION_NO >= 2
+ latency_histogram_->Record(end_ts - start_ts);
+#else
+ latency_histogram_->Record(end_ts - start_ts,
+ opentelemetry::context::Context{});
+#endif
+#endif // HAVE_OPENTELEMETRY_CPP
rep->done();
return seastar::make_ready_future<
std::unique_ptr>(std::move(rep));
@@ -95,27 +312,284 @@ class graph_db_ic_handler : public seastar::httpd::handler_base {
}
private:
- const uint32_t shard_concurrency_;
query_dispatcher dispatcher_;
std::vector executor_refs_;
+#ifdef HAVE_OPENTELEMETRY_CPP
+ opentelemetry::nostd::unique_ptr>
+ total_counter_;
+ opentelemetry::nostd::unique_ptr>
+ latency_histogram_;
+#endif
};
-class graph_db_exit_handler : public seastar::httpd::handler_base {
+class adhoc_query_handler : public StoppableHandler {
public:
+ adhoc_query_handler(uint32_t init_group_id, uint32_t max_group_id,
+ uint32_t group_inc_step, uint32_t shard_concurrency)
+ : StoppableHandler(init_group_id, max_group_id, group_inc_step,
+ shard_concurrency),
+ executor_idx_(0) {
+ executor_refs_.reserve(shard_concurrency_);
+ {
+ hiactor::scope_builder builder;
+ builder.set_shard(hiactor::local_shard_id())
+ .enter_sub_scope(hiactor::scope(0))
+ .enter_sub_scope(hiactor::scope(init_group_id));
+ for (unsigned i = 0; i < shard_concurrency_; ++i) {
+ executor_refs_.emplace_back(builder.build_ref(i));
+ }
+ }
+ {
+ hiactor::scope_builder builder;
+ builder.set_shard(hiactor::local_shard_id())
+ .enter_sub_scope(hiactor::scope(0))
+ .enter_sub_scope(hiactor::scope(init_group_id));
+ codegen_actor_refs_.emplace_back(builder.build_ref(0));
+ }
+#ifdef HAVE_OPENTELEMETRY_CPP
+ total_counter_ = otel::create_int_counter("hqps_adhoc_query_total");
+ latency_histogram_ =
+ otel::create_double_histogram("hqps_adhoc_query_latency");
+#endif // HAVE_OPENTELEMETRY_CPP
+ }
+
+ ~adhoc_query_handler() override = default;
+
+ seastar::future<> stop() override {
+ return StoppableHandler::cancel_scope([this] {
+ executor_refs_.clear();
+ codegen_actor_refs_.clear();
+ });
+ }
+
+ bool start() override {
+ if (executor_refs_.size() > 0 || codegen_actor_refs_.size() > 0) {
+ LOG(ERROR) << "The actors have been already created!";
+ return false;
+ }
+ return StoppableHandler::start_scope([this](
+ hiactor::scope_builder& builder) {
+ for (unsigned i = 0; i < StoppableHandler::shard_concurrency_; ++i) {
+ executor_refs_.emplace_back(builder.build_ref(i));
+ }
+ codegen_actor_refs_.emplace_back(builder.build_ref(0));
+ });
+ }
+
seastar::future> handle(
const seastar::sstring& path,
std::unique_ptr req,
std::unique_ptr rep) override {
- GraphDBService::get().set_exit_state();
- rep->write_body("bin",
- seastar::sstring{"The graph_db server is exiting ..."});
- return seastar::make_ready_future>(
- std::move(rep));
+ auto dst_executor = executor_idx_;
+ executor_idx_ = (executor_idx_ + 1) % shard_concurrency_;
+
+ if (path != "/v1/graph/current/adhoc_query" &&
+ req->param.exists("graph_id")) {
+ // TODO(zhanglei): get from graph_db.
+ if (!is_running_graph(req->param["graph_id"])) {
+ rep->set_status(
+ seastar::httpd::reply::status_type::internal_server_error);
+ rep->write_body("bin",
+ seastar::sstring("The querying query is not running:" +
+ req->param["graph_id"]));
+ rep->done();
+ return seastar::make_ready_future<
+ std::unique_ptr>(std::move(rep));
+ }
+ }
+
+#ifdef HAVE_OPENTELEMETRY_CPP
+ auto tracer = otel::get_tracer("adhoc_query_handler");
+ // Extract context from headers. This copy is necessary to avoid access
+ // after header content been freed
+ std::map headers(req->_headers.begin(),
+ req->_headers.end());
+ auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
+ auto options = otel::get_parent_ctx(current_ctx, headers);
+ auto outer_span = tracer->StartSpan("adhoc_query_handling", options);
+ auto scope = tracer->WithActiveSpan(outer_span);
+ // Start a new span for codegen
+ auto codegen_span = tracer->StartSpan("adhoc_codegen", options);
+ auto codegen_scope = tracer->WithActiveSpan(codegen_span);
+ // create a new span for query execution, not started.
+ auto start_ts = std::chrono::duration_cast(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+#endif // HAVE_OPENTELEMETRY_CPP
+ return codegen_actor_refs_[0]
+ .do_codegen(query_param{std::move(req->content)})
+ .then([this, dst_executor
+#ifdef HAVE_OPENTELEMETRY_CPP
+ ,
+ codegen_span = codegen_span, tracer = tracer, options = options,
+ codegen_scope = std::move(codegen_scope), outer_span = outer_span
+#endif // HAVE_OPENTELEMETRY_CPP
+ ](auto&& param) mutable {
+#ifdef HAVE_OPENTELEMETRY_CPP
+ codegen_span->End();
+ options.parent = outer_span->GetContext();
+ auto query_span = tracer->StartSpan("adhoc_query_execution", options);
+ auto query_scope = tracer->WithActiveSpan(query_span);
+#endif // HAVE_OPENTELEMETRY_CPP
+ // TODO(zhanglei): choose read or write based on the request, after the
+ // read/write info is supported in physical plan
+ // The content contains the path to dynamic library
+ param.content.append(gs::Schema::HQPS_ADHOC_WRITE_PLUGIN_ID_STR, 1);
+ param.content.append(gs::GraphDBSession::kCypherProtoAdhocStr, 1);
+ return executor_refs_[dst_executor]
+ .run_graph_db_query(query_param{std::move(param.content)})
+ .then([
+#ifdef HAVE_OPENTELEMETRY_CPP
+ query_span = query_span,
+ query_scope = std::move(query_scope)
+#endif // HAVE_OPENTELEMETRY_CPP
+ ](auto&& output) {
+#ifdef HAVE_OPENTELEMETRY_CPP
+ query_span->End();
+#endif // HAVE_OPENTELEMETRY_CPP
+ return seastar::make_ready_future(
+ std::move(output.content));
+ });
+ })
+ .then([
+#ifdef HAVE_OPENTELEMETRY_CPP
+ this, outer_span = outer_span
+#endif // HAVE_OPENTELEMETRY_CPP
+ ](auto&& output) {
+ if (output.content.size() < 4) {
+ LOG(ERROR) << "Invalid output size: " << output.content.size();
+#ifdef HAVE_OPENTELEMETRY_CPP
+ std::map labels = {{ "status", "fail" }};
+ total_counter_->Add(1, labels);
+ outer_span->SetStatus(opentelemetry::trace::StatusCode::kError,
+ "Internal output size");
+ outer_span->End();
+#endif // HAVE_OPENTELEMETRY_CPP
+ return seastar::make_ready_future(std::move(output));
+ }
+ return seastar::make_ready_future(
+ std::move(output.content.substr(4)));
+ })
+ .then_wrapped([rep = std::move(rep)
+#ifdef HAVE_OPENTELEMETRY_CPP
+ ,
+ this, outer_span, start_ts
+#endif // HAVE_OPENTELEMETRY_CPP
+ ](seastar::future&& fut) mutable {
+ if (__builtin_expect(fut.failed(), false)) {
+ rep->set_status(
+ seastar::httpd::reply::status_type::internal_server_error);
+ try {
+ std::rethrow_exception(fut.get_exception());
+ } catch (std::exception& e) {
+ rep->write_body("bin", seastar::sstring(e.what()));
+#ifdef HAVE_OPENTELEMETRY_CPP
+ std::map labels = {
+ { "status",
+ "fail" }};
+ total_counter_->Add(1, labels);
+ outer_span->SetStatus(opentelemetry::trace::StatusCode::kError,
+ "Internal Server Error");
+ outer_span->SetAttribute(
+ "exception", opentelemetry::common::AttributeValue(e.what()));
+ outer_span->End();
+#endif // HAVE_OPENTELEMETRY_CPP
+ }
+ rep->done();
+ return seastar::make_ready_future<
+ std::unique_ptr>(std::move(rep));
+ }
+ auto result = fut.get0();
+ rep->write_body("bin", std::move(result.content));
+#ifdef HAVE_OPENTELEMETRY_CPP
+ std::map labels = {{ "status", "success" }};
+ total_counter_->Add(1, labels);
+ outer_span->End();
+ auto end_ts = gs::GetCurrentTimeStamp();
+#if OPENTELEMETRY_ABI_VERSION_NO >= 2
+ latency_histogram_->Record(end_ts - start_ts);
+#else
+ latency_histogram_->Record(end_ts - start_ts,
+ opentelemetry::context::Context{});
+#endif
+#endif // HAVE_OPENTELEMETRY_CPP
+ rep->done();
+ return seastar::make_ready_future<
+ std::unique_ptr>(std::move(rep));
+ });
}
+
+ private:
+ uint32_t executor_idx_;
+ std::vector executor_refs_;
+ std::vector codegen_actor_refs_;
+#ifdef HAVE_OPENTELEMETRY_CPP
+ opentelemetry::nostd::unique_ptr>
+ total_counter_;
+ opentelemetry::nostd::unique_ptr>
+ latency_histogram_;
+#endif
};
-graph_db_http_handler::graph_db_http_handler(uint16_t http_port)
- : http_port_(http_port) {}
+///////////////////////////graph_db_http_handler/////////////////////////////
+
+graph_db_http_handler::graph_db_http_handler(uint16_t http_port,
+ int32_t shard_num,
+ bool enable_adhoc_handlers)
+ : http_port_(http_port),
+ enable_adhoc_handlers_(enable_adhoc_handlers),
+ running_(false),
+ actors_running_(true) {
+ current_graph_query_handlers_.resize(shard_num);
+ all_graph_query_handlers_.resize(shard_num);
+ if (enable_adhoc_handlers_) {
+ adhoc_query_handlers_.resize(shard_num);
+ }
+}
+
+graph_db_http_handler::~graph_db_http_handler() {
+ if (is_running()) {
+ stop();
+ }
+ // DO NOT DELETE the handler pointers, they will be deleted by
+ // seastar::httpd::match_rule
+}
+
+uint16_t graph_db_http_handler::get_port() const { return http_port_; }
+
+bool graph_db_http_handler::is_running() const { return running_.load(); }
+
+bool graph_db_http_handler::is_actors_running() const {
+ return actors_running_.load();
+}
+
+seastar::future<> graph_db_http_handler::stop_query_actors() {
+ return current_graph_query_handlers_[hiactor::local_shard_id()]
+ ->stop()
+ .then([this] {
+ return all_graph_query_handlers_[hiactor::local_shard_id()]->stop();
+ })
+ .then([this] {
+ if (enable_adhoc_handlers_.load()) {
+ return adhoc_query_handlers_[hiactor::local_shard_id()]->stop();
+ } else {
+ return seastar::make_ready_future<>();
+ }
+ })
+ .then([this] {
+ actors_running_.store(false);
+ return seastar::make_ready_future<>();
+ });
+}
+
+void graph_db_http_handler::start_query_actors() {
+ current_graph_query_handlers_[hiactor::local_shard_id()]->start();
+ all_graph_query_handlers_[hiactor::local_shard_id()]->start();
+ if (enable_adhoc_handlers_.load()) {
+ adhoc_query_handlers_[hiactor::local_shard_id()]->start();
+ }
+ actors_running_.store(true);
+}
void graph_db_http_handler::start() {
auto fut = seastar::alien::submit_to(
@@ -126,34 +600,55 @@ void graph_db_http_handler::start() {
.then([this] {
fmt::print("Http handler is listening on port {} ...\n",
http_port_);
+ running_.store(true);
});
});
fut.wait();
}
void graph_db_http_handler::stop() {
- auto fut =
- seastar::alien::submit_to(*seastar::alien::internal::default_instance, 0,
- [this] { return server_.stop(); });
+ auto fut = seastar::alien::submit_to(
+ *seastar::alien::internal::default_instance, 0, [this] {
+ LOG(INFO) << "Stopping http handler ...";
+ return server_.stop();
+ });
fut.wait();
+ // update running state
+ running_.store(false);
}
seastar::future<> graph_db_http_handler::set_routes() {
- return server_.set_routes([](seastar::httpd::routes& r) {
- r.add(seastar::httpd::operation_type::POST,
- seastar::httpd::url("/interactive/query"),
- new graph_db_ic_handler(ic_query_group_id, shard_query_concurrency));
- r.add(
- seastar::httpd::operation_type::POST,
- seastar::httpd::url("/interactive/update"),
- new graph_db_ic_handler(ic_update_group_id, shard_update_concurrency));
- r.add(
- seastar::httpd::operation_type::POST,
- seastar::httpd::url("/interactive/app"),
- new graph_db_ic_handler(ic_update_group_id, shard_update_concurrency));
- r.add(seastar::httpd::operation_type::POST,
- seastar::httpd::url("/interactive/exit"),
- new graph_db_exit_handler());
+ return server_.set_routes([this](seastar::httpd::routes& r) {
+ // matches /v1/graph/current/query
+ current_graph_query_handlers_[hiactor::local_shard_id()] =
+ new stored_proc_handler(ic_query_group_id, max_group_id, group_inc_step,
+ shard_query_concurrency);
+ r.put(seastar::httpd::operation_type::POST, "/v1/graph/current/query",
+ current_graph_query_handlers_[hiactor::local_shard_id()]);
+
+ // matches /v1/graph/{graph_id}/query
+ all_graph_query_handlers_[hiactor::local_shard_id()] =
+ new stored_proc_handler(ic_query_group_id, max_group_id, group_inc_step,
+ shard_query_concurrency);
+ auto rule_proc = new seastar::httpd::match_rule(
+ all_graph_query_handlers_[hiactor::local_shard_id()]);
+ rule_proc->add_str("/v1/graph")
+ .add_matcher(new seastar::httpd::optional_param_matcher("graph_id"))
+ .add_str("/query");
+ r.add(rule_proc, seastar::httpd::operation_type::POST);
+ if (enable_adhoc_handlers_.load()) {
+ auto adhoc_query_handler_ =
+ new adhoc_query_handler(ic_adhoc_group_id, max_group_id,
+ group_inc_step, shard_adhoc_concurrency);
+ adhoc_query_handlers_[hiactor::local_shard_id()] = adhoc_query_handler_;
+ // Add routes
+ auto rule_adhoc = new seastar::httpd::match_rule(adhoc_query_handler_);
+ rule_adhoc->add_str("/v1/graph")
+ .add_matcher(new seastar::httpd::optional_param_matcher("graph_id"))
+ .add_str("/adhoc_query");
+ r.add(rule_adhoc, seastar::httpd::operation_type::POST);
+ }
+
return seastar::make_ready_future<>();
});
}
diff --git a/flex/engines/http_server/handler/graph_db_http_handler.h b/flex/engines/http_server/handler/graph_db_http_handler.h
index 64e03913280b..a37df668b8c3 100644
--- a/flex/engines/http_server/handler/graph_db_http_handler.h
+++ b/flex/engines/http_server/handler/graph_db_http_handler.h
@@ -16,23 +16,124 @@
#ifndef ENGINES_HTTP_SERVER_HANDLER_GRAPH_DB_HTTP_HANDLER_H_
#define ENGINES_HTTP_SERVER_HANDLER_GRAPH_DB_HTTP_HANDLER_H_
+#include "flex/engines/http_server/executor_group.actg.h"
+#include "flex/engines/http_server/generated/actor/codegen_actor_ref.act.autogen.h"
+#include "flex/engines/http_server/generated/actor/executor_ref.act.autogen.h"
+
#include
namespace server {
+class StoppableHandler : public seastar::httpd::handler_base {
+ public:
+ StoppableHandler(uint32_t init_group_id, uint32_t max_group_id,
+ uint32_t group_inc_step, uint32_t shard_concurrency)
+ : is_cancelled_(false),
+ cur_group_id_(init_group_id),
+ max_group_id_(max_group_id),
+ group_inc_step_(group_inc_step),
+ shard_concurrency_(shard_concurrency) {}
+
+ inline bool is_stopped() const { return is_cancelled_; }
+
+ virtual seastar::future<> stop() = 0;
+ virtual bool start() = 0;
+
+ protected:
+ template
+ seastar::future<> cancel_scope(FuncT func) {
+ if (is_cancelled_) {
+ LOG(INFO) << "The current scope has been already cancelled!";
+ return seastar::make_ready_future<>();
+ }
+ hiactor::scope_builder builder;
+ builder.set_shard(hiactor::local_shard_id())
+ .enter_sub_scope(hiactor::scope(0))
+ .enter_sub_scope(hiactor::scope(cur_group_id_));
+ return hiactor::actor_engine()
+ .cancel_scope_request(builder, false)
+ .then_wrapped([this, func](auto&& fut) {
+ try {
+ fut.get();
+ LOG(INFO) << "Cancel IC scope successfully!";
+ // clear the actor refs
+ // executor_refs_.clear();
+ is_cancelled_ = true;
+ } catch (const std::exception& e) {
+ // In case the scope is already cancelled, we should ignore the
+ // exception.
+ LOG(INFO) << "Failed to cancel IC scope: " << e.what();
+ }
+ func();
+ return seastar::make_ready_future<>();
+ });
+ }
+
+ template
+ bool start_scope(FuncT func) {
+ VLOG(10) << "Create actors with a different sub scope id: "
+ << cur_group_id_;
+ if (cur_group_id_ + group_inc_step_ > max_group_id_) {
+ LOG(ERROR) << "The max group id is reached, cannot create more actors!";
+ return false;
+ }
+ if (cur_group_id_ + group_inc_step_ < cur_group_id_) {
+ LOG(ERROR) << "overflow detected!";
+ return false;
+ }
+ cur_group_id_ += group_inc_step_;
+ hiactor::scope_builder builder;
+ builder.set_shard(hiactor::local_shard_id())
+ .enter_sub_scope(hiactor::scope(0))
+ .enter_sub_scope(hiactor::scope(cur_group_id_));
+ // for (unsigned i = 0; i < shard_concurrency_; ++i) {
+ // executor_refs_.emplace_back(builder.build_ref(i));
+ // }
+ func(builder);
+ is_cancelled_ = false; // locked outside
+ return true;
+ }
+
+ bool is_cancelled_;
+ uint32_t cur_group_id_;
+ const uint32_t max_group_id_, group_inc_step_;
+ const uint32_t shard_concurrency_;
+};
+
class graph_db_http_handler {
public:
- graph_db_http_handler(uint16_t http_port);
+ graph_db_http_handler(uint16_t http_port, int32_t shard_num,
+ bool enable_adhoc_handlers = false);
+
+ ~graph_db_http_handler();
void start();
void stop();
+ uint16_t get_port() const;
+
+ bool is_running() const;
+
+ bool is_actors_running() const;
+
+ seastar::future<> stop_query_actors();
+
+ void start_query_actors();
+
private:
seastar::future<> set_routes();
private:
const uint16_t http_port_;
seastar::httpd::http_server_control server_;
+
+ std::atomic enable_adhoc_handlers_{false}, running_{false},
+ actors_running_{false};
+
+ // Handles graph queries submitted to /v1/graph/current/query
+ std::vector current_graph_query_handlers_;
+ std::vector all_graph_query_handlers_;
+ std::vector adhoc_query_handlers_;
};
} // namespace server
diff --git a/flex/engines/http_server/handler/hqps_http_handler.cc b/flex/engines/http_server/handler/hqps_http_handler.cc
deleted file mode 100644
index 2ddd83c984ab..000000000000
--- a/flex/engines/http_server/handler/hqps_http_handler.cc
+++ /dev/null
@@ -1,635 +0,0 @@
-/** Copyright 2020 Alibaba Group Holding Limited.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "flex/engines/http_server/handler/hqps_http_handler.h"
-
-#ifdef HAVE_OPENTELEMETRY_CPP
-#include "opentelemetry/context/context.h"
-#include "opentelemetry/trace/span_metadata.h"
-#include "opentelemetry/trace/span_startoptions.h"
-#endif // HAVE_OPENTELEMETRY_CPP
-
-#include "flex/engines/graph_db/database/graph_db_session.h"
-#include "flex/engines/http_server/executor_group.actg.h"
-#include "flex/engines/http_server/options.h"
-#include "flex/engines/http_server/service/hqps_service.h"
-#include "flex/engines/http_server/types.h"
-#include "flex/otel/otel.h"
-
-#include
-
-namespace seastar {
-namespace httpd {
-// The seastar::httpd::param_matcher will fail to match if param is not
-// specified.
-class optional_param_matcher : public matcher {
- public:
- /**
- * Constructor
- * @param name the name of the parameter, will be used as the key
- * in the parameters object
- * @param entire_path when set to true, the matched parameters will
- * include all the remaining url until the end of it.
- * when set to false the match will terminate at the next slash
- */
- explicit optional_param_matcher(const sstring& name) : _name(name) {}
-
- size_t match(const sstring& url, size_t ind, parameters& param) override {
- size_t last = find_end_param(url, ind);
- if (last == url.size()) {
- // Means we didn't find the parameter, but we still return true,
- // and set the value to empty string.
- param.set(_name, "");
- return ind;
- }
- param.set(_name, url.substr(ind, last - ind));
- return last;
- }
-
- private:
- size_t find_end_param(const sstring& url, size_t ind) {
- size_t pos = url.find('/', ind + 1);
- if (pos == sstring::npos) {
- return url.length();
- }
- return pos;
- }
- sstring _name;
-};
-} // namespace httpd
-} // namespace seastar
-
-namespace server {
-
-hqps_ic_handler::hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id,
- uint32_t group_inc_step,
- uint32_t shard_concurrency)
- : cur_group_id_(init_group_id),
- max_group_id_(max_group_id),
- group_inc_step_(group_inc_step),
- shard_concurrency_(shard_concurrency),
- executor_idx_(0),
- is_cancelled_(false) {
- executor_refs_.reserve(shard_concurrency_);
- hiactor::scope_builder builder;
- builder.set_shard(hiactor::local_shard_id())
- .enter_sub_scope(hiactor::scope(0))
- .enter_sub_scope(hiactor::scope(cur_group_id_));
- for (unsigned i = 0; i < shard_concurrency_; ++i) {
- executor_refs_.emplace_back(builder.build_ref(i));
- }
-#ifdef HAVE_OPENTELEMETRY_CPP
- total_counter_ = otel::create_int_counter("hqps_procedure_query_total");
- latency_histogram_ =
- otel::create_double_histogram("hqps_procedure_query_latency");
-#endif
-}
-
-hqps_ic_handler::~hqps_ic_handler() = default;
-
-seastar::future<> hqps_ic_handler::cancel_current_scope() {
- if (is_cancelled_) {
- LOG(INFO) << "The current scope has been already cancelled!";
- return seastar::make_ready_future<>();
- }
- hiactor::scope_builder builder;
- builder.set_shard(hiactor::local_shard_id())
- .enter_sub_scope(hiactor::scope(0))
- .enter_sub_scope(hiactor::scope(cur_group_id_));
- return hiactor::actor_engine()
- .cancel_scope_request(builder, false)
- .then([this] {
- LOG(INFO) << "Cancel IC scope successfully!";
- // clear the actor refs
- executor_refs_.clear();
- is_cancelled_ = true;
- return seastar::make_ready_future<>();
- });
-}
-
-bool hqps_ic_handler::is_current_scope_cancelled() const {
- return is_cancelled_;
-}
-
-bool hqps_ic_handler::create_actors() {
- if (executor_refs_.size() > 0) {
- LOG(ERROR) << "The actors have been already created!";
- return false;
- }
-
- VLOG(10) << "Create actors with a different sub scope id: " << cur_group_id_;
- if (cur_group_id_ + group_inc_step_ > max_group_id_) {
- LOG(ERROR) << "The max group id is reached, cannot create more actors!";
- return false;
- }
- if (cur_group_id_ + group_inc_step_ < cur_group_id_) {
- LOG(ERROR) << "overflow detected!";
- return false;
- }
- cur_group_id_ += group_inc_step_;
- hiactor::scope_builder builder;
- builder.set_shard(hiactor::local_shard_id())
- .enter_sub_scope(hiactor::scope(0))
- .enter_sub_scope(hiactor::scope(cur_group_id_));
- for (unsigned i = 0; i < shard_concurrency_; ++i) {
- executor_refs_.emplace_back(builder.build_ref(i));
- }
- is_cancelled_ = false; // locked outside
- return true;
-}
-
-bool hqps_ic_handler::is_running_graph(const seastar::sstring& graph_id) const {
- std::string graph_id_str(graph_id.data(), graph_id.size());
- auto running_graph_res =
- HQPSService::get().get_metadata_store()->GetRunningGraph();
- if (!running_graph_res.ok()) {
- LOG(ERROR) << "Failed to get running graph: "
- << running_graph_res.status().error_message();
- return false;
- }
- return running_graph_res.value() == graph_id_str;
-}
-
-// Handles both /v1/graph/{graph_id}/query and /v1/graph/current/query/
-seastar::future> hqps_ic_handler::handle(
- const seastar::sstring& path, std::unique_ptr req,
- std::unique_ptr rep) {
- auto dst_executor = executor_idx_;
- executor_idx_ = (executor_idx_ + 1) % shard_concurrency_;
- // TODO(zhanglei): choose read or write based on the request, after the
- // read/write info is supported in physical plan
- auto request_format = req->get_header(INTERACTIVE_REQUEST_FORMAT);
- if (request_format.empty()) {
- // If no format specfied, we use default format: proto
- request_format = PROTOCOL_FORMAT;
- }
- if (request_format == JSON_FORMAT) {
- req->content.append(gs::GraphDBSession::kCypherJson, 1);
- } else if (request_format == PROTOCOL_FORMAT) {
- req->content.append(gs::GraphDBSession::kCypherInternalProcedure, 1);
- } else if (request_format == ENCODER_FORMAT) {
- req->content.append(gs::GraphDBSession::kCppEncoder, 1);
- } else {
- LOG(ERROR) << "Unsupported request format: " << request_format;
- rep->set_status(seastar::httpd::reply::status_type::internal_server_error);
- rep->write_body("bin", seastar::sstring("Unsupported request format!"));
- rep->done();
- return seastar::make_ready_future>(
- std::move(rep));
- }
- if (path != "/v1/graph/current/query" && req->param.exists("graph_id")) {
- // TODO(zhanglei): get from graph_db.
- if (!is_running_graph(req->param["graph_id"])) {
- rep->set_status(
- seastar::httpd::reply::status_type::internal_server_error);
- rep->write_body("bin",
- seastar::sstring("The querying query is not running:" +
- req->param["graph_id"]));
- rep->done();
- return seastar::make_ready_future>(
- std::move(rep));
- }
- }
-#ifdef HAVE_OPENTELEMETRY_CPP
- auto tracer = otel::get_tracer("hqps_procedure_query_handler");
- // Extract context from headers. This copy is necessary to avoid access after
- // header content been freed
- std::map headers(req->_headers.begin(),
- req->_headers.end());
- auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
- auto options = otel::get_parent_ctx(current_ctx, headers);
- auto outer_span = tracer->StartSpan("procedure_query_handling", options);
- auto scope = tracer->WithActiveSpan(outer_span);
- auto start_ts = gs::GetCurrentTimeStamp();
-#endif // HAVE_OPENTELEMETRY_CPP
-
- return executor_refs_[dst_executor]
- .run_graph_db_query(query_param{std::move(req->content)})
- .then([request_format
-#ifdef HAVE_OPENTELEMETRY_CPP
- ,
- this, outer_span = outer_span
-#endif // HAVE_OPENTELEMETRY_CPP
- ](auto&& output) {
- if (request_format == ENCODER_FORMAT) {
- return seastar::make_ready_future(
- std::move(output.content));
- } else {
- // For cypher input format, the results are written with
- // output.put_string(), which will add extra 4 bytes. So we need to
- // remove the first 4 bytes here.
- if (output.content.size() < 4) {
- LOG(ERROR) << "Invalid output size: " << output.content.size();
-#ifdef HAVE_OPENTELEMETRY_CPP
- outer_span->SetStatus(opentelemetry::trace::StatusCode::kError,
- "Invalid output size");
- outer_span->End();
- std::map labels = {{"status", "fail"}};
- total_counter_->Add(1, labels);
-#endif // HAVE_OPENTELEMETRY_CPP
- return seastar::make_ready_future(std::move(output));
- }
- return seastar::make_ready_future(
- std::move(output.content.substr(4)));
- }
- })
- .then_wrapped([rep = std::move(rep)
-#ifdef HAVE_OPENTELEMETRY_CPP
- ,
- this, outer_span, start_ts
-#endif // HAVE_OPENTELEMETRY_CPP
- ](seastar::future&& fut) mutable {
- if (__builtin_expect(fut.failed(), false)) {
- rep->set_status(
- seastar::httpd::reply::status_type::internal_server_error);
- try {
- std::rethrow_exception(fut.get_exception());
- } catch (std::exception& e) {
- rep->write_body("bin", seastar::sstring(e.what()));
- }
-#ifdef HAVE_OPENTELEMETRY_CPP
- outer_span->SetStatus(opentelemetry::trace::StatusCode::kError,
- "Internal Server Error");
- outer_span->End();
- std::map labels = {{"status", "fail"}};
- total_counter_->Add(1, labels);
-#endif // HAVE_OPENTELEMETRY_CPP
- rep->done();
- return seastar::make_ready_future<
- std::unique_ptr>(std::move(rep));
- }
- auto result = fut.get0();
- rep->write_body("bin", std::move(result.content));
-#ifdef HAVE_OPENTELEMETRY_CPP
- outer_span->End();
- std::map labels = {{"status", "success"}};
- total_counter_->Add(1, labels);
- auto end_ts = gs::GetCurrentTimeStamp();
-#if OPENTELEMETRY_ABI_VERSION_NO >= 2
- latency_histogram_->Record(end_ts - start_ts);
-#else
- latency_histogram_->Record(end_ts - start_ts,
- opentelemetry::context::Context{});
-#endif
-#endif // HAVE_OPENTELEMETRY_CPP
- rep->done();
- return seastar::make_ready_future<
- std::unique_ptr>(std::move(rep));
- });
-}
-
-// a handler to handle adhoc query.
-
-hqps_adhoc_query_handler::hqps_adhoc_query_handler(
- uint32_t init_adhoc_group_id, uint32_t init_codegen_group_id,
- uint32_t max_group_id, uint32_t group_inc_step, uint32_t shard_concurrency)
- : cur_adhoc_group_id_(init_adhoc_group_id),
- cur_codegen_group_id_(init_codegen_group_id),
- max_group_id_(max_group_id),
- group_inc_step_(group_inc_step),
- shard_concurrency_(shard_concurrency),
- executor_idx_(0),
- is_cancelled_(false) {
- executor_refs_.reserve(shard_concurrency_);
- {
- hiactor::scope_builder builder;
- builder.set_shard(hiactor::local_shard_id())
- .enter_sub_scope(hiactor::scope(0))
- .enter_sub_scope(
- hiactor::scope(cur_adhoc_group_id_));
- for (unsigned i = 0; i < shard_concurrency_; ++i) {
- executor_refs_.emplace_back(builder.build_ref(i));
- }
- }
- {
- hiactor::scope_builder builder;
- builder.set_shard(hiactor::local_shard_id())
- .enter_sub_scope(hiactor::scope(0))
- .enter_sub_scope(
- hiactor::scope(cur_codegen_group_id_));
- codegen_actor_refs_.emplace_back(builder.build_ref(0));
- }
-#ifdef HAVE_OPENTELEMETRY_CPP
- total_counter_ = otel::create_int_counter("hqps_adhoc_query_total");
- latency_histogram_ =
- otel::create_double_histogram("hqps_adhoc_query_latency");
-#endif // HAVE_OPENTELEMETRY_CPP
-}
-hqps_adhoc_query_handler::~hqps_adhoc_query_handler() = default;
-
-seastar::future<> hqps_adhoc_query_handler::cancel_current_scope() {
- if (is_cancelled_) {
- LOG(INFO) << "The current scope has been already cancelled!";
- return seastar::make_ready_future<>();
- }
- hiactor::scope_builder adhoc_builder;
- adhoc_builder.set_shard(hiactor::local_shard_id())
- .enter_sub_scope(hiactor::scope(0))
- .enter_sub_scope(
- hiactor::scope(cur_adhoc_group_id_));
- hiactor::scope_builder codegen_builder;
- codegen_builder.set_shard(hiactor::local_shard_id())
- .enter_sub_scope(hiactor::scope(0))
- .enter_sub_scope(
- hiactor::scope(cur_codegen_group_id_));
- return hiactor::actor_engine()
- .cancel_scope_request(adhoc_builder, false)
- .then([codegen_builder] {
- LOG(INFO) << "Cancel adhoc scope successfully!";
- return hiactor::actor_engine().cancel_scope_request(codegen_builder,
- false);
- })
- .then([this] {
- LOG(INFO) << "Cancel codegen scope successfully!";
- // clear the actor refs
- executor_refs_.clear();
- codegen_actor_refs_.clear();
- LOG(INFO) << "Clear actor refs successfully!";
- is_cancelled_ = true;
- return seastar::make_ready_future<>();
- });
-}
-
-bool hqps_adhoc_query_handler::is_current_scope_cancelled() const {
- return is_cancelled_;
-}
-
-bool hqps_adhoc_query_handler::create_actors() {
- if (executor_refs_.size() > 0 || codegen_actor_refs_.size() > 0) {
- LOG(ERROR) << "The actors have been already created!";
- return false;
- }
- // Check whether cur_adhoc_group_id + group_inc_step_ is larger than
- // max_group_id_, considering overflow
- if (cur_adhoc_group_id_ + group_inc_step_ > max_group_id_ ||
- cur_codegen_group_id_ + group_inc_step_ > max_group_id_) {
- LOG(ERROR) << "The max group id is reached, cannot create more actors!";
- return false;
- }
- if (cur_adhoc_group_id_ + group_inc_step_ < cur_adhoc_group_id_ ||
- cur_codegen_group_id_ + group_inc_step_ < cur_codegen_group_id_) {
- LOG(ERROR) << "overflow detected!";
- return false;
- }
-
- {
- cur_adhoc_group_id_ += group_inc_step_;
- hiactor::scope_builder builder;
- builder.set_shard(hiactor::local_shard_id())
- .enter_sub_scope(hiactor::scope(0))
- .enter_sub_scope(
- hiactor::scope(cur_adhoc_group_id_));
- for (unsigned i = 0; i < shard_concurrency_; ++i) {
- executor_refs_.emplace_back(builder.build_ref(i));
- }
- }
- {
- cur_codegen_group_id_ += group_inc_step_;
- hiactor::scope_builder builder;
- builder.set_shard(hiactor::local_shard_id())
- .enter_sub_scope(hiactor::scope(0))
- .enter_sub_scope(
- hiactor::scope(cur_codegen_group_id_));
- codegen_actor_refs_.emplace_back(builder.build_ref(0));
- }
- is_cancelled_ = false;
- return true;
-}
-
-seastar::future>
-hqps_adhoc_query_handler::handle(const seastar::sstring& path,
- std::unique_ptr req,
- std::unique_ptr rep) {
- auto dst_executor = executor_idx_;
- executor_idx_ = (executor_idx_ + 1) % shard_concurrency_;
-
-#ifdef HAVE_OPENTELEMETRY_CPP
- auto tracer = otel::get_tracer("hqps_adhoc_query_handler");
- // Extract context from headers. This copy is necessary to avoid access
- // after header content been freed
- std::map headers(req->_headers.begin(),
- req->_headers.end());
- auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
- auto options = otel::get_parent_ctx(current_ctx, headers);
- auto outer_span = tracer->StartSpan("adhoc_query_handling", options);
- auto scope = tracer->WithActiveSpan(outer_span);
- // Start a new span for codegen
- auto codegen_span = tracer->StartSpan("adhoc_codegen", options);
- auto codegen_scope = tracer->WithActiveSpan(codegen_span);
- // create a new span for query execution, not started.
- auto start_ts = std::chrono::duration_cast(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
-#endif // HAVE_OPENTELEMETRY_CPP
- return codegen_actor_refs_[0]
- .do_codegen(query_param{std::move(req->content)})
- .then([this, dst_executor
-#ifdef HAVE_OPENTELEMETRY_CPP
- ,
- codegen_span = codegen_span, tracer = tracer, options = options,
- codegen_scope = std::move(codegen_scope), outer_span = outer_span
-#endif // HAVE_OPENTELEMETRY_CPP
- ](auto&& param) mutable {
-#ifdef HAVE_OPENTELEMETRY_CPP
- codegen_span->End();
- options.parent = outer_span->GetContext();
- auto query_span = tracer->StartSpan("adhoc_query_execution", options);
- auto query_scope = tracer->WithActiveSpan(query_span);
-#endif // HAVE_OPENTELEMETRY_CPP
- // TODO(zhanglei): choose read or write based on the request, after the
- // read/write info is supported in physical plan
- // The content contains the path to dynamic library
- param.content.append(gs::Schema::HQPS_ADHOC_WRITE_PLUGIN_ID_STR, 1);
- param.content.append(gs::GraphDBSession::kCypherInternalAdhoc, 1);
- return executor_refs_[dst_executor]
- .run_graph_db_query(query_param{std::move(param.content)})
- .then([
-#ifdef HAVE_OPENTELEMETRY_CPP
- query_span = query_span,
- query_scope = std::move(query_scope)
-#endif // HAVE_OPENTELEMETRY_CPP
- ](auto&& output) {
-#ifdef HAVE_OPENTELEMETRY_CPP
- query_span->End();
-#endif // HAVE_OPENTELEMETRY_CPP
- return seastar::make_ready_future(
- std::move(output.content));
- });
- })
- .then([
-#ifdef HAVE_OPENTELEMETRY_CPP
- this, outer_span = outer_span
-#endif // HAVE_OPENTELEMETRY_CPP
- ](auto&& output) {
- if (output.content.size() < 4) {
- LOG(ERROR) << "Invalid output size: " << output.content.size();
-#ifdef HAVE_OPENTELEMETRY_CPP
- std::map labels = {{"status", "fail"}};
- total_counter_->Add(1, labels);
- outer_span->SetStatus(opentelemetry::trace::StatusCode::kError,
- "Internal output size");
- outer_span->End();
-#endif // HAVE_OPENTELEMETRY_CPP
- return seastar::make_ready_future(std::move(output));
- }
- return seastar::make_ready_future(
- std::move(output.content.substr(4)));
- })
- .then_wrapped([rep = std::move(rep)
-#ifdef HAVE_OPENTELEMETRY_CPP
- ,
- this, outer_span, start_ts
-#endif // HAVE_OPENTELEMETRY_CPP
- ](seastar::future&& fut) mutable {
- if (__builtin_expect(fut.failed(), false)) {
- rep->set_status(
- seastar::httpd::reply::status_type::internal_server_error);
- try {
- std::rethrow_exception(fut.get_exception());
- } catch (std::exception& e) {
- rep->write_body("bin", seastar::sstring(e.what()));
-#ifdef HAVE_OPENTELEMETRY_CPP
- std::map labels = {{"status", "fail"}};
- total_counter_->Add(1, labels);
- outer_span->SetStatus(opentelemetry::trace::StatusCode::kError,
- "Internal Server Error");
- outer_span->SetAttribute(
- "exception", opentelemetry::common::AttributeValue(e.what()));
- outer_span->End();
-#endif // HAVE_OPENTELEMETRY_CPP
- }
- rep->done();
- return seastar::make_ready_future<
- std::unique_ptr>(std::move(rep));
- }
- auto result = fut.get0();
- rep->write_body("bin", std::move(result.content));
-#ifdef HAVE_OPENTELEMETRY_CPP
- std::map labels = {{"status", "success"}};
- total_counter_->Add(1, labels);
- outer_span->End();
- auto end_ts = gs::GetCurrentTimeStamp();
-#if OPENTELEMETRY_ABI_VERSION_NO >= 2
- latency_histogram_->Record(end_ts - start_ts);
-#else
- latency_histogram_->Record(end_ts - start_ts,
- opentelemetry::context::Context{});
-#endif
-#endif // HAVE_OPENTELEMETRY_CPP
- rep->done();
- return seastar::make_ready_future<
- std::unique_ptr>(std::move(rep));
- });
-}
-
-hqps_http_handler::hqps_http_handler(uint16_t http_port, int32_t shard_num)
- : http_port_(http_port), actors_running_(true) {
- ic_handlers_.resize(shard_num);
- adhoc_query_handlers_.resize(shard_num);
-}
-
-hqps_http_handler::~hqps_http_handler() {
- if (is_running()) {
- stop();
- }
- // DO NOT DELETE the handler pointers, they will be deleted by
- // seastar::httpd::match_rule
-}
-
-uint16_t hqps_http_handler::get_port() const { return http_port_; }
-
-bool hqps_http_handler::is_running() const { return running_.load(); }
-
-bool hqps_http_handler::is_actors_running() const {
- return actors_running_.load();
-}
-
-void hqps_http_handler::start() {
- auto fut = seastar::alien::submit_to(
- *seastar::alien::internal::default_instance, 0, [this] {
- return server_.start()
- .then([this] { return set_routes(); })
- .then([this] { return server_.listen(http_port_); })
- .then([this] {
- fmt::print(
- "HQPS Query http handler is listening on port {} "
- "...\n",
- http_port_);
- });
- });
- fut.wait();
- // update running state
- running_.store(true);
-}
-
-void hqps_http_handler::stop() {
- auto fut = seastar::alien::submit_to(
- *seastar::alien::internal::default_instance, 0, [this] {
- LOG(INFO) << "Stopping HQPS http handler ...";
- return server_.stop();
- });
- fut.wait();
- // update running state
- running_.store(false);
-}
-
-seastar::future<> hqps_http_handler::stop_query_actors() {
- // First cancel the scope.
- return ic_handlers_[hiactor::local_shard_id()]
- ->cancel_current_scope()
- .then([this] {
- LOG(INFO) << "Cancelled ic scope";
- return adhoc_query_handlers_[hiactor::local_shard_id()]
- ->cancel_current_scope();
- })
- .then([this] {
- LOG(INFO) << "Cancelled proc scope";
- actors_running_.store(false);
- return seastar::make_ready_future<>();
- });
-}
-
-void hqps_http_handler::start_query_actors() {
- ic_handlers_[hiactor::local_shard_id()]->create_actors();
- adhoc_query_handlers_[hiactor::local_shard_id()]->create_actors();
- actors_running_.store(true);
-}
-
-seastar::future<> hqps_http_handler::set_routes() {
- return server_.set_routes([this](seastar::httpd::routes& r) {
- auto ic_handler =
- new hqps_ic_handler(ic_query_group_id, max_group_id, group_inc_step,
- shard_query_concurrency);
- auto adhoc_query_handler = new hqps_adhoc_query_handler(
- ic_adhoc_group_id, codegen_group_id, max_group_id, group_inc_step,
- shard_adhoc_concurrency);
-
- auto rule_proc = new seastar::httpd::match_rule(ic_handler);
- rule_proc->add_str("/v1/graph")
- .add_matcher(new seastar::httpd::optional_param_matcher("graph_id"))
- .add_str("/query");
-
- r.add(rule_proc, seastar::httpd::operation_type::POST);
-
- r.add(seastar::httpd::operation_type::POST,
- seastar::httpd::url("/interactive/adhoc_query"), adhoc_query_handler);
-
- ic_handlers_[hiactor::local_shard_id()] = ic_handler;
- adhoc_query_handlers_[hiactor::local_shard_id()] = adhoc_query_handler;
-
- return seastar::make_ready_future<>();
- });
-}
-
-} // namespace server
diff --git a/flex/engines/http_server/handler/hqps_http_handler.h b/flex/engines/http_server/handler/hqps_http_handler.h
deleted file mode 100644
index a89e97dfe6e5..000000000000
--- a/flex/engines/http_server/handler/hqps_http_handler.h
+++ /dev/null
@@ -1,140 +0,0 @@
-/** Copyright 2020 Alibaba Group Holding Limited.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef ENGINES_HTTP_SERVER_HANDLER_HQPS_HTTP_HANDLER_H_
-#define ENGINES_HTTP_SERVER_HANDLER_HQPS_HTTP_HANDLER_H_
-
-#include
-#include
-#include
-#include
-#include "flex/engines/http_server/generated/actor/codegen_actor_ref.act.autogen.h"
-#include "flex/engines/http_server/generated/actor/executor_ref.act.autogen.h"
-
-#ifdef HAVE_OPENTELEMETRY_CPP
-#include "opentelemetry/metrics/sync_instruments.h"
-#include "opentelemetry/nostd/unique_ptr.h"
-#endif // HAVE_OPENTELEMETRY_CPP
-
-namespace server {
-
-class hqps_ic_handler : public seastar::httpd::handler_base {
- public:
- // extra headers
- static constexpr const char* INTERACTIVE_REQUEST_FORMAT =
- "X-Interactive-Request-Format";
- static constexpr const char* PROTOCOL_FORMAT = "proto";
- static constexpr const char* JSON_FORMAT = "json";
- static constexpr const char* ENCODER_FORMAT = "encoder";
- hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id,
- uint32_t group_inc_step, uint32_t shard_concurrency);
- ~hqps_ic_handler() override;
-
- bool create_actors();
-
- seastar::future<> cancel_current_scope();
-
- bool is_current_scope_cancelled() const;
-
- seastar::future> handle(
- const seastar::sstring& path,
- std::unique_ptr req,
- std::unique_ptr rep) override;
-
- private:
- bool is_running_graph(const seastar::sstring& graph_id) const;
-
- uint32_t cur_group_id_;
- const uint32_t max_group_id_, group_inc_step_;
- const uint32_t shard_concurrency_;
- uint32_t executor_idx_;
- std::vector executor_refs_;
- bool is_cancelled_;
-#ifdef HAVE_OPENTELEMETRY_CPP
- opentelemetry::nostd::unique_ptr>
- total_counter_;
- opentelemetry::nostd::unique_ptr>
- latency_histogram_;
-#endif
-};
-
-class hqps_adhoc_query_handler : public seastar::httpd::handler_base {
- public:
- hqps_adhoc_query_handler(uint32_t init_adhoc_group_id,
- uint32_t init_codegen_group_id,
- uint32_t max_group_id, uint32_t group_inc_step,
- uint32_t shard_concurrency);
-
- ~hqps_adhoc_query_handler() override;
-
- seastar::future<> cancel_current_scope();
-
- bool is_current_scope_cancelled() const;
-
- bool create_actors();
-
- seastar::future> handle(
- const seastar::sstring& path,
- std::unique_ptr req,
- std::unique_ptr rep) override;
-
- private:
- uint32_t cur_adhoc_group_id_, cur_codegen_group_id_;
- const uint32_t max_group_id_, group_inc_step_;
- const uint32_t shard_concurrency_;
- uint32_t executor_idx_;
- std::vector executor_refs_;
- std::vector codegen_actor_refs_;
- bool is_cancelled_;
-#ifdef HAVE_OPENTELEMETRY_CPP
- opentelemetry::nostd::unique_ptr>
- total_counter_;
- opentelemetry::nostd::unique_ptr>
- latency_histogram_;
-#endif
-};
-
-class hqps_http_handler {
- public:
- hqps_http_handler(uint16_t http_port, int32_t shard_num);
- ~hqps_http_handler();
-
- void start();
- void stop();
-
- uint16_t get_port() const;
-
- bool is_running() const;
-
- bool is_actors_running() const;
-
- seastar::future<> stop_query_actors();
-
- void start_query_actors();
-
- private:
- seastar::future<> set_routes();
-
- private:
- const uint16_t http_port_;
- seastar::httpd::http_server_control server_;
- std::atomic running_{false}, actors_running_{false};
-
- std::vector ic_handlers_;
- std::vector adhoc_query_handlers_;
-};
-
-} // namespace server
-
-#endif // ENGINES_HTTP_SERVER_HANDLER_HQPS_HTTP_HANDLER_H_
diff --git a/flex/engines/http_server/options.cc b/flex/engines/http_server/options.cc
index 2f7c0441acf3..58d03092e49b 100644
--- a/flex/engines/http_server/options.cc
+++ b/flex/engines/http_server/options.cc
@@ -17,13 +17,8 @@
namespace server {
+uint32_t shard_admin_concurrency = 2; // at least 2, to allow restarting
uint32_t shard_query_concurrency = 16;
-uint32_t shard_update_concurrency = 4;
uint32_t shard_adhoc_concurrency = 4;
-uint32_t shard_admin_graph_concurrency = 1;
-uint32_t shard_admin_procedure_concurrency = 1;
-uint32_t shard_admin_node_concurrency = 1;
-uint32_t shard_admin_job_concurrency = 1;
-uint32_t shard_admin_service_concurrency = 1;
} // namespace server
diff --git a/flex/engines/http_server/options.h b/flex/engines/http_server/options.h
index b7110a9d6620..a7792e709d44 100644
--- a/flex/engines/http_server/options.h
+++ b/flex/engines/http_server/options.h
@@ -24,24 +24,16 @@ namespace server {
/// make update executors with higher priority.
const uint32_t interactive_admin_group_id = 1;
const uint32_t ic_query_group_id = 2;
-const uint32_t ic_update_group_id = 3;
-const uint32_t ic_adhoc_group_id = 4;
-const uint32_t codegen_group_id = 5;
-const uint32_t proc_query_group_id = 6;
+const uint32_t ic_adhoc_group_id = 3;
const uint32_t max_group_id = std::numeric_limits::max();
const uint32_t group_inc_step =
- 5; // should equal to number of non-admin groups.
+ 2; // should equal to number of non-admin groups.
// Each time we cancel a scope, we will increase the group id by this step.
+extern uint32_t shard_admin_concurrency;
extern uint32_t shard_query_concurrency;
-extern uint32_t shard_update_concurrency;
extern uint32_t shard_adhoc_concurrency;
-extern uint32_t shard_admin_graph_concurrency;
-extern uint32_t shard_admin_node_concurrency;
-extern uint32_t shard_admin_service_concurrency;
-extern uint32_t shard_admin_job_concurrency;
-extern uint32_t shard_admin_procedure_concurrency;
} // namespace server
diff --git a/flex/engines/http_server/service/graph_db_service.cc b/flex/engines/http_server/service/graph_db_service.cc
deleted file mode 100644
index 93530445a35c..000000000000
--- a/flex/engines/http_server/service/graph_db_service.cc
+++ /dev/null
@@ -1,43 +0,0 @@
-/** Copyright 2020 Alibaba Group Holding Limited.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "flex/engines/http_server/service/graph_db_service.h"
-#include "flex/engines/http_server/options.h"
-namespace server {
-
-void GraphDBService::init(uint32_t num_shards, uint16_t http_port,
- bool dpdk_mode) {
- actor_sys_ = std::make_unique(num_shards, dpdk_mode);
- http_hdl_ = std::make_unique(http_port);
-}
-
-void GraphDBService::run_and_wait_for_exit() {
- if (!actor_sys_ || !http_hdl_) {
- std::cerr << "GraphDB service has not been inited!" << std::endl;
- return;
- }
- actor_sys_->launch();
- http_hdl_->start();
- running_.store(true);
- while (running_.load(std::memory_order_relaxed)) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- }
- http_hdl_->stop();
- actor_sys_->terminate();
-}
-
-void GraphDBService::set_exit_state() { running_.store(false); }
-
-} // namespace server
diff --git a/flex/engines/http_server/service/graph_db_service.h b/flex/engines/http_server/service/graph_db_service.h
deleted file mode 100644
index 903e99f629d3..000000000000
--- a/flex/engines/http_server/service/graph_db_service.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/** Copyright 2020 Alibaba Group Holding Limited.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef ENGINES_HTTP_SERVER_SERVICE_GRAPH_DB_SERVICE_H_
-#define ENGINES_HTTP_SERVER_SERVICE_GRAPH_DB_SERVICE_H_
-
-#include "flex/engines/http_server/actor_system.h"
-#include "flex/engines/http_server/handler/graph_db_http_handler.h"
-
-namespace server {
-
-class GraphDBService {
- public:
- static GraphDBService& get() {
- static GraphDBService instance;
- return instance;
- }
- ~GraphDBService() = default;
-
- void init(uint32_t num_shards, uint16_t http_port, bool dpdk_mode);
- void run_and_wait_for_exit();
- void set_exit_state();
-
- private:
- GraphDBService() = default;
-
- private:
- std::unique_ptr actor_sys_;
- std::unique_ptr http_hdl_;
- std::atomic running_{false};
-};
-
-} // namespace server
-
-#endif // ENGINES_HTTP_SERVER_SERVICE_GRAPH_DB_SERVICE_H_
diff --git a/flex/interactive/sdk/java/pom.xml b/flex/interactive/sdk/java/pom.xml
index aa59b5cdb629..7efb55d5ff0c 100644
--- a/flex/interactive/sdk/java/pom.xml
+++ b/flex/interactive/sdk/java/pom.xml
@@ -5,7 +5,7 @@
interactive
jar
interactive
- 0.3
+ 0.4-SNAPSHOT
https://github.com/alibaba/GraphScope/tree/main/flex/interactive
GraphScope Interactive Java SDK
@@ -453,7 +453,35 @@
protobuf-java-util
${protobuf.version}
+
+ io.opentelemetry
+ opentelemetry-bom
+ ${opentelemetry-bom.version}
+ pom
+ import
+
+
+ io.opentelemetry
+ opentelemetry-api
+
+
+
+ io.opentelemetry.semconv
+ opentelemetry-semconv
+ 1.23.1-alpha
+
+
+
+
+ io.opentelemetry
+ opentelemetry-bom
+ 1.40.0
+ pom
+ import
+
+
+
1.8
${java.version}
@@ -480,5 +508,6 @@
3.0.1
1.6.7
2.2.1
+ 1.37.0
diff --git a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/ProcedureInterface.java b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/ProcedureInterface.java
index e48d24810884..e339022559e6 100644
--- a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/ProcedureInterface.java
+++ b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/ProcedureInterface.java
@@ -22,7 +22,6 @@
/**
* All APIs about procedure management.
- * TODO(zhanglei): differ between ProcedureRequest and Procedure
*/
public interface ProcedureInterface {
Result createProcedure(
diff --git a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/QueryInterface.java b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/QueryInterface.java
index 7327ff82d6ed..3bfe81656f5b 100644
--- a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/QueryInterface.java
+++ b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/QueryInterface.java
@@ -15,16 +15,67 @@
*/
package com.alibaba.graphscope.interactive.client;
+import com.alibaba.graphscope.gaia.proto.GraphAlgebraPhysical;
import com.alibaba.graphscope.gaia.proto.IrResult;
+import com.alibaba.graphscope.gaia.proto.StoredProcedure;
import com.alibaba.graphscope.interactive.client.common.Result;
import com.alibaba.graphscope.interactive.models.QueryRequest;
+import java.util.concurrent.CompletableFuture;
+
public interface QueryInterface {
+ ///////////// Submitting Queries////////////////////
Result callProcedure(String graphId, QueryRequest request);
+ CompletableFuture> callProcedureAsync(
+ String graphId, QueryRequest request);
+
Result callProcedure(QueryRequest request);
+ CompletableFuture> callProcedureAsync(QueryRequest request);
+
+ ///////// Call procedure via stored_procedure.proto//////
+ Result callProcedure(String graphId, StoredProcedure.Query request);
+
+ CompletableFuture> callProcedureAsync(
+ String graphId, StoredProcedure.Query request);
+
+ Result callProcedure(StoredProcedure.Query request);
+
+ CompletableFuture> callProcedureAsync(
+ StoredProcedure.Query request);
+
+ /////////// Call procedure via raw bytes//////////////
+
Result callProcedureRaw(String graphId, byte[] request);
+ CompletableFuture> callProcedureRawAsync(String graphId, byte[] request);
+
Result callProcedureRaw(byte[] request);
+
+ CompletableFuture> callProcedureRawAsync(byte[] request);
+
+ /////////// Submitting adhoc queries//////////////
+ /**
+ * Submit a adhoc query, represented via physical plan.
+ * @param graphId the identifier of the graph
+ * @param physicalPlan physical execution plan.
+ * @return the results.
+ */
+ Result runAdhocQuery(
+ String graphId, GraphAlgebraPhysical.PhysicalPlan physicalPlan);
+
+ CompletableFuture> runAdhocQueryAsync(
+ String graphId, GraphAlgebraPhysical.PhysicalPlan physicalPlan);
+
+ /**
+ * Submit a adhoc query, represented via physical plan.
+ * @param physicalPlan physical execution plan.
+ * @return the results.
+ */
+ Result runAdhocQuery(
+ GraphAlgebraPhysical.PhysicalPlan physicalPlan);
+
+ CompletableFuture> runAdhocQueryAsync(
+ GraphAlgebraPhysical.PhysicalPlan physicalPlan);
}
diff --git a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/QueryServiceInterface.java b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/QueryServiceInterface.java
index fcec18faba82..7bc154073210 100644
--- a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/QueryServiceInterface.java
+++ b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/QueryServiceInterface.java
@@ -16,7 +16,8 @@
package com.alibaba.graphscope.interactive.client;
import com.alibaba.graphscope.interactive.client.common.Result;
-import com.alibaba.graphscope.interactive.models.*;
+import com.alibaba.graphscope.interactive.models.ServiceStatus;
+import com.alibaba.graphscope.interactive.models.StartServiceRequest;
/**
* Manage the query interface.
diff --git a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/impl/DefaultSession.java b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/impl/DefaultSession.java
index 5855275fd912..fc78a70ab2a8 100644
--- a/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/impl/DefaultSession.java
+++ b/flex/interactive/sdk/java/src/main/java/com/alibaba/graphscope/interactive/client/impl/DefaultSession.java
@@ -15,7 +15,10 @@
*/
package com.alibaba.graphscope.interactive.client.impl;
+import com.alibaba.graphscope.gaia.proto.GraphAlgebraPhysical;
import com.alibaba.graphscope.gaia.proto.IrResult;
+import com.alibaba.graphscope.gaia.proto.StoredProcedure;
+import com.alibaba.graphscope.interactive.ApiCallback;
import com.alibaba.graphscope.interactive.ApiClient;
import com.alibaba.graphscope.interactive.ApiException;
import com.alibaba.graphscope.interactive.ApiResponse;
@@ -25,16 +28,36 @@
import com.alibaba.graphscope.interactive.client.common.Config;
import com.alibaba.graphscope.interactive.client.common.Result;
import com.alibaba.graphscope.interactive.client.common.Status;
+import com.alibaba.graphscope.interactive.client.utils.InputFormat;
import com.alibaba.graphscope.interactive.models.*;
+import com.google.gson.reflect.TypeToken;
import com.google.protobuf.InvalidProtocolBufferException;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.context.propagation.TextMapSetter;
+import io.opentelemetry.semconv.SemanticAttributes;
+
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
+import org.jetbrains.annotations.Nullable;
import java.io.Closeable;
import java.io.File;
+import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/***
@@ -42,11 +65,6 @@
* Based on the code generated by OpenAPI Generator.
*/
public class DefaultSession implements Session {
- private static final int DEFAULT_READ_TIMEOUT = 30000;
- private static final int DEFAULT_WRITE_TIMEOUT = 30000;
- private static String JSON_FORMAT_STRING = "json";
- private static String PROTO_FORMAT_STRING = "proto";
- private static String ENCODER_FORMAT_STRING = "encoder";
private final AdminServiceGraphManagementApi graphApi;
private final AdminServiceJobManagementApi jobApi;
private final AdminServiceProcedureManagementApi procedureApi;
@@ -58,6 +76,18 @@ public class DefaultSession implements Session {
private final ApiClient client, queryClient;
private final Config config;
+ private OpenTelemetry openTelemetry;
+ private Tracer tracer;
+
+ private final TextMapSetter setter =
+ new TextMapSetter() {
+ @Override
+ public void set(@Nullable Request.Builder builder, String s, String s1) {
+ assert builder != null;
+ builder.addHeader(s, s1);
+ }
+ };
+
/**
* Create a default GraphScope Interactive Session.
*
@@ -65,7 +95,6 @@ public class DefaultSession implements Session {
*/
private DefaultSession(String uri, String storedProcUri, Config config) {
this.config = config;
- System.out.println("uri neq null" + (uri != null));
OkHttpClient httpClient = createHttpClient(config);
if (uri != null) {
client = new ApiClient(httpClient);
@@ -109,16 +138,25 @@ private DefaultSession(String uri, String storedProcUri, Config config) {
queryClient = new ApiClient(httpClient);
queryClient.setBasePath(storedProcUri);
queryApi = new QueryServiceApi(queryClient);
+ initOpenTelemetry();
}
public static DefaultSession newInstance(String adminUri, Config config) {
return new DefaultSession(adminUri, null, config);
}
+ public static DefaultSession newInstance(String adminUri) {
+ return new DefaultSession(adminUri, null, new Config.ConfigBuilder().build());
+ }
+
public static DefaultSession newInstance(String adminUri, String storedProcUri, Config config) {
return new DefaultSession(adminUri, storedProcUri, config);
}
+ public static DefaultSession newInstance(String adminUri, String storedProcUri) {
+ return new DefaultSession(adminUri, storedProcUri, new Config.ConfigBuilder().build());
+ }
+
/**
* Create defaultSession in stored procedure only mode, which means the session will only connect to query service,
* for launching queries.
@@ -130,6 +168,16 @@ public static QueryInterface queryInterfaceOnly(String storedProcUri, Config con
return new DefaultSession(null, storedProcUri, config);
}
+ private void initOpenTelemetry() {
+ if (config.isEnableTracing()) {
+ this.openTelemetry = GlobalOpenTelemetry.get();
+ this.tracer = openTelemetry.getTracer(DefaultSession.class.getName());
+ } else {
+ this.openTelemetry = null;
+ this.tracer = null;
+ }
+ }
+
/**
* Try to upload the input files if they are specified with a starting @
* for input files in schema_mapping. Replace the path to the uploaded file with the
@@ -588,95 +636,137 @@ public Result updateProcedure(
}
}
+ //////////////////////////////// Submitting Queries//////////////////////////////////////
@Override
- public Result callProcedure(
- String graphName, QueryRequest request) {
- try {
- // Interactive currently support four type of inputformat, see
- // flex/engines/graph_db/graph_db_session.h
- // Here we add byte of value 1 to denote the input format is in JSON format.
- ApiResponse response =
- queryApi.procCallWithHttpInfo(
- graphName, JSON_FORMAT_STRING, request.toJson().getBytes());
- if (response.getStatusCode() != 200) {
- return Result.fromException(
- new ApiException(response.getStatusCode(), "Failed to call procedure"));
- }
- IrResult.CollectiveResults results =
- IrResult.CollectiveResults.parseFrom(response.getData());
- return new Result<>(results);
- } catch (ApiException e) {
- e.printStackTrace();
- return Result.fromException(e);
- } catch (InvalidProtocolBufferException e) {
- e.printStackTrace();
- return Result.error(e.getMessage());
- }
+ public Result callProcedure(String graphId, QueryRequest request) {
+ return parseBytesToCollectiveResults(
+ callProcedureImpl(
+ graphId,
+ appendFormatByte(request.toJson().getBytes(), InputFormat.CYPHER_JSON)));
+ }
+
+ @Override
+ public CompletableFuture> callProcedureAsync(
+ String graphId, QueryRequest queryRequest) {
+ return callProcedureAsyncImpl(
+ graphId,
+ appendFormatByte(queryRequest.toJson().getBytes(), InputFormat.CYPHER_JSON))
+ .thenApply(this::parseBytesToCollectiveResults);
}
@Override
public Result callProcedure(QueryRequest request) {
- try {
- // Interactive currently support four type of inputformat, see
- // flex/engines/graph_db/graph_db_session.h
- // Here we add byte of value 1 to denote the input format is in JSON format.
- ApiResponse response =
- queryApi.procCallCurrentWithHttpInfo(
- JSON_FORMAT_STRING, request.toJson().getBytes());
- if (response.getStatusCode() != 200) {
- return Result.fromException(
- new ApiException(response.getStatusCode(), "Failed to call procedure"));
- }
- IrResult.CollectiveResults results =
- IrResult.CollectiveResults.parseFrom(response.getData());
- return new Result<>(results);
- } catch (ApiException e) {
- e.printStackTrace();
- return Result.fromException(e);
- } catch (InvalidProtocolBufferException e) {
- e.printStackTrace();
- return Result.error(e.getMessage());
- }
+ return parseBytesToCollectiveResults(
+ callProcedureImpl(
+ null,
+ appendFormatByte(request.toJson().getBytes(), InputFormat.CYPHER_JSON)));
}
@Override
- public Result callProcedureRaw(String graphName, byte[] request) {
- try {
- // Interactive currently support four type of inputformat, see
- // flex/engines/graph_db/graph_db_session.h
- // Here we add byte of value 0 to denote the input format is in raw encoder/decoder
- // format.
- ApiResponse response =
- queryApi.procCallWithHttpInfo(graphName, ENCODER_FORMAT_STRING, request);
- if (response.getStatusCode() != 200) {
- return Result.fromException(
- new ApiException(response.getStatusCode(), "Failed to call procedure"));
- }
- return new Result(response.getData());
- } catch (ApiException e) {
- e.printStackTrace();
- return Result.fromException(e);
- }
+ public CompletableFuture> callProcedureAsync(
+ QueryRequest queryRequest) {
+ return callProcedureAsyncImpl(
+ null,
+ appendFormatByte(queryRequest.toJson().getBytes(), InputFormat.CYPHER_JSON))
+ .thenApply(this::parseBytesToCollectiveResults);
+ }
+
+ @Override
+ public Result callProcedure(
+ String graphId, StoredProcedure.Query request) {
+ return parseBytesToCollectiveResults(
+ callProcedureImpl(
+ graphId,
+ appendFormatByte(
+ request.toByteArray(), InputFormat.CYPHER_PROTO_PROCEDURE)));
+ }
+
+ @Override
+ public CompletableFuture