diff --git a/docs/how-to-monitor-and-manage.md b/docs/how-to-monitor-and-manage.md index 40d569d84d..e6f765689b 100644 --- a/docs/how-to-monitor-and-manage.md +++ b/docs/how-to-monitor-and-manage.md @@ -193,7 +193,6 @@ max_stored_snapshots=5 shutdown_timeout=5000 startup_timeout=6000000 raft_logs_level=information -rotate_log_storage_interval=100000 log_fsync_mode=fsync_parallel log_fsync_interval=1000 nuraft_thread_size=16 diff --git a/src/Service/ConnectionHandler.h b/src/Service/ConnectionHandler.h index ccbbed4561..6340b12c85 100644 --- a/src/Service/ConnectionHandler.h +++ b/src/Service/ConnectionHandler.h @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -62,7 +63,6 @@ class ConnectionHandler void onSocketError(const Notification &); /// current connection statistics - ConnectionStats getConnectionStats() const; void dumpStats(WriteBufferFromOwnString & buf, bool brief); /// reset current connection statistics diff --git a/src/Service/ForwardConnectionHandler.h b/src/Service/ForwardConnectionHandler.h index 575abd6736..5c808bba95 100644 --- a/src/Service/ForwardConnectionHandler.h +++ b/src/Service/ForwardConnectionHandler.h @@ -4,6 +4,7 @@ #include #include +#include #include #include #include diff --git a/src/Service/FourLetterCommand.cpp b/src/Service/FourLetterCommand.cpp index d66107d20d..1c913551be 100644 --- a/src/Service/FourLetterCommand.cpp +++ b/src/Service/FourLetterCommand.cpp @@ -8,7 +8,6 @@ #include #include #include -#include "Common/StringUtils.h" #include #include #include @@ -258,7 +257,7 @@ String MonitorCommand::run() print(ret, "watch_count", state_machine.getTotalWatchesCount()); print(ret, "ephemerals_count", state_machine.getTotalEphemeralNodesCount()); print(ret, "approximate_data_size", state_machine.getApproximateDataSize()); - print(ret, "in_snapshot", state_machine.getSnapshoting()); + print(ret, "in_snapshot", state_machine.isCreatingSnapshot()); #if defined(__linux__) || defined(__APPLE__) print(ret, "open_file_descriptor_count", getCurrentProcessFDCount()); diff --git a/src/Service/KeeperServer.cpp b/src/Service/KeeperServer.cpp index 2201901702..cd8707712f 100644 --- a/src/Service/KeeperServer.cpp +++ b/src/Service/KeeperServer.cpp @@ -135,10 +135,10 @@ ptr>> KeeperServer::pushRequestBatch(const std::v { LOG_DEBUG(log, "Push batch requests of size {}", request_batch.size()); std::vector> entries; - for (const auto & request_session : request_batch) + for (const auto & request : request_batch) { - LOG_TRACE(log, "Push request {}", request_session.toSimpleString()); - entries.push_back(serializeKeeperRequest(request_session)); + LOG_TRACE(log, "Push request {}", request.toSimpleString()); + entries.push_back(serializeKeeperRequest(request)); } /// append_entries write request ptr>> result = raft_instance->append_entries(entries); diff --git a/src/Service/KeeperUtils.cpp b/src/Service/KeeperUtils.cpp index 3de5e3c475..ee251e043e 100644 --- a/src/Service/KeeperUtils.cpp +++ b/src/Service/KeeperUtils.cpp @@ -46,11 +46,11 @@ ptr serializeKeeperRequest(const RequestForSession & request) return out.getBuffer(); } -RequestForSession deserializeKeeperRequest(nuraft::buffer & data) +ptr deserializeKeeperRequest(nuraft::buffer & data) { + ptr request = cs_new(); ReadBufferFromNuRaftBuffer buffer(data); - RequestForSession request_for_session; - readIntBinary(request_for_session.session_id, buffer); + readIntBinary(request->session_id, buffer); int32_t length; Coordination::read(length, buffer); @@ -64,17 +64,13 @@ RequestForSession deserializeKeeperRequest(nuraft::buffer & data) // bool is_internal; // Coordination::read(is_internal, buffer); - request_for_session.request = Coordination::ZooKeeperRequestFactory::instance().get(opnum); - request_for_session.request->xid = xid; - request_for_session.request->readImpl(buffer); + request->request = Coordination::ZooKeeperRequestFactory::instance().get(opnum); + request->request->xid = xid; + request->request->readImpl(buffer); - if (!buffer.eof()) - Coordination::read(request_for_session.create_time, buffer); - else /// backward compatibility - request_for_session.create_time - = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + Coordination::read(request->create_time, buffer); - return request_for_session; + return request; } ptr cloneLogEntry(const ptr & entry) diff --git a/src/Service/KeeperUtils.h b/src/Service/KeeperUtils.h index cc837d9cf6..fe073185da 100644 --- a/src/Service/KeeperUtils.h +++ b/src/Service/KeeperUtils.h @@ -1,22 +1,29 @@ #pragma once #include -#include -#include #include #include #include #include -#include #include namespace RK { +inline UInt64 getCurrentTimeMilliseconds() +{ + return duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); +} + +inline UInt64 getCurrentTimeMicroseconds() +{ + return duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); +} + /// Serialize and deserialize ZooKeeper request to log nuraft::ptr serializeKeeperRequest(const RequestForSession & request); -RequestForSession deserializeKeeperRequest(nuraft::buffer & data); +nuraft::ptr deserializeKeeperRequest(nuraft::buffer & data); nuraft::ptr cloneLogEntry(const nuraft::ptr & entry); diff --git a/src/Service/LastCommittedIndexManager.cpp b/src/Service/LastCommittedIndexManager.cpp index 4960ee1954..abdcc89831 100644 --- a/src/Service/LastCommittedIndexManager.cpp +++ b/src/Service/LastCommittedIndexManager.cpp @@ -18,10 +18,6 @@ namespace ErrorCodes extern const int CANNOT_SEEK_THROUGH_FILE; } -inline UInt64 getCurrentTimeMicroseconds() -{ - return std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); -} LastCommittedIndexManager::LastCommittedIndexManager(const String & log_dir) : log(&Poco::Logger::get("LastCommittedIndexManager")) { @@ -35,7 +31,7 @@ LastCommittedIndexManager::LastCommittedIndexManager(const String & log_dir) : l throwFromErrno("Failed to open committed log index file", ErrorCodes::CANNOT_OPEN_FILE); previous_persist_time = getCurrentTimeMicroseconds(); - persist_thread = ThreadFromGlobalPool([this] { persistThread(); }); + bg_persist_thread = ThreadFromGlobalPool([this] { persistThread(); }); } LastCommittedIndexManager::~LastCommittedIndexManager() @@ -117,7 +113,7 @@ void LastCommittedIndexManager::shutDown() if (!is_shut_down) { is_shut_down = true; - persist_thread.join(); + bg_persist_thread.join(); ::close(persist_file_fd); } diff --git a/src/Service/LastCommittedIndexManager.h b/src/Service/LastCommittedIndexManager.h index 07060cdf9f..9162a4a681 100644 --- a/src/Service/LastCommittedIndexManager.h +++ b/src/Service/LastCommittedIndexManager.h @@ -38,7 +38,7 @@ class LastCommittedIndexManager UInt64 static constexpr PERSIST_INTERVAL_US = 100 * 1000; std::string_view static constexpr FILE_NAME = "last_committed_index.bin"; - ThreadFromGlobalPool persist_thread; + ThreadFromGlobalPool bg_persist_thread; std::atomic is_shut_down{false}; String persist_file_name; diff --git a/src/Service/Metrics.h b/src/Service/Metrics.h index cf07b8b420..a327fe3524 100644 --- a/src/Service/Metrics.h +++ b/src/Service/Metrics.h @@ -13,11 +13,6 @@ namespace RK { -inline UInt64 getCurrentTimeMilliseconds() -{ - return duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); -}; - /** * Uses the reservoir sampling algorithm to sample statistical values diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index 0ebb92af23..1c9fee0bd2 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -5,8 +5,6 @@ #include #include -#include - #include #include #include @@ -15,6 +13,7 @@ #include #include +#include #include #include #include @@ -776,12 +775,12 @@ bool KeeperSnapshotManager::receiveSnapshotMeta(snapshot & meta) return true; } -bool KeeperSnapshotManager::existSnapshot(const snapshot & meta) +bool KeeperSnapshotManager::existSnapshot(const snapshot & meta) const { return snapshots.find(getSnapshotStoreMapKey(meta)) != snapshots.end(); } -bool KeeperSnapshotManager::existSnapshotObject(const snapshot & meta, ulong obj_id) +bool KeeperSnapshotManager::existSnapshotObject(const snapshot & meta, ulong obj_id) const { auto it = snapshots.find(getSnapshotStoreMapKey(meta)); if (it == snapshots.end()) diff --git a/src/Service/NuRaftLogSnapshot.h b/src/Service/NuRaftLogSnapshot.h index dc76367188..827ce629ee 100644 --- a/src/Service/NuRaftLogSnapshot.h +++ b/src/Service/NuRaftLogSnapshot.h @@ -327,10 +327,10 @@ class KeeperSnapshotManager bool saveSnapshotObject(snapshot & meta, ulong obj_id, buffer & buffer); /// whether snapshot exists - bool existSnapshot(const snapshot & meta); + bool existSnapshot(const snapshot & meta) const; /// whether snapshot object exists - bool existSnapshotObject(const snapshot & meta, ulong obj_id); + bool existSnapshotObject(const snapshot & meta, ulong obj_id) const; /// load snapshot object, invoked when leader should send snapshot to others. bool loadSnapshotObject(const snapshot & meta, ulong obj_id, ptr & buffer); diff --git a/src/Service/NuRaftStateMachine.cpp b/src/Service/NuRaftStateMachine.cpp index 01eea466be..8f152c6bec 100644 --- a/src/Service/NuRaftStateMachine.cpp +++ b/src/Service/NuRaftStateMachine.cpp @@ -2,22 +2,15 @@ #include #include -#include - -#include #include #include #include -#include #include #include -#include #include -using namespace nuraft; - namespace RK { @@ -32,8 +25,8 @@ struct ReplayLogBatch { ulong batch_start_index = 0; ulong batch_end_index = 0; - ptr> log_vec; - ptr>> request_vec; + ptr> log_entries; + ptr>> requests; }; NuRaftStateMachine::NuRaftStateMachine( @@ -55,12 +48,11 @@ NuRaftStateMachine::NuRaftStateMachine( , request_processor(request_processor_) , last_committed_idx(0) , snapshot_creating_interval(static_cast(internal) * 1000000) - , last_snapshot_time(Poco::Timestamp().epochMicroseconds()) + , last_snapshot_time(getCurrentTimeMicroseconds()) , new_session_id_callback_mutex(new_session_id_callback_mutex_) , new_session_id_callback(new_session_id_callback_) + , log(&(Poco::Logger::get("KeeperStateMachine"))) { - log = &(Poco::Logger::get("KeeperStateMachine")); - LOG_INFO(log, "Begin to initialize state machine"); snapshot_dir = snap_dir; @@ -68,16 +60,15 @@ NuRaftStateMachine::NuRaftStateMachine( /// Load snapshot meta from disk auto snapshots_count = snap_mgr->loadSnapshotMetas(); + LOG_INFO(log, "Found {} snapshots from disk, load the latest one", snapshots_count); - auto last_snapshot = snap_mgr->lastSnapshot(); - if (last_snapshot != nullptr) + if (auto last_snapshot = snap_mgr->lastSnapshot()) applySnapshotImpl(*last_snapshot); committed_log_manager = cs_new(log_dir); - /// Last committed idx of the previous startup, we should apply log to here. - uint64_t previous_last_commit_id = committed_log_manager->get(); - if (!previous_last_commit_id) + /// Last committed idx of the previous startup, we should apply log to here. + if (uint64_t previous_last_commit_id = committed_log_manager->get(); previous_last_commit_id == 0) { LOG_INFO(log, "No previous last commit idx found, skip replaying logs."); } @@ -109,54 +100,14 @@ NuRaftStateMachine::NuRaftStateMachine( store.getZxid()); store.initializeSystemNodes(); - - LOG_INFO(log, "Starting background creating snapshot thread."); - snap_thread = ThreadFromGlobalPool([this] { snapThread(); }); -} - -ptr NuRaftStateMachine::createRequestSession(ptr & entry) -{ - if (entry->get_val_type() != nuraft::log_val_type::app_log) - return nullptr; - - ReadBufferFromNuRaftBuffer buffer(entry->get_buf()); - ptr request_for_session = cs_new(); - - readIntBinary(request_for_session->session_id, buffer); - if (buffer.eof()) - { - LOG_DEBUG(log, "session time out {}", toHexString(request_for_session->session_id)); - return nullptr; - } - - int32_t length; - Coordination::read(length, buffer); - if (length <= 0) - { - return nullptr; - } - - int32_t xid; - Coordination::read(xid, buffer); - - Coordination::OpNum opnum; - Coordination::read(opnum, buffer); - - request_for_session->request = Coordination::ZooKeeperRequestFactory::instance().get(opnum); - request_for_session->request->xid = xid; - request_for_session->request->readImpl(buffer); - - if (buffer.eof()) - request_for_session->create_time = getCurrentTimeMilliseconds(); - else - Coordination::read(request_for_session->create_time, buffer); - - return request_for_session; + bg_snap_thread = ThreadFromGlobalPool([this] { snapThread(); }); } void NuRaftStateMachine::snapThread() { + LOG_INFO(log, "Starting background creating snapshot thread."); setThreadName("snapThread"); + while (!shutdown_called) { if (snap_task_ready) @@ -177,20 +128,19 @@ void NuRaftStateMachine::snapThread() current_task->when_done(ret, except); Metrics::getMetrics().snap_count->add(1); - Metrics::getMetrics().snap_time_ms->add(Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time); + Metrics::getMetrics().snap_time_ms->add(getCurrentTimeMilliseconds() - snap_start_time); - last_snapshot_time = Poco::Timestamp().epochMicroseconds(); + last_snapshot_time = getCurrentTimeMicroseconds(); in_snapshot = false; - LOG_INFO(log, "Snapshot created time cost {} ms", Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time); + LOG_INFO(log, "Snapshot created time cost {} ms", getCurrentTimeMilliseconds() - snap_start_time); } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } } -ptr NuRaftStateMachine::pre_commit(const ulong log_idx, buffer & data) +ptr NuRaftStateMachine::pre_commit(const ulong, buffer &) { - LOG_TRACE(log, "pre commit, log index {}, data size {}", log_idx, data.size()); return nullptr; } @@ -200,156 +150,27 @@ void NuRaftStateMachine::rollback(const ulong log_idx, buffer & data) LOG_TRACE(log, "pre commit, log index {}, data size {}", log_idx, data.size()); } -nuraft::ptr NuRaftStateMachine::commit(const ulong log_idx, nuraft::buffer & data, bool ignore_response) +ptr NuRaftStateMachine::commit(const ulong log_idx, buffer & data, bool ignore_response) { - LOG_TRACE(log, "Begin commit log index {}", log_idx); - - if (isNewSessionRequest(data)) /// TODO remove in future - { - nuraft::buffer_serializer timeout_data(data); - int64_t session_timeout_ms = timeout_data.get_i64(); - - auto response = nuraft::buffer::alloc(sizeof(int64_t)); - int64_t session_id; - - nuraft::buffer_serializer bs(response); - { - std::unique_lock session_id_lock(new_session_id_callback_mutex); - session_id = store.getSessionID(session_timeout_ms); - bs.put_i64(session_id); - - LOG_DEBUG(log, "Commit session id {} with timeout {}", toHexString(session_id), session_timeout_ms); + auto request_for_session = deserializeKeeperRequest(data); + LOG_TRACE(log, "Commit log {}, request {}", log_idx, request_for_session->toSimpleString()); - last_committed_idx = log_idx; - committed_log_manager->push(last_committed_idx); - - if (new_session_id_callback.contains(session_id)) - new_session_id_callback.find(session_id)->second->notify_all(); - else - LOG_DEBUG( - log, - "Not found callback for session id {}, maybe time out or before wait or not allocate from local", - toHexString(session_id)); - } - - return response; - } - else if (isUpdateSessionRequest(data)) /// TODO remove in future - { - nuraft::buffer_serializer data_serializer(data); - int64_t session_id = data_serializer.get_i64(); - int64_t session_timeout_ms = data_serializer.get_i64(); - - auto response = nuraft::buffer::alloc(1); - nuraft::buffer_serializer bs(response); - - { - std::unique_lock session_id_lock(new_session_id_callback_mutex); - int8_t is_success = store.updateSessionTimeout(session_id, session_timeout_ms); - bs.put_i8(is_success); - - LOG_DEBUG(log, "Update session id {} with timeout {}, response {}", toHexString(session_id), session_timeout_ms, is_success); - last_committed_idx = log_idx; - committed_log_manager->push(last_committed_idx); - - if (new_session_id_callback.contains(session_id)) - new_session_id_callback.find(session_id)->second->notify_all(); - else - LOG_DEBUG( - log, - "Not found callback for session id {}, maybe time out or before wait or not allocate from local", - toHexString(session_id)); - } - - return response; - } + if (request_processor) + request_processor->commit(*request_for_session); else - { - auto request_for_session = deserializeKeeperRequest(data); - LOG_TRACE(log, "Commit log index {}, request {}", log_idx, request_for_session.toSimpleString()); - - if (request_processor) - request_processor->commit(request_for_session); - else - store.processRequest(responses_queue, request_for_session, {}, true, ignore_response); + store.processRequest(responses_queue, *request_for_session, {}, true, ignore_response); - last_committed_idx = log_idx; - committed_log_manager->push(last_committed_idx); + last_committed_idx = log_idx; + committed_log_manager->push(last_committed_idx); - return nullptr; - } + return nullptr; } -nuraft::ptr NuRaftStateMachine::commit(const ulong log_idx, buffer & data) +ptr NuRaftStateMachine::commit(const ulong log_idx, buffer & data) { return commit(log_idx, data, false); } -std::vector NuRaftStateMachine::getDeadSessions() -{ - return store.getDeadSessions(); -} - -int64_t NuRaftStateMachine::getLastProcessedZxid() const -{ - return store.getZxid(); -} - -uint64_t NuRaftStateMachine::getNodesCount() const -{ - return store.getNodesCount(); -} - -uint64_t NuRaftStateMachine::getTotalWatchesCount() const -{ - return store.getTotalWatchesCount(); -} - -uint64_t NuRaftStateMachine::getWatchedPathsCount() const -{ - return store.getWatchedPathsCount(); -} - -uint64_t NuRaftStateMachine::getSessionsWithWatchesCount() const -{ - return store.getSessionsWithWatchesCount(); -} - -uint64_t NuRaftStateMachine::getTotalEphemeralNodesCount() const -{ - return store.getTotalEphemeralNodesCount(); -} - -uint64_t NuRaftStateMachine::getSessionWithEphemeralNodesCount() const -{ - return store.getSessionWithEphemeralNodesCount(); -} - -void NuRaftStateMachine::dumpWatches(WriteBufferFromOwnString & buf) const -{ - store.dumpWatches(buf); -} - -void NuRaftStateMachine::dumpWatchesByPath(WriteBufferFromOwnString & buf) const -{ - store.dumpWatchesByPath(buf); -} - -void NuRaftStateMachine::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const -{ - store.dumpSessionsAndEphemerals(buf); -} - -uint64_t NuRaftStateMachine::getApproximateDataSize() const -{ - return store.getApproximateDataSize(); -} - -bool NuRaftStateMachine::containsSession(int64_t session_id) const -{ - return store.containsSession(session_id); -} - void NuRaftStateMachine::shutdown() { if (shutdown_called) @@ -360,7 +181,7 @@ void NuRaftStateMachine::shutdown() store.finalize(); committed_log_manager->shutDown(); - snap_thread.join(); + bg_snap_thread.join(); LOG_INFO(log, "State machine shut down done!"); } @@ -384,7 +205,7 @@ void NuRaftStateMachine::create_snapshot(snapshot & s, async_result::handl } in_snapshot = true; - snap_start_time = Poco::Timestamp().epochMicroseconds() / 1000; + snap_start_time = getCurrentTimeMilliseconds(); LOG_INFO(log, "Creating snapshot last_log_term {}, last_log_idx {}", s.get_last_log_term(), s.get_last_log_idx()); @@ -396,12 +217,12 @@ void NuRaftStateMachine::create_snapshot(snapshot & s, async_result::handl when_done(ret, except); Metrics::getMetrics().snap_count->add(1); - Metrics::getMetrics().snap_time_ms->add(Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time); + Metrics::getMetrics().snap_time_ms->add(getCurrentTimeMilliseconds() - snap_start_time); - last_snapshot_time = Poco::Timestamp().epochMicroseconds(); + last_snapshot_time = getCurrentTimeMicroseconds(); in_snapshot = false; - LOG_INFO(log, "Created snapshot, time cost {} ms", Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time); + LOG_INFO(log, "Created snapshot, time cost {} ms", getCurrentTimeMilliseconds() - snap_start_time); } else { @@ -411,20 +232,20 @@ void NuRaftStateMachine::create_snapshot(snapshot & s, async_result::handl snap_task = std::make_shared(snap_copy, store, when_done); snap_task_ready = true; - LOG_INFO(log, "Scheduling asynchronous creating snapshot task, time cost {} ms", Poco::Timestamp().epochMicroseconds() / 1000 - snap_start_time); + LOG_INFO(log, "Scheduling asynchronous creating snapshot task, time cost {} ms", getCurrentTimeMilliseconds() - snap_start_time); } } void NuRaftStateMachine::create_snapshot(snapshot & s, int64_t next_zxid, int64_t next_session_id) { - std::lock_guard lock(snapshot_mutex); + std::lock_guard lock(snapshot_mutex); snap_mgr->createSnapshot(s, store, next_zxid, next_session_id); snap_mgr->removeSnapshots(); } void NuRaftStateMachine::create_snapshot_async(SnapTask & s) { - std::lock_guard lock(snapshot_mutex); + std::lock_guard lock(snapshot_mutex); snap_mgr->createSnapshotAsync(s); snap_mgr->removeSnapshots(); } @@ -454,7 +275,7 @@ int NuRaftStateMachine::read_logical_snp_obj(snapshot & s, void *& user_snp_ctx, { // Object ID == 0: first object data_out = buffer::alloc(sizeof(UInt32)); - buffer_serializer bs(data_out); + nuraft::buffer_serializer bs(data_out); bs.put_i32(0); is_last_obj = false; LOG_INFO(log, "Read snapshot object, last_log_idx {}, object id {}, is_last {}", s.get_last_log_idx(), obj_id, false); @@ -488,7 +309,7 @@ void NuRaftStateMachine::save_logical_snp_obj(snapshot & s, ulong & obj_id, buff obj_id++; } -bool NuRaftStateMachine::existSnapshotObject(snapshot & s, ulong obj_id) +bool NuRaftStateMachine::existSnapshotObject(snapshot & s, ulong obj_id) const { return snap_mgr->existSnapshotObject(s, obj_id); } @@ -505,7 +326,7 @@ bool NuRaftStateMachine::apply_snapshot(snapshot & s) bool NuRaftStateMachine::applySnapshotImpl(snapshot & s) { LOG_INFO(log, "Applying snapshot term {}, last log index {}, size {}", s.get_last_log_term(), s.get_last_log_idx(), s.size()); - std::lock_guard lock(snapshot_mutex); + std::lock_guard lock(snapshot_mutex); bool succeed = snap_mgr->parseSnapshot(s, store); if (succeed) { @@ -557,7 +378,7 @@ void NuRaftStateMachine::replayLogs(ptr log_store_, uint64_t from, ui /// Loading and applying asynchronously auto load_thread = ThreadFromGlobalPool( - [this, last_index_in_store, &log_queue, &batch_start_index, &batch_end_index, &log_store_] + [last_index_in_store, &log_queue, &batch_start_index, &batch_end_index, &log_store_] { Poco::Logger * thread_log = &(Poco::Logger::get("LoadLogThread")); while (batch_start_index < last_index_in_store) @@ -576,33 +397,25 @@ void NuRaftStateMachine::replayLogs(ptr log_store_, uint64_t from, ui LOG_INFO(thread_log, "Begin to load batch [{} , {})", batch_start_index, batch_end_index); ReplayLogBatch batch; - batch.log_vec + batch.log_entries = dynamic_cast(log_store_.get())->log_entries_version_ext(batch_start_index, batch_end_index, 0); batch.batch_start_index = batch_start_index; batch.batch_end_index = batch_end_index; - batch.request_vec = cs_new>>(); + batch.requests = cs_new>>(); - for (auto entry : *(batch.log_vec)) + for (auto & entry_with_version : *batch.log_entries) { - if (entry.entry->get_val_type() != nuraft::log_val_type::app_log) - { - LOG_DEBUG(thread_log, "Found non app log(type {}), ignore it", entry.entry->get_val_type()); - batch.request_vec->push_back(nullptr); - } - else if (isNewSessionRequest(entry.entry->get_buf())) - { - batch.request_vec->push_back(nullptr); - } - else if (isUpdateSessionRequest(entry.entry->get_buf())) + if (entry_with_version.entry->get_val_type() != nuraft::log_val_type::app_log) { - batch.request_vec->push_back(nullptr); + LOG_DEBUG(thread_log, "Found non app nuraft log(type {}), ignore it", entry_with_version.entry->get_val_type()); + batch.requests->push_back(nullptr); } else { /// user requests - ptr ptr_request = createRequestSession(entry.entry); - batch.request_vec->push_back(ptr_request); + auto request = deserializeKeeperRequest(entry_with_version.entry->get_buf()); + batch.requests->push_back(request); } } @@ -624,53 +437,33 @@ void NuRaftStateMachine::replayLogs(ptr log_store_, uint64_t from, ui ReplayLogBatch batch; log_queue.peek(batch); - if (batch.log_vec == nullptr) + if (batch.log_entries == nullptr) { LOG_DEBUG(log, "log vector is null"); break; } - for (size_t i = 0; i < batch.log_vec->size(); ++i) + for (size_t i = 0; i < batch.log_entries->size(); ++i) { ulong log_index = batch.batch_start_index + i; - auto entry = (*batch.log_vec)[i]; - if (entry.entry->get_val_type() != nuraft::log_val_type::app_log) + auto & entry_with_version = (*batch.log_entries)[i]; + + if (entry_with_version.entry->get_val_type() != nuraft::log_val_type::app_log) continue; - /// Compatible with old NewSessionRequest log_entry. - if (isNewSessionRequest(entry.entry->get_buf())) - { - /// replay session - int64_t session_timeout_ms = entry.entry->get_buf().get_ulong(); - int64_t session_id = store.getSessionID(session_timeout_ms); - LOG_TRACE(log, "Replay log create session {} with timeout {} from log", toHexString(session_id), session_timeout_ms); - } - /// Compatible with old UpdateSessionRequest log_entry. - else if (isUpdateSessionRequest(entry.entry->get_buf())) - { - /// replay update session - nuraft::buffer_serializer data_serializer(entry.entry->get_buf()); - int64_t session_id = data_serializer.get_i64(); - int64_t session_timeout_ms = data_serializer.get_i64(); + auto & request = (*batch.requests)[i]; + LOG_TRACE(log, "Replaying log {}, request {}", log_index, request->toString()); - store.updateSessionTimeout(session_id, session_timeout_ms); - LOG_TRACE(log, "Replay log update session {} with timeout {}", toHexString(session_id), session_timeout_ms); - } - else + store.processRequest(responses_queue, *request, {}, true, true); + + if (!isNewSessionRequest(request->request->getOpNum()) && request->session_id > store.getSessionIDCounter()) { - /// replay user requests - auto & request = (*batch.request_vec)[i]; - LOG_TRACE(log, "Replay log {}, request {}", log_index, request->toString()); - store.processRequest(responses_queue, *request, {}, true, true); - if (!RK::isNewSessionRequest(request->request->getOpNum()) && request->session_id > store.getSessionIDCounter()) - { - /// We may receive an error session id from client, and we just ignore it. - LOG_WARNING( - log, - "Storage's session_id_counter {} must bigger than the session id {} of log.", - toHexString(store.getSessionIDCounter()), - toHexString(request->session_id)); - } + /// We may receive an error session id from client, and we just ignore it. + LOG_WARNING( + log, + "Storage's session_id_counter {} must bigger than the session id {} of log.", + toHexString(store.getSessionIDCounter()), + toHexString(request->session_id)); } } @@ -704,9 +497,7 @@ void NuRaftStateMachine::free_user_snp_ctx(void *& user_snp_ctx) ptr NuRaftStateMachine::last_snapshot() { - /// Just return the latest snapshot. - std::lock_guard lock(snapshot_mutex); - LOG_INFO(log, "last_snapshot invoke"); + std::lock_guard lock(snapshot_mutex); return snap_mgr->lastSnapshot(); } @@ -717,24 +508,14 @@ bool NuRaftStateMachine::exists(const String & path) KeeperNode & NuRaftStateMachine::getNode(const String & path) { - auto node_ptr = store.getNode(path); - if (node_ptr != nullptr) + auto node = store.getNode(path); + if (node != nullptr) { - return *node_ptr.get(); + return *node.get(); } return default_node; } -bool NuRaftStateMachine::isNewSessionRequest(nuraft::buffer & data) -{ - return data.size() == sizeof(int64); -} - -bool NuRaftStateMachine::isUpdateSessionRequest(nuraft::buffer & data) -{ - return data.size() == sizeof(int64) + sizeof(int64); -} - void NuRaftStateMachine::reset() { { @@ -749,4 +530,70 @@ void NuRaftStateMachine::reset() } } + +std::vector NuRaftStateMachine::getDeadSessions() const +{ + return store.getDeadSessions(); +} + +int64_t NuRaftStateMachine::getLastProcessedZxid() const +{ + return store.getZxid(); +} + +uint64_t NuRaftStateMachine::getNodesCount() const +{ + return store.getNodesCount(); +} + +uint64_t NuRaftStateMachine::getTotalWatchesCount() const +{ + return store.getTotalWatchesCount(); +} + +uint64_t NuRaftStateMachine::getWatchedPathsCount() const +{ + return store.getWatchedPathsCount(); +} + +uint64_t NuRaftStateMachine::getSessionsWithWatchesCount() const +{ + return store.getSessionsWithWatchesCount(); +} + +uint64_t NuRaftStateMachine::getTotalEphemeralNodesCount() const +{ + return store.getTotalEphemeralNodesCount(); +} + +uint64_t NuRaftStateMachine::getSessionWithEphemeralNodesCount() const +{ + return store.getSessionWithEphemeralNodesCount(); +} + +void NuRaftStateMachine::dumpWatches(WriteBufferFromOwnString & buf) const +{ + store.dumpWatches(buf); +} + +void NuRaftStateMachine::dumpWatchesByPath(WriteBufferFromOwnString & buf) const +{ + store.dumpWatchesByPath(buf); +} + +void NuRaftStateMachine::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const +{ + store.dumpSessionsAndEphemerals(buf); +} + +uint64_t NuRaftStateMachine::getApproximateDataSize() const +{ + return store.getApproximateDataSize(); +} + +bool NuRaftStateMachine::containsSession(int64_t session_id) const +{ + return store.containsSession(session_id); +} + } diff --git a/src/Service/NuRaftStateMachine.h b/src/Service/NuRaftStateMachine.h index 8704834b25..d62cbfacac 100644 --- a/src/Service/NuRaftStateMachine.h +++ b/src/Service/NuRaftStateMachine.h @@ -1,10 +1,7 @@ #pragma once #include -#include #include -#include -#include #include #include @@ -180,7 +177,7 @@ class NuRaftStateMachine : public nuraft::state_machine void save_logical_snp_obj(snapshot & s, ulong & obj_id, buffer & data, bool is_first_obj, bool is_last_obj) override; /// whether snapshot object exists. - bool existSnapshotObject(snapshot & s, ulong obj_id); + bool existSnapshotObject(snapshot & s, ulong obj_id) const; /** * Apply received snapshot to state machine. Note that you should reset the state machine first. @@ -209,7 +206,7 @@ class NuRaftStateMachine : public nuraft::state_machine * Get the latest snapshot instance. * * This API will be invoked at the initialization of Raft server, - * so that the last last snapshot should be durable for server restart, + * so that the last snapshot should be durable for server restart, * if you want to avoid unnecessary catch-up. * * @return Pointer to the latest snapshot. @@ -219,7 +216,7 @@ class NuRaftStateMachine : public nuraft::state_machine ulong last_commit_index() override { return last_committed_idx; } /// get persisted last committed index - ulong getLastCommittedIndex() + ulong getLastCommittedIndex() const { return committed_log_manager->get(); } @@ -233,7 +230,7 @@ class NuRaftStateMachine : public nuraft::state_machine KeeperStore & getStore() { return store; } /// get expired session - std::vector getDeadSessions(); + std::vector getDeadSessions() const; /// for 4lw commands int64_t getLastProcessedZxid() const; @@ -267,7 +264,7 @@ class NuRaftStateMachine : public nuraft::state_machine bool containsSession(int64_t session_id) const; /// whether a snapshot creating is in progress. - bool getSnapshoting() const + bool isCreatingSnapshot() const { return in_snapshot; } @@ -279,19 +276,11 @@ class NuRaftStateMachine : public nuraft::state_machine /// Used when apply_snapshot. void reset(); - ptr createRequestSession(ptr & entry); - /// Asynchronously snapshot creating thread. /// Now it is not used. void snapThread(); - /// Only contains session_id - static bool isNewSessionRequest(nuraft::buffer & data); - - /// Contains session_id and timeout - static bool isUpdateSessionRequest(nuraft::buffer & data); - Poco::Logger * log; /// raft related settings RaftSettingsPtr raft_settings; @@ -327,13 +316,15 @@ class NuRaftStateMachine : public nuraft::state_machine std::atomic_bool snap_task_ready{false}; std::atomic_uint64_t snap_start_time; - ThreadFromGlobalPool snap_thread; + ThreadFromGlobalPool bg_snap_thread; std::shared_ptr snap_task; std::atomic shutdown_called{false}; std::mutex & new_session_id_callback_mutex; std::unordered_map> & new_session_id_callback; + + Poco::Logger * log; }; } diff --git a/src/Service/RequestForwarder.cpp b/src/Service/RequestForwarder.cpp index bcd25e443c..72d1db5ef5 100644 --- a/src/Service/RequestForwarder.cpp +++ b/src/Service/RequestForwarder.cpp @@ -120,7 +120,7 @@ void RequestForwarder::runSend(RunnerId runner_id) } session_sync_time_watch.restart(); - session_sync_idx++; + ++session_sync_idx; } } } diff --git a/src/Service/Settings.cpp b/src/Service/Settings.cpp index 6ee6e456c9..7e32900a8d 100644 --- a/src/Service/Settings.cpp +++ b/src/Service/Settings.cpp @@ -82,7 +82,6 @@ void RaftSettings::loadFromConfig(const String & config_elem, const Poco::Util:: String log_level = config.getString(get_key("raft_logs_level"), "information"); raft_logs_level = parseNuRaftLogLevel(log_level); - rotate_log_storage_interval = config.getUInt(get_key("rotate_log_storage_interval"), 100000); nuraft_thread_size = config.getUInt(get_key("nuraft_thread_size"), getNumberOfPhysicalCPUCores()); fresh_log_gap = config.getUInt(get_key("fresh_log_gap"), 200); configuration_change_tries_count = config.getUInt(get_key("configuration_change_tries_count"), 30); @@ -117,7 +116,6 @@ RaftSettingsPtr RaftSettings::getDefault() settings->startup_timeout = 6000000; settings->raft_logs_level = NuRaftLogLevel::RAFT_LOG_INFORMATION; - settings->rotate_log_storage_interval = 100000; settings->nuraft_thread_size = getNumberOfPhysicalCPUCores(); settings->fresh_log_gap = 200; settings->configuration_change_tries_count = 30; @@ -214,8 +212,6 @@ void Settings::dump(WriteBufferFromOwnString & buf) const writeText("raft_logs_level=", buf); writeText(nuRaftLogLevelToString(raft_settings->raft_logs_level), buf); buf.write('\n'); - writeText("rotate_log_storage_interval=", buf); - write_int(raft_settings->rotate_log_storage_interval); writeText("log_fsync_mode=", buf); writeText(FsyncModeNS::toString(raft_settings->log_fsync_mode), buf); diff --git a/src/Service/Settings.h b/src/Service/Settings.h index 126177403a..c179d9aa22 100644 --- a/src/Service/Settings.h +++ b/src/Service/Settings.h @@ -98,8 +98,6 @@ struct RaftSettings UInt64 startup_timeout; /// Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal' NuRaftLogLevel raft_logs_level; - /// How many records will be stored in one log storage file. TODO remove - UInt64 rotate_log_storage_interval; /// NuRaft thread pool size UInt64 nuraft_thread_size; /// When node became fresh diff --git a/src/Service/SnapshotCommon.cpp b/src/Service/SnapshotCommon.cpp index 5cfb357fbc..adecf828dc 100644 --- a/src/Service/SnapshotCommon.cpp +++ b/src/Service/SnapshotCommon.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include diff --git a/src/Service/tests/gtest_raft_snapshot.cpp b/src/Service/tests/gtest_raft_snapshot.cpp index e4b5120348..d8a1002797 100644 --- a/src/Service/tests/gtest_raft_snapshot.cpp +++ b/src/Service/tests/gtest_raft_snapshot.cpp @@ -20,26 +20,36 @@ namespace RK ptr createSessionLog(int64_t session_timeout_ms) { - auto entry = buffer::alloc(sizeof(int64_t)); - nuraft::buffer_serializer bs(entry); - bs.put_i64(session_timeout_ms); - return entry; + auto request = cs_new(); + request->xid = 1; + request->session_timeout_ms = session_timeout_ms; + request->internal_id = 1; + RequestForSession request_info; + request_info.request = request; + request_info.session_id = 1; + request_info.create_time = getCurrentTimeMilliseconds(); + ptr buf = serializeKeeperRequest(request_info); + return buf; } ptr updateSessionLog(int64_t session_id, int64_t session_timeout_ms) { - auto entry = buffer::alloc(sizeof(int64_t) + sizeof(int64_t)); - nuraft::buffer_serializer bs(entry); - - bs.put_i64(session_id); - bs.put_i64(session_timeout_ms); - return entry; + auto request = cs_new(); + request->xid = 1; + request->session_timeout_ms = session_timeout_ms; + request->session_id = session_id; + RequestForSession request_info; + request_info.request = request; + request_info.session_id = session_id; + request_info.create_time = getCurrentTimeMilliseconds(); + ptr buf = serializeKeeperRequest(request_info); + return buf; } ptr closeSessionLog(int64_t session_id) { - Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); - request->xid = Coordination::CLOSE_XID; + ZooKeeperRequestPtr request = ZooKeeperRequestFactory::instance().get(OpNum::Close); + request->xid = CLOSE_XID; RequestForSession request_info; request_info.request = request; request_info.session_id = session_id; @@ -761,7 +771,7 @@ void createSnapshotWithFuzzyLog(bool async_snapshot) LOG_INFO(log, "create snapshot with fuzzy log complete"); - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); /// wait for last committed index persistence. KeeperResponsesQueue ano_queue; ptr ano_store = cs_new(log_dir, false, FsyncMode::FSYNC); diff --git a/src/Service/tests/gtest_raft_state_machine.cpp b/src/Service/tests/gtest_raft_state_machine.cpp index 248cc51f28..1f33125389 100644 --- a/src/Service/tests/gtest_raft_state_machine.cpp +++ b/src/Service/tests/gtest_raft_state_machine.cpp @@ -42,10 +42,10 @@ TEST(RaftStateMachine, serializeAndParse) session_request.create_time = duration_cast(system_clock::now().time_since_epoch()).count(); ptr buf = serializeKeeperRequest(session_request); - RequestForSession session_request_2 = deserializeKeeperRequest(*(buf.get())); - if (session_request_2.request->getOpNum() == OpNum::Create) + ptr session_request_2 = deserializeKeeperRequest(*(buf.get())); + if (session_request_2->request->getOpNum() == OpNum::Create) { - ZooKeeperCreateRequest * request_2 = static_cast(session_request_2.request.get()); + ZooKeeperCreateRequest * request_2 = static_cast(session_request_2->request.get()); ASSERT_EQ(request_2->path, request->path); ASSERT_EQ(request_2->data, request->data); } diff --git a/src/ZooKeeper/ZooKeeperCommon.h b/src/ZooKeeper/ZooKeeperCommon.h index a5a40f1c0b..b5f1deea75 100644 --- a/src/ZooKeeper/ZooKeeperCommon.h +++ b/src/ZooKeeper/ZooKeeperCommon.h @@ -1,25 +1,19 @@ #pragma once -#include -#include #include #include #include #include -#include -#include -#include #include #include -#include "IKeeper.h" -#include "ZooKeeperConstants.h" #include #include #include #include -#include -#include + +#include +#include namespace Coordination diff --git a/tests/integration/test_four_word_command/test.py b/tests/integration/test_four_word_command/test.py index d6c015bde1..48c87cdde2 100644 --- a/tests/integration/test_four_word_command/test.py +++ b/tests/integration/test_four_word_command/test.py @@ -307,7 +307,6 @@ def test_cmd_conf(started_cluster): assert result["startup_timeout"] == "6000000" assert result["raft_logs_level"] == "debug" - assert result["rotate_log_storage_interval"] == "100000" assert result["log_fsync_mode"] == "fsync_parallel" assert result["log_fsync_interval"] == "1000"