Skip to content

Commit

Permalink
refactor(interactive): Reorganize the http handler of interactive (#4037
Browse files Browse the repository at this point in the history
)

Merge `hqps_http_handler` and `graph_db_http_handler` into one.

related: #3848

---------

Co-authored-by: liulx20 <[email protected]>
  • Loading branch information
zhanglei1949 and liulx20 authored Jul 26, 2024
1 parent 627de60 commit cac1d9d
Show file tree
Hide file tree
Showing 48 changed files with 1,711 additions and 1,485 deletions.
17 changes: 14 additions & 3 deletions .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,18 @@ jobs:
./modern_graph_schema_v0_0.yaml ./modern_graph_schema_v0_1.yaml
sed -i 's/temp_workspace/interactive_workspace/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml
sed -i 's/default_graph: modern_graph/default_graph: movies/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/engine_config_test.yaml

- name: Let compiler use latest interactive java sdk
env:
HOME: /home/graphscope/
run: |
. /home/graphscope/.graphscope_env
. /home/graphscope/.cargo/env
# replace the <interactive.sdk.version>0.4</interactive.sdk.version> with the latest version in flex/interactive/sdk/java/pom.xml
sdk_version=$(grep -oPm1 "(?<=<version>)[^<]+" ${GITHUB_WORKSPACE}/flex/interactive/sdk/java/pom.xml)
sed -i "s/<interactive.sdk.version>.*<\/interactive.sdk.version>/<interactive.sdk.version>${sdk_version}<\/interactive.sdk.version>/" ${GITHUB_WORKSPACE}/interactive_engine/pom.xml
cd ${GITHUB_WORKSPACE}/interactive_engine/
mvn clean install -Pexperimental -DskipTests
- name: Run End-to-End cypher adhoc ldbc query test
env:
Expand Down Expand Up @@ -375,14 +387,13 @@ jobs:
mkdir build && cd build # only test default build
cmake .. -DCMAKE_BUILD_TYPE=DEBUG -DBUILD_DOC=OFF && sudo make -j 4
# test the different combination of cmake options: -DBUILD_HQPS=ON/OFF -DBUILD_TEST=ON/OFF, -DBUILD_ODPS_FRAGMENT_LOADER=ON/OFF
# test the different combination of cmake options: -DBUILD_TEST=ON/OFF, -DBUILD_ODPS_FRAGMENT_LOADER=ON/OFF
test-cmake-options:
runs-on: ubuntu-20.04
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.23.0
strategy:
matrix:
BUILD_HQPS: [ON, OFF]
BUILD_TEST: [ON, OFF]
BUILD_ODPS_FRAGMENT_LOADER: [ON, OFF]
steps:
Expand All @@ -393,7 +404,7 @@ jobs:
cd ${GITHUB_WORKSPACE}/flex
git submodule update --init
mkdir build && cd build
cmake .. -DBUILD_HQPS=${{ matrix.BUILD_HQPS }} -DBUILD_TEST=${{ matrix.BUILD_TEST }} \
cmake .. -DBUILD_TEST=${{ matrix.BUILD_TEST }} \
-DBUILD_ODPS_FRAGMENT_LOADER=${{ matrix.BUILD_ODPS_FRAGMENT_LOADER }}
sudo make -j4
Expand Down
7 changes: 0 additions & 7 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@ project (
LANGUAGES CXX)


option(BUILD_HQPS "Whether to build HighQPS Engine" ON)
option(BUILD_TEST "Whether to build test" ON)
option(BUILD_DOC "Whether to build doc" OFF)
option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" OFF)
option(USE_PTHASH "Whether to use pthash" OFF)

#print options
message(STATUS "Build HighQPS Engine: ${BUILD_HQPS}")
message(STATUS "Build test: ${BUILD_TEST}")
message(STATUS "Build doc: ${BUILD_DOC}")
message(STATUS "Build odps fragment loader: ${BUILD_ODPS_FRAGMENT_LOADER}")
Expand All @@ -37,11 +35,6 @@ set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE)
set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib")
set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)

if (BUILD_HQPS)
message("Build HighQPS Engine")
add_definitions(-DBUILD_HQPS)
endif ()

if(USE_PTHASH)
message("Use PTHash")
add_definitions(-DUSE_PTHASH)
Expand Down
20 changes: 10 additions & 10 deletions flex/bin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ add_executable(flex_analytical_engine flex_analytical_engine.cc)
target_link_libraries(flex_analytical_engine flex_immutable_graph flex_bsp ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})
install_without_export_flex_target(flex_analytical_engine)

