Skip to content

Commit

Permalink
feat(flex): Implemented compaction. (#3482)
Browse files Browse the repository at this point in the history
Co-authored-by: Zhang Lei <[email protected]>
  • Loading branch information
luoxiaojian and zhanglei1949 authored Jan 16, 2024
1 parent d12b058 commit b8974be
Show file tree
Hide file tree
Showing 25 changed files with 617 additions and 41 deletions.
15 changes: 4 additions & 11 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ option(BUILD_DOC "Whether to build doc" ON)
option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" ON)
option(MONITOR_SESSIONS "Whether monitor sessions" OFF)


# ------------------------------------------------------------------------------
# cmake configs
# ------------------------------------------------------------------------------
Expand Down Expand Up @@ -55,16 +54,6 @@ if (NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
"Debug" "Release" "MinSizeRel" "RelWithDebInfo")
endif ()

if (APPLE)
set(CMAKE_MACOSX_RPATH ON)
else ()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wl,-rpath,$ORIGIN")
endif ()

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17 -Wall -fPIC")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -g")

add_compile_definitions(FLEX_VERSION="${FLEX_VERSION}")

if (APPLE)
Expand All @@ -77,6 +66,10 @@ else ()
endif ()
endif ()

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17 -Wall -fPIC")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -g")

find_package(MPI REQUIRED)
include_directories(SYSTEM ${MPI_CXX_INCLUDE_PATH})

Expand Down
2 changes: 1 addition & 1 deletion flex/bin/rt_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ int main(int argc, char** argv) {
auto& db = gs::GraphDB::get();

auto schema = gs::Schema::LoadFromYaml(graph_schema_path);
db.Open(schema, data_path, shard_num, warmup, memory_only);
db.Open(schema, data_path, shard_num, warmup, memory_only, true, http_port);

t0 += grape::GetCurrentTime();

Expand Down
1 change: 1 addition & 0 deletions flex/engines/graph_db/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
${CMAKE_CURRENT_SOURCE_DIR}/database/single_edge_insert_transaction.h
${CMAKE_CURRENT_SOURCE_DIR}/database/single_vertex_insert_transaction.h
${CMAKE_CURRENT_SOURCE_DIR}/database/update_transaction.h
${CMAKE_CURRENT_SOURCE_DIR}/database/compact_transaction.h
${CMAKE_CURRENT_SOURCE_DIR}/database/version_manager.h
${CMAKE_CURRENT_SOURCE_DIR}/database/wal.h
${CMAKE_CURRENT_SOURCE_DIR}/database/transaction_utils.h
Expand Down
8 changes: 8 additions & 0 deletions flex/engines/graph_db/app/server_app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,14 @@ bool ServerApp::Query(Decoder& input, Encoder& output) {
}
return true;
}
} else if (op == "COMPACTION") {
bool ret = graph_.Compact();
if (ret) {
output.put_string("SUCCESS");
} else {
output.put_string("ABORTED");
}
return true;
}
return false;
}
Expand Down
61 changes: 61 additions & 0 deletions flex/engines/graph_db/database/compact_transaction.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "flex/engines/graph_db/database/compact_transaction.h"
#include "flex/engines/graph_db/database/version_manager.h"
#include "flex/engines/graph_db/database/wal.h"
#include "flex/storages/rt_mutable_graph/mutable_property_fragment.h"

