Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration max_log_segment_file_size and fix illegal open segment detection #366

Merged
merged 2 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions .github/workflows/github_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
private_key = private_key_part_1 + private_key_part_2

def comment_on_pr(content, title):
print("content:", content)
print(f"content: {content}, title: {title}")
if content is None:
raise Exception("Content is required")

Expand All @@ -68,18 +68,16 @@ def comment_on_pr(content, title):
comments = pull_request.get_issue_comments()

# Check if a comment by raftkeepeer-robot[bot] exists
comment_found = False
for comment in comments:
print("find comment:", comment)
if comment.user.login == "raftkeepeer-robot[bot]" and title in comment.body:
# print(f"find comment for {comment.user.login}")
# print(f"comment body: {comment.body}")
if comment.user.login == "raftkeeper-robot[bot]" and title in comment.body:
# Update the existing comment
comment.edit(content)
comment_found = True
break
return

# If not, create a new comment
if not comment_found:
pull_request.create_issue_comment(content)
pull_request.create_issue_comment(content)


def get_artifact_url(artifact_name):
Expand Down
3 changes: 1 addition & 2 deletions .github/workflows/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ def report(report_dir, report_type):
else:
raise ValueError('Integration test report not available')

title_prefix_copy = title_prefix
content = generate_report(report_dir, title_prefix)
if content is not None:
comment_on_pr(content, title_prefix_copy)
comment_on_pr(content, title_prefix)


