diff --git a/src/Service/KeeperDispatcher.cpp b/src/Service/KeeperDispatcher.cpp index d7ba240ae0..7ef3bb4418 100644 --- a/src/Service/KeeperDispatcher.cpp +++ b/src/Service/KeeperDispatcher.cpp @@ -375,12 +375,6 @@ void KeeperDispatcher::unRegisterSessionResponseCallbackWithoutLock(int64_t id) session_response_callbacks.erase(it); } -[[maybe_unused]] void KeeperDispatcher::registerUserResponseCallBack(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected) -{ - std::unique_lock write_lock(response_callbacks_mutex); - registerUserResponseCallBackWithoutLock(session_id, callback, is_reconnected); -} - void KeeperDispatcher::registerUserResponseCallBackWithoutLock(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected) { if (session_id == 0) diff --git a/src/Service/KeeperDispatcher.h b/src/Service/KeeperDispatcher.h index 49650799c9..23de4656ba 100644 --- a/src/Service/KeeperDispatcher.h +++ b/src/Service/KeeperDispatcher.h @@ -129,7 +129,6 @@ class KeeperDispatcher : public std::enable_shared_from_this void unRegisterForwarderResponseCallBack(ForwardClientId client_id); /// Register response callback for user request - [[maybe_unused]] void registerUserResponseCallBack(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected = false); void registerUserResponseCallBackWithoutLock(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected = false); void unregisterUserResponseCallBack(int64_t session_id); void unregisterUserResponseCallBackWithoutLock(int64_t session_id); diff --git a/src/Service/KeeperServer.cpp b/src/Service/KeeperServer.cpp index 6d9fd59e2c..2201901702 100644 --- a/src/Service/KeeperServer.cpp +++ b/src/Service/KeeperServer.cpp @@ -150,20 +150,6 @@ void KeeperServer::handleRemoteSession(int64_t session_id, int64_t expiration_ti state_machine->getStore().handleRemoteSession(session_id, expiration_time); } -[[maybe_unused]] int64_t KeeperServer::getSessionTimeout(int64_t session_id) -{ - LOG_DEBUG(log, "New session timeout for {}", session_id); - if (state_machine->getStore().containsSession(session_id)) - { - return state_machine->getStore().getSessionAndTimeOut().find(session_id)->second; - } - else - { - LOG_WARNING(log, "Not found session timeout for {}", session_id); - return -1; - } -} - bool KeeperServer::isLeader() const { return raft_instance->is_leader(); diff --git a/src/Service/KeeperServer.h b/src/Service/KeeperServer.h index c87ffac676..157e777038 100644 --- a/src/Service/KeeperServer.h +++ b/src/Service/KeeperServer.h @@ -40,10 +40,6 @@ class KeeperServer /// it will update the snapshot itself. void handleRemoteSession(int64_t session_id, int64_t expiration_time); - /// Get the initialized timeout for a session. - /// Return initialized timeout, or -1 if session not exist. - [[maybe_unused]] int64_t getSessionTimeout(int64_t session_id); - /// will invoke waitInit void startup(); diff --git a/src/Service/NuRaftFileLogStore.cpp b/src/Service/NuRaftFileLogStore.cpp index 06b565dbe4..c83383883a 100644 --- a/src/Service/NuRaftFileLogStore.cpp +++ b/src/Service/NuRaftFileLogStore.cpp @@ -144,12 +144,12 @@ ulong NuRaftFileLogStore::append(ptr & entry) void NuRaftFileLogStore::write_at(ulong index, ptr & entry) { - if (segment_store->writeAt(index, entry) == index) - log_queue.clear(); + segment_store->writeAt(index, entry); + log_queue.clear(); last_log_entry = entry; - /// notify parallel fsync thread + /// log store file fsync if (log_fsync_mode == FsyncMode::FSYNC_PARALLEL && entry->get_val_type() != log_val_type::app_log) parallel_fsync_event->set(); @@ -255,8 +255,8 @@ ptr NuRaftFileLogStore::entry_at(ulong index) ulong NuRaftFileLogStore::term_at(ulong index) { - if (entry_at(index)) - return entry_at(index)->get_term(); + if (auto entry = entry_at(index)) + return entry->get_term(); return 0; } diff --git a/src/Service/NuRaftLogSegment.cpp b/src/Service/NuRaftLogSegment.cpp index a0940e3f69..f5a7011c89 100644 --- a/src/Service/NuRaftLogSegment.cpp +++ b/src/Service/NuRaftLogSegment.cpp @@ -1,11 +1,11 @@ #include +#include #include #include #include #include #include -#include #include @@ -33,16 +33,6 @@ namespace ErrorCodes using namespace nuraft; -[[maybe_unused]] int ftruncateUninterrupted(int fd, off_t length) -{ - int rc; - do - { - rc = ftruncate(fd, length); - } while (rc == -1 && errno == EINTR); - return rc; -} - bool compareSegment(const ptr & lhs, const ptr & rhs) { return lhs->firstIndex() < rhs->firstIndex(); @@ -661,14 +651,13 @@ UInt64 LogSegmentStore::appendEntry(const ptr & entry) return open_segment->appendEntry(entry, last_log_index); } -UInt64 LogSegmentStore::writeAt(UInt64 index, const ptr & entry) +void LogSegmentStore::writeAt(UInt64 index, const ptr & entry) { truncateLog(index - 1); if (index == lastLogIndex() + 1) - return appendEntry(entry); - - LOG_WARNING(log, "writeAt log index {} failed, firstLogIndex {}, lastLogIndex {}.", index, firstLogIndex(), lastLogIndex()); - return -1; + appendEntry(entry); + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Fail to write log at {}, log store index range [{}, {}].", index, firstLogIndex(), lastLogIndex()); } ptr LogSegmentStore::getEntry(UInt64 index) const @@ -680,18 +669,15 @@ ptr LogSegmentStore::getEntry(UInt64 index) const return seg->getEntry(index); } -void LogSegmentStore::getEntries(UInt64 start_index, UInt64 end_index, const ptr>> & entries) +std::vector> LogSegmentStore::getEntries(UInt64 start_index, UInt64 end_index) const { - if (entries == nullptr) - { - LOG_ERROR(log, "Entry vector is nullptr."); - return; - } + std::vector> entries; for (UInt64 index = start_index; index <= end_index; index++) { auto entry_pt = getEntry(index); - entries->push_back(entry_pt); + entries.push_back(entry_pt); } + return entries; } int LogSegmentStore::removeSegment(UInt64 first_index_kept) @@ -813,9 +799,6 @@ bool LogSegmentStore::truncateLog(UInt64 last_index_kept) } } - if (!last_segment) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found a segment to truncate, last_index_kept {}.", last_index_kept); - /// remove files for (auto & to_removed : to_removed_segments) { @@ -824,20 +807,23 @@ bool LogSegmentStore::truncateLog(UInt64 last_index_kept) to_removed = nullptr; } - bool is_open_before_truncate = last_segment->isOpen(); - bool removed_something = last_segment->truncate(last_index_kept); + if (last_segment) + { + bool is_open_before_truncate = last_segment->isOpen(); + bool removed_something = last_segment->truncate(last_index_kept); - if (!removed_something && last_segment->lastIndex() != last_index_kept) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Truncate log to last_index_kept {}, but nothing removed from log segment {}.", last_index_kept, last_segment->getFileName()); + if (!removed_something && last_segment->lastIndex() != last_index_kept) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Truncate log to last_index_kept {}, but nothing removed from log segment {}.", last_index_kept, last_segment->getFileName()); - if (!is_open_before_truncate && !last_segment->isOpen()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Truncate a closed log segment {}, but the truncated log segment is not open.", last_segment->getFileName()); + if (!is_open_before_truncate && !last_segment->isOpen()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Truncate a closed log segment {}, but the truncated log segment is not open.", last_segment->getFileName()); - if (!is_open_before_truncate) - { - open_segment = last_segment; - if (!closed_segments.empty()) - closed_segments.erase(closed_segments.end() - 1); + if (!is_open_before_truncate) + { + open_segment = last_segment; + if (!closed_segments.empty()) + closed_segments.erase(closed_segments.end() - 1); + } } last_log_index.store(last_index_kept, std::memory_order_release); diff --git a/src/Service/NuRaftLogSegment.h b/src/Service/NuRaftLogSegment.h index 2af81a7332..e9366fa410 100644 --- a/src/Service/NuRaftLogSegment.h +++ b/src/Service/NuRaftLogSegment.h @@ -205,11 +205,11 @@ class LogSegmentStore final UInt64 appendEntry(const ptr & entry); /// First truncate log whose index is larger than or equals with index of entry, then append it. - UInt64 writeAt(UInt64 index, const ptr & entry); + void writeAt(UInt64 index, const ptr & entry); ptr getEntry(UInt64 index) const; - /// Just for test, collection entries in [start_index, end_index] - void getEntries(UInt64 start_index, UInt64 end_index, const ptr>> & entries); + /// Just for test + std::vector> getEntries(UInt64 start_index, UInt64 end_index) const; /// Remove segments from storage's head, logs in [1, first_index_kept) will be discarded, usually invoked when compaction. /// return number of segments removed @@ -219,7 +219,7 @@ class LogSegmentStore final /// Return true if some logs are removed bool truncateLog(UInt64 last_index_kept); - /// get closed segments, only for tests + /// Just for tests Segments getClosedSegments() { std::shared_lock read_lock(seg_mutex); @@ -232,8 +232,10 @@ class LogSegmentStore final private: /// open a new segment, invoked when init void openNewSegmentIfNeeded(); + /// list segments, invoked when init void loadSegmentMetaData(); + /// load listed segments, invoked when init void loadSegments(); diff --git a/src/Service/NuRaftStateMachine.cpp b/src/Service/NuRaftStateMachine.cpp index e0bf13f56a..c44775ff3e 100644 --- a/src/Service/NuRaftStateMachine.cpp +++ b/src/Service/NuRaftStateMachine.cpp @@ -285,11 +285,6 @@ nuraft::ptr NuRaftStateMachine::commit(const ulong log_idx, buff return commit(log_idx, data, false); } -[[maybe_unused]] void NuRaftStateMachine::processReadRequest(const RequestForSession & request_for_session) -{ - store.processRequest(responses_queue, request_for_session); -} - std::vector NuRaftStateMachine::getDeadSessions() { return store.getDeadSessions(); diff --git a/src/Service/NuRaftStateMachine.h b/src/Service/NuRaftStateMachine.h index ce5676d20e..8704834b25 100644 --- a/src/Service/NuRaftStateMachine.h +++ b/src/Service/NuRaftStateMachine.h @@ -232,9 +232,6 @@ class NuRaftStateMachine : public nuraft::state_machine KeeperStore & getStore() { return store; } - /// process read request - [[maybe_unused]] void processReadRequest(const RequestForSession & request_for_session); - /// get expired session std::vector getDeadSessions(); diff --git a/src/Service/SnapshotCommon.cpp b/src/Service/SnapshotCommon.cpp index c0bafdbcc7..5cfb357fbc 100644 --- a/src/Service/SnapshotCommon.cpp +++ b/src/Service/SnapshotCommon.cpp @@ -229,60 +229,6 @@ void serializeAclsV2(const NumToACLMap & acl_map, String path, UInt32 save_batch LOG_INFO(log, "Finish create snapshot acl object, acl size {}, path {}", acl_map.size(), path); } -[[maybe_unused]] size_t serializeEphemeralsV2(KeeperStore::Ephemerals & ephemerals, std::mutex & mutex, String path, UInt32 save_batch_size) -{ - Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); - LOG_INFO(log, "Begin create snapshot ephemeral object, node size {}, path {}", ephemerals.size(), path); - - ptr batch; - - std::lock_guard lock(mutex); - - if (ephemerals.empty()) - { - LOG_INFO(log, "Ephemeral nodes size is 0"); - return 0; - } - - auto out = cs_new(path); - uint64_t index = 0; - for (auto & ephemeral_it : ephemerals) - { - /// flush and rebuild batch - if (index % save_batch_size == 0) - { - /// skip flush the first batch - if (index != 0) - { - /// write data in batch to file - saveBatchV2(out, batch); - } - batch = cs_new(); - batch->type = SnapshotBatchType::SNAPSHOT_TYPE_DATA_EPHEMERAL; - } - - /// append to batch - WriteBufferFromNuraftBuffer buf; - Coordination::write(ephemeral_it.first, buf); - Coordination::write(ephemeral_it.second.size(), buf); - - for (const auto & node_path : ephemeral_it.second) - { - Coordination::write(node_path, buf); - } - - ptr data = buf.getBuffer(); - data->pos(0); - batch->add(String(reinterpret_cast(data->data_begin()), data->size())); - - index++; - } - - /// flush the last batch - saveBatchV2(out, batch); - out->close(); - return 1; -} void serializeSessionsV2(SessionAndTimeout & session_and_timeout, SessionAndAuth & session_and_auth, UInt32 save_batch_size, SnapshotVersion version, String & path) { diff --git a/src/Service/SnapshotCommon.h b/src/Service/SnapshotCommon.h index c43b30bbe2..b8486263c2 100644 --- a/src/Service/SnapshotCommon.h +++ b/src/Service/SnapshotCommon.h @@ -112,8 +112,6 @@ std::pair saveBatchAndUpdateCheckSumV2(ptr & out, ptr & batch, UInt32 checksum); void serializeAclsV2(const NumToACLMap & acls, String path, UInt32 save_batch_size, SnapshotVersion version); -[[maybe_unused]] size_t -serializeEphemeralsV2(KeeperStore::Ephemerals & ephemerals, std::mutex & mutex, String path, UInt32 save_batch_size); /// Serialize sessions and return the next_session_id before serialize void serializeSessionsV2(SessionAndTimeout & session_and_timeout, SessionAndAuth & session_and_auth, UInt32 save_batch_size, SnapshotVersion version, String & path); diff --git a/src/Service/tests/gtest_raft_log.cpp b/src/Service/tests/gtest_raft_log.cpp index 544eea2d32..91a6b1ba45 100644 --- a/src/Service/tests/gtest_raft_log.cpp +++ b/src/Service/tests/gtest_raft_log.cpp @@ -412,12 +412,10 @@ TEST(RaftLog, getEntries) String data("CREATE TABLE table1;"); ASSERT_EQ(appendEntry(log_store, term, key, data), i + 1); } - ptr>> ret = cs_new>>(); - log_store->getEntries(1, 3, ret); - ASSERT_EQ(ret->size(), 3); - ret->clear(); - log_store->getEntries(4, 8, ret); - ASSERT_EQ(ret->size(), 5); + std::vector> ret = log_store->getEntries(1, 3); + ASSERT_EQ(ret.size(), 3); + ret = log_store->getEntries(4, 8); + ASSERT_EQ(ret.size(), 5); log_store->close(); cleanDirectory(log_dir); } diff --git a/src/Service/tests/raft_test_common.h b/src/Service/tests/raft_test_common.h index 0f7470a441..ba117a78fc 100644 --- a/src/Service/tests/raft_test_common.h +++ b/src/Service/tests/raft_test_common.h @@ -19,7 +19,7 @@ class TestServer : public Poco::Util::Application, public Loggers }; static const String LOG_DIR = "./test_raft_log"; -[[maybe_unused]] static const String SNAP_DIR = "./test_raft_snapshot"; +static const String SNAP_DIR = "./test_raft_snapshot"; void cleanAll(); void cleanDirectory(const String & log_dir, bool remove_dir = true);