Skip to content

Commit

Permalink
Move zookeeper.py in chadmin (#70)
Browse files Browse the repository at this point in the history
* Move zookeeper.py in chadmin

* Review fixes

* Changes

* Update chadmin_zookeeper.feature
  • Loading branch information
MikhailBurdukov authored Oct 5, 2023
1 parent 3024d65 commit 8c07351
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 14 deletions.
24 changes: 23 additions & 1 deletion ch_tools/chadmin/cli/zookeeper_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from ch_tools.chadmin.internal.table_replica import get_table_replica
from ch_tools.chadmin.internal.zookeeper import (
check_zk_node,
clean_zk_metadata_for_hosts,
create_zk_nodes,
delete_zk_nodes,
get_zk_node,
Expand Down Expand Up @@ -41,8 +42,18 @@
help="Do not try to get parameters from clickhouse config.xml.",
default=False,
)
@option(
"-c",
"--chroot",
"zk_root_path",
type=str,
help="Cluster ZooKeeper root path. If not specified,the root path will be used.",
required=False,
)
@pass_context
def zookeeper_group(ctx, host, port, timeout, zkcli_identity, no_chroot, no_ch_config):
def zookeeper_group(
ctx, host, port, timeout, zkcli_identity, no_chroot, no_ch_config, zk_root_path
):
"""ZooKeeper management commands.
ZooKeeper command runs client which connects to Zookeeper node.
Expand All @@ -57,6 +68,7 @@ def zookeeper_group(ctx, host, port, timeout, zkcli_identity, no_chroot, no_ch_c
"zkcli_identity": zkcli_identity,
"no_chroot": no_chroot,
"no_ch_config": no_ch_config,
"zk_root_path": zk_root_path,
}


Expand Down Expand Up @@ -266,3 +278,13 @@ def delete_ddl_task_command(ctx, tasks):
"""
paths = [f"/clickhouse/task_queue/ddl/{task}" for task in tasks]
delete_zk_nodes(ctx, paths)


@zookeeper_group.command(
name="cleanup-removed-hosts-metadata",
help="Remove metadata from Zookeeper for specified hosts.",
)
@argument("fqdn", type=ListParamType())
@pass_context
def clickhouse_hosts_command(ctx, fqdn):
clean_zk_metadata_for_hosts(ctx, fqdn)
168 changes: 155 additions & 13 deletions ch_tools/chadmin/internal/zookeeper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
import os
import re
from contextlib import contextmanager
from queue import Queue

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kazoo.exceptions import NoNodeError, NotEmptyError

from ch_tools.chadmin.cli import get_config, get_macros

Expand All @@ -27,19 +29,22 @@ def get_zk_node_acls(ctx, path):
return zk.get_acls(path)


def _get_children(zk, path):
try:
return zk.get_children(path)
except NoNodeError:
return [] # in the case ZK deletes a znode while we traverse the tree


def list_zk_nodes(ctx, path, verbose=False):
def _stat_node(zk, node):
descendants_count = 0
queue = [node]
while queue:
item = queue.pop()
try:
children = zk.get_children(item)
descendants_count += len(children)
queue.extend(os.path.join(item, node) for node in children)
except NoNodeError:
# ZooKeeper nodes can be deleted during node tree traversal
pass
children = _get_children(zk, item)
descendants_count += len(children)
queue.extend(os.path.join(item, node) for node in children)

return {
"path": node,
Expand Down Expand Up @@ -83,11 +88,15 @@ def delete_zk_node(ctx, path):


def delete_zk_nodes(ctx, paths):
paths_formated = [_format_path(ctx, path) for path in paths]
with zk_client(ctx) as zk:
for path in paths:
path = _format_path(ctx, path)
print(f"Deleting ZooKeeper node {path}")
zk.delete(path, recursive=True)
_delete_zk_nodes(zk, paths_formated)


def _delete_zk_nodes(zk, paths):
for path in paths:
print(f"Deleting ZooKeeper node {path}")
zk.delete(path, recursive=True)


def _format_path(ctx, path):
Expand All @@ -98,6 +107,136 @@ def _format_path(ctx, path):
return path.format_map(get_macros(ctx))


def _set_node_value(zk, path, value):
"""
Set value to node in zk.
"""
if zk.exists(path):
try:
zk.set(path, value.encode())
except NoNodeError:
print(f"Can not set for node: {path} value : {value}")


def _find_paths(zk, root_path, included_paths_regexp, excluded_paths_regexp=None):
"""
Traverse zookeeper tree from root_path with bfs approach.
Return paths of nodes that match the include regular expression and do not match the excluded one.
"""
paths = set()
queue: Queue = Queue()
queue.put(root_path)
included_regexp = re.compile("|".join(included_paths_regexp))
excluded_regexp = (
re.compile("|".join(excluded_paths_regexp)) if excluded_paths_regexp else None
)

while not queue.empty():
path = queue.get()
if excluded_regexp and re.match(excluded_regexp, path):
continue

for child_node in _get_children(zk, path):
subpath = os.path.join(path, child_node)

if re.match(included_regexp, subpath):
paths.add(subpath)
else:
queue.put(os.path.join(path, subpath))

return list(paths)


def clean_zk_metadata_for_hosts(ctx, nodes):
"""
Perform cleanup in zookeeper after deleting hosts in the cluster or whole cluster deleting.
"""

def _try_delete_zk_node(zk, node):
try:
_delete_zk_nodes(zk, [node])
except NoNodeError:
# Someone deleted node before us. Do nothing.
print("Node {node} is already absent, skipped".format(node=node))
except NotEmptyError:
# Someone created child node while deleting.
# I'm not sure that we can get this exception with recursive=True.
# Do nothing.
print("Node {node} is not empty, skipped".format(node=node))

def _find_parents(zk, root_path, nodes, parent_name):
excluded_paths = [
".*clickhouse/task_queue",
".*clickhouse/zero_copy",
]
included_paths = [".*/" + parent_name + "/" + node for node in nodes]
paths = _find_paths(zk, root_path, included_paths, excluded_paths)
# Paths will be like */shard1/replicas/hostname. But we need */shard1.
# Go up for 2 directories.
paths = [os.sep.join(path.split(os.sep)[:-2]) for path in paths]
# One path might be in list several times. Make list unique.
return list(set(paths))

def _set_replicas_is_lost(zk, table_paths, nodes):
"""
Set flag <path>/replicas/<replica_name>/is_lost to 1
"""
for path in table_paths:

replica_path = os.path.join(path, "replicas")
if not zk.exists(replica_path):
continue

for node in nodes:
is_lost_flag_path = os.path.join(replica_path, node, "is_lost")
print("Set is_lost_flag " + is_lost_flag_path)
_set_node_value(zk, is_lost_flag_path, "1")

def _remove_replicas_queues(zk, paths, replica_names):
"""
Remove <path>/replicas/<replica_name>/queue
"""
for path in paths:
replica_name = os.path.join(path, "replicas")
if not zk.exists(replica_name):
continue

for replica_name in replica_names:
queue_path = os.path.join(replica_name, replica_name, "queue")
if not zk.exists(queue_path):
continue

if zk.exists(queue_path):
_try_delete_zk_node(zk, queue_path)

def _nodes_absent(zk, zk_root_path, nodes):
included_paths = [".*/" + node for node in nodes]
paths_to_delete = _find_paths(zk, zk_root_path, included_paths)
for node in paths_to_delete:
_try_delete_zk_node(zk, node)

def _absent_if_empty_child(zk, path, child):
"""
Remove node if subnode is empty
"""
child_full_path = os.path.join(path, child)
if (
not zk.exists(child_full_path)
or len(_get_children(zk, child_full_path)) == 0
):
_try_delete_zk_node(zk, path)

zk_root_path = _format_path(ctx, "/")
with zk_client(ctx) as zk:
table_paths = _find_parents(zk, zk_root_path, nodes, "replicas")
_set_replicas_is_lost(zk, table_paths, nodes)
_remove_replicas_queues(zk, table_paths, nodes)
_nodes_absent(zk, zk_root_path, nodes)
for path in table_paths:
_absent_if_empty_child(zk, path, "replicas")


@contextmanager
def zk_client(ctx):
zk = _get_zk_client(ctx)
Expand All @@ -119,6 +258,7 @@ def _get_zk_client(ctx):
zkcli_identity = args.get("zkcli_identity")
no_chroot = args.get("no_chroot", False)
no_ch_config = args.get("no_ch_config", False)
zk_root_path = args.get("zk_root_path", None)

if no_ch_config:
if not host:
Expand All @@ -132,7 +272,9 @@ def _get_zk_client(ctx):
f'{host if host else node["host"]}:{port if port else node["port"]}'
for node in zk_config.nodes
)
if not no_chroot and zk_config.root is not None:
if zk_root_path:
connect_str += zk_root_path
elif not no_chroot and zk_config.root is not None:
connect_str += zk_config.root

if zkcli_identity is None:
Expand Down
43 changes: 43 additions & 0 deletions tests/features/chadmin_zookeeper.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
Feature: chadmin zookeeper commands.

Background:
Given default configuration
And a working s3
And a working zookeeper
And a working clickhouse on clickhouse01


Scenario: Cleanup all hosts
When we execute chadmin create zk nodes on zookeeper01
"""
/test/write_sli_part/shard1/replicas/host1.net
/test/write_sli_part/shard1/replicas/host2.net
/test/read_sli_part/shard1/replicas/host1.net
/test/read_sli_part/shard1/replicas/host2.net
/test/write_sli_part/shard1/log
"""
And we do hosts cleanup on zookeeper01 with fqdn host1.net,host2.net and zk root /test

Then the list of children on zookeeper01 for zk node /test/write_sli_part are empty
And the list of children on zookeeper01 for zk node /test/read_sli_part are empty

Scenario: Cleanup single host
When we execute chadmin create zk nodes on zookeeper01
"""
/test/write_sli_part/shard1/replicas/host1.net
/test/write_sli_part/shard1/replicas/host2.net
/test/read_sli_part/shard1/replicas/host1.net
/test/read_sli_part/shard1/replicas/host2.net
/test/write_sli_part/shard1/log
"""
And we do hosts cleanup on zookeeper01 with fqdn host1.net and zk root /test


Then the list of children on zookeeper01 for zk node /test/write_sli_part/shard1/replicas/ are equal to
"""
/test/write_sli_part/shard1/replicas/host2.net
"""
And the list of children on zookeeper01 for zk node /test/read_sli_part/shard1/replicas/ are equal to
"""
/test/read_sli_part/shard1/replicas/host2.net
"""
35 changes: 35 additions & 0 deletions tests/modules/chadmin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import logging


class Chadmin:
def __init__(self, container):
self._container = container

def exec_cmd(self, cmd):
ch_admin_cmd = f"chadmin {cmd}"
logging.debug("chadmin command:", ch_admin_cmd)
result = self._container.exec_run(["bash", "-c", ch_admin_cmd], user="root")
return result

def create_zk_node(self, zk_node, no_ch_config=True, recursive=True):
cmd = "zookeeper {use_config} create {make_parents} {node}".format(
use_config="--no-ch-config" if no_ch_config else "",
make_parents="--make-parents" if recursive else "",
node=zk_node,
)
return self.exec_cmd(cmd)

def zk_list(self, zk_node, no_ch_config=True):
cmd = "zookeeper {use_config} list {node}".format(
use_config="--no-ch-config" if no_ch_config else "",
node=zk_node,
)
return self.exec_cmd(cmd)

def zk_cleanup(self, fqdn, zk_root, no_ch_config=True):
cmd = "zookeeper {use_config} --chroot {root} cleanup-removed-hosts-metadata {hosts}".format(
use_config="--no-ch-config" if no_ch_config else "",
root=zk_root,
hosts=fqdn,
)
return self.exec_cmd(cmd)
40 changes: 40 additions & 0 deletions tests/steps/chadmin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
Steps for interacting with chadmin.
"""

from behave import then, when
from hamcrest import assert_that, equal_to
from modules.chadmin import Chadmin
from modules.docker import get_container


@when("we execute chadmin create zk nodes on {node:w}")
def step_create_(context, node):
container = get_container(context, node)
nodes = context.text.strip().split("\n")
chadmin = Chadmin(container)

for node in nodes:
result = chadmin.create_zk_node(node)
assert result.exit_code == 0, f" output:\n {result.output.decode().strip()}"


@when("we do hosts cleanup on {node} with fqdn {fqdn} and zk root {zk_root}")
def step_host_cleanup(context, node, fqdn, zk_root):
container = get_container(context, node)
result = Chadmin(container).zk_cleanup(fqdn, zk_root)
assert result.exit_code == 0, f" output:\n {result.output.decode().strip()}"


@then("the list of children on {node:w} for zk node {zk_node} are equal to")
def step_childen_list(context, node, zk_node):
container = get_container(context, node)
result = Chadmin(container).zk_list(zk_node)
assert_that(result.output.decode(), equal_to(context.text + "\n"))


@then("the list of children on {node:w} for zk node {zk_node} are empty")
def step_childen_list_empty(context, node, zk_node):
container = get_container(context, node)
result = Chadmin(container).zk_list(zk_node)
assert_that(result.output.decode(), equal_to("\n"))

0 comments on commit 8c07351

Please sign in to comment.