diff --git a/src/Service/KeeperDispatcher.cpp b/src/Service/KeeperDispatcher.cpp index d30248b2e6..d26ad57821 100644 --- a/src/Service/KeeperDispatcher.cpp +++ b/src/Service/KeeperDispatcher.cpp @@ -119,7 +119,7 @@ void KeeperDispatcher::invokeResponseCallBack(int64_t session_id, const Coordina /// session request if (unlikely(isSessionRequest(response->getOpNum()))) { - std::lock_guard lock(response_callbacks_mutex); + std::shared_lock read_lock(response_callbacks_mutex); auto session_writer = session_response_callbacks.find(session_id); /// TODO session id == internal id? if (session_writer == session_response_callbacks.end()) return; @@ -128,7 +128,7 @@ void KeeperDispatcher::invokeResponseCallBack(int64_t session_id, const Coordina /// user request else { - std::lock_guard lock(response_callbacks_mutex); + std::shared_lock read_lock(response_callbacks_mutex); auto session_writer = user_response_callbacks.find(session_id); if (session_writer == user_response_callbacks.end()) return; @@ -136,7 +136,10 @@ void KeeperDispatcher::invokeResponseCallBack(int64_t session_id, const Coordina session_writer->second(response); /// Session closed, no more writes if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close) - unregisterUserResponseCallBackWithoutLock(session_id); + { + read_lock.unlock(); + unregisterUserResponseCallBack(session_id); + } } } @@ -177,7 +180,7 @@ bool KeeperDispatcher::pushSessionRequest(const Coordination::ZooKeeperRequestPt bool KeeperDispatcher::pushRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id) { { - std::lock_guard lock(response_callbacks_mutex); + std::shared_lock read_lock(response_callbacks_mutex); /// session is expired by server if (user_response_callbacks.count(session_id) == 0) return false; @@ -331,7 +334,7 @@ void KeeperDispatcher::shutdown() response->error = Coordination::Error::ZSESSIONEXPIRED; invokeResponseCallBack(request_for_session.session_id, response); } - std::lock_guard lock(response_callbacks_mutex); + std::unique_lock write_lock(response_callbacks_mutex); user_response_callbacks.clear(); session_response_callbacks.clear(); } @@ -346,14 +349,14 @@ void KeeperDispatcher::shutdown() void KeeperDispatcher::registerSessionResponseCallback(int64_t id, ZooKeeperResponseCallback callback) { LOG_DEBUG(log, "Register session response callback {}", toHexString(id)); - std::lock_guard lock(response_callbacks_mutex); + std::unique_lock write_lock(response_callbacks_mutex); if (!session_response_callbacks.try_emplace(id, callback).second) throw Exception(RK::ErrorCodes::LOGICAL_ERROR, "Session response callback with id {} has already registered", toHexString(id)); } void KeeperDispatcher::unRegisterSessionResponseCallback(int64_t id) { - std::lock_guard lock(response_callbacks_mutex); + std::unique_lock write_lock(response_callbacks_mutex); unRegisterSessionResponseCallbackWithoutLock(id); } @@ -367,7 +370,7 @@ void KeeperDispatcher::unRegisterSessionResponseCallbackWithoutLock(int64_t id) [[maybe_unused]] void KeeperDispatcher::registerUserResponseCallBack(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected) { - std::lock_guard lock(response_callbacks_mutex); + std::unique_lock write_lock(response_callbacks_mutex); registerUserResponseCallBackWithoutLock(session_id, callback, is_reconnected); } @@ -382,7 +385,7 @@ void KeeperDispatcher::registerUserResponseCallBackWithoutLock(int64_t session_i void KeeperDispatcher::unregisterUserResponseCallBack(int64_t session_id) { - std::lock_guard lock(response_callbacks_mutex); + std::unique_lock write_lock(response_callbacks_mutex); unregisterUserResponseCallBackWithoutLock(session_id); } @@ -537,14 +540,14 @@ void KeeperDispatcher::updateConfigurationThread() bool KeeperDispatcher::isLocalSession(int64_t session_id) { - std::lock_guard lock(response_callbacks_mutex); + std::shared_lock read_lock(response_callbacks_mutex); auto it = user_response_callbacks.find(session_id); return it != user_response_callbacks.end(); } void KeeperDispatcher::filterLocalSessions(std::unordered_map & session_to_expiration_time) { - std::lock_guard lock(response_callbacks_mutex); + std::unique_lock write_lock(response_callbacks_mutex); for (auto it = session_to_expiration_time.begin(); it != session_to_expiration_time.end();) { if (!user_response_callbacks.contains(it->first)) @@ -630,7 +633,7 @@ Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() result.outstanding_requests_count = requests_queue->size(); } { - std::lock_guard lock(response_callbacks_mutex); + std::shared_lock read_lock(response_callbacks_mutex); result.alive_connections_count = user_response_callbacks.size(); } if (result.is_leader) diff --git a/src/Service/KeeperDispatcher.h b/src/Service/KeeperDispatcher.h index b57fa20d3a..3fa1f5c62f 100644 --- a/src/Service/KeeperDispatcher.h +++ b/src/Service/KeeperDispatcher.h @@ -41,7 +41,7 @@ class KeeperDispatcher : public std::enable_shared_from_this /// which are local session which are directly connected to the node. using UserResponseCallbacks = std::unordered_map; UserResponseCallbacks user_response_callbacks; - std::mutex response_callbacks_mutex; + std::shared_mutex response_callbacks_mutex; /// Just like user_response_callbacks, but only concerns new session or update session requests. /// For new session request the key is internal_id, for update session request the key is session id.