if(BUILD_HQPS)
if(Hiactor_FOUND)
add_executable(interactive_server interactive_server.cc)
target_link_libraries(interactive_server flex_utils flex_graph_db flex_server hqps_plan_proto flex_utils ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})
if (OPENTELEMETRY_CPP_FOUND)
target_link_libraries(interactive_server otel)
endif()
install_without_export_flex_target(interactive_server)

if(Hiactor_FOUND)
add_executable(interactive_server interactive_server.cc)
target_link_libraries(interactive_server flex_utils flex_graph_db flex_server hqps_plan_proto flex_utils ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})
if (OPENTELEMETRY_CPP_FOUND)
target_link_libraries(interactive_server otel)
endif()
# install the script
install(PROGRAMS load_plan_and_gen.sh DESTINATION bin)
install_without_export_flex_target(interactive_server)
endif()
# install the script
install(PROGRAMS load_plan_and_gen.sh DESTINATION bin)


include_directories(${Boost_INCLUDE_DIRS})
add_executable(bulk_loader bulk_loader.cc)
Expand Down
9 changes: 5 additions & 4 deletions flex/bin/interactive_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "stdlib.h"

#include "flex/engines/http_server/codegen_proxy.h"
#include "flex/engines/http_server/service/hqps_service.h"
#include "flex/engines/http_server/graph_db_service.h"
#include "flex/engines/http_server/workdir_manipulator.h"
#include "flex/otel/otel.h"
#include "flex/storages/rt_mutable_graph/loading_config.h"
Expand Down Expand Up @@ -64,7 +64,7 @@ void blockSignal(int sig) {
}

