From 74d66f8c437077687c22f2b9d560c5fb773881b6 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov <102754618+MikhailBurdukov@users.noreply.github.com> Date: Fri, 9 Feb 2024 13:29:04 +0300 Subject: [PATCH] Change ddl tasks state to finished for removed hosts. (#100) * Change state to finished in ddl tasks for removed hosts. * Review * minor * try to fix tests * fix --- ch_tools/chadmin/cli/zookeeper_group.py | 6 +- ch_tools/chadmin/internal/zookeeper.py | 62 ++++++++++++++++++- ch_tools/common/config.py | 1 + tests/features/chadmin_zookeeper.feature | 39 ++++++++++++ tests/modules/chadmin.py | 6 +- tests/steps/chadmin.py | 9 ++- tests/steps/clickhouse.py | 24 +++++++ .../common/clickhouse/test_zk_path_escape.py | 19 ++++++ 8 files changed, 160 insertions(+), 6 deletions(-) create mode 100644 tests/unit/common/clickhouse/test_zk_path_escape.py diff --git a/ch_tools/chadmin/cli/zookeeper_group.py b/ch_tools/chadmin/cli/zookeeper_group.py index df258eff..8696e79a 100644 --- a/ch_tools/chadmin/cli/zookeeper_group.py +++ b/ch_tools/chadmin/cli/zookeeper_group.py @@ -18,6 +18,7 @@ ) from ch_tools.common.cli.formatting import print_json, print_response from ch_tools.common.cli.parameters import ListParamType, StringParamType +from ch_tools.common.config import load_config @group("zookeeper") @@ -304,4 +305,7 @@ def delete_ddl_task_command(ctx, tasks): @argument("fqdn", type=ListParamType()) @pass_context def clickhouse_hosts_command(ctx, fqdn): - clean_zk_metadata_for_hosts(ctx, fqdn) + # We can't get the ddl queue path from clickhouse config, + # because in some cases we are changing this path while performing cluster resetup. + config = load_config() + clean_zk_metadata_for_hosts(ctx, fqdn, config["clickhouse"]["distributed_ddl_path"]) diff --git a/ch_tools/chadmin/internal/zookeeper.py b/ch_tools/chadmin/internal/zookeeper.py index 05b8244e..3a1b3002 100644 --- a/ch_tools/chadmin/internal/zookeeper.py +++ b/ch_tools/chadmin/internal/zookeeper.py @@ -222,7 +222,21 @@ def _delete_recursive(zk, paths): _delete_nodes_transaction(zk, transaction_orerations) -def clean_zk_metadata_for_hosts(ctx, nodes): +def escape_for_zookeeper(s: str) -> str: + # clickhouse uses name formatting in zookeeper. + # See escapeForFileName.cpp + result = [] + for c in s: + if c.isalnum() or c == "_": + result.append(c) + else: + code = ord(c) + result.append(f"%{code//16:X}{code%16:X}") + + return "".join(result) + + +def clean_zk_metadata_for_hosts(ctx, nodes, zk_ddl_query_path): """ Perform cleanup in zookeeper after deleting hosts in the cluster or whole cluster deleting. """ @@ -340,10 +354,56 @@ def databases_cleanups(zk, zk_root_path): _delete_recursive(zk, hosts_paths) _remove_if_no_hosts_in_replicas(zk, databases_paths) + def get_host_mention_in_task(zk, host, ddl_task_path): + """ + Predicate for indicating that host must execute this task. + If there is a host mention in the ddl, then returns the escaped hostname with port. + Example of ddl: + + version: 5 + query: CREATE TABLE default.test ... + hosts: ['host:port', ...] + initiator: host:port + settings: s3_min_upload_part_size = 33554432, s3_max_single_part_upload_size = 33554432, distributed_foreground_insert = true, distributed_background_insert_batch = true, log_queries = false, log_queries_cut_to_length = 10000000, max_concurrent_queries_for_user = 450, ignore_on_cluster_for_replicated_udf_queries = true, ignore_on_cluster_for_replicated_access_entities_queries = true, timeout_before_checking_execution_speed = 300., join_algorithm = 'auto,direct', allow_drop_detached = true, database_atomic_wait_for_drop_and_detach_synchronously = true, kafka_disable_num_consumers_limit = true, force_remove_data_recursively_on_drop = true + tracing: 00000000-0000-0000-0000-000000000000 + """ + try: + for line in zk.get(ddl_task_path)[0].decode().strip().splitlines(): + if "hosts: " in line: + escaped_host = escape_for_zookeeper(host) + pos = line.find(escaped_host) + # Not found + if pos == -1: + return None + + return line[pos : line.find("'", pos)] + except NoNodeError: + pass + + def mark_finished_ddl_query(zk): + """ + If after deleting a host there are still unfinished ddl tasks in the queue, + then we pretend that the host has completed this task. + + """ + + for ddl_task in _get_children(zk, _format_path(ctx, zk_ddl_query_path)): + ddl_task_full = os.path.join(zk_ddl_query_path, ddl_task) + for host in nodes: + host_mention = get_host_mention_in_task(zk, host, ddl_task_full) + if not host_mention: + continue + finished_path = os.path.join(ddl_task_full, f"finished/{host_mention}") + if zk.exists(finished_path): + continue + print(f"Add {host} to finished for ddl_task: {ddl_task}") + zk.create(finished_path, b"0\n") + zk_root_path = _format_path(ctx, "/") with zk_client(ctx) as zk: tables_cleanup(zk, zk_root_path) databases_cleanups(zk, zk_root_path) + mark_finished_ddl_query(zk) @contextmanager diff --git a/ch_tools/common/config.py b/ch_tools/common/config.py index b5c012b6..424cdcdc 100644 --- a/ch_tools/common/config.py +++ b/ch_tools/common/config.py @@ -18,6 +18,7 @@ "settings": {}, "monitoring_user": None, "monitoring_password": None, + "distributed_ddl_path": "/clickhouse/task_queue/ddl", }, "object_storage": { "clean": { diff --git a/tests/features/chadmin_zookeeper.feature b/tests/features/chadmin_zookeeper.feature index 8d5d380b..0f3c37f1 100644 --- a/tests/features/chadmin_zookeeper.feature +++ b/tests/features/chadmin_zookeeper.feature @@ -5,6 +5,7 @@ Feature: chadmin zookeeper commands. And a working s3 And a working zookeeper And a working clickhouse on clickhouse01 + And a working clickhouse on clickhouse02 Scenario: Cleanup all hosts @@ -88,3 +89,41 @@ Feature: chadmin zookeeper commands. And we delete zookeepers nodes /test/a,/test/a/b on zookeeper01 And we delete zookeepers nodes /test/c/d,/test/c on zookeeper01 Then the list of children on zookeeper01 for zk node /test are empty + + + Scenario: Set finished to ddl with removed host. + # Create the fake cluster with removed host. + When we put the clickhouse config to path /etc/clickhouse-server/config.d/cluster.xml with restarting on clickhouse02 + """ + + + + + true + + clickhouse02 + 9000 + + + + zone-host.db.asd.net + 9000 + + + + + + """ + And we execute query on clickhouse02 + """ + CREATE DATABASE test_db ON CLUSTER 'cluster_with_removed_host' + """ + # Make sure that ddl have executed on clickhouse02 + And we sleep for 5 seconds + And we execute query on clickhouse02 + """ + CREATE TABLE test_db.test ON CLUSTER 'cluster_with_removed_host' (a int) ENGINE=MergeTree() ORDER BY a + """ + + And we do hosts cleanup on clickhouse02 with fqdn zone-host.db.asd.net + Then there are no unfinished dll queries on clickhouse02 diff --git a/tests/modules/chadmin.py b/tests/modules/chadmin.py index d9c05b07..f1d788fc 100644 --- a/tests/modules/chadmin.py +++ b/tests/modules/chadmin.py @@ -33,10 +33,10 @@ def zk_list(self, zk_node, no_ch_config=True): ) return self.exec_cmd(cmd) - def zk_cleanup(self, fqdn, zk_root, no_ch_config=True): - cmd = "zookeeper {use_config} --chroot {root} cleanup-removed-hosts-metadata {hosts}".format( + def zk_cleanup(self, fqdn, zk_root=None, no_ch_config=True): + cmd = "zookeeper {use_config} {root} cleanup-removed-hosts-metadata {hosts}".format( use_config="--no-ch-config" if no_ch_config else "", - root=zk_root, + root=f"--chroot {zk_root}" if zk_root else "", hosts=fqdn, ) return self.exec_cmd(cmd) diff --git a/tests/steps/chadmin.py b/tests/steps/chadmin.py index 068ae607..397184fb 100644 --- a/tests/steps/chadmin.py +++ b/tests/steps/chadmin.py @@ -20,12 +20,19 @@ def step_create_(context, node): @when("we do hosts cleanup on {node} with fqdn {fqdn} and zk root {zk_root}") -def step_host_cleanup(context, node, fqdn, zk_root): +def step_host_cleanup_with_zk_root(context, node, fqdn, zk_root): container = get_container(context, node) result = Chadmin(container).zk_cleanup(fqdn, zk_root) assert result.exit_code == 0, f" output:\n {result.output.decode().strip()}" +@when("we do hosts cleanup on {node} with fqdn {fqdn}") +def step_host_cleanup(context, node, fqdn): + container = get_container(context, node) + result = Chadmin(container).zk_cleanup(fqdn, no_ch_config=False) + assert result.exit_code == 0, f" output:\n {result.output.decode().strip()}" + + @then("the list of children on {node:w} for zk node {zk_node} are equal to") def step_childen_list(context, node, zk_node): container = get_container(context, node) diff --git a/tests/steps/clickhouse.py b/tests/steps/clickhouse.py index f2d4f449..dd02882d 100644 --- a/tests/steps/clickhouse.py +++ b/tests/steps/clickhouse.py @@ -4,6 +4,7 @@ from behave import given, then, when from hamcrest import assert_that, equal_to from modules.clickhouse import ClickhouseClient +from modules.docker import get_container from tenacity import retry, stop_after_attempt, wait_fixed @@ -60,3 +61,26 @@ def _get_data(node): return data assert_that(_get_data(node1), equal_to(_get_data(node2))) + + +@then("there are no unfinished dll queries on {node:w}") +def step_check_unfinished_ddl(context, node): + ch_client = ClickhouseClient(context, node) + query = "SELECT count(*) FROM system.distributed_ddl_queue WHERE status!='Finished'" + ret_code, response = ch_client.get_response(query) + assert_that(response, equal_to("0")) + + +@when("we put the clickhouse config to path {path} with restarting on {node:w}") +def step_put_config(context, path, node): + config = context.text + container = get_container(context, node) + result = container.exec_run( + ["bash", "-c", f'echo -e " {config} " > {path}'], user="root" + ) + assert_that(result.exit_code, equal_to(0)) + + result = container.exec_run( + ["bash", "-c", "supervisorctl restart clickhouse-server"], user="root" + ) + assert_that(result.exit_code, equal_to(0)) diff --git a/tests/unit/common/clickhouse/test_zk_path_escape.py b/tests/unit/common/clickhouse/test_zk_path_escape.py new file mode 100644 index 00000000..51a1cb26 --- /dev/null +++ b/tests/unit/common/clickhouse/test_zk_path_escape.py @@ -0,0 +1,19 @@ +import pytest + +from ch_tools.chadmin.internal.zookeeper import escape_for_zookeeper + +# type: ignore + + +@pytest.mark.parametrize( + "hostname, result", + [ + pytest.param( + "zone-hostname.database.urs.net", + "zone%2Dhostname%2Edatabase%2Eurs%2Enet", + ), + ], +) +def test_config(hostname, result): + + assert escape_for_zookeeper(hostname) == result