From 1895964a27aefb694f98c0568cd3d0dc9fe1fb67 Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Sun, 29 Sep 2024 11:53:13 +0800 Subject: [PATCH] Add configuration max_log_segment_file_size and fix illegal open segment detection --- docs/how-to-monitor-and-manage.md | 1 + programs/server/config.xml | 3 ++ src/Service/LoggerWrapper.cpp | 36 +++++++++--------- src/Service/LoggerWrapper.h | 8 ++-- src/Service/NuRaftFileLogStore.cpp | 38 +++++++------------ src/Service/NuRaftFileLogStore.h | 6 +-- src/Service/NuRaftLogSegment.cpp | 14 +++---- src/Service/NuRaftLogSegment.h | 15 ++++---- src/Service/NuRaftStateManager.cpp | 14 +++++-- src/Service/NuRaftStateManager.h | 4 +- src/Service/Settings.cpp | 5 +++ src/Service/Settings.h | 4 +- src/Service/tests/gtest_raft_log.cpp | 20 +++++----- .../test_four_word_command/test.py | 1 + .../configs/enable_keeper.xml | 1 + tests/integration/test_persistent_log/test.py | 21 ++++++++++ 16 files changed, 109 insertions(+), 82 deletions(-) diff --git a/docs/how-to-monitor-and-manage.md b/docs/how-to-monitor-and-manage.md index 0c14e04196f..872abf4dc01 100644 --- a/docs/how-to-monitor-and-manage.md +++ b/docs/how-to-monitor-and-manage.md @@ -195,6 +195,7 @@ startup_timeout=6000000 raft_logs_level=information log_fsync_mode=fsync_parallel log_fsync_interval=1000 +max_log_segment_file_size=1073741824 nuraft_thread_size=16 fresh_log_gap=200 ``` diff --git a/programs/server/config.xml b/programs/server/config.xml index 58dbd04030a..3568349ec70 100644 --- a/programs/server/config.xml +++ b/programs/server/config.xml @@ -133,6 +133,9 @@ + + + diff --git a/src/Service/LoggerWrapper.cpp b/src/Service/LoggerWrapper.cpp index 5dad252e4da..0f0a66df30a 100644 --- a/src/Service/LoggerWrapper.cpp +++ b/src/Service/LoggerWrapper.cpp @@ -13,17 +13,17 @@ NuRaftLogLevel parseNuRaftLogLevel(const String & level) { NuRaftLogLevel log_level; if (level == "trace") - log_level = RAFT_LOG_TRACE; + log_level = NuRaftLogLevel::RAFT_LOG_TRACE; else if (level == 
"debug") - log_level = RAFT_LOG_DEBUG; + log_level = NuRaftLogLevel::RAFT_LOG_DEBUG; else if (level == "information") - log_level = RAFT_LOG_INFORMATION; + log_level = NuRaftLogLevel::RAFT_LOG_INFORMATION; else if (level == "warning") - log_level = RAFT_LOG_WARNING; + log_level = NuRaftLogLevel::RAFT_LOG_WARNING; else if (level == "error") - log_level = RAFT_LOG_ERROR; + log_level = NuRaftLogLevel::RAFT_LOG_ERROR; else if (level == "fatal") - log_level = RAFT_LOG_FATAL; + log_level = NuRaftLogLevel::RAFT_LOG_FATAL; else throw Exception("Valid log level values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal'", ErrorCodes::INVALID_LOG_LEVEL); return log_level; @@ -32,17 +32,17 @@ NuRaftLogLevel parseNuRaftLogLevel(const String & level) String nuRaftLogLevelToString(NuRaftLogLevel level) { String log_level; - if (level == RAFT_LOG_TRACE) + if (level == NuRaftLogLevel::RAFT_LOG_TRACE) log_level = "trace"; - else if (level == RAFT_LOG_DEBUG) + else if (level == NuRaftLogLevel::RAFT_LOG_DEBUG) log_level = "debug"; - else if (level == RAFT_LOG_INFORMATION) + else if (level == NuRaftLogLevel::RAFT_LOG_INFORMATION) log_level = "information"; - else if (level == RAFT_LOG_WARNING) + else if (level == NuRaftLogLevel::RAFT_LOG_WARNING) log_level = "warning"; - else if (level == RAFT_LOG_ERROR) + else if (level == NuRaftLogLevel::RAFT_LOG_ERROR) log_level = "error"; - else if (level == RAFT_LOG_FATAL) + else if (level == NuRaftLogLevel::RAFT_LOG_FATAL) log_level = "fatal"; else throw Exception("Valid log level", ErrorCodes::INVALID_LOG_LEVEL); @@ -55,22 +55,22 @@ Poco::Message::Priority toPocoLogLevel(NuRaftLogLevel level) int poco_log_level; switch (level) { - case RAFT_LOG_FATAL: + case NuRaftLogLevel::RAFT_LOG_FATAL: poco_log_level = Message::Priority::PRIO_FATAL; break; - case RAFT_LOG_ERROR: + case NuRaftLogLevel::RAFT_LOG_ERROR: poco_log_level = Message::Priority::PRIO_ERROR; break; - case RAFT_LOG_WARNING: + case NuRaftLogLevel::RAFT_LOG_WARNING: 
poco_log_level = Message::Priority::PRIO_WARNING; break; - case RAFT_LOG_INFORMATION: + case NuRaftLogLevel::RAFT_LOG_INFORMATION: poco_log_level = Message::Priority::PRIO_INFORMATION; break; - case RAFT_LOG_DEBUG: + case NuRaftLogLevel::RAFT_LOG_DEBUG: poco_log_level = Message::Priority::PRIO_DEBUG; break; - case RAFT_LOG_TRACE: + case NuRaftLogLevel::RAFT_LOG_TRACE: poco_log_level = Message::Priority::PRIO_TRACE; break; } diff --git a/src/Service/LoggerWrapper.h b/src/Service/LoggerWrapper.h index 11ed5b4defe..972492e1f20 100644 --- a/src/Service/LoggerWrapper.h +++ b/src/Service/LoggerWrapper.h @@ -7,7 +7,7 @@ namespace RK { -enum NuRaftLogLevel +enum class NuRaftLogLevel { RAFT_LOG_FATAL = 1, RAFT_LOG_ERROR, @@ -30,8 +30,8 @@ Poco::Message::Priority toPocoLogLevel(NuRaftLogLevel level); class LoggerWrapper : public nuraft::logger { private: - static inline const int LEVEL_MAX = static_cast(RAFT_LOG_TRACE); - static inline const int LEVEL_MIN = static_cast(RAFT_LOG_FATAL); + static inline const int LEVEL_MAX = static_cast(NuRaftLogLevel::RAFT_LOG_TRACE); + static inline const int LEVEL_MIN = static_cast(NuRaftLogLevel::RAFT_LOG_FATAL); public: LoggerWrapper(const String & name, NuRaftLogLevel level_) : log(&Poco::Logger::get(name)), nuraft_log_level(level_) @@ -53,7 +53,7 @@ class LoggerWrapper : public nuraft::logger log->setLevel(toPocoLogLevel(static_cast(nuraft_log_level))); } - int get_level() override { return static_cast(nuraft_log_level); } + int get_level() override { return static_cast(nuraft_log_level.load()); } private: Poco::Logger * log; diff --git a/src/Service/NuRaftFileLogStore.cpp b/src/Service/NuRaftFileLogStore.cpp index c83383883a6..8da90640033 100644 --- a/src/Service/NuRaftFileLogStore.cpp +++ b/src/Service/NuRaftFileLogStore.cpp @@ -1,6 +1,3 @@ -#include -#include -#include #include #include @@ -42,25 +39,13 @@ void LogEntryQueue::clear() i = nullptr; } -NuRaftFileLogStore::NuRaftFileLogStore( - const String & log_dir, - bool force_new, - 
FsyncMode log_fsync_mode_, - UInt64 log_fsync_interval_, - UInt32 max_log_size_) - : log_fsync_mode(log_fsync_mode_), log_fsync_interval(log_fsync_interval_) +NuRaftFileLogStore::NuRaftFileLogStore( const String & log_dir, bool force_new, FsyncMode log_fsync_mode_, UInt64 log_fsync_interval_, UInt64 max_log_segment_file_size_) + : log_fsync_mode(log_fsync_mode_) + , log_fsync_interval(log_fsync_interval_) + , log(&Poco::Logger::get("FileLogStore")) { - log = &(Poco::Logger::get("FileLogStore")); - - if (log_fsync_mode == FsyncMode::FSYNC_PARALLEL) - { - parallel_fsync_event = std::make_shared(); - - fsync_thread = ThreadFromGlobalPool([this] { fsyncThread(); }); - } - - segment_store = LogSegmentStore::getInstance(log_dir, force_new); - segment_store->init(max_log_size_); + segment_store = LogSegmentStore::getInstance(log_dir, force_new, max_log_segment_file_size_); + segment_store->init(); if (segment_store->lastLogIndex() < 1) /// no log entry exists, return a dummy constant entry with value set to null and term set to zero @@ -69,6 +54,12 @@ NuRaftFileLogStore::NuRaftFileLogStore( last_log_entry = segment_store->getEntry(segment_store->lastLogIndex()); disk_last_durable_index = segment_store->lastLogIndex(); + + if (log_fsync_mode == FsyncMode::FSYNC_PARALLEL) + { + parallel_fsync_event = std::make_shared(); + fsync_thread = ThreadFromGlobalPool([this] { fsyncThread(); }); + } } void NuRaftFileLogStore::shutdown() @@ -99,8 +90,7 @@ void NuRaftFileLogStore::fsyncThread() { parallel_fsync_event->wait(); - UInt64 last_flush_index = segment_store->flush(); - if (last_flush_index) + if (UInt64 last_flush_index = segment_store->flush()) { disk_last_durable_index = last_flush_index; if (raft_instance) /// For test @@ -130,7 +120,7 @@ ptr NuRaftFileLogStore::last_entry() const ulong NuRaftFileLogStore::append(ptr & entry) { - ptr cloned = cloneLogEntry(entry); + const ptr cloned = cloneLogEntry(entry); UInt64 log_index = segment_store->appendEntry(entry); 
log_queue.putEntry(log_index, cloned); diff --git a/src/Service/NuRaftFileLogStore.h b/src/Service/NuRaftFileLogStore.h index 739ef46824b..54bdd074157 100644 --- a/src/Service/NuRaftFileLogStore.h +++ b/src/Service/NuRaftFileLogStore.h @@ -55,7 +55,7 @@ class NuRaftFileLogStore : public nuraft::log_store bool force_new = false, FsyncMode log_fsync_mode_ = FsyncMode::FSYNC_PARALLEL, UInt64 log_fsync_interval_ = 1000, - UInt32 max_log_size_ = LogSegmentStore::MAX_SEGMENT_FILE_SIZE); + UInt64 max_log_segment_file_size_ = LogSegmentStore::MAX_LOG_SEGMENT_FILE_SIZE); ~NuRaftFileLogStore() override; @@ -192,8 +192,6 @@ class NuRaftFileLogStore : public nuraft::log_store /// Thread used to flush log, only used in FSYNC_PARALLEL mode void fsyncThread(); - Poco::Logger * log; - /// Used to operate log in the store ptr segment_store; @@ -224,6 +222,8 @@ class NuRaftFileLogStore : public nuraft::log_store nuraft::ptr raft_instance; std::atomic shutdown_called{false}; + + Poco::Logger * log; }; } diff --git a/src/Service/NuRaftLogSegment.cpp b/src/Service/NuRaftLogSegment.cpp index ff0d3bae10f..5907b380751 100644 --- a/src/Service/NuRaftLogSegment.cpp +++ b/src/Service/NuRaftLogSegment.cpp @@ -535,19 +535,17 @@ bool NuRaftLogSegment::truncate(const UInt64 last_index_kept) return true; } -ptr LogSegmentStore::getInstance(const String & log_dir_, bool force_new) +ptr LogSegmentStore::getInstance(const String & log_dir_, bool force_new, UInt32 max_log_segment_file_size_) { static ptr segment_store; if (segment_store == nullptr || force_new) - segment_store = cs_new(log_dir_); + segment_store = cs_new(log_dir_, max_log_segment_file_size_); return segment_store; } -void LogSegmentStore::init(UInt32 max_segment_file_size_) +void LogSegmentStore::init() { - LOG_INFO(log, "Initializing log segment store, max segment file size {} bytes.", max_segment_file_size_); - - max_segment_file_size = max_segment_file_size_; + LOG_INFO(log, "Initializing log segment store with directory {}", 
log_dir); Poco::File(log_dir).createDirectories(); @@ -588,7 +586,7 @@ void LogSegmentStore::openNewSegmentIfNeeded() { { std::shared_lock read_lock(seg_mutex); - if (open_segment && open_segment->getFileSize() <= max_segment_file_size && open_segment->getVersion() >= CURRENT_LOG_VERSION) + if (open_segment && open_segment->getFileSize() <= max_log_segment_file_size && open_segment->getVersion() >= CURRENT_LOG_VERSION) return; } @@ -854,9 +852,9 @@ bool parseSegmentFileName(const String & file_name, UInt64 & first_index, UInt64 } else { + is_open = false; if (!tryReadUInt64Text(tokens[2], last_index)) return false; - is_open = true; } create_time = std::move(tokens[3]); diff --git a/src/Service/NuRaftLogSegment.h b/src/Service/NuRaftLogSegment.h index e9366fa410f..959d36cf740 100644 --- a/src/Service/NuRaftLogSegment.h +++ b/src/Service/NuRaftLogSegment.h @@ -160,7 +160,7 @@ class NuRaftLogSegment }; /** - * LogSegmentStore manages log segments and it uses segmented append-only file, all data + * LogSegmentStore manages log segments, it uses segmented append-only file, all data * in disk, all index in memory. Append one log entry, only cause one disk write, every * disk write will call fsync(). 
* @@ -173,23 +173,22 @@ class LogSegmentStore final public: using Segments = std::vector>; - static constexpr UInt32 MAX_SEGMENT_FILE_SIZE = 1000 * 1024 * 1024; /// 1GB, 0.3K/Log, 3M logs + static constexpr UInt64 MAX_LOG_SEGMENT_FILE_SIZE = 1024 * 1024 * 1024; /// 1GB, 0.3K/Log, 3M logs static constexpr size_t LOAD_THREAD_NUM = 8; - explicit LogSegmentStore(const String & log_dir_) + explicit LogSegmentStore(const String & log_dir_, UInt64 max_log_segment_file_size_ = MAX_LOG_SEGMENT_FILE_SIZE) : log_dir(log_dir_) , first_log_index(1) , last_log_index(0) - , max_segment_file_size(MAX_SEGMENT_FILE_SIZE) + , max_log_segment_file_size(max_log_segment_file_size_) , log(&Poco::Logger::get("LogSegmentStore")) { - LOG_INFO(log, "Create LogSegmentStore {}.", log_dir_); } - static ptr getInstance(const String & log_dir, bool force_new = false); + static ptr getInstance(const String & log_dir, bool force_new = false, UInt32 max_log_segment_file_size_ = MAX_LOG_SEGMENT_FILE_SIZE); /// Init log store, will create dir if not exist - void init(UInt32 max_segment_file_size_ = MAX_SEGMENT_FILE_SIZE); + void init(); void close(); /// Return last flushed log index @@ -250,7 +249,7 @@ class LogSegmentStore final std::atomic last_log_index; /// max segment file size - UInt32 max_segment_file_size; + UInt32 max_log_segment_file_size; Poco::Logger * log; diff --git a/src/Service/NuRaftStateManager.cpp b/src/Service/NuRaftStateManager.cpp index 6604f7c0f00..d5bc3365dd2 100644 --- a/src/Service/NuRaftStateManager.cpp +++ b/src/Service/NuRaftStateManager.cpp @@ -13,11 +13,17 @@ namespace RK using namespace nuraft; NuRaftStateManager::NuRaftStateManager(int32_t id_, const Poco::Util::AbstractConfiguration & config_, SettingsPtr settings_) - : settings(settings_), my_id(id_), my_host(settings_->host), my_internal_port(settings_->internal_port), log_dir(settings_->log_dir) + : settings(settings_) + , my_id(id_) + , my_host(settings_->host) + , my_internal_port(settings_->internal_port) + , 
log_dir(settings_->log_dir) + , log(&Poco::Logger::get("NuRaftStateManager")) { - log = &(Poco::Logger::get("NuRaftStateManager")); - curr_log_store - = cs_new(log_dir, false, settings->raft_settings->log_fsync_mode, settings->raft_settings->log_fsync_interval); + curr_log_store = cs_new(log_dir + , false, settings->raft_settings->log_fsync_mode + , settings->raft_settings->log_fsync_interval + , settings->raft_settings->max_log_segment_file_size); srv_state_file = fs::path(log_dir) / "srv_state"; cluster_config_file = fs::path(log_dir) / "cluster_config"; diff --git a/src/Service/NuRaftStateManager.h b/src/Service/NuRaftStateManager.h index 4a5c3672df3..a0a9073aef9 100644 --- a/src/Service/NuRaftStateManager.h +++ b/src/Service/NuRaftStateManager.h @@ -97,11 +97,11 @@ class NuRaftStateManager : public nuraft::state_mgr */ mutable std::mutex cluster_config_mutex; - protected: - Poco::Logger * log; String srv_state_file; String cluster_config_file; + + Poco::Logger * log; }; } diff --git a/src/Service/Settings.cpp b/src/Service/Settings.cpp index c9a450befbc..cba1072bb4f 100644 --- a/src/Service/Settings.cpp +++ b/src/Service/Settings.cpp @@ -88,6 +88,7 @@ void RaftSettings::loadFromConfig(const String & config_elem, const Poco::Util:: max_batch_size = config.getUInt(get_key("max_batch_size"), 1000); log_fsync_mode = FsyncModeNS::parseFsyncMode(config.getString(get_key("log_fsync_mode"), "fsync_parallel")); log_fsync_interval = config.getUInt(get_key("log_fsync_interval"), 1000); + max_log_segment_file_size = config.getUInt(get_key("max_log_segment_file_size"), 1073741824); async_snapshot = config.getBool(get_key("async_snapshot"), true); } catch (Exception & e) @@ -121,6 +122,7 @@ RaftSettingsPtr RaftSettings::getDefault() settings->configuration_change_tries_count = 30; settings->max_batch_size = 1000; settings->log_fsync_interval = 1000; + settings->max_log_segment_file_size = 1073741824; settings->log_fsync_mode = FsyncMode::FSYNC_PARALLEL; 
settings->async_snapshot = true; @@ -222,6 +224,9 @@ void Settings::dump(WriteBufferFromOwnString & buf) const buf.write('\n'); writeText("log_fsync_interval=", buf); write_int(raft_settings->log_fsync_interval); + buf.write('\n'); + writeText("max_log_segment_file_size=", buf); + write_int(raft_settings->max_log_segment_file_size); writeText("nuraft_thread_size=", buf); write_int(raft_settings->nuraft_thread_size); diff --git a/src/Service/Settings.h b/src/Service/Settings.h index c179d9aa221..448ebab9a59 100644 --- a/src/Service/Settings.h +++ b/src/Service/Settings.h @@ -11,7 +11,7 @@ namespace RK { /// Raft log fsync mode. -enum FsyncMode +enum class FsyncMode { /// The leader can do log replication and log persisting in parallel, thus it can reduce the latency of write operation path. /// In this mode data is safety. @@ -110,6 +110,8 @@ struct RaftSettings FsyncMode log_fsync_mode; /// How many logs do once fsync when async_fsync is false UInt64 log_fsync_interval; + /// Logs are stored in multiple segment files; this setting is the maximum size of a single log segment file in bytes.
+ UInt64 max_log_segment_file_size; /// Whether async snapshot bool async_snapshot; diff --git a/src/Service/tests/gtest_raft_log.cpp b/src/Service/tests/gtest_raft_log.cpp index 91a6b1ba458..2e1a7f15d3c 100644 --- a/src/Service/tests/gtest_raft_log.cpp +++ b/src/Service/tests/gtest_raft_log.cpp @@ -157,8 +157,8 @@ TEST(RaftLog, splitSegment) { String log_dir(LOG_DIR + "/4"); cleanDirectory(log_dir); - auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_NO_THROW(log_store->init(200)); //81 byte / log + auto log_store = LogSegmentStore::getInstance(log_dir, true, 200); //81 byte / log + ASSERT_NO_THROW(log_store->init()); for (int i = 0; i < 12; i++) { UInt64 term = 1; @@ -175,8 +175,8 @@ TEST(RaftLog, removeSegment) { String log_dir(LOG_DIR + "/5"); cleanDirectory(log_dir); - auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_NO_THROW(log_store->init(200)); + auto log_store = LogSegmentStore::getInstance(log_dir, true, 200); + ASSERT_NO_THROW(log_store->init()); //5 segment for (int i = 0; i < 10; i++) { @@ -203,8 +203,8 @@ TEST(RaftLog, truncateLog) { String log_dir(LOG_DIR + "/6"); cleanDirectory(log_dir); - auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_NO_THROW(log_store->init(200)); + auto log_store = LogSegmentStore::getInstance(log_dir, true, 200); + ASSERT_NO_THROW(log_store->init()); //8 segment, index 1-16 for (int i = 0; i < 16; i++) { @@ -373,8 +373,8 @@ TEST(RaftLog, getEntry) { String log_dir(LOG_DIR + "/7"); cleanDirectory(log_dir); - auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_NO_THROW(log_store->init(100)); + auto log_store = LogSegmentStore::getInstance(log_dir, true, 100); + ASSERT_NO_THROW(log_store->init()); UInt64 term = 1; String key("/ck/table/table1"); String data("CREATE TABLE table1;"); @@ -403,8 +403,8 @@ TEST(RaftLog, getEntries) { String log_dir(LOG_DIR + "/8"); cleanDirectory(log_dir); - auto log_store = LogSegmentStore::getInstance(log_dir, 
true); - ASSERT_NO_THROW(log_store->init(250)); //69 * 4 = 276 + auto log_store = LogSegmentStore::getInstance(log_dir, true, 250); //69 * 4 = 276 + ASSERT_NO_THROW(log_store->init()); for (int i = 0; i < 8; i++) { UInt64 term = 1; diff --git a/tests/integration/test_four_word_command/test.py b/tests/integration/test_four_word_command/test.py index 48c87cdde27..b49c95ebb3b 100644 --- a/tests/integration/test_four_word_command/test.py +++ b/tests/integration/test_four_word_command/test.py @@ -310,6 +310,7 @@ def test_cmd_conf(started_cluster): assert result["log_fsync_mode"] == "fsync_parallel" assert result["log_fsync_interval"] == "1000" + assert result["max_log_segment_file_size"] == "1073741824" assert result["nuraft_thread_size"] == "32" assert result["fresh_log_gap"] == "200" diff --git a/tests/integration/test_persistent_log/configs/enable_keeper.xml b/tests/integration/test_persistent_log/configs/enable_keeper.xml index 17466eeeb4c..30613bb801c 100644 --- a/tests/integration/test_persistent_log/configs/enable_keeper.xml +++ b/tests/integration/test_persistent_log/configs/enable_keeper.xml @@ -24,6 +24,7 @@ 80000 1000 1000 + 100 diff --git a/tests/integration/test_persistent_log/test.py b/tests/integration/test_persistent_log/test.py index 32e9a649235..e7370eced3e 100644 --- a/tests/integration/test_persistent_log/test.py +++ b/tests/integration/test_persistent_log/test.py @@ -31,6 +31,26 @@ def started_cluster(): cluster.shutdown() +def test_open_and_closed_log_segment(started_cluster): + node_zk = None + try: + node_zk = node.get_fake_zk(session_timeout=120) + + # Will generate a closed log segment file and an open one + for i in range(150): + node_zk.create("/test_open_and_closed_log_segment/node" + str(i), b"") + + close_zk_clients([node_zk]) + + node.restart_raftkeeper(kill=True) + node.wait_for_join_cluster() + + # test node can successfully start + node_zk = node.get_fake_zk(session_timeout=120) + finally: + close_zk_clients([node_zk]) + + def 
test_state_after_restart(started_cluster): node_zk = node_zk2 = None try: @@ -133,3 +153,4 @@ def test_ephemeral_after_restart(started_cluster): assert node_zk2.get("/test_ephemeral_after_restart/node" + str(i))[0] == strs[i] finally: close_zk_clients([node_zk, node_zk2]) +