Skip to content

Commit

Permalink
Use read-write-lock for user_response_callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
lzydmxy committed May 20, 2024
1 parent 361aa13 commit 0f0abb2
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 13 deletions.
27 changes: 15 additions & 12 deletions src/Service/KeeperDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void KeeperDispatcher::invokeResponseCallBack(int64_t session_id, const Coordina
/// session request
if (unlikely(isSessionRequest(response->getOpNum())))
{
std::lock_guard lock(response_callbacks_mutex);
std::shared_lock<std::shared_mutex> read_lock(response_callbacks_mutex);
auto session_writer = session_response_callbacks.find(session_id); /// TODO session id == internal id?
if (session_writer == session_response_callbacks.end())
return;
Expand All @@ -128,15 +128,18 @@ void KeeperDispatcher::invokeResponseCallBack(int64_t session_id, const Coordina
/// user request
else
{
std::lock_guard lock(response_callbacks_mutex);
std::shared_lock<std::shared_mutex> read_lock(response_callbacks_mutex);
auto session_writer = user_response_callbacks.find(session_id);
if (session_writer == user_response_callbacks.end())
return;

session_writer->second(response);
/// Session closed, no more writes
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
unregisterUserResponseCallBackWithoutLock(session_id);
{
read_lock.unlock();
unregisterUserResponseCallBack(session_id);
}
}
}

Expand Down Expand Up @@ -177,7 +180,7 @@ bool KeeperDispatcher::pushSessionRequest(const Coordination::ZooKeeperRequestPt
bool KeeperDispatcher::pushRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
{
std::lock_guard lock(response_callbacks_mutex);
std::shared_lock<std::shared_mutex> read_lock(response_callbacks_mutex);
/// session is expired by server
if (user_response_callbacks.count(session_id) == 0)
return false;
Expand Down Expand Up @@ -331,7 +334,7 @@ void KeeperDispatcher::shutdown()
response->error = Coordination::Error::ZSESSIONEXPIRED;
invokeResponseCallBack(request_for_session.session_id, response);
}
std::lock_guard lock(response_callbacks_mutex);
std::unique_lock<std::shared_mutex> write_lock(response_callbacks_mutex);
user_response_callbacks.clear();
session_response_callbacks.clear();
}
Expand All @@ -346,14 +349,14 @@ void KeeperDispatcher::shutdown()
void KeeperDispatcher::registerSessionResponseCallback(int64_t id, ZooKeeperResponseCallback callback)
{
LOG_DEBUG(log, "Register session response callback {}", toHexString(id));
std::lock_guard lock(response_callbacks_mutex);
std::unique_lock<std::shared_mutex> write_lock(response_callbacks_mutex);
if (!session_response_callbacks.try_emplace(id, callback).second)
throw Exception(RK::ErrorCodes::LOGICAL_ERROR, "Session response callback with id {} has already registered", toHexString(id));
}

void KeeperDispatcher::unRegisterSessionResponseCallback(int64_t id)
{
std::lock_guard lock(response_callbacks_mutex);
std::unique_lock<std::shared_mutex> write_lock(response_callbacks_mutex);
unRegisterSessionResponseCallbackWithoutLock(id);
}

Expand All @@ -367,7 +370,7 @@ void KeeperDispatcher::unRegisterSessionResponseCallbackWithoutLock(int64_t id)

[[maybe_unused]] void KeeperDispatcher::registerUserResponseCallBack(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected)
{
std::lock_guard lock(response_callbacks_mutex);
std::unique_lock<std::shared_mutex> write_lock(response_callbacks_mutex);
registerUserResponseCallBackWithoutLock(session_id, callback, is_reconnected);
}

Expand All @@ -382,7 +385,7 @@ void KeeperDispatcher::registerUserResponseCallBackWithoutLock(int64_t session_i

void KeeperDispatcher::unregisterUserResponseCallBack(int64_t session_id)
{
std::lock_guard lock(response_callbacks_mutex);
std::unique_lock<std::shared_mutex> write_lock(response_callbacks_mutex);
unregisterUserResponseCallBackWithoutLock(session_id);
}

Expand Down Expand Up @@ -537,14 +540,14 @@ void KeeperDispatcher::updateConfigurationThread()

bool KeeperDispatcher::isLocalSession(int64_t session_id)
{
std::lock_guard lock(response_callbacks_mutex);
std::shared_lock<std::shared_mutex> read_lock(response_callbacks_mutex);
auto it = user_response_callbacks.find(session_id);
return it != user_response_callbacks.end();
}

void KeeperDispatcher::filterLocalSessions(std::unordered_map<int64_t, int64_t> & session_to_expiration_time)
{
std::lock_guard lock(response_callbacks_mutex);
std::unique_lock<std::shared_mutex> write_lock(response_callbacks_mutex);
for (auto it = session_to_expiration_time.begin(); it != session_to_expiration_time.end();)
{
if (!user_response_callbacks.contains(it->first))
Expand Down Expand Up @@ -630,7 +633,7 @@ Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo()
result.outstanding_requests_count = requests_queue->size();
}
{
std::lock_guard lock(response_callbacks_mutex);
std::shared_lock<std::shared_mutex> read_lock(response_callbacks_mutex);
result.alive_connections_count = user_response_callbacks.size();
}
if (result.is_leader)
Expand Down
2 changes: 1 addition & 1 deletion src/Service/KeeperDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class KeeperDispatcher : public std::enable_shared_from_this<KeeperDispatcher>
/// which are local session which are directly connected to the node.
using UserResponseCallbacks = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
UserResponseCallbacks user_response_callbacks;
std::mutex response_callbacks_mutex;
std::shared_mutex response_callbacks_mutex;

/// Just like user_response_callbacks, but only concerns new session or update session requests.
/// For new session request the key is internal_id, for update session request the key is session id.
Expand Down

0 comments on commit 0f0abb2

Please sign in to comment.