Skip to content

Commit

Permalink
Speed up cleanup-removed-hosts-metadata script (#89)
Browse files Browse the repository at this point in the history
* Speed up cleanup-removed-hosts-metadata script

* Review and tests

* style

* Review

* Cosmetic changes

---------

Co-authored-by: Aleksei Filatov <[email protected]>
  • Loading branch information
MikhailBurdukov and aalexfvk authored Jan 11, 2024
1 parent 8757316 commit baaf843
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 33 deletions.
124 changes: 92 additions & 32 deletions ch_tools/chadmin/internal/zookeeper.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import logging
import os
import re
from collections import deque
from contextlib import contextmanager
from queue import Queue
from math import sqrt

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError, NotEmptyError
from kazoo.exceptions import NoNodeError

from ch_tools.chadmin.cli import get_clickhouse_config, get_macros
from ch_tools.chadmin.internal.utils import chunked


def get_zk_node(ctx, path, binary=False):
Expand Down Expand Up @@ -90,13 +92,7 @@ def delete_zk_node(ctx, path):
def delete_zk_nodes(ctx, paths):
paths_formated = [_format_path(ctx, path) for path in paths]
with zk_client(ctx) as zk:
_delete_zk_nodes(zk, paths_formated)


def _delete_zk_nodes(zk, paths):
for path in paths:
print(f"Deleting ZooKeeper node {path}")
zk.delete(path, recursive=True)
_delete_recursive(zk, paths_formated)


def _format_path(ctx, path):
Expand Down Expand Up @@ -125,15 +121,14 @@ def _find_paths(zk, root_path, included_paths_regexp, excluded_paths_regexp=None
Return paths of nodes that match the include regular expression and do not match the excluded one.
"""
paths = set()
queue: Queue = Queue()
queue.put(root_path)
queue = deque(root_path)
included_regexp = re.compile("|".join(included_paths_regexp))
excluded_regexp = (
re.compile("|".join(excluded_paths_regexp)) if excluded_paths_regexp else None
)

while not queue.empty():
path = queue.get()
while len(queue):
path = queue.popleft()
if excluded_regexp and re.match(excluded_regexp, path):
continue

Expand All @@ -143,27 +138,94 @@ def _find_paths(zk, root_path, included_paths_regexp, excluded_paths_regexp=None
if re.match(included_regexp, subpath):
paths.add(subpath)
else:
queue.put(os.path.join(path, subpath))
queue.append(os.path.join(path, subpath))

return list(paths)


def clean_zk_metadata_for_hosts(ctx, nodes):
def _delete_nodes_transaction(zk, to_delete_in_trasaction):
"""
Perform cleanup in zookeeper after deleting hosts in the cluster or whole cluster deleting.
Perform deletion for the list of nodes in a single transaction.
If the transaction fails, go through the list and delete the nodes one by one.
"""

def _try_delete_zk_node(zk, node):
delete_transaction = zk.transaction()
for node in to_delete_in_trasaction:
delete_transaction.delete(node)
result = delete_transaction.commit()

if result.count(True) == len(result):
# Transaction completed successfully, exit.
return

print(
"Delete transaction have failed. Fallthrough to single delete operations for zk_nodes : ",
to_delete_in_trasaction,
)
for node in to_delete_in_trasaction:
try:
_delete_zk_nodes(zk, [node])
zk.delete(node, recursive=True)
except NoNodeError:
# Someone deleted node before us. Do nothing.
print("Node {node} is already absent, skipped".format(node=node))
except NotEmptyError:
# Someone created child node while deleting.
# I'm not sure that we can get this exception with recursive=True.
# Do nothing.
print("Node {node} is not empty, skipped".format(node=node))


def _remove_subpaths(paths):
"""
Removing from the list paths that are subpath of another.
Example:
[/a, /a/b/c<-remove it]
"""
if not paths:
return
# Sorting the list in the lexicographic order
paths.sort()
paths = [path.split("/") for path in paths]
normalized_paths = [paths[0]]
# If path[i] has subnode path[j] then all paths from i to j will be subnode of i.
for path in paths:
last = normalized_paths[-1]
# Ignore the path if the last normalized one is its prefix
if len(last) > len(path) or path[: len(last)] != last:
normalized_paths.append(path)
return ["/".join(path) for path in normalized_paths]


def _delete_recursive(zk, paths):
"""
Kazoo already has the ability to recursively delete nodes, but the implementation is quite naive
and has poor performance with a large number of nodes being deleted.
In this implementation we unite the nodes to delete in transactions to do single operation for batch of nodes.
To delete in correct order first of all we perform topological sort using bfs approach.
"""

if len(paths) == 0:
return
print("Node to recursive delete", paths)
paths = _remove_subpaths(paths)
nodes_to_delete = []
queue = deque(paths)

while queue:
path = queue.popleft()
nodes_to_delete.append(path)
for child_node in _get_children(zk, path):
queue.append(os.path.join(path, child_node))

# When number of nodes to delete is large preferable to use greater transaction size.
operations_in_transaction = max(100, int(sqrt(len(nodes_to_delete))))

for transaction_orerations in chunked(
reversed(nodes_to_delete), operations_in_transaction
):
_delete_nodes_transaction(zk, transaction_orerations)


def clean_zk_metadata_for_hosts(ctx, nodes):
"""
Perform cleanup in zookeeper after deleting hosts in the cluster or whole cluster deleting.
"""

def _find_tables(zk, root_path, nodes):
"""
Expand Down Expand Up @@ -243,23 +305,21 @@ def _remove_replicas_queues(zk, paths, replica_names):
continue

if zk.exists(queue_path):
_try_delete_zk_node(zk, queue_path)

def _remove_hosts_nodes(zk, paths):
for node in paths:
_try_delete_zk_node(zk, node)
_delete_recursive(zk, queue_path)

def _remove_if_no_hosts_in_replicas(zk, paths):
"""
Remove node if subnode is empty
"""
to_delete = []
for path in paths:
child_full_path = os.path.join(path, "replicas")
if (
not zk.exists(child_full_path)
or len(_get_children(zk, child_full_path)) == 0
):
_try_delete_zk_node(zk, path)
to_delete.append(path)
_delete_recursive(zk, to_delete)

def tables_cleanup(zk, zk_root_path):
"""
Expand All @@ -268,7 +328,7 @@ def tables_cleanup(zk, zk_root_path):
(table_paths, hosts_paths) = _find_tables(zk, zk_root_path, nodes)
_set_replicas_is_lost(zk, table_paths, nodes)
_remove_replicas_queues(zk, table_paths, nodes)
_remove_hosts_nodes(zk, hosts_paths)
_delete_recursive(zk, hosts_paths)
_remove_if_no_hosts_in_replicas(zk, table_paths)

def databases_cleanups(zk, zk_root_path):
Expand All @@ -277,7 +337,7 @@ def databases_cleanups(zk, zk_root_path):
"""

(databases_paths, hosts_paths) = _find_databases(zk, zk_root_path, nodes)
_remove_hosts_nodes(zk, hosts_paths)
_delete_recursive(zk, hosts_paths)
_remove_if_no_hosts_in_replicas(zk, databases_paths)

zk_root_path = _format_path(ctx, "/")
Expand Down
21 changes: 20 additions & 1 deletion tests/features/chadmin_zookeeper.feature
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,25 @@ Feature: chadmin zookeeper commands.
'/test/clickhouse/databases/test/shard1/counter'
"""
And we do hosts cleanup on zookeeper01 with fqdn host1.net,host2.net and zk root /test
Then the list of children on zookeeper01 for zk node /test/clickhouse/databases/test/ are equal to
Then the list of children on zookeeper01 for zk node test/clickhouse/databases/test/ are empty

Scenario: Zookeeper recursive delete command
When we execute chadmin create zk nodes on zookeeper01
"""
/test/a/b/c
/test/a/b/d
/test/a/f
"""
And we delete zookeepers nodes /test/a on zookeeper01
Then the list of children on zookeeper01 for zk node /test are empty

Scenario: Zookeeper delete parent and child nodes
When we execute chadmin create zk nodes on zookeeper01
"""
/test/a/b
/test/c/d
"""
# Make sure that it is okey to delete the node and its child.
And we delete zookeepers nodes /test/a,/test/a/b on zookeeper01
And we delete zookeepers nodes /test/c/d,/test/c on zookeeper01
Then the list of children on zookeeper01 for zk node /test are empty
7 changes: 7 additions & 0 deletions tests/modules/chadmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ def create_zk_node(self, zk_node, no_ch_config=True, recursive=True):
)
return self.exec_cmd(cmd)

def zk_delete(self, zk_nodes, no_ch_config=True):
cmd = "zookeeper {use_config} delete {nodes}".format(
use_config="--no-ch-config" if no_ch_config else "",
nodes=zk_nodes,
)
return self.exec_cmd(cmd)

def zk_list(self, zk_node, no_ch_config=True):
cmd = "zookeeper {use_config} list {node}".format(
use_config="--no-ch-config" if no_ch_config else "",
Expand Down
7 changes: 7 additions & 0 deletions tests/steps/chadmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ def step_childen_list_empty(context, node, zk_node):
container = get_container(context, node)
result = Chadmin(container).zk_list(zk_node)
assert_that(result.output.decode(), equal_to("\n"))


@when("we delete zookeepers nodes {zk_nodes} on {node:w}")
def step_delete_command(context, zk_nodes, node):
container = get_container(context, node)
result = Chadmin(container).zk_delete(zk_nodes)
assert result.exit_code == 0, f" output:\n {result.output.decode().strip()}"

0 comments on commit baaf843

Please sign in to comment.