Skip to content

Commit

Permalink
Speed up cleanup-removed-hosts-metadata script
Browse files Browse the repository at this point in the history
  • Loading branch information
MikhailBurdukov committed Jan 9, 2024
1 parent 86cca4e commit aa80d5a
Showing 1 changed file with 60 additions and 26 deletions.
86 changes: 60 additions & 26 deletions ch_tools/chadmin/internal/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
import os
import re
from contextlib import contextmanager
from math import sqrt
from queue import Queue

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError, NotEmptyError
from kazoo.exceptions import NoNodeError

from ch_tools.chadmin.cli import get_clickhouse_config, get_macros

Expand Down Expand Up @@ -90,13 +91,7 @@ def delete_zk_node(ctx, path):
def delete_zk_nodes(ctx, paths):
paths_formated = [_format_path(ctx, path) for path in paths]
with zk_client(ctx) as zk:
_delete_zk_nodes(zk, paths_formated)


def _delete_zk_nodes(zk, paths):
for path in paths:
print(f"Deleting ZooKeeper node {path}")
zk.delete(path, recursive=True)
_delete_recursive(zk, paths_formated)


def _format_path(ctx, path):
Expand Down Expand Up @@ -148,22 +143,63 @@ def _find_paths(zk, root_path, included_paths_regexp, excluded_paths_regexp=None
return list(paths)


def clean_zk_metadata_for_hosts(ctx, nodes):
def _delete_nodes_transaction(zk, to_delete_in_trasaction):
"""
Perform cleanup in zookeeper after deleting hosts in the cluster or whole cluster deleting.
Perform deletion for the list of nodes in a single transaction.
If the transaction fails, go through the list and delete the nodes one by one.
"""

def _try_delete_zk_node(zk, node):
delete_transaction = zk.transaction()
for node in to_delete_in_trasaction:
delete_transaction.delete(node)
result = delete_transaction.commit()

if result.count(True) == len(result):
# Transaction completed successfully, exit.
return
for node in to_delete_in_trasaction:
try:
_delete_zk_nodes(zk, [node])
zk.delete(node, recursive=True)
except NoNodeError:
# Someone deleted node before us. Do nothing.
print("Node {node} is already absent, skipped".format(node=node))
except NotEmptyError:
# Someone created child node while deleting.
# I'm not sure that we can get this exception with recursive=True.
# Do nothing.
print("Node {node} is not empty, skipped".format(node=node))


def _delete_recursive(zk, paths):
"""
Kazoo already has the ability to recursively delete nodes, but the implementation is quite naive
and has poor performance with a large number of nodes being deleted.
In this implementation we unite the nodes to delete in transactions to do single operation for batch of nodes.
To delete in correct order first of all we perform topological sort using bfs approach.
"""

print("Node to recursive delete", paths)
nodes_to_delete = []
queue: Queue = Queue()
for path in paths:
queue.put(path)
nodes_to_delete.append(path)

while not queue.empty():
path = queue.get()
for child_node in _get_children(zk, path):
full_child_path = os.path.join(path, child_node)
queue.put(full_child_path)
nodes_to_delete.append(full_child_path)

operations_in_transaction = max(100, int(sqrt(len(nodes_to_delete))))

while len(nodes_to_delete) > 0:
transaction_size = min(len(nodes_to_delete), operations_in_transaction)
transaction_oper = nodes_to_delete[-transaction_size:]
nodes_to_delete = nodes_to_delete[:-transaction_size]
_delete_nodes_transaction(zk, transaction_oper[::-1])


def clean_zk_metadata_for_hosts(ctx, nodes):
"""
Perform cleanup in zookeeper after deleting hosts in the cluster or whole cluster deleting.
"""

def _find_tables(zk, root_path, nodes):
"""
Expand Down Expand Up @@ -243,23 +279,21 @@ def _remove_replicas_queues(zk, paths, replica_names):
continue

if zk.exists(queue_path):
_try_delete_zk_node(zk, queue_path)

def _remove_hosts_nodes(zk, paths):
for node in paths:
_try_delete_zk_node(zk, node)
_delete_recursive(zk, queue_path)

def _remove_if_no_hosts_in_replicas(zk, paths):
"""
Remove node if subnode is empty
"""
to_delete = []
for path in paths:
child_full_path = os.path.join(path, "replicas")
if (
not zk.exists(child_full_path)
or len(_get_children(zk, child_full_path)) == 0
):
_try_delete_zk_node(zk, path)
to_delete.append(path)
_delete_recursive(zk, to_delete)

def tables_cleanup(zk, zk_root_path):
"""
Expand All @@ -268,7 +302,7 @@ def tables_cleanup(zk, zk_root_path):
(table_paths, hosts_paths) = _find_tables(zk, zk_root_path, nodes)
_set_replicas_is_lost(zk, table_paths, nodes)
_remove_replicas_queues(zk, table_paths, nodes)
_remove_hosts_nodes(zk, hosts_paths)
_delete_recursive(zk, hosts_paths)
_remove_if_no_hosts_in_replicas(zk, table_paths)

def databases_cleanups(zk, zk_root_path):
Expand All @@ -277,7 +311,7 @@ def databases_cleanups(zk, zk_root_path):
"""

(databases_paths, hosts_paths) = _find_databases(zk, zk_root_path, nodes)
_remove_hosts_nodes(zk, hosts_paths)
_delete_recursive(zk, hosts_paths)
_remove_if_no_hosts_in_replicas(zk, databases_paths)

zk_root_path = _format_path(ctx, "/")
Expand Down

0 comments on commit aa80d5a

Please sign in to comment.