Remove unnecessary locks in keeperStore #238

Merged
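This change drops the per-node `std::shared_mutex` from `KeeperNode` and the per-bucket locks inside the container (renamed from `ConcurrentMap` to `KeeperNodeMap`). Dropping them is only sound when mutations are serialized elsewhere, for example by a single state-machine apply thread. A minimal sketch of that single-writer pattern, with all names illustrative rather than taken from the repository:

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

// Single-writer apply loop: many threads enqueue operations, one thread
// applies them. The structures mutated inside op() need no internal locks.
class ApplyLoop
{
public:
    ApplyLoop() : worker_([this] { run(); }) {}

    void submit(std::function<void()> op)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ops_.push(std::move(op));
        }
        cv_.notify_one();
    }

private:
    [[noreturn]] void run()
    {
        for (;;)
        {
            std::unique_lock<std::mutex> lock(mutex_);
            cv_.wait(lock, [this] { return !ops_.empty(); });
            auto op = std::move(ops_.front());
            ops_.pop();
            lock.unlock();
            op();  // all mutations happen on this one thread
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> ops_;
    std::thread worker_;  // shutdown/join omitted in this sketch
};
```

Readers running outside the apply thread would still need coordination at a higher level; the sketch only shows why node-level mutexes become redundant once writes are single-threaded.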
31 changes: 16 additions & 15 deletions src/Service/KeeperStore.cpp
@@ -373,8 +373,6 @@ struct StoreRequestCreate final : public StoreRequest
int64_t pzxid;

{
std::lock_guard parent_lock(parent->mutex);

response.path_created = path_created;

parent->children.insert(child_path);
@@ -413,7 +411,6 @@ struct StoreRequestCreate final : public StoreRequest
}
auto undo_parent = store.container.at(parent_path);
{
std::lock_guard parent_lock(undo_parent->mutex);
--undo_parent->stat.cversion;
--undo_parent->stat.numChildren;
undo_parent->stat.pzxid = pzxid;
@@ -473,7 +470,6 @@ struct StoreRequestGet final : public StoreRequest
else
{
{
std::shared_lock r_lock(node->mutex);
response.stat = node->statForResponse();
response.data = node->data;
}
@@ -548,7 +544,6 @@ struct StoreRequestRemove final : public StoreRequest

auto parent = store.container.at(getParentPath(request.path));
{
std::lock_guard parent_lock(parent->mutex);
--parent->stat.numChildren;
pzxid = parent->stat.pzxid;
parent->stat.pzxid = zxid;
@@ -578,7 +573,6 @@ struct StoreRequestRemove final : public StoreRequest
store.container.emplace(path, prev_node);
auto undo_parent = store.container.at(getParentPath(path));
{
std::lock_guard parent_lock(undo_parent->mutex);
++(undo_parent->stat.numChildren);
undo_parent->stat.pzxid = pzxid;
undo_parent->children.insert(child_basename);
@@ -610,7 +604,6 @@ struct StoreRequestExists final : public StoreRequest
if (node != nullptr)
{
{
std::shared_lock r_lock(node->mutex);
response.stat = node->statForResponse();
}
response.error = Coordination::Error::ZOK;
@@ -671,7 +664,6 @@ struct StoreRequestSet final : public StoreRequest
{
auto prev_node = node->clone();
{
std::lock_guard node_lock(node->mutex);
++node->stat.version;
node->stat.mzxid = zxid;
node->stat.mtime = time;
@@ -756,7 +748,6 @@ struct StoreRequestList final : public StoreRequest
}

Coordination::ZooKeeperListResponse & response = dynamic_cast<Coordination::ZooKeeperListResponse &>(*response_ptr);
std::shared_lock r_lock(node->mutex);

response.stat = node->statForResponse();

@@ -792,7 +783,6 @@ struct StoreRequestList final : public StoreRequest
else
{
Coordination::ZooKeeperSimpleListResponse & response = dynamic_cast<Coordination::ZooKeeperSimpleListResponse &>(*response_ptr);
std::shared_lock r_lock(node->mutex);
response.names.insert(response.names.end(), node->children.begin(), node->children.end());
}

@@ -919,7 +909,6 @@ struct StoreRequestSetACL final : public StoreRequest
uint64_t acl_id = store.acl_map.convertACLs(node_acls);
store.acl_map.addUsage(acl_id);

std::lock_guard node_lock(node->mutex);
node->acl_id = acl_id;
++node->stat.aversion;

@@ -976,7 +965,6 @@ struct StoreRequestGetACL final : public StoreRequest
}
else
{
std::shared_lock r_lock(node->mutex);
response.stat = node->stat;
response.acl = store.acl_map.convertNumber(node->acl_id);
}
@@ -1195,7 +1183,6 @@ void KeeperStore::finalize()
{
auto parent = container.at(getParentPath(ephemeral_path));
{
std::lock_guard parent_lock(parent->mutex);
--parent->stat.numChildren;
parent->children.erase(getBaseName(ephemeral_path));
}
@@ -1659,6 +1646,21 @@ void KeeperStore::buildPathChildren(bool from_zk_snapshot)
}
}

void KeeperStore::buildBucketNodes(const std::vector<BucketNodes> & all_objects_nodes, UInt32 bucket_idx)
{
for (auto && object_nodes : all_objects_nodes)
{
for (auto && [path, node] : object_nodes[bucket_idx])
{
// The root node "/" pre-exists in the container, so it is exempt from the duplicate check.
if (!container.emplace(path, std::move(node), bucket_idx) && path != "/")
{
throw RK::Exception(RK::ErrorCodes::LOGICAL_ERROR, "Logical error: when loading data from a snapshot, duplicate node {} was found", path);
}
}
}
}

void KeeperStore::buildBucketChildren(const std::vector<BucketEdges> & all_objects_edges, UInt32 bucket_idx)
{
for (auto & object_edges : all_objects_edges)
@@ -1669,7 +1671,7 @@ void KeeperStore::buildBucketChildren(const std::vector<BucketEdges> & all_objec

if (unlikely(parent == nullptr))
{
throw RK::Exception("Logical error: Build : can not find parent for node " + path, ErrorCodes::LOGICAL_ERROR);
throw RK::Exception(RK::ErrorCodes::LOGICAL_ERROR, "Logical error: Build: cannot find parent for node {}", path);
}

parent->children.emplace(std::move(path));
@@ -1700,7 +1702,6 @@ void KeeperStore::cleanEphemeralNodes(int64_t session_id, ThreadSafeQueue<Respon
}
else
{
std::lock_guard parent_lock(parent->mutex);
--parent->stat.numChildren;
parent->children.erase(getBaseName(ephemeral_path));
}
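The two `buildBucket*` methods above are meant to be called once per bucket from separate threads. Below is a sketch of a driver under that assumption; `store`, `all_objects_nodes`, and `all_objects_edges` are assumed in scope, since the actual snapshot loader is not part of this diff:

```cpp
#include <thread>
#include <vector>

// Hypothetical driver: one worker per bucket, two phases with a barrier.
// Within a phase, each bucket index is owned by exactly one thread, so the
// container needs no internal locking.
std::vector<std::thread> workers;

// Phase 1: insert every node into its own bucket.
for (UInt32 bucket_idx = 0; bucket_idx < KeeperStore::MAP_BUCKET_NUM; ++bucket_idx)
    workers.emplace_back([&store, &all_objects_nodes, bucket_idx]
    {
        store.buildBucketNodes(all_objects_nodes, bucket_idx);
    });
for (auto & worker : workers)
    worker.join();
workers.clear();

// Phase 2: wire up children only after every parent node exists.
for (UInt32 bucket_idx = 0; bucket_idx < KeeperStore::MAP_BUCKET_NUM; ++bucket_idx)
    workers.emplace_back([&store, &all_objects_edges, bucket_idx]
    {
        store.buildBucketChildren(all_objects_edges, bucket_idx);
    });
for (auto & worker : workers)
    worker.join();
```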
79 changes: 48 additions & 31 deletions src/Service/KeeperStore.h
@@ -42,8 +42,6 @@ struct KeeperNode
Coordination::Stat stat{};
ChildrenSet children{};

std::shared_mutex mutex;

std::shared_ptr<KeeperNode> clone() const
{
auto node = std::make_shared<KeeperNode>();
@@ -81,60 +79,48 @@ struct KeeperNodeWithPath
std::shared_ptr<KeeperNode> node;
};

/// Map for data tree, it is thread safe with read write lock.
/// ConcurrentMap is a two-level unordered_map which is designed
/// KeeperNodeMap is a two-level unordered_map which is designed
/// to reduce latency as the map scales.
template <typename Element, unsigned NumBuckets>
class ConcurrentMap
class KeeperNodeMap
{
public:
using SharedElement = std::shared_ptr<Element>;
using ElementMap = std::unordered_map<String, SharedElement>;
using Action = std::function<void(const String &, const SharedElement &)>;

/// Simple encapsulation of unordered_map.
class InnerMap
{
public:
SharedElement get(String const & key)
{
std::shared_lock lock(mut_);
auto i = map_.find(key);
return (i != map_.end()) ? i->second : nullptr;
}
bool emplace(String const & key, SharedElement && value)
{
std::unique_lock lock(mut_);
auto [_, created] = map_.insert_or_assign(key, std::move(value));
return created;
}
bool emplace(String const & key, const SharedElement & value)

template <typename T>
bool emplace(String const & key, T && value)
{
std::unique_lock lock(mut_);
auto [_, created] = map_.insert_or_assign(key, value);
return created;
return map_.insert_or_assign(key, std::forward<T>(value)).second;
}

bool erase(String const & key)
{
std::unique_lock write_lock(mut_);
return map_.erase(key);
}

size_t size() const
{
std::shared_lock lock(mut_);
return map_.size();
}

void clear()
{
std::unique_lock lock(mut_);
map_.clear();
}

void forEach(const Action & fn)
{
std::shared_lock read_lock(mut_);
for (const auto & [key, value] : map_)
fn(key, value);
}
@@ -144,22 +130,52 @@ class ConcurrentMap
ElementMap & getMap() { return map_; }

private:
mutable std::shared_mutex mut_;
ElementMap map_;
};

private:
std::array<InnerMap, NumBuckets> maps_;
std::hash<String> hash_;
std::atomic<size_t> node_count{0};

public:
SharedElement get(const String & key) { return mapFor(key).get(key); }
SharedElement at(const String & key) { return mapFor(key).get(key); }

bool emplace(const String & key, SharedElement && value) { return mapFor(key).emplace(key, std::move(value)); }
bool emplace(const String & key, const SharedElement & value) { return mapFor(key).emplace(key, value); }
template <typename T>
bool emplace(const String & key, T && value)
{
if (mapFor(key).emplace(key, std::forward<T>(value)))
{
node_count++;
return true;
}
return false;
}

template <typename T>
bool emplace(const String & key, T && value, UInt32 bucket_idx)
{
if (maps_[bucket_idx].emplace(key, std::forward<T>(value)))
{
node_count++;
return true;
}
return false;
}

bool erase(String const & key)
{
if (mapFor(key).erase(key))
{
node_count--;
return true;
}
return false;
}
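The forwarding-reference `emplace` above replaces the old lvalue/rvalue overload pair; for the forwarding to pay off, the body must pass `std::forward<T>(value)` so rvalue arguments are actually moved. A standalone illustration (all names here are mine, not the repository's):

```cpp
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// One template serves both the copying and the moving call sites.
template <typename Map, typename T>
bool insertOrAssign(Map & map, const std::string & key, T && value)
{
    // std::forward moves when the caller passed an rvalue, copies otherwise.
    return map.insert_or_assign(key, std::forward<T>(value)).second;
}

int main()
{
    std::unordered_map<std::string, std::shared_ptr<int>> map;
    auto node = std::make_shared<int>(42);
    insertOrAssign(map, "/a", node);             // copies: node still usable
    insertOrAssign(map, "/b", std::move(node));  // moves: node is now empty
}
```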

size_t count(const String & key) { return get(key) != nullptr ? 1 : 0; }
bool erase(String const & key) { return mapFor(key).erase(key); }


InnerMap & mapFor(String const & key) { return maps_[hash_(key) % NumBuckets]; }
UInt32 getBucketIndex(String const & key) { return hash_(key) % NumBuckets; }
@@ -170,24 +186,22 @@
{
for (auto & map : maps_)
map.clear();
node_count.store(0);
}

size_t size() const
{
size_t s(0);
for (const auto & map : maps_)
s += map.size();
return s;
return node_count.load();
}
};
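To make the two-level design concrete, here is a self-contained sketch of the same pattern: keys hash into a fixed number of buckets, each bucket is a plain `unordered_map`, and an atomic counter makes `size()` O(1) instead of a per-bucket scan. The simplifications (plain `std::string`, no `InnerMap` wrapper) are mine:

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>

// Sketch of a two-level (bucketed) map with an O(1) size counter.
template <typename Element, unsigned NumBuckets>
class BucketedMap
{
public:
    using SharedElement = std::shared_ptr<Element>;

    SharedElement get(const std::string & key)
    {
        auto & map = maps_[bucketOf(key)];
        auto it = map.find(key);
        return it != map.end() ? it->second : nullptr;
    }

    bool emplace(const std::string & key, SharedElement value)
    {
        if (maps_[bucketOf(key)].insert_or_assign(key, std::move(value)).second)
        {
            ++size_;  // count only newly inserted keys, not overwrites
            return true;
        }
        return false;
    }

    bool erase(const std::string & key)
    {
        if (maps_[bucketOf(key)].erase(key))
        {
            --size_;
            return true;
        }
        return false;
    }

    size_t size() const { return size_.load(); }  // no per-bucket scan

private:
    unsigned bucketOf(const std::string & key) const { return hash_(key) % NumBuckets; }

    std::array<std::unordered_map<std::string, SharedElement>, NumBuckets> maps_;
    std::hash<std::string> hash_;
    std::atomic<size_t> size_{0};
};
```

The extra `bucket_idx` overload in the real class appears to let snapshot loading skip re-hashing when the target bucket is already known.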

/// KeeperStore holds the data tree and sessions. It sits under the state machine.
class KeeperStore
{
public:
/// bucket num for ConcurrentMap
/// bucket num for KeeperNodeMap
static constexpr int MAP_BUCKET_NUM = 16;
using Container = ConcurrentMap<KeeperNode, MAP_BUCKET_NUM>;
using Container = KeeperNodeMap<KeeperNode, MAP_BUCKET_NUM>;

using ResponsesForSessions = std::vector<ResponseForSession>;
using KeeperResponsesQueue = ThreadSafeQueue<ResponseForSession>;
Expand All @@ -207,6 +221,7 @@ class KeeperStore
/// Holds edges in different buckets based on the parent node's bucket number.
/// It is used when loading a snapshot to build nodes' childrenSet in parallel without locks.
using BucketEdges = std::array<Edges, MAP_BUCKET_NUM>;
using BucketNodes = std::array<std::vector<std::pair<String, std::shared_ptr<KeeperNode>>>, MAP_BUCKET_NUM>;

/// Global session id counter, used to allocate new session ids.
/// It should be the same across all nodes.
@@ -306,6 +321,8 @@ class KeeperStore

// Build childrenSet for the nodes in the specified bucket after loading data from a snapshot.
void buildBucketChildren(const std::vector<BucketEdges> & all_objects_edges, UInt32 bucket_idx);
// Insert loaded nodes into the specified bucket after loading data from a snapshot.
void buildBucketNodes(const std::vector<BucketNodes> & all_objects_nodes, UInt32 bucket_idx);


void finalize();

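`BucketNodes` mirrors `BucketEdges`: nodes parsed from each snapshot object are pre-partitioned by the bucket their path hashes to, so `buildBucketNodes` can later consume one bucket per thread. A sketch of how a parser might fill one such array; `parsed_object`, its element type, and the public `container` access are assumptions:

```cpp
// Hypothetical: bin parsed (path, node) pairs by target bucket.
KeeperStore::BucketNodes bucket_nodes;
for (auto & [path, node] : parsed_object)  // one snapshot object's nodes
{
    UInt32 bucket_idx = store.container.getBucketIndex(path);
    bucket_nodes[bucket_idx].emplace_back(path, std::move(node));
}
```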