From 51c14ebb950a1ed029b812fdc18780b058bef1f9 Mon Sep 17 00:00:00 2001 From: lzydmxy <13126752315@163.com> Date: Mon, 13 Nov 2023 23:47:24 +0800 Subject: [PATCH] merge FilteredList --- src/Service/KeeperStore.cpp | 44 +++++- src/ZooKeeper/ZooKeeperCommon.cpp | 18 +++ src/ZooKeeper/ZooKeeperCommon.h | 37 +++++ src/ZooKeeper/ZooKeeperConstants.cpp | 3 + src/ZooKeeper/ZooKeeperConstants.h | 3 + src/ZooKeeper/ZooKeeperIO.cpp | 5 + src/ZooKeeper/ZooKeeperIO.h | 1 + tests/integration/helpers/cluster_service.py | 6 +- tests/integration/helpers/utils.py | 150 +++++++++++++++++-- tests/integration/test_back_to_back/test.py | 49 +++++- 10 files changed, 289 insertions(+), 27 deletions(-) diff --git a/src/Service/KeeperStore.cpp b/src/Service/KeeperStore.cpp index 722c7ba86ef..b89bbe9cd49 100644 --- a/src/Service/KeeperStore.cpp +++ b/src/Service/KeeperStore.cpp @@ -799,12 +799,48 @@ struct SvsKeeperStorageListRequest final : public StoreRequest if (path_prefix.empty()) throw RK::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR); - if (response_ptr->getOpNum() == Coordination::OpNum::List) + if (response_ptr->getOpNum() == Coordination::OpNum::List || response_ptr->getOpNum() == Coordination::OpNum::FilteredList) { + using enum Coordination::ZooKeeperFilteredListRequest::ListRequestType; + auto list_request_type = ALL; + if (auto * filtered_list_request = dynamic_cast(&request)) + { + list_request_type = filtered_list_request->list_request_type; + } + Coordination::ZooKeeperListResponse & response = dynamic_cast(*response_ptr); std::shared_lock r_lock(node->mutex); - response.names.insert(response.names.end(), node->children.begin(), node->children.end()); + response.stat = node->statForResponse(); + + if (list_request_type == ALL) + { + response.names.reserve(node->children.size()); + response.names.insert(response.names.end(), node->children.begin(), node->children.end()); + return {response_ptr, {}}; + } + + auto add_child = [&](const auto & child) + { + auto child_node = store.container.get(request.path + "/" + child); + if (node == nullptr) + { + LOG_ERROR( + &Poco::Logger::get("SvsKeeperStorageListRequest"), + "Inconsistency found between uncommitted and committed data, can't get child {} for {} ." + "Keeper will terminate to avoid undefined behaviour.", child, request.path); + std::terminate(); + } + + const auto is_ephemeral = child_node->stat.ephemeralOwner != 0; + return (is_ephemeral && list_request_type == EPHEMERAL_ONLY) || (!is_ephemeral && list_request_type == PERSISTENT_ONLY); + }; + + for (const auto & child: node->children) + { + if (add_child(child)) + response.names.push_back(child); + } } else { @@ -1099,7 +1135,8 @@ struct SvsKeeperStorageMultiRequest final : public StoreRequest check_operation_type(OperationType::Read); concrete_requests.push_back(std::make_shared(sub_zk_request)); } - else if (sub_zk_request->getOpNum() == Coordination::OpNum::List || sub_zk_request->getOpNum() == Coordination::OpNum::SimpleList) + else if (sub_zk_request->getOpNum() == Coordination::OpNum::List || sub_zk_request->getOpNum() == Coordination::OpNum::SimpleList + || sub_zk_request->getOpNum() == Coordination::OpNum::FilteredList) { check_operation_type(OperationType::Read); concrete_requests.push_back(std::make_shared(sub_zk_request)); @@ -1286,6 +1323,7 @@ NuKeeperWrapperFactory::NuKeeperWrapperFactory() registerNuKeeperRequestWrapper(*this); registerNuKeeperRequestWrapper(*this); registerNuKeeperRequestWrapper(*this); + registerNuKeeperRequestWrapper(*this); registerNuKeeperRequestWrapper(*this); registerNuKeeperRequestWrapper(*this); registerNuKeeperRequestWrapper(*this); diff --git a/src/ZooKeeper/ZooKeeperCommon.cpp b/src/ZooKeeper/ZooKeeperCommon.cpp index b4211a2840f..01ab7cd1346 100644 --- a/src/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/ZooKeeper/ZooKeeperCommon.cpp @@ -230,6 +230,23 @@ void ZooKeeperListResponse::readImpl(ReadBuffer & in) Coordination::read(stat, in); } +void ZooKeeperFilteredListRequest::writeImpl(WriteBuffer & out) const +{ + Coordination::write(path, out); + Coordination::write(has_watch, out); + Coordination::write(static_cast(list_request_type), out); +} + +void ZooKeeperFilteredListRequest::readImpl(ReadBuffer & in) +{ + Coordination::read(path, in); + Coordination::read(has_watch, in); + + uint8_t read_request_type{0}; + Coordination::read(read_request_type, in); + list_request_type = static_cast(read_request_type); +} + void ZooKeeperListResponse::writeImpl(WriteBuffer & out) const { Coordination::write(names, out); @@ -696,6 +713,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory() registerZooKeeperRequest(*this); registerZooKeeperRequest(*this); registerZooKeeperRequest(*this); + registerZooKeeperRequest(*this); registerZooKeeperRequest(*this); registerZooKeeperRequest(*this); registerZooKeeperRequest(*this); diff --git a/src/ZooKeeper/ZooKeeperCommon.h b/src/ZooKeeper/ZooKeeperCommon.h index b14dbae4c26..f1998d1bc98 100644 --- a/src/ZooKeeper/ZooKeeperCommon.h +++ b/src/ZooKeeper/ZooKeeperCommon.h @@ -449,6 +449,43 @@ struct ZooKeeperSimpleListRequest final : ZooKeeperListRequest } }; +struct ZooKeeperFilteredListRequest final : ZooKeeperListRequest +{ + enum class ListRequestType : UInt8 + { + ALL, + PERSISTENT_ONLY, + EPHEMERAL_ONLY + }; + + String listRequestTypeToString(ListRequestType value) const + { + static std::map listRequestTypeStrings = + { + {ListRequestType::ALL, "ALL"}, + {ListRequestType::EPHEMERAL_ONLY, "PERSISTENT_ONLY"}, + {ListRequestType::PERSISTENT_ONLY, "EPHEMERAL_ONLY"} + }; + + if (auto it = listRequestTypeStrings.find(value); it != listRequestTypeStrings.end()) + { + return it->second; + } + + return "Unknown"; + } + + ListRequestType list_request_type{ListRequestType::ALL}; + + OpNum getOpNum() const override { return OpNum::FilteredList; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + String toString() const override + { + return Coordination::toString(getOpNum()) + ", xid " + std::to_string(xid) + ", path " + path + listRequestTypeToString(list_request_type) ; + } +}; + struct ZooKeeperListResponse final : ListResponse, ZooKeeperResponse { void readImpl(ReadBuffer & in) override; diff --git a/src/ZooKeeper/ZooKeeperConstants.cpp b/src/ZooKeeper/ZooKeeperConstants.cpp index 5398bbca826..d661e643937 100644 --- a/src/ZooKeeper/ZooKeeperConstants.cpp +++ b/src/ZooKeeper/ZooKeeperConstants.cpp @@ -26,6 +26,7 @@ static const std::unordered_set VALID_OPERATIONS = static_cast(OpNum::SetWatches), static_cast(OpNum::SetACL), static_cast(OpNum::GetACL), + static_cast(OpNum::FilteredList), static_cast(OpNum::UpdateSession), }; @@ -73,6 +74,8 @@ std::string toString(OpNum op_num) return "GetACL"; case OpNum::UpdateSession: return "UpdateSession"; + case OpNum::FilteredList: + return "FilteredList"; } int32_t raw_op = static_cast(op_num); throw Exception("Operation " + std::to_string(raw_op) + " is unknown", Error::ZUNIMPLEMENTED); diff --git a/src/ZooKeeper/ZooKeeperConstants.h b/src/ZooKeeper/ZooKeeperConstants.h index 25d7e5a23c2..64e5d190ce6 100644 --- a/src/ZooKeeper/ZooKeeperConstants.h +++ b/src/ZooKeeper/ZooKeeperConstants.h @@ -36,6 +36,9 @@ enum class OpNum : int32_t MultiRead = 22, Auth = 100, SetWatches = 101, + + FilteredList = 500, /// Special operation for ClickHouse + SessionID = 997, /// Special internal request. Used to get session id. UpdateSession = 996, /// Special internal request. Used to session reconnect. }; diff --git a/src/ZooKeeper/ZooKeeperIO.cpp b/src/ZooKeeper/ZooKeeperIO.cpp index a04708fc461..12615918a38 100644 --- a/src/ZooKeeper/ZooKeeperIO.cpp +++ b/src/ZooKeeper/ZooKeeperIO.cpp @@ -119,6 +119,11 @@ void read(uint32_t & x, ReadBuffer & in) x = __builtin_bswap32(x); } +void read(uint8_t & x, ReadBuffer & in) +{ + readBinary(x, in); +} + void read(OpNum & x, ReadBuffer & in) { int32_t raw_op_num; diff --git a/src/ZooKeeper/ZooKeeperIO.h b/src/ZooKeeper/ZooKeeperIO.h index 6cefd43421a..92724b0c33e 100644 --- a/src/ZooKeeper/ZooKeeperIO.h +++ b/src/ZooKeeper/ZooKeeperIO.h @@ -55,6 +55,7 @@ void read(int32_t & x, ReadBuffer & in); void read(OpNum & x, ReadBuffer & in); void read(bool & x, ReadBuffer & in); void read(int8_t & x, ReadBuffer & in); +void read(uint8_t & x, ReadBuffer & in); void read(std::string & s, ReadBuffer & in); void read(ACL & acl, ReadBuffer & in); void read(AuthID & auth_id, ReadBuffer & in); diff --git a/tests/integration/helpers/cluster_service.py b/tests/integration/helpers/cluster_service.py index 857c8920e99..ed329c000de 100644 --- a/tests/integration/helpers/cluster_service.py +++ b/tests/integration/helpers/cluster_service.py @@ -18,7 +18,7 @@ from kazoo.exceptions import KazooException import docker -from .utils import MultiReadClient +from .utils import KeeperFeatureClient HELPERS_DIR = p.dirname(__file__) RAFTKEEPER_ROOT_DIR = p.join(p.dirname(__file__), "../../..") @@ -454,8 +454,8 @@ def get_kazoo_client(self, zoo_instance_name): zk.start() return zk - def get_multi_read_client(self, zoo_instance_name): - zk = MultiReadClient(hosts=self.get_instance_ip(zoo_instance_name), timeout=60.0) + def get_keeper_feature_client(self, zoo_instance_name): + zk = KeeperFeatureClient(hosts=self.get_instance_ip(zoo_instance_name), timeout=60.0) zk.start() return zk diff --git a/tests/integration/helpers/utils.py b/tests/integration/helpers/utils.py index 528772359bc..faf5178762a 100644 --- a/tests/integration/helpers/utils.py +++ b/tests/integration/helpers/utils.py @@ -1,10 +1,12 @@ -from kazoo.client import KazooClient, Create, GetData, GetChildren, OPEN_ACL_UNSAFE, string_types, bytes_types +from kazoo.client import KazooClient, Create, GetData, GetChildren, OPEN_ACL_UNSAFE, string_types, bytes_types, \ + GetChildren2, Exists from kazoo.protocol.paths import _prefix_root from kazoo.protocol.serialization import MultiHeader, Transaction, multiheader_struct, int_struct, read_string, \ - read_buffer, stat_struct, ZnodeStat + read_buffer, stat_struct, ZnodeStat, write_string from kazoo.protocol.connection import ReplyHeader from kazoo.exceptions import EXCEPTIONS, MarshallingError from collections import namedtuple +import struct def close_zk_clients(zk_clients): @@ -97,6 +99,12 @@ def get_children(self, path, watcher): """ self._add(GetChildren(path, watcher), None) + def get_children3(self, path, list_type, watcher): + """Add a filteredList ops to the operations. + :returns: List of childern + """ + self._add(GetChildren3(path, watcher, list_type), None) + def commit_async(self): """Commit the operations asynchronously. @@ -124,14 +132,39 @@ def _add(self, request, post_processor=None): self.operations.append(request) -class MultiReadClient(KazooClient): +class GetChildren3(namedtuple('GetChildren3', 'path watcher list_type')): + type = 500 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend([1 if self.watcher else 0]) + b.extend(struct.pack('B', self.list_type)) + return b + + @classmethod + def deserialize(cls, bytes, offset): + count = int_struct.unpack_from(bytes, offset)[0] + offset += int_struct.size + if count == -1: # pragma: nocover + return [] + + children = [] + for c in range(count): + child, offset = read_string(bytes, offset) + children.append(child) + stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + return children, stat + + +class KeeperFeatureClient(KazooClient): """A Zookeeper Python client extends from Kazoo.KazooClient, Kazoo is a Python library working with Zookeeper. supports multi_read ops """ def __init__(self, hosts, timeout): - """Create a :class:`MultiReadClient` instance (extends from KazooClient). + """Create a :class:`KeeperFeatureClient` instance (extends from KazooClient). :param hosts: Comma-separated list of hosts to connect to (e.g. 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183). @@ -141,13 +174,13 @@ def __init__(self, hosts, timeout): For example:: - zk = MultiReadClient() + zk = KeeperFeatureClient() t = zk.start() children = zk.get_children('/') zk.stop() """ - super(MultiReadClient, self).__init__( + super(KeeperFeatureClient, self).__init__( hosts=hosts , timeout=timeout) @@ -165,6 +198,75 @@ def multi_read(self): """ return MultiReadRequest(self) + def get_filtered_children(self, path, watch=None, list_type=None, include_data=False): + """Get a list of child nodes of a path. + + If a watch is provided it will be left on the node with the + given path. The watch will be triggered by a successful + operation that deletes the node of the given path or + creates/deletes a child under the node. + + The list of children returned is not sorted and no guarantee is + provided as to its natural or lexical order. + + :param path: Path of node to list. + :param watch: Optional watch callback to set for future changes + to this path. + :param include_data: + Include the :class:`~kazoo.protocol.states.ZnodeStat` of + the node in addition to the children. This option changes + the return value to be a tuple of (children, stat). + + :param list_type: type of List, + Options:{0: ALL, 1: PERSISTENT_ONLY, 2:EPHEMERAL_ONLY}. + + :returns: List of child node names, or tuple if `include_data` + is `True`. + :rtype: list + + :raises: + :exc:`~kazoo.exceptions.NoNodeError` if the node doesn't + exist. + + :exc:`~kazoo.exceptions.ZookeeperError` if the server + returns a non-zero error code. + + .. versionadded:: 0.5 + The `include_data` option. + + """ + return self.get_filtered_children_async(path, watch=watch, list_type=list_type, + include_data=include_data).get() + + def get_filtered_children_async(self, path, watch=None, list_type=None, include_data=False): + """Asynchronously get a list of child nodes of a path. Takes + the same arguments as :meth:`get_children`. + + :rtype: :class:`~kazoo.interfaces.IAsyncResult` + + """ + if not isinstance(path, string_types): + raise TypeError("Invalid type for 'path' (string expected)") + if watch and not callable(watch): + raise TypeError("Invalid type for 'watch' (must be a callable)") + if not isinstance(include_data, bool): + raise TypeError("Invalid type for 'include_data' (bool expected)") + + + async_result = self.handler.async_result() + + if type: + req = GetChildren3(_prefix_root(self.chroot, path), watch, list_type) + else: + if include_data: + req = GetChildren2(_prefix_root(self.chroot, path), watch) + else: + req = GetChildren(_prefix_root(self.chroot, path), watch) + self._call(req, async_result) + return async_result + + + class MultiRead(namedtuple('MultiRead', 'operations')): type = 22 @@ -182,29 +284,49 @@ def deserialize(cls, bytes, offset): results = [] response = None while not header.done: - if header.type == Create.type: + if header.type == -1: + err = int_struct.unpack_from(bytes, offset)[0] + offset += int_struct.size + response = EXCEPTIONS[err]() + elif header.err is not None and header.err != 0: + response = EXCEPTIONS[header.err]() + elif header.type == Create.type: response, offset = read_string(bytes, offset) - if header.type == GetData.type: + elif header.type == GetData.type: data, offset = read_buffer(bytes, offset) stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) offset += stat_struct.size response = (data, stat) - if header.type == GetChildren.type: + elif header.type == GetChildren2.type: count = int_struct.unpack_from(bytes, offset)[0] offset += int_struct.size children = [] if count == -1: # pragma: nocover - raise MarshallingError() + print("-------11-------") + for c in range(count): + child, offset = read_string(bytes, offset) + children.append(child) + stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + offset += stat_struct.size + response = (children, stat) + elif header.type == Exists.type: + stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + offset += stat_struct.size + response = (stat) + + elif header.type == GetChildren.type: + count = int_struct.unpack_from(bytes, offset)[0] + offset += int_struct.size + children = [] + + if count == -1: # pragma: nocover + print("-------11-------") for c in range(count): child, offset = read_string(bytes, offset) children.append(child) response = children - elif header.type == -1: - err = int_struct.unpack_from(bytes, offset)[0] - offset += int_struct.size - response = EXCEPTIONS[err]() if response is not None: results.append(response) header, offset = MultiHeader.deserialize(bytes, offset) diff --git a/tests/integration/test_back_to_back/test.py b/tests/integration/test_back_to_back/test.py index b0802153b8e..2425cd6a343 100644 --- a/tests/integration/test_back_to_back/test.py +++ b/tests/integration/test_back_to_back/test.py @@ -8,7 +8,7 @@ from helpers.cluster_service import RaftKeeperCluster from helpers.utils import close_zk_clients, close_zk_client -from helpers.utils import MultiReadClient +from helpers.utils import KeeperFeatureClient cluster = RaftKeeperCluster(__file__) @@ -20,18 +20,18 @@ with_zookeeper=True, stay_alive=True) -def get_genuine_zk(multi=False): +def get_genuine_zk(use_keeper_feature_client=False): print("Zoo1", cluster.get_instance_ip("zoo1")) - if multi: - return cluster.get_multi_read_client('zoo1') + if use_keeper_feature_client: + return cluster.get_keeper_feature_client('zoo1') else: return cluster.get_kazoo_client('zoo1') -def get_fake_zk(multi=False): +def get_fake_zk(use_keeper_feature_client=False): print("node1", cluster.get_instance_ip("node1")) - if multi: - _fake_zk_instance = MultiReadClient(hosts=cluster.get_instance_ip("node1") + ":8101", timeout=60.0) + if use_keeper_feature_client: + _fake_zk_instance = KeeperFeatureClient(hosts=cluster.get_instance_ip("node1") + ":8101", timeout=60.0) else: _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip("node1") + ":8101", timeout=60.0) @@ -311,6 +311,26 @@ def test_multi_transactions(started_cluster): close_zk_clients([genuine_zk, fake_zk]) +def test_filtered_list(): + + fake_zk = None + try: + fake_zk = get_fake_zk(True) + fake_zk.create('/test_filteredList') + ephemerals = [f'ephemeral{i}' for i in range(3)] + persistents = [f'persistent{i}' for i in range(3)] + for node in ephemerals: + fake_zk.create("/test_filteredList/" + node, ephemeral=True) + for node in persistents: + fake_zk.create("/test_filteredList/" + node, ephemeral=False) + + assert(sorted(fake_zk.get_filtered_children('/test_filteredList', list_type=0)[0]) == sorted(ephemerals + persistents)) + assert(sorted(fake_zk.get_filtered_children('/test_filteredList', list_type=1)[0]) == sorted(persistents)) + assert(sorted(fake_zk.get_filtered_children('/test_filteredList', list_type=2)[0]) == sorted(ephemerals)) + finally: + close_zk_clients([fake_zk]) + + def test_multi_read(): genuine_zk = fake_zk = None try: @@ -341,6 +361,21 @@ def test_multi_read(): assert results[1].__class__ == NoNodeError assert results[2].__class__ == NoNodeError assert results[3][0] == b"rico" + + # test filtered_list in multi + t = fake_zk.multi_read() + t.get_children3('/test_multi_read', 0, None) + t.get_children3('/test_multi_read', 1, None) + t.get_children3('/test_multi_read', 2, None) + t.get_children3('/test_multi_read/fre', 1,None) + + results = t.commit() + assert len(results) == 4 + assert sorted(results[0][0]) == ['fred', 'freddy', 'smith'] + assert sorted(results[1][0]) == ['freddy', 'smith'] + assert results[2][0] == ['fred'] + from kazoo.exceptions import NoNodeError + assert results[3].__class__ == NoNodeError finally: close_zk_clients([genuine_zk, fake_zk])