Skip to content

Commit

Permalink
fix(interactive): Fix waiting compiler ready (alibaba#3770)
Browse files Browse the repository at this point in the history
As titled.
  • Loading branch information
zhanglei1949 authored May 7, 2024
1 parent 43c04de commit 5b2712c
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 11 deletions.
66 changes: 59 additions & 7 deletions flex/engines/http_server/service/hqps_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@
#include "flex/engines/http_server/workdir_manipulator.h"
namespace server {

bool check_port_occupied(uint16_t port) {
VLOG(10) << "Check port " << port << " is occupied or not.";
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
return false;
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
int ret = bind(sockfd, (struct sockaddr*) &addr, sizeof(addr));
close(sockfd);
return ret < 0;
}

ServiceConfig::ServiceConfig()
: bolt_port(DEFAULT_BOLT_PORT),
admin_port(DEFAULT_ADMIN_PORT),
Expand All @@ -27,6 +42,8 @@ ServiceConfig::ServiceConfig()
external_thread_num(2),
start_admin_service(true),
start_compiler(false),
enable_gremlin(true),
enable_bolt(true),
metadata_store_type_(gs::MetadataStoreType::kLocalFile) {}

const std::string HQPSService::DEFAULT_GRAPH_NAME = "modern_graph";
Expand Down Expand Up @@ -183,6 +200,28 @@ void HQPSService::start_query_actors() {
}
}

bool HQPSService::check_compiler_ready() const {
if (service_config_.start_compiler) {
if (service_config_.enable_gremlin) {
if (check_port_occupied(service_config_.gremlin_port)) {
return true;
} else {
LOG(ERROR) << "Gremlin server is not ready!";
return false;
}
}
if (service_config_.enable_bolt) {
if (check_port_occupied(service_config_.bolt_port)) {
return true;
} else {
LOG(ERROR) << "Bolt server is not ready!";
return false;
}
}
}
return true;
}

bool HQPSService::start_compiler_subprocess(
const std::string& graph_schema_path) {
LOG(INFO) << "Start compiler subprocess";
Expand Down Expand Up @@ -213,14 +252,27 @@ bool HQPSService::start_compiler_subprocess(
boost::process::child(cmd_str, boost::process::std_out > compiler_log,
boost::process::std_err > compiler_log);
LOG(INFO) << "Compiler process started with pid: " << compiler_process_.id();
// sleep for a while to wait for the compiler to start
std::this_thread::sleep_for(std::chrono::seconds(4));
// check if the compiler process is still running
if (!compiler_process_.running()) {
LOG(ERROR) << "Compiler process failed to start!";
return false;
// sleep for a maximum 30 seconds to wait for the compiler process to start
int32_t sleep_time = 0;
int32_t max_sleep_time = 30;
int32_t sleep_interval = 4;
while (sleep_time < max_sleep_time) {
std::this_thread::sleep_for(std::chrono::seconds(sleep_interval));
if (!compiler_process_.running()) {
LOG(ERROR) << "Compiler process failed to start!";
return false;
}
// check query server port is ready
if (check_compiler_ready()) {
LOG(INFO) << "Compiler server is ready!";
return true;
}
sleep_time += sleep_interval;
LOG(INFO) << "Sleep " << sleep_time << " seconds to wait for compiler "
<< "server to start.";
}
return true;
LOG(ERROR) << "Max sleep time reached, fail to start compiler server!";
return false;
}

bool HQPSService::stop_compiler_subprocess() {
Expand Down
20 changes: 16 additions & 4 deletions flex/engines/http_server/service/hqps_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ struct ServiceConfig {
bool start_admin_service; // Whether to start the admin service or only
// start the query service.
bool start_compiler;
bool enable_gremlin;
bool enable_bolt;
gs::MetadataStoreType metadata_store_type_;

// Those has not default value
Expand Down Expand Up @@ -103,6 +105,8 @@ class HQPSService {

bool stop_compiler_subprocess();

bool check_compiler_ready() const;

private:
HQPSService() = default;

Expand Down Expand Up @@ -203,15 +207,23 @@ struct convert<server::ServiceConfig> {
auto endpoint_node = compiler_node["endpoint"];
if (endpoint_node) {
auto bolt_node = endpoint_node["bolt_connector"];
if (bolt_node && bolt_node["port"] &&
bolt_node["disabled"].as<bool>() == false) {
if (bolt_node && bolt_node["disabled"]) {
service_config.enable_bolt = !bolt_node["disabled"].as<bool>();
} else {
service_config.enable_bolt = true;
}
if (bolt_node && bolt_node["port"]) {
service_config.bolt_port = bolt_node["port"].as<uint32_t>();
} else {
LOG(INFO) << "bolt_port not found, or disabled";
}
auto gremlin_node = endpoint_node["gremlin_connector"];
if (gremlin_node && gremlin_node["port"] &&
gremlin_node["disabled"].as<bool>() == false) {
if (gremlin_node && gremlin_node["disabled"]) {
service_config.enable_gremlin = !gremlin_node["disabled"].as<bool>();
} else {
service_config.enable_gremlin = true;
}
if (gremlin_node && gremlin_node["port"]) {
service_config.gremlin_port = gremlin_node["port"].as<uint32_t>();
} else {
LOG(INFO) << "gremlin_port not found, use default value "
Expand Down

0 comments on commit 5b2712c

Please sign in to comment.