if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions docs/how-to-monitor-and-manage.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ startup_timeout=6000000
raft_logs_level=information
log_fsync_mode=fsync_parallel
log_fsync_interval=1000
max_log_segment_file_size=1073741824
nuraft_thread_size=16
fresh_log_gap=200
```
Expand Down
3 changes: 3 additions & 0 deletions programs/server/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@

<!-- If log_fsync_mode is fsync_batch, will fsync log after x appending entries, default value is 1000. -->
<!-- <log_fsync_interval>1000</log_fsync_interval> -->

<!-- Max single log segment file size, default is 1G. -->
<!-- <max_log_segment_file_size>1073741824</max_log_segment_file_size> -->
</raft_settings>

<!-- If you want a RaftKeeper cluster, you can uncomment this and configure it carefully -->
Expand Down
36 changes: 18 additions & 18 deletions src/Service/LoggerWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ NuRaftLogLevel parseNuRaftLogLevel(const String & level)
{
NuRaftLogLevel log_level;
if (level == "trace")
log_level = RAFT_LOG_TRACE;
log_level = NuRaftLogLevel::RAFT_LOG_TRACE;
else if (level == "debug")
log_level = RAFT_LOG_DEBUG;
log_level = NuRaftLogLevel::RAFT_LOG_DEBUG;
else if (level == "information")
log_level = RAFT_LOG_INFORMATION;
log_level = NuRaftLogLevel::RAFT_LOG_INFORMATION;
else if (level == "warning")
log_level = RAFT_LOG_WARNING;
log_level = NuRaftLogLevel::RAFT_LOG_WARNING;
else if (level == "error")
log_level = RAFT_LOG_ERROR;
log_level = NuRaftLogLevel::RAFT_LOG_ERROR;
else if (level == "fatal")
log_level = RAFT_LOG_FATAL;
log_level = NuRaftLogLevel::RAFT_LOG_FATAL;
else
throw Exception("Valid log level values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal'", ErrorCodes::INVALID_LOG_LEVEL);
return log_level;
Expand All @@ -32,17 +32,17 @@ NuRaftLogLevel parseNuRaftLogLevel(const String & level)
String nuRaftLogLevelToString(NuRaftLogLevel level)
{
String log_level;
if (level == RAFT_LOG_TRACE)
if (level == NuRaftLogLevel::RAFT_LOG_TRACE)
log_level = "trace";
else if (level == RAFT_LOG_DEBUG)
else if (level == NuRaftLogLevel::RAFT_LOG_DEBUG)
log_level = "debug";
else if (level == RAFT_LOG_INFORMATION)
else if (level == NuRaftLogLevel::RAFT_LOG_INFORMATION)
log_level = "information";
else if (level == RAFT_LOG_WARNING)
else if (level == NuRaftLogLevel::RAFT_LOG_WARNING)
log_level = "warning";
else if (level == RAFT_LOG_ERROR)
else if (level == NuRaftLogLevel::RAFT_LOG_ERROR)
log_level = "error";
else if (level == RAFT_LOG_FATAL)
else if (level == NuRaftLogLevel::RAFT_LOG_FATAL)
log_level = "fatal";
else
throw Exception("Valid log level", ErrorCodes::INVALID_LOG_LEVEL);
Expand All @@ -55,22 +55,22 @@ Poco::Message::Priority toPocoLogLevel(NuRaftLogLevel level)
int poco_log_level;
switch (level)
{
case RAFT_LOG_FATAL:
case NuRaftLogLevel::RAFT_LOG_FATAL:
poco_log_level = Message::Priority::PRIO_FATAL;
break;
case RAFT_LOG_ERROR:
case NuRaftLogLevel::RAFT_LOG_ERROR:
poco_log_level = Message::Priority::PRIO_ERROR;
break;
case RAFT_LOG_WARNING:
case NuRaftLogLevel::RAFT_LOG_WARNING:
poco_log_level = Message::Priority::PRIO_WARNING;
break;
case RAFT_LOG_INFORMATION:
case NuRaftLogLevel::RAFT_LOG_INFORMATION:
poco_log_level = Message::Priority::PRIO_INFORMATION;
break;
case RAFT_LOG_DEBUG:
case NuRaftLogLevel::RAFT_LOG_DEBUG:
poco_log_level = Message::Priority::PRIO_DEBUG;
break;
case RAFT_LOG_TRACE:
case NuRaftLogLevel::RAFT_LOG_TRACE:
poco_log_level = Message::Priority::PRIO_TRACE;
break;
}
Expand Down
8 changes: 4 additions & 4 deletions src/Service/LoggerWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace RK
{

enum NuRaftLogLevel
enum class NuRaftLogLevel
{
RAFT_LOG_FATAL = 1,
RAFT_LOG_ERROR,
Expand All @@ -30,8 +30,8 @@ Poco::Message::Priority toPocoLogLevel(NuRaftLogLevel level);
class LoggerWrapper : public nuraft::logger
{
private:
static inline const int LEVEL_MAX = static_cast<int>(RAFT_LOG_TRACE);
static inline const int LEVEL_MIN = static_cast<int>(RAFT_LOG_FATAL);
static inline const int LEVEL_MAX = static_cast<int>(NuRaftLogLevel::RAFT_LOG_TRACE);
static inline const int LEVEL_MIN = static_cast<int>(NuRaftLogLevel::RAFT_LOG_FATAL);

public:
LoggerWrapper(const String & name, NuRaftLogLevel level_) : log(&Poco::Logger::get(name)), nuraft_log_level(level_)
Expand All @@ -53,7 +53,7 @@ class LoggerWrapper : public nuraft::logger
log->setLevel(toPocoLogLevel(static_cast<NuRaftLogLevel>(nuraft_log_level)));
}

int get_level() override { return static_cast<int>(nuraft_log_level); }
int get_level() override { return static_cast<int>(nuraft_log_level.load()); }

private:
Poco::Logger * log;
Expand Down
38 changes: 14 additions & 24 deletions src/Service/NuRaftFileLogStore.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#include <memory>
#include <unistd.h>
#include <Service/LogEntry.h>
#include <Service/NuRaftFileLogStore.h>
#include <Common/setThreadName.h>

Expand Down Expand Up @@ -42,25 +39,13 @@ void LogEntryQueue::clear()
i = nullptr;
}

NuRaftFileLogStore::NuRaftFileLogStore(
const String & log_dir,
bool force_new,
FsyncMode log_fsync_mode_,
UInt64 log_fsync_interval_,
UInt32 max_log_size_)
: log_fsync_mode(log_fsync_mode_), log_fsync_interval(log_fsync_interval_)
NuRaftFileLogStore::NuRaftFileLogStore( const String & log_dir, bool force_new, FsyncMode log_fsync_mode_, UInt64 log_fsync_interval_, UInt64 max_log_segment_file_size_)
: log_fsync_mode(log_fsync_mode_)
, log_fsync_interval(log_fsync_interval_)
, log(&Poco::Logger::get("FileLogStore"))
{
log = &(Poco::Logger::get("FileLogStore"));

if (log_fsync_mode == FsyncMode::FSYNC_PARALLEL)
{
parallel_fsync_event = std::make_shared<Poco::Event>();

fsync_thread = ThreadFromGlobalPool([this] { fsyncThread(); });
}

segment_store = LogSegmentStore::getInstance(log_dir, force_new);
segment_store->init(max_log_size_);
segment_store = LogSegmentStore::getInstance(log_dir, force_new, max_log_segment_file_size_);
segment_store->init();

if (segment_store->lastLogIndex() < 1)
/// no log entry exists, return a dummy constant entry with value set to null and term set to zero
Expand All @@ -69,6 +54,12 @@ NuRaftFileLogStore::NuRaftFileLogStore(
last_log_entry = segment_store->getEntry(segment_store->lastLogIndex());

disk_last_durable_index = segment_store->lastLogIndex();

if (log_fsync_mode == FsyncMode::FSYNC_PARALLEL)
{
parallel_fsync_event = std::make_shared<Poco::Event>();
fsync_thread = ThreadFromGlobalPool([this] { fsyncThread(); });
}
}

void NuRaftFileLogStore::shutdown()
Expand Down Expand Up @@ -99,8 +90,7 @@ void NuRaftFileLogStore::fsyncThread()
{
parallel_fsync_event->wait();

UInt64 last_flush_index = segment_store->flush();
if (last_flush_index)
if (UInt64 last_flush_index = segment_store->flush())
{
disk_last_durable_index = last_flush_index;
if (raft_instance) /// For test
Expand Down Expand Up @@ -130,7 +120,7 @@ ptr<log_entry> NuRaftFileLogStore::last_entry() const

ulong NuRaftFileLogStore::append(ptr<log_entry> & entry)
{
ptr<log_entry> cloned = cloneLogEntry(entry);
const ptr<log_entry> cloned = cloneLogEntry(entry);
UInt64 log_index = segment_store->appendEntry(entry);
log_queue.putEntry(log_index, cloned);

Expand Down
8 changes: 3 additions & 5 deletions src/Service/NuRaftFileLogStore.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#pragma once

#include <atomic>
#include <map>
#include <mutex>
#include <Service/NuRaftLogSegment.h>
#include <Service/Settings.h>
#include <libnuraft/nuraft.hxx>
Expand Down Expand Up @@ -55,7 +53,7 @@ class NuRaftFileLogStore : public nuraft::log_store
bool force_new = false,
FsyncMode log_fsync_mode_ = FsyncMode::FSYNC_PARALLEL,
UInt64 log_fsync_interval_ = 1000,
UInt32 max_log_size_ = LogSegmentStore::MAX_SEGMENT_FILE_SIZE);
UInt64 max_log_segment_file_size_ = LogSegmentStore::MAX_LOG_SEGMENT_FILE_SIZE);

~NuRaftFileLogStore() override;

Expand Down Expand Up @@ -192,8 +190,6 @@ class NuRaftFileLogStore : public nuraft::log_store
/// Thread used to flush log, only used in FSYNC_PARALLEL mode
void fsyncThread();

Poco::Logger * log;

/// Used to operate log in the store
ptr<LogSegmentStore> segment_store;

Expand Down Expand Up @@ -224,6 +220,8 @@ class NuRaftFileLogStore : public nuraft::log_store
nuraft::ptr<nuraft::raft_server> raft_instance;

std::atomic<bool> shutdown_called{false};

Poco::Logger * log;
};

}
14 changes: 6 additions & 8 deletions src/Service/NuRaftLogSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,19 +535,17 @@ bool NuRaftLogSegment::truncate(const UInt64 last_index_kept)
return true;
}

ptr<LogSegmentStore> LogSegmentStore::getInstance(const String & log_dir_, bool force_new)
ptr<LogSegmentStore> LogSegmentStore::getInstance(const String & log_dir_, bool force_new, UInt32 max_log_segment_file_size_)
{
static ptr<LogSegmentStore> segment_store;
if (segment_store == nullptr || force_new)
segment_store = cs_new<LogSegmentStore>(log_dir_);
segment_store = cs_new<LogSegmentStore>(log_dir_, max_log_segment_file_size_);
return segment_store;
}

void LogSegmentStore::init(UInt32 max_segment_file_size_)
void LogSegmentStore::init()
{
LOG_INFO(log, "Initializing log segment store, max segment file size {} bytes.", max_segment_file_size_);

max_segment_file_size = max_segment_file_size_;
LOG_INFO(log, "Initializing log segment store with directory {}", log_dir);

Poco::File(log_dir).createDirectories();

Expand Down Expand Up @@ -588,7 +586,7 @@ void LogSegmentStore::openNewSegmentIfNeeded()
{
{
std::shared_lock read_lock(seg_mutex);
if (open_segment && open_segment->getFileSize() <= max_segment_file_size && open_segment->getVersion() >= CURRENT_LOG_VERSION)
if (open_segment && open_segment->getFileSize() <= max_log_segment_file_size && open_segment->getVersion() >= CURRENT_LOG_VERSION)
return;
}

Expand Down Expand Up @@ -854,9 +852,9 @@ bool parseSegmentFileName(const String & file_name, UInt64 & first_index, UInt64
}
else
{
is_open = false;
if (!tryReadUInt64Text(tokens[2], last_index))
return false;
is_open = true;
}

create_time = std::move(tokens[3]);
Expand Down
15 changes: 7 additions & 8 deletions src/Service/NuRaftLogSegment.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class NuRaftLogSegment
};

/**
* LogSegmentStore manages log segments and it uses segmented append-only file, all data
* LogSegmentStore manages log segments, it uses segmented append-only file, all data
* in disk, all index in memory. Append one log entry, only cause one disk write, every
* disk write will call fsync().
*
Expand All @@ -173,23 +173,22 @@ class LogSegmentStore final
public:
using Segments = std::vector<ptr<NuRaftLogSegment>>;

static constexpr UInt32 MAX_SEGMENT_FILE_SIZE = 1000 * 1024 * 1024; /// 1GB, 0.3K/Log, 3M logs
static constexpr UInt64 MAX_LOG_SEGMENT_FILE_SIZE = 1024 * 1024 * 1024; /// 1GB, 0.3K/Log, 3M logs
static constexpr size_t LOAD_THREAD_NUM = 8;

explicit LogSegmentStore(const String & log_dir_)
explicit LogSegmentStore(const String & log_dir_, UInt64 max_log_segment_file_size_ = MAX_LOG_SEGMENT_FILE_SIZE)
: log_dir(log_dir_)
, first_log_index(1)
, last_log_index(0)
, max_segment_file_size(MAX_SEGMENT_FILE_SIZE)
, max_log_segment_file_size(max_log_segment_file_size_)
, log(&Poco::Logger::get("LogSegmentStore"))
{
LOG_INFO(log, "Create LogSegmentStore {}.", log_dir_);
}

static ptr<LogSegmentStore> getInstance(const String & log_dir, bool force_new = false);
static ptr<LogSegmentStore> getInstance(const String & log_dir, bool force_new = false, UInt32 max_log_segment_file_size_ = MAX_LOG_SEGMENT_FILE_SIZE);

/// Init log store, will create dir if not exist
void init(UInt32 max_segment_file_size_ = MAX_SEGMENT_FILE_SIZE);
void init();

void close();
/// Return last flushed log index
Expand Down Expand Up @@ -250,7 +249,7 @@ class LogSegmentStore final
std::atomic<UInt64> last_log_index;

/// max segment file size
UInt32 max_segment_file_size;
UInt32 max_log_segment_file_size;

Poco::Logger * log;

Expand Down
14 changes: 10 additions & 4 deletions src/Service/NuRaftStateManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ namespace RK
using namespace nuraft;

NuRaftStateManager::NuRaftStateManager(int32_t id_, const Poco::Util::AbstractConfiguration & config_, SettingsPtr settings_)
: settings(settings_), my_id(id_), my_host(settings_->host), my_internal_port(settings_->internal_port), log_dir(settings_->log_dir)
: settings(settings_)
, my_id(id_)
, my_host(settings_->host)
, my_internal_port(settings_->internal_port)
, log_dir(settings_->log_dir)
, log(&Poco::Logger::get("NuRaftStateManager"))
{
log = &(Poco::Logger::get("NuRaftStateManager"));
curr_log_store
= cs_new<NuRaftFileLogStore>(log_dir, false, settings->raft_settings->log_fsync_mode, settings->raft_settings->log_fsync_interval);
curr_log_store = cs_new<NuRaftFileLogStore>(log_dir
, false, settings->raft_settings->log_fsync_mode
, settings->raft_settings->log_fsync_interval
, settings->raft_settings->max_log_segment_file_size);

srv_state_file = fs::path(log_dir) / "srv_state";
cluster_config_file = fs::path(log_dir) / "cluster_config";
Expand Down
Loading
Loading