From baaf84329cb7f524eef08e8256c6537879934d81 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov <102754618+MikhailBurdukov@users.noreply.github.com> Date: Thu, 11 Jan 2024 20:59:27 +0300 Subject: [PATCH] Speed up cleanup-removed-hosts-metadata script (#89) * Speed up cleanup-removed-hosts-metadata script * Review and tests * style * Review * Cosmetic changes --------- Co-authored-by: Aleksei Filatov --- ch_tools/chadmin/internal/zookeeper.py | 124 +++++++++++++++++------ tests/features/chadmin_zookeeper.feature | 21 +++- tests/modules/chadmin.py | 7 ++ tests/steps/chadmin.py | 7 ++ 4 files changed, 126 insertions(+), 33 deletions(-) diff --git a/ch_tools/chadmin/internal/zookeeper.py b/ch_tools/chadmin/internal/zookeeper.py index 13e1f768..05b8244e 100644 --- a/ch_tools/chadmin/internal/zookeeper.py +++ b/ch_tools/chadmin/internal/zookeeper.py @@ -1,13 +1,15 @@ import logging import os import re +from collections import deque from contextlib import contextmanager -from queue import Queue +from math import sqrt from kazoo.client import KazooClient -from kazoo.exceptions import NoNodeError, NotEmptyError +from kazoo.exceptions import NoNodeError from ch_tools.chadmin.cli import get_clickhouse_config, get_macros +from ch_tools.chadmin.internal.utils import chunked def get_zk_node(ctx, path, binary=False): @@ -90,13 +92,7 @@ def delete_zk_node(ctx, path): def delete_zk_nodes(ctx, paths): paths_formated = [_format_path(ctx, path) for path in paths] with zk_client(ctx) as zk: - _delete_zk_nodes(zk, paths_formated) - - -def _delete_zk_nodes(zk, paths): - for path in paths: - print(f"Deleting ZooKeeper node {path}") - zk.delete(path, recursive=True) + _delete_recursive(zk, paths_formated) def _format_path(ctx, path): @@ -125,15 +121,14 @@ def _find_paths(zk, root_path, included_paths_regexp, excluded_paths_regexp=None Return paths of nodes that match the include regular expression and do not match the excluded one. """ paths = set() - queue: Queue = Queue() - queue.put(root_path) + queue = deque(root_path) included_regexp = re.compile("|".join(included_paths_regexp)) excluded_regexp = ( re.compile("|".join(excluded_paths_regexp)) if excluded_paths_regexp else None ) - while not queue.empty(): - path = queue.get() + while len(queue): + path = queue.popleft() if excluded_regexp and re.match(excluded_regexp, path): continue @@ -143,27 +138,94 @@ def _find_paths(zk, root_path, included_paths_regexp, excluded_paths_regexp=None if re.match(included_regexp, subpath): paths.add(subpath) else: - queue.put(os.path.join(path, subpath)) + queue.append(os.path.join(path, subpath)) return list(paths) -def clean_zk_metadata_for_hosts(ctx, nodes): +def _delete_nodes_transaction(zk, to_delete_in_trasaction): """ - Perform cleanup in zookeeper after deleting hosts in the cluster or whole cluster deleting. + Perform deletion for the list of nodes in a single transaction. + If the transaction fails, go through the list and delete the nodes one by one. """ - - def _try_delete_zk_node(zk, node): + delete_transaction = zk.transaction() + for node in to_delete_in_trasaction: + delete_transaction.delete(node) + result = delete_transaction.commit() + + if result.count(True) == len(result): + # Transaction completed successfully, exit. + return + + print( + "Delete transaction have failed. Fallthrough to single delete operations for zk_nodes : ", + to_delete_in_trasaction, + ) + for node in to_delete_in_trasaction: try: - _delete_zk_nodes(zk, [node]) + zk.delete(node, recursive=True) except NoNodeError: # Someone deleted node before us. Do nothing. print("Node {node} is already absent, skipped".format(node=node)) - except NotEmptyError: - # Someone created child node while deleting. - # I'm not sure that we can get this exception with recursive=True. - # Do nothing. - print("Node {node} is not empty, skipped".format(node=node)) + + +def _remove_subpaths(paths): + """ + Removing from the list paths that are subpath of another. + + Example: + [/a, /a/b/c<-remove it] + """ + if not paths: + return + # Sorting the list in the lexicographic order + paths.sort() + paths = [path.split("/") for path in paths] + normalized_paths = [paths[0]] + # If path[i] has subnode path[j] then all paths from i to j will be subnode of i. + for path in paths: + last = normalized_paths[-1] + # Ignore the path if the last normalized one is its prefix + if len(last) > len(path) or path[: len(last)] != last: + normalized_paths.append(path) + return ["/".join(path) for path in normalized_paths] + + +def _delete_recursive(zk, paths): + """ + Kazoo already has the ability to recursively delete nodes, but the implementation is quite naive + and has poor performance with a large number of nodes being deleted. + + In this implementation we unite the nodes to delete in transactions to do single operation for batch of nodes. + To delete in correct order first of all we perform topological sort using bfs approach. + """ + + if len(paths) == 0: + return + print("Node to recursive delete", paths) + paths = _remove_subpaths(paths) + nodes_to_delete = [] + queue = deque(paths) + + while queue: + path = queue.popleft() + nodes_to_delete.append(path) + for child_node in _get_children(zk, path): + queue.append(os.path.join(path, child_node)) + + # When number of nodes to delete is large preferable to use greater transaction size. + operations_in_transaction = max(100, int(sqrt(len(nodes_to_delete)))) + + for transaction_orerations in chunked( + reversed(nodes_to_delete), operations_in_transaction + ): + _delete_nodes_transaction(zk, transaction_orerations) + + +def clean_zk_metadata_for_hosts(ctx, nodes): + """ + Perform cleanup in zookeeper after deleting hosts in the cluster or whole cluster deleting. + """ def _find_tables(zk, root_path, nodes): """ @@ -243,23 +305,21 @@ def _remove_replicas_queues(zk, paths, replica_names): continue if zk.exists(queue_path): - _try_delete_zk_node(zk, queue_path) - - def _remove_hosts_nodes(zk, paths): - for node in paths: - _try_delete_zk_node(zk, node) + _delete_recursive(zk, queue_path) def _remove_if_no_hosts_in_replicas(zk, paths): """ Remove node if subnode is empty """ + to_delete = [] for path in paths: child_full_path = os.path.join(path, "replicas") if ( not zk.exists(child_full_path) or len(_get_children(zk, child_full_path)) == 0 ): - _try_delete_zk_node(zk, path) + to_delete.append(path) + _delete_recursive(zk, to_delete) def tables_cleanup(zk, zk_root_path): """ @@ -268,7 +328,7 @@ def tables_cleanup(zk, zk_root_path): (table_paths, hosts_paths) = _find_tables(zk, zk_root_path, nodes) _set_replicas_is_lost(zk, table_paths, nodes) _remove_replicas_queues(zk, table_paths, nodes) - _remove_hosts_nodes(zk, hosts_paths) + _delete_recursive(zk, hosts_paths) _remove_if_no_hosts_in_replicas(zk, table_paths) def databases_cleanups(zk, zk_root_path): @@ -277,7 +337,7 @@ def databases_cleanups(zk, zk_root_path): """ (databases_paths, hosts_paths) = _find_databases(zk, zk_root_path, nodes) - _remove_hosts_nodes(zk, hosts_paths) + _delete_recursive(zk, hosts_paths) _remove_if_no_hosts_in_replicas(zk, databases_paths) zk_root_path = _format_path(ctx, "/") diff --git a/tests/features/chadmin_zookeeper.feature b/tests/features/chadmin_zookeeper.feature index 702bcd92..8d5d380b 100644 --- a/tests/features/chadmin_zookeeper.feature +++ b/tests/features/chadmin_zookeeper.feature @@ -66,6 +66,25 @@ Feature: chadmin zookeeper commands. '/test/clickhouse/databases/test/shard1/counter' """ And we do hosts cleanup on zookeeper01 with fqdn host1.net,host2.net and zk root /test - Then the list of children on zookeeper01 for zk node /test/clickhouse/databases/test/ are equal to + Then the list of children on zookeeper01 for zk node test/clickhouse/databases/test/ are empty + + Scenario: Zookeeper recursive delete command + When we execute chadmin create zk nodes on zookeeper01 + """ + /test/a/b/c + /test/a/b/d + /test/a/f + """ + And we delete zookeepers nodes /test/a on zookeeper01 + Then the list of children on zookeeper01 for zk node /test are empty + + Scenario: Zookeeper delete parent and child nodes + When we execute chadmin create zk nodes on zookeeper01 """ + /test/a/b + /test/c/d """ + # Make sure that it is okey to delete the node and its child. + And we delete zookeepers nodes /test/a,/test/a/b on zookeeper01 + And we delete zookeepers nodes /test/c/d,/test/c on zookeeper01 + Then the list of children on zookeeper01 for zk node /test are empty diff --git a/tests/modules/chadmin.py b/tests/modules/chadmin.py index 015bfa1f..d9c05b07 100644 --- a/tests/modules/chadmin.py +++ b/tests/modules/chadmin.py @@ -19,6 +19,13 @@ def create_zk_node(self, zk_node, no_ch_config=True, recursive=True): ) return self.exec_cmd(cmd) + def zk_delete(self, zk_nodes, no_ch_config=True): + cmd = "zookeeper {use_config} delete {nodes}".format( + use_config="--no-ch-config" if no_ch_config else "", + nodes=zk_nodes, + ) + return self.exec_cmd(cmd) + def zk_list(self, zk_node, no_ch_config=True): cmd = "zookeeper {use_config} list {node}".format( use_config="--no-ch-config" if no_ch_config else "", diff --git a/tests/steps/chadmin.py b/tests/steps/chadmin.py index 072d165a..068ae607 100644 --- a/tests/steps/chadmin.py +++ b/tests/steps/chadmin.py @@ -38,3 +38,10 @@ def step_childen_list_empty(context, node, zk_node): container = get_container(context, node) result = Chadmin(container).zk_list(zk_node) assert_that(result.output.decode(), equal_to("\n")) + + +@when("we delete zookeepers nodes {zk_nodes} on {node:w}") +def step_delete_command(context, zk_nodes, node): + container = get_container(context, node) + result = Chadmin(container).zk_delete(zk_nodes) + assert result.exit_code == 0, f" output:\n {result.output.decode().strip()}"