Skip to content

Commit

Permalink
Review and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MikhailBurdukov committed Jan 11, 2024
1 parent aa80d5a commit f5abf0b
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 21 deletions.
60 changes: 40 additions & 20 deletions ch_tools/chadmin/internal/zookeeper.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import logging
import os
import re
from collections import deque
from contextlib import contextmanager
from math import sqrt
from queue import Queue

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError

from ch_tools.chadmin.cli import get_clickhouse_config, get_macros
from ch_tools.chadmin.internal.utils import chunked


def get_zk_node(ctx, path, binary=False):
Expand Down Expand Up @@ -120,15 +121,14 @@ def _find_paths(zk, root_path, included_paths_regexp, excluded_paths_regexp=None
Return paths of nodes that match the include regular expression and do not match the excluded one.
"""
paths = set()
queue: Queue = Queue()
queue.put(root_path)
queue = deque(root_path)
included_regexp = re.compile("|".join(included_paths_regexp))
excluded_regexp = (
re.compile("|".join(excluded_paths_regexp)) if excluded_paths_regexp else None
)

while not queue.empty():
path = queue.get()
while len(queue):
path = queue.popleft()
if excluded_regexp and re.match(excluded_regexp, path):
continue

Expand All @@ -138,7 +138,7 @@ def _find_paths(zk, root_path, included_paths_regexp, excluded_paths_regexp=None
if re.match(included_regexp, subpath):
paths.add(subpath)
else:
queue.put(os.path.join(path, subpath))
queue.append(os.path.join(path, subpath))

return list(paths)

Expand All @@ -156,6 +156,11 @@ def _delete_nodes_transaction(zk, to_delete_in_trasaction):
if result.count(True) == len(result):
# Transaction completed successfully, exit.
return

print(
"Delete transaction have failed. Fallthrough to single delete operations for zk_nodes : ",
to_delete_in_trasaction,
)
for node in to_delete_in_trasaction:
try:
zk.delete(node, recursive=True)
Expand All @@ -173,27 +178,42 @@ def _delete_recursive(zk, paths):
To delete in correct order first of all we perform topological sort using bfs approach.
"""

def remove_subpaths(paths):
"""
Removing from the list paths that are subpath of antoher.
Example:
[/a, /a/b/c<-remove it]
"""
# Sorting the list in the lexicographic order
paths.sort()
normalized = [paths[0]]
# If path[i] have subnode path[j] then all paths from i to j will be subnode of i.
for path in paths:
if not path.startswith(normalized[-1]):
normalized.append(path)
return normalized

if not len(paths):
return
print("Node to recursive delete", paths)
paths = remove_subpaths(paths)
nodes_to_delete = []
queue: Queue = Queue()
for path in paths:
queue.put(path)
nodes_to_delete.append(path)
queue = deque(paths)

while not queue.empty():
path = queue.get()
while len(queue):
path = queue.popleft()
nodes_to_delete.append(path)
for child_node in _get_children(zk, path):
full_child_path = os.path.join(path, child_node)
queue.put(full_child_path)
nodes_to_delete.append(full_child_path)
queue.append(os.path.join(path, child_node))

# When number of nodes to delete is large preferable to use greater transaction size.
operations_in_transaction = max(100, int(sqrt(len(nodes_to_delete))))

while len(nodes_to_delete) > 0:
transaction_size = min(len(nodes_to_delete), operations_in_transaction)
transaction_oper = nodes_to_delete[-transaction_size:]
nodes_to_delete = nodes_to_delete[:-transaction_size]
_delete_nodes_transaction(zk, transaction_oper[::-1])
for transaction_orerations in chunked(
reversed(nodes_to_delete), operations_in_transaction
):
_delete_nodes_transaction(zk, transaction_orerations)


def clean_zk_metadata_for_hosts(ctx, nodes):
Expand Down
21 changes: 20 additions & 1 deletion tests/features/chadmin_zookeeper.feature
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,25 @@ Feature: chadmin zookeeper commands.
'/test/clickhouse/databases/test/shard1/counter'
"""
And we do hosts cleanup on zookeeper01 with fqdn host1.net,host2.net and zk root /test
Then the list of children on zookeeper01 for zk node /test/clickhouse/databases/test/ are equal to
Then the list of children on zookeeper01 for zk node test/clickhouse/databases/test/ are empty

Scenario: Zookeeper recursive delete command
When we execute chadmin create zk nodes on zookeeper01
"""
/test/a/b/c
/test/a/b/d
/test/a/f
"""
And we delete zookeepers nodes /test/a on zookeeper01
Then the list of children on zookeeper01 for zk node /test are empty

Scenario: Zookeeper delete parent and child nodes
When we execute chadmin create zk nodes on zookeeper01
"""
/test/a/b
/test/c/d
"""
# Make sure that it is okey to delete the node and its child.
And we delete zookeepers nodes /test/a,/test/a/b on zookeeper01
And we delete zookeepers nodes /test/c/d,/test/c on zookeeper01
Then the list of children on zookeeper01 for zk node /test are empty
7 changes: 7 additions & 0 deletions tests/modules/chadmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ def create_zk_node(self, zk_node, no_ch_config=True, recursive=True):
)
return self.exec_cmd(cmd)

def zk_delete(self, zk_nodes, no_ch_config=True):
cmd = "zookeeper {use_config} delete {nodes}".format(
use_config="--no-ch-config" if no_ch_config else "",
nodes=zk_nodes,
)
return self.exec_cmd(cmd)

def zk_list(self, zk_node, no_ch_config=True):
cmd = "zookeeper {use_config} list {node}".format(
use_config="--no-ch-config" if no_ch_config else "",
Expand Down
7 changes: 7 additions & 0 deletions tests/steps/chadmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ def step_childen_list_empty(context, node, zk_node):
container = get_container(context, node)
result = Chadmin(container).zk_list(zk_node)
assert_that(result.output.decode(), equal_to("\n"))


@when("we delete zookeepers nodes {zk_nodes} on {node:w}")
def step_delete_command(context, zk_nodes, node):
container = get_container(context, node)
result = Chadmin(container).zk_delete(zk_nodes)
assert result.exit_code == 0, f" output:\n {result.output.decode().strip()}"

0 comments on commit f5abf0b

Please sign in to comment.