diff --git a/src/Service/RequestProcessor.cpp b/src/Service/RequestProcessor.cpp index 84576becd8..0ca7bcf83b 100644 --- a/src/Service/RequestProcessor.cpp +++ b/src/Service/RequestProcessor.cpp @@ -47,7 +47,7 @@ void RequestProcessor::run() if (it->second.empty()) { LOG_ERROR(log, "Got empty queue in pending_requests, it's a bug"); - it = pending_requests.erase(it); // erase 返回下一个迭代器 + it = pending_requests.erase(it); } else { @@ -56,7 +56,21 @@ void RequestProcessor::run() } } - return error_request_ids.empty() && requests_queue->empty() && committed_queue.empty() && pending_requests_empty; + for (auto it = pending_error_requests.begin(); it != pending_error_requests.end();) + { + if (it->second.empty()) + { + LOG_ERROR(log, "Got empty queue in pending_requests, it's a bug"); + it = pending_error_requests.erase(it); + } + else + { + pending_requests_empty = false; + break; + } + } + + return requests_queue->empty() && committed_queue.empty() && error_requests.empty() && pending_requests_empty; }; { @@ -66,7 +80,7 @@ void RequestProcessor::run() LOG_DEBUG( log, "Waiting timeout errors size {}, requests_queue size {}, committed_queue size {}", - error_request_ids.size(), + error_requests.size(), requests_queue->size(), committed_queue.size()); } @@ -74,62 +88,24 @@ void RequestProcessor::run() if (shutdown_called) return; - size_t error_request_size; - { - std::unique_lock lk(mutex); - error_request_size = error_request_ids.size(); - } - - /// 1. process read request - moveRequestToPendingQueue(); + size_t error_to_process = error_requests.size(); + size_t commits_to_process = committed_queue.size(); + size_t requests_to_process = requests_queue->size(); - /// 2. process committed request, single thread - size_t committed_request_size = committed_queue.size(); + /// 1. process request from request_queue + if (requests_to_process != 0) + moveRequestToPendingQueue(requests_to_process); - if (committed_request_size == 0) - continue; - - - auto queuesToDrain = processCommittedRequest(committed_request_size); + /// 2. process error requests + if (error_to_process !=0) + processErrorRequest(error_to_process); - /// 3. process error requests - processErrorRequest(error_request_size); + /// 3. process committed request, single thread + if (commits_to_process != 0) + processCommittedRequest(commits_to_process); /// 4. handle need_drain sessions - size_t read_processed = 0; - - for (auto && sessionId : queuesToDrain) - { - if (!pending_requests.contains(sessionId)) - continue; - - auto & session_queue = pending_requests[sessionId]; - size_t read_after_write = 0; - - while (!shutdown_called && !session_queue.empty() && session_queue.front().request->isReadRequest()) - { - auto & read_request = session_queue.front(); - ++ read_after_write; - sendToProcessor(read_request); - - LOG_DEBUG(log, "Move read request {} from pending_requests", read_request.toSimpleString()); - session_queue.pop(); - - if (!session_queue.empty()) - { - LOG_DEBUG(log, "Next pending_request {}", session_queue.front().toSimpleString()); - } - } - - Metrics::getMetrics().reads_after_write_in_session_queue->add(read_after_write); - read_processed += read_after_write; - - // Remove empty queues - if (session_queue.empty()) - pending_requests.erase(sessionId); - } - Metrics::getMetrics().reads_issued_from_session_queue->add(read_processed); - + drainQueues(); } catch (...) { @@ -138,26 +114,25 @@ void RequestProcessor::run() } } -void RequestProcessor::moveRequestToPendingQueue() +void RequestProcessor::moveRequestToPendingQueue(size_t requestsToProcess) { - size_t requestsToProcess = requests_queue->size(); + if (requestsToProcess == 0) return; size_t readsProcessed = 0; while (!shutdown_called - && requestsToProcess > 0 - && readsProcessed <= max_read_batch_size) + && requestsToProcess > 0) { RequestForSession request; if (requests_queue->tryPop(request)) { if (request.request->getOpNum() == Coordination::OpNum::Auth) - { continue; - } + /// Since there are no pending write requests in the current session, + /// we can directly process these read requests without moving them to the pending queue. if (request.request->isReadRequest() && !pending_requests.contains(request.session_id)) { readsProcessed++; @@ -176,87 +151,21 @@ void RequestProcessor::moveRequestToPendingQueue() Metrics::getMetrics().reads_issued_from_requests_queue->add(readsProcessed); } -bool RequestProcessor::shouldProcessCommittedRequest(const RequestForSession & committed_request, bool & found_in_pending_queue) +void RequestProcessor::processCommittedRequest(size_t commits_to_process) { - bool has_read_request = false; - bool found_error = false; - - - auto & pending_requests_for_session = pending_requests[committed_request.session_id]; - - auto process_not_in_pending_queue = [this, &found_in_pending_queue, &committed_request]() - { - found_in_pending_queue = false; - LOG_WARNING( - this->log, - "Not found committed(write) request {} in pending queue. Possible reason: 1.close requests from deadSessionCleanThread are not " - "put into pending queue; 2.error occurs(because of forward or append entries) but request is still committed, " - "'processErrorRequest' may delete request from pending request first, so here we can not find it.", - committed_request.toSimpleString()); - }; - - if (pending_requests_for_session.empty()) + if (committed_queue.empty()) { - process_not_in_pending_queue(); - return true; - } - - auto & first_pending_request = pending_requests_for_session.front(); - LOG_DEBUG( - log, - "First pending request of session {} is {}", - toHexString(committed_request.session_id), - first_pending_request.toSimpleString()); - - if (first_pending_request.request->xid == committed_request.request->xid) - { - found_in_pending_queue = true; - std::unique_lock lk(mutex); - if (error_request_ids.contains(first_pending_request.getRequestId())) - { - LOG_WARNING(log, "Request {} is in errors, but is successfully committed", committed_request.toSimpleString()); - } - return true; - } - else - { - found_in_pending_queue = false; - /// Session of the previous committed(write) request is not same with the current, - /// which means a write_request(session_1) -> request(session_2) sequence. - if (first_pending_request.request->isReadRequest()) - { - LOG_DEBUG(log, "Found read request, We should terminate the processing of committed(write) requests."); - has_read_request = true; - } - else - { - { - std::unique_lock lk(mutex); - found_error = error_request_ids.contains(first_pending_request.getRequestId()); - } - - if (found_error) - LOG_WARNING(log, "Found error request, We should terminate the processing of committed(write) requests."); - else - process_not_in_pending_queue(); - } + return; } - return !has_read_request && !found_error; -} - -std::unordered_set RequestProcessor::processCommittedRequest(size_t commits_to_process) -{ /// Drain outstanding reads { std::unique_lock lk(empty_pool_lock); cv.wait(lk, [this]{ return numRequestsProcessing == 0 || shutdown_called; }); } - auto start_time_ms = getCurrentTimeMilliseconds(); - - std::unordered_set queues_to_drain; + auto start_time_ms = getCurrentTimeMilliseconds(); size_t commits_processed = 0; for (; commits_processed < commits_to_process; ++commits_processed) @@ -300,6 +209,9 @@ std::unordered_set RequestProcessor::processCommittedRequest(size_t com } else { + /// Can't process this write yet. + /// Either there are reads pending in this session, or we + /// haven't gotten to this write yet if (!pending_requests.contains(committed_request.session_id) || pending_requests[committed_request.session_id].empty() ) @@ -313,16 +225,25 @@ std::unordered_set RequestProcessor::processCommittedRequest(size_t com committed_queue.pop(); continue; } - if (!pending_requests.contains(committed_request.session_id)) - { - LOG_DEBUG(log, "Commit request got, but not in pending_requests"); - } - else if (pending_requests[committed_request.session_id].empty()) - { - LOG_DEBUG(log, "Commit request got, but pending_requests is empty, it's a Bug"); - } - break; + + /// We push request to request_processor first, than do replication (forward to leader) + /// for every RequestProcessor::run(), wo got current commited queue size before got + /// request queue, an normal commit should find in pending queue. + + /// ----------------------------------------- Timeline -------------------------------------------------> + /// --> request_processor->push(request) + /// -----------> replication + /// ------------------------> commit_queue->push(request) + /// ----------------------------------------> got request in commit_queue->size() to process + /// --------------------------------------------------------> got request in request_processor to process + + LOG_ERROR(log, "Commit request got {}, but not in pending_requests, it's maybe get errors before (for example forward timeout)," + "and we already handle it in before. But we should still apply it.", committed_request.toSimpleString()); + + applyRequest(committed_request); + committed_queue.pop(); + continue; } // Pending reads @@ -348,116 +269,139 @@ std::unordered_set RequestProcessor::processCommittedRequest(size_t com pending_requests.erase(committed_request.session_id); else LOG_DEBUG(log, "Next pending_request {}", pending_requests_for_session.front().toSimpleString()); - - queues_to_drain.emplace(committed_request.session_id); + queues_to_drain.emplace(committed_request.session_id); } } } Metrics::getMetrics().apply_write_request_time_ms->add(getCurrentTimeMilliseconds() - start_time_ms); Metrics::getMetrics().write_commit_proc_issued->add(commits_processed); - - return queues_to_drain; } -void RequestProcessor::processErrorRequest(size_t count) +void RequestProcessor::processErrorRequest(size_t error_to_process) { - std::lock_guard lock(mutex); + // Process error_requests, move local error_request to pending_error_requests + { + std::lock_guard lock(mutex); - if (error_request_ids.empty()) - return; + LOG_WARNING(log, "There are {} error requests", error_to_process); - LOG_INFO(log, "There are {} error requests", count); + /// Note that error requests may be not processed in order. + for (size_t i = 0; i < error_to_process; i++) + { + auto & error_request = error_requests.front(); + auto [session_id, xid] = error_request.getRequestId(); - ///Note that error requests may be not processed in order. - for (size_t i = 0; i < count; i++) - { - auto & error_request = error_requests.front(); - auto [session_id, xid] = error_request.getRequestId(); + if (unlikely(isSessionRequest(error_request.opnum))) + { + ZooKeeperResponsePtr response; + if (isNewSessionRequest(error_request.opnum)) + { + auto new_session_response = std::make_shared(); + new_session_response->xid = xid; + new_session_response->internal_id = session_id; + new_session_response->success = false; + response = std::move(new_session_response); + } + else + { + auto update_session_response = std::make_shared(); + update_session_response->xid = xid; + update_session_response->session_id = session_id; + update_session_response->success = false; + response = std::move(update_session_response); + } - if (unlikely(isSessionRequest(error_request.opnum))) - { - ZooKeeperResponsePtr response; - if (isNewSessionRequest(error_request.opnum)) + response->error = error_request.error_code == nuraft::cmd_result_code::TIMEOUT ? Coordination::Error::ZOPERATIONTIMEOUT + : Coordination::Error::ZCONNECTIONLOSS; + /// TODO use real request creating time. + response->request_created_time_ms = getCurrentTimeMilliseconds(); + + responses_queue.push(ResponseForSession{session_id, response}); + + } + /// Remote request + else if (!keeper_dispatcher->isLocalSession(session_id)) { - auto new_session_response = std::make_shared(); - new_session_response->xid = xid; - new_session_response->internal_id = session_id; - new_session_response->success = false; - response = std::move(new_session_response); + if (pending_requests.contains(session_id)) + { + LOG_WARNING( + log, + "Found session {} in pending_queue while it is not local, maybe because of connection disconnected. " + "Just delete from pending queue.", + toHexString(session_id)); + pending_requests.erase(session_id); + } + + LOG_WARNING(log, "Error request {} is not local", error_request.toString()); } + /// Local request, just move it to pending_error_requests else { - auto update_session_response = std::make_shared(); - update_session_response->xid = xid; - update_session_response->session_id = session_id; - update_session_response->success = false; - response = std::move(update_session_response); + if (pending_error_requests[error_request.session_id].contains(error_request.xid)) + { + LOG_WARNING(log, "Duplicate error requests found {} ", error_request.toString()); + } + else + { + pending_error_requests[error_request.session_id].emplace(error_request.xid, error_request); + } } - response->error = error_request.error_code == nuraft::cmd_result_code::TIMEOUT ? Coordination::Error::ZOPERATIONTIMEOUT - : Coordination::Error::ZCONNECTIONLOSS; - /// TODO use real request creating time. - response->request_created_time_ms = getCurrentTimeMilliseconds(); - - responses_queue.push(ResponseForSession{session_id, response}); - - error_request_ids.erase(error_request.getRequestId()); + // Remove error request from error_requests error_requests.erase(error_requests.begin()); } - /// Remote request - else if (!keeper_dispatcher->isLocalSession(session_id)) - { - if (pending_requests.contains(session_id)) - { - LOG_WARNING( - log, - "Found session {} in pending_queue while it is not local, maybe because of connection disconnected. " - "Just delete from pending queue.", - toHexString(session_id)); - pending_requests.erase(session_id); - } + } - LOG_WARNING(log, "Error request {} is not local", error_request.toString()); - error_request_ids.erase(error_request.getRequestId()); - error_requests.erase(error_requests.begin()); + /// Handle error request in pending_error_requests + for (auto it = pending_error_requests.begin(); it != pending_error_requests.end();) + { + if (it->second.empty()) + { + LOG_ERROR(log, "Got empty queue in pending_error_requests, it's a bug"); + it = pending_error_requests.erase(it); } - /// Local request else { - /// find error request in pending queue - std::optional request = findErrorRequest(error_request); - - /// process error request - if (request) + auto & sorted_error_requests = it->second; + while (!sorted_error_requests.empty()) { - LOG_ERROR(log, "Make error response for {}", error_request.toString()); + auto & error_request = sorted_error_requests.rbegin()->second; + std::optional request = findErrorRequest(error_request); - ZooKeeperResponsePtr response = request->request->makeResponse(); - response->xid = request->request->xid; - response->zxid = 0; - response->request_created_time_ms = request->create_time; - - response->error = error_request.error_code == nuraft::cmd_result_code::TIMEOUT ? Coordination::Error::ZOPERATIONTIMEOUT - : Coordination::Error::ZCONNECTIONLOSS; - - responses_queue.push(ResponseForSession{session_id, response}); - - error_request_ids.erase(error_request.getRequestId()); - error_requests.erase(error_requests.begin()); + if (request) + { + LOG_DEBUG(log, "rico ga"); + LOG_ERROR(log, "Make error response for {}", error_request.toString()); + + ZooKeeperResponsePtr response = request->request->makeResponse(); + response->xid = request->request->xid; + response->zxid = 0; + response->request_created_time_ms = request->create_time; + + response->error = error_request.error_code == nuraft::cmd_result_code::TIMEOUT ? Coordination::Error::ZOPERATIONTIMEOUT + : Coordination::Error::ZCONNECTIONLOSS; + responses_queue.push(ResponseForSession{it->first, response}); + sorted_error_requests.erase(response->xid); + } + else + { + LOG_WARNING( + this->log, + "Not found error request {} in pending queue. Possible reason: 1. request forwarding error; 2.close requests from " + "deadSessionCleanThread are not put into pending queue; 3.error occurs(forward or append entries) but request is still " + "committed, 'processCommittedRequest' may delete request from pending request first, so here we can not find it." + , error_request.toString()); + break; + } + } + if (sorted_error_requests.empty()) + { + it = pending_error_requests.erase(it); } else { - LOG_WARNING( - this->log, - "Not found error request {} in pending queue. Possible reason: 1. request forwarding error; 2.close requests from " - "deadSessionCleanThread are not put into pending queue; 3.error occurs(forward or append entries) but request is still " - "committed, 'processCommittedRequest' may delete request from pending request first, so here we can not find it. We " - "also delete it from errors.", - error_request.toString()); - - error_request_ids.erase(error_request.getRequestId()); - error_requests.erase(error_requests.begin()); + ++it; } } } @@ -484,39 +428,70 @@ std::optional RequestProcessor::findErrorRequest(const ErrorR std::optional request; - if (pending_requests.contains(session_id)) { - auto & session_requests = pending_requests[session_id]; - - RequestForSessions new_session_requests; + auto & pending_requests_for_session = pending_requests[session_id]; + auto & request_first = pending_requests_for_session.front(); - while (!session_requests.empty()) + if (request_first.request->xid == xid + || (request_first.request->getOpNum() == Coordination::OpNum::Close && error_request.opnum == Coordination::OpNum::Close)) { - auto & request_it = session_requests.front(); - if (request_it.request->xid == xid - || (request_it.request->getOpNum() == Coordination::OpNum::Close && error_request.opnum == Coordination::OpNum::Close)) - { - LOG_WARNING(log, "Matched error request {} in pending queue", request_it.toSimpleString()); - request.emplace(request_it); - } + LOG_WARNING(log, "Matched error request {} in pending queue", request_first.toSimpleString()); + request.emplace(request_first); + pending_requests_for_session.pop(); + + if (pending_requests_for_session.empty()) + pending_requests.erase(session_id); else { - new_session_requests.push(request_it); + LOG_DEBUG(log, "Next pending_request {}", pending_requests_for_session.front().toSimpleString()); + queues_to_drain.emplace(session_id); } - session_requests.pop(); - } - - if (new_session_requests.empty()) - pending_requests.erase(session_id); - else - pending_requests[session_id] = new_session_requests; } return request; } +void RequestProcessor::drainQueues() +{ + size_t read_processed = 0; + if (queues_to_drain.empty()) + { + return; + } + + for (auto && sessionId : queues_to_drain) + { + auto & session_queue = pending_requests[sessionId]; + size_t read_after_write = 0; + + while (!shutdown_called && !session_queue.empty() && session_queue.front().request->isReadRequest()) + { + auto & read_request = session_queue.front(); + ++ read_after_write; + sendToProcessor(read_request); + + LOG_DEBUG(log, "Move read request {} from pending_requests", read_request.toSimpleString()); + session_queue.pop(); + + if (!session_queue.empty()) + { + LOG_DEBUG(log, "Next pending_request {}", session_queue.front().toSimpleString()); + } + } + + Metrics::getMetrics().reads_after_write_in_session_queue->add(read_after_write); + read_processed += read_after_write; + + // Remove empty queues + if (session_queue.empty()) + pending_requests.erase(sessionId); + } + Metrics::getMetrics().reads_issued_from_session_queue->add(read_processed); + queues_to_drain.clear(); +} + void RequestProcessor::readRequestProcessor(RunnerId runner_id) { setThreadName(("ReadProcess#" + std::to_string(runner_id)).c_str()); @@ -643,14 +618,12 @@ void RequestProcessor::onError( { if (!shutdown_called) { - RequestId id{session_id, xid}; ErrorRequest error_request{accepted, error_code, session_id, xid, opnum}; LOG_WARNING(log, "Found error request {}", error_request.toString()); { std::unique_lock lock(mutex); error_requests.push_back(error_request); - error_request_ids.emplace(id); } cv.notify_all(); } @@ -664,7 +637,7 @@ void RequestProcessor::initialize( { operation_timeout_ms = operation_timeout_ms_; runner_count = thread_count_; - max_read_batch_size = thread_count_ * 4; + // max_read_batch_size = thread_count_ * 4; server = server_; keeper_dispatcher = keeper_dispatcher_; requests_queue = std::make_shared>(10000); diff --git a/src/Service/RequestProcessor.h b/src/Service/RequestProcessor.h index 97b88bd663..f0c0356e04 100644 --- a/src/Service/RequestProcessor.h +++ b/src/Service/RequestProcessor.h @@ -44,13 +44,14 @@ class RequestProcessor /// Exist system for fatal error. [[noreturn]] static void systemExist(); - void moveRequestToPendingQueue(); + void moveRequestToPendingQueue(size_t); void readRequestProcessor(RunnerId id); void sendToProcessor(const RequestForSession & request); - void processErrorRequest(size_t count); - std::unordered_set processCommittedRequest(size_t); + void processErrorRequest(size_t); + void processCommittedRequest(size_t); + void drainQueues(); /// Apply request to state machine void applyRequest(const RequestForSession & request); @@ -81,6 +82,11 @@ class RequestProcessor /// Requests from `requests_queue` grouped by session std::unordered_map pending_requests; + /// + /// sortedErrorRequests grouped by session + using sortedErrorRequests = std::map; + std::unordered_map pending_error_requests; + /// Raft committed write requests which can be local or from other nodes. ConcurrentBoundedQueue committed_queue{1000}; @@ -88,13 +94,13 @@ class RequestProcessor std::shared_ptr keeper_dispatcher; + std::unordered_set queues_to_drain; + mutable std::mutex mutex; std::condition_variable cv; /// Error requests when append entry or forward to leader. ErrorRequests error_requests; - /// Used as index for error_requests - std::unordered_set error_request_ids; Poco::Logger * log; @@ -108,7 +114,7 @@ class RequestProcessor std::mutex empty_pool_lock; std::condition_variable empty_pool_cv; - size_t max_read_batch_size = 32; + // size_t max_read_batch_size = 32; }; }