From 8c0735165cd65d296db494c1cb1eee342946c1f9 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov <102754618+MikhailBurdukov@users.noreply.github.com> Date: Thu, 5 Oct 2023 12:59:28 +0300 Subject: [PATCH] Move zookeeper.py in chadmin (#70) * Move zookeeper.py in chadmin * Review fixes * Changes * Update chadmin_zookeeper.feature --- ch_tools/chadmin/cli/zookeeper_group.py | 24 +++- ch_tools/chadmin/internal/zookeeper.py | 168 +++++++++++++++++++++-- tests/features/chadmin_zookeeper.feature | 43 ++++++ tests/modules/chadmin.py | 35 +++++ tests/steps/chadmin.py | 40 ++++++ 5 files changed, 296 insertions(+), 14 deletions(-) create mode 100644 tests/features/chadmin_zookeeper.feature create mode 100644 tests/modules/chadmin.py create mode 100644 tests/steps/chadmin.py diff --git a/ch_tools/chadmin/cli/zookeeper_group.py b/ch_tools/chadmin/cli/zookeeper_group.py index f50ec367..154106c5 100644 --- a/ch_tools/chadmin/cli/zookeeper_group.py +++ b/ch_tools/chadmin/cli/zookeeper_group.py @@ -7,6 +7,7 @@ from ch_tools.chadmin.internal.table_replica import get_table_replica from ch_tools.chadmin.internal.zookeeper import ( check_zk_node, + clean_zk_metadata_for_hosts, create_zk_nodes, delete_zk_nodes, get_zk_node, @@ -41,8 +42,18 @@ help="Do not try to get parameters from clickhouse config.xml.", default=False, ) +@option( + "-c", + "--chroot", + "zk_root_path", + type=str, + help="Cluster ZooKeeper root path. If not specified,the root path will be used.", + required=False, +) @pass_context -def zookeeper_group(ctx, host, port, timeout, zkcli_identity, no_chroot, no_ch_config): +def zookeeper_group( + ctx, host, port, timeout, zkcli_identity, no_chroot, no_ch_config, zk_root_path +): """ZooKeeper management commands. ZooKeeper command runs client which connects to Zookeeper node. 
@@ -57,6 +68,7 @@ def zookeeper_group(ctx, host, port, timeout, zkcli_identity, no_chroot, no_ch_c "zkcli_identity": zkcli_identity, "no_chroot": no_chroot, "no_ch_config": no_ch_config, + "zk_root_path": zk_root_path, } @@ -266,3 +278,13 @@ def delete_ddl_task_command(ctx, tasks): """ paths = [f"/clickhouse/task_queue/ddl/{task}" for task in tasks] delete_zk_nodes(ctx, paths) + + +@zookeeper_group.command( + name="cleanup-removed-hosts-metadata", + help="Remove metadata from Zookeeper for specified hosts.", +) +@argument("fqdn", type=ListParamType()) +@pass_context +def clickhouse_hosts_command(ctx, fqdn): + clean_zk_metadata_for_hosts(ctx, fqdn) diff --git a/ch_tools/chadmin/internal/zookeeper.py b/ch_tools/chadmin/internal/zookeeper.py index e5565395..dd4115b9 100644 --- a/ch_tools/chadmin/internal/zookeeper.py +++ b/ch_tools/chadmin/internal/zookeeper.py @@ -1,9 +1,11 @@ import logging import os +import re from contextlib import contextmanager +from queue import Queue from kazoo.client import KazooClient -from kazoo.exceptions import NoNodeError +from kazoo.exceptions import NoNodeError, NotEmptyError from ch_tools.chadmin.cli import get_config, get_macros @@ -27,19 +29,22 @@ def get_zk_node_acls(ctx, path): return zk.get_acls(path) +def _get_children(zk, path): + try: + return zk.get_children(path) + except NoNodeError: + return [] # in the case ZK deletes a znode while we traverse the tree + + def list_zk_nodes(ctx, path, verbose=False): def _stat_node(zk, node): descendants_count = 0 queue = [node] while queue: item = queue.pop() - try: - children = zk.get_children(item) - descendants_count += len(children) - queue.extend(os.path.join(item, node) for node in children) - except NoNodeError: - # ZooKeeper nodes can be deleted during node tree traversal - pass + children = _get_children(zk, item) + descendants_count += len(children) + queue.extend(os.path.join(item, node) for node in children) return { "path": node, @@ -83,11 +88,15 @@ def delete_zk_node(ctx, 
def clean_zk_metadata_for_hosts(ctx, nodes):
    """
    Perform cleanup in zookeeper after deleting hosts in the cluster or whole cluster deleting.

    Steps, executed inside a single ZooKeeper session:

    1. Find every path that has a ``replicas/<host>`` child for one of
       ``nodes`` (``task_queue`` and ``zero_copy`` subtrees are not searched).
    2. Set ``<path>/replicas/<host>/is_lost`` to ``1`` for each host.
    3. Remove ``<path>/replicas/<host>/queue`` for each host.
    4. Remove every node under the root whose last path component is one of
       the hosts.
    5. Remove each path found in step 1 whose ``replicas`` child is missing
       or has no children left.

    :param ctx: CLI context used to format paths and open the ZooKeeper client.
    :param nodes: host names whose metadata must be removed. Names are treated
        as literal strings, not as regular expressions.
    """
    # Drop empty names. An empty list would otherwise produce the empty regex,
    # which matches every node under the root and would delete all of them.
    nodes = [node for node in nodes if node]
    if not nodes:
        return

    def _host_pattern(prefix, node):
        # Escape the host name and anchor the end of the pattern so that
        # "host1" does not match "host10" and "." matches only a literal dot.
        return prefix + re.escape(node) + "$"

    def _try_delete_zk_node(zk, node):
        """
        Delete the node recursively, tolerating concurrent modifications.
        """
        try:
            _delete_zk_nodes(zk, [node])
        except NoNodeError:
            # Someone deleted node before us. Do nothing.
            print(f"Node {node} is already absent, skipped")
        except NotEmptyError:
            # Someone created child node while deleting.
            # I'm not sure that we can get this exception with recursive=True.
            # Do nothing.
            print(f"Node {node} is not empty, skipped")

    def _find_parents(zk, root_path, nodes, parent_name):
        """
        Return unique paths that contain <parent_name>/<host> for any host.
        """
        excluded_paths = [
            ".*clickhouse/task_queue",
            ".*clickhouse/zero_copy",
        ]
        included_paths = [
            _host_pattern(".*/" + parent_name + "/", node) for node in nodes
        ]
        paths = _find_paths(zk, root_path, included_paths, excluded_paths)
        # Paths will be like */shard1/replicas/hostname. But we need */shard1.
        # Go up for 2 directories.
        # One path might be found several times, so collect into a set.
        return list({os.sep.join(path.split(os.sep)[:-2]) for path in paths})

    def _set_replicas_is_lost(zk, table_paths, nodes):
        """
        Set flag <table_path>/replicas/<host>/is_lost to 1
        """
        for path in table_paths:
            replica_path = os.path.join(path, "replicas")
            if not zk.exists(replica_path):
                continue

            for node in nodes:
                is_lost_flag_path = os.path.join(replica_path, node, "is_lost")
                print("Set is_lost_flag " + is_lost_flag_path)
                _set_node_value(zk, is_lost_flag_path, "1")

    def _remove_replicas_queues(zk, paths, replica_names):
        """
        Remove <table_path>/replicas/<host>/queue
        """
        for path in paths:
            # Keep the "replicas" directory in its own variable: reusing the
            # loop variable name here would build "<host>/<host>/queue".
            replicas_path = os.path.join(path, "replicas")
            if not zk.exists(replicas_path):
                continue

            for replica_name in replica_names:
                queue_path = os.path.join(replicas_path, replica_name, "queue")
                if zk.exists(queue_path):
                    _try_delete_zk_node(zk, queue_path)

    def _nodes_absent(zk, zk_root_path, nodes):
        """
        Remove every node whose last path component equals one of the hosts.
        """
        included_paths = [_host_pattern(".*/", node) for node in nodes]
        for node_path in _find_paths(zk, zk_root_path, included_paths):
            _try_delete_zk_node(zk, node_path)

    def _absent_if_empty_child(zk, path, child):
        """
        Remove node if subnode is empty
        """
        child_full_path = os.path.join(path, child)
        if not zk.exists(child_full_path) or not _get_children(zk, child_full_path):
            _try_delete_zk_node(zk, path)

    zk_root_path = _format_path(ctx, "/")
    with zk_client(ctx) as zk:
        table_paths = _find_parents(zk, zk_root_path, nodes, "replicas")
        _set_replicas_is_lost(zk, table_paths, nodes)
        _remove_replicas_queues(zk, table_paths, nodes)
        _nodes_absent(zk, zk_root_path, nodes)
        for path in table_paths:
            _absent_if_empty_child(zk, path, "replicas")
class Chadmin:
    """
    Thin test helper that runs ``chadmin`` commands inside a container.
    """

    def __init__(self, container):
        """
        :param container: object exposing ``exec_run(cmd, user=...)``.
        """
        self._container = container

    def exec_cmd(self, cmd):
        """
        Run ``chadmin <cmd>`` through ``bash -c`` as root.

        :param cmd: chadmin arguments as a single string.
        :return: whatever the container's ``exec_run`` returns.
        """
        ch_admin_cmd = f"chadmin {cmd}"
        # The message needs a %s placeholder: logging formats lazily with
        # "msg % args", and an argument without a placeholder makes that
        # formatting fail once DEBUG logging is enabled.
        logging.debug("chadmin command: %s", ch_admin_cmd)
        return self._container.exec_run(["bash", "-c", ch_admin_cmd], user="root")

    def create_zk_node(self, zk_node, no_ch_config=True, recursive=True):
        """
        Run ``chadmin zookeeper create`` for ``zk_node``.

        :param zk_node: path of the node to create.
        :param no_ch_config: pass ``--no-ch-config`` when true.
        :param recursive: pass ``--make-parents`` when true.
        """
        cmd = "zookeeper {use_config} create {make_parents} {node}".format(
            use_config="--no-ch-config" if no_ch_config else "",
            make_parents="--make-parents" if recursive else "",
            node=zk_node,
        )
        return self.exec_cmd(cmd)

    def zk_list(self, zk_node, no_ch_config=True):
        """
        Run ``chadmin zookeeper list`` for ``zk_node``.

        :param zk_node: path of the node to list.
        :param no_ch_config: pass ``--no-ch-config`` when true.
        """
        cmd = "zookeeper {use_config} list {node}".format(
            use_config="--no-ch-config" if no_ch_config else "",
            node=zk_node,
        )
        return self.exec_cmd(cmd)

    def zk_cleanup(self, fqdn, zk_root, no_ch_config=True):
        """
        Run ``chadmin zookeeper cleanup-removed-hosts-metadata``.

        :param fqdn: host list, passed to the command as given.
        :param zk_root: value for the ``--chroot`` option.
        :param no_ch_config: pass ``--no-ch-config`` when true.
        """
        cmd = "zookeeper {use_config} --chroot {root} cleanup-removed-hosts-metadata {hosts}".format(
            use_config="--no-ch-config" if no_ch_config else "",
            root=zk_root,
            hosts=fqdn,
        )
        return self.exec_cmd(cmd)
Chadmin(container).zk_list(zk_node) + assert_that(result.output.decode(), equal_to(context.text + "\n")) + + +@then("the list of children on {node:w} for zk node {zk_node} are empty") +def step_childen_list_empty(context, node, zk_node): + container = get_container(context, node) + result = Chadmin(container).zk_list(zk_node) + assert_that(result.output.decode(), equal_to("\n"))