Skip to content

Commit

Permalink
merge FilteredList
Browse files Browse the repository at this point in the history
  • Loading branch information
lzydmxy authored and JackyWoo committed Dec 11, 2023
1 parent 0b34f3e commit 51c14eb
Show file tree
Hide file tree
Showing 10 changed files with 289 additions and 27 deletions.
44 changes: 41 additions & 3 deletions src/Service/KeeperStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -799,12 +799,48 @@ struct SvsKeeperStorageListRequest final : public StoreRequest
if (path_prefix.empty())
throw RK::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR);

if (response_ptr->getOpNum() == Coordination::OpNum::List)
if (response_ptr->getOpNum() == Coordination::OpNum::List || response_ptr->getOpNum() == Coordination::OpNum::FilteredList)
{
using enum Coordination::ZooKeeperFilteredListRequest::ListRequestType;
auto list_request_type = ALL;
if (auto * filtered_list_request = dynamic_cast<Coordination::ZooKeeperFilteredListRequest *>(&request))
{
list_request_type = filtered_list_request->list_request_type;
}

Coordination::ZooKeeperListResponse & response = dynamic_cast<Coordination::ZooKeeperListResponse &>(*response_ptr);
std::shared_lock r_lock(node->mutex);
response.names.insert(response.names.end(), node->children.begin(), node->children.end());

response.stat = node->statForResponse();

if (list_request_type == ALL)
{
response.names.reserve(node->children.size());
response.names.insert(response.names.end(), node->children.begin(), node->children.end());
return {response_ptr, {}};
}

auto add_child = [&](const auto & child)
{
auto child_node = store.container.get(request.path + "/" + child);
if (node == nullptr)
{
LOG_ERROR(
&Poco::Logger::get("SvsKeeperStorageListRequest"),
"Inconsistency found between uncommitted and committed data, can't get child {} for {} ."
"Keeper will terminate to avoid undefined behaviour.", child, request.path);
std::terminate();
}

const auto is_ephemeral = child_node->stat.ephemeralOwner != 0;
return (is_ephemeral && list_request_type == EPHEMERAL_ONLY) || (!is_ephemeral && list_request_type == PERSISTENT_ONLY);
};

for (const auto & child: node->children)
{
if (add_child(child))
response.names.push_back(child);
}
}
else
{
Expand Down Expand Up @@ -1099,7 +1135,8 @@ struct SvsKeeperStorageMultiRequest final : public StoreRequest
check_operation_type(OperationType::Read);
concrete_requests.push_back(std::make_shared<SvsKeeperStorageExistsRequest>(sub_zk_request));
}
else if (sub_zk_request->getOpNum() == Coordination::OpNum::List || sub_zk_request->getOpNum() == Coordination::OpNum::SimpleList)
else if (sub_zk_request->getOpNum() == Coordination::OpNum::List || sub_zk_request->getOpNum() == Coordination::OpNum::SimpleList
|| sub_zk_request->getOpNum() == Coordination::OpNum::FilteredList)
{
check_operation_type(OperationType::Read);
concrete_requests.push_back(std::make_shared<SvsKeeperStorageListRequest>(sub_zk_request));
Expand Down Expand Up @@ -1286,6 +1323,7 @@ NuKeeperWrapperFactory::NuKeeperWrapperFactory()
registerNuKeeperRequestWrapper<Coordination::OpNum::Set, SvsKeeperStorageSetRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::List, SvsKeeperStorageListRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::SimpleList, SvsKeeperStorageListRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::FilteredList, SvsKeeperStorageListRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Check, SvsKeeperStorageCheckRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Multi, SvsKeeperStorageMultiRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::MultiRead, SvsKeeperStorageMultiRequest>(*this);
Expand Down
18 changes: 18 additions & 0 deletions src/ZooKeeper/ZooKeeperCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,23 @@ void ZooKeeperListResponse::readImpl(ReadBuffer & in)
Coordination::read(stat, in);
}

void ZooKeeperFilteredListRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(path, out);
Coordination::write(has_watch, out);
Coordination::write(static_cast<uint8_t>(list_request_type), out);
}

void ZooKeeperFilteredListRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
Coordination::read(has_watch, in);

uint8_t read_request_type{0};
Coordination::read(read_request_type, in);
list_request_type = static_cast<ListRequestType>(read_request_type);
}

