From b8523f9fb576b9a3ae8196f02a55ebcd02044875 Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Wed, 25 Sep 2024 14:08:04 +0800 Subject: [PATCH] fix(interactive): Fix the behavior the service restarting from workspace (#4259) Previously, when Interactive server exits and restart, the server will still opens the graph data of default_graph(with graph_id 1), while the MetaService may returns other graph's id as running graph id. With this PR merged, when Interactive starts from an exiting workspace(directory), it will load the previous running graph. The status of metadata store will be consistent with actual graph storage. Fix #4253 --- flex/bin/interactive_server.cc | 58 +-------- flex/engines/http_server/graph_db_service.cc | 115 ++++++++++++++---- .../metadata/default_graph_meta_store.cc | 2 - 3 files changed, 91 insertions(+), 84 deletions(-) diff --git a/flex/bin/interactive_server.cc b/flex/bin/interactive_server.cc index 1bb33d2e556e..2ca6b33a8349 100644 --- a/flex/bin/interactive_server.cc +++ b/flex/bin/interactive_server.cc @@ -84,60 +84,6 @@ void init_codegen_proxy(const bpo::variables_map& vm, graph_schema_file); } -void openDefaultGraph(const std::string workspace, int32_t thread_num, - const std::string& default_graph, uint32_t memory_level) { - if (!std::filesystem::exists(workspace)) { - LOG(ERROR) << "Workspace directory not exists: " << workspace; - } - auto data_dir_path = - workspace + "/" + server::WorkDirManipulator::DATA_DIR_NAME; - if (!std::filesystem::exists(data_dir_path)) { - LOG(ERROR) << "Data directory not exists: " << data_dir_path; - return; - } - - // Get current executable path - - server::WorkDirManipulator::SetWorkspace(workspace); - - VLOG(1) << "Finish init workspace"; - - if (default_graph.empty()) { - LOG(FATAL) << "No Default graph is specified"; - return; - } - - auto& db = gs::GraphDB::get(); - auto schema_path = - server::WorkDirManipulator::GetGraphSchemaPath(default_graph); - auto schema_res = gs::Schema::LoadFromYaml(schema_path); - if (!schema_res.ok()) { - LOG(FATAL) << "Fail to load graph schema from yaml file: " << schema_path; - } - auto data_dir_res = - server::WorkDirManipulator::GetDataDirectory(default_graph); - if (!data_dir_res.ok()) { - LOG(FATAL) << "Fail to get data directory for default graph: " - << data_dir_res.status().error_message(); - } - std::string data_dir = data_dir_res.value(); - if (!std::filesystem::exists(data_dir)) { - LOG(FATAL) << "Data directory not exists: " << data_dir - << ", for graph: " << default_graph; - } - db.Close(); - gs::GraphDBConfig config(schema_res.value(), data_dir, thread_num); - config.memory_level = memory_level; - if (config.memory_level >= 2) { - config.enable_auto_compaction = true; - } - if (!db.Open(config).ok()) { - LOG(FATAL) << "Fail to load graph from data directory: " << data_dir; - } - LOG(INFO) << "Successfully init graph db for default graph: " - << default_graph; -} - void config_log_level(int log_level, int verbose_level) { if (getenv("GLOG_minloglevel") != nullptr) { FLAGS_stderrthreshold = atoi(getenv("GLOG_minloglevel")); @@ -220,6 +166,7 @@ int main(int argc, char** argv) { if (vm.count("workspace")) { workspace = vm["workspace"].as(); } + server::WorkDirManipulator::SetWorkspace(workspace); if (!vm.count("server-config")) { LOG(FATAL) << "server-config is needed"; @@ -259,9 +206,6 @@ int main(int argc, char** argv) { "data-path should NOT be specified"; } - gs::openDefaultGraph(workspace, service_config.shard_num, - service_config.default_graph, - service_config.memory_level); // Suppose the default_graph is already loaded. LOG(INFO) << "Finish init workspace"; auto schema_file = server::WorkDirManipulator::GetGraphSchemaPath( diff --git a/flex/engines/http_server/graph_db_service.cc b/flex/engines/http_server/graph_db_service.cc index c28dbb83a277..64854344b1ab 100644 --- a/flex/engines/http_server/graph_db_service.cc +++ b/flex/engines/http_server/graph_db_service.cc @@ -59,6 +59,52 @@ GraphDBService& GraphDBService::get() { return instance; } +void openGraph(const gs::GraphId& graph_id, + const ServiceConfig& service_config) { + auto workspace = server::WorkDirManipulator::GetWorkspace(); + if (!std::filesystem::exists(workspace)) { + LOG(ERROR) << "Workspace directory not exists: " << workspace; + } + if (graph_id.empty()) { + LOG(FATAL) << "No graph is specified"; + return; + } + auto data_dir_path = + workspace + "/" + server::WorkDirManipulator::DATA_DIR_NAME; + if (!std::filesystem::exists(data_dir_path)) { + LOG(ERROR) << "Data directory not exists: " << data_dir_path; + return; + } + + auto& db = gs::GraphDB::get(); + auto schema_path = server::WorkDirManipulator::GetGraphSchemaPath(graph_id); + auto schema_res = gs::Schema::LoadFromYaml(schema_path); + if (!schema_res.ok()) { + LOG(FATAL) << "Fail to load graph schema from yaml file: " << schema_path; + } + auto data_dir_res = server::WorkDirManipulator::GetDataDirectory(graph_id); + if (!data_dir_res.ok()) { + LOG(FATAL) << "Fail to get data directory for default graph: " + << data_dir_res.status().error_message(); + } + std::string data_dir = data_dir_res.value(); + if (!std::filesystem::exists(data_dir)) { + LOG(FATAL) << "Data directory not exists: " << data_dir + << ", for graph: " << graph_id; + } + db.Close(); + gs::GraphDBConfig config(schema_res.value(), data_dir, + service_config.shard_num); + config.memory_level = service_config.memory_level; + if (config.memory_level >= 2) { + config.enable_auto_compaction = true; + } + if (!db.Open(config).ok()) { + LOG(FATAL) << "Fail to load graph from data directory: " << data_dir; + } + LOG(INFO) << "Successfully init graph db for graph: " << graph_id; +} + void GraphDBService::init(const ServiceConfig& config) { if (initialized_.load(std::memory_order_relaxed)) { std::cerr << "High QPS service has been already initialized!" << std::endl; @@ -87,15 +133,56 @@ void GraphDBService::init(const ServiceConfig& config) { return; } LOG(INFO) << "Metadata store opened successfully."; - gs::GraphId default_graph_id = insert_default_graph_meta(); - auto set_res = metadata_store_->SetRunningGraph(default_graph_id); + // If there is no graph in the metadata store, insert the default graph. + auto graph_metas_res = metadata_store_->GetAllGraphMeta(); + if (!graph_metas_res.ok()) { + LOG(FATAL) << "Failed to get graph metas: " + << graph_metas_res.status().error_message(); + } + gs::GraphId cur_graph_id = ""; + // Try to launch service on the previous running graph. + auto running_graph_res = metadata_store_->GetRunningGraph(); + if (running_graph_res.ok() && !running_graph_res.value().empty()) { + cur_graph_id = running_graph_res.value(); + // make sure the cur_graph_id is in the graph_metas_res. + auto it = std::find_if(graph_metas_res.value().begin(), + graph_metas_res.value().end(), + [&cur_graph_id](const gs::GraphMeta& meta) { + return meta.id == cur_graph_id; + }); + if (it == graph_metas_res.value().end()) { + LOG(ERROR) << "The running graph: " << cur_graph_id + << " is not in the metadata store, maybe the metadata is " + "corrupted."; + cur_graph_id = ""; + } + } + if (cur_graph_id.empty()) { + if (!graph_metas_res.value().empty()) { + LOG(INFO) << "There are already " << graph_metas_res.value().size() + << " graph metas in the metadata store."; + // return the graph id with the smallest value. + cur_graph_id = + (std::min_element( + graph_metas_res.value().begin(), graph_metas_res.value().end(), + [](const gs::GraphMeta& a, const gs::GraphMeta& b) { + return a.id < b.id; + })) + ->id; + } else { + cur_graph_id = insert_default_graph_meta(); + } + } + // open the graph with the default graph id. + openGraph(cur_graph_id, service_config_); + auto set_res = metadata_store_->SetRunningGraph(cur_graph_id); if (!set_res.ok()) { LOG(FATAL) << "Failed to set running graph: " << res.status().error_message(); return; } - auto lock_res = metadata_store_->LockGraphIndices(default_graph_id); + auto lock_res = metadata_store_->LockGraphIndices(cur_graph_id); if (!lock_res.ok()) { LOG(FATAL) << lock_res.status().error_message(); return; @@ -380,28 +467,6 @@ std::string GraphDBService::find_interactive_class_path() { } gs::GraphId GraphDBService::insert_default_graph_meta() { - if (!metadata_store_) { - LOG(FATAL) << "Metadata store has not been inited!" << std::endl; - } - // If there is no graph in the metadata store, insert the default graph. - auto graph_metas_res = metadata_store_->GetAllGraphMeta(); - if (!graph_metas_res.ok()) { - LOG(FATAL) << "Failed to get graph metas: " - << graph_metas_res.status().error_message(); - } - if (!graph_metas_res.value().empty()) { - LOG(INFO) << "There are already " << graph_metas_res.value().size() - << " graph metas in the metadata store."; - - // return the graph id with the smallest value. - auto min_graph_id = std::min_element( - graph_metas_res.value().begin(), graph_metas_res.value().end(), - [](const gs::GraphMeta& a, const gs::GraphMeta& b) { - return a.id < b.id; - }); - return min_graph_id->id; - } - auto default_graph_name = this->service_config_.default_graph; auto schema_str_res = WorkDirManipulator::GetGraphSchemaString(default_graph_name); diff --git a/flex/storages/metadata/default_graph_meta_store.cc b/flex/storages/metadata/default_graph_meta_store.cc index e81a2d5dd601..78657a54dc94 100644 --- a/flex/storages/metadata/default_graph_meta_store.cc +++ b/flex/storages/metadata/default_graph_meta_store.cc @@ -20,7 +20,6 @@ DefaultGraphMetaStore::DefaultGraphMetaStore( std::unique_ptr base_store) : base_store_(std::move(base_store)) { // Clear previous context, in case of dirty data. - ClearRunningGraph(); clear_locks(); } @@ -29,7 +28,6 @@ DefaultGraphMetaStore::~DefaultGraphMetaStore() { Close(); } Result DefaultGraphMetaStore::Open() { return base_store_->Open(); } Result DefaultGraphMetaStore::Close() { - RETURN_IF_NOT_OK(ClearRunningGraph()); RETURN_IF_NOT_OK(clear_locks()); return base_store_->Close(); }