Skip to content

Commit

Permalink
fix(interactive): Fix the behavior the service restarting from worksp…
Browse files Browse the repository at this point in the history
…ace (#4259)

Previously, when Interactive server exits and restart, the server will
still opens the graph data of default_graph(with graph_id 1), while the
MetaService may returns other graph's id as running graph id.

With this PR merged, when Interactive starts from an exiting
workspace(directory), it will load the previous running graph. The
status of metadata store will be consistent with actual graph storage.

Fix #4253
  • Loading branch information
zhanglei1949 authored Sep 25, 2024
1 parent b814e5b commit b8523f9
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 84 deletions.
58 changes: 1 addition & 57 deletions flex/bin/interactive_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,60 +84,6 @@ void init_codegen_proxy(const bpo::variables_map& vm,
graph_schema_file);
}

void openDefaultGraph(const std::string workspace, int32_t thread_num,
const std::string& default_graph, uint32_t memory_level) {
if (!std::filesystem::exists(workspace)) {
LOG(ERROR) << "Workspace directory not exists: " << workspace;
}
auto data_dir_path =
workspace + "/" + server::WorkDirManipulator::DATA_DIR_NAME;
if (!std::filesystem::exists(data_dir_path)) {
LOG(ERROR) << "Data directory not exists: " << data_dir_path;
return;
}

// Get current executable path

server::WorkDirManipulator::SetWorkspace(workspace);

VLOG(1) << "Finish init workspace";

if (default_graph.empty()) {
LOG(FATAL) << "No Default graph is specified";
return;
}

auto& db = gs::GraphDB::get();
auto schema_path =
server::WorkDirManipulator::GetGraphSchemaPath(default_graph);
auto schema_res = gs::Schema::LoadFromYaml(schema_path);
if (!schema_res.ok()) {
LOG(FATAL) << "Fail to load graph schema from yaml file: " << schema_path;
}
auto data_dir_res =
server::WorkDirManipulator::GetDataDirectory(default_graph);
if (!data_dir_res.ok()) {
LOG(FATAL) << "Fail to get data directory for default graph: "
<< data_dir_res.status().error_message();
}
std::string data_dir = data_dir_res.value();
if (!std::filesystem::exists(data_dir)) {
LOG(FATAL) << "Data directory not exists: " << data_dir
<< ", for graph: " << default_graph;
}
db.Close();
gs::GraphDBConfig config(schema_res.value(), data_dir, thread_num);
config.memory_level = memory_level;
if (config.memory_level >= 2) {
config.enable_auto_compaction = true;
}
if (!db.Open(config).ok()) {
LOG(FATAL) << "Fail to load graph from data directory: " << data_dir;
}
LOG(INFO) << "Successfully init graph db for default graph: "
<< default_graph;
}

