Skip to content

Commit

Permalink
Add configuration max_log_segment_file_size and fix illegal open segm…
Browse files Browse the repository at this point in the history
…ent detection
  • Loading branch information
JackyWoo committed Sep 29, 2024
1 parent f64e8dc commit 1895964
Show file tree
Hide file tree
Showing 16 changed files with 109 additions and 82 deletions.
1 change: 1 addition & 0 deletions docs/how-to-monitor-and-manage.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ startup_timeout=6000000
raft_logs_level=information
log_fsync_mode=fsync_parallel
log_fsync_interval=1000
max_log_segment_file_size=1073741824
nuraft_thread_size=16
fresh_log_gap=200
```
Expand Down
3 changes: 3 additions & 0 deletions programs/server/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@

<!-- If log_fsync_mode is fsync_batch, will fsync log after x appending entries, default value is 1000. -->
<!-- <log_fsync_interval>1000</log_fsync_interval> -->

<!-- Max single log segment file size, default is 1G. -->
<!-- <max_log_segment_file_size>1073741824</max_log_segment_file_size> -->
</raft_settings>

<!-- If you want a RaftKeeper cluster, you can uncomment this and configure it carefully -->
Expand Down
36 changes: 18 additions & 18 deletions src/Service/LoggerWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ NuRaftLogLevel parseNuRaftLogLevel(const String & level)
{
NuRaftLogLevel log_level;
if (level == "trace")
log_level = RAFT_LOG_TRACE;
log_level = NuRaftLogLevel::RAFT_LOG_TRACE;
else if (level == "debug")
log_level = RAFT_LOG_DEBUG;
log_level = NuRaftLogLevel::RAFT_LOG_DEBUG;
else if (level == "information")
log_level = RAFT_LOG_INFORMATION;
log_level = NuRaftLogLevel::RAFT_LOG_INFORMATION;
else if (level == "warning")
log_level = RAFT_LOG_WARNING;
log_level = NuRaftLogLevel::RAFT_LOG_WARNING;
else if (level == "error")
log_level = RAFT_LOG_ERROR;
log_level = NuRaftLogLevel::RAFT_LOG_ERROR;
else if (level == "fatal")
log_level = RAFT_LOG_FATAL;
log_level = NuRaftLogLevel::RAFT_LOG_FATAL;
else
throw Exception("Valid log level values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal'", ErrorCodes::INVALID_LOG_LEVEL);
return log_level;
Expand All @@ -32,17 +32,17 @@ NuRaftLogLevel parseNuRaftLogLevel(const String & level)
String nuRaftLogLevelToString(NuRaftLogLevel level)
{
String log_level;
if (level == RAFT_LOG_TRACE)
if (level == NuRaftLogLevel::RAFT_LOG_TRACE)
log_level = "trace";
else if (level == RAFT_LOG_DEBUG)
else if (level == NuRaftLogLevel::RAFT_LOG_DEBUG)
log_level = "debug";
else if (level == RAFT_LOG_INFORMATION)
else if (level == NuRaftLogLevel::RAFT_LOG_INFORMATION)
log_level = "information";
else if (level == RAFT_LOG_WARNING)
else if (level == NuRaftLogLevel::RAFT_LOG_WARNING)
log_level = "warning";
else if (level == RAFT_LOG_ERROR)
else if (level == NuRaftLogLevel::RAFT_LOG_ERROR)
log_level = "error";
else if (level == RAFT_LOG_FATAL)
else if (level == NuRaftLogLevel::RAFT_LOG_FATAL)
log_level = "fatal";
else
throw Exception("Valid log level", ErrorCodes::INVALID_LOG_LEVEL);
Expand All @@ -55,22 +55,22 @@ Poco::Message::Priority toPocoLogLevel(NuRaftLogLevel level)
int poco_log_level;
switch (level)
{
case RAFT_LOG_FATAL:
case NuRaftLogLevel::RAFT_LOG_FATAL:
poco_log_level = Message::Priority::PRIO_FATAL;
break;
case RAFT_LOG_ERROR:
case NuRaftLogLevel::RAFT_LOG_ERROR:
poco_log_level = Message::Priority::PRIO_ERROR;
break;
case RAFT_LOG_WARNING:
case NuRaftLogLevel::RAFT_LOG_WARNING:
poco_log_level = Message::Priority::PRIO_WARNING;
break;
case RAFT_LOG_INFORMATION:
case NuRaftLogLevel::RAFT_LOG_INFORMATION:
poco_log_level = Message::Priority::PRIO_INFORMATION;
break;
case RAFT_LOG_DEBUG:
case NuRaftLogLevel::RAFT_LOG_DEBUG:
poco_log_level = Message::Priority::PRIO_DEBUG;
break;
case RAFT_LOG_TRACE:
case NuRaftLogLevel::RAFT_LOG_TRACE:
poco_log_level = Message::Priority::PRIO_TRACE;
break;
}
Expand Down
8 changes: 4 additions & 4 deletions src/Service/LoggerWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace RK
{

enum NuRaftLogLevel
enum class NuRaftLogLevel
{
RAFT_LOG_FATAL = 1,
RAFT_LOG_ERROR,
Expand All @@ -30,8 +30,8 @@ Poco::Message::Priority toPocoLogLevel(NuRaftLogLevel level);
class LoggerWrapper : public nuraft::logger
{
private:
static inline const int LEVEL_MAX = static_cast<int>(RAFT_LOG_TRACE);
static inline const int LEVEL_MIN = static_cast<int>(RAFT_LOG_FATAL);
static inline const int LEVEL_MAX = static_cast<int>(NuRaftLogLevel::RAFT_LOG_TRACE);
static inline const int LEVEL_MIN = static_cast<int>(NuRaftLogLevel::RAFT_LOG_FATAL);

public:
LoggerWrapper(const String & name, NuRaftLogLevel level_) : log(&Poco::Logger::get(name)), nuraft_log_level(level_)
Expand All @@ -53,7 +53,7 @@ class LoggerWrapper : public nuraft::logger
log->setLevel(toPocoLogLevel(static_cast<NuRaftLogLevel>(nuraft_log_level)));
}

int get_level() override { return static_cast<int>(nuraft_log_level); }
int get_level() override { return static_cast<int>(nuraft_log_level.load()); }

private:
Poco::Logger * log;
Expand Down
38 changes: 14 additions & 24 deletions src/Service/NuRaftFileLogStore.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#include <memory>
#include <unistd.h>
#include <Service/LogEntry.h>
#include <Service/NuRaftFileLogStore.h>
#include <Common/setThreadName.h>

Expand Down Expand Up @@ -42,25 +39,13 @@ void LogEntryQueue::clear()
i = nullptr;
}

NuRaftFileLogStore::NuRaftFileLogStore(
const String & log_dir,
bool force_new,
FsyncMode log_fsync_mode_,
UInt64 log_fsync_interval_,
UInt32 max_log_size_)
: log_fsync_mode(log_fsync_mode_), log_fsync_interval(log_fsync_interval_)
NuRaftFileLogStore::NuRaftFileLogStore( const String & log_dir, bool force_new, FsyncMode log_fsync_mode_, UInt64 log_fsync_interval_, UInt64 max_log_segment_file_size_)
: log_fsync_mode(log_fsync_mode_)
, log_fsync_interval(log_fsync_interval_)
, log(&Poco::Logger::get("FileLogStore"))
{
log = &(Poco::Logger::get("FileLogStore"));

if (log_fsync_mode == FsyncMode::FSYNC_PARALLEL)
{
parallel_fsync_event = std::make_shared<Poco::Event>();

fsync_thread = ThreadFromGlobalPool([this] { fsyncThread(); });
}

segment_store = LogSegmentStore::getInstance(log_dir, force_new);
segment_store->init(max_log_size_);
segment_store = LogSegmentStore::getInstance(log_dir, force_new, max_log_segment_file_size_);
segment_store->init();

if (segment_store->lastLogIndex() < 1)
/// no log entry exists, return a dummy constant entry with value set to null and term set to zero
Expand All @@ -69,6 +54,12 @@ NuRaftFileLogStore::NuRaftFileLogStore(
last_log_entry = segment_store->getEntry(segment_store->lastLogIndex());

disk_last_durable_index = segment_store->lastLogIndex();

if (log_fsync_mode == FsyncMode::FSYNC_PARALLEL)
{
parallel_fsync_event = std::make_shared<Poco::Event>();
fsync_thread = ThreadFromGlobalPool([this] { fsyncThread(); });
}
}

void NuRaftFileLogStore::shutdown()
Expand Down Expand Up @@ -99,8 +90,7 @@ void NuRaftFileLogStore::fsyncThread()
{
parallel_fsync_event->wait();

UInt64 last_flush_index = segment_store->flush();
if (last_flush_index)
if (UInt64 last_flush_index = segment_store->flush())
{
disk_last_durable_index = last_flush_index;
if (raft_instance) /// For test
Expand Down Expand Up @@ -130,7 +120,7 @@ ptr<log_entry> NuRaftFileLogStore::last_entry() const

ulong NuRaftFileLogStore::append(ptr<log_entry> & entry)
{
ptr<log_entry> cloned = cloneLogEntry(entry);
const ptr<log_entry> cloned = cloneLogEntry(entry);
UInt64 log_index = segment_store->appendEntry(entry);
log_queue.putEntry(log_index, cloned);

Expand Down
6 changes: 3 additions & 3 deletions src/Service/NuRaftFileLogStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class NuRaftFileLogStore : public nuraft::log_store
bool force_new = false,
FsyncMode log_fsync_mode_ = FsyncMode::FSYNC_PARALLEL,
UInt64 log_fsync_interval_ = 1000,
UInt32 max_log_size_ = LogSegmentStore::MAX_SEGMENT_FILE_SIZE);
UInt64 max_log_segment_file_size_ = LogSegmentStore::MAX_LOG_SEGMENT_FILE_SIZE);

~NuRaftFileLogStore() override;

Expand Down Expand Up @@ -192,8 +192,6 @@ class NuRaftFileLogStore : public nuraft::log_store
/// Thread used to flush log, only used in FSYNC_PARALLEL mode
void fsyncThread();

Poco::Logger * log;

/// Used to operate log in the store
ptr<LogSegmentStore> segment_store;

Expand Down Expand Up @@ -224,6 +222,8 @@ class NuRaftFileLogStore : public nuraft::log_store
nuraft::ptr<nuraft::raft_server> raft_instance;

std::atomic<bool> shutdown_called{false};

Poco::Logger * log;
};

}
14 changes: 6 additions & 8 deletions src/Service/NuRaftLogSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,19 +535,17 @@ bool NuRaftLogSegment::truncate(const UInt64 last_index_kept)
return true;
}

ptr<LogSegmentStore> LogSegmentStore::getInstance(const String & log_dir_, bool force_new)
ptr<LogSegmentStore> LogSegmentStore::getInstance(const String & log_dir_, bool force_new, UInt32 max_log_segment_file_size_)
{
static ptr<LogSegmentStore> segment_store;
if (segment_store == nullptr || force_new)
segment_store = cs_new<LogSegmentStore>(log_dir_);
segment_store = cs_new<LogSegmentStore>(log_dir_, max_log_segment_file_size_);
return segment_store;
}

void LogSegmentStore::init(UInt32 max_segment_file_size_)
void LogSegmentStore::init()
{
LOG_INFO(log, "Initializing log segment store, max segment file size {} bytes.", max_segment_file_size_);

max_segment_file_size = max_segment_file_size_;
LOG_INFO(log, "Initializing log segment store with directory {}", log_dir);

Poco::File(log_dir).createDirectories();

Expand Down Expand Up @@ -588,7 +586,7 @@ void LogSegmentStore::openNewSegmentIfNeeded()
{
{
std::shared_lock read_lock(seg_mutex);
if (open_segment && open_segment->getFileSize() <= max_segment_file_size && open_segment->getVersion() >= CURRENT_LOG_VERSION)
if (open_segment && open_segment->getFileSize() <= max_log_segment_file_size && open_segment->getVersion() >= CURRENT_LOG_VERSION)
return;
}

Expand Down Expand Up @@ -854,9 +852,9 @@ bool parseSegmentFileName(const String & file_name, UInt64 & first_index, UInt64
}
else
{
is_open = false;
if (!tryReadUInt64Text(tokens[2], last_index))
return false;
is_open = true;
}

create_time = std::move(tokens[3]);
Expand Down
15 changes: 7 additions & 8 deletions src/Service/NuRaftLogSegment.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class NuRaftLogSegment
};

/**
* LogSegmentStore manages log segments and it uses segmented append-only file, all data
* LogSegmentStore manages log segments, it uses segmented append-only file, all data
* in disk, all index in memory. Append one log entry, only cause one disk write, every
* disk write will call fsync().
*
Expand All @@ -173,23 +173,22 @@ class LogSegmentStore final
public:
using Segments = std::vector<ptr<NuRaftLogSegment>>;

static constexpr UInt32 MAX_SEGMENT_FILE_SIZE = 1000 * 1024 * 1024; /// 1GB, 0.3K/Log, 3M logs
static constexpr UInt64 MAX_LOG_SEGMENT_FILE_SIZE = 1024 * 1024 * 1024; /// 1GB, 0.3K/Log, 3M logs
static constexpr size_t LOAD_THREAD_NUM = 8;

explicit LogSegmentStore(const String & log_dir_)
explicit LogSegmentStore(const String & log_dir_, UInt64 max_log_segment_file_size_ = MAX_LOG_SEGMENT_FILE_SIZE)
: log_dir(log_dir_)
, first_log_index(1)
, last_log_index(0)
, max_segment_file_size(MAX_SEGMENT_FILE_SIZE)
, max_log_segment_file_size(max_log_segment_file_size_)
, log(&Poco::Logger::get("LogSegmentStore"))
{
LOG_INFO(log, "Create LogSegmentStore {}.", log_dir_);
}

static ptr<LogSegmentStore> getInstance(const String & log_dir, bool force_new = false);
static ptr<LogSegmentStore> getInstance(const String & log_dir, bool force_new = false, UInt32 max_log_segment_file_size_ = MAX_LOG_SEGMENT_FILE_SIZE);

/// Init log store, will create dir if not exist
void init(UInt32 max_segment_file_size_ = MAX_SEGMENT_FILE_SIZE);
void init();

void close();
/// Return last flushed log index
Expand Down Expand Up @@ -250,7 +249,7 @@ class LogSegmentStore final
std::atomic<UInt64> last_log_index;

/// max segment file size
UInt32 max_segment_file_size;
UInt32 max_log_segment_file_size;

Poco::Logger * log;

Expand Down
14 changes: 10 additions & 4 deletions src/Service/NuRaftStateManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ namespace RK
using namespace nuraft;

NuRaftStateManager::NuRaftStateManager(int32_t id_, const Poco::Util::AbstractConfiguration & config_, SettingsPtr settings_)
: settings(settings_), my_id(id_), my_host(settings_->host), my_internal_port(settings_->internal_port), log_dir(settings_->log_dir)
: settings(settings_)
, my_id(id_)
, my_host(settings_->host)
, my_internal_port(settings_->internal_port)
, log_dir(settings_->log_dir)
, log(&Poco::Logger::get("NuRaftStateManager"))
{
log = &(Poco::Logger::get("NuRaftStateManager"));
curr_log_store
= cs_new<NuRaftFileLogStore>(log_dir, false, settings->raft_settings->log_fsync_mode, settings->raft_settings->log_fsync_interval);
curr_log_store = cs_new<NuRaftFileLogStore>(log_dir
, false, settings->raft_settings->log_fsync_mode
, settings->raft_settings->log_fsync_interval
, settings->raft_settings->max_log_segment_file_size);

srv_state_file = fs::path(log_dir) / "srv_state";
cluster_config_file = fs::path(log_dir) / "cluster_config";
Expand Down
4 changes: 2 additions & 2 deletions src/Service/NuRaftStateManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ class NuRaftStateManager : public nuraft::state_mgr
*/
mutable std::mutex cluster_config_mutex;


protected:
Poco::Logger * log;
String srv_state_file;
String cluster_config_file;

Poco::Logger * log;
};

}
5 changes: 5 additions & 0 deletions src/Service/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ void RaftSettings::loadFromConfig(const String & config_elem, const Poco::Util::
max_batch_size = config.getUInt(get_key("max_batch_size"), 1000);
log_fsync_mode = FsyncModeNS::parseFsyncMode(config.getString(get_key("log_fsync_mode"), "fsync_parallel"));
log_fsync_interval = config.getUInt(get_key("log_fsync_interval"), 1000);
max_log_segment_file_size = config.getUInt(get_key("max_log_segment_file_size"), 1073741824);
async_snapshot = config.getBool(get_key("async_snapshot"), true);
}
catch (Exception & e)
Expand Down Expand Up @@ -121,6 +122,7 @@ RaftSettingsPtr RaftSettings::getDefault()
settings->configuration_change_tries_count = 30;
settings->max_batch_size = 1000;
settings->log_fsync_interval = 1000;
settings->max_log_segment_file_size = 1073741824;
settings->log_fsync_mode = FsyncMode::FSYNC_PARALLEL;
settings->async_snapshot = true;

Expand Down Expand Up @@ -222,6 +224,9 @@ void Settings::dump(WriteBufferFromOwnString & buf) const
buf.write('\n');
writeText("log_fsync_interval=", buf);
write_int(raft_settings->log_fsync_interval);
buf.write('\n');
writeText("max_log_segment_file_size=", buf);
write_int(raft_settings->max_log_segment_file_size);

writeText("nuraft_thread_size=", buf);
write_int(raft_settings->nuraft_thread_size);
Expand Down
4 changes: 3 additions & 1 deletion src/Service/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace RK
{

/// Raft log fsync mode.
enum FsyncMode
enum class FsyncMode
{
/// The leader can do log replication and log persisting in parallel, thus it can reduce the latency of write operation path.
/// In this mode data is safety.
Expand Down Expand Up @@ -110,6 +110,8 @@ struct RaftSettings
FsyncMode log_fsync_mode;
/// How many logs do once fsync when async_fsync is false
UInt64 log_fsync_interval;
/// We store logs in multiple file, this setting represent the max single log segment file size in bytes.
UInt64 max_log_segment_file_size;
/// Whether async snapshot
bool async_snapshot;

Expand Down
Loading

0 comments on commit 1895964

Please sign in to comment.