// When graph_schema is not specified, codegen proxy will use the running graph
// schema in hqps_service
// schema in graph_db_service
void init_codegen_proxy(const bpo::variables_map& vm,
const std::string& engine_config_file,
const std::string& graph_schema_file = "") {
Expand Down Expand Up @@ -203,6 +203,7 @@ int main(int argc, char** argv) {
service_config.start_admin_service = vm["enable-admin-service"].as<bool>();
service_config.start_compiler = vm["start-compiler"].as<bool>();
service_config.memory_level = vm["memory-level"].as<unsigned>();
service_config.enable_adhoc_handler = true;

auto& db = gs::GraphDB::get();

Expand Down Expand Up @@ -266,8 +267,8 @@ int main(int argc, char** argv) {
}
}

server::HQPSService::get().init(service_config);
server::HQPSService::get().run_and_wait_for_exit();
server::GraphDBService::get().init(service_config);
server::GraphDBService::get().run_and_wait_for_exit();

#ifdef HAVE_OPENTELEMETRY_CPP
otel::cleanUpTracer();
Expand Down
2 changes: 1 addition & 1 deletion flex/bin/rt_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/http_server/executor_group.actg.h"
#include "flex/engines/http_server/generated/actor/executor_ref.act.autogen.h"
#include "flex/engines/http_server/service/graph_db_service.h"
#include "flex/engines/http_server/graph_db_service.h"

namespace bpo = boost::program_options;
using namespace std::chrono_literals;
Expand Down
11 changes: 9 additions & 2 deletions flex/bin/rt_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
#include "grape/util.h"

#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/http_server/graph_db_service.h"
#include "flex/engines/http_server/options.h"
#include "flex/engines/http_server/service/graph_db_service.h"

#include <boost/program_options.hpp>
#include <seastar/core/alien.hh>
Expand Down Expand Up @@ -99,7 +99,14 @@ int main(int argc, char** argv) {

// start service
LOG(INFO) << "GraphScope http server start to listen on port " << http_port;
server::GraphDBService::get().init(shard_num, http_port, enable_dpdk);

server::ServiceConfig service_config;
service_config.shard_num = shard_num;
service_config.dpdk_mode = enable_dpdk;
service_config.query_port = http_port;
service_config.start_admin_service = false;
service_config.start_compiler = false;
server::GraphDBService::get().init(service_config);
server::GraphDBService::get().run_and_wait_for_exit();

return 0;
Expand Down
8 changes: 3 additions & 5 deletions flex/codegen/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ find_package(Boost REQUIRED COMPONENTS system filesystem
context program_options regex thread)
include_directories(SYSTEM ${Boost_INCLUDE_DIRS})

if (BUILD_HQPS)
add_executable(gen_code_from_plan gen_code_from_plan.cc)
target_link_libraries(gen_code_from_plan flex_rt_mutable_graph flex_utils hqps_plan_proto ${GLOG_LIBRARIES} ${Boost_LIBRARIES})
install_flex_target(gen_code_from_plan)
endif()
add_executable(gen_code_from_plan gen_code_from_plan.cc)
target_link_libraries(gen_code_from_plan flex_rt_mutable_graph flex_utils hqps_plan_proto ${GLOG_LIBRARIES} ${Boost_LIBRARIES})
install_flex_target(gen_code_from_plan)
5 changes: 1 addition & 4 deletions flex/engines/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,5 @@
add_subdirectory(graph_db)
add_subdirectory(bsp)
add_subdirectory(http_server)
message(STATUS "BUILD_HQPS: ${BUILD_HQPS}")
if (BUILD_HQPS)
add_subdirectory(hqps_db)
endif()
add_subdirectory(hqps_db)

14 changes: 4 additions & 10 deletions flex/engines/graph_db/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/database/*.cc")
if (NOT BUILD_HQPS)
list(FILTER GRAPH_DB_SRC_FILES EXCLUDE REGEX ".*hqps_app*.")
endif()

add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES})

target_include_directories(flex_graph_db PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>)
target_link_libraries(flex_graph_db flex_rt_mutable_graph flex_utils ${GLOG_LIBRARIES} ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
if (BUILD_HQPS)
target_link_libraries(flex_graph_db hqps_plan_proto)
endif()
target_link_libraries(flex_graph_db hqps_plan_proto)
install_flex_target(flex_graph_db)

install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
Expand All @@ -28,8 +23,7 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h

install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/app_base.h
DESTINATION include/flex/engines/graph_db/app)
if (BUILD_HQPS)
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/hqps_app.h
DESTINATION include/flex/engines/graph_db/app)
endif()
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/hqps_app.h
DESTINATION include/flex/engines/graph_db/app)


2 changes: 0 additions & 2 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,10 @@ void GraphDB::initApps(
}
// Builtin apps
app_factories_[0] = std::make_shared<ServerAppFactory>();
#ifdef BUILD_HQPS
app_factories_[Schema::HQPS_ADHOC_READ_PLUGIN_ID] =
std::make_shared<HQPSAdhocReadAppFactory>();
app_factories_[Schema::HQPS_ADHOC_WRITE_PLUGIN_ID] =
std::make_shared<HQPSAdhocWriteAppFactory>();
#endif // BUILD_HQPS

size_t valid_plugins = 0;
for (auto& path_and_index : plugins) {
Expand Down
4 changes: 0 additions & 4 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/utils/app_utils.h"

#ifdef BUILD_HQPS
#include "flex/proto_generated_gie/stored_procedure.pb.h"
#include "nlohmann/json.hpp"
#endif // BUILD_HQPS

namespace gs {

Expand Down Expand Up @@ -229,7 +227,6 @@ AppBase* GraphDBSession::GetApp(int type) {

#undef likely // likely

#ifdef BUILD_HQPS
Result<std::pair<uint8_t, std::string_view>>
GraphDBSession::parse_query_type_from_cypher_json(
const std::string_view& str_view) {
Expand Down Expand Up @@ -282,7 +279,6 @@ GraphDBSession::parse_query_type_from_cypher_internal(
}
return std::make_pair(app_name_to_path_index.at(query_name).second, str_view);
}
#endif

const AppMetric& GraphDBSession::GetAppMetric(int idx) const {
return app_metrics_[idx];
Expand Down
32 changes: 11 additions & 21 deletions flex/engines/graph_db/database/graph_db_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,17 @@ class GraphDBSession {
public:
enum class InputFormat : uint8_t {
kCppEncoder = 0,
#ifdef BUILD_HQPS
kCypherJson = 1, // External usage format
kCypherInternalAdhoc = 2, // Internal format for adhoc query
kCypherInternalProcedure = 3, // Internal format for procedure
#endif // BUILD_HQPS
kCypherJson = 1, // Json format for cypher query
kCypherProtoAdhoc = 2, // Protobuf format for adhoc query
kCypherProtoProcedure = 3, // Protobuf format for procedure query
};

static constexpr int32_t MAX_RETRY = 3;
static constexpr int32_t MAX_PLUGIN_NUM = 256; // 2^(sizeof(uint8_t)*8)
#ifdef BUILD_HQPS
static constexpr const char* kCppEncoder = "\x00";
static constexpr const char* kCypherJson = "\x01";
static constexpr const char* kCypherInternalAdhoc = "\x02";
static constexpr const char* kCypherInternalProcedure = "\x03";
#endif // BUILD_HQPS
static constexpr const char* kCppEncoderStr = "\x00";
static constexpr const char* kCypherJsonStr = "\x01";
static constexpr const char* kCypherProtoAdhocStr = "\x02";
static constexpr const char* kCypherProtoProcedureStr = "\x03";
GraphDBSession(GraphDB& db, Allocator& alloc, WalWriter& logger,
const std::string& work_dir, int thread_id)
: db_(db),
Expand Down Expand Up @@ -110,12 +106,10 @@ class GraphDBSession {
AppBase* GetApp(int idx);

private:
#ifdef BUILD_HQPS
Result<std::pair<uint8_t, std::string_view>>
parse_query_type_from_cypher_json(const std::string_view& input);
Result<std::pair<uint8_t, std::string_view>>
parse_query_type_from_cypher_internal(const std::string_view& input);
#endif // BUILD_HQPS
/**
* @brief Parse the input format of the query.
* There are four formats:
Expand Down Expand Up @@ -154,10 +148,8 @@ class GraphDBSession {
// user-defined payload,
return std::make_pair((uint8_t) input[len - 2],
std::string_view(str_data, len - 2));
}
#ifdef BUILD_HQPS
else if (input_tag ==
static_cast<uint8_t>(InputFormat::kCypherInternalAdhoc)) {
} else if (input_tag ==
static_cast<uint8_t>(InputFormat::kCypherProtoAdhoc)) {
// For cypher internal adhoc, the query id is the
// second last byte,which is fixed to 255, and other bytes are a string
// representing the path to generated dynamic lib.
Expand All @@ -169,15 +161,13 @@ class GraphDBSession {
std::string_view str_view(input.data(), len - 1);
return parse_query_type_from_cypher_json(str_view);
} else if (input_tag ==
static_cast<uint8_t>(InputFormat::kCypherInternalProcedure)) {
static_cast<uint8_t>(InputFormat::kCypherProtoProcedure)) {
// For cypher internal procedure, the query_name is
// provided in the protobuf message.
std::string_view str_view(input.data(), len - 1);
return parse_query_type_from_cypher_internal(str_view);

}
#endif // BUILD_HQPS
else {
} else {
return Result<std::pair<uint8_t, std::string_view>>(
gs::Status(StatusCode::InValidArgument,
"Invalid input tag: " + std::to_string(input_tag)));
Expand Down
4 changes: 0 additions & 4 deletions flex/engines/hqps_db/app/interactive_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
#ifndef ENGINES_HQPS_DB_APP_INTERACTIVE_APP_BASE_H_
#define ENGINES_HQPS_DB_APP_INTERACTIVE_APP_BASE_H_

#ifdef BUILD_HQPS

#include "flex/engines/graph_db/app/app_base.h"
#include "flex/proto_generated_gie/results.pb.h"
#include "flex/proto_generated_gie/stored_procedure.pb.h"
Expand Down Expand Up @@ -234,6 +232,4 @@ class CypherInternalPbWriteAppBase : public WriteAppBase {

} // namespace gs

#endif // BUILD_HQPS

#endif // ENGINES_HQPS_DB_APP_INTERACTIVE_APP_BASE_H_
18 changes: 2 additions & 16 deletions flex/engines/http_server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,10 @@ if (Hiactor_FOUND)
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/
INCLUDE_PATHS ${Hiactor_INCLUDE_DIR},${CMAKE_CURRENT_SOURCE_DIR}/../../../,${CMAKE_CURRENT_SOURCE_DIR}/../../third_party/nlohmann-json/single_include/)

if (NOT BUILD_HQPS)
list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*admin.*")
list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*codegen.*")
endif ()

# get all .cc files in current directory, except for generated/
file(GLOB_RECURSE SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
list(FILTER SERVER_FILES EXCLUDE REGEX ".*generated.*")

if (NOT BUILD_HQPS)
list(FILTER SERVER_FILES EXCLUDE REGEX ".*admin*")
list(FILTER SERVER_FILES EXCLUDE REGEX ".*hqps*")
list(FILTER SERVER_FILES EXCLUDE REGEX ".*codegen*")
list(FILTER SERVER_FILES EXCLUDE REGEX ".*workdir_manipulator*")
endif ()

add_library(flex_server STATIC ${SERVER_FILES} ${server_actor_autogen_files})
add_dependencies(flex_server server_actor_autogen)
target_compile_options (flex_server
Expand All @@ -42,10 +30,8 @@ if (Hiactor_FOUND)
endif ()
set_target_properties(Hiactor::seastar PROPERTIES INTERFACE_COMPILE_OPTIONS "${seastar_options}")

if (BUILD_HQPS)
target_include_directories(flex_server PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/../hqps/)
target_link_libraries(flex_server hqps_plan_proto)
endif ()
target_include_directories(flex_server PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/../hqps/)
target_link_libraries(flex_server hqps_plan_proto)
if (OPENTELEMETRY_CPP_FOUND)
target_link_libraries(flex_server otel)
endif()
Expand Down
Loading

0 comments on commit cac1d9d

Please sign in to comment.