From b8974beaf143afc2bed53c82fec633d5f55a19da Mon Sep 17 00:00:00 2001 From: luoxiaojian Date: Tue, 16 Jan 2024 19:41:17 +0800 Subject: [PATCH] feat(flex): Implemented compaction. (#3482) Co-authored-by: Zhang Lei --- flex/CMakeLists.txt | 15 +-- flex/bin/rt_server.cc | 2 +- flex/engines/graph_db/CMakeLists.txt | 1 + flex/engines/graph_db/app/server_app.cc | 8 ++ .../graph_db/database/compact_transaction.cc | 61 ++++++++++++ .../graph_db/database/compact_transaction.h | 51 ++++++++++ flex/engines/graph_db/database/graph_db.cc | 80 ++++++++++++++- flex/engines/graph_db/database/graph_db.h | 12 ++- .../graph_db/database/graph_db_session.cc | 24 ++++- .../graph_db/database/graph_db_session.h | 16 +-- .../graph_db/database/read_transaction.h | 98 ++++++++++++++++++- .../graph_db/database/update_transaction.cc | 10 +- .../graph_db/database/version_manager.cc | 11 ++- .../handler/graph_db_http_handler.cc | 39 +++++++- .../examples/modern_graph/modern_graph.yaml | 2 + flex/storages/rt_mutable_graph/dual_csr.h | 11 +++ .../loader/basic_fragment_loader.cc | 4 + flex/storages/rt_mutable_graph/mutable_csr.h | 78 ++++++++++++++- .../mutable_property_fragment.cc | 32 ++++++ .../mutable_property_fragment.h | 2 + flex/storages/rt_mutable_graph/schema.cc | 53 +++++++++- flex/storages/rt_mutable_graph/schema.h | 8 +- flex/utils/app_utils.cc | 22 +++++ flex/utils/app_utils.h | 6 ++ flex/utils/property/types.h | 12 +++ 25 files changed, 617 insertions(+), 41 deletions(-) create mode 100644 flex/engines/graph_db/database/compact_transaction.cc create mode 100644 flex/engines/graph_db/database/compact_transaction.h diff --git a/flex/CMakeLists.txt b/flex/CMakeLists.txt index aa2e91049751..9ed2ab2f9a0c 100644 --- a/flex/CMakeLists.txt +++ b/flex/CMakeLists.txt @@ -16,7 +16,6 @@ option(BUILD_DOC "Whether to build doc" ON) option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" ON) option(MONITOR_SESSIONS "Whether monitor sessions" OFF) - # ------------------------------------------------------------------------------ # cmake configs # ------------------------------------------------------------------------------ @@ -55,16 +54,6 @@ if (NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES) "Debug" "Release" "MinSizeRel" "RelWithDebInfo") endif () -if (APPLE) - set(CMAKE_MACOSX_RPATH ON) -else () - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wl,-rpath,$ORIGIN") -endif () - -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17 -Wall -fPIC") -set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g") -set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -g") - add_compile_definitions(FLEX_VERSION="${FLEX_VERSION}") if (APPLE) @@ -77,6 +66,10 @@ else () endif () endif () +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17 -Wall -fPIC") +set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g") +set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -g") + find_package(MPI REQUIRED) include_directories(SYSTEM ${MPI_CXX_INCLUDE_PATH}) diff --git a/flex/bin/rt_server.cc b/flex/bin/rt_server.cc index 787c51da557e..fa6e32004406 100644 --- a/flex/bin/rt_server.cc +++ b/flex/bin/rt_server.cc @@ -83,7 +83,7 @@ int main(int argc, char** argv) { auto& db = gs::GraphDB::get(); auto schema = gs::Schema::LoadFromYaml(graph_schema_path); - db.Open(schema, data_path, shard_num, warmup, memory_only); + db.Open(schema, data_path, shard_num, warmup, memory_only, true, http_port); t0 += grape::GetCurrentTime(); diff --git a/flex/engines/graph_db/CMakeLists.txt b/flex/engines/graph_db/CMakeLists.txt index 38ba71101e54..4f5544fcb23b 100644 --- a/flex/engines/graph_db/CMakeLists.txt +++ b/flex/engines/graph_db/CMakeLists.txt @@ -14,6 +14,7 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h ${CMAKE_CURRENT_SOURCE_DIR}/database/single_edge_insert_transaction.h ${CMAKE_CURRENT_SOURCE_DIR}/database/single_vertex_insert_transaction.h ${CMAKE_CURRENT_SOURCE_DIR}/database/update_transaction.h + ${CMAKE_CURRENT_SOURCE_DIR}/database/compact_transaction.h ${CMAKE_CURRENT_SOURCE_DIR}/database/version_manager.h ${CMAKE_CURRENT_SOURCE_DIR}/database/wal.h ${CMAKE_CURRENT_SOURCE_DIR}/database/transaction_utils.h diff --git a/flex/engines/graph_db/app/server_app.cc b/flex/engines/graph_db/app/server_app.cc index 4e518d1747a0..05ee56e5115e 100644 --- a/flex/engines/graph_db/app/server_app.cc +++ b/flex/engines/graph_db/app/server_app.cc @@ -293,6 +293,14 @@ bool ServerApp::Query(Decoder& input, Encoder& output) { } return true; } + } else if (op == "COMPACTION") { + bool ret = graph_.Compact(); + if (ret) { + output.put_string("SUCCESS"); + } else { + output.put_string("ABORTED"); + } + return true; } return false; } diff --git a/flex/engines/graph_db/database/compact_transaction.cc b/flex/engines/graph_db/database/compact_transaction.cc new file mode 100644 index 000000000000..cbf5e9df0ea8 --- /dev/null +++ b/flex/engines/graph_db/database/compact_transaction.cc @@ -0,0 +1,61 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flex/engines/graph_db/database/compact_transaction.h" +#include "flex/engines/graph_db/database/version_manager.h" +#include "flex/engines/graph_db/database/wal.h" +#include "flex/storages/rt_mutable_graph/mutable_property_fragment.h" + +namespace gs { + +CompactTransaction::CompactTransaction(MutablePropertyFragment& graph, + WalWriter& logger, VersionManager& vm, + timestamp_t timestamp) + : graph_(graph), logger_(logger), vm_(vm), timestamp_(timestamp) { + arc_.Resize(sizeof(WalHeader)); +} + +CompactTransaction::~CompactTransaction() { Abort(); } + +timestamp_t CompactTransaction::timestamp() const { return timestamp_; } + +void CompactTransaction::Commit() { + if (timestamp_ != std::numeric_limits::max()) { + auto* header = reinterpret_cast(arc_.GetBuffer()); + header->length = 0; + header->timestamp = timestamp_; + header->type = 1; + + logger_.append(arc_.GetBuffer(), arc_.GetSize()); + arc_.Clear(); + + LOG(INFO) << "before compact - " << timestamp_; + graph_.Compact(timestamp_); + LOG(INFO) << "after compact - " << timestamp_; + + vm_.release_update_timestamp(timestamp_); + timestamp_ = std::numeric_limits::max(); + } +} + +void CompactTransaction::Abort() { + if (timestamp_ != std::numeric_limits::max()) { + arc_.Clear(); + vm_.revert_update_timestamp(timestamp_); + timestamp_ = std::numeric_limits::max(); + } +} + +} // namespace gs \ No newline at end of file diff --git a/flex/engines/graph_db/database/compact_transaction.h b/flex/engines/graph_db/database/compact_transaction.h new file mode 100644 index 000000000000..082dbb304f05 --- /dev/null +++ b/flex/engines/graph_db/database/compact_transaction.h @@ -0,0 +1,51 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef GRAPHSCOPE_DATABASE_COMPACT_TRANSACTION_H_ +#define GRAPHSCOPE_DATABASE_COMPACT_TRANSACTION_H_ + +#include "flex/storages/rt_mutable_graph/types.h" +#include "grape/serialization/in_archive.h" + +namespace gs { + +class MutablePropertyFragment; +class WalWriter; +class VersionManager; + +class CompactTransaction { + public: + CompactTransaction(MutablePropertyFragment& graph, WalWriter& logger, + VersionManager& vm, timestamp_t timestamp); + ~CompactTransaction(); + + timestamp_t timestamp() const; + + void Commit(); + + void Abort(); + + private: + MutablePropertyFragment& graph_; + WalWriter& logger_; + VersionManager& vm_; + timestamp_t timestamp_; + + grape::InArchive arc_; +}; + +} // namespace gs + +#endif // GRAPHSCOPE_DATABASE_COMPACT_TRANSACTION_H_ \ No newline at end of file diff --git a/flex/engines/graph_db/database/graph_db.cc b/flex/engines/graph_db/database/graph_db.cc index 919061fa7a56..1be5163f4c34 100644 --- a/flex/engines/graph_db/database/graph_db.cc +++ b/flex/engines/graph_db/database/graph_db.cc @@ -20,6 +20,8 @@ #include "flex/engines/graph_db/database/wal.h" #include "flex/utils/yaml_utils.h" +#include "flex/third_party/httplib.h" + namespace gs { struct SessionLocalContext { @@ -41,6 +43,10 @@ struct SessionLocalContext { GraphDB::GraphDB() = default; GraphDB::~GraphDB() { + if (compact_thread_running_) { + compact_thread_running_ = false; + compact_thread_.join(); + } for (int i = 0; i < thread_num_; ++i) { contexts_[i].~SessionLocalContext(); } @@ -53,7 +59,8 @@ GraphDB& GraphDB::get() { } Result GraphDB::Open(const Schema& schema, const std::string& data_dir, - int32_t thread_num, bool warmup, bool memory_only) { + int32_t thread_num, bool warmup, bool memory_only, + bool enable_auto_compaction, int port) { if (!std::filesystem::exists(data_dir)) { std::filesystem::create_directories(data_dir); } @@ -97,11 +104,54 @@ Result GraphDB::Open(const Schema& schema, const std::string& data_dir, }); mutable_schema.EmplacePlugins(plugin_paths); + last_compaction_ts_ = 0; openWalAndCreateContexts(data_dir, memory_only); if ((!create_empty_graph) && warmup) { graph_.Warmup(thread_num_); } + + if (enable_auto_compaction && (port != -1)) { + if (compact_thread_running_) { + compact_thread_running_ = false; + compact_thread_.join(); + } + compact_thread_running_ = true; + compact_thread_ = std::thread([&](int http_port) { + size_t last_compaction_at = 0; + while (compact_thread_running_) { + size_t query_num_before = getExecutedQueryNum(); + sleep(30); + if (!compact_thread_running_) { + break; + } + size_t query_num_after = getExecutedQueryNum(); + if (query_num_before == query_num_after && + (query_num_after > (last_compaction_at + 100000))) { + VLOG(10) << "Trigger auto compaction"; + last_compaction_at = query_num_after; + std::string url = "127.0.0.1"; + httplib::Client cli(url, http_port); + cli.set_connection_timeout(0, 300000); + cli.set_read_timeout(300, 0); + cli.set_write_timeout(300, 0); + + std::vector buf; + Encoder encoder(buf); + encoder.put_string("COMPACTION"); + encoder.put_byte(0); + std::string content(buf.data(), buf.size()); + auto res = cli.Post("/interactive/query", content, "text/plain"); + std::string ret = res->body; + Decoder decoder(ret.data(), ret.size()); + std::string_view info = decoder.get_string(); + + VLOG(10) << "Finish compaction, info: " << info; + } + } + }, port); + } + return Result(true); } @@ -110,6 +160,10 @@ void GraphDB::Close() { monitor_thread_running_ = false; monitor_thread_.join(); #endif + if (compact_thread_running_) { + compact_thread_running_ = false; + compact_thread_.join(); + } //-----------Clear graph_db---------------- graph_.Clear(); version_manager_.clear(); @@ -152,6 +206,13 @@ GraphDBSession& GraphDB::GetSession(int thread_id) { int GraphDB::SessionNum() const { return thread_num_; } +void GraphDB::UpdateCompactionTimestamp(timestamp_t ts) { + last_compaction_ts_ = ts; +} +timestamp_t GraphDB::GetLastCompactionTimestamp() const { + return last_compaction_ts_; +} + const MutablePropertyFragment& GraphDB::graph() const { return graph_; } MutablePropertyFragment& GraphDB::graph() { return graph_; } @@ -236,8 +297,13 @@ void GraphDB::ingestWals(const std::vector& wals, if (from_ts < to_ts) { IngestWalRange(contexts_, graph_, parser, from_ts, to_ts, thread_num); } - UpdateTransaction::IngestWal(graph_, work_dir, to_ts, update_wal.ptr, - update_wal.size, contexts_[0].allocator); + if (update_wal.size == 0) { + graph_.Compact(update_wal.timestamp); + last_compaction_ts_ = update_wal.timestamp; + } else { + UpdateTransaction::IngestWal(graph_, work_dir, to_ts, update_wal.ptr, + update_wal.size, contexts_[0].allocator); + } from_ts = to_ts + 1; } if (from_ts <= parser.last_ts()) { @@ -353,4 +419,12 @@ void GraphDB::openWalAndCreateContexts(const std::string& data_dir, #endif } +size_t GraphDB::getExecutedQueryNum() const { + size_t ret = 0; + for (int i = 0; i < thread_num_; ++i) { + ret += contexts_[i].session.query_num(); + } + return ret; +} + } // namespace gs diff --git a/flex/engines/graph_db/database/graph_db.h b/flex/engines/graph_db/database/graph_db.h index 3d36fb124a61..722f6526d0a6 100644 --- a/flex/engines/graph_db/database/graph_db.h +++ b/flex/engines/graph_db/database/graph_db.h @@ -57,7 +57,8 @@ class GraphDB { */ Result Open(const Schema& schema, const std::string& data_dir, int32_t thread_num = 1, bool warmup = false, - bool memory_only = true); + bool memory_only = true, + bool enable_auto_compaction = false, int port = -1); /** * @brief Close the current opened graph. @@ -115,6 +116,9 @@ class GraphDB { int SessionNum() const; + void UpdateCompactionTimestamp(timestamp_t ts); + timestamp_t GetLastCompactionTimestamp() const; + private: bool registerApp(const std::string& path, uint8_t index = 0); @@ -128,6 +132,8 @@ class GraphDB { void openWalAndCreateContexts(const std::string& data_dir_path, bool memory_only); + size_t getExecutedQueryNum() const; + friend class GraphDBSession; std::string work_dir_; @@ -145,6 +151,10 @@ class GraphDB { std::thread monitor_thread_; bool monitor_thread_running_; #endif + + timestamp_t last_compaction_ts_; + bool compact_thread_running_ = false; + std::thread compact_thread_; }; } // namespace gs diff --git a/flex/engines/graph_db/database/graph_db_session.cc b/flex/engines/graph_db/database/graph_db_session.cc index 9d31f73d4594..20423d0c6cf4 100644 --- a/flex/engines/graph_db/database/graph_db_session.cc +++ b/flex/engines/graph_db/database/graph_db_session.cc @@ -164,8 +164,8 @@ Result> GraphDBSession::Eval(const std::string& input) { eval_duration_.fetch_add( std::chrono::duration_cast(end - start) .count()); - ++query_num_; #endif + ++query_num_; return result_buffer; } @@ -184,8 +184,8 @@ Result> GraphDBSession::Eval(const std::string& input) { eval_duration_.fetch_add( std::chrono::duration_cast(end - start) .count()); - ++query_num_; #endif + ++query_num_; return Result>( StatusCode::QueryFailed, "Query failed for procedure id:" + std::to_string((int) type), @@ -317,12 +317,30 @@ void GraphDBSession::GetAppInfo(Encoder& result) { db_.GetAppInfo(result); } int GraphDBSession::SessionId() const { return thread_id_; } +CompactTransaction GraphDBSession::GetCompactTransaction() { + timestamp_t ts = db_.version_manager_.acquire_update_timestamp(); + return CompactTransaction(db_.graph_, logger_, db_.version_manager_, ts); +} + +bool GraphDBSession::Compact() { + auto txn = GetCompactTransaction(); + if (txn.timestamp() > db_.GetLastCompactionTimestamp() + 100000) { + db_.UpdateCompactionTimestamp(txn.timestamp()); + txn.Commit(); + return true; + } else { + txn.Abort(); + return false; + } +} + #ifdef MONITOR_SESSIONS double GraphDBSession::eval_duration() const { return static_cast(eval_duration_.load()) / 1000000.0; } -int64_t GraphDBSession::query_num() const { return query_num_.load(); } #endif +int64_t GraphDBSession::query_num() const { return query_num_.load(); } + } // namespace gs diff --git a/flex/engines/graph_db/database/graph_db_session.h b/flex/engines/graph_db/database/graph_db_session.h index e41d43778e6a..c12cec2e7c0e 100644 --- a/flex/engines/graph_db/database/graph_db_session.h +++ b/flex/engines/graph_db/database/graph_db_session.h @@ -17,6 +17,7 @@ #define GRAPHSCOPE_DATABASE_GRAPH_DB_SESSION_H_ #include "flex/engines/graph_db/app/app_base.h" +#include "flex/engines/graph_db/database/compact_transaction.h" #include "flex/engines/graph_db/database/insert_transaction.h" #include "flex/engines/graph_db/database/read_transaction.h" #include "flex/engines/graph_db/database/single_edge_insert_transaction.h" @@ -44,13 +45,11 @@ class GraphDBSession { alloc_(alloc), logger_(logger), work_dir_(work_dir), - thread_id_(thread_id) + thread_id_(thread_id), #ifdef MONITOR_SESSIONS - , eval_duration_(0), - query_num_(0) #endif - { + query_num_(0) { for (auto& app : apps_) { app = nullptr; } @@ -67,6 +66,8 @@ class GraphDBSession { UpdateTransaction GetUpdateTransaction(); + CompactTransaction GetCompactTransaction(); + bool BatchUpdate(UpdateBatch& batch); const MutablePropertyFragment& graph() const; @@ -93,11 +94,14 @@ class GraphDBSession { int SessionId() const; + bool Compact(); + #ifdef MONITOR_SESSIONS double eval_duration() const; - int64_t query_num() const; #endif + int64_t query_num() const; + private: GraphDB& db_; Allocator& alloc_; @@ -110,8 +114,8 @@ class GraphDBSession { #ifdef MONITOR_SESSIONS std::atomic eval_duration_; - std::atomic query_num_; #endif + std::atomic query_num_; }; } // namespace gs diff --git a/flex/engines/graph_db/database/read_transaction.h b/flex/engines/graph_db/database/read_transaction.h index 0ae10dbc4c8c..3430a712765a 100644 --- a/flex/engines/graph_db/database/read_transaction.h +++ b/flex/engines/graph_db/database/read_transaction.h @@ -92,15 +92,111 @@ template class GraphView { public: GraphView(const MutableCsr& csr, timestamp_t timestamp) - : csr_(csr), timestamp_(timestamp) {} + : csr_(csr), + timestamp_(timestamp), + unsorted_since_(csr.unsorted_since()) {} AdjListView get_edges(vid_t v) const { return AdjListView(csr_.get_edges(v), timestamp_); } + // iterate edges with data in [min_value, max_value) + template + void foreach_edges_between(vid_t v, EDATA_T& min_value, EDATA_T& max_value, + const FUNC_T& func) const { + const auto& edges = csr_.get_edges(v); + auto ptr = edges.end() - 1; + auto end = edges.begin() - 1; + while (ptr != end) { + if (ptr->timestamp > timestamp_) { + --ptr; + continue; + } + if (ptr->timestamp < unsorted_since_) { + break; + } + if (!(ptr->data < min_value) && (ptr->data < max_value)) { + func(*ptr, min_value, max_value); + } + --ptr; + } + if (ptr == end) { + return; + } + ptr = std::upper_bound(end + 1, ptr + 1, max_value, + [](const EDATA_T& a, const MutableNbr& b) { + return a < b.data; + }) - + 1; + while (ptr != end) { + if (ptr->data < min_value) { + break; + } + func(*ptr, min_value, max_value); + --ptr; + } + } + + // iterate edges with data in (min_value, +inf) + template + void foreach_edges_gt(vid_t v, EDATA_T& min_value, const FUNC_T& func) const { + const auto& edges = csr_.get_edges(v); + auto ptr = edges.end() - 1; + auto end = edges.begin() - 1; + while (ptr != end) { + if (ptr->timestamp > timestamp_) { + --ptr; + continue; + } + if (ptr->timestamp < unsorted_since_) { + break; + } + if (min_value < ptr->data) { + func(*ptr, min_value); + } + --ptr; + } + while (ptr != end) { + if (!(min_value < ptr->data)) { + break; + } + func(*ptr, min_value); + --ptr; + } + } + + // iterate edges with data in [min_value, +inf) + template + void foreach_edges_ge(vid_t v, EDATA_T& min_value, const FUNC_T& func) const { + const auto& edges = csr_.get_edges(v); + auto ptr = edges.end() - 1; + auto end = edges.begin() - 1; + while (ptr != end) { + if (ptr->timestamp > timestamp_) { + --ptr; + continue; + } + if (ptr->timestamp < unsorted_since_) { + break; + } + if (!(ptr->data < min_value)) { + func(*ptr, min_value); + } + --ptr; + } + while (ptr != end) { + if (ptr->data < min_value) { + break; + } + func(*ptr, min_value); + --ptr; + } + } + private: const MutableCsr& csr_; timestamp_t timestamp_; + timestamp_t unsorted_since_; }; template diff --git a/flex/engines/graph_db/database/update_transaction.cc b/flex/engines/graph_db/database/update_transaction.cc index c4e67ea47c33..c6c75b58f7fa 100644 --- a/flex/engines/graph_db/database/update_transaction.cc +++ b/flex/engines/graph_db/database/update_transaction.cc @@ -715,10 +715,12 @@ void UpdateTransaction::batch_commit(UpdateBatch& batch) { } auto& arc = batch.GetArc(); auto* header = reinterpret_cast(arc.GetBuffer()); - header->length = arc.GetSize() - sizeof(WalHeader); - header->type = 1; - header->timestamp = timestamp_; - logger_.append(arc.GetBuffer(), arc.GetSize()); + if (arc.GetSize() != sizeof(WalHeader)) { + header->length = arc.GetSize() - sizeof(WalHeader); + header->type = 1; + header->timestamp = timestamp_; + logger_.append(arc.GetBuffer(), arc.GetSize()); + } release(); } diff --git a/flex/engines/graph_db/database/version_manager.cc b/flex/engines/graph_db/database/version_manager.cc index 3f81e968d94c..9ff538b67505 100644 --- a/flex/engines/graph_db/database/version_manager.cc +++ b/flex/engines/graph_db/database/version_manager.cc @@ -116,7 +116,16 @@ uint32_t VersionManager::acquire_update_timestamp() { return write_ts_.fetch_add(1); } void VersionManager::release_update_timestamp(uint32_t ts) { - buf_.set_bit(ts & ring_index_mask); + lock_.lock(); + if (ts == read_ts_.load() + 1) { + read_ts_.store(ts); + } else { + LOG(ERROR) << "read ts is expected to be " << ts - 1 << ", while it is " + << read_ts_.load(); + buf_.set_bit(ts & ring_index_mask); + } + lock_.unlock(); + pending_reqs_ += thread_num_; pending_update_reqs_.store(0); } diff --git a/flex/engines/http_server/handler/graph_db_http_handler.cc b/flex/engines/http_server/handler/graph_db_http_handler.cc index b5aac790c298..335eec54378d 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.cc +++ b/flex/engines/http_server/handler/graph_db_http_handler.cc @@ -23,12 +23,44 @@ #include "flex/engines/http_server/generated/actor/executor_ref.act.autogen.h" #include "flex/engines/http_server/types.h" +#if 0 +class query_dispatcher { + public: + query_dispatcher(uint32_t shard_concurrency) + : shard_concurrency_(shard_concurrency), executor_idx_(0) {} + + int get_executor_idx() { + auto idx = executor_idx_; + executor_idx_ = (executor_idx_ + 1) % shard_concurrency_; + return idx; + } + + private: + int shard_concurrency_; + int executor_idx_; +}; +#else +#include +class query_dispatcher { + public: + query_dispatcher(uint32_t shard_concurrency) + : rd_(), gen_(rd_()), dis_(0, shard_concurrency - 1) {} + + int get_executor_idx() { return dis_(gen_); } + + private: + std::random_device rd_; + std::mt19937 gen_; + std::uniform_int_distribution<> dis_; +}; +#endif + namespace server { class graph_db_ic_handler : public seastar::httpd::handler_base { public: graph_db_ic_handler(uint32_t group_id, uint32_t shard_concurrency) - : shard_concurrency_(shard_concurrency), executor_idx_(0) { + : shard_concurrency_(shard_concurrency), dispatcher_(shard_concurrency) { executor_refs_.reserve(shard_concurrency_); hiactor::scope_builder builder; builder.set_shard(hiactor::local_shard_id()) @@ -44,8 +76,7 @@ class graph_db_ic_handler : public seastar::httpd::handler_base { const seastar::sstring& path, std::unique_ptr req, std::unique_ptr rep) override { - auto dst_executor = executor_idx_; - executor_idx_ = (executor_idx_ + 1) % shard_concurrency_; + auto dst_executor = dispatcher_.get_executor_idx(); return executor_refs_[dst_executor] .run_graph_db_query(query_param{std::move(req->content)}) @@ -65,7 +96,7 @@ class graph_db_ic_handler : public seastar::httpd::handler_base { private: const uint32_t shard_concurrency_; - uint32_t executor_idx_; + query_dispatcher dispatcher_; std::vector executor_refs_; }; diff --git a/flex/interactive/examples/modern_graph/modern_graph.yaml b/flex/interactive/examples/modern_graph/modern_graph.yaml index 9b7389f7488e..d168a7c2ad8e 100644 --- a/flex/interactive/examples/modern_graph/modern_graph.yaml +++ b/flex/interactive/examples/modern_graph/modern_graph.yaml @@ -47,6 +47,8 @@ schema: - source_vertex: person destination_vertex: person relation: MANY_TO_MANY + x_csr_params: + sort_on_compaction: TRUE properties: - property_id: 0 property_name: weight diff --git a/flex/storages/rt_mutable_graph/dual_csr.h b/flex/storages/rt_mutable_graph/dual_csr.h index a9090faf8ffa..e9c22660a60e 100644 --- a/flex/storages/rt_mutable_graph/dual_csr.h +++ b/flex/storages/rt_mutable_graph/dual_csr.h @@ -49,6 +49,8 @@ class DualCsrBase { virtual void PutEdge(vid_t src, vid_t dst, const Any& data, timestamp_t ts, Allocator& alloc) = 0; + virtual void SortByEdgeData(timestamp_t ts) = 0; + virtual void IngestEdge(vid_t src, vid_t dst, grape::OutArchive& oarc, timestamp_t timestamp, Allocator& alloc) = 0; @@ -127,6 +129,11 @@ class DualCsr : public DualCsrBase { out_csr_->put_edge(src, dst, prop, ts, alloc); } + void SortByEdgeData(timestamp_t ts) override { + in_csr_->batch_sort_by_edge_data(ts); + out_csr_->batch_sort_by_edge_data(ts); + } + void UpdateEdge(vid_t src, vid_t dst, const Any& data, timestamp_t ts, Allocator& alloc) override { auto oe = out_csr_->edge_iter_mut(src); @@ -256,6 +263,10 @@ class DualCsr : public DualCsrBase { out_csr_->put_edge_with_index(src, dst, row_id, ts, alloc); } + void SortByEdgeData(timestamp_t ts) override { + LOG(FATAL) << "Not implemented"; + } + void UpdateEdge(vid_t src, vid_t dst, const Any& data, timestamp_t ts, Allocator& alloc) override { auto oe_ptr = out_csr_->edge_iter_mut(src); diff --git a/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.cc b/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.cc index 2ed134807fa5..e6bc5e8030f0 100644 --- a/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.cc +++ b/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.cc @@ -79,6 +79,10 @@ void BasicFragmentLoader::LoadFragment() { dst_label * edge_label_num_ + edge_label; if (schema_.exist(src_label_name, dst_label_name, edge_label_name)) { if (dual_csr_list_[index] != NULL) { + if (schema_.get_sort_on_compaction(src_label_name, dst_label_name, + edge_label_name)) { + dual_csr_list_[index]->SortByEdgeData(1); + } dual_csr_list_[index]->Dump( oe_prefix(src_label_name, dst_label_name, edge_label_name), ie_prefix(src_label_name, dst_label_name, edge_label_name), diff --git a/flex/storages/rt_mutable_graph/mutable_csr.h b/flex/storages/rt_mutable_graph/mutable_csr.h index 10fdbeb59f9e..3221ffdb586c 100644 --- a/flex/storages/rt_mutable_graph/mutable_csr.h +++ b/flex/storages/rt_mutable_graph/mutable_csr.h @@ -43,6 +43,13 @@ struct MutableNbr { data(rhs.data) {} ~MutableNbr() = default; + MutableNbr& operator=(const MutableNbr& rhs) { + neighbor = rhs.neighbor; + timestamp.store(rhs.timestamp.load()); + data = rhs.data; + return *this; + } + const EDATA_T& get_data() const { return data; } vid_t get_neighbor() const { return neighbor; } timestamp_t get_timestamp() const { return timestamp.load(); } @@ -65,6 +72,13 @@ struct MutableNbr { MutableNbr(const MutableNbr& rhs) : neighbor(rhs.neighbor), timestamp(rhs.timestamp.load()) {} ~MutableNbr() = default; + + MutableNbr& operator=(const MutableNbr& rhs) { + neighbor = rhs.neighbor; + timestamp.store(rhs.timestamp.load()); + return *this; + } + void set_data(const grape::EmptyType&, timestamp_t ts) { timestamp.store(ts); } @@ -262,7 +276,7 @@ class MutableNbrSliceMut { bool operator<(const MutableColumnNbr& nbr) { return ptr_ < nbr.ptr_; } nbr_t* ptr_; - StringColumn& column_; + StringColumn & column_; }; using nbr_ptr_t = MutableColumnNbr; @@ -477,6 +491,12 @@ class MutableCsrBase { const std::vector& degree, double reserve_ratio = 1.2) = 0; + virtual void batch_sort_by_edge_data(timestamp_t ts) { + LOG(FATAL) << "not supported..."; + } + + virtual timestamp_t unsorted_since() const { return 0; } + virtual void open(const std::string& name, const std::string& snapshot_dir, const std::string& work_dir) = 0; @@ -673,6 +693,8 @@ class MutableCsr : public TypedMutableCsrBase { adj_lists_[i].init(ptr, cap, 0); ptr += cap; } + + unsorted_since_ = 0; return edge_num; } @@ -687,6 +709,7 @@ class MutableCsr : public TypedMutableCsrBase { cap_list->open(snapshot_dir + "/" + name + ".cap", true); } nbr_list_.open(snapshot_dir + "/" + name + ".nbr", true); + load_meta(snapshot_dir + "/" + name); } nbr_list_.touch(work_dir + "/" + name + ".nbr"); adj_lists_.open(work_dir + "/" + name + ".adj", false); @@ -709,6 +732,7 @@ class MutableCsr : public TypedMutableCsrBase { void open_in_memory(const std::string& prefix, size_t v_cap) override { mmap_array degree_list; degree_list.open_in_memory(prefix + ".deg"); + load_meta(prefix); mmap_array* cap_list = °ree_list; if (std::filesystem::exists(prefix + ".cap")) { cap_list = new mmap_array(); @@ -776,6 +800,7 @@ class MutableCsr : public TypedMutableCsrBase { const std::string& new_spanshot_dir) override { size_t vnum = adj_lists_.size(); bool reuse_nbr_list = true; + dump_meta(new_spanshot_dir + "/" + name); mmap_array degree_list; std::vector cap_list; degree_list.open(new_spanshot_dir + "/" + name + ".deg", false); @@ -895,10 +920,49 @@ class MutableCsr : public TypedMutableCsrBase { return std::make_shared>(get_edges_mut(v)); } + void batch_sort_by_edge_data(timestamp_t ts) override { + size_t vnum = adj_lists_.size(); + for (size_t i = 0; i != vnum; ++i) { + std::sort(adj_lists_[i].data(), + adj_lists_[i].data() + adj_lists_[i].size(), + [](const nbr_t& lhs, const nbr_t& rhs) { + return lhs.data < rhs.data; + }); + } + unsorted_since_ = ts; + } + + timestamp_t unsorted_since() const override { return unsorted_since_; } + private: + void load_meta(const std::string& prefix) { + std::string meta_file_path = prefix + ".meta"; + if (std::filesystem::exists(meta_file_path)) { + FILE* meta_file_fd = fopen(meta_file_path.c_str(), "r"); + CHECK_EQ( + fread(&unsorted_since_, sizeof(timestamp_t), 1, meta_file_fd), + 1); + fclose(meta_file_fd); + } else { + unsorted_since_ = 0; + } + } + + void dump_meta(const std::string& prefix) const { + std::string meta_file_path = prefix + ".meta"; + FILE* meta_file_fd = + fopen((prefix + ".meta").c_str(), "wb"); + CHECK_EQ( + fwrite(&unsorted_since_, sizeof(timestamp_t), 1, meta_file_fd), + 1); + fflush(meta_file_fd); + fclose(meta_file_fd); + } + grape::SpinLock* locks_; mmap_array adj_lists_; mmap_array nbr_list_; + timestamp_t unsorted_since_; }; template <> @@ -1312,6 +1376,12 @@ class SingleMutableCsr : public TypedMutableCsrBase { (void) output.load(); } + void batch_sort_by_edge_data(timestamp_t ts) override {} + + timestamp_t unsorted_since() const override { + return std::numeric_limits::max(); + } + private: mmap_array nbr_list_; }; @@ -1561,6 +1631,12 @@ class EmptyCsr : public TypedMutableCsrBase { return std::make_shared>( MutableNbrSliceMut::empty()); } + + void batch_sort_by_edge_data(timestamp_t ts) override {} + + timestamp_t unsorted_since() const override { + return std::numeric_limits::max(); + } }; template <> diff --git a/flex/storages/rt_mutable_graph/mutable_property_fragment.cc b/flex/storages/rt_mutable_graph/mutable_property_fragment.cc index 3fcba9fa1219..a9453fd1e7c4 100644 --- a/flex/storages/rt_mutable_graph/mutable_property_fragment.cc +++ b/flex/storages/rt_mutable_graph/mutable_property_fragment.cc @@ -233,6 +233,34 @@ void MutablePropertyFragment::Open(const std::string& work_dir, } } +void MutablePropertyFragment::Compact(uint32_t version) { + for (size_t src_label_i = 0; src_label_i != vertex_label_num_; + ++src_label_i) { + std::string src_label = + schema_.get_vertex_label_name(static_cast(src_label_i)); + for (size_t dst_label_i = 0; dst_label_i != vertex_label_num_; + ++dst_label_i) { + std::string dst_label = + schema_.get_vertex_label_name(static_cast(dst_label_i)); + for (size_t e_label_i = 0; e_label_i != edge_label_num_; ++e_label_i) { + std::string edge_label = + schema_.get_edge_label_name(static_cast(e_label_i)); + if (!schema_.exist(src_label, dst_label, edge_label)) { + continue; + } + size_t index = src_label_i * vertex_label_num_ * edge_label_num_ + + dst_label_i * edge_label_num_ + e_label_i; + if (dual_csr_list_[index] != NULL) { + if (schema_.get_sort_on_compaction(src_label, dst_label, + edge_label)) { + dual_csr_list_[index]->SortByEdgeData(version); + } + } + } + } + } +} + void MutablePropertyFragment::Dump(const std::string& work_dir, uint32_t version) { std::string snapshot_dir_path = snapshot_dir(work_dir, version); @@ -266,6 +294,10 @@ void MutablePropertyFragment::Dump(const std::string& work_dir, if (dual_csr_list_[index] != NULL) { ie_[index]->resize(vertex_num[dst_label_i]); oe_[index]->resize(vertex_num[src_label_i]); + if (schema_.get_sort_on_compaction(src_label, dst_label, + edge_label)) { + dual_csr_list_[index]->SortByEdgeData(version + 1); + } dual_csr_list_[index]->Dump( oe_prefix(src_label, dst_label, edge_label), ie_prefix(src_label, dst_label, edge_label), diff --git a/flex/storages/rt_mutable_graph/mutable_property_fragment.h b/flex/storages/rt_mutable_graph/mutable_property_fragment.h index f9c87d74f7f1..0955d69d968e 100644 --- a/flex/storages/rt_mutable_graph/mutable_property_fragment.h +++ b/flex/storages/rt_mutable_graph/mutable_property_fragment.h @@ -50,6 +50,8 @@ class MutablePropertyFragment { void Open(const std::string& work_dir, bool memory_only); + void Compact(uint32_t version); + void Warmup(int thread_num); void Dump(const std::string& work_dir, uint32_t version); diff --git a/flex/storages/rt_mutable_graph/schema.cc b/flex/storages/rt_mutable_graph/schema.cc index ea3454c0d486..ba2c49d388e0 100644 --- a/flex/storages/rt_mutable_graph/schema.cc +++ b/flex/storages/rt_mutable_graph/schema.cc @@ -33,6 +33,7 @@ void Schema::Clear() { eprop_names_.clear(); ie_strategy_.clear(); oe_strategy_.clear(); + sort_on_compactions_.clear(); max_vnum_.clear(); plugin_name_to_path_and_id_.clear(); plugin_dir_.clear(); @@ -59,7 +60,8 @@ void Schema::add_edge_label(const std::string& src_label, const std::string& edge_label, const std::vector& properties, const std::vector& prop_names, - EdgeStrategy oe, EdgeStrategy ie) { + EdgeStrategy oe, EdgeStrategy ie, + bool sort_on_compaction) { label_t src_label_id = vertex_label_to_index(src_label); label_t dst_label_id = vertex_label_to_index(dst_label); label_t edge_label_id = edge_label_to_index(edge_label); @@ -70,6 +72,7 @@ void Schema::add_edge_label(const std::string& src_label, oe_strategy_[label_id] = oe; ie_strategy_[label_id] = ie; eprop_names_[label_id] = prop_names; + sort_on_compactions_[label_id] = sort_on_compaction; } label_t Schema::vertex_label_num() const { @@ -229,6 +232,18 @@ EdgeStrategy Schema::get_incoming_edge_strategy( return ie_strategy_.at(index); } +bool Schema::get_sort_on_compaction(const std::string& src_label, + const std::string& dst_label, + const std::string& label) const { + label_t src, dst, edge; + CHECK(vlabel_indexer_.get_index(src_label, src)); + CHECK(vlabel_indexer_.get_index(dst_label, dst)); + CHECK(elabel_indexer_.get_index(label, edge)); + uint32_t index = generate_edge_label(src, dst, edge); + CHECK(sort_on_compactions_.find(index) != sort_on_compactions_.end()); + return sort_on_compactions_.at(index); +} + label_t Schema::get_edge_label_id(const std::string& label) const { label_t ret; CHECK(elabel_indexer_.get_index(label, ret)); @@ -265,7 +280,7 @@ void Schema::Serialize(std::unique_ptr& writer) const { grape::InArchive arc; arc << v_primary_keys_ << vproperties_ << vprop_names_ << vprop_storage_ << eproperties_ << eprop_names_ << ie_strategy_ << oe_strategy_ - << max_vnum_; + << sort_on_compactions_ << max_vnum_; CHECK(writer->WriteArchive(arc)); } @@ -276,7 +291,8 @@ void Schema::Deserialize(std::unique_ptr& reader) { grape::OutArchive arc; CHECK(reader->ReadArchive(arc)); arc >> v_primary_keys_ >> vproperties_ >> vprop_names_ >> vprop_storage_ >> - eproperties_ >> eprop_names_ >> ie_strategy_ >> oe_strategy_ >> max_vnum_; + eproperties_ >> eprop_names_ >> ie_strategy_ >> oe_strategy_ >> + sort_on_compactions_ >> max_vnum_; } label_t Schema::vertex_label_to_index(const std::string& label) { @@ -676,6 +692,7 @@ static bool parse_edge_schema(YAML::Node node, Schema& schema) { } EdgeStrategy default_ie = EdgeStrategy::kMultiple; EdgeStrategy default_oe = EdgeStrategy::kMultiple; + bool default_sort_on_compaction = false; // get vertex type pair relation auto vertex_type_pair_node = node["vertex_type_pair_relations"]; @@ -693,6 +710,7 @@ static bool parse_edge_schema(YAML::Node node, Schema& schema) { auto cur_node = vertex_type_pair_node[i]; EdgeStrategy cur_ie = default_ie; EdgeStrategy cur_oe = default_oe; + bool cur_sort_on_compaction = default_sort_on_compaction; if (!get_scalar(cur_node, "source_vertex", src_label_name)) { LOG(ERROR) << "Expect field source_vertex for edge [" << edge_label_name << "] in vertex_type_pair_relations"; @@ -746,13 +764,40 @@ static bool parse_edge_schema(YAML::Node node, Schema& schema) { } } } + // try to parse sort on compaction + if (csr_node["sort_on_compaction"]) { + std::string sort_on_compaction_str; + if (get_scalar(csr_node, "sort_on_compaction", + sort_on_compaction_str)) { + if (sort_on_compaction_str == "true" || + sort_on_compaction_str == "TRUE") { + VLOG(10) << "Sort on compaction for edge: " << src_label_name + << "-[" << edge_label_name << "]->" << dst_label_name; + cur_sort_on_compaction = true; + } else if (sort_on_compaction_str == "false" || + sort_on_compaction_str == "FALSE") { + VLOG(10) << "Do not sort on compaction for edge: " << src_label_name + << "-[" << edge_label_name << "]->" << dst_label_name; + cur_sort_on_compaction = false; + } else { + LOG(ERROR) << "sort_on_compaction is not set properly for edge: " + << src_label_name << "-[" << edge_label_name << "]->" + << dst_label_name << "expect TRUE/FALSE"; + return false; + } + } + } else { + VLOG(10) << "Do not sort on compaction for edge: " << src_label_name + << "-[" << edge_label_name << "]->" << dst_label_name; + } } VLOG(10) << "edge " << edge_label_name << " from " << src_label_name << " to " << dst_label_name << " with " << property_types.size() << " properties"; schema.add_edge_label(src_label_name, dst_label_name, edge_label_name, - property_types, prop_names, cur_oe, cur_ie); + property_types, prop_names, cur_oe, cur_ie, + cur_sort_on_compaction); } // check the type_id equals to storage's label_id diff --git a/flex/storages/rt_mutable_graph/schema.h b/flex/storages/rt_mutable_graph/schema.h index 1b41311a28eb..7f69cbac14b5 100644 --- a/flex/storages/rt_mutable_graph/schema.h +++ b/flex/storages/rt_mutable_graph/schema.h @@ -55,7 +55,8 @@ class Schema { const std::vector& properties, const std::vector& prop_names, EdgeStrategy oe = EdgeStrategy::kMultiple, - EdgeStrategy ie = EdgeStrategy::kMultiple); + EdgeStrategy ie = EdgeStrategy::kMultiple, + bool sort_on_compaction = false); label_t vertex_label_num() const; @@ -135,6 +136,10 @@ class Schema { const std::string& dst_label, const std::string& label) const; + bool get_sort_on_compaction(const std::string& src_label, + const std::string& dst_label, + const std::string& label) const; + bool contains_edge_label(const std::string& label) const; label_t get_edge_label_id(const std::string& label) const; @@ -187,6 +192,7 @@ class Schema { std::map> eprop_names_; std::map oe_strategy_; std::map ie_strategy_; + std::map sort_on_compactions_; std::vector max_vnum_; std::unordered_map> plugin_name_to_path_and_id_; // key is plugin name, value is plugin path diff --git a/flex/utils/app_utils.cc b/flex/utils/app_utils.cc index e5e34a959075..e2acfbb4cdec 100644 --- a/flex/utils/app_utils.cc +++ b/flex/utils/app_utils.cc @@ -79,6 +79,21 @@ void Encoder::put_string_view(const std::string_view& v) { memcpy(&buf_[size + 4], v.data(), len); } +void Encoder::put_small_string(const std::string& v) { + size_t size = buf_.size(); + int len = v.size(); + buf_.resize(size + sizeof(uint8_t) + len); + buf_[size] = static_cast(len); + memcpy(&buf_[size + 1], v.data(), len); +} + +void Encoder::put_small_string_view(const std::string_view& v) { + size_t size = buf_.size(); + int len = v.size(); + buf_.resize(size + sizeof(uint8_t) + len); + buf_[size] = static_cast(len); + memcpy(&buf_[size + 1], v.data(), len); +} void Encoder::put_double(double v){ size_t size = buf_.size(); @@ -128,6 +143,13 @@ std::string_view Decoder::get_string() { return ret; } +std::string_view Decoder::get_small_string() { + int len = static_cast(get_byte()); + std::string_view ret(data_, len); + data_ += len; + return ret; +} + uint8_t Decoder::get_byte() { return static_cast(*(data_++)); } const char* Decoder::data() const { return data_; } diff --git a/flex/utils/app_utils.h b/flex/utils/app_utils.h index 2173cd13069e..cc23eadf0473 100644 --- a/flex/utils/app_utils.h +++ b/flex/utils/app_utils.h @@ -48,6 +48,10 @@ class Encoder { void put_string_view(const std::string_view& v); + void put_small_string(const std::string& v); + + void put_small_string_view(const std::string_view& v); + void put_double(double v); void clear(); @@ -67,6 +71,8 @@ class Decoder { std::string_view get_string(); + std::string_view get_small_string(); + uint8_t get_byte(); double get_double(); diff --git a/flex/utils/property/types.h b/flex/utils/property/types.h index f97217389363..79de26a6ecae 100644 --- a/flex/utils/property/types.h +++ b/flex/utils/property/types.h @@ -25,6 +25,14 @@ limitations under the License. #include "grape/serialization/in_archive.h" #include "grape/serialization/out_archive.h" +namespace grape { + +inline bool operator<(const EmptyType& lhs, const EmptyType& rhs) { + return false; +} + +} // namespace grape + namespace gs { enum class StorageStrategy { @@ -113,6 +121,10 @@ struct Date { std::string to_string() const; + bool operator<(const Date& rhs) const { + return milli_second < rhs.milli_second; + } + int64_t milli_second; };