diff --git a/ch_tools/chadmin/chadmin_cli.py b/ch_tools/chadmin/chadmin_cli.py index 1c8fdbef..f7967ddb 100755 --- a/ch_tools/chadmin/chadmin_cli.py +++ b/ch_tools/chadmin/chadmin_cli.py @@ -42,7 +42,7 @@ from ch_tools.chadmin.cli.table_group import table_group from ch_tools.chadmin.cli.table_replica_group import table_replica_group from ch_tools.chadmin.cli.thread_log_group import thread_log_group -from ch_tools.chadmin.cli.wait_started_command import wait_started_command +from ch_tools.chadmin.cli.wait_group import wait_group from ch_tools.chadmin.cli.zookeeper_group import zookeeper_group from ch_tools.common.cli.context_settings import CONTEXT_SETTINGS from ch_tools.common.cli.locale_resolver import LocaleResolver @@ -116,7 +116,6 @@ def cli(ctx, format_, settings, timeout, port, debug): list_settings_command, restore_replica_command, stack_trace_command, - wait_started_command, ] groups: List[Any] = [ @@ -139,6 +138,7 @@ def cli(ctx, format_, settings, timeout, port, debug): table_group, table_replica_group, thread_log_group, + wait_group, zookeeper_group, ] diff --git a/ch_tools/chadmin/cli/wait_started_command.py b/ch_tools/chadmin/cli/wait_group.py similarity index 52% rename from ch_tools/chadmin/cli/wait_started_command.py rename to ch_tools/chadmin/cli/wait_group.py index 335e7ae4..f979fe81 100644 --- a/ch_tools/chadmin/cli/wait_started_command.py +++ b/ch_tools/chadmin/cli/wait_group.py @@ -3,9 +3,11 @@ import sys import time -from click import command, option, pass_context +from click import FloatRange, group, option, pass_context from ch_tools.chadmin.internal.utils import execute_query +from ch_tools.common.cli.parameters import TimeSpanParamType +from ch_tools.common.commands.replication_lag import estimate_replication_lag from ch_tools.common.utils import execute BASE_TIMEOUT = 600 @@ -13,7 +15,94 @@ S3_PART_LOAD_SPEED = 0.5 # in data parts per second -@command("wait-started") +@group("wait") +def wait_group(): + """Commands to wait until Clickhouse is in a certain state.""" + pass + + +@wait_group.command("replication-sync") +@option( + "-s", + "--status", + type=int, + default=0, + help="Wait until replication-lag returned status is no worse than given, 0 = OK, 1 = WARN, 2 = CRIT.", +) +@option( + "-p", + "--pause", + type=TimeSpanParamType(), + default="30s", + help="Pause between requests.", +) +@option( + "-t", + "--timeout", + type=TimeSpanParamType(), + default="3d", + help="Max amount of time to wait.", +) +@option( + "-x", + "--exec-critical", + "xcrit", + type=int, + default=3600, + help="Critical threshold for one task execution.", +) +@option( + "-c", + "--critical", + "crit", + type=int, + default=600, + help="Critical threshold for lag with errors.", +) +@option("-w", "--warning", "warn", type=int, default=300, help="Warning threshold.") +@option( + "-M", + "--merges-critical", + "mcrit", + type=FloatRange(0.0, 100.0), + default=90.0, + help="Critical threshold in percent of max_replicated_merges_in_queue.", +) +@option( + "-m", + "--merges-warning", + "mwarn", + type=FloatRange(0.0, 100.0), + default=50.0, + help="Warning threshold in percent of max_replicated_merges_in_queue.", +) +@option( + "-v", + "--verbose", + "verbose", + type=int, + count=True, + default=0, + help="Show details about lag.", +) +@pass_context +def wait_replication_sync_command( + ctx, status, pause, timeout, xcrit, crit, warn, mwarn, mcrit, verbose +): + """Wait for ClickHouse server to sync replication with other replicas using replication-lag command.""" + + deadline = time.time() + 
timeout.total_seconds() + while time.time() < deadline: + res = estimate_replication_lag(ctx, xcrit, crit, warn, mwarn, mcrit, verbose) + if res.code <= status: + sys.exit(0) + time.sleep(pause.total_seconds()) + + logging.error("ClickHouse can't sync replicas.") + sys.exit(1) + + +@wait_group.command("started") @option( "--timeout", type=int, diff --git a/ch_tools/common/commands/replication_lag.py b/ch_tools/common/commands/replication_lag.py new file mode 100644 index 00000000..9bac5b85 --- /dev/null +++ b/ch_tools/common/commands/replication_lag.py @@ -0,0 +1,276 @@ +from typing import Any, Dict + +from tabulate import tabulate + +from ch_tools.common.clickhouse.client.clickhouse_client import clickhouse_client +from ch_tools.common.result import Result + +XCRIT = 3600 +CRIT = 600 +WARN = 300 +MWARN = 50.0 +MCRIT = 90.0 +VERBOSE = 0 + + +def estimate_replication_lag( + ctx, xcrit=XCRIT, crit=CRIT, warn=WARN, mwarn=MWARN, mcrit=MCRIT, verbose=VERBOSE +): + """ + Check for replication lag across replicas. + Should be: lag >= lag_with_errors, lag >= max_execution + """ + # pylint: disable=too-many-branches,too-many-locals + ch_client = clickhouse_client(ctx) + lag, lag_with_errors, max_execution, max_merges, chart = get_replication_lag( + ch_client + ) + + msg_verbose = "" + msg_verbose_2 = "\n\n" + + if verbose >= 1: + verbtab = [] + + headers = [ + "Table", + "Lag [s]", + "Tasks", + "Max task execution [s]", + "Non-retrayable errors", + "Has user fault errors", + "Merges with 1000+ tries", + ] + for key, item in chart.items(): + if item.get("multi_replicas", False): + tabletab = [ + key, + item.get("delay", 0), + item.get("tasks", 0), + item.get("max_execution", 0), + item.get("errors", 0), + item.get("user_fault", False), + item.get("retried_merges", 0), + ] + verbtab.append(tabletab) + if verbose >= 2: + exceptions_retrayable = "" + exceptions_non_retrayable = "" + exceptions_ignored = "" + for exception in item.get("exceptions", []): + if exception: + if is_userfault_exception(exception): + exceptions_ignored += "\t" + exception[5:] + "\n" + elif exception.startswith(" "): + exceptions_retrayable += "\t" + exception[5:] + "\n" + else: + exceptions_non_retrayable += "\t" + exception[5:] + "\n" + max_execution_part = ( + item.get("max_execution_part", "") + if item.get("max_execution", 0) + else 0 + ) + if ( + exceptions_retrayable + or exceptions_non_retrayable + or exceptions_ignored + or max_execution_part + ): + msg_verbose_2 = msg_verbose_2 + key + ":\n" + if exceptions_non_retrayable: + msg_verbose_2 = ( + msg_verbose_2 + + " Non-retrayable errors:\n" + + exceptions_non_retrayable + ) + if exceptions_retrayable: + msg_verbose_2 = ( + msg_verbose_2 + + " Retrayable errors:\n" + + exceptions_retrayable + ) + if exceptions_ignored: + msg_verbose_2 = ( + msg_verbose_2 + + " User fault errors:\n" + + exceptions_ignored + ) + if max_execution_part: + msg_verbose_2 = ( + msg_verbose_2 + + " Result part of task with max execution time: " + + max_execution_part + + "\n" + ) + msg_verbose = tabulate(verbtab, headers=headers) + if verbose >= 2: + msg_verbose = msg_verbose + msg_verbose_2 + + max_merges_warn_threshold = 1 + max_merges_crit_threshold = 1 + if max_merges > 0: + max_replicated_merges_in_queue = get_max_replicated_merges_in_queue(ch_client) + max_merges_warn_threshold = int(max_replicated_merges_in_queue * mwarn / 100.0) + max_merges_crit_threshold = int(max_replicated_merges_in_queue * mcrit / 100.0) + + if lag < warn and max_merges < max_merges_warn_threshold: + return 
Result(code=0, message="OK", verbose=msg_verbose) + + msg = "Max {0} seconds, with errors {1} seconds, max task execution {2} seconds, max merges in queue {3}".format( + lag, lag_with_errors, max_execution, max_merges + ) + + if ( + lag_with_errors < crit + and max_execution < xcrit + and max_merges < max_merges_crit_threshold + ): + return Result(code=1, message=msg, verbose=msg_verbose) + + return Result(code=2, message=msg, verbose=msg_verbose) + + +def get_replication_lag(ch_client): + """ + Get max absolute_delay from system.replicas. + """ + + tables = get_tables_with_replication_delay(ch_client) + chart: Dict[str, Dict[str, Any]] = {} + for t in tables: + key = "{database}.{table}".format(database=t["database"], table=t["table"]) + chart[key] = {} + chart[key]["delay"] = int(t["absolute_delay"]) + tables = filter_out_single_replica_tables(ch_client, tables) + for t in tables: + key = "{database}.{table}".format(database=t["database"], table=t["table"]) + chart[key]["multi_replicas"] = True + tables = count_errors(ch_client, tables, -1) + + max_merges = 0 + for t in tables: + key = "{database}.{table}".format(database=t["database"], table=t["table"]) + chart[key]["tasks"] = int(t["tasks"]) + chart[key]["errors"] = int(t["errors"]) + chart[key]["max_execution"] = int(t["max_execution"]) + chart[key]["max_execution_part"] = t["max_execution_part"] + chart[key]["exceptions"] = t["exceptions"] + chart[key]["retried_merges"] = int(t["retried_merges"]) + max_merges = max(int(t["retried_merges"]), max_merges) + for exception in t["exceptions"]: + if is_userfault_exception(exception): + chart[key]["userfault"] = True + break + + lag = 0 + lag_with_errors = 0 + max_execution = 0 + for key, item in chart.items(): + if item.get("multi_replicas", False): + delay = item.get("delay", 0) + if delay > lag: + lag = delay + if ( + delay > lag_with_errors + and item.get("errors", 0) > 0 + and not item.get("userfault", False) + ): + lag_with_errors = delay + execution = item.get("max_execution", 0) + if execution > max_execution: + max_execution = execution + + return lag, lag_with_errors, max_execution, max_merges, chart + + +def get_tables_with_replication_delay(ch_client): + """ + Get tables with absolute_delay > 0. + """ + query = "SELECT database, table, zookeeper_path, absolute_delay FROM system.replicas WHERE absolute_delay > 0" + return ch_client.query(query=query, format_="JSON")["data"] + + +def filter_out_single_replica_tables(ch_client, tables): + if not tables: + return tables + + query = """ + SELECT + database, + table, + zookeeper_path + FROM system.replicas + WHERE (database, table) IN ({tables}) + AND total_replicas > 1 + """.format( + tables=",".join( + "('{0}', '{1}')".format(t["database"], t["table"]) for t in tables + ) + ) + return ch_client.query(query=query, format_="JSON")["data"] + + +def count_errors(ch_client, tables, exceptions_limit): + """ + Add count of replication errors. 
+ """ + if not tables: + return tables + + limit = "" if exceptions_limit < 0 else "({})".format(exceptions_limit) + + query = """ + SELECT + database, + table, + count() as tasks, + countIf(last_exception != '' AND postpone_reason = '') as errors, + max(IF(is_currently_executing, dateDiff('second', last_attempt_time, now()), 0)) as max_execution, + groupUniqArray{limit}(IF(last_exception != '', concat(IF(postpone_reason = '', ' ', ' '), last_exception), '')) as exceptions, + argMax(new_part_name, IF(is_currently_executing, dateDiff('second', last_attempt_time, now()), 0)) as max_execution_part, + countIf(type = 'MERGE_PARTS' and num_tries >= 1000) as retried_merges + FROM system.replication_queue + WHERE (database, table) IN ({tables}) + GROUP BY database,table + """.format( + tables=",".join( + "('{0}', '{1}')".format(t["database"], t["table"]) for t in tables + ), + limit=limit, + ) + return ch_client.query(query=query, format_="JSON")["data"] + + +def is_userfault_exception(exception): + """ + Check if exception was caused by user. + Current list: + * DB::Exception: Cannot reserve 1.00 MiB, not enough space + * DB::Exception: Incorrect data: Sign = -127 (must be 1 or -1) + """ + + if "DB::Exception: Cannot reserve" in exception and "not enough space" in exception: + return True + if ( + "DB::Exception: Incorrect data: Sign" in exception + and "(must be 1 or -1)" in exception + ): + return True + + return False + + +def get_max_replicated_merges_in_queue(ch_client): + """ + Get max_replicated_merges_in_queue value + """ + query = """ + SELECT value FROM system.merge_tree_settings WHERE name='max_replicated_merges_in_queue' + """ + res = ch_client.query(query=query, format_="JSONCompact")["data"] + if not res: + return ( + 16 # 16 is default value for 'max_replicated_merges_in_queue' in ClickHouse + ) + return int(res[0][0]) diff --git a/ch_tools/monrun_checks/ch_replication_lag.py b/ch_tools/monrun_checks/ch_replication_lag.py index 761ef1d8..1cd401e4 100644 --- a/ch_tools/monrun_checks/ch_replication_lag.py +++ b/ch_tools/monrun_checks/ch_replication_lag.py @@ -1,12 +1,6 @@ -import logging -from typing import Any, Dict - import click -from tabulate import tabulate -from ch_tools.common.result import Result -from ch_tools.monrun_checks.clickhouse_client import ClickhouseClient -from ch_tools.monrun_checks.clickhouse_info import ClickhouseInfo +from ch_tools.common.commands.replication_lag import estimate_replication_lag @click.command("replication-lag") @@ -56,271 +50,4 @@ ) @click.pass_context def replication_lag_command(ctx, xcrit, crit, warn, mwarn, mcrit, verbose): - """ - Check for replication lag across replicas. 
- Should be: lag >= lag_with_errors, lag >= max_execution - """ - # pylint: disable=too-many-branches,too-many-locals - ch_client = ClickhouseClient(ctx) - lag, lag_with_errors, max_execution, max_merges, chart = get_replication_lag( - ch_client - ) - - msg_verbose = "" - msg_verbose_2 = "\n\n" - - if verbose >= 1: - verbtab = [] - - headers = [ - "Table", - "Lag [s]", - "Tasks", - "Max task execution [s]", - "Non-retrayable errors", - "Has user fault errors", - "Merges with 1000+ tries", - ] - for key, item in chart.items(): - if item.get("multi_replicas", False): - tabletab = [ - key, - item.get("delay", 0), - item.get("tasks", 0), - item.get("max_execution", 0), - item.get("errors", 0), - item.get("user_fault", False), - item.get("retried_merges", 0), - ] - verbtab.append(tabletab) - if verbose >= 2: - exceptions_retrayable = "" - exceptions_non_retrayable = "" - exceptions_ignored = "" - for exception in item.get("exceptions", []): - if exception: - if is_userfault_exception(exception): - exceptions_ignored += "\t" + exception[5:] + "\n" - elif exception.startswith(" "): - exceptions_retrayable += "\t" + exception[5:] + "\n" - else: - exceptions_non_retrayable += "\t" + exception[5:] + "\n" - max_execution_part = ( - item.get("max_execution_part", "") - if item.get("max_execution", 0) - else 0 - ) - if ( - exceptions_retrayable - or exceptions_non_retrayable - or exceptions_ignored - or max_execution_part - ): - msg_verbose_2 = msg_verbose_2 + key + ":\n" - if exceptions_non_retrayable: - msg_verbose_2 = ( - msg_verbose_2 - + " Non-retrayable errors:\n" - + exceptions_non_retrayable - ) - if exceptions_retrayable: - msg_verbose_2 = ( - msg_verbose_2 - + " Retrayable errors:\n" - + exceptions_retrayable - ) - if exceptions_ignored: - msg_verbose_2 = ( - msg_verbose_2 - + " User fault errors:\n" - + exceptions_ignored - ) - if max_execution_part: - msg_verbose_2 = ( - msg_verbose_2 - + " Result part of task with max execution time: " - + max_execution_part - + "\n" - ) - msg_verbose = tabulate(verbtab, headers=headers) - if verbose >= 2: - msg_verbose = msg_verbose + msg_verbose_2 - - max_merges_warn_threshold = 1 - max_merges_crit_threshold = 1 - if max_merges > 0: - max_replicated_merges_in_queue = get_max_replicated_merges_in_queue(ch_client) - max_merges_warn_threshold = int(max_replicated_merges_in_queue * mwarn / 100.0) - max_merges_crit_threshold = int(max_replicated_merges_in_queue * mcrit / 100.0) - - if lag < warn and max_merges < max_merges_warn_threshold: - return Result(code=0, message="OK", verbose=msg_verbose) - - msg = "Max {0} seconds, with errors {1} seconds, max task execution {2} seconds, max merges in queue {3}".format( - lag, lag_with_errors, max_execution, max_merges - ) - - try: - replica_versions_mismatch = ClickhouseInfo.get_versions_count(ctx) > 1 - if replica_versions_mismatch: - msg += ", ClickHouse versions on replicas mismatch" - return Result(code=1, message=msg, verbose=msg_verbose) - except Exception: - logging.warning("Unable to get version info from replicas", exc_info=True) - msg += ", one or more replicas is unavailable" - return Result(code=1, message=msg, verbose=msg_verbose) - - if ( - lag_with_errors < crit - and max_execution < xcrit - and max_merges < max_merges_crit_threshold - ): - return Result(code=1, message=msg, verbose=msg_verbose) - - return Result(code=2, message=msg, verbose=msg_verbose) - - -def get_replication_lag(ch_client): - """ - Get max absolute_delay from system.replicas. 
- """ - - tables = get_tables_with_replication_delay(ch_client) - chart: Dict[str, Dict[str, Any]] = {} - for t in tables: - key = "{database}.{table}".format(database=t["database"], table=t["table"]) - chart[key] = {} - chart[key]["delay"] = int(t["absolute_delay"]) - tables = filter_out_single_replica_tables(ch_client, tables) - for t in tables: - key = "{database}.{table}".format(database=t["database"], table=t["table"]) - chart[key]["multi_replicas"] = True - tables = count_errors(ch_client, tables, -1) - - max_merges = 0 - for t in tables: - key = "{database}.{table}".format(database=t["database"], table=t["table"]) - chart[key]["tasks"] = int(t["tasks"]) - chart[key]["errors"] = int(t["errors"]) - chart[key]["max_execution"] = int(t["max_execution"]) - chart[key]["max_execution_part"] = t["max_execution_part"] - chart[key]["exceptions"] = t["exceptions"] - chart[key]["retried_merges"] = int(t["retried_merges"]) - max_merges = max(int(t["retried_merges"]), max_merges) - for exception in t["exceptions"]: - if is_userfault_exception(exception): - chart[key]["userfault"] = True - break - - lag = 0 - lag_with_errors = 0 - max_execution = 0 - for key, item in chart.items(): - if item.get("multi_replicas", False): - delay = item.get("delay", 0) - if delay > lag: - lag = delay - if ( - delay > lag_with_errors - and item.get("errors", 0) > 0 - and not item.get("userfault", False) - ): - lag_with_errors = delay - execution = item.get("max_execution", 0) - if execution > max_execution: - max_execution = execution - - return lag, lag_with_errors, max_execution, max_merges, chart - - -def get_tables_with_replication_delay(ch_client): - """ - Get tables with absolute_delay > 0. - """ - query = "SELECT database, table, zookeeper_path, absolute_delay FROM system.replicas WHERE absolute_delay > 0" - return ch_client.execute(query, compact=False) - - -def filter_out_single_replica_tables(ch_client, tables): - if not tables: - return tables - - query = """ - SELECT - database, - table, - zookeeper_path - FROM system.replicas - WHERE (database, table) IN ({tables}) - AND total_replicas > 1 - """.format( - tables=",".join( - "('{0}', '{1}')".format(t["database"], t["table"]) for t in tables - ) - ) - return ch_client.execute(query, False) - - -def count_errors(ch_client, tables, exceptions_limit): - """ - Add count of replication errors. - """ - if not tables: - return tables - - limit = "" if exceptions_limit < 0 else "({})".format(exceptions_limit) - - query = """ - SELECT - database, - table, - count() as tasks, - countIf(last_exception != '' AND postpone_reason = '') as errors, - max(IF(is_currently_executing, dateDiff('second', last_attempt_time, now()), 0)) as max_execution, - groupUniqArray{limit}(IF(last_exception != '', concat(IF(postpone_reason = '', ' ', ' '), last_exception), '')) as exceptions, - argMax(new_part_name, IF(is_currently_executing, dateDiff('second', last_attempt_time, now()), 0)) as max_execution_part, - countIf(type = 'MERGE_PARTS' and num_tries >= 1000) as retried_merges - FROM system.replication_queue - WHERE (database, table) IN ({tables}) - GROUP BY database,table - """.format( - tables=",".join( - "('{0}', '{1}')".format(t["database"], t["table"]) for t in tables - ), - limit=limit, - ) - return ch_client.execute(query, False) - - -def is_userfault_exception(exception): - """ - Check if exception was caused by user. 
- Current list: - * DB::Exception: Cannot reserve 1.00 MiB, not enough space - * DB::Exception: Incorrect data: Sign = -127 (must be 1 or -1) - """ - - if "DB::Exception: Cannot reserve" in exception and "not enough space" in exception: - return True - if ( - "DB::Exception: Incorrect data: Sign" in exception - and "(must be 1 or -1)" in exception - ): - return True - - return False - - -def get_max_replicated_merges_in_queue(ch_client): - """ - Get max_replicated_merges_in_queue value - """ - query = """ - SELECT value FROM system.merge_tree_settings WHERE name='max_replicated_merges_in_queue' - """ - res = ch_client.execute(query, True) - if not res: - return ( - 16 # 16 is default value for 'max_replicated_merges_in_queue' in ClickHouse - ) - return int(res[0][0]) + return estimate_replication_lag(ctx, xcrit, crit, warn, mwarn, mcrit, verbose) diff --git a/pyproject.toml b/pyproject.toml index 0b493b90..21145ad0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -95,7 +95,7 @@ pygments = "*" pyopenssl = "*" python-dateutil = "*" pyyaml = "<5.4" -requests = "*" +requests = "<2.30.0" tabulate = "*" tenacity = "*" termcolor = "*" diff --git a/tests/features/chadmin.feature b/tests/features/chadmin.feature new file mode 100644 index 00000000..abcc739f --- /dev/null +++ b/tests/features/chadmin.feature @@ -0,0 +1,44 @@ +Feature: chadmin commands. + + Background: + Given default configuration + And a working s3 + And a working zookeeper + And a working clickhouse on clickhouse01 + And a working clickhouse on clickhouse02 + Given we have executed queries on clickhouse01 + """ + CREATE DATABASE IF NOT EXISTS test ON CLUSTER 'cluster'; + + CREATE TABLE IF NOT EXISTS test.table_01 ON CLUSTER 'cluster' (n Int32) + ENGINE = ReplicatedMergeTree('/tables/table_01', '{replica}') PARTITION BY n ORDER BY n; + """ + + + Scenario: Check wait replication sync + When we execute command on clickhouse01 + """ + chadmin wait replication-sync -t 10 -p 1 -w 4 + """ + When we execute query on clickhouse01 + """ + SYSTEM STOP FETCHES + """ + And we execute query on clickhouse02 + """ + INSERT INTO test.table_01 SELECT number FROM numbers(100) + """ + And we sleep for 5 seconds + When we try to execute command on clickhouse01 + """ + chadmin wait replication-sync -t 10 -p 1 -w 4 + """ + Then it fails + When we execute query on clickhouse01 + """ + SYSTEM START FETCHES + """ + When we execute command on clickhouse01 + """ + chadmin wait replication-sync -t 10 -p 1 -w 4 + """ diff --git a/tests/features/monrun.feature b/tests/features/monrun.feature index 4006986c..da2e359a 100644 --- a/tests/features/monrun.feature +++ b/tests/features/monrun.feature @@ -139,6 +139,23 @@ Feature: ch-monitoring tool """ 0;OK """ + When we execute query on clickhouse01 + """ + SYSTEM STOP FETCHES + """ + And we execute query on clickhouse02 + """ + INSERT INTO test.table_01 SELECT number FROM numbers(100) + """ + And we sleep for 5 seconds + And we execute command on clickhouse01 + """ + ch-monitoring replication-lag -w 4 + """ + Then we get response contains + """ + 1; + """ Scenario: Check System queues size When we execute command on clickhouse01
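
For reference, a minimal sketch of calling the shared helper directly rather than through the `chadmin wait replication-sync` or `ch-monitoring replication-lag` entry points. This is illustrative only: `ctx` is assumed to be a configured click context as built by those CLIs, the keyword values mirror the option defaults declared in the patch, and `Result` is assumed to expose its `code`/`message`/`verbose` fields as attributes (only `.code` is read in the patch itself).

    from ch_tools.common.commands.replication_lag import estimate_replication_lag

    # One-shot replication-lag check; returns a Result with
    # code 0 = OK, 1 = WARN, 2 = CRIT, a summary message, and a
    # per-table breakdown in `verbose` when verbose >= 1.
    res = estimate_replication_lag(
        ctx, xcrit=3600, crit=600, warn=300, mwarn=50.0, mcrit=90.0, verbose=1
    )
    print(res.code, res.message)
    if res.verbose:
        print(res.verbose)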