Skip to content

Commit

Permalink
rename block to bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
lzydmxy authored and JackyWoo committed Mar 29, 2024
1 parent 6cfa7d4 commit 142a743
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 45 deletions.
8 changes: 4 additions & 4 deletions src/Service/KeeperStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1635,9 +1635,9 @@ void KeeperStore::buildPathChildren(bool from_zk_snapshot)
{
LOG_INFO(log, "build path children in keeper storage {}", container.size());
/// build children
for (UInt32 block_idx = 0; block_idx < container.getBlockNum(); block_idx++)
for (UInt32 bucket_idx = 0; bucket_idx < container.getBucketNum(); bucket_idx++)
{
for (const auto & it : container.getMap(block_idx).getMap())
for (const auto & it : container.getMap(bucket_idx).getMap())
{
if (it.first == "/")
continue;
Expand All @@ -1659,11 +1659,11 @@ void KeeperStore::buildPathChildren(bool from_zk_snapshot)
}
}

void KeeperStore::buildBlockChildren(const std::vector<BlocksEdges> & all_objects_edges, UInt32 block_idx)
void KeeperStore::buildBucketChildren(const std::vector<BucketEdges> & all_objects_edges, UInt32 bucket_idx)
{
for (auto & object_edges : all_objects_edges)
{
for (auto & [parent_path, path] : object_edges[block_idx])
for (auto & [parent_path, path] : object_edges[bucket_idx])
{
auto parent = container.get(parent_path);

Expand Down
27 changes: 15 additions & 12 deletions src/Service/KeeperStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ struct KeeperNodeWithPath
/// Map for the data tree; it is thread-safe, guarded by a read-write lock.
/// ConcurrentMap is a two-level unordered_map which is designed
/// to reduce latency as the unordered_map scales.
template <typename Element, unsigned NumBlocks>
template <typename Element, unsigned NumBuckets>
class ConcurrentMap
{
public:
Expand Down Expand Up @@ -149,7 +149,7 @@ class ConcurrentMap
};

private:
std::array<InnerMap, NumBlocks> maps_;
std::array<InnerMap, NumBuckets> maps_;
std::hash<String> hash_;

public:
Expand All @@ -161,9 +161,9 @@ class ConcurrentMap
size_t count(const String & key) { return get(key) != nullptr ? 1 : 0; }
bool erase(String const & key) { return mapFor(key).erase(key); }

InnerMap & mapFor(String const & key) { return maps_[hash_(key) % NumBlocks]; }
UInt32 getBlockIndex(String const & key) { return hash_(key) % NumBlocks; }
UInt32 getBlockNum() const { return NumBlocks; }
InnerMap & mapFor(String const & key) { return maps_[hash_(key) % NumBuckets]; }
UInt32 getBucketIndex(String const & key) { return hash_(key) % NumBuckets; }
UInt32 getBucketNum() const { return NumBuckets; }
InnerMap & getMap(const UInt32 & index) { return maps_[index]; }

void clear()
Expand All @@ -185,9 +185,9 @@ class ConcurrentMap
class KeeperStore
{
public:
/// block num for ConcurrentMap
static constexpr int MAP_BLOCK_NUM = 16;
using Container = ConcurrentMap<KeeperNode, MAP_BLOCK_NUM>;
/// bucket num for ConcurrentMap
static constexpr int MAP_BUCKET_NUM = 16;
using Container = ConcurrentMap<KeeperNode, MAP_BUCKET_NUM>;

using ResponsesForSessions = std::vector<ResponseForSession>;
using KeeperResponsesQueue = ThreadSafeQueue<ResponseForSession>;
Expand All @@ -203,7 +203,10 @@ class KeeperStore
using SessionIDs = std::vector<int64_t>;

using Watches = std::unordered_map<String /* path, relative of root_path */, SessionIDs>;
using BlocksEdges = std::array<Edges, MAP_BLOCK_NUM>;

/// Holds edges in different buckets based on the parent node's bucket number.
/// It should be used when loading a snapshot to build each node's childrenSet in parallel without a lock.
using BucketEdges = std::array<Edges, MAP_BUCKET_NUM>;

/// global session id counter, used to allocate new session id.
/// It should be same across all nodes.
Expand Down Expand Up @@ -301,8 +304,8 @@ class KeeperStore
/// Build path children after loading data from a snapshot
void buildPathChildren(bool from_zk_snapshot = false);

// Build childrenset for the node in specified block after load data from snapshot.
void buildBlockChildren(const std::vector<BlocksEdges> & all_objects_edges, UInt32 block_idx);
// Build childrenSet for the nodes in the specified bucket after loading data from a snapshot.
void buildBucketChildren(const std::vector<BucketEdges> & all_objects_edges, UInt32 bucket_idx);

void finalize();

Expand Down Expand Up @@ -342,7 +345,7 @@ class KeeperStore
uint64_t getApproximateDataSize() const
{
UInt64 node_count = container.size();
UInt64 size_bytes = container.getBlockNum() * sizeof(Container::InnerMap) /* Inner map size */
UInt64 size_bytes = container.getBucketNum() * sizeof(Container::InnerMap) /* Inner map size */
+ node_count * 8 / 0.75 /*hash map array size*/
+ node_count * sizeof(KeeperNode) /*node size*/
+ node_count * 100; /*path and child of node size*/
Expand Down
24 changes: 12 additions & 12 deletions src/Service/NuRaftLogSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ void KeeperSnapshotStore::parseBatchHeader(ptr<std::fstream> fs, SnapshotBatchHe
}
}

void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path, BlocksEdges & blocks_edges)
void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path, BucketEdges & buckets_edges)
{
ptr<std::fstream> snap_fs = cs_new<std::fstream>();
snap_fs->open(obj_path, std::ios::in | std::ios::binary);
Expand Down Expand Up @@ -553,21 +553,21 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path, Bloc
}

if (version_from_obj < SnapshotVersion::V2)
parseBatchBody(store, body_string, blocks_edges, version_from_obj);
parseBatchBody(store, body_string, buckets_edges, version_from_obj);
else
parseBatchBodyV2(store, body_string, blocks_edges, version_from_obj);
parseBatchBodyV2(store, body_string, buckets_edges, version_from_obj);
}
}

void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, const String & body_string, BlocksEdges & blocks_edges, SnapshotVersion version_)
void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, const String & body_string, BucketEdges & buckets_edges, SnapshotVersion version_)
{
SnapshotBatchPB batch_pb;
batch_pb.ParseFromString(body_string);
switch (batch_pb.batch_type())
{
case SnapshotTypePB::SNAPSHOT_TYPE_DATA:
LOG_DEBUG(log, "Parsing batch data from snapshot, data count {}", batch_pb.data_size());
parseBatchData(store, batch_pb, blocks_edges, version_);
parseBatchData(store, batch_pb, buckets_edges, version_);
break;
case SnapshotTypePB::SNAPSHOT_TYPE_SESSION: {
LOG_DEBUG(log, "Parsing batch session from snapshot, session count {}", batch_pb.data_size());
Expand All @@ -591,15 +591,15 @@ void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, const String & bod
}
}

void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, const String & body_string, BlocksEdges & blocks_edges, SnapshotVersion version_)
void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, const String & body_string, BucketEdges & buckets_edges, SnapshotVersion version_)
{
ptr<SnapshotBatchBody> batch;
batch = SnapshotBatchBody::parse(body_string);
switch (batch->type)
{
case SnapshotBatchType::SNAPSHOT_TYPE_DATA:
LOG_DEBUG(log, "Parsing batch data from snapshot, data count {}", batch->size());
parseBatchDataV2(store, *batch, blocks_edges, version_);
parseBatchDataV2(store, *batch, buckets_edges, version_);
break;
case SnapshotBatchType::SNAPSHOT_TYPE_SESSION: {
LOG_DEBUG(log, "Parsing batch session from snapshot, session count {}", batch->size());
Expand All @@ -626,7 +626,7 @@ void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, const String & b
void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store)
{
ThreadPool object_thread_pool(SNAPSHOT_THREAD_NUM);
all_objects_edges = std::vector<BlocksEdges>(objects_path.size());
all_objects_edges = std::vector<BucketEdges>(objects_path.size());

LOG_INFO(log, "Parse object from disk");

Expand Down Expand Up @@ -673,12 +673,12 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store)
[this, thread_idx, &store]
{
Poco::Logger * thread_log = &(Poco::Logger::get("KeeperSnapshotStore.buildChildren"));
for (UInt32 block_idx = 0; block_idx < store.container.getBlockNum(); block_idx++)
for (UInt32 bucket_idx = 0; bucket_idx < store.container.getBucketNum(); bucket_idx++)
{
if (block_idx % SNAPSHOT_THREAD_NUM == thread_idx)
if (bucket_idx % SNAPSHOT_THREAD_NUM == thread_idx)
{
LOG_INFO(thread_log, "Build ChildrenSet for nodes in block {}, thread_idx {}", block_idx, thread_idx);
store.buildBlockChildren(this->all_objects_edges, block_idx);
LOG_INFO(thread_log, "Build ChildrenSet for nodes in bucket {}, thread_idx {}", bucket_idx, thread_idx);
store.buildBucketChildren(this->all_objects_edges, bucket_idx);
}
}
});
Expand Down
8 changes: 4 additions & 4 deletions src/Service/NuRaftLogSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,16 @@ class KeeperSnapshotStore
void getObjectPath(ulong object_id, String & path);

/// Parse a snapshot object. We should take the version from the snapshot in general.
void parseObject(KeeperStore & store, String obj_path, BlocksEdges &);
void parseObject(KeeperStore & store, String obj_path, BucketEdges &);

/// Parse batch header in an object
/// TODO use internal buffer
void parseBatchHeader(ptr<std::fstream> fs, SnapshotBatchHeader & head);

/// Parse a batch /// TODO delete
void parseBatchBody(KeeperStore & store, const String &, BlocksEdges &, SnapshotVersion version_);
void parseBatchBody(KeeperStore & store, const String &, BucketEdges &, SnapshotVersion version_);
/// Parse a batch
void parseBatchBodyV2(KeeperStore & store, const String &, BlocksEdges &, SnapshotVersion version_);
void parseBatchBodyV2(KeeperStore & store, const String &, BucketEdges &, SnapshotVersion version_);

/// Serialize whole data tree /// TODO delete
size_t serializeDataTree(KeeperStore & storage);
Expand Down Expand Up @@ -168,7 +168,7 @@ class KeeperSnapshotStore

std::map<ulong, String> objects_path;

std::vector<BlocksEdges> all_objects_edges;
std::vector<BucketEdges> all_objects_edges;

/// Appended to snapshot file name
String curr_time;
Expand Down
16 changes: 8 additions & 8 deletions src/Service/SnapshotCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ template void serializeMap<StringMap>(StringMap & snap_map, UInt32 save_batch_si
template void serializeMap<IntMap>(IntMap & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path);


void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, BlocksEdges & blocks_edges, SnapshotVersion version)
void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, BucketEdges & buckets_edges, SnapshotVersion version)
{
for (int i = 0; i < batch.data_size(); i++)
{
Expand Down Expand Up @@ -464,11 +464,11 @@ void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, BlocksEd
throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Can't find parent path for path {}", path);

auto parent_path = rslash_pos == 0 ? "/" : path.substr(0, rslash_pos);
auto idx = store.container.getBlockIndex(parent_path);
auto idx = store.container.getBucketIndex(parent_path);

// Storage edges in different bucket, according to the block index of parent node.
// Store edges in different buckets, according to the bucket index of the parent node.
// This allows us to insert child paths for all nodes in parallel.
blocks_edges[idx].emplace_back(std::move(parent_path), path.substr(rslash_pos + 1));
buckets_edges[idx].emplace_back(std::move(parent_path), path.substr(rslash_pos + 1));
}
}

