Skip to content

Commit

Permalink
feat(flex): Implemented Immutable Csrs. (#3527)
Browse files Browse the repository at this point in the history
  • Loading branch information
luoxiaojian authored Feb 1, 2024
1 parent b39a340 commit b8c02b9
Show file tree
Hide file tree
Showing 28 changed files with 2,091 additions and 1,257 deletions.
8 changes: 0 additions & 8 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,12 @@ option(BUILD_HQPS "Whether to build HighQPS Engine" ON)
option(BUILD_TEST "Whether to build test" ON)
option(BUILD_DOC "Whether to build doc" ON)
option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" ON)
option(MONITOR_SESSIONS "Whether monitor sessions" OFF)

#print options
message(STATUS "Build HighQPS Engine: ${BUILD_HQPS}")
message(STATUS "Build test: ${BUILD_TEST}")
message(STATUS "Build doc: ${BUILD_DOC}")
message(STATUS "Build odps fragment loader: ${BUILD_ODPS_FRAGMENT_LOADER}")
message(STATUS "Monitor sessions: ${MONITOR_SESSIONS}")
message(STATUS "Enable hugepage: ${ENABLE_HUGEPAGE}")

# ------------------------------------------------------------------------------
# cmake configs
Expand All @@ -43,11 +40,6 @@ if (BUILD_HQPS)
add_definitions(-DBUILD_HQPS)
endif ()

if (MONITOR_SESSIONS)
message("Monitor sessions is enabled")
add_definitions(-DMONITOR_SESSIONS)
endif ()

execute_process(COMMAND uname -r OUTPUT_VARIABLE LINUX_KERNEL_VERSION)
string(STRIP ${LINUX_KERNEL_VERSION} LINUX_KERNEL_VERSION)
message(${LINUX_KERNEL_VERSION})
Expand Down
5 changes: 3 additions & 2 deletions flex/bin/rt_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ int main(int argc, char** argv) {
auto schema = gs::Schema::LoadFromYaml(graph_schema_path);
gs::GraphDBConfig config(schema, data_path, shard_num);
config.memory_level = memory_level;
config.enable_auto_compaction = true;
config.service_port = http_port;
if (config.memory_level >= 2) {
config.enable_auto_compaction = true;
}
db.Open(config);

t0 += grape::GetCurrentTime();
Expand Down
194 changes: 89 additions & 105 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ GraphDB& GraphDB::get() {

Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
int32_t thread_num, bool warmup, bool memory_only,
bool enable_auto_compaction, int port) {
bool enable_auto_compaction) {
GraphDBConfig config(schema, data_dir, thread_num);
config.warmup = warmup;
if (memory_only) {
Expand All @@ -71,7 +71,6 @@ Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
config.memory_level = 0;
}
config.enable_auto_compaction = enable_auto_compaction;
config.service_port = port;
return Open(config);
}

Expand Down Expand Up @@ -135,57 +134,103 @@ Result<bool> GraphDB::Open(const GraphDBConfig& config) {
graph_.Warmup(thread_num_);
}

if (config.enable_auto_compaction && (config.service_port != -1)) {
if (config.enable_monitering) {
if (monitor_thread_running_) {
monitor_thread_running_ = false;
monitor_thread_.join();
}
monitor_thread_running_ = true;
monitor_thread_ = std::thread([&]() {
std::vector<double> last_eval_durations(thread_num_, 0);
std::vector<int64_t> last_query_nums(thread_num_, 0);
while (monitor_thread_running_) {
sleep(10);
size_t curr_allocated_size = 0;
double total_eval_durations = 0;
double min_eval_duration = std::numeric_limits<double>::max();
double max_eval_duration = 0;
int64_t total_query_num = 0;
int64_t min_query_num = std::numeric_limits<int64_t>::max();
int64_t max_query_num = 0;

for (int i = 0; i < thread_num_; ++i) {
curr_allocated_size += contexts_[i].allocator.allocated_memory();
if (last_eval_durations[i] == 0) {
last_eval_durations[i] = contexts_[i].session.eval_duration();
} else {
double curr = contexts_[i].session.eval_duration();
double eval_duration = curr;
total_eval_durations += eval_duration;
min_eval_duration = std::min(min_eval_duration, eval_duration);
max_eval_duration = std::max(max_eval_duration, eval_duration);

last_eval_durations[i] = curr;
}
if (last_query_nums[i] == 0) {
last_query_nums[i] = contexts_[i].session.query_num();
} else {
int64_t curr = contexts_[i].session.query_num();
total_query_num += curr;
min_query_num = std::min(min_query_num, curr);
max_query_num = std::max(max_query_num, curr);

last_query_nums[i] = curr;
}
}
if (max_query_num != 0) {
double avg_eval_durations =
total_eval_durations / static_cast<double>(thread_num_);
double avg_query_num = static_cast<double>(total_query_num) /
static_cast<double>(thread_num_);
double allocated_size_in_gb =
static_cast<double>(curr_allocated_size) / 1024.0 / 1024.0 /
1024.0;
LOG(INFO) << "allocated: " << allocated_size_in_gb << " GB, eval: ["
<< min_eval_duration << ", " << avg_eval_durations << ", "
<< max_eval_duration << "] s, query num: [" << min_query_num
<< ", " << avg_query_num << ", " << max_query_num << "]";
}
}
});
}

if (config.enable_auto_compaction) {
if (compact_thread_running_) {
compact_thread_running_ = false;
compact_thread_.join();
}
compact_thread_running_ = true;
compact_thread_ = std::thread(
[&](int http_port) {
size_t last_compaction_at = 0;
while (compact_thread_running_) {
size_t query_num_before = getExecutedQueryNum();
sleep(30);
if (!compact_thread_running_) {
break;
}
size_t query_num_after = getExecutedQueryNum();
if (query_num_before == query_num_after &&
(query_num_after > (last_compaction_at + 100000))) {
VLOG(10) << "Trigger auto compaction";
last_compaction_at = query_num_after;
std::string url = "127.0.0.1";
httplib::Client cli(url, http_port);
cli.set_connection_timeout(0, 300000);
cli.set_read_timeout(300, 0);
cli.set_write_timeout(300, 0);

std::vector<char> buf;
Encoder encoder(buf);
encoder.put_string("COMPACTION");
encoder.put_byte(0);
std::string content(buf.data(), buf.size());
auto res = cli.Post("/interactive/query", content, "text/plain");
std::string ret = res->body;
Decoder decoder(ret.data(), ret.size());
std::string_view info = decoder.get_string();

VLOG(10) << "Finish compaction, info: " << info;
}
}
},
config.service_port);
compact_thread_ = std::thread([&]() {
size_t last_compaction_at = 0;
while (compact_thread_running_) {
size_t query_num_before = getExecutedQueryNum();
sleep(30);
if (!compact_thread_running_) {
break;
}
size_t query_num_after = getExecutedQueryNum();
if (query_num_before == query_num_after &&
(query_num_after > (last_compaction_at + 100000))) {
VLOG(10) << "Trigger auto compaction";
last_compaction_at = query_num_after;
timestamp_t ts = this->version_manager_.acquire_update_timestamp();
auto txn = CompactTransaction(this->graph_, this->contexts_[0].logger,
this->version_manager_, ts);
txn.Commit();
VLOG(10) << "Finish compaction";
}
}
});
}

return Result<bool>(true);
}

void GraphDB::Close() {
#ifdef MONITOR_SESSIONS
monitor_thread_running_ = false;
monitor_thread_.join();
#endif
if (monitor_thread_running_) {
monitor_thread_running_ = false;
monitor_thread_.join();
}
if (compact_thread_running_) {
compact_thread_running_ = false;
compact_thread_.join();
Expand Down Expand Up @@ -399,63 +444,6 @@ void GraphDB::openWalAndCreateContexts(const std::string& data_dir,

initApps(graph_.schema().GetPlugins());
VLOG(1) << "Successfully restore load plugins";

#ifdef MONITOR_SESSIONS
monitor_thread_running_ = true;
monitor_thread_ = std::thread([&]() {
size_t last_allocated_size = 0;
std::vector<double> last_eval_durations(thread_num_, 0);
std::vector<int64_t> last_query_nums(thread_num_, 0);
while (monitor_thread_running_) {
sleep(10);
size_t curr_allocated_size = 0;
double total_eval_durations = 0;
double min_eval_duration = std::numeric_limits<double>::max();
double max_eval_duration = 0;
int64_t total_query_num = 0;
int64_t min_query_num = std::numeric_limits<int64_t>::max();
int64_t max_query_num = 0;

for (int i = 0; i < thread_num_; ++i) {
curr_allocated_size += contexts_[i].allocator.allocated_memory();
if (last_eval_durations[i] == 0) {
last_eval_durations[i] = contexts_[i].session.eval_duration();
} else {
double curr = contexts_[i].session.eval_duration();
double eval_duration = curr;
total_eval_durations += eval_duration;
min_eval_duration = std::min(min_eval_duration, eval_duration);
max_eval_duration = std::max(max_eval_duration, eval_duration);

last_eval_durations[i] = curr;
}
if (last_query_nums[i] == 0) {
last_query_nums[i] = contexts_[i].session.query_num();
} else {
int64_t curr = contexts_[i].session.query_num();
total_query_num += curr;
min_query_num = std::min(min_query_num, curr);
max_query_num = std::max(max_query_num, curr);

last_query_nums[i] = curr;
}
}
last_allocated_size = curr_allocated_size;
if (max_query_num != 0) {
double avg_eval_durations =
total_eval_durations / static_cast<double>(thread_num_);
double avg_query_num = static_cast<double>(total_query_num) /
static_cast<double>(thread_num_);
double allocated_size_in_gb =
static_cast<double>(curr_allocated_size) / 1024.0 / 1024.0 / 1024.0;
LOG(INFO) << "allocated: " << allocated_size_in_gb << " GB, eval: ["
<< min_eval_duration << ", " << avg_eval_durations << ", "
<< max_eval_duration << "] s, query num: [" << min_query_num
<< ", " << avg_query_num << ", " << max_query_num << "]";
}
}
});
#endif
}

void GraphDB::showAppMetrics() const {
Expand All @@ -469,12 +457,8 @@ void GraphDB::showAppMetrics() const {
std::string query_name = "UNKNOWN";
if (i == 0) {
query_name = "ServerApp";
} else if (i <= 14) {
query_name = "IC" + std::to_string(i);
} else if (i <= 21) {
query_name = "IS" + std::to_string(i - 14);
} else if (i <= 29) {
query_name = "INS" + std::to_string(i - 21);
} else {
query_name = "Query-" + std::to_string(i);
}
summary.output(query_name);
}
Expand Down
16 changes: 7 additions & 9 deletions flex/engines/graph_db/database/graph_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,24 @@ struct GraphDBConfig {
data_dir(data_dir_),
thread_num(thread_num_),
warmup(false),
enable_monitering(false),
enable_auto_compaction(false),
service_port(-1),
memory_level(1) {}

Schema schema;
std::string data_dir;
int thread_num;
bool warmup;
bool enable_monitering;
bool enable_auto_compaction;
int service_port;

/*
0 - sync with disk;
1 - mmap virtual memory;
2 - prefering hugepages;
0 - sync with disk;
1 - mmap virtual memory;
2 - prefering hugepages;
3 - force hugepages;
*/
int memory_level;
int memory_level;
};

class GraphDB {
Expand All @@ -85,7 +85,7 @@ class GraphDB {
Result<bool> Open(const Schema& schema, const std::string& data_dir,
int32_t thread_num = 1, bool warmup = false,
bool memory_only = true,
bool enable_auto_compaction = false, int port = -1);
bool enable_auto_compaction = false);

Result<bool> Open(const GraphDBConfig& config);

Expand Down Expand Up @@ -179,10 +179,8 @@ class GraphDB {
std::array<std::string, 256> app_paths_;
std::array<std::shared_ptr<AppFactoryBase>, 256> app_factories_;

#ifdef MONITOR_SESSIONS
std::thread monitor_thread_;
bool monitor_thread_running_;
#endif

timestamp_t last_compaction_ts_;
bool compact_thread_running_ = false;
Expand Down
9 changes: 0 additions & 9 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
* limitations under the License.
*/

#ifdef MONITOR_SESSIONS
#include <chrono>
#endif

#include "flex/engines/graph_db/app/app_base.h"
#include "flex/engines/graph_db/database/graph_db.h"
Expand Down Expand Up @@ -127,11 +125,9 @@ Result<std::vector<char>> GraphDBSession::Eval(const std::string& input) {
app_metrics_[type].add_record(
std::chrono::duration_cast<std::chrono::microseconds>(end - start)
.count());
#ifdef MONITOR_SESSIONS
eval_duration_.fetch_add(
std::chrono::duration_cast<std::chrono::microseconds>(end - start)
.count());
#endif
++query_num_;
return result_buffer;
}
Expand All @@ -146,12 +142,10 @@ Result<std::vector<char>> GraphDBSession::Eval(const std::string& input) {
result_buffer.clear();
}

#ifdef MONITOR_SESSIONS
const auto end = std::chrono::high_resolution_clock::now();
eval_duration_.fetch_add(
std::chrono::duration_cast<std::chrono::microseconds>(end - start)
.count());
#endif
++query_num_;
return Result<std::vector<char>>(
StatusCode::QueryFailed,
Expand Down Expand Up @@ -180,13 +174,10 @@ bool GraphDBSession::Compact() {
}
}

#ifdef MONITOR_SESSIONS
double GraphDBSession::eval_duration() const {
return static_cast<double>(eval_duration_.load()) / 1000000.0;
}

#endif

int64_t GraphDBSession::query_num() const { return query_num_.load(); }

#define likely(x) __builtin_expect(!!(x), 1)
Expand Down
Loading

0 comments on commit b8c02b9

Please sign in to comment.