void ZooKeeperListResponse::writeImpl(WriteBuffer & out) const
{
Coordination::write(names, out);
Expand Down Expand Up @@ -696,6 +713,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
registerZooKeeperRequest<OpNum::Get, ZooKeeperGetRequest>(*this);
registerZooKeeperRequest<OpNum::Set, ZooKeeperSetRequest>(*this);
registerZooKeeperRequest<OpNum::SimpleList, ZooKeeperSimpleListRequest>(*this);
registerZooKeeperRequest<OpNum::FilteredList, ZooKeeperFilteredListRequest>(*this);
registerZooKeeperRequest<OpNum::List, ZooKeeperListRequest>(*this);
registerZooKeeperRequest<OpNum::Check, ZooKeeperCheckRequest>(*this);
registerZooKeeperRequest<OpNum::Multi, ZooKeeperMultiRequest>(*this);
Expand Down
37 changes: 37 additions & 0 deletions src/ZooKeeper/ZooKeeperCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,43 @@ struct ZooKeeperSimpleListRequest final : ZooKeeperListRequest
}
};

struct ZooKeeperFilteredListRequest final : ZooKeeperListRequest
{
enum class ListRequestType : UInt8
{
ALL,
PERSISTENT_ONLY,
EPHEMERAL_ONLY
};

String listRequestTypeToString(ListRequestType value) const
{
static std::map<ListRequestType, String> listRequestTypeStrings =
{
{ListRequestType::ALL, "ALL"},
{ListRequestType::EPHEMERAL_ONLY, "PERSISTENT_ONLY"},
{ListRequestType::PERSISTENT_ONLY, "EPHEMERAL_ONLY"}
};

if (auto it = listRequestTypeStrings.find(value); it != listRequestTypeStrings.end())
{
return it->second;
}

return "Unknown";
}

ListRequestType list_request_type{ListRequestType::ALL};

OpNum getOpNum() const override { return OpNum::FilteredList; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
String toString() const override
{
return Coordination::toString(getOpNum()) + ", xid " + std::to_string(xid) + ", path " + path + listRequestTypeToString(list_request_type) ;
}
};

struct ZooKeeperListResponse final : ListResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;
Expand Down
3 changes: 3 additions & 0 deletions src/ZooKeeper/ZooKeeperConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
static_cast<int32_t>(OpNum::SetWatches),
static_cast<int32_t>(OpNum::SetACL),
static_cast<int32_t>(OpNum::GetACL),
static_cast<int32_t>(OpNum::FilteredList),
static_cast<int32_t>(OpNum::UpdateSession),
};

Expand Down Expand Up @@ -73,6 +74,8 @@ std::string toString(OpNum op_num)
return "GetACL";
case OpNum::UpdateSession:
return "UpdateSession";
case OpNum::FilteredList:
return "FilteredList";
}
int32_t raw_op = static_cast<int32_t>(op_num);
throw Exception("Operation " + std::to_string(raw_op) + " is unknown", Error::ZUNIMPLEMENTED);
Expand Down
3 changes: 3 additions & 0 deletions src/ZooKeeper/ZooKeeperConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ enum class OpNum : int32_t
MultiRead = 22,
Auth = 100,
SetWatches = 101,

FilteredList = 500, /// Special operation for ClickHouse

SessionID = 997, /// Special internal request. Used to get session id.
UpdateSession = 996, /// Special internal request. Used to session reconnect.
};
Expand Down
5 changes: 5 additions & 0 deletions src/ZooKeeper/ZooKeeperIO.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ void read(uint32_t & x, ReadBuffer & in)
x = __builtin_bswap32(x);
}

void read(uint8_t & x, ReadBuffer & in)
{
readBinary(x, in);
}

void read(OpNum & x, ReadBuffer & in)
{
int32_t raw_op_num;
Expand Down
1 change: 1 addition & 0 deletions src/ZooKeeper/ZooKeeperIO.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ void read(int32_t & x, ReadBuffer & in);
void read(OpNum & x, ReadBuffer & in);
void read(bool & x, ReadBuffer & in);
void read(int8_t & x, ReadBuffer & in);
void read(uint8_t & x, ReadBuffer & in);
void read(std::string & s, ReadBuffer & in);
void read(ACL & acl, ReadBuffer & in);
void read(AuthID & auth_id, ReadBuffer & in);
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/helpers/cluster_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from kazoo.exceptions import KazooException

import docker
from .utils import MultiReadClient
from .utils import KeeperFeatureClient

HELPERS_DIR = p.dirname(__file__)
RAFTKEEPER_ROOT_DIR = p.join(p.dirname(__file__), "../../..")
Expand Down Expand Up @@ -454,8 +454,8 @@ def get_kazoo_client(self, zoo_instance_name):
zk.start()
return zk

def get_multi_read_client(self, zoo_instance_name):
zk = MultiReadClient(hosts=self.get_instance_ip(zoo_instance_name), timeout=60.0)
def get_keeper_feature_client(self, zoo_instance_name):
zk = KeeperFeatureClient(hosts=self.get_instance_ip(zoo_instance_name), timeout=60.0)
zk.start()
return zk

Expand Down
Loading

0 comments on commit 51c14eb

Please sign in to comment.