Expand Down Expand Up @@ -861,7 +861,7 @@ ptr<SnapshotBatchBody> SnapshotBatchBody::parse(const String & data)
return batch_body;
}

void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, BlocksEdges & blocks_edges, SnapshotVersion version)
void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, BucketEdges & buckets_edges, SnapshotVersion version)
{
for (size_t i = 0; i < batch.size(); i++)
{
Expand Down Expand Up @@ -910,11 +910,11 @@ void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, BlocksEdge
throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Can't find parent path for path {}", path);

auto parent_path = rslash_pos == 0 ? "/" : path.substr(0, rslash_pos);
auto idx = store.container.getBlockIndex(parent_path);
auto idx = store.container.getBucketIndex(parent_path);

// Storage edges in different bucket, according to the block index of parent node.
// Store edges in different buckets, according to the bucket index of the parent node.
// This allows us to insert child paths for all nodes in parallel.
blocks_edges[idx].emplace_back(std::move(parent_path), path.substr(rslash_pos + 1));
buckets_edges[idx].emplace_back(std::move(parent_path), path.substr(rslash_pos + 1));

}
}
Expand Down
6 changes: 3 additions & 3 deletions src/Service/SnapshotCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ static constexpr UInt32 SAVE_BATCH_SIZE = 10000;

