From 5543828260d2ffe56cd25c6e61ffec6a7676333e Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Thu, 8 Aug 2024 18:36:16 +0800 Subject: [PATCH 1/4] Fail fast for log error --- src/Common/Exception.cpp | 1 - src/Common/Exception.h | 7 + src/Service/KeeperServer.cpp | 4 +- src/Service/NuRaftFileLogStore.cpp | 58 +- src/Service/NuRaftFileLogStore.h | 2 +- src/Service/NuRaftLogSegment.cpp | 1153 +++++++++--------------- src/Service/NuRaftLogSegment.h | 180 ++-- src/Service/NuRaftStateMachine.cpp | 2 +- src/Service/tests/gtest_raft_log.cpp | 27 +- src/Service/tests/raft_test_common.cpp | 1 - 10 files changed, 551 insertions(+), 884 deletions(-) diff --git a/src/Common/Exception.cpp b/src/Common/Exception.cpp index 1a9b9b5ef27..516b056e03a 100644 --- a/src/Common/Exception.cpp +++ b/src/Common/Exception.cpp @@ -101,7 +101,6 @@ std::string Exception::getStackTraceString() const #endif } - void throwFromErrno(const std::string & s, int code, int the_errno) { throw ErrnoException(s + ", " + errnoToString(code, the_errno), code, the_errno); diff --git a/src/Common/Exception.h b/src/Common/Exception.h index 99d11bf77a5..29663d7dd4c 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -7,6 +7,7 @@ #include #include +#include #include #include @@ -139,6 +140,12 @@ class ParsingException : public Exception using Exceptions = std::vector; +template +void throwFromErrno(int code, const std::string & fmt, Args&&... args) +{ + throw ErrnoException(fmt::format(fmt, std::forward(args)...) + ", " + errnoToString(code, errno), code, errno); +} + [[noreturn]] void throwFromErrno(const std::string & s, int code, int the_errno = errno); /// Useful to produce some extra information about available space and inodes on device [[noreturn]] void throwFromErrnoWithPath(const std::string & s, const std::string & path, int code, diff --git a/src/Service/KeeperServer.cpp b/src/Service/KeeperServer.cpp index 5aa17c4fa90..07934e18cb3 100644 --- a/src/Service/KeeperServer.cpp +++ b/src/Service/KeeperServer.cpp @@ -122,8 +122,8 @@ void KeeperServer::shutdown() LOG_WARNING(log, "Failed to shutdown NuRaft core in {}ms", settings->raft_settings->shutdown_timeout); LOG_INFO(log, "Flush Log store."); - if (state_manager->load_log_store() && !state_manager->load_log_store()->flush()) - LOG_WARNING(log, "Log store flush error while server shutdown."); + if (state_manager->load_log_store()) + state_manager->load_log_store()->flush(); dynamic_cast(*state_manager->load_log_store()).shutdown(); state_machine->shutdown(); diff --git a/src/Service/NuRaftFileLogStore.cpp b/src/Service/NuRaftFileLogStore.cpp index 7bf8ab6000a..6e78b231348 100644 --- a/src/Service/NuRaftFileLogStore.cpp +++ b/src/Service/NuRaftFileLogStore.cpp @@ -74,16 +74,7 @@ NuRaftFileLogStore::NuRaftFileLogStore( } segment_store = LogSegmentStore::getInstance(log_dir, force_new); - - if (segment_store->init(max_log_size_, max_segment_count_) >= 0) - { - LOG_INFO(log, "Init file log store, last log index {}, log dir {}", segment_store->lastLogIndex(), log_dir); - } - else - { - LOG_WARNING(log, "Init file log store failed, log dir {}", log_dir); - return; - } + segment_store->init(max_log_size_, max_segment_count_); if (segment_store->lastLogIndex() < 1) /// no log entry exists, return a dummy constant entry with value set to null and term set to zero @@ -217,40 +208,50 @@ ptr>> NuRaftFileLogStore::log_entries(ulong start, ul ptr>> NuRaftFileLogStore::log_entries_ext(ulong start, ulong end, int64 batch_size_hint_in_bytes) { ptr>> ret = cs_new>>(); + int64 get_size = 0; int64 entry_size; + for (auto i = start; i < end; i++) { - auto entry_ptr = entry_at(i); - entry_size = entry_ptr->get_buf().size() + sizeof(ulong) + sizeof(char); + auto entry = entry_at(i); + if (!entry) + return nullptr; + + entry_size = entry->get_buf().size() + sizeof(ulong) + sizeof(char); + if (batch_size_hint_in_bytes > 0 && get_size + entry_size > batch_size_hint_in_bytes) - { break; - } - ret->push_back(entry_ptr); + + ret->push_back(entry); get_size += entry_size; } - LOG_DEBUG(log, "log entries ext, start {} end {}, real size {}, max size {}", start, end, get_size, batch_size_hint_in_bytes); + return ret; } -ptr> NuRaftFileLogStore::log_entries_version_ext(ulong start, ulong end, int64 batch_size_hint_in_bytes) +ptr> NuRaftFileLogStore::log_entries_version_ext(ulong start, ulong end, int64 batch_size_hint_in_bytes) { - ptr> ret = cs_new>(); + ptr> ret = cs_new>(); + int64 get_size = 0; int64 entry_size; + for (auto i = start; i < end; i++) { - auto entry_ptr = entry_at(i); - entry_size = entry_ptr->get_buf().size() + sizeof(ulong) + sizeof(char); + auto entry = entry_at(i); + if (!entry) + return nullptr; + + entry_size = entry->get_buf().size() + sizeof(ulong) + sizeof(char); + if (batch_size_hint_in_bytes > 0 && get_size + entry_size > batch_size_hint_in_bytes) - { break; - } - ret->push_back({segment_store->getVersion(i), entry_ptr}); + + ret->push_back({segment_store->getVersion(i), entry}); get_size += entry_size; } - LOG_DEBUG(log, "log entries ext, start {} end {}, real size {}, max size {}", start, end, get_size, batch_size_hint_in_bytes); + return ret; } @@ -336,24 +337,23 @@ void NuRaftFileLogStore::apply_pack(ulong index, buffer & pack) bool NuRaftFileLogStore::compact(ulong last_log_index) { - segment_store->removeSegment(last_log_index + 1); + auto removed_count = segment_store->removeSegment(last_log_index + 1); log_queue.clear(); - LOG_DEBUG(log, "compact last_log_index {}", last_log_index); + LOG_DEBUG(log, "Compact log to {} and removed {} log segments", last_log_index, removed_count); return true; } bool NuRaftFileLogStore::flush() { - return segment_store->flush() > 0; + segment_store->flush(); + return true; } ulong NuRaftFileLogStore::last_durable_index() { uint64_t last_log = next_slot() - 1; if (log_fsync_mode != FsyncMode::FSYNC_PARALLEL) - { return last_log; - } return disk_last_durable_index; } diff --git a/src/Service/NuRaftFileLogStore.h b/src/Service/NuRaftFileLogStore.h index fc7d310f40c..79a19d3acf7 100644 --- a/src/Service/NuRaftFileLogStore.h +++ b/src/Service/NuRaftFileLogStore.h @@ -120,7 +120,7 @@ class NuRaftFileLogStore : public nuraft::log_store ptr>> log_entries_ext(ulong start, ulong end, int64 batch_size_hint_in_bytes = 0) override; /// Same with log_entries_ext, but return log entry with version info. - ptr> log_entries_version_ext(ulong start, ulong end, int64 batch_size_hint_in_bytes = 0); + ptr> log_entries_version_ext(ulong start, ulong end, int64 batch_size_hint_in_bytes = 0); /** * Get the log entry at the specified log index number. diff --git a/src/Service/NuRaftLogSegment.cpp b/src/Service/NuRaftLogSegment.cpp index a7bbb377168..f9554eb456c 100644 --- a/src/Service/NuRaftLogSegment.cpp +++ b/src/Service/NuRaftLogSegment.cpp @@ -1,6 +1,5 @@ #include #include -#include #include #include #include @@ -27,13 +26,17 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR; extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR; + extern const int CANNOT_CLOSE_FILE; + extern const int CANNOT_OPEN_FILE; + extern const int FILE_DOESNT_EXIST; + extern const int CORRUPTED_LOG; } using namespace nuraft; -int ftruncateUninterrupted(int fd, off_t length) +[[maybe_unused]] int ftruncateUninterrupted(int fd, off_t length) { - int rc = 0; + int rc; do { rc = ftruncate(fd, length); @@ -41,16 +44,60 @@ int ftruncateUninterrupted(int fd, off_t length) return rc; } -bool compareSegment(ptr & seg1, ptr & seg2) +bool compareSegment(ptr & lhs, ptr & rhs) +{ + return lhs->firstIndex() < rhs->firstIndex(); +} + +NuRaftLogSegment::NuRaftLogSegment(const String & log_dir_, UInt64 first_index_) + : log_dir(log_dir_) + , first_index(first_index_) + , last_index(first_index_ - 1) + , is_open(true) + , log(&(Poco::Logger::get("NuRaftLogSegment"))) + , version(CURRENT_LOG_VERSION) +{ + LOG_INFO(log, "create new log segment, first index {}", first_index); + + Poco::DateTime now; + create_time = Poco::DateTimeFormatter::format(now, "%Y%m%d%H%M%S"); + + std::lock_guard write_lock(log_mutex); + + file_name = getOpenFileName(); + String full_path = getOpenPath(); + + if (Poco::File(full_path).exists()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Try to create a log segment but file {} already exists.", full_path); + + seg_fd = ::open(full_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0644); + if (seg_fd == -1) + throwFromErrno(ErrorCodes::CANNOT_OPEN_FILE, "Fail to create new log segment {}", full_path); +} + +NuRaftLogSegment::NuRaftLogSegment(const String & log_dir_, UInt64 first_index_, UInt64 last_index_, const String & file_name_, const String & create_time_) + : log_dir(log_dir_) + , first_index(first_index_) + , last_index(last_index_) + , file_name(file_name_) + , create_time(create_time_) + , log(&(Poco::Logger::get("NuRaftLogSegment"))) +{ +} + +NuRaftLogSegment::NuRaftLogSegment(const String & log_dir_, UInt64 first_index_, const String & file_name_, const String & create_time_) + : log_dir(log_dir_) + , first_index(first_index_) + , last_index(first_index_ - 1) + , file_name(file_name_) + , create_time(create_time_) + , log(&(Poco::Logger::get("NuRaftLogSegment"))) { - return seg1->firstIndex() < seg2->firstIndex(); } String NuRaftLogSegment::getOpenFileName() { - char buf[1024]; - snprintf(buf, 1024, LOG_OPEN_FILE_NAME, first_index, create_time.c_str()); - return String(buf); + return fmt::format("log_{}_open_{}", first_index, create_time); } String NuRaftLogSegment::getOpenPath() @@ -60,29 +107,21 @@ String NuRaftLogSegment::getOpenPath() return path; } -String NuRaftLogSegment::getFinishFileName() +String NuRaftLogSegment::getClosedFileName() { - char buf[1024]; - snprintf(buf, 1024, LOG_FINISH_FILE_NAME, first_index, last_index.load(std::memory_order_relaxed), create_time.c_str()); - return String(buf); + return fmt::format("log_{}_{}_{}", first_index, last_index.load(std::memory_order_relaxed), create_time); } -String NuRaftLogSegment::getFinishPath() +String NuRaftLogSegment::getClosedPath() { String path(log_dir); - path += "/" + getFinishFileName(); + path += "/" + getClosedFileName(); return path; } String NuRaftLogSegment::getFileName() { - if (!file_name.empty()) - return file_name; - - if (is_open) - return getOpenFileName(); - else - return getFinishFileName(); + return file_name; } String NuRaftLogSegment::getPath() @@ -90,72 +129,40 @@ String NuRaftLogSegment::getPath() return log_dir + "/" + getFileName(); } -int NuRaftLogSegment::openFile() +void NuRaftLogSegment::openFileIfNeeded() { - if (seg_fd > 0) - { - return 0; - } + if (seg_fd != -1) + return; + + LOG_INFO(log, "Opening log segment file {}", file_name); + String full_path = getPath(); if (!Poco::File(full_path).exists()) - { - LOG_ERROR(log, "File path {} is not exists.", full_path); - return -1; - } - errno = 0; + throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Log segment file {} does not exist.", file_name); + seg_fd = ::open(full_path.c_str(), O_RDWR); - if (seg_fd < 0) - { - LOG_ERROR(log, "Fail to open {}, error:{}", full_path, strerror(errno)); - return -1; - } - LOG_INFO(log, "Open segment for read/write, path {}", full_path); - return 0; + if (seg_fd == -1) + throwFromErrno(ErrorCodes::CANNOT_OPEN_FILE, "Fail to open log segment file {}", file_name); } -int NuRaftLogSegment::closeFile() +void NuRaftLogSegment::closeFileIfNeeded() { - if (seg_fd >= 0) + LOG_INFO(log, "Closing log segment file {}", file_name); + if (seg_fd != -1) { - ::close(seg_fd); + if (::close(seg_fd) != 0) + throwFromErrno(ErrorCodes::CANNOT_CLOSE_FILE, "Error when closing a log segment file"); seg_fd = -1; } - return 0; } -int NuRaftLogSegment::create() +void NuRaftLogSegment::writeHeader() { if (!is_open) - { - LOG_WARNING(log, "Create on a closed segment at first_index={} in {}", first_index, log_dir); - return -1; - } - std::lock_guard write_lock(log_mutex); - file_name = getOpenFileName(); - String full_path = getOpenPath(); - if (Poco::File(full_path).exists()) - { - LOG_ERROR(log, "File {} is exists.", full_path); - return -1; - } - errno = 0; - seg_fd = ::open(full_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0644); - if (seg_fd < 0) - { - LOG_WARNING(log, "Created new segment {} failed, fd {}, error:{}", full_path, seg_fd, strerror(errno)); - return -1; - } - LOG_INFO(log, "Created new segment {}, seg_fd {}, first index {}", full_path, seg_fd, first_index); - return 0; -} - -void NuRaftLogSegment::writeFileHeader() -{ - if (!is_open) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Log segment not open yet"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Log segment {} not open yet", file_name); if (seg_fd < 0) - throw Exception(ErrorCodes::LOGICAL_ERROR, "File not open yet"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} not open yet", file_name); union { @@ -167,148 +174,105 @@ void NuRaftLogSegment::writeFileHeader() auto version_uint8 = static_cast(version); if (write(seg_fd, &magic_num, 8) != 8) - throw Exception(ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR, "Cannot write magic to file descriptor"); + throwFromErrno(ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR, "Cannot write magic to {}", file_name); if (write(seg_fd, &version_uint8, 1) != 1) - throw Exception(ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR, "Cannot write version to file descriptor"); + throwFromErrno(ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR, "Cannot write version to {}", file_name); file_size.fetch_add(sizeof(uint64_t) + sizeof(uint8_t), std::memory_order_release); } -int NuRaftLogSegment::load() +void NuRaftLogSegment::load() { - int ret = 0; - - if (openFile() != 0) - return -1; + openFileIfNeeded(); /// get file size struct stat st_buf; - errno = 0; - if (fstat(seg_fd, &st_buf) != 0) - { - LOG_ERROR(log, "Fail to get the stat, error:{}", strerror(errno)); - ::close(seg_fd); - seg_fd = -1; - return -1; - } + throwFromErrno(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, "Fail to get the stat of log segment file {}", file_name); - /// load entry index - file_size = st_buf.st_size; + size_t file_size_read = st_buf.st_size; - size_t entry_off = loadVersion(); - UInt64 actual_last_index = first_index - 1; + /// load header + readHeader(); + size_t entry_off = version == LogVersion::V0 ? 0 : MAGIC_AND_VERSION_SIZE; - for (; entry_off < file_size;) + /// load log entry + UInt64 last_index_read = first_index - 1; + for (; entry_off < file_size_read;) { LogEntryHeader header; - const int rc = loadLogEntryHeader(seg_fd, entry_off, &header); - if (rc != 0) - { - ret = rc; - break; - } + loadLogEntryHeader(seg_fd, entry_off, header); - /// rc == 0 - const UInt64 skip_len = sizeof(LogEntryHeader) + header.data_length; + const UInt64 log_entry_len = sizeof(LogEntryHeader) + header.data_length; - if (entry_off + skip_len > file_size) - { - /// The last log was not completely written and it should be - /// truncated - ret = -1; - break; - } + if (entry_off + log_entry_len > file_size_read) + throw Exception(ErrorCodes::CORRUPTED_LOG, "Corrupted log segment file {}.", file_name); offset_term.push_back(std::make_pair(entry_off, header.term)); - ++actual_last_index; - entry_off += skip_len; + ++last_index_read; + entry_off += log_entry_len; - if (actual_last_index << 44 == 0) + if (last_index_read << 20 == 0) { LOG_DEBUG( log, - "Load log segment, entry_off {}, skip_len {}, file_size {}, actual_last_index {}", + "Load log segment {}, entry_off {}, log_entry_len {}, file_size {}, log_index {}", + file_name, entry_off, - skip_len, - file_size, - actual_last_index); + log_entry_len, + file_size_read, + last_index_read); } } const UInt64 curr_last_index = last_index.load(std::memory_order_relaxed); - if (ret == 0 && !is_open) + if (!is_open) { - if (actual_last_index < curr_last_index) - { - LOG_ERROR( - log, - "Data lost in a full segment, directory {}, first index {}, expect last index {}, actual last index {}", - log_dir, - first_index, - curr_last_index, - actual_last_index); - ret = -1; - } - else if (actual_last_index > curr_last_index) - { - LOG_ERROR( - log, - "Found garbage in a full segment, directory {}, first index {}, expect last index {}, actual last index {} ", - log_dir, - first_index, - last_index, - actual_last_index); - ret = -1; - } + if (last_index_read != curr_last_index) + throw Exception( + ErrorCodes::CORRUPTED_LOG, + "Corrupted log segment {}, last_index_read {}, last_index {}", + file_name, + last_index_read, + curr_last_index); } - - if (ret != 0) - return ret; - - if (is_open) + else { - LOG_INFO(log, "Open segment last_index {}.", actual_last_index); - last_index = actual_last_index; + LOG_INFO(log, "Read last log index {} for an open segment {}.", last_index_read, file_name); + last_index = last_index_read; } - /// truncate last uncompleted entry - if (entry_off != file_size) + if (entry_off != file_size_read) { - LOG_INFO( - log, - "Truncate last uncompleted write entry, directory {}, first_index {}, old size {}, new size {} ", - log_dir, - first_index, - file_size, - entry_off); - ret = ftruncateUninterrupted(seg_fd, entry_off); + throw Exception( + ErrorCodes::CORRUPTED_LOG, + "{} is corrupted, entry_off {} != file_size {}, maybe the last log entry is incomplete.", + file_name, + entry_off, + file_size_read); + /// ftruncateUninterrupted(seg_fd, entry_off); } file_size = entry_off; + /// seek to end of file if it is open if (is_open) ::lseek(seg_fd, entry_off, SEEK_SET); - - return ret; } -size_t NuRaftLogSegment::loadVersion() +void NuRaftLogSegment::readHeader() { if (seg_fd < 0) - throw Exception(ErrorCodes::LOGICAL_ERROR, "File not open yet"); - - /// magic + version. 9 bytes - ptr buf = buffer::alloc(9); + throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} not open yet", file_name); + ptr buf = buffer::alloc(MAGIC_AND_VERSION_SIZE); buf->pos(0); - errno = 0; - ssize_t ret = pread(seg_fd, buf->data(), 9, 0); - if (ret != 9) - throw Exception(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, "Cannot read from file descriptor"); + ssize_t size_read = pread(seg_fd, buf->data(), MAGIC_AND_VERSION_SIZE, 0); + if (size_read != MAGIC_AND_VERSION_SIZE) + throw Exception(ErrorCodes::CORRUPTED_LOG, "Corrupted log segment file {}.", file_name); buffer_serializer bs(buf); bs.pos(0); @@ -323,135 +287,101 @@ size_t NuRaftLogSegment::loadVersion() if (magic == magic_num) { version = static_cast(bs.get_u8()); - LOG_INFO(log, "Magic num is {}, version {}", magic_num, version); - return 9; } else { - LOG_INFO(log, "Not have magic num, set version V0"); + LOG_INFO(log, "{} does not have magic num, its version is V0", file_name); version = LogVersion::V0; - return 0; } } -int NuRaftLogSegment::close(bool is_full) +void NuRaftLogSegment::close(bool is_full) { std::lock_guard write_lock(log_mutex); - if (!is_open) - return 0; + closeFileIfNeeded(); - int ret = closeFile(); - - if (ret) - return ret; + if (!is_open) + return; if (is_full) { String old_path = getOpenPath(); - String new_path = getFinishPath(); + String new_path = getClosedPath(); - LOG_INFO( - log, - "Close a full segment. Current first index {}, last index {}, renamed {} to {}.", - first_index, - last_index, - old_path, - new_path); + LOG_INFO(log, "Closing a full segment {} and rename it to {}.", getOpenFileName(), getClosedFileName()); - is_open = false; Poco::File(old_path).renameTo(new_path); - file_name = getFinishFileName(); - return 0; + file_name = getClosedFileName(); } - return 0; + + is_open = false; } UInt64 NuRaftLogSegment::flush() const { - if (seg_fd >= 0) - { - std::lock_guard write_lock(log_mutex); + std::lock_guard write_lock(log_mutex); - int ret; + int ret; #if defined(OS_DARWIN) - ret = ::fsync(seg_fd); + ret = ::fsync(seg_fd); #else - ret = ::fdatasync(seg_fd); + ret = ::fdatasync(seg_fd); #endif - if (ret == -1) - LOG_ERROR(log, "log fsync error error no {}", errno); - else if (ret == 0) - return last_index; /// return last_index - } - return 0; + if (ret == -1) + throwFromErrno(ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR, "Fail to flush log segment {}", file_name); + + return last_index; } -int NuRaftLogSegment::remove() +void NuRaftLogSegment::remove() { std::lock_guard write_lock(log_mutex); - closeFile(); + closeFileIfNeeded(); String full_path = getPath(); - Poco::File file_obj(full_path); - if (file_obj.exists()) - { - LOG_INFO(log, "Remove log segment {}", full_path); - file_obj.remove(); - } - return 0; + Poco::File f(full_path); + if (f.exists()) + f.remove(); } UInt64 NuRaftLogSegment::appendEntry(ptr entry, std::atomic & last_log_index) { LogEntryHeader header; - ptr entry_buf; - - char * entry_str; - size_t buf_size = 0; - struct iovec vec[2]; + { - if (!entry || !is_open) - return -1; + if (!is_open || seg_fd == -1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Append log but segment {} is not open.", file_name); + + ptr entry_buf; + char * data_in_buf; entry_buf = LogEntryBody::serialize(entry); - buf_size = entry_buf->size(); - entry_str = reinterpret_cast(entry_buf->data_begin()); + data_in_buf = reinterpret_cast(entry_buf->data_begin()); - if (entry_str == nullptr || buf_size == 0) - { - LOG_ERROR(log, "Can't get entry string buffer, size is {}.", buf_size); - return -1; - } + size_t data_size = entry_buf->size(); + + if (data_in_buf == nullptr || data_size == 0) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Append log but it is empty"); - if (seg_fd < 0) - { - LOG_ERROR(log, "seg fs is null."); - return -1; - } header.term = entry->get_term(); - header.data_length = buf_size; - header.data_crc = RK::getCRC32(entry_str, header.data_length); + header.data_length = data_size; + header.data_crc = RK::getCRC32(data_in_buf, header.data_length); vec[0].iov_base = &header; vec[0].iov_len = LogEntryHeader::HEADER_SIZE; - vec[1].iov_base = reinterpret_cast(entry_str); + vec[1].iov_base = reinterpret_cast(data_in_buf); vec[1].iov_len = header.data_length; } - errno = 0; - { std::lock_guard write_lock(log_mutex); header.index = last_index.load(std::memory_order_acquire) + 1; - ssize_t ret = writev(seg_fd, vec, 2); + ssize_t size_written = writev(seg_fd, vec, 2); - if (ret < 0 || ret != static_cast(vec[0].iov_len + vec[1].iov_len)) - { - LOG_WARNING(log, "Write {}, real size {}, error:{}", ret, vec[0].iov_len + vec[1].iov_len, strerror(errno)); - return -1; - } + if (size_written != static_cast(vec[0].iov_len + vec[1].iov_len)) + throwFromErrno(ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR, "Fail to append log entry to {}", file_name); offset_term.push_back(std::make_pair(file_size.load(std::memory_order_relaxed), entry->get_term())); file_size.fetch_add(LogEntryHeader::HEADER_SIZE + header.data_length, std::memory_order_release); @@ -462,7 +392,7 @@ UInt64 NuRaftLogSegment::appendEntry(ptr entry, std::atomic & LOG_TRACE( log, - "Append term {}, index {}, length {}, crc {}, file {}, entry type {}.", + "Append log term {}, index {}, length {}, crc {}, file {}, entry type {}.", header.term, header.index, header.data_length, @@ -473,19 +403,11 @@ UInt64 NuRaftLogSegment::appendEntry(ptr entry, std::atomic & return header.index; } -[[maybe_unused]] int NuRaftLogSegment::writeAt(UInt64 index, const ptr entry) -{ - LOG_TRACE(log, "Write at term {}, index {}", entry->get_term(), index); - return 0; -} -int NuRaftLogSegment::getMeta(UInt64 index, LogMeta * meta) const +void NuRaftLogSegment::getMeta(UInt64 index, LogMeta & meta) const { if (last_index == first_index - 1 || index > last_index.load(std::memory_order_relaxed) || index < first_index) - { - LOG_WARNING(log, "current_index={}, last_index={}, first_index={}", index, last_index.load(std::memory_order_relaxed), first_index); - return -1; - } + throw Exception(ErrorCodes::LOGICAL_ERROR, "Get meta for log {} failed, index out of range [{},{}].", index, first_index, last_index.load(std::memory_order_relaxed)); UInt64 meta_index = index - first_index; UInt64 entry_offset = offset_term[meta_index].first; @@ -497,213 +419,150 @@ int NuRaftLogSegment::getMeta(UInt64 index, LogMeta * meta) const else next_offset = file_size; - meta->offset = entry_offset; - meta->term = offset_term[meta_index].second; - meta->length = next_offset - entry_offset; - - LOG_TRACE(log, "Get meta offset {}, term {}, length {}.", meta->offset, meta->term, meta->length); - return 0; + meta.offset = entry_offset; + meta.term = offset_term[meta_index].second; + meta.length = next_offset - entry_offset; } -int NuRaftLogSegment::loadLogEntryHeader(int fd, off_t offset, LogEntryHeader * header) const +void NuRaftLogSegment::loadLogEntryHeader(int fd, off_t offset, LogEntryHeader & header) const { - if (header == nullptr) - return -1; - ptr buf = buffer::alloc(LogEntryHeader::HEADER_SIZE); buf->pos(0); - errno = 0; - ssize_t ret = pread(fd, buf->data(), LogEntryHeader::HEADER_SIZE, offset); + ssize_t size = pread(fd, buf->data(), LogEntryHeader::HEADER_SIZE, offset); - if (ret != LogEntryHeader::HEADER_SIZE) - { - LOG_ERROR( - log, - "Read log entry header failed, offset {}, header size {}, ret:{}, error:{}.", - offset, - LogEntryHeader::HEADER_SIZE, - ret, - strerror(errno)); - return -1; - } + if (size != LogEntryHeader::HEADER_SIZE) + throwFromErrno(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, "Fail to read header of log segment {}", file_name); buffer_serializer bs(buf); bs.pos(0); - header->term = bs.get_u64(); - header->index = bs.get_u64(); + header.term = bs.get_u64(); + header.index = bs.get_u64(); - header->data_length = bs.get_u32(); - header->data_crc = bs.get_u32(); - - return 0; + header.data_length = bs.get_u32(); + header.data_crc = bs.get_u32(); } -int NuRaftLogSegment::loadLogEntry(int fd, off_t offset, LogEntryHeader * head, ptr & entry) const +void NuRaftLogSegment::loadLogEntry(int fd, off_t offset, LogEntryHeader & header, ptr & entry) const { - if (loadLogEntryHeader(fd, offset, head) != 0) - return -1; + loadLogEntryHeader(fd, offset, header); - char * entry_str = new char[head->data_length]; + char * entry_str = new char[header.data_length]; + ssize_t ret = pread(fd, entry_str, header.data_length, offset + LogEntryHeader::HEADER_SIZE); - errno = 0; - ssize_t ret = pread(fd, entry_str, head->data_length, offset + LogEntryHeader::HEADER_SIZE); + if (ret != header.data_length) + throwFromErrno(ErrorCodes::CORRUPTED_LOG, "Fail to read log entry with offset {} from log segment {}", offset, file_name); - if (ret < 0 || ret != head->data_length) - { - LOG_ERROR(log, "Can't read app data from log segment, ret:{}, error:{}.", ret, strerror(errno)); - delete[] entry_str; - return -1; - } - - if (!verifyCRC32(entry_str, head->data_length, head->data_crc)) - { - LOG_ERROR( - log, - "Found corrupted data at offset {}, term {}, index {}, length {}, crc {}, file {}", - offset, - head->term, - head->index, - head->data_length, - head->data_crc, - file_name); - delete[] entry_str; - return -1; - } + if (!verifyCRC32(entry_str, header.data_length, header.data_crc)) + throw Exception(ErrorCodes::CORRUPTED_LOG, "Checking CRC32 failed for log segment {}.", file_name); - entry = LogEntryBody::parse(entry_str, head->data_length); - entry->set_term(head->term); + entry = LogEntryBody::parse(entry_str, header.data_length); + entry->set_term(header.term); delete[] entry_str; - return 0; } ptr NuRaftLogSegment::getEntry(UInt64 index) { { std::lock_guard write_lock(log_mutex); - if (openFile() != 0) - return nullptr; + openFileIfNeeded(); } std::shared_lock read_lock(log_mutex); LogMeta meta; + getMeta(index, meta); - if (getMeta(index, &meta) != 0) - return nullptr; - - bool ok = true; ptr entry; - - do - { - LogEntryHeader header; - size_t offset = meta.offset; - - if (loadLogEntry(seg_fd, offset, &header, entry) != 0) - { - LOG_WARNING(log, "Get entry failed, path {}, index {}, offset {}.", getPath(), index, offset); - ok = false; - break; - } - } while (false); - - if (!ok && entry != nullptr) - entry = nullptr; + LogEntryHeader header; + loadLogEntry(seg_fd, meta.offset, header, entry); return entry; } - -UInt64 NuRaftLogSegment::getTerm(UInt64 index) const +[[maybe_unused]] UInt64 NuRaftLogSegment::getTerm(UInt64 index) const { LogMeta meta; - if (getMeta(index, &meta) != 0) - { - return 0; - } + getMeta(index, meta); return meta.term; } -int NuRaftLogSegment::truncate(const UInt64 last_index_kept) +bool NuRaftLogSegment::truncate(const UInt64 last_index_kept) { - UInt64 truncate_size = 0; - UInt64 first_truncate_in_offset = 0; + UInt64 file_size_to_keep = 0; + UInt64 first_log_offset_to_truncate = 0; + + /// Truncate on a full segment need to rename back to open segment again, + /// because the node may crash before truncate. + auto reopen_closed_segment = [this]() + { + if (!is_open) + { + LOG_INFO( + log, + "Truncate a closed segment, should re-open it. Current first index {}, last index {}, rename file from {} to {}.", + first_index, + last_index, + getClosedFileName(), + getOpenFileName()); + + closeFileIfNeeded(); + + String old_path = getClosedPath(); + String new_path = getOpenPath(); + + Poco::File(old_path).renameTo(new_path); + file_name = getOpenFileName(); + + openFileIfNeeded(); + + is_open = true; + } + }; { std::lock_guard write_lock(log_mutex); if (last_index <= last_index_kept) { - LOG_INFO(log, "truncate nothing, last_index {}, last_index_kept {}", last_index, last_index_kept); - return 0; + LOG_INFO(log, "Log segment {} truncates nothing, last_index {}, last_index_kept {}", file_name, last_index, last_index_kept); + reopen_closed_segment(); + return false; } - first_truncate_in_offset = last_index_kept + 1 - first_index; - truncate_size = offset_term[first_truncate_in_offset].first; + first_log_offset_to_truncate = last_index_kept + 1 - first_index; + file_size_to_keep = offset_term[first_log_offset_to_truncate].first; LOG_INFO( log, "Truncating {}, offset {}, first_index {}, last_index from {} to {}, truncate_size to {} ", - getFileName(), - first_truncate_in_offset, + file_name, + first_log_offset_to_truncate, first_index, last_index, last_index_kept, - truncate_size); - } - - /// Truncate on a full segment need to rename back to open segment again, - /// because the node may crash before truncate. - if (!is_open) - { - String old_path = getFinishPath(); - String new_path = getOpenPath(); - - LOG_INFO( - log, - "Truncate segment closed and reopen. Current first index {}, last index {}, renamed {} to {}.", - first_index, - last_index, - old_path, - new_path); - - Poco::File(old_path).renameTo(new_path); - file_name = getOpenFileName(); - - is_open = true; + file_size_to_keep); } - openFile(); + reopen_closed_segment(); - errno = 0; - int ret = ftruncate(seg_fd, truncate_size); + if (ftruncate(seg_fd, file_size_to_keep) != 0) + throwFromErrno(ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR, "Fail to truncate log segment {}", file_name); - if (ret != 0) - { - LOG_INFO(log, "Truncate failed errno {}, msg {}", errno, strerror(errno)); - return ret; - } - - LOG_INFO(log, "Truncate file {} descriptor {}, from {} to size {}", getOpenPath(), seg_fd, file_size, truncate_size); + LOG_INFO(log, "Truncate file {} descriptor {}, from {} to size {}", getOpenPath(), seg_fd, file_size, file_size_to_keep); /// seek fd - off_t ret_off = lseek(seg_fd, truncate_size, SEEK_SET); + off_t ret_off = lseek(seg_fd, file_size_to_keep, SEEK_SET); if (ret_off < 0) - { - LOG_ERROR(log, "Fail to lseek fd {} to size {}, path {}.", seg_fd, truncate_size, getOpenPath()); - ret = ret_off; - } - else - { - std::lock_guard write_lock(log_mutex); - offset_term.resize(first_truncate_in_offset); - last_index.store(last_index_kept, std::memory_order_release); - file_size = truncate_size; - } + throwFromErrno(ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR, "Fail to seek to {} for log segment {}", file_size_to_keep, file_name); - return ret; + std::lock_guard write_lock(log_mutex); + offset_term.resize(first_log_offset_to_truncate); + last_index.store(last_index_kept, std::memory_order_release); + file_size = file_size_to_keep; + + return true; } ptr LogSegmentStore::getInstance(const String & log_dir_, bool force_new) @@ -714,11 +573,11 @@ ptr LogSegmentStore::getInstance(const String & log_dir_, bool return segment_store; } -int LogSegmentStore::init(UInt32 max_segment_file_size_, UInt32 max_segment_count_) +void LogSegmentStore::init(UInt32 max_segment_file_size_, UInt32 max_segment_count_) { LOG_INFO( log, - "Begin init log segment store, max segment file size {} bytes, max segment count {}.", + "Initializing log segment store, max segment file size {} bytes, max segment count {}.", max_segment_file_size_, max_segment_count_); @@ -727,154 +586,103 @@ int LogSegmentStore::init(UInt32 max_segment_file_size_, UInt32 max_segment_coun Poco::File(log_dir).createDirectories(); - int ret = 0; - first_log_index.store(1); last_log_index.store(0); open_segment = nullptr; - do - { - ret = listSegments(); - if (ret != 0) - { - LOG_WARNING(log, "List segments failed, error code {}.", ret); - break; - } - ret = loadSegments(); - if (ret != 0) - { - LOG_WARNING(log, "Load segments failed, error code {}.", ret); - break; - } - ret = openSegment(); - if (ret != 0) - { - LOG_WARNING(log, "Open segment failed, error code {}", ret); - break; - } - } while (false); - - return ret; + loadSegmentMetaData(); + loadSegments(); + openNewSegmentIfNeeded(); } -int LogSegmentStore::close() +void LogSegmentStore::close() { + std::lock_guard write_lock(seg_mutex); + if (open_segment) { - std::lock_guard write_lock(seg_mutex); open_segment->close(false); open_segment = nullptr; } - return 0; + + /// When we getEntry from closed segments, we may open it. + for (auto & segment : closed_segments) + segment->close(false); } UInt64 LogSegmentStore::flush() { + std::lock_guard shared_lock(seg_mutex); if (open_segment) - { - std::lock_guard write_lock(seg_mutex); return open_segment->flush(); - } - return 0; + throw Exception(ErrorCodes::LOGICAL_ERROR, "Flush log segment store failed, open segment is nullptr."); } -int LogSegmentStore::openSegment() +void LogSegmentStore::openNewSegmentIfNeeded() { { std::shared_lock read_lock(seg_mutex); if (open_segment && open_segment->getFileSize() <= max_segment_file_size && open_segment->getVersion() >= CURRENT_LOG_VERSION) - return 0; + return; } std::lock_guard write_lock(seg_mutex); if (open_segment) { open_segment->close(true); - segments.push_back(open_segment); + closed_segments.push_back(open_segment); open_segment = nullptr; } UInt64 next_idx = last_log_index.load(std::memory_order_acquire) + 1; - ptr seg = cs_new(log_dir, next_idx); - - open_segment = seg; - if (open_segment->create() != 0) - { - LOG_ERROR(log, "Create open segment directory {} index {} failed.", log_dir, next_idx); - open_segment = nullptr; - return -1; - } + ptr new_seg = cs_new(log_dir, next_idx); - try - { - open_segment->writeFileHeader(); - } - catch (...) - { - open_segment = nullptr; - return -1; - } - - return 0; + open_segment = new_seg; + open_segment->writeHeader(); } -int LogSegmentStore::getSegment(UInt64 index, ptr & seg) +ptr LogSegmentStore::getSegment(UInt64 index) { - seg = nullptr; UInt64 first_index = first_log_index.load(std::memory_order_acquire); UInt64 last_index = last_log_index.load(std::memory_order_acquire); - if (first_index == last_index + 1) // TODO Right? - { - LOG_WARNING(log, "Log segment store no data, entry index {}.", index); - return -1; - } + /// No log + if (first_index < last_index) + return nullptr; if (index < first_index || index > last_index) { - LOG_WARNING(log, "Attempted to access entry {} outside of log, index range [{}, {}].", index, first_index, last_index); - return -1; + LOG_WARNING(log, "Attempted to access log {} who is outside of range [{}, {}].", index, first_index, last_index); + return nullptr; } + ptr seg; if (open_segment && index >= open_segment->firstIndex()) { seg = open_segment; } else { - for (auto & segment : segments) + for (auto & segment : closed_segments) { - ptr seg_it = segment; - if (index >= seg_it->firstIndex() && index <= seg_it->lastIndex()) - { - LOG_TRACE(log, "segment index range [{}, {}].", seg_it->firstIndex(), seg_it->lastIndex()); - seg = seg_it; - } + if (index >= segment->firstIndex() && index <= segment->lastIndex()) + seg = segment; } } - if (seg != nullptr) - return 0; - else - return -1; + return seg; } LogVersion LogSegmentStore::getVersion(UInt64 index) { - ptr seg; - getSegment(index, seg); + ptr seg = getSegment(index); return seg->getVersion(); } UInt64 LogSegmentStore::appendEntry(ptr entry) { - if (openSegment() != 0) - { - LOG_INFO(log, "Open segment failed."); - return -1; - } + openNewSegmentIfNeeded(); std::shared_lock read_lock(seg_mutex); return open_segment->appendEntry(entry, last_log_index); } @@ -891,13 +699,10 @@ UInt64 LogSegmentStore::writeAt(UInt64 index, ptr entry) ptr LogSegmentStore::getEntry(UInt64 index) { - ptr seg; std::shared_lock read_lock(seg_mutex); - if (getSegment(index, seg) != 0) - { - LOG_WARNING(log, "Can't find log segmtnt by index {}.", index); + ptr seg = getSegment(index); + if (!seg) return nullptr; - } return seg->getEntry(index); } @@ -915,78 +720,40 @@ void LogSegmentStore::getEntries(UInt64 start_index, UInt64 end_index, ptr>> & entries) -{ - if (entries == nullptr) - { - LOG_ERROR(log, "Entry vector is nullptr."); - return; - } - - int64 get_size = 0; - int64 entry_size = 0; - - for (UInt64 index = start_index; index <= end_index; index++) - { - auto entry_pt = getEntry(index); - entry_size = entry_pt->get_buf().size() + sizeof(ulong) + sizeof(char); - - if (get_size + entry_size > batch_size_hint_in_bytes) - break; - - entries->push_back(entry_pt); - get_size += entry_size; - } -} - -[[maybe_unused]] UInt64 LogSegmentStore::getTerm(UInt64 index) -{ - ptr seg; - if (getSegment(index, seg) != 0) - { - return 0; - } - return seg->getTerm(index); -} - int LogSegmentStore::removeSegment(UInt64 first_index_kept) { if (first_log_index.load(std::memory_order_acquire) >= first_index_kept) { LOG_INFO( log, - "Nothing is going to happen since first_log_index {} >= first_index_kept {}", + "Remove 0 log segments, since first_log_index {} >= first_index_kept {}", first_log_index.load(std::memory_order_relaxed), first_index_kept); return 0; } + std::vector> to_be_removed; { std::lock_guard write_lock(seg_mutex); - std::vector> to_be_removed; + first_log_index.store(first_index_kept, std::memory_order_release); + for (auto it = closed_segments.begin(); it != closed_segments.end();) { - first_log_index.store(first_index_kept, std::memory_order_release); - for (auto it = segments.begin(); it != segments.end();) + ptr & segment = *it; + if (segment->lastIndex() < first_index_kept) { - ptr & segment = *it; - if (segment->lastIndex() < first_index_kept) - { - to_be_removed.push_back(segment); - it = segments.erase(it); - } - else + to_be_removed.push_back(segment); + it = closed_segments.erase(it); + } + else + { + if (segment->firstIndex() < first_log_index) { - if (segment->firstIndex() < first_log_index) - { - first_log_index.store(segment->firstIndex(), std::memory_order_release); - if (last_log_index == 0 || (last_log_index - 1) < first_log_index) - last_log_index.store(segment->lastIndex(), std::memory_order_release); - } - it++; + first_log_index.store(segment->firstIndex(), std::memory_order_release); + if (last_log_index == 0 || (last_log_index - 1) < first_log_index) + last_log_index.store(segment->lastIndex(), std::memory_order_release); } + it++; } } @@ -1008,54 +775,50 @@ int LogSegmentStore::removeSegment(UInt64 first_index_kept) last_log_index.store(open_segment->lastIndex(), std::memory_order_release); } } + } - for (auto & seg : to_be_removed) - { - seg->remove(); - LOG_INFO(log, "Remove segment, directory {}, file {}", log_dir, seg->getFileName()); - } - - /// reset last_log_index - if (last_log_index == 0 || (last_log_index - 1) < first_log_index) - last_log_index.store(first_log_index - 1, std::memory_order_release); + for (auto & seg : to_be_removed) + { + LOG_INFO(log, "Remove log segment, file {}", seg->getFileName()); + seg->remove(); } - return 0; + /// reset last_log_index + if (last_log_index == 0 || (last_log_index - 1) < first_log_index) + last_log_index.store(first_log_index - 1, std::memory_order_release); + + return to_be_removed.size(); } int LogSegmentStore::removeSegment() { - UInt32 remove_count = segments.size() + 1 - max_segment_count; - if (remove_count <= 0) - { + std::lock_guard write_lock(seg_mutex); + size_t count = closed_segments.size() + 1 - max_segment_count; + + if (count <= 0) return 0; - } - std::lock_guard write_lock(seg_mutex); - std::vector> remove_vec; + std::vector> to_removed_segments; + std::sort(closed_segments.begin(), closed_segments.end(), compareSegment); + for (size_t i = 0; i < count; i++) { - std::sort(segments.begin(), segments.end(), compareSegment); - for (UInt32 i = 0; i < remove_count; i++) - { - ptr & segment = *(segments.begin()); - remove_vec.push_back(segment); - first_log_index.store(segment->lastIndex() + 1, std::memory_order_release); - segments.erase(segments.begin()); - } + ptr & segment = *(closed_segments.begin()); + to_removed_segments.push_back(segment); + first_log_index.store(segment->lastIndex() + 1, std::memory_order_release); + closed_segments.erase(closed_segments.begin()); } - for (auto & i : remove_vec) + for (auto & to_removed : to_removed_segments) { - i->remove(); - LOG_INFO(log, "Remove segment, directory {}, file {}", log_dir, i->getFileName()); - i = nullptr; + LOG_INFO(log, "Removing file for log segment {}", to_removed->getFileName()); + to_removed->remove(); } - return 0; + return count; } -int LogSegmentStore::truncateLog(UInt64 last_index_kept) +bool LogSegmentStore::truncateLog(UInt64 last_index_kept) { if (last_log_index.load(std::memory_order_acquire) <= last_index_kept) { @@ -1064,92 +827,75 @@ int LogSegmentStore::truncateLog(UInt64 last_index_kept) "Nothing is going to happen since last_log_index {} <= last_index_kept {}", last_log_index.load(std::memory_order_relaxed), last_index_kept); - return 0; + return false; } - std::vector> remove_vec; - ptr last_segment = nullptr; + std::vector> to_removed_segments; + ptr last_segment; + std::lock_guard write_lock(seg_mutex); + /// remove finished segment + for (auto it = closed_segments.begin(); it != closed_segments.end();) { - std::lock_guard write_lock(seg_mutex); - /// remove finished segment - for (auto it = segments.begin(); it != segments.end();) + ptr & segment = *it; + if (segment->firstIndex() > last_index_kept) { - ptr & segment = *it; - if (segment->firstIndex() > last_index_kept) - { - remove_vec.push_back(segment); - it = segments.erase(it); - } - - /// Get the segment to last_index_kept belongs - else if (last_index_kept >= segment->firstIndex() && last_index_kept <= segment->lastIndex()) - { - last_segment = segment; - it++; - } - else - it++; + to_removed_segments.push_back(segment); + it = closed_segments.erase(it); } - - /// remove open segment - if (open_segment) + /// Get the segment to last_index_kept belongs + else if (last_index_kept >= segment->firstIndex() && last_index_kept <= segment->lastIndex()) { - if (open_segment->firstIndex() > last_index_kept) - { - remove_vec.push_back(open_segment); - open_segment = nullptr; - } - else if (last_index_kept >= open_segment->firstIndex() && last_index_kept <= open_segment->lastIndex()) - { - last_segment = open_segment; - } + last_segment = segment; + it++; } + else + it++; } - ///remove files - for (auto & i : remove_vec) + /// remove open segment if needed + if (open_segment) { - i->remove(); - LOG_INFO(log, "Remove segment, directory {}, file {}", log_dir, i->getFileName()); - i = nullptr; + if (open_segment->firstIndex() > last_index_kept) + { + to_removed_segments.push_back(open_segment); + open_segment = nullptr; + } + else if (last_index_kept >= open_segment->firstIndex() && last_index_kept <= open_segment->lastIndex()) + { + last_segment = open_segment; + } } - if (last_segment) + if (!last_segment) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found a segment to truncate, last_index_kept {}.", last_index_kept); + + /// remove files + for (auto & to_removed : to_removed_segments) { - bool closed = !last_segment->isOpen(); - const int ret = last_segment->truncate(last_index_kept); + LOG_INFO(log, "Removing file for segment {}", to_removed->getFileName()); + to_removed->remove(); + to_removed = nullptr; + } - if (ret != 0) - { - LOG_ERROR(log, "Truncate error {}, last_index_kept {}", last_segment->getFileName(), last_index_kept); - return ret; - } + bool is_open_before_truncate = last_segment->isOpen(); + bool removed_something = last_segment->truncate(last_index_kept); - if (closed && last_segment->isOpen()) - { - std::lock_guard write_lock(seg_mutex); - if (open_segment) - { - LOG_WARNING(log, "Open segment is not nullptr."); - } - open_segment.swap(last_segment); + if (!removed_something && last_segment->lastIndex() != last_index_kept) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Truncate log to last_index_kept {}, but nothing removed from log segment {}.", last_index_kept, last_segment->getFileName()); - if (!segments.empty()) - segments.erase(segments.end() - 1); - } - if (ret == 0) - last_log_index.store(last_index_kept, std::memory_order_release); + if (!is_open_before_truncate && !last_segment->isOpen()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Truncate a closed log segment {}, but the truncated log segment is not open.", last_segment->getFileName()); - return ret; - } - else + if (!is_open_before_truncate) { - LOG_WARNING(log, "Truncate log not found last segment, last_index_kept {}.", last_index_kept); + open_segment = last_segment; + if (!closed_segments.empty()) + closed_segments.erase(closed_segments.end() - 1); } last_log_index.store(last_index_kept, std::memory_order_release); - return 0; + return true; } int LogSegmentStore::reset(UInt64 next_log_index) @@ -1162,14 +908,15 @@ int LogSegmentStore::reset(UInt64 next_log_index) std::vector> popped; std::unique_lock write_lock(seg_mutex); - popped.reserve(segments.size()); - for (auto & segment : segments) + popped.reserve(closed_segments.size()); + + for (auto & segment : closed_segments) { popped.push_back(segment); } - segments.clear(); + closed_segments.clear(); if (open_segment) { @@ -1188,68 +935,56 @@ int LogSegmentStore::reset(UInt64 next_log_index) return 0; } -int LogSegmentStore::listSegments() +void LogSegmentStore::loadSegmentMetaData() { Poco::File file_dir(log_dir); if (!file_dir.exists()) - { - LOG_WARNING(log, "Log directory {} is not exists.", log_dir); - return 0; - } + throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Log directory {} does not exist.", log_dir); std::vector files; file_dir.list(files); - for (const auto& file_name : files) + for (const auto & file_name : files) { if (file_name.find("log_") == String::npos) continue; - LOG_INFO(log, "List log dir {}, file name {}", log_dir, file_name); + LOG_INFO(log, "Find log segment file {}", file_name); - int match = 0; + int match; UInt64 first_index = 0; UInt64 last_index = 0; - char create_time[128]; - match = sscanf(file_name.c_str(), NuRaftLogSegment::LOG_FINISH_FILE_NAME, &first_index, &last_index, create_time); + /// Closed log segment + match = sscanf(file_name.c_str(), NuRaftLogSegment::LOG_FINISH_FILE_NAME, &first_index, &last_index, create_time); if (match == 3) { - LOG_INFO(log, "Restore closed segment, directory {}, first index {}, last index {}", log_dir, first_index, last_index); - ptr segment = cs_new(log_dir, first_index, last_index, file_name); - segments.push_back(segment); + ptr segment = cs_new(log_dir, first_index, last_index, file_name, String(create_time)); + closed_segments.push_back(segment); continue; } + /// Open log segment match = sscanf(file_name.c_str(), NuRaftLogSegment::LOG_OPEN_FILE_NAME, &first_index, create_time); - if (match == 2) { - LOG_INFO(log, "Restore open segment, directory {}, first index {}, file name {}", log_dir, first_index, file_name); - if (!open_segment) - { - open_segment = cs_new(log_dir, first_index, file_name, String(create_time)); - LOG_INFO(log, "Create open segment, directory {}, first index {}, file name {}", log_dir, first_index, file_name); - continue; - } - else - { - LOG_WARNING(log, "Open segment conflict, directory {}, first index {}, file name {}", log_dir, first_index, file_name); - return -1; - } + if (open_segment) + throwFromErrno(ErrorCodes::CORRUPTED_LOG, "Find more than one open segment in {}", log_dir); + open_segment = cs_new(log_dir, first_index, file_name, String(create_time)); + continue; } } - std::sort(segments.begin(), segments.end(), compareSegment); + std::sort(closed_segments.begin(), closed_segments.end(), compareSegment); /// 0 close/open segment /// 1 open segment /// N close segment + 1 open segment if (open_segment) { - if (!segments.empty()) - first_log_index.store((*segments.begin())->firstIndex(), std::memory_order_release); + if (!closed_segments.empty()) + first_log_index.store((*closed_segments.begin())->firstIndex(), std::memory_order_release); else first_log_index.store(open_segment->firstIndex(), std::memory_order_release); @@ -1262,7 +997,7 @@ int LogSegmentStore::listSegments() ptr prev_seg = nullptr; ptr segment; - for (auto it = segments.begin(); it != segments.end();) + for (auto it = closed_segments.begin(); it != closed_segments.end();) { segment = *it; LOG_INFO( @@ -1274,110 +1009,90 @@ int LogSegmentStore::listSegments() segment->lastIndex()); if (segment->firstIndex() > segment->lastIndex()) - { - LOG_WARNING( - log, - "Closed segment is bad, directory {}, current segment first index {}, last index {}", - log_dir, + throw Exception( + ErrorCodes::CORRUPTED_LOG, + "Invalid segment {}, first index {} > last index {}", + segment->getFileName(), segment->firstIndex(), segment->lastIndex()); - return -1; - } if (prev_seg && segment->firstIndex() != prev_seg->lastIndex() + 1) - { - LOG_WARNING( - log, - "Closed segment not in order, directory {}, prev segment last index {}, current segment first index {}", + throw Exception( + ErrorCodes::CORRUPTED_LOG, + "Segment {} does not connect correctly, prev segment last index {}, current segment first index {}", log_dir, prev_seg->lastIndex(), segment->firstIndex()); - return -1; - } + ++it; } if (open_segment) { if (prev_seg && open_segment->firstIndex() != prev_seg->lastIndex() + 1) - { - LOG_WARNING( - log, - "Open segment has hole, directory {}, prev segment last index {}, open segment first index {}", - log_dir, + throw Exception( + ErrorCodes::CORRUPTED_LOG, + "Open segment does not connect correctly, prev segment last index {}, open segment first index {}", prev_seg->lastIndex(), open_segment->firstIndex()); - } } - - return 0; } -int LogSegmentStore::loadSegments() +void LogSegmentStore::loadSegments() { - /// closed segments - ThreadPool load_thread_pool(LOAD_THREAD_NUM); + /// 1. Load closed segments in parallel + + size_t thread_num = std::min(closed_segments.size(), LOAD_THREAD_NUM); + ThreadPool load_thread_pool(thread_num); - for (UInt32 thread_idx = 0; thread_idx < LOAD_THREAD_NUM; thread_idx++) + for (size_t thread_id = 0; thread_id < LOAD_THREAD_NUM; thread_id++) { - load_thread_pool.trySchedule([this, thread_idx] { - Poco::Logger * thread_log = &(Poco::Logger::get("LoadLogThread")); - int ret = 0; - for (size_t seg_idx = 0; seg_idx < this->getClosedSegments().size(); seg_idx++) + load_thread_pool.trySchedule([this, thread_id, thread_num] + { + Poco::Logger * thread_log = &(Poco::Logger::get("LoadClosedLogSegmentThread#" + std::to_string(thread_id))); + for (size_t seg_id = 0; seg_id < closed_segments.size(); seg_id++) { - if (seg_idx % LOAD_THREAD_NUM == thread_idx) + if (seg_id % thread_num == thread_id) { - ptr segment = this->getClosedSegments()[seg_idx]; - LOG_INFO(thread_log, "Load closed segment, first_index {}, last_index {}", segment->firstIndex(), segment->lastIndex()); - ret = segment->load(); - if (ret != 0) - { - LOG_WARNING(log, "Load closed segment {} failed {}", segment->firstIndex(), ret); - continue; - } - if (segment->lastIndex() > this->lastLogIndex()) - { - LOG_INFO(log, "Close segment last index {}", segment->lastIndex()); - this->setLastLogIndex(segment->lastIndex()); - } + ptr segment = closed_segments[seg_id]; + LOG_INFO(thread_log, "Loading closed segment, first_index {}, last_index {}", segment->firstIndex(), segment->lastIndex()); + segment->load(); } } }); } + /// Update last_log_index + if (!closed_segments.empty()) + last_log_index = closed_segments.back()->lastIndex(); + load_thread_pool.wait(); - /// open segment + /// 2. Load open segment + if (open_segment) { - LOG_INFO(log, "Load open segment, directory {}, file name {} ", log_dir, open_segment->getFileName()); - int ret = open_segment->load(); - - if (ret != 0) - return ret; + LOG_INFO(log, "Loading open segment {} ", log_dir, open_segment->getFileName()); + open_segment->load(); if (first_log_index.load() > open_segment->lastIndex()) { - LOG_WARNING( - log, - "open segment need discard, file {}, first_log_index {}, first_index {}, last_index {} ", - open_segment->getFileName(), + throw Exception( + ErrorCodes::CORRUPTED_LOG, + "First log index {} > last index {} of open segment {}", first_log_index.load(), - open_segment->firstIndex(), - open_segment->lastIndex()); - open_segment = nullptr; + open_segment->lastIndex(), + open_segment->getFileName()); } else { + LOG_INFO(log, "The last index of open segment {} is {}", open_segment->lastIndex(), open_segment->getFileName()); last_log_index.store(open_segment->lastIndex(), std::memory_order_release); - LOG_INFO(log, "Open segment last index {} {}", open_segment->lastIndex(), last_log_index); } } if (last_log_index == 0) last_log_index = first_log_index - 1; - - return 0; } } diff --git a/src/Service/NuRaftLogSegment.h b/src/Service/NuRaftLogSegment.h index 36ed7d3b540..8afa10abc82 100644 --- a/src/Service/NuRaftLogSegment.h +++ b/src/Service/NuRaftLogSegment.h @@ -24,11 +24,11 @@ using nuraft::int64; enum class LogVersion : uint8_t { V0 = 0, - V1 = 1, /// with ctime mtime + V1 = 1, /// with ctime, mtime, magic and version }; /// Attach version to log entry -struct VersionLogEntry +struct LogEntryWithVersion { LogVersion version; ptr entry; @@ -39,90 +39,46 @@ static constexpr auto CURRENT_LOG_VERSION = LogVersion::V1; class NuRaftLogSegment { public: - NuRaftLogSegment( - const String & log_dir_, UInt64 first_index_, const String & file_name_ = "", const String & create_time_ = "") - : log_dir(log_dir_) - , first_index(first_index_) - , last_index(first_index_ - 1) - , seg_fd(-1) - , file_name(file_name_) - , file_size(0) - , is_open(true) - , log(&(Poco::Logger::get("LogSegment"))) - , version(CURRENT_LOG_VERSION) - { - if (create_time_.empty()) - { - Poco::DateTime now; - create_time = Poco::DateTimeFormatter::format(now, "%Y%m%d%H%M%S"); - } - else - { - create_time = create_time_; - } - } - - NuRaftLogSegment(const String & log_dir_, UInt64 first_index_, UInt64 last_index_, const String file_name_ = "") - : log_dir(log_dir_) - , first_index(first_index_) - , last_index(last_index_) - , seg_fd(-1) - , file_name(file_name_) - , file_size(0) - , is_open(false) - , log(&(Poco::Logger::get("LogSegment"))) - { - } + /// For new open segment + NuRaftLogSegment(const String & log_dir_, UInt64 first_index_); - ~NuRaftLogSegment() = default; + /// For existing closed segment + NuRaftLogSegment(const String & log_dir_, UInt64 first_index_, UInt64 last_index_, const String & file_name_, const String & create_time_); + /// For existing open segment + NuRaftLogSegment(const String & log_dir_, UInt64 first_index_, const String & file_name_, const String & create_time_); - /// create open segment - /// return 0 if success - int create(); - - /// load an segment - /// return 0 if success - int load(); - - /// Close open segment, return 0 if success. - /// is_full: whether we segment is full, if true, - /// close full open log segment and rename to - /// finish file name, or else close ofstream - int close(bool is_full); + void load(); + inline UInt64 flush() const; - /// remove the segment - /// return 0 if success - int remove(); + /// Close an open segment + /// is_full: whether the segment is full, if true, close full open log segment and rename to finish file name + void close(bool is_full); + void remove(); /** - * write segment file header + * log segment file header * magic : \0RaftLog 8 bytes * version: version 1 bytes */ - void writeFileHeader(); - - /// load data format version, return 0 if success. - size_t loadVersion(); + void writeHeader(); + void readHeader(); /// get data format version LogVersion getVersion() const { return version; } - /// flush log, return last flushed log index if success or 0 if failed - inline UInt64 flush() const; - - /// serialize entry, and append to open segment, return new start index + /// serialize entry, and append to open segment, return appended log index UInt64 appendEntry(ptr entry, std::atomic & last_log_index); - [[maybe_unused]] int writeAt(UInt64 index, const ptr entry); - /// get entry by index, return null if not exist. ptr getEntry(UInt64 index); /// get entry's term by index - UInt64 getTerm(UInt64 index) const; + [[maybe_unused]] UInt64 getTerm(UInt64 index) const; - /// Truncate segment from tail to last_index_kept - int truncate(UInt64 last_index_kept); + /// Truncate segment from tail to last_index_kept. + /// Return true if some logs are removed. + /// This method will re-open the segment file if it is a closed one. + bool truncate(UInt64 last_index_kept); bool isOpen() const { return is_open; } @@ -165,30 +121,28 @@ class NuRaftLogSegment String getOpenFileName(); String getOpenPath(); - /// when open segment reach log limit, - /// we should open a new open segment - /// and move current open segment as - /// close segment. - String getFinishFileName(); - String getFinishPath(); + /// when open segment reach log limit, we should open a new open segment + /// and move current open segment as close segment. + String getClosedFileName(); + String getClosedPath(); /// current segment file path String getPath(); - /// open file by fd, return 0 if success. - int openFile(); - - /// close file, return 0 if success. - int closeFile(); + /// open file by fd + void openFileIfNeeded(); - /// get log entry meta, return 0 if success. - int getMeta(UInt64 index, LogMeta * meta) const; + /// close file, throw exception if failed + void closeFileIfNeeded(); - /// load log entry header - int loadLogEntryHeader(int fd, off_t offset, LogEntryHeader * header) const; + /// get log entry meta + void getMeta(UInt64 index, LogMeta & meta) const; /// load log entry - int loadLogEntry(int fd, off_t offset, LogEntryHeader * head, ptr & entry) const; + void loadLogEntry(int fd, off_t offset, LogEntryHeader & header, ptr & entry) const; + void loadLogEntryHeader(int fd, off_t offset, LogEntryHeader & header) const; + + static constexpr size_t MAGIC_AND_VERSION_SIZE = 9; /// segment file directory String log_dir; @@ -199,21 +153,22 @@ class NuRaftLogSegment /// last log index in the segment std::atomic last_index; - /// segment file fd - int seg_fd; + /// Segment is open or closed, if and only if the segment is open, it can be written. + /// There is no more than one open segment in the log store. + std::atomic_bool is_open = false; + + /// Segment file fd, -1 means the file is not open. + /// All segments files in log store should be open. + int seg_fd = -1; /// segment file name String file_name; /// segment file create time - /// TODO use String create_time; /// segment file size - std::atomic file_size; - - /// open or close - bool is_open; + std::atomic file_size = 0; Poco::Logger * log; @@ -243,7 +198,7 @@ class LogSegmentStore static constexpr UInt32 MAX_SEGMENT_FILE_SIZE = 1000 * 1024 * 1024; //1G, 0.3K/Log, 3M logs static constexpr UInt32 MAX_SEGMENT_COUNT = 50; //50G - static constexpr int LOAD_THREAD_NUM = 8; + static constexpr size_t LOAD_THREAD_NUM = 8; explicit LogSegmentStore(const String & log_dir_) : log_dir(log_dir_), first_log_index(1), last_log_index(0), log(&(Poco::Logger::get("LogSegmentStore"))) @@ -254,12 +209,11 @@ class LogSegmentStore virtual ~LogSegmentStore() = default; static ptr getInstance(const String & log_dir, bool force_new = false); - /// Init log store, will create dir if not exist, return 0 if success - int init(UInt32 max_segment_file_size_ = MAX_SEGMENT_FILE_SIZE, UInt32 max_segment_count_ = MAX_SEGMENT_COUNT); - - int close(); + /// Init log store, will create dir if not exist + void init(UInt32 max_segment_file_size_ = MAX_SEGMENT_FILE_SIZE, UInt32 max_segment_count_ = MAX_SEGMENT_COUNT); - /// flush log, return last flushed log index if success + void close(); + /// Return last flushed log index UInt64 flush(); /// first log index in whole log store @@ -268,49 +222,43 @@ class LogSegmentStore /// last log index in whole log store UInt64 lastLogIndex() { return last_log_index.load(std::memory_order_acquire); } - void setLastLogIndex(UInt64 index) { last_log_index.store(index, std::memory_order_release); } - - /// append entry to log store + /// Append entry to log store UInt64 appendEntry(ptr entry); - /// First truncate log whose index large or equal entry.index, - /// then append it. + /// First truncate log whose index is large than or equals with index of entry, then append it. UInt64 writeAt(UInt64 index, ptr entry); ptr getEntry(UInt64 index); - /// collection entries in [start_index, end_index] + /// Just for test, collection entries in [start_index, end_index] void getEntries(UInt64 start_index, UInt64 end_index, ptr>> & entries); - [[maybe_unused]] void - getEntriesExt(UInt64 start_idx, UInt64 end_idx, int64 batch_size_hint_in_bytes, ptr>> & entries); - [[maybe_unused]] UInt64 getTerm(UInt64 index); - - /// Remove segments from storage's head, logs in [1, first_index_kept) will be discarded, - /// usually invoked when compaction. + /// Remove segments from storage's head, logs in [1, first_index_kept) will be discarded, usually invoked when compaction. + /// return number of segments removed int removeSegment(); int removeSegment(UInt64 first_index_kept); /// Delete uncommitted logs from storage's tail, (last_index_kept, infinity) will be discarded - int truncateLog(UInt64 last_index_kept); + /// Return true if some logs are removed + bool truncateLog(UInt64 last_index_kept); int reset(UInt64 next_log_index); /// get closed segments - Segments & getClosedSegments() { return segments; } + Segments & getClosedSegments() { return closed_segments; } /// get file format version LogVersion getVersion(UInt64 index); private: /// open a new segment, invoked when init - int openSegment(); + void openNewSegmentIfNeeded(); /// list segments, invoked when init - int listSegments(); + void loadSegmentMetaData(); /// load listed segments, invoked when init - int loadSegments(); + void loadSegments(); - /// find segment by log index - int getSegment(UInt64 log_index, ptr & ptr); + /// find segment by log index, return null if not found + ptr getSegment(UInt64 log_index); /// file log store directory String log_dir; @@ -328,7 +276,7 @@ class LogSegmentStore Poco::Logger * log; /// closed segments - Segments segments; + Segments closed_segments; /// open segments ptr open_segment; diff --git a/src/Service/NuRaftStateMachine.cpp b/src/Service/NuRaftStateMachine.cpp index 2a5be89f41c..b3e6d205e59 100644 --- a/src/Service/NuRaftStateMachine.cpp +++ b/src/Service/NuRaftStateMachine.cpp @@ -38,7 +38,7 @@ struct ReplayLogBatch { ulong batch_start_index = 0; ulong batch_end_index = 0; - ptr> log_vec; + ptr> log_vec; ptr>> request_vec; }; diff --git a/src/Service/tests/gtest_raft_log.cpp b/src/Service/tests/gtest_raft_log.cpp index 273aca06a16..ceb3f6fa89d 100644 --- a/src/Service/tests/gtest_raft_log.cpp +++ b/src/Service/tests/gtest_raft_log.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include @@ -100,7 +99,7 @@ TEST(RaftLog, appendEntry) String log_dir(LOG_DIR + "/1"); cleanDirectory(log_dir); auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_EQ(log_store->init(), 0); + ASSERT_NO_THROW(log_store->init()); UInt64 term = 1; String key("/ck/table/table1"); @@ -115,7 +114,7 @@ TEST(RaftLog, appendSomeEntry) String log_dir(LOG_DIR + "/2"); cleanDirectory(log_dir); auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_EQ(log_store->init(), 0); + ASSERT_NO_THROW(log_store->init()); for (int i = 0; i < 3; i++) { @@ -133,7 +132,7 @@ TEST(RaftLog, loadLog) String log_dir(LOG_DIR + "/3"); cleanDirectory(log_dir); auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_EQ(log_store->init(), 0); + ASSERT_NO_THROW(log_store->init()); for (int i = 0; i < 3; i++) { UInt64 term = 1; @@ -141,9 +140,9 @@ TEST(RaftLog, loadLog) String data("CREATE TABLE table1;"); ASSERT_EQ(appendEntry(log_store, term, key, data), i + 1); } - ASSERT_EQ(log_store->close(), 0); + ASSERT_NO_THROW(log_store->close()); //Load prev log segment from disk when log_store init. - ASSERT_EQ(log_store->init(), 0); + ASSERT_NO_THROW(log_store->init()); for (int i = 0; i < 3; i++) { UInt64 term = 1; @@ -160,7 +159,7 @@ TEST(RaftLog, splitSegment) String log_dir(LOG_DIR + "/4"); cleanDirectory(log_dir); auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_EQ(log_store->init(200, 10), 0); //81 byte / log + ASSERT_NO_THROW(log_store->init(200, 10)); //81 byte / log for (int i = 0; i < 12; i++) { UInt64 term = 1; @@ -169,7 +168,7 @@ TEST(RaftLog, splitSegment) ASSERT_EQ(appendEntry(log_store, term, key, data), i + 1); } ASSERT_EQ(log_store->getClosedSegments().size(), 5); - ASSERT_EQ(log_store->close(), 0); + ASSERT_NO_THROW(log_store->close()); cleanDirectory(log_dir); } @@ -178,7 +177,7 @@ TEST(RaftLog, removeSegment) String log_dir(LOG_DIR + "/5"); cleanDirectory(log_dir); auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_EQ(log_store->init(200, 3), 0); + ASSERT_NO_THROW(log_store->init(200, 3)); //5 segment for (int i = 0; i < 10; i++) { @@ -198,7 +197,7 @@ TEST(RaftLog, removeSegment) ASSERT_EQ(log_store->getClosedSegments().size(), 2); //2 finish_segment + 1 open_segment = 3 ASSERT_EQ(log_store->firstLogIndex(), 5); ASSERT_EQ(log_store->lastLogIndex(), 10); - ASSERT_EQ(log_store->close(), 0); + ASSERT_NO_THROW(log_store->close()); //cleanDirectory(log_dir); } @@ -207,7 +206,7 @@ TEST(RaftLog, truncateLog) String log_dir(LOG_DIR + "/6"); cleanDirectory(log_dir); auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_EQ(log_store->init(200, 3), 0); + ASSERT_NO_THROW(log_store->init(200, 3)); //8 segment, index 1-16 for (int i = 0; i < 16; i++) { @@ -252,7 +251,7 @@ TEST(RaftLog, truncateLog) ASSERT_EQ("/ck/table/table1", zk_create_request3->path); ASSERT_EQ("CREATE TABLE table1;", zk_create_request3->data); - ASSERT_EQ(log_store->close(), 0); + ASSERT_NO_THROW(log_store->close()); cleanDirectory(log_dir); } @@ -376,7 +375,7 @@ TEST(RaftLog, getEntry) String log_dir(LOG_DIR + "/7"); cleanDirectory(log_dir); auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_EQ(log_store->init(100, 3), 0); + ASSERT_NO_THROW(log_store->init(100, 3)); UInt64 term = 1; String key("/ck/table/table1"); String data("CREATE TABLE table1;"); @@ -406,7 +405,7 @@ TEST(RaftLog, getEntries) String log_dir(LOG_DIR + "/8"); cleanDirectory(log_dir); auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_EQ(log_store->init(250, 3), 0); //69 * 4 = 276 + ASSERT_NO_THROW(log_store->init(250, 3)); //69 * 4 = 276 for (int i = 0; i < 8; i++) { UInt64 term = 1; diff --git a/src/Service/tests/raft_test_common.cpp b/src/Service/tests/raft_test_common.cpp index 255cfad19a4..35a90e48459 100644 --- a/src/Service/tests/raft_test_common.cpp +++ b/src/Service/tests/raft_test_common.cpp @@ -5,7 +5,6 @@ #include #include -#include #include From 7ad30d4d5d7de5c47ca145f9f13b19567e06a643 Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Fri, 9 Aug 2024 09:53:29 +0800 Subject: [PATCH 2/4] Remove useless clang tidy ignore and fix uint tests --- src/Service/KeeperUtils.h | 1 + src/Service/LogEntry.cpp | 6 +- src/Service/NuRaftFileLogStore.cpp | 5 +- src/Service/NuRaftFileLogStore.h | 3 +- src/Service/NuRaftLogSegment.cpp | 158 +++++++-------------------- src/Service/NuRaftLogSegment.h | 26 ++--- src/Service/NuRaftLogSnapshot.cpp | 9 -- src/Service/NuRaftStateMachine.cpp | 10 -- src/Service/SnapshotCommon.cpp | 9 -- src/Service/SnapshotCommon.h | 12 +- src/Service/tests/gtest_raft_log.cpp | 37 +++---- 11 files changed, 75 insertions(+), 201 deletions(-) diff --git a/src/Service/KeeperUtils.h b/src/Service/KeeperUtils.h index f34fc218d24..5940bdea874 100644 --- a/src/Service/KeeperUtils.h +++ b/src/Service/KeeperUtils.h @@ -13,6 +13,7 @@ namespace RK { +/// Serialize ZooKeeper request to log nuraft::ptr getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request); nuraft::ptr makeClone(const nuraft::ptr & entry); diff --git a/src/Service/LogEntry.cpp b/src/Service/LogEntry.cpp index 0f90a194250..d3d16a3c53d 100644 --- a/src/Service/LogEntry.cpp +++ b/src/Service/LogEntry.cpp @@ -6,7 +6,7 @@ namespace RK using nuraft::byte; using nuraft::cs_new; - +/// Add entry type to the log entry ptr LogEntryBody::serialize(ptr & entry) { ptr entry_buf; @@ -24,11 +24,11 @@ ptr LogEntryBody::serialize(ptr & entry) ptr LogEntryBody::parse(const char * entry_str, size_t buf_size) { - nuraft::log_val_type tp = static_cast(entry_str[0]); + nuraft::log_val_type type = static_cast(entry_str[0]); auto data = buffer::alloc(buf_size - 1); data->put_raw(reinterpret_cast(entry_str + 1), buf_size - 1); data->pos(0); - return cs_new(0, data, tp); /// term is set latter + return cs_new(0, data, type); /// term is set latter } } diff --git a/src/Service/NuRaftFileLogStore.cpp b/src/Service/NuRaftFileLogStore.cpp index 6e78b231348..e1e947e6702 100644 --- a/src/Service/NuRaftFileLogStore.cpp +++ b/src/Service/NuRaftFileLogStore.cpp @@ -60,8 +60,7 @@ NuRaftFileLogStore::NuRaftFileLogStore( bool force_new, FsyncMode log_fsync_mode_, UInt64 log_fsync_interval_, - UInt32 max_log_size_, - UInt32 max_segment_count_) + UInt32 max_log_size_) : log_fsync_mode(log_fsync_mode_), log_fsync_interval(log_fsync_interval_) { log = &(Poco::Logger::get("FileLogStore")); @@ -74,7 +73,7 @@ NuRaftFileLogStore::NuRaftFileLogStore( } segment_store = LogSegmentStore::getInstance(log_dir, force_new); - segment_store->init(max_log_size_, max_segment_count_); + segment_store->init(max_log_size_); if (segment_store->lastLogIndex() < 1) /// no log entry exists, return a dummy constant entry with value set to null and term set to zero diff --git a/src/Service/NuRaftFileLogStore.h b/src/Service/NuRaftFileLogStore.h index 79a19d3acf7..668452ba8cd 100644 --- a/src/Service/NuRaftFileLogStore.h +++ b/src/Service/NuRaftFileLogStore.h @@ -57,8 +57,7 @@ class NuRaftFileLogStore : public nuraft::log_store bool force_new = false, FsyncMode log_fsync_mode_ = FsyncMode::FSYNC_PARALLEL, UInt64 log_fsync_interval_ = 1000, - UInt32 max_log_size_ = LogSegmentStore::MAX_SEGMENT_FILE_SIZE, - UInt32 max_segment_count_ = LogSegmentStore::MAX_SEGMENT_COUNT); + UInt32 max_log_size_ = LogSegmentStore::MAX_SEGMENT_FILE_SIZE); ~NuRaftFileLogStore() override; diff --git a/src/Service/NuRaftLogSegment.cpp b/src/Service/NuRaftLogSegment.cpp index f9554eb456c..2877bc660a9 100644 --- a/src/Service/NuRaftLogSegment.cpp +++ b/src/Service/NuRaftLogSegment.cpp @@ -13,10 +13,6 @@ #include #include -#ifdef __clang__ -# pragma clang diagnostic push -# pragma clang diagnostic ignored "-Wformat-nonliteral" -#endif namespace RK { @@ -57,8 +53,6 @@ NuRaftLogSegment::NuRaftLogSegment(const String & log_dir_, UInt64 first_index_) , log(&(Poco::Logger::get("NuRaftLogSegment"))) , version(CURRENT_LOG_VERSION) { - LOG_INFO(log, "create new log segment, first index {}", first_index); - Poco::DateTime now; create_time = Poco::DateTimeFormatter::format(now, "%Y%m%d%H%M%S"); @@ -67,6 +61,8 @@ NuRaftLogSegment::NuRaftLogSegment(const String & log_dir_, UInt64 first_index_) file_name = getOpenFileName(); String full_path = getOpenPath(); + LOG_INFO(log, "Creating new log segment {}", file_name); + if (Poco::File(full_path).exists()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Try to create a log segment but file {} already exists.", full_path); @@ -89,6 +85,7 @@ NuRaftLogSegment::NuRaftLogSegment(const String & log_dir_, UInt64 first_index_, : log_dir(log_dir_) , first_index(first_index_) , last_index(first_index_ - 1) + , is_open(true) , file_name(file_name_) , create_time(create_time_) , log(&(Poco::Logger::get("NuRaftLogSegment"))) @@ -179,7 +176,7 @@ void NuRaftLogSegment::writeHeader() if (write(seg_fd, &version_uint8, 1) != 1) throwFromErrno(ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR, "Cannot write version to {}", file_name); - file_size.fetch_add(sizeof(uint64_t) + sizeof(uint8_t), std::memory_order_release); + file_size.fetch_add(MAGIC_AND_VERSION_SIZE, std::memory_order_release); } void NuRaftLogSegment::load() @@ -201,8 +198,7 @@ void NuRaftLogSegment::load() UInt64 last_index_read = first_index - 1; for (; entry_off < file_size_read;) { - LogEntryHeader header; - loadLogEntryHeader(seg_fd, entry_off, header); + LogEntryHeader header = loadEntryHeader(entry_off); const UInt64 log_entry_len = sizeof(LogEntryHeader) + header.data_length; @@ -349,13 +345,12 @@ UInt64 NuRaftLogSegment::appendEntry(ptr entry, std::atomic & LogEntryHeader header; struct iovec vec[2]; + ptr entry_buf; + char * data_in_buf; { if (!is_open || seg_fd == -1) throw Exception(ErrorCodes::LOGICAL_ERROR, "Append log but segment {} is not open.", file_name); - ptr entry_buf; - char * data_in_buf; - entry_buf = LogEntryBody::serialize(entry); data_in_buf = reinterpret_cast(entry_buf->data_begin()); @@ -404,13 +399,13 @@ UInt64 NuRaftLogSegment::appendEntry(ptr entry, std::atomic & } -void NuRaftLogSegment::getMeta(UInt64 index, LogMeta & meta) const +NuRaftLogSegment::LogMeta NuRaftLogSegment::getMeta(UInt64 index) const { if (last_index == first_index - 1 || index > last_index.load(std::memory_order_relaxed) || index < first_index) throw Exception(ErrorCodes::LOGICAL_ERROR, "Get meta for log {} failed, index out of range [{},{}].", index, first_index, last_index.load(std::memory_order_relaxed)); UInt64 meta_index = index - first_index; - UInt64 entry_offset = offset_term[meta_index].first; + UInt64 file_offset = offset_term[meta_index].first; UInt64 next_offset; @@ -419,17 +414,20 @@ void NuRaftLogSegment::getMeta(UInt64 index, LogMeta & meta) const else next_offset = file_size; - meta.offset = entry_offset; + LogMeta meta; + meta.offset = file_offset; meta.term = offset_term[meta_index].second; - meta.length = next_offset - entry_offset; + meta.length = next_offset - file_offset; + + return meta; } -void NuRaftLogSegment::loadLogEntryHeader(int fd, off_t offset, LogEntryHeader & header) const +LogEntryHeader NuRaftLogSegment::loadEntryHeader(off_t offset) const { ptr buf = buffer::alloc(LogEntryHeader::HEADER_SIZE); buf->pos(0); - ssize_t size = pread(fd, buf->data(), LogEntryHeader::HEADER_SIZE, offset); + ssize_t size = pread(seg_fd, buf->data(), LogEntryHeader::HEADER_SIZE, offset); if (size != LogEntryHeader::HEADER_SIZE) throwFromErrno(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, "Fail to read header of log segment {}", file_name); @@ -437,30 +435,37 @@ void NuRaftLogSegment::loadLogEntryHeader(int fd, off_t offset, LogEntryHeader & buffer_serializer bs(buf); bs.pos(0); + LogEntryHeader header; header.term = bs.get_u64(); header.index = bs.get_u64(); header.data_length = bs.get_u32(); header.data_crc = bs.get_u32(); + + return header; } -void NuRaftLogSegment::loadLogEntry(int fd, off_t offset, LogEntryHeader & header, ptr & entry) const +ptr NuRaftLogSegment::loadEntry(const LogMeta & meta) const { - loadLogEntryHeader(fd, offset, header); + LogEntryHeader header = loadEntryHeader(meta.offset); char * entry_str = new char[header.data_length]; - ssize_t ret = pread(fd, entry_str, header.data_length, offset + LogEntryHeader::HEADER_SIZE); + ssize_t size_read = pread(seg_fd, entry_str, header.data_length, meta.offset + LogEntryHeader::HEADER_SIZE); + + if (size_read != header.data_length) + throwFromErrno(ErrorCodes::CORRUPTED_LOG, "Fail to read log entry with offset {} from log segment {}", meta.offset, file_name); - if (ret != header.data_length) - throwFromErrno(ErrorCodes::CORRUPTED_LOG, "Fail to read log entry with offset {} from log segment {}", offset, file_name); + String s(entry_str, header.data_length); + LOG_INFO(log, "get: {}" + s); if (!verifyCRC32(entry_str, header.data_length, header.data_crc)) throw Exception(ErrorCodes::CORRUPTED_LOG, "Checking CRC32 failed for log segment {}.", file_name); - entry = LogEntryBody::parse(entry_str, header.data_length); + auto entry = LogEntryBody::parse(entry_str, header.data_length); entry->set_term(header.term); delete[] entry_str; + return entry; } ptr NuRaftLogSegment::getEntry(UInt64 index) @@ -471,21 +476,8 @@ ptr NuRaftLogSegment::getEntry(UInt64 index) } std::shared_lock read_lock(log_mutex); - LogMeta meta; - getMeta(index, meta); - - ptr entry; - LogEntryHeader header; - loadLogEntry(seg_fd, meta.offset, header, entry); - - return entry; -} - -[[maybe_unused]] UInt64 NuRaftLogSegment::getTerm(UInt64 index) const -{ - LogMeta meta; - getMeta(index, meta); - return meta.term; + LogMeta meta = getMeta(index); + return loadEntry(meta); } bool NuRaftLogSegment::truncate(const UInt64 last_index_kept) @@ -549,7 +541,7 @@ bool NuRaftLogSegment::truncate(const UInt64 last_index_kept) if (ftruncate(seg_fd, file_size_to_keep) != 0) throwFromErrno(ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR, "Fail to truncate log segment {}", file_name); - LOG_INFO(log, "Truncate file {} descriptor {}, from {} to size {}", getOpenPath(), seg_fd, file_size, file_size_to_keep); + LOG_INFO(log, "Truncate file {} with fd {}, from {} to size {}", file_name, seg_fd, file_size_to_keep, file_size); /// seek fd off_t ret_off = lseek(seg_fd, file_size_to_keep, SEEK_SET); @@ -573,16 +565,11 @@ ptr LogSegmentStore::getInstance(const String & log_dir_, bool return segment_store; } -void LogSegmentStore::init(UInt32 max_segment_file_size_, UInt32 max_segment_count_) +void LogSegmentStore::init(UInt32 max_segment_file_size_) { - LOG_INFO( - log, - "Initializing log segment store, max segment file size {} bytes, max segment count {}.", - max_segment_file_size_, - max_segment_count_); + LOG_INFO(log, "Initializing log segment store, max segment file size {} bytes.", max_segment_file_size_); max_segment_file_size = max_segment_file_size_; - max_segment_count = max_segment_count_; Poco::File(log_dir).createDirectories(); @@ -648,7 +635,7 @@ ptr LogSegmentStore::getSegment(UInt64 index) UInt64 last_index = last_log_index.load(std::memory_order_acquire); /// No log - if (first_index < last_index) + if (first_index > last_index) return nullptr; if (index < first_index || index > last_index) @@ -790,34 +777,6 @@ int LogSegmentStore::removeSegment(UInt64 first_index_kept) return to_be_removed.size(); } - -int LogSegmentStore::removeSegment() -{ - std::lock_guard write_lock(seg_mutex); - size_t count = closed_segments.size() + 1 - max_segment_count; - - if (count <= 0) - return 0; - - std::vector> to_removed_segments; - std::sort(closed_segments.begin(), closed_segments.end(), compareSegment); - - for (size_t i = 0; i < count; i++) - { - ptr & segment = *(closed_segments.begin()); - to_removed_segments.push_back(segment); - first_log_index.store(segment->lastIndex() + 1, std::memory_order_release); - closed_segments.erase(closed_segments.begin()); - } - - for (auto & to_removed : to_removed_segments) - { - LOG_INFO(log, "Removing file for log segment {}", to_removed->getFileName()); - to_removed->remove(); - } - return count; -} - bool LogSegmentStore::truncateLog(UInt64 last_index_kept) { if (last_log_index.load(std::memory_order_acquire) <= last_index_kept) @@ -898,43 +857,6 @@ bool LogSegmentStore::truncateLog(UInt64 last_index_kept) return true; } -int LogSegmentStore::reset(UInt64 next_log_index) -{ - if (next_log_index <= 0) - { - /// LOG_ERROR << "Invalid next_log_index=" << next_log_index << " path: " << log_dir; - return EINVAL; - } - - std::vector> popped; - std::unique_lock write_lock(seg_mutex); - - popped.reserve(closed_segments.size()); - - for (auto & segment : closed_segments) - { - popped.push_back(segment); - } - - closed_segments.clear(); - - if (open_segment) - { - popped.push_back(open_segment); - open_segment = nullptr; - } - - first_log_index.store(next_log_index, std::memory_order_release); - last_log_index.store(next_log_index - 1, std::memory_order_release); - - write_lock.unlock(); - for (auto & i : popped) - { - i = nullptr; - } - return 0; -} - void LogSegmentStore::loadSegmentMetaData() { Poco::File file_dir(log_dir); @@ -994,7 +916,7 @@ void LogSegmentStore::loadSegmentMetaData() /// check segment /// last_log_index = 0; - ptr prev_seg = nullptr; + ptr prev_seg; ptr segment; for (auto it = closed_segments.begin(); it != closed_segments.end();) @@ -1045,7 +967,7 @@ void LogSegmentStore::loadSegments() size_t thread_num = std::min(closed_segments.size(), LOAD_THREAD_NUM); ThreadPool load_thread_pool(thread_num); - for (size_t thread_id = 0; thread_id < LOAD_THREAD_NUM; thread_id++) + for (size_t thread_id = 0; thread_id < thread_num; thread_id++) { load_thread_pool.trySchedule([this, thread_id, thread_num] { @@ -1062,7 +984,7 @@ void LogSegmentStore::loadSegments() }); } - /// Update last_log_index + /// Update last_log_index from closed segments if (!closed_segments.empty()) last_log_index = closed_segments.back()->lastIndex(); @@ -1096,7 +1018,3 @@ void LogSegmentStore::loadSegments() } } - -#ifdef __clang__ -# pragma clang diagnostic pop -#endif diff --git a/src/Service/NuRaftLogSegment.h b/src/Service/NuRaftLogSegment.h index 8afa10abc82..67bd2689205 100644 --- a/src/Service/NuRaftLogSegment.h +++ b/src/Service/NuRaftLogSegment.h @@ -72,9 +72,6 @@ class NuRaftLogSegment /// get entry by index, return null if not exist. ptr getEntry(UInt64 index); - /// get entry's term by index - [[maybe_unused]] UInt64 getTerm(UInt64 index) const; - /// Truncate segment from tail to last_index_kept. /// Return true if some logs are removed. /// This method will re-open the segment file if it is a closed one. @@ -136,11 +133,11 @@ class NuRaftLogSegment void closeFileIfNeeded(); /// get log entry meta - void getMeta(UInt64 index, LogMeta & meta) const; + LogMeta getMeta(UInt64 index) const; /// load log entry - void loadLogEntry(int fd, off_t offset, LogEntryHeader & header, ptr & entry) const; - void loadLogEntryHeader(int fd, off_t offset, LogEntryHeader & header) const; + ptr loadEntry(const LogMeta & meta) const; + LogEntryHeader loadEntryHeader(off_t offset) const; static constexpr size_t MAGIC_AND_VERSION_SIZE = 9; @@ -197,7 +194,6 @@ class LogSegmentStore using Segments = std::vector>; static constexpr UInt32 MAX_SEGMENT_FILE_SIZE = 1000 * 1024 * 1024; //1G, 0.3K/Log, 3M logs - static constexpr UInt32 MAX_SEGMENT_COUNT = 50; //50G static constexpr size_t LOAD_THREAD_NUM = 8; explicit LogSegmentStore(const String & log_dir_) @@ -210,7 +206,7 @@ class LogSegmentStore static ptr getInstance(const String & log_dir, bool force_new = false); /// Init log store, will create dir if not exist - void init(UInt32 max_segment_file_size_ = MAX_SEGMENT_FILE_SIZE, UInt32 max_segment_count_ = MAX_SEGMENT_COUNT); + void init(UInt32 max_segment_file_size_ = MAX_SEGMENT_FILE_SIZE); void close(); /// Return last flushed log index @@ -234,17 +230,18 @@ class LogSegmentStore /// Remove segments from storage's head, logs in [1, first_index_kept) will be discarded, usually invoked when compaction. /// return number of segments removed - int removeSegment(); int removeSegment(UInt64 first_index_kept); /// Delete uncommitted logs from storage's tail, (last_index_kept, infinity) will be discarded /// Return true if some logs are removed bool truncateLog(UInt64 last_index_kept); - int reset(UInt64 next_log_index); - - /// get closed segments - Segments & getClosedSegments() { return closed_segments; } + /// get closed segments, only for tests + Segments getClosedSegments() + { + std::shared_lock read_lock(seg_mutex); + return closed_segments; + } /// get file format version LogVersion getVersion(UInt64 index); @@ -270,9 +267,6 @@ class LogSegmentStore /// max segment file size UInt32 max_segment_file_size; - /// max segment count - UInt32 max_segment_count; - Poco::Logger * log; /// closed segments diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index 4387079ed86..7aa7d08f5d5 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -21,11 +21,6 @@ #include #include -#ifdef __clang__ -# pragma clang diagnostic push -# pragma clang diagnostic ignored "-Wformat-nonliteral" -#endif - namespace RK { @@ -974,7 +969,3 @@ size_t KeeperSnapshotManager::removeSnapshots() } } - -#ifdef __clang__ -# pragma clang diagnostic pop -#endif diff --git a/src/Service/NuRaftStateMachine.cpp b/src/Service/NuRaftStateMachine.cpp index b3e6d205e59..f086048dec3 100644 --- a/src/Service/NuRaftStateMachine.cpp +++ b/src/Service/NuRaftStateMachine.cpp @@ -16,12 +16,6 @@ #include -#ifdef __clang__ -# pragma clang diagnostic push -# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant" -#endif - - using namespace nuraft; namespace RK @@ -814,7 +808,3 @@ void NuRaftStateMachine::reset() } } - -#ifdef __clang__ -# pragma clang diagnostic pop -#endif diff --git a/src/Service/SnapshotCommon.cpp b/src/Service/SnapshotCommon.cpp index f753e8dbbd0..b68589a3d6f 100644 --- a/src/Service/SnapshotCommon.cpp +++ b/src/Service/SnapshotCommon.cpp @@ -9,11 +9,6 @@ #include #include -#ifdef __clang__ -# pragma clang diagnostic push -# pragma clang diagnostic ignored "-Wformat-nonliteral" -#endif - namespace RK { @@ -587,7 +582,3 @@ void parseBatchIntMapV2(KeeperStore & store, SnapshotBatchBody & batch, Snapshot } } - -#ifdef __clang__ -# pragma clang diagnostic pop -#endif diff --git a/src/Service/SnapshotCommon.h b/src/Service/SnapshotCommon.h index 6e80f325e43..4ceaffee9df 100644 --- a/src/Service/SnapshotCommon.h +++ b/src/Service/SnapshotCommon.h @@ -9,14 +9,6 @@ #include #include #include -#ifdef __clang__ -# pragma clang diagnostic push -# pragma clang diagnostic ignored "-Wsuggest-destructor-override" -# pragma clang diagnostic ignored "-Wheader-hygiene" -#endif -#ifdef __clang__ -# pragma clang diagnostic pop -#endif #include @@ -34,8 +26,10 @@ static constexpr UInt32 SAVE_BATCH_SIZE = 10000; using NumToACLMap = std::unordered_map; using SessionAndAuth = std::unordered_map; using SessionAndTimeout = std::unordered_map; + using StringMap = std::unordered_map; using IntMap = std::unordered_map; + using BucketEdges = KeeperStore::BucketEdges; using BucketNodes = KeeperStore::BucketNodes; @@ -113,8 +107,6 @@ String serializeKeeperNode(const String & path, const ptr & node, Sn ptr parseKeeperNode(const String & buf, SnapshotVersion version); -/// ----- For snapshot version 2 ----- - /// save batch data in snapshot object std::pair saveBatchV2(ptr & out, ptr & batch); std::pair diff --git a/src/Service/tests/gtest_raft_log.cpp b/src/Service/tests/gtest_raft_log.cpp index ceb3f6fa89d..d97e196e978 100644 --- a/src/Service/tests/gtest_raft_log.cpp +++ b/src/Service/tests/gtest_raft_log.cpp @@ -3,8 +3,8 @@ #include #include -#include #include +#include #include #include @@ -109,7 +109,7 @@ TEST(RaftLog, appendEntry) cleanDirectory(log_dir); } -TEST(RaftLog, appendSomeEntry) +TEST(RaftLog, appendEntries) { String log_dir(LOG_DIR + "/2"); cleanDirectory(log_dir); @@ -153,13 +153,12 @@ TEST(RaftLog, loadLog) cleanDirectory(log_dir); } - TEST(RaftLog, splitSegment) { String log_dir(LOG_DIR + "/4"); cleanDirectory(log_dir); auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_NO_THROW(log_store->init(200, 10)); //81 byte / log + ASSERT_NO_THROW(log_store->init(200)); //81 byte / log for (int i = 0; i < 12; i++) { UInt64 term = 1; @@ -177,7 +176,7 @@ TEST(RaftLog, removeSegment) String log_dir(LOG_DIR + "/5"); cleanDirectory(log_dir); auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_NO_THROW(log_store->init(200, 3)); + ASSERT_NO_THROW(log_store->init(200)); //5 segment for (int i = 0; i < 10; i++) { @@ -189,16 +188,15 @@ TEST(RaftLog, removeSegment) //[1,2],[3,4],[5,6],[7,8],[9,open] ASSERT_EQ(log_store->getClosedSegments().size(), 4); - ASSERT_EQ(log_store->removeSegment(3), 0); //remove first segment[1,2] + ASSERT_EQ(log_store->removeSegment(3), 1); //remove first segment[1,2] ASSERT_EQ(log_store->getClosedSegments().size(), 3); ASSERT_EQ(log_store->firstLogIndex(), 3); ASSERT_EQ(log_store->lastLogIndex(), 10); - ASSERT_EQ(log_store->removeSegment(), 0); //remove more than MAX_SEGMENT_COUNT segment - ASSERT_EQ(log_store->getClosedSegments().size(), 2); //2 finish_segment + 1 open_segment = 3 - ASSERT_EQ(log_store->firstLogIndex(), 5); + ASSERT_EQ(log_store->getClosedSegments().size(), 3); //3 finish_segment + 1 open_segment = 3 + ASSERT_EQ(log_store->firstLogIndex(), 3); ASSERT_EQ(log_store->lastLogIndex(), 10); ASSERT_NO_THROW(log_store->close()); - //cleanDirectory(log_dir); + cleanDirectory(log_dir); } TEST(RaftLog, truncateLog) @@ -206,7 +204,7 @@ TEST(RaftLog, truncateLog) String log_dir(LOG_DIR + "/6"); cleanDirectory(log_dir); auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_NO_THROW(log_store->init(200, 3)); + ASSERT_NO_THROW(log_store->init(200)); //8 segment, index 1-16 for (int i = 0; i < 16; i++) { @@ -218,7 +216,7 @@ TEST(RaftLog, truncateLog) ASSERT_EQ(log_store->getClosedSegments().size(), 7); ASSERT_EQ(log_store->lastLogIndex(), 16); - ASSERT_EQ(log_store->truncateLog(15), 0); //truncate open segment + ASSERT_TRUE(log_store->truncateLog(15)); //truncate open segment ASSERT_EQ(log_store->lastLogIndex(), 15); ptr log = log_store->getEntry(15); @@ -229,11 +227,11 @@ TEST(RaftLog, truncateLog) ASSERT_EQ("CREATE TABLE table1;", zk_create_request->data); ASSERT_EQ(log_store->getClosedSegments().size(), 7); - ASSERT_EQ(log_store->truncateLog(13), 0); //truncate close and open segment + ASSERT_TRUE(log_store->truncateLog(13)); //truncate close and open segment ASSERT_EQ(log_store->getClosedSegments().size(), 6); ASSERT_EQ(log_store->lastLogIndex(), 13); //truncate close and open segment - ASSERT_EQ(log_store->truncateLog(2), 0); //truncate close and open segment + ASSERT_TRUE(log_store->truncateLog(2)); //truncate close and open segment ptr log2 = log_store->getEntry(2); ASSERT_EQ(log2->get_term(), 1); @@ -242,7 +240,7 @@ TEST(RaftLog, truncateLog) ASSERT_EQ("/ck/table/table1", zk_create_request2->path); ASSERT_EQ("CREATE TABLE table1;", zk_create_request2->data); - ASSERT_EQ(log_store->truncateLog(1), 0); //truncate close and open segment + ASSERT_TRUE(log_store->truncateLog(1)); //truncate close and open segment ptr log3 = log_store->getEntry(1); ASSERT_EQ(log3->get_term(), 1); @@ -321,15 +319,15 @@ TEST(RaftLog, writeAt) auto zk_request2 = getZookeeperCreateRequest(log2); ASSERT_EQ("/ck/table/table22222222222233312222221", zk_request2->path); ASSERT_EQ("CREATE TABLE table22222222221111123222222222333;", zk_request2->data); + cleanDirectory(log_dir); } - TEST(RaftLog, compact) { String log_dir(LOG_DIR + "/10"); cleanDirectory(log_dir); ptr file_store - = cs_new(log_dir, true, FsyncMode::FSYNC, 1000, static_cast(200), static_cast(3)); + = cs_new(log_dir, true, FsyncMode::FSYNC, 1000, static_cast(200)); UInt64 term = 1; String key("/ck/table/table1"); @@ -368,6 +366,7 @@ TEST(RaftLog, compact) auto zk_request2 = getZookeeperCreateRequest(log2); ASSERT_EQ("/ck/table/table22222222222233312222221", zk_request2->path); ASSERT_EQ("CREATE TABLE table22222222221111123222222222333;", zk_request2->data); + cleanDirectory(log_dir); } TEST(RaftLog, getEntry) @@ -375,7 +374,7 @@ TEST(RaftLog, getEntry) String log_dir(LOG_DIR + "/7"); cleanDirectory(log_dir); auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_NO_THROW(log_store->init(100, 3)); + ASSERT_NO_THROW(log_store->init(100)); UInt64 term = 1; String key("/ck/table/table1"); String data("CREATE TABLE table1;"); @@ -405,7 +404,7 @@ TEST(RaftLog, getEntries) String log_dir(LOG_DIR + "/8"); cleanDirectory(log_dir); auto log_store = LogSegmentStore::getInstance(log_dir, true); - ASSERT_NO_THROW(log_store->init(250, 3)); //69 * 4 = 276 + ASSERT_NO_THROW(log_store->init(250)); //69 * 4 = 276 for (int i = 0; i < 8; i++) { UInt64 term = 1; From 1af0eb25d0988ca09ca3dd6bbd51f1a80f8831b4 Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Fri, 9 Aug 2024 16:40:35 +0800 Subject: [PATCH 3/4] Refactor log serialize and deserialize logic --- src/Service/KeeperServer.cpp | 2 +- src/Service/KeeperUtils.cpp | 60 ++++++++++++++++--- src/Service/KeeperUtils.h | 7 ++- src/Service/LogEntry.cpp | 8 +-- src/Service/LogEntry.h | 2 +- src/Service/NuRaftLogSegment.cpp | 28 ++++----- src/Service/NuRaftLogSegment.h | 2 +- src/Service/NuRaftStateMachine.cpp | 53 +--------------- src/Service/NuRaftStateMachine.h | 5 -- src/Service/tests/gtest_raft_log.cpp | 2 +- src/Service/tests/gtest_raft_snapshot.cpp | 8 +-- .../tests/gtest_raft_state_machine.cpp | 4 +- src/Service/tests/raft_test_common.cpp | 9 +-- 13 files changed, 89 insertions(+), 101 deletions(-) diff --git a/src/Service/KeeperServer.cpp b/src/Service/KeeperServer.cpp index 07934e18cb3..91da5fe2146 100644 --- a/src/Service/KeeperServer.cpp +++ b/src/Service/KeeperServer.cpp @@ -138,7 +138,7 @@ ptr>> KeeperServer::pushRequestBatch(const std::v for (const auto & request_session : request_batch) { LOG_TRACE(log, "Push request {}", request_session.toSimpleString()); - entries.push_back(getZooKeeperLogEntry(request_session.session_id, request_session.create_time, request_session.request)); + entries.push_back(serializeKeeperRequest(request_session)); } /// append_entries write request ptr>> result = raft_instance->append_entries(entries); diff --git a/src/Service/KeeperUtils.cpp b/src/Service/KeeperUtils.cpp index f5e750b06be..d79e2856be5 100644 --- a/src/Service/KeeperUtils.cpp +++ b/src/Service/KeeperUtils.cpp @@ -1,11 +1,14 @@ +#include + #include -#include #include +#include #include #include -#include +#include +#include #include #include #include @@ -34,15 +37,56 @@ String checkAndGetSuperdigest(const String & user_and_digest) return user_and_digest; } -nuraft::ptr getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request) +ptr serializeKeeperRequest(const RequestForSession & session_request) { - RK::WriteBufferFromNuraftBuffer buf; - RK::writeIntBinary(session_id, buf); - request->write(buf); - Coordination::write(time, buf); - return buf.getBuffer(); + WriteBufferFromNuraftBuffer out; + /// TODO unify digital encoding mode, see deserializeKeeperRequest + writeIntBinary(session_request.session_id, out); + session_request.request->write(out); + Coordination::write(session_request.create_time, out); + return out.getBuffer(); } +RequestForSession deserializeKeeperRequest(nuraft::buffer & data) +{ + ReadBufferFromNuRaftBuffer buffer(data); + RequestForSession request_for_session; + /// TODO unify digital encoding mode + readIntBinary(request_for_session.session_id, buffer); + + int32_t length; + Coordination::read(length, buffer); + + int32_t xid; + Coordination::read(xid, buffer); + + Coordination::OpNum opnum; + Coordination::read(opnum, buffer); + + // bool is_internal; + // Coordination::read(is_internal, buffer); + + request_for_session.request = Coordination::ZooKeeperRequestFactory::instance().get(opnum); + request_for_session.request->xid = xid; + request_for_session.request->readImpl(buffer); + + if (!buffer.eof()) + Coordination::read(request_for_session.create_time, buffer); + else /// backward compatibility + request_for_session.create_time + = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + + auto * log = &(Poco::Logger::get("NuRaftStateMachine")); + LOG_TRACE( + log, + "Parsed request session id {}, length {}, xid {}, opnum {}", + toHexString(request_for_session.session_id), + length, + xid, + Coordination::toString(opnum)); + + return request_for_session; +} ptr makeClone(const ptr & entry) { diff --git a/src/Service/KeeperUtils.h b/src/Service/KeeperUtils.h index 5940bdea874..3632b995aa6 100644 --- a/src/Service/KeeperUtils.h +++ b/src/Service/KeeperUtils.h @@ -8,13 +8,16 @@ #include #include #include +#include namespace RK { -/// Serialize ZooKeeper request to log -nuraft::ptr getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request); +/// Serialize and deserialize ZooKeeper request to log +nuraft::ptr serializeKeeperRequest(const RequestForSession & request); +RequestForSession deserializeKeeperRequest(nuraft::buffer & data); + nuraft::ptr makeClone(const nuraft::ptr & entry); /// Parent of a path, for example: got '/a/b' from '/a/b/c' diff --git a/src/Service/LogEntry.cpp b/src/Service/LogEntry.cpp index d3d16a3c53d..a4e15f757af 100644 --- a/src/Service/LogEntry.cpp +++ b/src/Service/LogEntry.cpp @@ -22,11 +22,11 @@ ptr LogEntryBody::serialize(ptr & entry) return entry_buf; } -ptr LogEntryBody::parse(const char * entry_str, size_t buf_size) +ptr LogEntryBody::deserialize(ptr serialized_entry) { - nuraft::log_val_type type = static_cast(entry_str[0]); - auto data = buffer::alloc(buf_size - 1); - data->put_raw(reinterpret_cast(entry_str + 1), buf_size - 1); + nuraft::log_val_type type = static_cast(*serialized_entry->data_begin()); + auto data = buffer::alloc(serialized_entry->size() - 1); + data->put_raw(serialized_entry->data_begin() + 1, serialized_entry->size() - 1); data->pos(0); return cs_new(0, data, type); /// term is set latter } diff --git a/src/Service/LogEntry.h b/src/Service/LogEntry.h index b1a3a0417f7..5c60bb50e1a 100644 --- a/src/Service/LogEntry.h +++ b/src/Service/LogEntry.h @@ -38,7 +38,7 @@ class LogEntryBody { public: static ptr serialize(ptr & entry); - static ptr parse(const char * entry_str, size_t buf_size); + static ptr deserialize(ptr serialized_entry); }; } diff --git a/src/Service/NuRaftLogSegment.cpp b/src/Service/NuRaftLogSegment.cpp index 2877bc660a9..5e26583b0f9 100644 --- a/src/Service/NuRaftLogSegment.cpp +++ b/src/Service/NuRaftLogSegment.cpp @@ -399,10 +399,10 @@ UInt64 NuRaftLogSegment::appendEntry(ptr entry, std::atomic & } -NuRaftLogSegment::LogMeta NuRaftLogSegment::getMeta(UInt64 index) const +ptr NuRaftLogSegment::getMeta(UInt64 index) const { if (last_index == first_index - 1 || index > last_index.load(std::memory_order_relaxed) || index < first_index) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Get meta for log {} failed, index out of range [{},{}].", index, first_index, last_index.load(std::memory_order_relaxed)); + return nullptr; UInt64 meta_index = index - first_index; UInt64 file_offset = offset_term[meta_index].first; @@ -414,10 +414,10 @@ NuRaftLogSegment::LogMeta NuRaftLogSegment::getMeta(UInt64 index) const else next_offset = file_size; - LogMeta meta; - meta.offset = file_offset; - meta.term = offset_term[meta_index].second; - meta.length = next_offset - file_offset; + ptr meta = cs_new(); + meta->offset = file_offset; + meta->term = offset_term[meta_index].second; + meta->length = next_offset - file_offset; return meta; } @@ -449,22 +449,18 @@ ptr NuRaftLogSegment::loadEntry(const LogMeta & meta) const { LogEntryHeader header = loadEntryHeader(meta.offset); - char * entry_str = new char[header.data_length]; - ssize_t size_read = pread(seg_fd, entry_str, header.data_length, meta.offset + LogEntryHeader::HEADER_SIZE); + ptr buf = buffer::alloc(header.data_length); + ssize_t size_read = pread(seg_fd, buf->data_begin(), header.data_length, meta.offset + LogEntryHeader::HEADER_SIZE); if (size_read != header.data_length) throwFromErrno(ErrorCodes::CORRUPTED_LOG, "Fail to read log entry with offset {} from log segment {}", meta.offset, file_name); - String s(entry_str, header.data_length); - LOG_INFO(log, "get: {}" + s); - - if (!verifyCRC32(entry_str, header.data_length, header.data_crc)) + if (!verifyCRC32(reinterpret_cast(buf->data_begin()), header.data_length, header.data_crc)) throw Exception(ErrorCodes::CORRUPTED_LOG, "Checking CRC32 failed for log segment {}.", file_name); - auto entry = LogEntryBody::parse(entry_str, header.data_length); + auto entry = LogEntryBody::deserialize(buf); entry->set_term(header.term); - delete[] entry_str; return entry; } @@ -476,8 +472,8 @@ ptr NuRaftLogSegment::getEntry(UInt64 index) } std::shared_lock read_lock(log_mutex); - LogMeta meta = getMeta(index); - return loadEntry(meta); + auto meta = getMeta(index); + return meta == nullptr ? nullptr : loadEntry(*meta); } bool NuRaftLogSegment::truncate(const UInt64 last_index_kept) diff --git a/src/Service/NuRaftLogSegment.h b/src/Service/NuRaftLogSegment.h index 67bd2689205..a79a50f1151 100644 --- a/src/Service/NuRaftLogSegment.h +++ b/src/Service/NuRaftLogSegment.h @@ -133,7 +133,7 @@ class NuRaftLogSegment void closeFileIfNeeded(); /// get log entry meta - LogMeta getMeta(UInt64 index) const; + ptr getMeta(UInt64 index) const; /// load log entry ptr loadEntry(const LogMeta & meta) const; diff --git a/src/Service/NuRaftStateMachine.cpp b/src/Service/NuRaftStateMachine.cpp index f086048dec3..29ef99ee014 100644 --- a/src/Service/NuRaftStateMachine.cpp +++ b/src/Service/NuRaftStateMachine.cpp @@ -188,57 +188,6 @@ void NuRaftStateMachine::snapThread() } } -RequestForSession NuRaftStateMachine::parseRequest(nuraft::buffer & data) -{ - ReadBufferFromNuRaftBuffer buffer(data); - RequestForSession request_for_session; - /// TODO unify digital encoding mode - readIntBinary(request_for_session.session_id, buffer); - - int32_t length; - Coordination::read(length, buffer); - - int32_t xid; - Coordination::read(xid, buffer); - - Coordination::OpNum opnum; - Coordination::read(opnum, buffer); - - // bool is_internal; - // Coordination::read(is_internal, buffer); - - request_for_session.request = Coordination::ZooKeeperRequestFactory::instance().get(opnum); - request_for_session.request->xid = xid; - request_for_session.request->readImpl(buffer); - - if (!buffer.eof()) - Coordination::read(request_for_session.create_time, buffer); - else /// backward compatibility - request_for_session.create_time - = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - - auto * log = &(Poco::Logger::get("NuRaftStateMachine")); - LOG_TRACE( - log, - "Parsed request session id {}, length {}, xid {}, opnum {}", - toHexString(request_for_session.session_id), - length, - xid, - Coordination::toString(opnum)); - - return request_for_session; -} - -ptr NuRaftStateMachine::serializeRequest(RequestForSession & session_request) -{ - WriteBufferFromNuraftBuffer out; - /// TODO unify digital encoding mode, see parseRequest - writeIntBinary(session_request.session_id, out); - session_request.request->write(out); - Coordination::write(session_request.create_time, out); - return out.getBuffer(); -} - ptr NuRaftStateMachine::pre_commit(const ulong log_idx, buffer & data) { LOG_TRACE(log, "pre commit, log index {}, data size {}", log_idx, data.size()); @@ -316,7 +265,7 @@ nuraft::ptr NuRaftStateMachine::commit(const ulong log_idx, nura } else { - auto request_for_session = parseRequest(data); + auto request_for_session = deserializeKeeperRequest(data); ResponsesForSessions responses_for_sessions; LOG_TRACE(log, "Commit log index {}, request {}", log_idx, request_for_session.toSimpleString()); diff --git a/src/Service/NuRaftStateMachine.h b/src/Service/NuRaftStateMachine.h index b6ddb9bdbc7..ce5676d20ec 100644 --- a/src/Service/NuRaftStateMachine.h +++ b/src/Service/NuRaftStateMachine.h @@ -277,11 +277,6 @@ class NuRaftStateMachine : public nuraft::state_machine void shutdown(); - /// deserialize a RequestForSession - static RequestForSession parseRequest(nuraft::buffer & data); - /// serialize a RequestForSession - static ptr serializeRequest(RequestForSession & request); - private: /// Clear the whole state machine. /// Used when apply_snapshot. diff --git a/src/Service/tests/gtest_raft_log.cpp b/src/Service/tests/gtest_raft_log.cpp index d97e196e978..544eea2d32b 100644 --- a/src/Service/tests/gtest_raft_log.cpp +++ b/src/Service/tests/gtest_raft_log.cpp @@ -88,7 +88,7 @@ TEST(RaftLog, parseLogEntrybody) ptr log = cs_new(0, data, nuraft::log_val_type::app_log); ptr serialized_log = LogEntryBody::serialize(log); - ptr parsed_log = LogEntryBody::parse(reinterpret_cast(serialized_log->data_begin()), serialized_log->size()); + ptr parsed_log = LogEntryBody::deserialize(serialized_log); ASSERT_EQ(parsed_log->get_val_type(), log->get_val_type()); ASSERT_EQ(parsed_log->get_buf().get_str(), log->get_buf().get_str()); diff --git a/src/Service/tests/gtest_raft_snapshot.cpp b/src/Service/tests/gtest_raft_snapshot.cpp index 2c08209e1da..4b90d0bc68a 100644 --- a/src/Service/tests/gtest_raft_snapshot.cpp +++ b/src/Service/tests/gtest_raft_snapshot.cpp @@ -45,7 +45,7 @@ ptr closeSessionLog(int64_t session_id) request_info.session_id = session_id; int64_t time = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1); request_info.create_time = time; - ptr buf = NuRaftStateMachine::serializeRequest(request_info); + ptr buf = serializeKeeperRequest(request_info); return buf; } @@ -72,7 +72,7 @@ ptr createLog(int64_t session_id, const String & key, const String & dat int64_t time = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1); session_request->create_time = time; - ptr buf = NuRaftStateMachine::serializeRequest(*session_request); + ptr buf = serializeKeeperRequest(*session_request); return buf; } @@ -97,7 +97,7 @@ ptr setLog(int64_t session_id, const String & key, const String value, c int64_t time = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1); session_request->create_time = time; - ptr buf = NuRaftStateMachine::serializeRequest(*session_request); + ptr buf = serializeKeeperRequest(*session_request); return buf; } @@ -120,7 +120,7 @@ ptr removeLog(int64_t session_id, const String & key) int64_t time = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1); session_request->create_time = time; - ptr buf = NuRaftStateMachine::serializeRequest(*session_request); + ptr buf = serializeKeeperRequest(*session_request); return buf; } diff --git a/src/Service/tests/gtest_raft_state_machine.cpp b/src/Service/tests/gtest_raft_state_machine.cpp index bdfc65127b5..248cc51f280 100644 --- a/src/Service/tests/gtest_raft_state_machine.cpp +++ b/src/Service/tests/gtest_raft_state_machine.cpp @@ -41,8 +41,8 @@ TEST(RaftStateMachine, serializeAndParse) using namespace std::chrono; session_request.create_time = duration_cast(system_clock::now().time_since_epoch()).count(); - ptr buf = NuRaftStateMachine::serializeRequest(session_request); - RequestForSession session_request_2 = NuRaftStateMachine::parseRequest(*(buf.get())); + ptr buf = serializeKeeperRequest(session_request); + RequestForSession session_request_2 = deserializeKeeperRequest(*(buf.get())); if (session_request_2.request->getOpNum() == OpNum::Create) { ZooKeeperCreateRequest * request_2 = static_cast(session_request_2.request.get()); diff --git a/src/Service/tests/raft_test_common.cpp b/src/Service/tests/raft_test_common.cpp index 35a90e48459..ba1021f94c4 100644 --- a/src/Service/tests/raft_test_common.cpp +++ b/src/Service/tests/raft_test_common.cpp @@ -84,7 +84,8 @@ ptr createLogEntry(UInt64 term, const String & key, const String & da auto zk_create_request = std::make_shared(); zk_create_request->path = key; zk_create_request->data = data; - auto serialized_request = getZooKeeperLogEntry(1, 1, zk_create_request); + RequestForSession request_with_session(zk_create_request, 1, 1); + auto serialized_request = serializeKeeperRequest(request_with_session); return std::make_shared(term, serialized_request); } @@ -160,7 +161,7 @@ void createZNodeLog(NuRaftStateMachine & machine, const String & key, const Stri using namespace std::chrono; session_request.create_time = duration_cast(system_clock::now().time_since_epoch()).count(); - ptr buf = NuRaftStateMachine::serializeRequest(session_request); + ptr buf = serializeKeeperRequest(session_request); //LOG_INFO(log, "index {}", index); if (store != nullptr) { @@ -199,7 +200,7 @@ void setZNode(NuRaftStateMachine & machine, const String & key, const String & d using namespace std::chrono; session_request.create_time = duration_cast(system_clock::now().time_since_epoch()).count(); - ptr buf = NuRaftStateMachine::serializeRequest(session_request); + ptr buf = serializeKeeperRequest(session_request); machine.commit(index, *(buf.get())); } @@ -222,7 +223,7 @@ void removeZNode(NuRaftStateMachine & machine, const String & key) using namespace std::chrono; session_request.create_time = duration_cast(system_clock::now().time_since_epoch()).count(); - ptr buf = NuRaftStateMachine::serializeRequest(session_request); + ptr buf = serializeKeeperRequest(session_request); machine.commit(index, *(buf.get())); } From d7f356ffb0e2a1561d9e3f34b98108d6b2f4ffdd Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Mon, 19 Aug 2024 14:45:10 +0800 Subject: [PATCH 4/4] Add crc info when cloning log entry --- src/Service/KeeperCommon.h | 3 ++- src/Service/KeeperServer.cpp | 2 +- src/Service/KeeperUtils.cpp | 36 ++++++++++++--------------- src/Service/KeeperUtils.h | 4 +-- src/Service/LogEntry.cpp | 4 +-- src/Service/LogEntry.h | 4 +-- src/Service/NuRaftFileLogStore.cpp | 39 +++++++++--------------------- src/Service/NuRaftFileLogStore.h | 4 +-- src/Service/NuRaftLogSegment.cpp | 6 ++--- src/Service/NuRaftLogSegment.h | 6 ++--- 10 files changed, 43 insertions(+), 65 deletions(-) diff --git a/src/Service/KeeperCommon.h b/src/Service/KeeperCommon.h index 543d0da5cd2..9a0fbf4514a 100644 --- a/src/Service/KeeperCommon.h +++ b/src/Service/KeeperCommon.h @@ -31,7 +31,7 @@ using ThreadPoolPtr = std::shared_ptr; struct RequestId; -/// Attached session id to request +/// Attached session id and forwarding info to request struct RequestForSession { int64_t session_id; @@ -107,4 +107,5 @@ bool isSessionRequest(Coordination::OpNum opnum); bool isSessionRequest(const Coordination::ZooKeeperRequestPtr & request); bool isNewSessionRequest(Coordination::OpNum opnum); + } diff --git a/src/Service/KeeperServer.cpp b/src/Service/KeeperServer.cpp index 91da5fe2146..6d9fd59e2ce 100644 --- a/src/Service/KeeperServer.cpp +++ b/src/Service/KeeperServer.cpp @@ -44,7 +44,7 @@ KeeperServer::KeeperServer( new_session_id_callback_mutex, new_session_id_callback, state_manager->load_log_store(), - checkAndGetSuperdigest(settings->super_digest), + checkAndGetSuperDigest(settings->super_digest), MAX_OBJECT_NODE_SIZE, request_processor_); diff --git a/src/Service/KeeperUtils.cpp b/src/Service/KeeperUtils.cpp index d79e2856be5..3de5e3c4757 100644 --- a/src/Service/KeeperUtils.cpp +++ b/src/Service/KeeperUtils.cpp @@ -23,7 +23,7 @@ namespace ErrorCodes extern const int INVALID_CONFIG_PARAMETER; } -String checkAndGetSuperdigest(const String & user_and_digest) +String checkAndGetSuperDigest(const String & user_and_digest) { if (user_and_digest.empty()) return ""; @@ -32,18 +32,17 @@ String checkAndGetSuperdigest(const String & user_and_digest) boost::split(scheme_and_id, user_and_digest, [](char c) { return c == ':'; }); if (scheme_and_id.size() != 2 || scheme_and_id[0] != "super") throw Exception( - ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect superdigest in keeper_server config. Must be 'super:base64string'"); + ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect super digest in keeper_server config. Must be 'super:base64string'"); return user_and_digest; } -ptr serializeKeeperRequest(const RequestForSession & session_request) +ptr serializeKeeperRequest(const RequestForSession & request) { WriteBufferFromNuraftBuffer out; - /// TODO unify digital encoding mode, see deserializeKeeperRequest - writeIntBinary(session_request.session_id, out); - session_request.request->write(out); - Coordination::write(session_request.create_time, out); + writeIntBinary(request.session_id, out); + request.request->write(out); + Coordination::write(request.create_time, out); return out.getBuffer(); } @@ -51,7 +50,6 @@ RequestForSession deserializeKeeperRequest(nuraft::buffer & data) { ReadBufferFromNuRaftBuffer buffer(data); RequestForSession request_for_session; - /// TODO unify digital encoding mode readIntBinary(request_for_session.session_id, buffer); int32_t length; @@ -76,22 +74,20 @@ RequestForSession deserializeKeeperRequest(nuraft::buffer & data) request_for_session.create_time = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - auto * log = &(Poco::Logger::get("NuRaftStateMachine")); - LOG_TRACE( - log, - "Parsed request session id {}, length {}, xid {}, opnum {}", - toHexString(request_for_session.session_id), - length, - xid, - Coordination::toString(opnum)); - return request_for_session; } -ptr makeClone(const ptr & entry) +ptr cloneLogEntry(const ptr & entry) { - ptr clone = cs_new(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type()); - return clone; + ptr cloned = cs_new( + entry->get_term(), + buffer::clone(entry->get_buf()), + entry->get_val_type(), + entry->get_timestamp(), + entry->has_crc32(), + entry->get_crc32(), + false); + return cloned; } String getBaseName(const String & path) diff --git a/src/Service/KeeperUtils.h b/src/Service/KeeperUtils.h index 3632b995aa6..cc837d9cf6a 100644 --- a/src/Service/KeeperUtils.h +++ b/src/Service/KeeperUtils.h @@ -18,7 +18,7 @@ namespace RK nuraft::ptr serializeKeeperRequest(const RequestForSession & request); RequestForSession deserializeKeeperRequest(nuraft::buffer & data); -nuraft::ptr makeClone(const nuraft::ptr & entry); +nuraft::ptr cloneLogEntry(const nuraft::ptr & entry); /// Parent of a path, for example: got '/a/b' from '/a/b/c' String getParentPath(const String & path); @@ -28,7 +28,7 @@ String getBaseName(const String & path); String base64Encode(const String & decoded); String getSHA1(const String & userdata); String generateDigest(const String & userdata); -String checkAndGetSuperdigest(const String & user_and_digest); +String checkAndGetSuperDigest(const String & user_and_digest); inline int readUInt32(nuraft::ptr & fs, UInt32 & x) { diff --git a/src/Service/LogEntry.cpp b/src/Service/LogEntry.cpp index a4e15f757af..510b78cd02f 100644 --- a/src/Service/LogEntry.cpp +++ b/src/Service/LogEntry.cpp @@ -7,7 +7,7 @@ using nuraft::byte; using nuraft::cs_new; /// Add entry type to the log entry -ptr LogEntryBody::serialize(ptr & entry) +ptr LogEntryBody::serialize(const ptr & entry) { ptr entry_buf; ptr data = entry->get_buf_ptr(); @@ -22,7 +22,7 @@ ptr LogEntryBody::serialize(ptr & entry) return entry_buf; } -ptr LogEntryBody::deserialize(ptr serialized_entry) +ptr LogEntryBody::deserialize(const ptr & serialized_entry) { nuraft::log_val_type type = static_cast(*serialized_entry->data_begin()); auto data = buffer::alloc(serialized_entry->size() - 1); diff --git a/src/Service/LogEntry.h b/src/Service/LogEntry.h index 5c60bb50e1a..6d1bf3232ab 100644 --- a/src/Service/LogEntry.h +++ b/src/Service/LogEntry.h @@ -37,8 +37,8 @@ struct LogEntryHeader class LogEntryBody { public: - static ptr serialize(ptr & entry); - static ptr deserialize(ptr serialized_entry); + static ptr serialize(const ptr & entry); + static ptr deserialize(const ptr & serialized_entry); }; } diff --git a/src/Service/NuRaftFileLogStore.cpp b/src/Service/NuRaftFileLogStore.cpp index e1e947e6702..06b565dbe41 100644 --- a/src/Service/NuRaftFileLogStore.cpp +++ b/src/Service/NuRaftFileLogStore.cpp @@ -23,7 +23,7 @@ ptr LogEntryQueue::getEntry(const UInt64 & index) return nullptr; } -void LogEntryQueue::putEntry(UInt64 & index, ptr & entry) +void LogEntryQueue::putEntry(UInt64 & index, const ptr & entry) { LOG_TRACE(log, "put entry {}, index {}, batch {}", index, index & (MAX_VECTOR_SIZE - 1), batch_index); std::lock_guard write_lock(queue_mutex); @@ -32,19 +32,6 @@ void LogEntryQueue::putEntry(UInt64 & index, ptr & entry) max_index = std::max(max_index, index); } -[[maybe_unused]] void LogEntryQueue::putEntryOrClear(UInt64 & index, ptr & entry) -{ - std::lock_guard write_lock(queue_mutex); - if (index >> BIT_SIZE == batch_index || index >> BIT_SIZE == batch_index - 1) - { - entry_vec[index & (MAX_VECTOR_SIZE - 1)] = entry; - max_index = index; - return; - } - /// next cycle - clear(); -} - void LogEntryQueue::clear() { LOG_INFO(log, "clear log queue."); @@ -137,18 +124,17 @@ ulong NuRaftFileLogStore::start_index() const ptr NuRaftFileLogStore::last_entry() const { if (last_log_entry) - return makeClone(last_log_entry); - else - return nullptr; + return cloneLogEntry(last_log_entry); + return cs_new(0, nuraft::buffer::alloc(0)); } ulong NuRaftFileLogStore::append(ptr & entry) { - ptr clone = makeClone(entry); + ptr cloned = cloneLogEntry(entry); UInt64 log_index = segment_store->appendEntry(entry); - log_queue.putEntry(log_index, clone); + log_queue.putEntry(log_index, cloned); - last_log_entry = clone; + last_log_entry = cloned; if (log_fsync_mode == FsyncMode::FSYNC_PARALLEL && entry->get_val_type() != log_val_type::app_log) parallel_fsync_event->set(); @@ -209,7 +195,6 @@ ptr>> NuRaftFileLogStore::log_entries_ext(ulong start ptr>> ret = cs_new>>(); int64 get_size = 0; - int64 entry_size; for (auto i = start; i < end; i++) { @@ -217,7 +202,7 @@ ptr>> NuRaftFileLogStore::log_entries_ext(ulong start if (!entry) return nullptr; - entry_size = entry->get_buf().size() + sizeof(ulong) + sizeof(char); + int64_t entry_size = entry->get_buf().size() + sizeof(ulong) + sizeof(char); if (batch_size_hint_in_bytes > 0 && get_size + entry_size > batch_size_hint_in_bytes) break; @@ -234,7 +219,6 @@ ptr> NuRaftFileLogStore::log_entries_version_ex ptr> ret = cs_new>(); int64 get_size = 0; - int64 entry_size; for (auto i = start; i < end; i++) { @@ -242,7 +226,7 @@ ptr> NuRaftFileLogStore::log_entries_version_ex if (!entry) return nullptr; - entry_size = entry->get_buf().size() + sizeof(ulong) + sizeof(char); + int64 entry_size = entry->get_buf().size() + sizeof(ulong) + sizeof(char); if (batch_size_hint_in_bytes > 0 && get_size + entry_size > batch_size_hint_in_bytes) break; @@ -256,7 +240,7 @@ ptr> NuRaftFileLogStore::log_entries_version_ex ptr NuRaftFileLogStore::entry_at(ulong index) { - ptr res = log_queue.getEntry(index); + auto res = log_queue.getEntry(index); if (res) { LOG_TRACE(log, "Get log {} from queue", index); @@ -266,15 +250,14 @@ ptr NuRaftFileLogStore::entry_at(ulong index) LOG_TRACE(log, "Get log {} from disk", index); res = segment_store->getEntry(index); } - return res ? makeClone(res) : nullptr; + return res ? cloneLogEntry(res) : nullptr; } ulong NuRaftFileLogStore::term_at(ulong index) { if (entry_at(index)) return entry_at(index)->get_term(); - else - return 0; + return 0; } ptr NuRaftFileLogStore::pack(ulong index, int32 cnt) diff --git a/src/Service/NuRaftFileLogStore.h b/src/Service/NuRaftFileLogStore.h index 668452ba8cd..739ef46824b 100644 --- a/src/Service/NuRaftFileLogStore.h +++ b/src/Service/NuRaftFileLogStore.h @@ -26,9 +26,7 @@ class LogEntryQueue ptr getEntry(const UInt64 & index); /// put log into the queue - void putEntry(UInt64 & index, ptr & entry); - - [[maybe_unused]] void putEntryOrClear(UInt64 & index, ptr & entry); + void putEntry(UInt64 & index, const ptr & entry); /// clean all log void clear(); diff --git a/src/Service/NuRaftLogSegment.cpp b/src/Service/NuRaftLogSegment.cpp index 5e26583b0f9..62e49c35405 100644 --- a/src/Service/NuRaftLogSegment.cpp +++ b/src/Service/NuRaftLogSegment.cpp @@ -340,7 +340,7 @@ void NuRaftLogSegment::remove() f.remove(); } -UInt64 NuRaftLogSegment::appendEntry(ptr entry, std::atomic & last_log_index) +UInt64 NuRaftLogSegment::appendEntry(const ptr & entry, std::atomic & last_log_index) { LogEntryHeader header; struct iovec vec[2]; @@ -663,14 +663,14 @@ LogVersion LogSegmentStore::getVersion(UInt64 index) return seg->getVersion(); } -UInt64 LogSegmentStore::appendEntry(ptr entry) +UInt64 LogSegmentStore::appendEntry(const ptr & entry) { openNewSegmentIfNeeded(); std::shared_lock read_lock(seg_mutex); return open_segment->appendEntry(entry, last_log_index); } -UInt64 LogSegmentStore::writeAt(UInt64 index, ptr entry) +UInt64 LogSegmentStore::writeAt(UInt64 index, const ptr & entry) { truncateLog(index - 1); if (index == lastLogIndex() + 1) diff --git a/src/Service/NuRaftLogSegment.h b/src/Service/NuRaftLogSegment.h index a79a50f1151..0b393dc5c49 100644 --- a/src/Service/NuRaftLogSegment.h +++ b/src/Service/NuRaftLogSegment.h @@ -67,7 +67,7 @@ class NuRaftLogSegment LogVersion getVersion() const { return version; } /// serialize entry, and append to open segment, return appended log index - UInt64 appendEntry(ptr entry, std::atomic & last_log_index); + UInt64 appendEntry(const ptr & entry, std::atomic & last_log_index); /// get entry by index, return null if not exist. ptr getEntry(UInt64 index); @@ -219,10 +219,10 @@ class LogSegmentStore UInt64 lastLogIndex() { return last_log_index.load(std::memory_order_acquire); } /// Append entry to log store - UInt64 appendEntry(ptr entry); + UInt64 appendEntry(const ptr & entry); /// First truncate log whose index is large than or equals with index of entry, then append it. - UInt64 writeAt(UInt64 index, ptr entry); + UInt64 writeAt(UInt64 index, const ptr & entry); ptr getEntry(UInt64 index); /// Just for test, collection entries in [start_index, end_index]