namespace gs {

CompactTransaction::CompactTransaction(MutablePropertyFragment& graph,
WalWriter& logger, VersionManager& vm,
timestamp_t timestamp)
: graph_(graph), logger_(logger), vm_(vm), timestamp_(timestamp) {
arc_.Resize(sizeof(WalHeader));
}

CompactTransaction::~CompactTransaction() { Abort(); }

timestamp_t CompactTransaction::timestamp() const { return timestamp_; }

void CompactTransaction::Commit() {
if (timestamp_ != std::numeric_limits<timestamp_t>::max()) {
auto* header = reinterpret_cast<WalHeader*>(arc_.GetBuffer());
header->length = 0;
header->timestamp = timestamp_;
header->type = 1;

logger_.append(arc_.GetBuffer(), arc_.GetSize());
arc_.Clear();

LOG(INFO) << "before compact - " << timestamp_;
graph_.Compact(timestamp_);
LOG(INFO) << "after compact - " << timestamp_;

vm_.release_update_timestamp(timestamp_);
timestamp_ = std::numeric_limits<timestamp_t>::max();
}
}

void CompactTransaction::Abort() {
if (timestamp_ != std::numeric_limits<timestamp_t>::max()) {
arc_.Clear();
vm_.revert_update_timestamp(timestamp_);
timestamp_ = std::numeric_limits<timestamp_t>::max();
}
}

} // namespace gs
51 changes: 51 additions & 0 deletions flex/engines/graph_db/database/compact_transaction.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef GRAPHSCOPE_DATABASE_COMPACT_TRANSACTION_H_
#define GRAPHSCOPE_DATABASE_COMPACT_TRANSACTION_H_

#include "flex/storages/rt_mutable_graph/types.h"
#include "grape/serialization/in_archive.h"

namespace gs {

class MutablePropertyFragment;
class WalWriter;
class VersionManager;

class CompactTransaction {
public:
CompactTransaction(MutablePropertyFragment& graph, WalWriter& logger,
VersionManager& vm, timestamp_t timestamp);
~CompactTransaction();

timestamp_t timestamp() const;

void Commit();

void Abort();

private:
MutablePropertyFragment& graph_;
WalWriter& logger_;
VersionManager& vm_;
timestamp_t timestamp_;

grape::InArchive arc_;
};

} // namespace gs

#endif // GRAPHSCOPE_DATABASE_COMPACT_TRANSACTION_H_
80 changes: 77 additions & 3 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "flex/engines/graph_db/database/wal.h"
#include "flex/utils/yaml_utils.h"

#include "flex/third_party/httplib.h"

