diff --git a/src/Service/ForwardRequest.cpp b/src/Service/ForwardRequest.cpp index 8a5d04fa21..9171616fe7 100644 --- a/src/Service/ForwardRequest.cpp +++ b/src/Service/ForwardRequest.cpp @@ -36,7 +36,7 @@ ForwardResponsePtr ForwardHandshakeRequest::makeResponse() const return std::make_shared(); } -KeeperStore::RequestForSession ForwardHandshakeRequest::requestForSession() const +RequestForSession ForwardHandshakeRequest::requestForSession() const { RequestForSession reuqest_info; return reuqest_info; @@ -62,7 +62,7 @@ ForwardResponsePtr ForwardSessionRequest::makeResponse() const return std::make_shared(); } -KeeperStore::RequestForSession ForwardSessionRequest::requestForSession() const +RequestForSession ForwardSessionRequest::requestForSession() const { RequestForSession reuqest_info; return reuqest_info; @@ -100,7 +100,7 @@ ForwardResponsePtr ForwardGetSessionRequest::makeResponse() const return res; } -KeeperStore::RequestForSession ForwardGetSessionRequest::requestForSession() const +RequestForSession ForwardGetSessionRequest::requestForSession() const { RequestForSession reuqest_info; reuqest_info.request = request; @@ -141,7 +141,7 @@ ForwardResponsePtr ForwardUpdateSessionRequest::makeResponse() const return res; } -KeeperStore::RequestForSession ForwardUpdateSessionRequest::requestForSession() const +RequestForSession ForwardUpdateSessionRequest::requestForSession() const { RequestForSession reuqest_info; reuqest_info.request = request; @@ -186,7 +186,7 @@ ForwardResponsePtr ForwardOpRequest::makeResponse() const return res; } -KeeperStore::RequestForSession ForwardOpRequest::requestForSession() const +RequestForSession ForwardOpRequest::requestForSession() const { return request; } diff --git a/src/Service/ForwardRequest.h b/src/Service/ForwardRequest.h index 39b6385a7c..152532fdfa 100644 --- a/src/Service/ForwardRequest.h +++ b/src/Service/ForwardRequest.h @@ -29,7 +29,7 @@ struct ForwardRequest virtual ForwardResponsePtr makeResponse() const = 0; - virtual KeeperStore::RequestForSession requestForSession() const = 0; + virtual RequestForSession requestForSession() const = 0; virtual String toString() const = 0; @@ -51,7 +51,7 @@ struct ForwardHandshakeRequest : public ForwardRequest ForwardResponsePtr makeResponse() const override; - KeeperStore::RequestForSession requestForSession() const override; + RequestForSession requestForSession() const override; String toString() const override { @@ -78,7 +78,7 @@ struct ForwardSessionRequest : public ForwardRequest ForwardResponsePtr makeResponse() const override; - KeeperStore::RequestForSession requestForSession() const override; + RequestForSession requestForSession() const override; String toString() const override { @@ -99,7 +99,7 @@ struct ForwardGetSessionRequest : public ForwardRequest ForwardResponsePtr makeResponse() const override; - KeeperStore::RequestForSession requestForSession() const override; + RequestForSession requestForSession() const override; String toString() const override { @@ -120,7 +120,7 @@ struct ForwardUpdateSessionRequest : public ForwardRequest ForwardResponsePtr makeResponse() const override; - KeeperStore::RequestForSession requestForSession() const override; + RequestForSession requestForSession() const override; String toString() const override { @@ -132,7 +132,7 @@ struct ForwardUpdateSessionRequest : public ForwardRequest struct ForwardOpRequest : public ForwardRequest { - KeeperStore::RequestForSession request; + RequestForSession request; ForwardType forwardType() const override { return ForwardType::Operation; } @@ -142,7 +142,7 @@ struct ForwardOpRequest : public ForwardRequest ForwardResponsePtr makeResponse() const override; - KeeperStore::RequestForSession requestForSession() const override; + RequestForSession requestForSession() const override; String toString() const override { @@ -162,7 +162,7 @@ class ForwardRequestFactory final : private boost::noncopyable ForwardRequestPtr get(ForwardType op_num) const; - static ForwardRequestPtr convertFromRequest(const KeeperStore::RequestForSession & request_for_session) + static ForwardRequestPtr convertFromRequest(const RequestForSession & request_for_session) { auto opnum = request_for_session.request->getOpNum(); switch (opnum) diff --git a/src/Service/KeeperCommon.cpp b/src/Service/KeeperCommon.cpp index 9c4a773d46..d826d8ffc0 100644 --- a/src/Service/KeeperCommon.cpp +++ b/src/Service/KeeperCommon.cpp @@ -1,55 +1,69 @@ #include -#include -#include -#include -#include -#include -#include - -using namespace nuraft; +#include namespace RK { -namespace ErrorCodes +String ErrorRequest::toString() const { - extern const int INVALID_CONFIG_PARAMETER; + return fmt::format( + "session_id:{}, xid:{}, opnum:{}, accepted:{}, error_code:{}", + toHexString(session_id), + xid, + Coordination::toString(opnum), + accepted, + error_code); } -int Directory::createDir(const std::string & dir) +String ErrorRequest::getRequestId() const { - Poco::File(dir).createDirectories(); - return 0; + return RequestId{session_id, xid}.toString(); } -std::string checkAndGetSuperdigest(const String & user_and_digest) +String RequestId::toString() const { - if (user_and_digest.empty()) - return ""; + return fmt::format("session_id:{}, xid:{}", toHexString(session_id), xid); +} - std::vector scheme_and_id; - boost::split(scheme_and_id, user_and_digest, [](char c) { return c == ':'; }); - if (scheme_and_id.size() != 2 || scheme_and_id[0] != "super") - throw Exception( - ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect superdigest in keeper_server config. Must be 'super:base64string'"); +bool RequestId::operator==(const RequestId & other) const +{ + return session_id == other.session_id && xid == other.xid; +} - return user_and_digest; +std::size_t RequestId::RequestIdHash::operator()(const RequestId & request_id) const +{ + std::size_t seed = 0; + std::hash hash64; + std::hash hash32; + + seed ^= hash64(request_id.session_id) + 0x9e3779b9 + (seed << 6) + (seed >> 2); + seed ^= hash32(request_id.xid) + 0x9e3779b9 + (seed << 6) + (seed >> 2); + + return seed; } -nuraft::ptr getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request) +String RequestForSession::toString() const { - RK::WriteBufferFromNuraftBuffer buf; - RK::writeIntBinary(session_id, buf); - request->write(buf); - Coordination::write(time, buf); - return buf.getBuffer(); + return fmt::format( + "session_id: {}, xid:{}, opnum:{}, create_time:{}, server_id:{}, client_id:{}, request:{}", + toHexString(session_id), + request->xid, + Coordination::toString(request->getOpNum()), + create_time, + server_id, + client_id, + request->toString()); } +String RequestForSession::toSimpleString() const +{ + return fmt::format( + "session_id: {}, xid:{}, opnum:{}", toHexString(session_id), request->xid, Coordination::toString(request->getOpNum())); +} -ptr makeClone(const ptr & entry) +RequestId RequestForSession::getRequestId() const { - ptr clone = cs_new(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type()); - return clone; + return {session_id, request->xid}; } } diff --git a/src/Service/KeeperCommon.h b/src/Service/KeeperCommon.h index 9cdf227e87..a9fe32f410 100644 --- a/src/Service/KeeperCommon.h +++ b/src/Service/KeeperCommon.h @@ -1,91 +1,66 @@ #pragma once -#include -#include -#include -#include #include -#include #include -#include namespace RK { -std::string checkAndGetSuperdigest(const String & user_and_digest); -nuraft::ptr getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request); -nuraft::ptr makeClone(const nuraft::ptr & entry); +/// Simple error request info. +struct ErrorRequest +{ + bool accepted; + nuraft::cmd_result_code error_code; /// TODO new error code instead of NuRaft error code + int64_t session_id; + Coordination::XID xid; + Coordination::OpNum opnum; + + String toString() const; + String getRequestId() const; +}; -struct BackendTimer +/// Global client request id. +struct RequestId { - static constexpr char TIME_FMT[] = "%Y%m%d%H%M%S"; + int64_t session_id; + Coordination::XID xid; - /// default min interval is 1 hour - UInt32 interval = 1 * 3600; - UInt32 random_window = 1200; //20 minutes + String toString() const; + bool operator==(const RequestId & other) const; - static void getCurrentTime(std::string & date_str) + struct RequestIdHash { - time_t curr_time; - time(&curr_time); - char tmp_buf[24]; - std::strftime(tmp_buf, sizeof(tmp_buf), TIME_FMT, localtime(&curr_time)); - date_str = tmp_buf; - } + std::size_t operator()(const RequestId & request_id) const; + }; +}; - static time_t parseTime(const std::string & date_str) - { - struct tm prev_tm; - memset(&prev_tm, 0, sizeof(tm)); - strptime(date_str.data(), TIME_FMT, &prev_tm); - time_t prev_time = mktime(&prev_tm); - return prev_time; - } +/// Attached session id to request +struct RequestForSession +{ + int64_t session_id; + Coordination::ZooKeeperRequestPtr request; - bool isActionTime(const time_t & prev_time, time_t curr_time) const - { - if (curr_time == 0L) - time(&curr_time); - return difftime(curr_time, prev_time) >= (interval + rand() % random_window); - } -}; + /// measured in millisecond + int64_t create_time{}; + /// for forward request + int32_t server_id{-1}; + int32_t client_id{-1}; -class Directory -{ -public: - static int createDir(const std::string & path); -}; + explicit RequestForSession() = default; -inline int readUInt32(nuraft::ptr & fs, UInt32 & x) -{ - errno = 0; - char * buf = reinterpret_cast(&x); - fs->read(buf, sizeof(UInt32)); - return fs->good() ? 0 : -1; -} + RequestForSession(Coordination::ZooKeeperRequestPtr request_, int64_t session_id_, int64_t create_time_) + : session_id(session_id_), request(request_), create_time(create_time_) + { + } -inline int writeUInt32(nuraft::ptr & fs, const UInt32 & x) -{ - errno = 0; - fs->write(reinterpret_cast(&x), sizeof(UInt32)); - return fs->good() ? 0 : -1; -} + bool isForwardRequest() const { return server_id > -1 && client_id > -1; } + RequestId getRequestId() const; -inline int readUInt64(nuraft::ptr & fs, UInt64 & x) -{ - errno = 0; - char * buf = reinterpret_cast(&x); - fs->read(buf, sizeof(UInt64)); - return fs->good() ? 0 : -1; -} + String toString() const; + String toSimpleString() const; -inline int writeUInt64(nuraft::ptr & fs, const UInt64 & x) -{ - errno = 0; - fs->write(reinterpret_cast(&x), sizeof(UInt64)); - return fs->good() ? 0 : -1; -} +}; } diff --git a/src/Service/KeeperDispatcher.cpp b/src/Service/KeeperDispatcher.cpp index fc39d1f72c..85aeb7d198 100644 --- a/src/Service/KeeperDispatcher.cpp +++ b/src/Service/KeeperDispatcher.cpp @@ -41,7 +41,7 @@ void KeeperDispatcher::requestThread(RunnerId runner_id) while (!shutdown_called) { - KeeperStore::RequestForSession request_for_session; + RequestForSession request_for_session; UInt64 max_wait = configuration_and_settings->raft_settings->operation_timeout_ms; @@ -161,7 +161,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ return false; } - KeeperStore::RequestForSession request_info; + RequestForSession request_info; request_info.request = request; request_info.session_id = session_id; using namespace std::chrono; @@ -191,7 +191,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ bool KeeperDispatcher::putForwardingRequest(size_t server_id, size_t client_id, ForwardRequestPtr request) { - KeeperStore::RequestForSession && request_info = request->requestForSession(); + RequestForSession && request_info = request->requestForSession(); using namespace std::chrono; request_info.create_time = duration_cast(system_clock::now().time_since_epoch()).count(); @@ -316,7 +316,7 @@ void KeeperDispatcher::shutdown() } LOG_INFO(log, "for unhandled requests sending session expired error to client."); - KeeperStore::RequestForSession request_for_session; + RequestForSession request_for_session; while (requests_queue->tryPopAny(request_for_session)) { auto response = request_for_session.request->makeResponse(); @@ -375,7 +375,7 @@ void KeeperDispatcher::sessionCleanerTask() Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); request->xid = Coordination::CLOSE_XID; - KeeperStore::RequestForSession request_info; + RequestForSession request_info; request_info.request = request; request_info.session_id = dead_session; using namespace std::chrono; diff --git a/src/Service/KeeperDispatcher.h b/src/Service/KeeperDispatcher.h index 9793004ea6..99ce7243c0 100644 --- a/src/Service/KeeperDispatcher.h +++ b/src/Service/KeeperDispatcher.h @@ -38,10 +38,10 @@ class KeeperDispatcher : public std::enable_shared_from_this std::atomic shutdown_called{false}; using SessionToResponseCallback = std::unordered_map; - std::mutex session_to_response_callback_mutex; + /// Session response callback which will send response to IO handler. + /// Sessions here is local session which are directly connected to the node. SessionToResponseCallback session_to_response_callback; - - std::mutex forward_to_response_callback_mutex; + std::mutex session_to_response_callback_mutex; struct PairHash { @@ -59,6 +59,7 @@ class KeeperDispatcher : public std::enable_shared_from_this using ForwardToResponseCallback = std::unordered_map; ForwardToResponseCallback forward_to_response_callback; + std::mutex forward_to_response_callback_mutex; using UpdateConfigurationQueue = ConcurrentBoundedQueue; /// More than 1k updates is definitely misconfiguration. @@ -85,7 +86,6 @@ class KeeperDispatcher : public std::enable_shared_from_this /// 1. request_accumulator for accumulating request into batch /// 2. request_forwarder for forwarding requests to leader /// 3. request_processor for processing requests - std::shared_ptr request_processor; RequestAccumulator request_accumulator; RequestForwarder request_forwarder; diff --git a/src/Service/KeeperServer.cpp b/src/Service/KeeperServer.cpp index 18b30bc23f..60b76d4bf1 100644 --- a/src/Service/KeeperServer.cpp +++ b/src/Service/KeeperServer.cpp @@ -115,7 +115,7 @@ void KeeperServer::shutdown() LOG_INFO(log, "Shut down keeper server done!"); } -void KeeperServer::putRequest(const KeeperStore::RequestForSession & request_for_session) +void KeeperServer::putRequest(const RequestForSession & request_for_session) { auto [session_id, request, time, server, client] = request_for_session; if (isLeaderAlive() && request->isReadRequest()) @@ -173,7 +173,7 @@ void KeeperServer::putRequest(const KeeperStore::RequestForSession & request_for } } -ptr>> KeeperServer::putRequestBatch(const std::vector & request_batch) +ptr>> KeeperServer::putRequestBatch(const std::vector & request_batch) { LOG_DEBUG(log, "process the batch requests {}", request_batch.size()); std::vector> entries; diff --git a/src/Service/KeeperServer.h b/src/Service/KeeperServer.h index 39a6bc3896..76e796b20e 100644 --- a/src/Service/KeeperServer.h +++ b/src/Service/KeeperServer.h @@ -70,10 +70,10 @@ class KeeperServer std::shared_ptr request_processor_ = nullptr); /// need replaced with putRequestBatch - void putRequest(const KeeperStore::RequestForSession & request); + void putRequest(const RequestForSession & request); /// Put write request into queue and keeper server will append it to NuRaft asynchronously. - ptr>> putRequestBatch(const std::vector & request_batch); + ptr>> putRequestBatch(const std::vector & request_batch); /// Allocate a new session id. int64_t getSessionID(int64_t session_timeout_ms); diff --git a/src/Service/KeeperStore.h b/src/Service/KeeperStore.h index 276a6a5490..1d734ce570 100644 --- a/src/Service/KeeperStore.h +++ b/src/Service/KeeperStore.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -180,29 +181,6 @@ class KeeperStore using ResponsesForSessions = std::vector; using KeeperResponsesQueue = ThreadSafeQueue; - /// Attached session id to request - struct RequestForSession - { - int64_t session_id; - Coordination::ZooKeeperRequestPtr request; - - /// measured in millisecond - int64_t create_time{}; - - /// for forward request - int32_t server_id{-1}; - int32_t client_id{-1}; - - explicit RequestForSession() = default; - - RequestForSession(Coordination::ZooKeeperRequestPtr request_, int64_t session_id_, int64_t create_time_) - : session_id(session_id_), request(request_), create_time(create_time_) - { - } - - bool isForwardRequest() const { return server_id > -1 && client_id > -1; } - }; - using SessionAndAuth = std::unordered_map; using RequestsForSessions = std::vector; using Container = ConcurrentMap; diff --git a/src/Service/KeeperUtils.cpp b/src/Service/KeeperUtils.cpp new file mode 100644 index 0000000000..33ccf5f451 --- /dev/null +++ b/src/Service/KeeperUtils.cpp @@ -0,0 +1,57 @@ +#include + +#include +#include + +#include +#include +#include +#include + +using namespace nuraft; + +namespace RK +{ + +namespace ErrorCodes +{ + extern const int INVALID_CONFIG_PARAMETER; +} + +int Directory::createDir(const std::string & dir) +{ + Poco::File(dir).createDirectories(); + return 0; +} + +std::string checkAndGetSuperdigest(const String & user_and_digest) +{ + if (user_and_digest.empty()) + return ""; + + std::vector scheme_and_id; + boost::split(scheme_and_id, user_and_digest, [](char c) { return c == ':'; }); + if (scheme_and_id.size() != 2 || scheme_and_id[0] != "super") + throw Exception( + ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect superdigest in keeper_server config. Must be 'super:base64string'"); + + return user_and_digest; +} + +nuraft::ptr getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request) +{ + RK::WriteBufferFromNuraftBuffer buf; + RK::writeIntBinary(session_id, buf); + request->write(buf); + Coordination::write(time, buf); + return buf.getBuffer(); +} + + +ptr makeClone(const ptr & entry) +{ + ptr clone = cs_new(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type()); + return clone; +} + +} diff --git a/src/Service/KeeperUtils.h b/src/Service/KeeperUtils.h new file mode 100644 index 0000000000..2cf09b5f3a --- /dev/null +++ b/src/Service/KeeperUtils.h @@ -0,0 +1,92 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace RK +{ + +std::string checkAndGetSuperdigest(const String & user_and_digest); +nuraft::ptr getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request); +nuraft::ptr makeClone(const nuraft::ptr & entry); + +struct BackendTimer +{ + static constexpr char TIME_FMT[] = "%Y%m%d%H%M%S"; + + /// default min interval is 1 hour + UInt32 interval = 1 * 3600; + UInt32 random_window = 1200; //20 minutes + + static void getCurrentTime(std::string & date_str) + { + time_t curr_time; + time(&curr_time); + char tmp_buf[24]; + std::strftime(tmp_buf, sizeof(tmp_buf), TIME_FMT, localtime(&curr_time)); + date_str = tmp_buf; + } + + static time_t parseTime(const std::string & date_str) + { + struct tm prev_tm; + memset(&prev_tm, 0, sizeof(tm)); + strptime(date_str.data(), TIME_FMT, &prev_tm); + time_t prev_time = mktime(&prev_tm); + return prev_time; + } + + bool isActionTime(const time_t & prev_time, time_t curr_time) const + { + if (curr_time == 0L) + time(&curr_time); + return difftime(curr_time, prev_time) >= (interval + rand() % random_window); + } +}; + + +class Directory +{ +public: + static int createDir(const std::string & path); +}; + + +inline int readUInt32(nuraft::ptr & fs, UInt32 & x) +{ + errno = 0; + char * buf = reinterpret_cast(&x); + fs->read(buf, sizeof(UInt32)); + return fs->good() ? 0 : -1; +} + +inline int writeUInt32(nuraft::ptr & fs, const UInt32 & x) +{ + errno = 0; + fs->write(reinterpret_cast(&x), sizeof(UInt32)); + return fs->good() ? 0 : -1; +} + +inline int readUInt64(nuraft::ptr & fs, UInt64 & x) +{ + errno = 0; + char * buf = reinterpret_cast(&x); + fs->read(buf, sizeof(UInt64)); + return fs->good() ? 0 : -1; +} + +inline int writeUInt64(nuraft::ptr & fs, const UInt64 & x) +{ + errno = 0; + fs->write(reinterpret_cast(&x), sizeof(UInt64)); + return fs->good() ? 0 : -1; +} + +} diff --git a/src/Service/NuRaftLogSegment.cpp b/src/Service/NuRaftLogSegment.cpp index 95114ae511..a4b94dc70b 100644 --- a/src/Service/NuRaftLogSegment.cpp +++ b/src/Service/NuRaftLogSegment.cpp @@ -1,16 +1,19 @@ #include #include #include -#include -#include -#include -#include -#include #include #include +#include + #include + #include +#include +#include +#include +#include + #ifdef __clang__ # pragma clang diagnostic push # pragma clang diagnostic ignored "-Wformat-nonliteral" diff --git a/src/Service/NuRaftLogSegment.h b/src/Service/NuRaftLogSegment.h index 458aba8cf7..27c58b6ac2 100644 --- a/src/Service/NuRaftLogSegment.h +++ b/src/Service/NuRaftLogSegment.h @@ -3,14 +3,16 @@ #include #include #include -#include #include -#include -#include -#include +#include + +#include #include #include -#include + +#include +#include +#include namespace RK diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index 9692c08761..8964410cb6 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -1,21 +1,24 @@ #include -#include #include +#include #include #include -#include -#include -#include -#include -#include -#include #include +#include + #include #include + #include #include #include +#include +#include +#include +#include +#include + #ifdef __clang__ # pragma clang diagnostic push # pragma clang diagnostic ignored "-Wformat-nonliteral" diff --git a/src/Service/NuRaftLogSnapshot.h b/src/Service/NuRaftLogSnapshot.h index e43bff3a7b..dab3f4a891 100644 --- a/src/Service/NuRaftLogSnapshot.h +++ b/src/Service/NuRaftLogSnapshot.h @@ -2,13 +2,15 @@ #include #include -#include + +#include +#include + #include +#include #include #include #include -#include -#include namespace RK diff --git a/src/Service/NuRaftStateMachine.cpp b/src/Service/NuRaftStateMachine.cpp index 428391404e..d5f9441490 100644 --- a/src/Service/NuRaftStateMachine.cpp +++ b/src/Service/NuRaftStateMachine.cpp @@ -31,7 +31,7 @@ struct ReplayLogBatch ulong batch_start_index = 0; ulong batch_end_index = 0; ptr> log_vec; - ptr>> request_vec; + ptr>> request_vec; }; NuRaftStateMachine::NuRaftStateMachine( @@ -140,7 +140,7 @@ NuRaftStateMachine::NuRaftStateMachine( batch.batch_start_index = batch_start_index; batch.batch_end_index = batch_end_index; - batch.request_vec = cs_new>>(); + batch.request_vec = cs_new>>(); for (auto entry : *(batch.log_vec)) { @@ -162,7 +162,7 @@ NuRaftStateMachine::NuRaftStateMachine( else { /// replay nodes - ptr ptr_request = this->createRequestSession(entry.entry); + ptr ptr_request = this->createRequestSession(entry.entry); LOG_TRACE(log, "Replay log request, session {}", toHexString(ptr_request->session_id)); batch.request_vec->push_back(ptr_request); @@ -279,13 +279,13 @@ NuRaftStateMachine::NuRaftStateMachine( snap_thread = ThreadFromGlobalPool([this] { snapThread(); }); } -ptr NuRaftStateMachine::createRequestSession(ptr & entry) +ptr NuRaftStateMachine::createRequestSession(ptr & entry) { if (entry->get_val_type() != nuraft::log_val_type::app_log) return nullptr; ReadBufferFromNuraftBuffer buffer(entry->get_buf()); - ptr request_for_session = cs_new(); + ptr request_for_session = cs_new(); readIntBinary(request_for_session->session_id, buffer); if (buffer.eof()) @@ -354,10 +354,10 @@ void NuRaftStateMachine::snapThread() } } -KeeperStore::RequestForSession NuRaftStateMachine::parseRequest(nuraft::buffer & data) +RequestForSession NuRaftStateMachine::parseRequest(nuraft::buffer & data) { ReadBufferFromNuraftBuffer buffer(data); - KeeperStore::RequestForSession request_for_session; + RequestForSession request_for_session; /// TODO unify digital encoding mode readIntBinary(request_for_session.session_id, buffer); @@ -392,7 +392,7 @@ KeeperStore::RequestForSession NuRaftStateMachine::parseRequest(nuraft::buffer & return request_for_session; } -ptr NuRaftStateMachine::serializeRequest(KeeperStore::RequestForSession & session_request) +ptr NuRaftStateMachine::serializeRequest(RequestForSession & session_request) { WriteBufferFromNuraftBuffer out; /// TODO unify digital encoding mode, see parseRequest @@ -521,7 +521,7 @@ nuraft::ptr NuRaftStateMachine::commit(const ulong log_idx, buff return commit(log_idx, data, false); } -void NuRaftStateMachine::processReadRequest(const KeeperStore::RequestForSession & request_for_session) +void NuRaftStateMachine::processReadRequest(const RequestForSession & request_for_session) { store.processRequest(responses_queue, request_for_session); } diff --git a/src/Service/NuRaftStateMachine.h b/src/Service/NuRaftStateMachine.h index 4ea263fe3d..d2c98746d4 100644 --- a/src/Service/NuRaftStateMachine.h +++ b/src/Service/NuRaftStateMachine.h @@ -225,7 +225,7 @@ class NuRaftStateMachine : public nuraft::state_machine KeeperStore & getStore() { return store; } /// process read request - void processReadRequest(const KeeperStore::RequestForSession & request_for_session); + void processReadRequest(const RequestForSession & request_for_session); /// get expired session std::vector getDeadSessions(); @@ -282,12 +282,12 @@ class NuRaftStateMachine : public nuraft::state_machine void shutdown(); /// deserialize a RequestForSession - static KeeperStore::RequestForSession parseRequest(nuraft::buffer & data); + static RequestForSession parseRequest(nuraft::buffer & data); /// serialize a RequestForSession - static ptr serializeRequest(KeeperStore::RequestForSession & request); + static ptr serializeRequest(RequestForSession & request); private: - ptr createRequestSession(ptr & entry); + ptr createRequestSession(ptr & entry); /// Asynchronously snapshot creating thread. /// Now it is not used. diff --git a/src/Service/RaftTaskManager.cpp b/src/Service/RaftTaskManager.cpp index 69e28375ff..fba0112015 100644 --- a/src/Service/RaftTaskManager.cpp +++ b/src/Service/RaftTaskManager.cpp @@ -1,10 +1,12 @@ #include #include -#include -#include #include + #include +#include +#include + namespace RK { diff --git a/src/Service/RequestAccumulator.cpp b/src/Service/RequestAccumulator.cpp index 445a6d4414..521b259fdc 100644 --- a/src/Service/RequestAccumulator.cpp +++ b/src/Service/RequestAccumulator.cpp @@ -22,7 +22,7 @@ void RequestAccumulator::run(RunnerId runner_id) while (!shutdown_called) { - KeeperStore::RequestForSession request_for_session; + RequestForSession request_for_session; bool pop_success; if (to_append_batch.empty()) @@ -99,7 +99,7 @@ void RequestAccumulator::shutdown() shutdown_called = true; - KeeperStore::RequestForSession request_for_session; + RequestForSession request_for_session; while (requests_queue->tryPopAny(request_for_session)) { request_processor->onError( diff --git a/src/Service/RequestAccumulator.h b/src/Service/RequestAccumulator.h index 973c61bc48..7dc779bf2b 100644 --- a/src/Service/RequestAccumulator.h +++ b/src/Service/RequestAccumulator.h @@ -15,7 +15,7 @@ namespace RK */ class RequestAccumulator { - using RequestForSession = KeeperStore::RequestForSession; + using RequestForSession = RequestForSession; using NuRaftResult = nuraft::ptr>>; public: diff --git a/src/Service/RequestForwarder.cpp b/src/Service/RequestForwarder.cpp index 4fe30236ec..56700dbc3a 100644 --- a/src/Service/RequestForwarder.cpp +++ b/src/Service/RequestForwarder.cpp @@ -34,7 +34,7 @@ void RequestForwarder::runSend(RunnerId runner_id) max_wait = elapsed_milliseconds >= session_sync_period_ms ? 0 : session_sync_period_ms - elapsed_milliseconds; } - KeeperStore::RequestForSession request_for_session; + RequestForSession request_for_session; if (requests_queue->tryPop(runner_id, request_for_session, max_wait)) { @@ -273,7 +273,7 @@ void RequestForwarder::shutdown() }); } - KeeperStore::RequestForSession request_for_session; + RequestForSession request_for_session; while (requests_queue->tryPopAny(request_for_session)) { request_processor->onError( diff --git a/src/Service/RequestProcessor.cpp b/src/Service/RequestProcessor.cpp index 940f81d660..1711d7f2ef 100644 --- a/src/Service/RequestProcessor.cpp +++ b/src/Service/RequestProcessor.cpp @@ -1,4 +1,5 @@ +#include #include #include #include @@ -6,6 +7,11 @@ namespace RK { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + void RequestProcessor::push(const RequestForSession & request_for_session) { if (!shutdown_called) @@ -18,6 +24,11 @@ void RequestProcessor::push(const RequestForSession & request_for_session) } } +void RequestProcessor::systemExist() +{ + ::abort(); +} + void RequestProcessor::run() { setThreadName("ReqProcessor"); @@ -43,12 +54,9 @@ void RequestProcessor::run() if (shutdown_called) return; - /// 1. process error requests - processErrorRequest(); - size_t committed_request_size = committed_queue.size(); - /// 2. process read request, multi thread + /// 1. process read request, multi thread for (RunnerId runner_id = 0; runner_id < runner_count; runner_id++) { request_thread->trySchedule([this, runner_id] { @@ -58,8 +66,11 @@ void RequestProcessor::run() } request_thread->wait(); - /// 3. process committed request, single thread + /// 2. process committed request, single thread processCommittedRequest(committed_request_size); + + /// 3. process error requests + processErrorRequest(); } catch (...) { @@ -92,148 +103,130 @@ void RequestProcessor::moveRequestToPendingQueue(RunnerId runner_id) } } +bool RequestProcessor::shouldProcessCommittedRequest(const RequestForSession & committed_request) +{ + bool has_read_request = false; + bool found_error = false; + + auto runner_id = getRunnerId(committed_request.session_id); + auto & my_pending_requests = pending_requests.find(runner_id)->second; + + auto & pending_requests_for_session = my_pending_requests[committed_request.session_id]; + + if (pending_requests_for_session.empty()) + { + LOG_FATAL(log, "Logic error, no pending requests for session {}", toHexString(committed_request.session_id)); + systemExist(); + } + + auto & first_pending_request = pending_requests_for_session.front(); + LOG_DEBUG(log, "First session pending request {}", first_pending_request.toSimpleString()); + + if (first_pending_request.request->xid == committed_request.request->xid) + { + std::unique_lock lk(mutex); + if (errors.contains(first_pending_request.getRequestId())) + { + LOG_WARNING(log, "Request {} is in errors, but is successfully committed", committed_request.toSimpleString()); + } + return true; + } + else + { + /// Session of the previous committed(write) request is not same with the current, + /// which means a write_request(session_1) -> request(session_2) sequence. + if (first_pending_request.request->isReadRequest()) + { + LOG_DEBUG(log, "Found read request, We should terminate the processing of committed(write) requests."); + has_read_request = true; + } + else + { + { + std::unique_lock lk(mutex); + found_error = errors.contains(first_pending_request.getRequestId()); + } + + if (found_error) + { + LOG_WARNING(log, "Found error request, We should terminate the processing of committed(write) requests."); + } + else + { + LOG_FATAL( + log, + "Logical Error, maybe reconnected current session {}, system will exist. First pending request {}, to committed request {}, pending " + "request size {}", + first_pending_request.toString(), + committed_request.toString(), + pending_requests_for_session.size()); + systemExist(); + } + } + } + + return !has_read_request && !found_error; +} void RequestProcessor::processCommittedRequest(size_t count) { - LOG_DEBUG(log, "Process committed request size {}", count); RequestForSession committed_request; for (size_t i = 0; i < count; ++i) { - if (committed_queue.peek(committed_request)) + if (!committed_queue.peek(committed_request)) + continue; + + LOG_DEBUG(log, "Process committed(write) request {}", committed_request.toString()); + + auto runner_id = getRunnerId(committed_request.session_id); + auto & my_pending_requests = pending_requests.find(runner_id)->second; + + /// Remote requests + if (!keeper_dispatcher->isLocalSession(committed_request.session_id)) { - auto & pending_requests_for_thread = pending_requests.find(getRunnerId(committed_request.session_id))->second; + if (my_pending_requests.contains(committed_request.session_id)) + { + LOG_WARNING( + log, + "Found session {} in pending_queue while it is not local, maybe because of connection disconnected. " + "Just delete from pending queue", + toHexString(committed_request.session_id)); + my_pending_requests.erase(committed_request.session_id); + } - LOG_DEBUG( + LOG_WARNING( log, - "Committed request session {} xid {} request {}, session {} pending requests size {},", - toHexString(committed_request.session_id), - committed_request.request->xid, - committed_request.request->toString(), - toHexString(committed_request.session_id), - pending_requests_for_thread.contains(committed_request.session_id) - ? pending_requests_for_thread[committed_request.session_id].size() - : 0 - ); - - auto op_num = committed_request.request->getOpNum(); - - /// Remote requests - if (!keeper_dispatcher->isLocalSession(committed_request.session_id) - || op_num == Coordination::OpNum::Auth) + "Session {} is not local, maybe it is because of disconnecting. We still should apply the committed(write) " + "request", + committed_request.session_id); + + applyRequest(committed_request); + committed_queue.pop(); + } + /// Local requests + else + { + if (committed_request.request->getOpNum() == Coordination::OpNum::Auth) { - LOG_DEBUG(log, "Not contains session {}", committed_request.session_id); - if (pending_requests_for_thread.contains(committed_request.session_id)) - { - LOG_WARNING( - log, - "Found session {} in pending_queue while it is not local, maybe because of connection disconnected. " - "Just delete from pending queue", - toHexString(committed_request.session_id)); - pending_requests_for_thread.erase(committed_request.session_id); - } + LOG_DEBUG(log, "Apply auth request", committed_request.session_id); applyRequest(committed_request); committed_queue.pop(); } - /// Local requests else { - bool has_read_request = false; - bool found_error = false; - auto & pending_requests_for_session = pending_requests_for_thread[committed_request.session_id]; - if (!pending_requests_for_session.empty()) - { - LOG_DEBUG( - log, - "Current session pending request opNum {}, session {}, xid {}", - Coordination::toString(pending_requests_for_session.begin()->request->getOpNum()), - toHexString(pending_requests_for_session.begin()->session_id), - pending_requests_for_session.begin()->request->xid); - - while (pending_requests_for_session.begin()->request->xid != committed_request.request->xid) - { - auto current_begin_request_session = pending_requests_for_session.begin(); - if (current_begin_request_session->request->isReadRequest()) - { - LOG_DEBUG( - log, - "Current session {} pending head request xid {} {} is read request", - toHexString(committed_request.session_id), - current_begin_request_session->session_id, - current_begin_request_session->request->xid); - - has_read_request = true; - break; - } - /// Because close's xid is not necessarily CLOSE_XID. - else if ( - current_begin_request_session->request->getOpNum() == Coordination::OpNum::Close - && committed_request.request->getOpNum() == Coordination::OpNum::Close) - { - break; - } - else - { - std::unique_lock lk(mutex); - if (errors.contains(UInt128( - current_begin_request_session->session_id, current_begin_request_session->request->xid))) - { - LOG_WARNING( - log, - "Current session {} pending head request xid {} not same committed request xid {} opnum " - "{}, " - "because it is in errors", - toHexString(committed_request.session_id), - current_begin_request_session->request->xid, - committed_request.request->xid, - Coordination::toString(committed_request.request->getOpNum())); - - found_error = true; - break; - } - else - { - /// TODO should exit? - LOG_WARNING( - log, - "Logic Error, maybe reconnected current session {} pending head request xid {} {} not same " - "committed request xid {} {}, pending request size {}", - toHexString(committed_request.session_id), - current_begin_request_session->request->xid, - current_begin_request_session->request->toString(), - committed_request.request->xid, - committed_request.request->toString(), - pending_requests_for_session.size()); - break; - } - } - } - } - else - { - LOG_WARNING(log, "Logic error, pending request is empty for session {}", toHexString(committed_request.session_id)); - } - - if (has_read_request || found_error) + if (!shouldProcessCommittedRequest(committed_request)) break; + /// apply request applyRequest(committed_request); committed_queue.pop(); - for (auto it = pending_requests_for_session.begin(); it != pending_requests_for_session.end();) - { - auto xid = it->request->xid; - auto opnum = it->request->getOpNum(); - it = pending_requests_for_session.erase(it); - if (xid == committed_request.request->xid - || (opnum == Coordination::OpNum::Close - && committed_request.request->getOpNum() == Coordination::OpNum::Close)) - { - break; - } - } + /// remove request from pending queue + auto & pending_requests_for_session = my_pending_requests[committed_request.session_id]; + pending_requests_for_session.erase(pending_requests_for_session.begin()); if (pending_requests_for_session.empty()) - pending_requests_for_thread.erase(committed_request.session_id); + my_pending_requests.erase(committed_request.session_id); } } } @@ -241,133 +234,119 @@ void RequestProcessor::processCommittedRequest(size_t count) void RequestProcessor::processErrorRequest() { - /// 1. handle error requests std::lock_guard lock(mutex); - if (!errors.empty()) + + if (errors.empty()) + return; + + LOG_WARNING(log, "Has {} error requests", errors.size()); + + ///TODO error requests are not processed in sequence. + for (auto it = errors.begin(); it != errors.end();) { - LOG_WARNING(log, "Has {} error requests", errors.size()); - for (auto it = errors.begin(); it != errors.end();) - { - auto [session_id, xid] = it->first; - auto & error_request = it->second; + auto [session_id, xid] = it->first; + auto & error_request = it->second; - LOG_WARNING(log, "Try find error request session {}, xid {}, error code {}", toHexString(session_id), xid, error_request.error_code); + auto & my_pending_requests = pending_requests.find(getRunnerId(session_id))->second; + + /// request is not local + if (!keeper_dispatcher->isLocalSession(session_id)) + { + if (my_pending_requests.contains(session_id)) + { + LOG_WARNING( + log, + "Found session {} in pending_queue while it is not local, maybe because of connection disconnected. " + "Just delete from pending queue", + toHexString(session_id)); + my_pending_requests.erase(session_id); + } - auto & pending_requests_for_thread = pending_requests.find(getRunnerId(session_id))->second; + LOG_WARNING(log, "Not my error request {}", error_request.toString()); + it = errors.erase(it); + } + else + { + /// find error request in pending queue + std::optional request = findErrorRequest(error_request); - if (!keeper_dispatcher->isLocalSession(session_id)) + /// process error request + if (request) { - if (pending_requests_for_thread.contains(session_id)) - { - LOG_WARNING( - log, - "Found session {} in pending_queue while it is not local, maybe because of connection disconnected. " - "Just delete from pending queue", - toHexString(session_id)); - pending_requests_for_thread.erase(session_id); - } + LOG_ERROR(log, "Make error response for {}", error_request.toString()); + + ZooKeeperResponsePtr response = request->request->makeResponse(); + response->xid = request->request->xid; + response->zxid = 0; + response->request_created_time_ms = request->create_time; + + response->error = error_request.error_code == nuraft::cmd_result_code::TIMEOUT ? Coordination::Error::ZOPERATIONTIMEOUT + : Coordination::Error::ZCONNECTIONLOSS; - LOG_WARNING(log, "Not my session error, session {}, xid {}", toHexString(session_id), xid); + responses_queue.push(KeeperStore::ResponseForSession{static_cast(session_id), response}); it = errors.erase(it); } else { - using namespace Coordination; - std::optional request; - if (static_cast(xid) == Coordination::AUTH_XID) - { - ZooKeeperRequestPtr auth_request = std::make_shared(); - using namespace std::chrono; - int64_t now = duration_cast(system_clock::now().time_since_epoch()).count(); - RequestForSession request1{auth_request, static_cast(session_id), now}; - request.emplace(request1); - } - else - { - auto session_requests = pending_requests_for_thread.find(session_id); - - if (session_requests != pending_requests_for_thread.end()) - { - auto & requests = session_requests->second; - for (auto request_it = requests.begin(); request_it != requests.end();) - { - LOG_TRACE( - log, - "Try match session {} pending request xid {}, target error xid {}", - toHexString(session_id), - request_it->request->xid, - xid); - if (static_cast(request_it->request->xid) < xid) - { - break; - } - else if ( - static_cast(request_it->request->xid) == xid - || (request_it->request->getOpNum() == Coordination::OpNum::Close - && error_request.opnum == Coordination::OpNum::Close)) - { - request = *request_it; - request_it = requests.erase(request_it); - break; - } - else - { - ++request_it; - } - } - } - else - { - LOG_WARNING(log, "Session {}, no pending requests", toHexString(session_id)); - } - } - - if (request) - { - ZooKeeperResponsePtr response = request->request->makeResponse(); - response->xid = request->request->xid; - response->zxid = 0; - response->request_created_time_ms = request->create_time; - - auto accepted = error_request.accepted; - auto error_code = error_request.error_code; - - response->error = error_code == nuraft::cmd_result_code::TIMEOUT ? Coordination::Error::ZOPERATIONTIMEOUT - : Coordination::Error::ZCONNECTIONLOSS; - - responses_queue.push(RK::KeeperStore::ResponseForSession{static_cast(session_id), response}); - - LOG_ERROR( - log, - "Make error response for session {}, xid {}, opNum {}", - toHexString(session_id), - response->xid, - error_request.opnum); - - if (!accepted) - LOG_ERROR(log, "Request batch is not accepted"); - else - LOG_ERROR(log, "Request batch error, nuraft code {}", error_code); - - LOG_ERROR(log, "Matched error request session {}, xid {} from pending requests queue", toHexString(session_id), xid); - - it = errors.erase(it); - } - else - { - LOG_WARNING( - log, - "Not found error request session {}, xid {} from pending queue. Maybe it is still in the request queue " - "and will be processed next time", - toHexString(session_id), - xid); - break; - } + LOG_FATAL(log, "Logical error, not found error request {} in pending queue.", error_request.getRequestId()); + systemExist(); } } } } +std::optional RequestProcessor::findErrorRequest(const ErrorRequest & error_request) +{ + auto session_id = error_request.session_id; + auto xid = error_request.xid; + + /// Auth request is not put in pending queue, so no need to remove it. + if (xid == Coordination::AUTH_XID) + { + std::optional request; + ZooKeeperRequestPtr auth_request = std::make_shared(); + + Poco::Timestamp timestamp; + auto now = timestamp.epochMicroseconds(); + + RequestForSession request_for_session{auth_request, session_id, now / 1000}; + request.emplace(request_for_session); + return request; + } + + std::optional request; + + auto & my_pending_requests = pending_requests.find(getRunnerId(session_id))->second; + auto session_requests = my_pending_requests.find(session_id); + + if (session_requests != my_pending_requests.end()) + { + auto & requests = session_requests->second; + for (auto request_it = requests.begin(); request_it != requests.end();) + { + LOG_TRACE(log, "Try match {}", toHexString(session_id), request_it->request->xid); + + if (request_it->request->xid == xid + || (request_it->request->getOpNum() == Coordination::OpNum::Close && error_request.opnum == Coordination::OpNum::Close)) + { + request = *request_it; + request_it = requests.erase(request_it); + break; + } + else + { + ++request_it; + } + } + } + else + { + LOG_ERROR(log, "Logical error, session {} has no pending requests now.", toHexString(session_id)); + } + + return request; +} + void RequestProcessor::processReadRequests(RunnerId runner_id) { auto & thread_requests = pending_requests.find(runner_id)->second; @@ -399,42 +378,50 @@ void RequestProcessor::processReadRequests(RunnerId runner_id) void RequestProcessor::applyRequest(const RequestForSession & request) const { + LOG_TRACE( + log, + "Apply request session {} xid {} request {}", + toHexString(request.session_id), + request.request->xid, + request.request->toString()); + try { - LOG_TRACE( - log, - "Apply request session {} xid {} request {}", - toHexString(request.session_id), - request.request->xid, - request.request->toString()); - - if (!server->isLeaderAlive() && request.request->isReadRequest()) + if (request.request->isReadRequest()) { - auto response = request.request->makeResponse(); + if (server->isLeaderAlive()) + { + server->getKeeperStateMachine()->getStore().processRequest(responses_queue, request); + } + else + { + auto response = request.request->makeResponse(); - response->request_created_time_ms = request.create_time; - response->xid = request.request->xid; - response->zxid = 0; - response->error = Coordination::Error::ZCONNECTIONLOSS; + response->request_created_time_ms = request.create_time; + response->xid = request.request->xid; + response->zxid = 0; + response->error = Coordination::Error::ZCONNECTIONLOSS; - responses_queue.push(RK::KeeperStore::ResponseForSession{request.session_id, response}); + responses_queue.push(KeeperStore::ResponseForSession{request.session_id, response}); + } } - /// Raft already committed the request, we must apply it/ else { if (!server->isLeaderAlive()) - LOG_WARNING(log, "Apply write request but leader not alive."); - server->getKeeperStateMachine()->getStore().processRequest( - responses_queue, request, {}, true, false); + LOG_WARNING(log, "Write request is committed, when try to apply it to store the leader is not alive."); + server->getKeeperStateMachine()->getStore().processRequest(responses_queue, request); } } catch (...) { - tryLogCurrentException( - log, - fmt::format( - "Got exception while process session {} read request {}.", toHexString(request.session_id), request.request->toString())); + tryLogCurrentException(log, fmt::format("Fail to apply request {}.", request.request->toString())); + if (!request.request->isReadRequest()) + { + LOG_FATAL(log, "Fail to apply write request which will lead state machine inconsistency, system will exist."); + ::exit(1); + } } + } void RequestProcessor::shutdown() @@ -453,7 +440,7 @@ void RequestProcessor::shutdown() if (main_thread.joinable()) main_thread.join(); - KeeperStore::RequestForSession request_for_session; + RequestForSession request_for_session; while (requests_queue->tryPopAny(request_for_session)) { auto response = request_for_session.request->makeResponse(); @@ -485,9 +472,10 @@ void RequestProcessor::onError( { LOG_WARNING(log, "On error session {}, xid {}", toHexString(session_id), xid); { - std::unique_lock lk(mutex); ErrorRequest error_request{accepted, error_code, session_id, xid, opnum}; - errors.emplace(UInt128(session_id, xid), error_request); + RequestId id{session_id, xid}; + std::unique_lock lock(mutex); + errors.emplace(id, error_request); } cv.notify_all(); } diff --git a/src/Service/RequestProcessor.h b/src/Service/RequestProcessor.h index 2d6226d0c9..48dfd21c32 100644 --- a/src/Service/RequestProcessor.h +++ b/src/Service/RequestProcessor.h @@ -1,5 +1,6 @@ #pragma once +#include "ZooKeeper/ZooKeeperConstants.h" #include #include #include @@ -8,17 +9,6 @@ namespace RK { -using RequestForSession = KeeperStore::RequestForSession; - -struct ErrorRequest -{ - bool accepted; - nuraft::cmd_result_code error_code; - int64_t session_id; - Coordination::XID xid; - Coordination::OpNum opnum; -}; - class KeeperDispatcher; /** Handle user read request and Raft committed write request. @@ -32,21 +22,12 @@ class RequestProcessor } void push(const RequestForSession & request_for_session); - void run(); - - void moveRequestToPendingQueue(RunnerId runner_id); - - void processReadRequests(RunnerId runner_id); - void processErrorRequest(); - void processCommittedRequest(size_t count); - - /// Apply request to state machine - void applyRequest(const RequestForSession & request) const; void shutdown(); void commit(RequestForSession request); + /// Invoked when fail to forward request to leader or append entry. void onError(bool accepted, nuraft::cmd_result_code error_code, int64_t session_id, Coordination::XID xid, Coordination::OpNum opnum); void initialize( @@ -58,10 +39,29 @@ class RequestProcessor size_t commitQueueSize() { return committed_queue.size(); } private: + void run(); + /// Exist system for fatal error. + [[noreturn]] void systemExist(); + + void moveRequestToPendingQueue(RunnerId runner_id); + + void processReadRequests(RunnerId runner_id); + void processErrorRequest(); + void processCommittedRequest(size_t count); + + /// Apply request to state machine + void applyRequest(const RequestForSession & request) const; size_t getRunnerId(int64_t session_id) const { return session_id % runner_count; } + /// Find error request in pending request queue + std::optional findErrorRequest(const ErrorRequest & error_request); + + /// We can handle zxid as a continuous stream of committed(write) requests at once. + /// However, if we encounter a read request or an erroneous request, + /// we need to interrupt the processing. + bool shouldProcessCommittedRequest(const RequestForSession & committed_request); - using RequestForSessions = std::vector; + using RequestForSessions = std::vector; ThreadFromGlobalPool main_thread; @@ -79,7 +79,7 @@ class RequestProcessor std::unordered_map> pending_requests; /// Raft committed write requests which can be local or from other nodes. - ConcurrentBoundedQueue committed_queue{1000}; + ConcurrentBoundedQueue committed_queue{1000}; size_t runner_count; @@ -92,7 +92,7 @@ class RequestProcessor /// key : session_id xid /// Error requests when append entry or forward to leader - std::unordered_map errors; + std::unordered_map errors; Poco::Logger * log; diff --git a/src/Service/RequestsQueue.h b/src/Service/RequestsQueue.h index 74c594843a..a4fd27e902 100644 --- a/src/Service/RequestsQueue.h +++ b/src/Service/RequestsQueue.h @@ -35,7 +35,7 @@ namespace RK */ struct RequestsQueue { - using Queue = ConcurrentBoundedQueue; + using Queue = ConcurrentBoundedQueue; std::vector> queues; @@ -63,26 +63,26 @@ struct RequestsQueue return queues[request.session_id % queues.size()]->tryPush(std::forward(request), wait_ms); } - bool pop(size_t queue_id, KeeperStore::RequestForSession & request) + bool pop(size_t queue_id, RequestForSession & request) { assert(queue_id != 0 && queue_id <= queues.size()); return queues[queue_id]->pop(request); } - bool tryPop(size_t queue_id, KeeperStore::RequestForSession & request, UInt64 wait_ms = 0) + bool tryPop(size_t queue_id, RequestForSession & request, UInt64 wait_ms = 0) { assert(queue_id < queues.size()); return queues[queue_id]->tryPop(request, wait_ms); } - [[maybe_unused]] bool tryPopMicro(size_t queue_id, KeeperStore::RequestForSession & request, UInt64 wait_micro = 0) + [[maybe_unused]] bool tryPopMicro(size_t queue_id, RequestForSession & request, UInt64 wait_micro = 0) { assert(queue_id < queues.size()); return queues[queue_id]->tryPopMicro(request, wait_micro); } - bool tryPopAny(KeeperStore::RequestForSession & request, UInt64 wait_ms = 0) + bool tryPopAny(RequestForSession & request, UInt64 wait_ms = 0) { for (const auto & queue : queues) { diff --git a/src/Service/tests/gtest_raft_log.cpp b/src/Service/tests/gtest_raft_log.cpp index 18dda8180a..a0d794a64f 100644 --- a/src/Service/tests/gtest_raft_log.cpp +++ b/src/Service/tests/gtest_raft_log.cpp @@ -1,13 +1,15 @@ -#include +#include + +#include +#include +#include +#include + +#include #include #include #include #include -#include -#include -#include -#include -#include using namespace nuraft; diff --git a/src/Service/tests/gtest_raft_performance.cpp b/src/Service/tests/gtest_raft_performance.cpp index d18e1b31ee..cfd8f603a8 100644 --- a/src/Service/tests/gtest_raft_performance.cpp +++ b/src/Service/tests/gtest_raft_performance.cpp @@ -1,14 +1,16 @@ -#include +#include + +#include +#include +#include +#include + +#include #include #include #include #include #include -#include -#include -#include -#include -#include using namespace nuraft; using namespace RK; diff --git a/src/Service/tests/gtest_raft_snapshot.cpp b/src/Service/tests/gtest_raft_snapshot.cpp index 3389b17f8d..b0d922062e 100644 --- a/src/Service/tests/gtest_raft_snapshot.cpp +++ b/src/Service/tests/gtest_raft_snapshot.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -62,7 +63,7 @@ ptr closeSessionLog(int64_t session_id) { Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); request->xid = Coordination::CLOSE_XID; - KeeperStore::RequestForSession request_info; + RequestForSession request_info; request_info.request = request; request_info.session_id = session_id; int64_t time = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1); @@ -80,7 +81,7 @@ ptr createLog(int64_t session_id, const std::string & key, const std::st acl.id = "anyone"; default_acls.emplace_back(std::move(acl)); - auto session_request = cs_new(); + auto session_request = cs_new(); auto request = cs_new(); session_request->request = request; session_request->session_id = session_id; @@ -107,7 +108,7 @@ ptr setLog(int64_t session_id, const std::string & key, const std::strin acl.id = "anyone"; default_acls.emplace_back(std::move(acl)); - auto session_request = cs_new(); + auto session_request = cs_new(); auto request = cs_new(); session_request->request = request; session_request->session_id = session_id; @@ -132,7 +133,7 @@ ptr removeLog(int64_t session_id, const std::string & key) acl.id = "anyone"; default_acls.emplace_back(std::move(acl)); - auto session_request = cs_new(); + auto session_request = cs_new(); auto request = cs_new(); session_request->request = request; session_request->session_id = session_id; diff --git a/src/Service/tests/gtest_raft_state_machine.cpp b/src/Service/tests/gtest_raft_state_machine.cpp index 763a2afb7a..2141ad0bf1 100644 --- a/src/Service/tests/gtest_raft_state_machine.cpp +++ b/src/Service/tests/gtest_raft_state_machine.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -39,7 +40,7 @@ void createZNodeLog(NuRaftStateMachine & machine, std::string & key, std::string default_acls.emplace_back(std::move(acl)); UInt64 index = machine.last_commit_index() + 1; - KeeperStore::RequestForSession session_request; + RequestForSession session_request; session_request.session_id = createSession(machine); auto request = cs_new(); session_request.request = request; @@ -81,7 +82,7 @@ void setZNode(NuRaftStateMachine & machine, std::string & key, std::string & dat default_acls.emplace_back(std::move(acl)); UInt64 index = machine.last_commit_index() + 1; - KeeperStore::RequestForSession session_request; + RequestForSession session_request; session_request.session_id = createSession(machine); auto request = cs_new(); session_request.request = request; @@ -108,7 +109,7 @@ void removeZNode(NuRaftStateMachine & machine, std::string & key) default_acls.emplace_back(std::move(acl)); UInt64 index = machine.last_commit_index() + 1; - KeeperStore::RequestForSession session_request; + RequestForSession session_request; session_request.session_id = createSession(machine); auto request = cs_new(); session_request.request = request; @@ -139,7 +140,7 @@ TEST(RaftStateMachine, serializeAndParse) default_acls.emplace_back(std::move(acl)); //UInt64 index = machine.last_commit_index() + 1; - KeeperStore::RequestForSession session_request; + RequestForSession session_request; session_request.session_id = 1; auto request = cs_new(); request->path = "1"; @@ -153,7 +154,7 @@ TEST(RaftStateMachine, serializeAndParse) session_request.create_time = duration_cast(system_clock::now().time_since_epoch()).count(); ptr buf = NuRaftStateMachine::serializeRequest(session_request); - KeeperStore::RequestForSession session_request_2 = NuRaftStateMachine::parseRequest(*(buf.get())); + RequestForSession session_request_2 = NuRaftStateMachine::parseRequest(*(buf.get())); if (session_request_2.request->getOpNum() == OpNum::Create) { ZooKeeperCreateRequest * request_2 = static_cast(session_request_2.request.get()); diff --git a/src/Service/tests/raft_unit_benchmark.cpp b/src/Service/tests/raft_unit_benchmark.cpp index 83b6838f87..be7431aa7e 100644 --- a/src/Service/tests/raft_unit_benchmark.cpp +++ b/src/Service/tests/raft_unit_benchmark.cpp @@ -2,22 +2,25 @@ #include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include + #include #include + #include #include #include +#include #include +#include +#include + +#include +#include +#include +#include +#include +#include +#include using namespace Coordination; using namespace RK;