void config_log_level(int log_level, int verbose_level) {
if (getenv("GLOG_minloglevel") != nullptr) {
FLAGS_stderrthreshold = atoi(getenv("GLOG_minloglevel"));
Expand Down Expand Up @@ -220,6 +166,7 @@ int main(int argc, char** argv) {
if (vm.count("workspace")) {
workspace = vm["workspace"].as<std::string>();
}
server::WorkDirManipulator::SetWorkspace(workspace);

if (!vm.count("server-config")) {
LOG(FATAL) << "server-config is needed";
Expand Down Expand Up @@ -259,9 +206,6 @@ int main(int argc, char** argv) {
"data-path should NOT be specified";
}

gs::openDefaultGraph(workspace, service_config.shard_num,
service_config.default_graph,
service_config.memory_level);
// Suppose the default_graph is already loaded.
LOG(INFO) << "Finish init workspace";
auto schema_file = server::WorkDirManipulator::GetGraphSchemaPath(
Expand Down
115 changes: 90 additions & 25 deletions flex/engines/http_server/graph_db_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,52 @@ GraphDBService& GraphDBService::get() {
return instance;
}

void openGraph(const gs::GraphId& graph_id,
const ServiceConfig& service_config) {
auto workspace = server::WorkDirManipulator::GetWorkspace();
if (!std::filesystem::exists(workspace)) {
LOG(ERROR) << "Workspace directory not exists: " << workspace;
}
if (graph_id.empty()) {
LOG(FATAL) << "No graph is specified";
return;
}
auto data_dir_path =
workspace + "/" + server::WorkDirManipulator::DATA_DIR_NAME;
if (!std::filesystem::exists(data_dir_path)) {
LOG(ERROR) << "Data directory not exists: " << data_dir_path;
return;
}

auto& db = gs::GraphDB::get();
auto schema_path = server::WorkDirManipulator::GetGraphSchemaPath(graph_id);
auto schema_res = gs::Schema::LoadFromYaml(schema_path);
if (!schema_res.ok()) {
LOG(FATAL) << "Fail to load graph schema from yaml file: " << schema_path;
}
auto data_dir_res = server::WorkDirManipulator::GetDataDirectory(graph_id);
if (!data_dir_res.ok()) {
LOG(FATAL) << "Fail to get data directory for default graph: "
<< data_dir_res.status().error_message();
}
std::string data_dir = data_dir_res.value();
if (!std::filesystem::exists(data_dir)) {
LOG(FATAL) << "Data directory not exists: " << data_dir
<< ", for graph: " << graph_id;
}
db.Close();
gs::GraphDBConfig config(schema_res.value(), data_dir,
service_config.shard_num);
config.memory_level = service_config.memory_level;
if (config.memory_level >= 2) {
config.enable_auto_compaction = true;
}
if (!db.Open(config).ok()) {
LOG(FATAL) << "Fail to load graph from data directory: " << data_dir;
}
LOG(INFO) << "Successfully init graph db for graph: " << graph_id;
}

void GraphDBService::init(const ServiceConfig& config) {
if (initialized_.load(std::memory_order_relaxed)) {
std::cerr << "High QPS service has been already initialized!" << std::endl;
Expand Down Expand Up @@ -87,15 +133,56 @@ void GraphDBService::init(const ServiceConfig& config) {
return;
}
LOG(INFO) << "Metadata store opened successfully.";
gs::GraphId default_graph_id = insert_default_graph_meta();
auto set_res = metadata_store_->SetRunningGraph(default_graph_id);
// If there is no graph in the metadata store, insert the default graph.
auto graph_metas_res = metadata_store_->GetAllGraphMeta();
if (!graph_metas_res.ok()) {
LOG(FATAL) << "Failed to get graph metas: "
<< graph_metas_res.status().error_message();
}
gs::GraphId cur_graph_id = "";
// Try to launch service on the previous running graph.
auto running_graph_res = metadata_store_->GetRunningGraph();
if (running_graph_res.ok() && !running_graph_res.value().empty()) {
cur_graph_id = running_graph_res.value();
// make sure the cur_graph_id is in the graph_metas_res.
auto it = std::find_if(graph_metas_res.value().begin(),
graph_metas_res.value().end(),
[&cur_graph_id](const gs::GraphMeta& meta) {
return meta.id == cur_graph_id;
});
if (it == graph_metas_res.value().end()) {
LOG(ERROR) << "The running graph: " << cur_graph_id
<< " is not in the metadata store, maybe the metadata is "
"corrupted.";
cur_graph_id = "";
}
}
if (cur_graph_id.empty()) {
if (!graph_metas_res.value().empty()) {
LOG(INFO) << "There are already " << graph_metas_res.value().size()
<< " graph metas in the metadata store.";
// return the graph id with the smallest value.
cur_graph_id =
(std::min_element(
graph_metas_res.value().begin(), graph_metas_res.value().end(),
[](const gs::GraphMeta& a, const gs::GraphMeta& b) {
return a.id < b.id;
}))
->id;
} else {
cur_graph_id = insert_default_graph_meta();
}
}
// open the graph with the default graph id.
openGraph(cur_graph_id, service_config_);
auto set_res = metadata_store_->SetRunningGraph(cur_graph_id);
if (!set_res.ok()) {
LOG(FATAL) << "Failed to set running graph: "
<< res.status().error_message();
return;
}

auto lock_res = metadata_store_->LockGraphIndices(default_graph_id);
auto lock_res = metadata_store_->LockGraphIndices(cur_graph_id);
if (!lock_res.ok()) {
LOG(FATAL) << lock_res.status().error_message();
return;
Expand Down Expand Up @@ -380,28 +467,6 @@ std::string GraphDBService::find_interactive_class_path() {
}

gs::GraphId GraphDBService::insert_default_graph_meta() {
if (!metadata_store_) {
LOG(FATAL) << "Metadata store has not been inited!" << std::endl;
}
// If there is no graph in the metadata store, insert the default graph.
auto graph_metas_res = metadata_store_->GetAllGraphMeta();
if (!graph_metas_res.ok()) {
LOG(FATAL) << "Failed to get graph metas: "
<< graph_metas_res.status().error_message();
}
if (!graph_metas_res.value().empty()) {
LOG(INFO) << "There are already " << graph_metas_res.value().size()
<< " graph metas in the metadata store.";

// return the graph id with the smallest value.
auto min_graph_id = std::min_element(
graph_metas_res.value().begin(), graph_metas_res.value().end(),
[](const gs::GraphMeta& a, const gs::GraphMeta& b) {
return a.id < b.id;
});
return min_graph_id->id;
}

auto default_graph_name = this->service_config_.default_graph;
auto schema_str_res =
WorkDirManipulator::GetGraphSchemaString(default_graph_name);
Expand Down
2 changes: 0 additions & 2 deletions flex/storages/metadata/default_graph_meta_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ DefaultGraphMetaStore::DefaultGraphMetaStore(
std::unique_ptr<IMetaStore> base_store)
: base_store_(std::move(base_store)) {
// Clear previous context, in case of dirty data.
ClearRunningGraph();
clear_locks();
}

Expand All @@ -29,7 +28,6 @@ DefaultGraphMetaStore::~DefaultGraphMetaStore() { Close(); }
Result<bool> DefaultGraphMetaStore::Open() { return base_store_->Open(); }

Result<bool> DefaultGraphMetaStore::Close() {
RETURN_IF_NOT_OK(ClearRunningGraph());
RETURN_IF_NOT_OK(clear_locks());
return base_store_->Close();
}
Expand Down

0 comments on commit b8523f9

Please sign in to comment.