namespace gs {

struct SessionLocalContext {
Expand All @@ -41,6 +43,10 @@ struct SessionLocalContext {

GraphDB::GraphDB() = default;
GraphDB::~GraphDB() {
if (compact_thread_running_) {
compact_thread_running_ = false;
compact_thread_.join();
}
for (int i = 0; i < thread_num_; ++i) {
contexts_[i].~SessionLocalContext();
}
Expand All @@ -53,7 +59,8 @@ GraphDB& GraphDB::get() {
}

Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
int32_t thread_num, bool warmup, bool memory_only) {
int32_t thread_num, bool warmup, bool memory_only,
bool enable_auto_compaction, int port) {
if (!std::filesystem::exists(data_dir)) {
std::filesystem::create_directories(data_dir);
}
Expand Down Expand Up @@ -97,11 +104,54 @@ Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
});
mutable_schema.EmplacePlugins(plugin_paths);

last_compaction_ts_ = 0;
openWalAndCreateContexts(data_dir, memory_only);

if ((!create_empty_graph) && warmup) {
graph_.Warmup(thread_num_);
}

if (enable_auto_compaction && (port != -1)) {
if (compact_thread_running_) {
compact_thread_running_ = false;
compact_thread_.join();
}
compact_thread_running_ = true;
compact_thread_ = std::thread([&](int http_port) {
size_t last_compaction_at = 0;
while (compact_thread_running_) {
size_t query_num_before = getExecutedQueryNum();
sleep(30);
if (!compact_thread_running_) {
break;
}
size_t query_num_after = getExecutedQueryNum();
if (query_num_before == query_num_after &&
(query_num_after > (last_compaction_at + 100000))) {
VLOG(10) << "Trigger auto compaction";
last_compaction_at = query_num_after;
std::string url = "127.0.0.1";
httplib::Client cli(url, http_port);
cli.set_connection_timeout(0, 300000);
cli.set_read_timeout(300, 0);
cli.set_write_timeout(300, 0);

std::vector<char> buf;
Encoder encoder(buf);
encoder.put_string("COMPACTION");
encoder.put_byte(0);
std::string content(buf.data(), buf.size());
auto res = cli.Post("/interactive/query", content, "text/plain");
std::string ret = res->body;
Decoder decoder(ret.data(), ret.size());
std::string_view info = decoder.get_string();

VLOG(10) << "Finish compaction, info: " << info;
}
}
}, port);
}

return Result<bool>(true);
}

Expand All @@ -110,6 +160,10 @@ void GraphDB::Close() {
monitor_thread_running_ = false;
monitor_thread_.join();
#endif
if (compact_thread_running_) {
compact_thread_running_ = false;
compact_thread_.join();
}
//-----------Clear graph_db----------------
graph_.Clear();
version_manager_.clear();
Expand Down Expand Up @@ -152,6 +206,13 @@ GraphDBSession& GraphDB::GetSession(int thread_id) {

int GraphDB::SessionNum() const { return thread_num_; }

void GraphDB::UpdateCompactionTimestamp(timestamp_t ts) {
last_compaction_ts_ = ts;
}
timestamp_t GraphDB::GetLastCompactionTimestamp() const {
return last_compaction_ts_;
}

const MutablePropertyFragment& GraphDB::graph() const { return graph_; }
MutablePropertyFragment& GraphDB::graph() { return graph_; }

Expand Down Expand Up @@ -236,8 +297,13 @@ void GraphDB::ingestWals(const std::vector<std::string>& wals,
if (from_ts < to_ts) {
IngestWalRange(contexts_, graph_, parser, from_ts, to_ts, thread_num);
}
UpdateTransaction::IngestWal(graph_, work_dir, to_ts, update_wal.ptr,
update_wal.size, contexts_[0].allocator);
if (update_wal.size == 0) {
graph_.Compact(update_wal.timestamp);
last_compaction_ts_ = update_wal.timestamp;
} else {
UpdateTransaction::IngestWal(graph_, work_dir, to_ts, update_wal.ptr,
update_wal.size, contexts_[0].allocator);
}
from_ts = to_ts + 1;
}
if (from_ts <= parser.last_ts()) {
Expand Down Expand Up @@ -353,4 +419,12 @@ void GraphDB::openWalAndCreateContexts(const std::string& data_dir,
#endif
}

size_t GraphDB::getExecutedQueryNum() const {
size_t ret = 0;
for (int i = 0; i < thread_num_; ++i) {
ret += contexts_[i].session.query_num();
}
return ret;
}

} // namespace gs
12 changes: 11 additions & 1 deletion flex/engines/graph_db/database/graph_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class GraphDB {
*/
Result<bool> Open(const Schema& schema, const std::string& data_dir,
int32_t thread_num = 1, bool warmup = false,
bool memory_only = true);
bool memory_only = true,
bool enable_auto_compaction = false, int port = -1);

/**
* @brief Close the current opened graph.
Expand Down Expand Up @@ -115,6 +116,9 @@ class GraphDB {

int SessionNum() const;

void UpdateCompactionTimestamp(timestamp_t ts);
timestamp_t GetLastCompactionTimestamp() const;

private:
bool registerApp(const std::string& path, uint8_t index = 0);

Expand All @@ -128,6 +132,8 @@ class GraphDB {
void openWalAndCreateContexts(const std::string& data_dir_path,
bool memory_only);

size_t getExecutedQueryNum() const;

friend class GraphDBSession;

std::string work_dir_;
Expand All @@ -145,6 +151,10 @@ class GraphDB {
std::thread monitor_thread_;
bool monitor_thread_running_;
#endif

timestamp_t last_compaction_ts_;
bool compact_thread_running_ = false;
std::thread compact_thread_;
};

} // namespace gs
Expand Down
Loading

0 comments on commit b8974be

Please sign in to comment.