using StringMap = std::unordered_map<String, String>;
using IntMap = std::unordered_map<String, int64_t>;
using BlocksEdges = KeeperStore::BlocksEdges;
using BucketEdges = KeeperStore::BucketEdges;

enum SnapshotVersion : uint8_t
{
Expand Down Expand Up @@ -128,7 +128,7 @@ template <typename T>
void serializeMap(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path);

/// Parse snapshot batch without protobuf
void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, BlocksEdges & blocks_edges, SnapshotVersion version);
void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, BucketEdges & buckets_edges, SnapshotVersion version);
void parseBatchSession(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version);
void parseBatchAclMap(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version);
void parseBatchIntMap(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version);
Expand All @@ -153,7 +153,7 @@ template <typename T>
void serializeMapV2(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path);

/// parse snapshot batch
void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, BlocksEdges & blocks_edges, SnapshotVersion version);
void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, BucketEdges & buckets_edges, SnapshotVersion version);
void parseBatchSessionV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version);
void parseBatchAclMapV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version);
void parseBatchIntMapV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version);
Expand Down
4 changes: 2 additions & 2 deletions src/Service/tests/gtest_raft_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ void assertStateMachineEquals(KeeperStore & storage, KeeperStore & ano_storage)


/// assert container
for (uint32_t i = 0; i < KeeperStore::MAP_BLOCK_NUM; i++)
for (uint32_t i = 0; i < KeeperStore::MAP_BUCKET_NUM; i++)
{
auto & map = storage.container.getMap(i);
auto & ano_map = ano_storage.container.getMap(i);
Expand Down Expand Up @@ -434,7 +434,7 @@ TEST(RaftSnapshot, readAndSaveSnapshot)
void compareKeeperStore(KeeperStore & store, KeeperStore & new_store, bool compare_acl)
{
ASSERT_EQ(new_store.container.size(), store.container.size());
for (UInt32 i = 0; i < store.container.getBlockNum(); i++)
for (UInt32 i = 0; i < store.container.getBucketNum(); i++)
{
auto & inner_map = store.container.getMap(i);
for (auto it = inner_map.getMap().begin(); it != inner_map.getMap().end(); it++)
Expand Down

0 comments on commit 142a743

Please sign in to comment.