Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release memory when loading snapshot completes #242

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 24 additions & 34 deletions src/Service/KeeperStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ struct StoreRequestCreate final : public StoreRequest
std::pair<Coordination::ZooKeeperResponsePtr, Undo>
process(KeeperStore & store, int64_t zxid, int64_t session_id, int64_t time) const override
{
Poco::Logger * log = &(Poco::Logger::get("SvsKeeperStorageCreateRequest"));
Poco::Logger * log = &(Poco::Logger::get("StoreRequestCreate"));

Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Undo undo;
Expand Down Expand Up @@ -429,7 +429,7 @@ struct StoreRequestGet final : public StoreRequest

bool checkAuth(KeeperStore & store, int64_t session_id) const override
{
Poco::Logger * log = &(Poco::Logger::get("SvsKeeperStorageGetRequest"));
Poco::Logger * log = &(Poco::Logger::get("StoreRequestGet"));
auto & container = store.container;
auto node = container.get(zk_request->getPath());
if (node == nullptr)
Expand Down Expand Up @@ -519,8 +519,8 @@ struct StoreRequestRemove final : public StoreRequest
Coordination::ZooKeeperRemoveRequest & request = dynamic_cast<Coordination::ZooKeeperRemoveRequest &>(*zk_request);
Undo undo;

Poco::Logger * log = &(Poco::Logger::get("SvsKeeperStorageRemoveRequest"));
KeeperStore::Container::SharedElement node = store.container.get(request.path);
Poco::Logger * log = &(Poco::Logger::get("StoreRequestRemove"));
auto node = store.container.get(request.path);
if (node == nullptr)
{
response.error = Coordination::Error::ZNONODE;
Expand Down Expand Up @@ -736,7 +736,7 @@ struct StoreRequestList final : public StoreRequest

auto path_prefix = request.path;
if (path_prefix.empty())
throw RK::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR);
throw RK::Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: path cannot be empty");

if (response_ptr->getOpNum() == Coordination::OpNum::List || response_ptr->getOpNum() == Coordination::OpNum::FilteredList)
{
Expand Down Expand Up @@ -764,7 +764,7 @@ struct StoreRequestList final : public StoreRequest
if (node == nullptr)
{
LOG_ERROR(
&Poco::Logger::get("SvsKeeperStorageListRequest"),
&Poco::Logger::get("StoreRequestList"),
"Inconsistency found between uncommitted and committed data, can't get child {} for {} ."
"Keeper will terminate to avoid undefined behaviour.", child, request.path);
std::terminate();
Expand Down Expand Up @@ -1225,7 +1225,7 @@ class StoreRequestFactory final : private boost::noncopyable
{
auto it = op_num_to_request.find(zk_request->getOpNum());
if (it == op_num_to_request.end())
throw RK::Exception("Unknown operation type " + toString(zk_request->getOpNum()), ErrorCodes::LOGICAL_ERROR);
throw RK::Exception(ErrorCodes::LOGICAL_ERROR, "Unknown operation type {}", toString(zk_request->getOpNum()));

return it->second(zk_request);
}
Expand Down Expand Up @@ -1618,61 +1618,51 @@ bool KeeperStore::updateSessionTimeout(int64_t session_id, int64_t /*session_tim
return true;
}

void KeeperStore::buildPathChildren(bool from_zk_snapshot)
void KeeperStore::buildChildrenSet(bool from_zk_snapshot)
{
LOG_INFO(log, "build path children in keeper storage {}", container.size());
/// build children
for (UInt32 bucket_idx = 0; bucket_idx < container.getBucketNum(); bucket_idx++)
for (UInt32 bucket_id = 0; bucket_id < container.getBucketNum(); bucket_id++)
{
for (const auto & it : container.getMap(bucket_idx).getMap())
for (const auto & it : container.getMap(bucket_id).getMap())
{
if (it.first == "/")
continue;

auto parent_path = getParentPath(it.first);
auto child_path = getBaseName(it.first);
auto parent = container.get(parent_path);

if (parent == nullptr)
{
throw RK::Exception("Logical error: Build : can not find parent node " + it.first, ErrorCodes::LOGICAL_ERROR);
}
else
{
parent->children.insert(child_path);
if (from_zk_snapshot)
parent->stat.numChildren++;
}
throw RK::Exception(ErrorCodes::LOGICAL_ERROR, "Error when building children set, can not find parent for node {}", it.first);

parent->children.insert(child_path);
if (from_zk_snapshot)
parent->stat.numChildren++;
}
}
}

void KeeperStore::buildBucketNodes(const std::vector<BucketNodes> & all_objects_nodes, UInt32 bucket_idx)
void KeeperStore::fillDataTreeBucket(const std::vector<BucketNodes> & all_objects_nodes, UInt32 bucket_id)
{
for (auto && object_nodes : all_objects_nodes)
{
for (auto && [path, node] : object_nodes[bucket_idx])
for (auto && [path, node] : object_nodes[bucket_id])
{
// container.emplace(path, std::move(node), bucket_idx);
if (!container.emplace(path, std::move(node), bucket_idx) && path != "/")
{
throw RK::Exception(RK::ErrorCodes::LOGICAL_ERROR, "Logical error: When loading data from a snapshot, duplicate node {} were found ", path);
}
if (!container.emplace(path, std::move(node), bucket_id) && path != "/")
throw RK::Exception(RK::ErrorCodes::LOGICAL_ERROR, "Error when filling data tree bucket {}, duplicated node {}", bucket_id, path);
}
}
}

void KeeperStore::buildBucketChildren(const std::vector<BucketEdges> & all_objects_edges, UInt32 bucket_idx)
void KeeperStore::buildBucketChildren(const std::vector<BucketEdges> & all_objects_edges, UInt32 bucket_id)
{
for (auto & object_edges : all_objects_edges)
for (const auto & object_edges : all_objects_edges)
{
for (auto & [parent_path, path] : object_edges[bucket_idx])
for (const auto & [parent_path, path] : object_edges[bucket_id])
{
auto parent = container.get(parent_path);

if (unlikely(parent == nullptr))
{
throw RK::Exception(RK::ErrorCodes::LOGICAL_ERROR, "Logical error: Build : can not find parent {} for node ", path);
}
throw RK::Exception(RK::ErrorCodes::LOGICAL_ERROR, "Can not find parent for node {}", path);

parent->children.emplace(std::move(path));
}
Expand Down
82 changes: 42 additions & 40 deletions src/Service/KeeperStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,86 +79,89 @@ struct KeeperNodeWithPath
std::shared_ptr<KeeperNode> node;
};

/// KeeperNodeMap is a two-level unordered_map which is designed
/// to reduce latency for unordered_map scales.
template <typename Element, unsigned NumBuckets>
/// KeeperNodeMap is a two-level unordered_map which is designed to reduce latency for unordered_map scaling.
/// It is not a thread-safe map. But it is accessed only in the request processor thread.
template <typename Value, unsigned NumBuckets>
class KeeperNodeMap
{
public:
using SharedElement = std::shared_ptr<Element>;
using ElementMap = std::unordered_map<String, SharedElement>;
using Action = std::function<void(const String &, const SharedElement &)>;
using Key = String;
using ValuePtr = std::shared_ptr<Value>;
using NestedMap = std::unordered_map<String, ValuePtr>;
using Action = std::function<void(const String &, const ValuePtr &)>;

class InnerMap
{
public:
SharedElement get(String const & key)
ValuePtr get(const String & key)
{
auto i = map_.find(key);
return (i != map_.end()) ? i->second : nullptr;
auto i = map.find(key);
return (i != map.end()) ? i->second : nullptr;
}

template <typename T>
bool emplace(String const & key, T && value)
bool emplace(const String & key, T && value)
{
return map_.insert_or_assign(key, value).second;
return map.insert_or_assign(key, value).second;
}

bool erase(String const & key)
bool erase(const String & key)
{
return map_.erase(key);
return map.erase(key);
}

size_t size() const
{
return map_.size();
return map.size();
}

void clear()
{
map_.clear();
map.clear();
}

void forEach(const Action & fn)
{
for (const auto & [key, value] : map_)
for (const auto & [key, value] : map)
fn(key, value);
}

/// This method will destroy InnerMap thread safety property.
/// deprecated use forEach instead.
ElementMap & getMap() { return map_; }
/// Deprecated, please use forEach instead.
NestedMap & getMap() { return map; }

private:
ElementMap map_;
NestedMap map;
};

private:
std::array<InnerMap, NumBuckets> maps_;
std::hash<String> hash_;
inline InnerMap & mapFor(const String & key) { return buckets[hash(key) % NumBuckets]; }

std::array<InnerMap, NumBuckets> buckets;
std::hash<String> hash;
std::atomic<size_t> node_count{0};

public:
SharedElement get(const String & key) { return mapFor(key).get(key); }
SharedElement at(const String & key) { return mapFor(key).get(key); }
ValuePtr get(const String & key) { return mapFor(key).get(key); }
ValuePtr at(const String & key) { return mapFor(key).get(key); }

template <typename T>
bool emplace(const String & key, T && value)
{
if (mapFor(key).emplace(key, std::forward<T>(value)))
{
node_count ++;
node_count++;
return true;
}
return false;
}

template <typename T>
bool emplace(const String & key, T && value, UInt32 bucket_idx)
bool emplace(const String & key, T && value, UInt32 bucket_id)
{
if (maps_[bucket_idx].emplace(key, std::forward<T>(value)))
if (buckets[bucket_id].emplace(key, std::forward<T>(value)))
{
node_count ++;
node_count++;
return true;
}
return false;
Expand All @@ -168,24 +171,23 @@ class KeeperNodeMap
{
if (mapFor(key).erase(key))
{
node_count --;
node_count--;
return true;
}
return false;
}

size_t count(const String & key) { return get(key) != nullptr ? 1 : 0; }


InnerMap & mapFor(String const & key) { return maps_[hash_(key) % NumBuckets]; }
UInt32 getBucketIndex(String const & key) { return hash_(key) % NumBuckets; }
UInt32 getBucketIndex(const String & key) { return hash(key) % NumBuckets; }
UInt32 getBucketNum() const { return NumBuckets; }
InnerMap & getMap(const UInt32 & index) { return maps_[index]; }

InnerMap & getMap(const UInt32 & bucket_id) { return buckets[bucket_id]; }

void clear()
{
for (auto & map : maps_)
map.clear();
for (auto & bucket : buckets)
bucket.clear();
node_count.store(0);
}

Expand Down Expand Up @@ -316,17 +318,17 @@ class KeeperStore
bool check_acl = true,
bool ignore_response = false);

/// Build path children after load data from snapshot
void buildPathChildren(bool from_zk_snapshot = false);
/// Build children set after loading data from snapshot
void buildChildrenSet(bool from_zk_snapshot = false);

// Build childrenSet for the node in specified bucket after load data from snapshot.
void buildBucketChildren(const std::vector<BucketEdges> & all_objects_edges, UInt32 bucket_idx);
void buildBucketNodes(const std::vector<BucketNodes> & all_objects_nodes, UInt32 bucket_idx);
// Build children set for the nodes in specified bucket after load data from snapshot.
void buildBucketChildren(const std::vector<BucketEdges> & all_objects_edges, UInt32 bucket_id);
void fillDataTreeBucket(const std::vector<BucketNodes> & all_objects_nodes, UInt32 bucket_id);


void finalize();

/// Add session id. Used when restoring KeeperStorage from snapshot.
/// Add session id. Used when restoring KeeperStore from snapshot.
void addSessionID(int64_t session_id, int64_t session_timeout_ms)
{
std::lock_guard lock(session_mutex);
Expand Down
Loading
Loading