From 4996b7a7d70fc451cbc2c2e5e0def7f3af59ef9b Mon Sep 17 00:00:00 2001 From: Alexander Burmak Date: Fri, 24 Nov 2023 10:50:46 +0300 Subject: [PATCH] Use credentials from config in monitoring checks --- .../clickhouse/client/clickhouse_client.py | 23 ++++++++---- ch_tools/monrun_checks/ch_dist_tables.py | 5 ++- ch_tools/monrun_checks/ch_geobase.py | 5 ++- ch_tools/monrun_checks/ch_ping.py | 5 ++- ch_tools/monrun_checks/ch_replication_lag.py | 37 ++++++++++--------- ch_tools/monrun_checks/ch_resetup_state.py | 12 ++++-- ch_tools/monrun_checks/ch_ro_replica.py | 5 ++- ch_tools/monrun_checks/ch_system_queues.py | 17 +++++---- ch_tools/monrun_checks/ch_tls.py | 11 ++++-- ch_tools/monrun_checks/clickhouse_client.py | 13 +++++-- ch_tools/monrun_checks/clickhouse_info.py | 16 ++++---- ch_tools/monrun_checks/status.py | 2 +- 12 files changed, 90 insertions(+), 61 deletions(-) diff --git a/ch_tools/common/clickhouse/client/clickhouse_client.py b/ch_tools/common/clickhouse/client/clickhouse_client.py index cd034af9..07b67df1 100644 --- a/ch_tools/common/clickhouse/client/clickhouse_client.py +++ b/ch_tools/common/clickhouse/client/clickhouse_client.py @@ -136,13 +136,7 @@ def clickhouse_client(ctx): """ if not ctx.obj.get("chcli"): config = ctx.obj["config"]["clickhouse"] - - user = config["user"] - password = config["password"] - if ctx.obj.get("monitoring", False) and config["monitoring_user"]: - user = config["monitoring_user"] - password = config["monitoring_password"] - + user, password = clickhouse_credentials(ctx) ctx.obj["chcli"] = ClickhouseClient( host=config["host"], protocol=config["protocol"], @@ -155,3 +149,18 @@ def clickhouse_client(ctx): ) return ctx.obj["chcli"] + + +def clickhouse_credentials(ctx): + """ + Return credentials to connect to ClickHouse. + """ + config = ctx.obj["config"]["clickhouse"] + + user = config["user"] + password = config["password"] + if ctx.obj.get("monitoring", False) and config["monitoring_user"]: + user = config["monitoring_user"] + password = config["monitoring_password"] + + return user, password diff --git a/ch_tools/monrun_checks/ch_dist_tables.py b/ch_tools/monrun_checks/ch_dist_tables.py index a1589c57..ee3538dd 100644 --- a/ch_tools/monrun_checks/ch_dist_tables.py +++ b/ch_tools/monrun_checks/ch_dist_tables.py @@ -15,11 +15,12 @@ @click.option( "-w", "--warning", "warn", type=int, default=600, help="Warning threshold." ) -def dist_tables_command(crit, warn): +@click.pass_context +def dist_tables_command(ctx, crit, warn): """ Check for old chunks on Distributed tables. """ - ch_client = ClickhouseClient() + ch_client = ClickhouseClient(ctx) status = 0 issues = [] diff --git a/ch_tools/monrun_checks/ch_geobase.py b/ch_tools/monrun_checks/ch_geobase.py index 34489326..7b9d0930 100644 --- a/ch_tools/monrun_checks/ch_geobase.py +++ b/ch_tools/monrun_checks/ch_geobase.py @@ -8,11 +8,12 @@ @click.command("geobase") -def geobase_command(): +@click.pass_context +def geobase_command(ctx): """ Check that embedded geobase is configured. """ - ch_client = ClickhouseClient() + ch_client = ClickhouseClient(ctx) try: response = ch_client.execute("SELECT regionToName(CAST(1 AS UInt32))")[0][0] diff --git a/ch_tools/monrun_checks/ch_ping.py b/ch_tools/monrun_checks/ch_ping.py index da532f2b..1049b90e 100644 --- a/ch_tools/monrun_checks/ch_ping.py +++ b/ch_tools/monrun_checks/ch_ping.py @@ -15,13 +15,14 @@ "-c", "--critical", "crit", type=int, default=5, help="Critical threshold." ) @click.option("-w", "--warning", "warn", type=int, default=2, help="Warning threshold.") -def ping_command(number, crit, warn): +@click.pass_context +def ping_command(ctx, number, crit, warn): """ Ping all available ClickHouse ports. """ # pylint: disable=too-many-branches - ch_client = ClickhouseClient() + ch_client = ClickhouseClient(ctx) fails = { ClickhousePort.TCP: 0, diff --git a/ch_tools/monrun_checks/ch_replication_lag.py b/ch_tools/monrun_checks/ch_replication_lag.py index 17f623e0..761ef1d8 100644 --- a/ch_tools/monrun_checks/ch_replication_lag.py +++ b/ch_tools/monrun_checks/ch_replication_lag.py @@ -54,14 +54,17 @@ default=0, help="Show details about lag.", ) -def replication_lag_command(xcrit, crit, warn, mwarn, mcrit, verbose): +@click.pass_context +def replication_lag_command(ctx, xcrit, crit, warn, mwarn, mcrit, verbose): """ Check for replication lag across replicas. Should be: lag >= lag_with_errors, lag >= max_execution """ # pylint: disable=too-many-branches,too-many-locals - - lag, lag_with_errors, max_execution, max_merges, chart = get_replication_lag() + ch_client = ClickhouseClient(ctx) + lag, lag_with_errors, max_execution, max_merges, chart = get_replication_lag( + ch_client + ) msg_verbose = "" msg_verbose_2 = "\n\n" @@ -146,7 +149,7 @@ def replication_lag_command(xcrit, crit, warn, mwarn, mcrit, verbose): max_merges_warn_threshold = 1 max_merges_crit_threshold = 1 if max_merges > 0: - max_replicated_merges_in_queue = get_max_replicated_merges_in_queue() + max_replicated_merges_in_queue = get_max_replicated_merges_in_queue(ch_client) max_merges_warn_threshold = int(max_replicated_merges_in_queue * mwarn / 100.0) max_merges_crit_threshold = int(max_replicated_merges_in_queue * mcrit / 100.0) @@ -158,7 +161,7 @@ def replication_lag_command(xcrit, crit, warn, mwarn, mcrit, verbose): ) try: - replica_versions_mismatch = ClickhouseInfo.get_versions_count() > 1 + replica_versions_mismatch = ClickhouseInfo.get_versions_count(ctx) > 1 if replica_versions_mismatch: msg += ", ClickHouse versions on replicas mismatch" return Result(code=1, message=msg, verbose=msg_verbose) @@ -177,22 +180,22 @@ def replication_lag_command(xcrit, crit, warn, mwarn, mcrit, verbose): return Result(code=2, message=msg, verbose=msg_verbose) -def get_replication_lag(): +def get_replication_lag(ch_client): """ Get max absolute_delay from system.replicas. """ - tables = get_tables_with_replication_delay() + tables = get_tables_with_replication_delay(ch_client) chart: Dict[str, Dict[str, Any]] = {} for t in tables: key = "{database}.{table}".format(database=t["database"], table=t["table"]) chart[key] = {} chart[key]["delay"] = int(t["absolute_delay"]) - tables = filter_out_single_replica_tables(tables) + tables = filter_out_single_replica_tables(ch_client, tables) for t in tables: key = "{database}.{table}".format(database=t["database"], table=t["table"]) chart[key]["multi_replicas"] = True - tables = count_errors(tables, -1) + tables = count_errors(ch_client, tables, -1) max_merges = 0 for t in tables: @@ -230,15 +233,15 @@ def get_replication_lag(): return lag, lag_with_errors, max_execution, max_merges, chart -def get_tables_with_replication_delay(): +def get_tables_with_replication_delay(ch_client): """ Get tables with absolute_delay > 0. """ query = "SELECT database, table, zookeeper_path, absolute_delay FROM system.replicas WHERE absolute_delay > 0" - return ClickhouseClient().execute(query, compact=False) + return ch_client.execute(query, compact=False) -def filter_out_single_replica_tables(tables): +def filter_out_single_replica_tables(ch_client, tables): if not tables: return tables @@ -255,10 +258,10 @@ def filter_out_single_replica_tables(tables): "('{0}', '{1}')".format(t["database"], t["table"]) for t in tables ) ) - return ClickhouseClient().execute(query, False) + return ch_client.execute(query, False) -def count_errors(tables, exceptions_limit): +def count_errors(ch_client, tables, exceptions_limit): """ Add count of replication errors. """ @@ -286,7 +289,7 @@ def count_errors(tables, exceptions_limit): ), limit=limit, ) - return ClickhouseClient().execute(query, False) + return ch_client.execute(query, False) def is_userfault_exception(exception): @@ -308,14 +311,14 @@ def is_userfault_exception(exception): return False -def get_max_replicated_merges_in_queue(): +def get_max_replicated_merges_in_queue(ch_client): """ Get max_replicated_merges_in_queue value """ query = """ SELECT value FROM system.merge_tree_settings WHERE name='max_replicated_merges_in_queue' """ - res = ClickhouseClient().execute(query, True) + res = ch_client.execute(query, True) if not res: return ( 16 # 16 is default value for 'max_replicated_merges_in_queue' in ClickHouse diff --git a/ch_tools/monrun_checks/ch_resetup_state.py b/ch_tools/monrun_checks/ch_resetup_state.py index b8e88126..19a2359a 100644 --- a/ch_tools/monrun_checks/ch_resetup_state.py +++ b/ch_tools/monrun_checks/ch_resetup_state.py @@ -7,6 +7,7 @@ import psutil import requests +from ch_tools.common.clickhouse.client.clickhouse_client import clickhouse_credentials from ch_tools.common.clickhouse.config.path import CLICKHOUSE_RESETUP_CONFIG_PATH from ch_tools.common.result import Result from ch_tools.monrun_checks.exceptions import die @@ -16,7 +17,8 @@ @click.option("-p", "--port", "port", type=int, help="ClickHouse HTTP(S) port to use.") @click.option("-s", "--ssl", "ssl", is_flag=True, help="Use HTTPS rather than HTTP.") @click.option("--ca_bundle", "ca_bundle", help="Path to CA bundle to use.") -def resetup_state_command(port, ssl, ca_bundle): +@click.pass_context +def resetup_state_command(ctx, port, ssl, ca_bundle): """ Check state of resetup process. """ @@ -26,7 +28,7 @@ def resetup_state_command(port, ssl, ca_bundle): check_resetup_required() host = socket.getfqdn() - if request(host, port, ssl, ca_bundle): + if request(ctx, host, port, ssl, ca_bundle): return Result(2, "ClickHouse is listening on ports reserved for resetup") if os.path.isfile(CLICKHOUSE_RESETUP_CONFIG_PATH): @@ -70,7 +72,7 @@ def check_resetup_required(): die(0, "OK") -def request(host, port, ssl, ca_bundle, query=None): +def request(ctx, host, port, ssl, ca_bundle, query=None): """ Send request to ClickHouse. """ @@ -81,11 +83,13 @@ def request(host, port, ssl, ca_bundle, query=None): if query: params["query"] = query + user, password = clickhouse_credentials(ctx) r = requests.get( "{0}://{1}:{2}".format(protocol, host, port), params=params, headers={ - "X-ClickHouse-User": "mdb_monitor", + "X-ClickHouse-User": user, + "X-ClickHouse-Key": password, }, timeout=1, verify=verify, diff --git a/ch_tools/monrun_checks/ch_ro_replica.py b/ch_tools/monrun_checks/ch_ro_replica.py index 2f93b5f7..9bc636e8 100644 --- a/ch_tools/monrun_checks/ch_ro_replica.py +++ b/ch_tools/monrun_checks/ch_ro_replica.py @@ -5,11 +5,12 @@ @click.command("ro-replica") -def ro_replica_command(): +@click.pass_context +def ro_replica_command(ctx): """ Check for readonly replicated tables. """ - ch_client = ClickhouseClient() + ch_client = ClickhouseClient(ctx) response = ch_client.execute( "SELECT database, table FROM system.replicas WHERE is_readonly" diff --git a/ch_tools/monrun_checks/ch_system_queues.py b/ch_tools/monrun_checks/ch_system_queues.py index 4d95b3a1..e4ea9773 100644 --- a/ch_tools/monrun_checks/ch_system_queues.py +++ b/ch_tools/monrun_checks/ch_system_queues.py @@ -16,7 +16,8 @@ @click.option( "-f", "--config_file", "conf", help="Config file with theshholds per each table." ) -def system_queues_command(crit, warn, conf): +@click.pass_context +def system_queues_command(ctx, crit, warn, conf): """ Check system queues. """ @@ -25,8 +26,8 @@ def system_queues_command(crit, warn, conf): else: config = {"triggers": {"default": {"crit": crit, "warn": warn}}} - metrics = get_metrics() - return check_metrics(metrics, config) + metrics = get_metrics(ctx) + return check_metrics(ctx, metrics, config) def get_config(conf): @@ -37,7 +38,7 @@ def get_config(conf): return yaml.safe_load(f) -def get_metrics(): +def get_metrics(ctx): """ Select and return metrics form system.replicas. """ @@ -45,10 +46,10 @@ def get_metrics(): "SELECT database, table, future_parts, parts_to_check, queue_size," " inserts_in_queue, merges_in_queue FROM system.replicas" ) - return ClickhouseClient().execute(query, compact=False) + return ClickhouseClient(ctx).execute(query, compact=False) -def check_metrics(metrics, config): +def check_metrics(ctx, metrics, config): """ Check that metrics are within acceptable levels. """ @@ -79,7 +80,7 @@ def check_metrics(metrics, config): table_status = status_map[prior] if table_status > 1: if versions_count == 0: - versions_count = ClickhouseInfo.get_versions_count() + versions_count = ClickhouseInfo.get_versions_count(ctx) if versions_count > 1: table_status = 1 prior = "crit->warn" @@ -94,7 +95,7 @@ def check_metrics(metrics, config): status = triggers[0][0] message = " ".join(x[1] for x in triggers) if versions_count == 0: - versions_count = ClickhouseInfo.get_versions_count() + versions_count = ClickhouseInfo.get_versions_count(ctx) if versions_count > 1: message += " ClickHouse versions on replicas mismatch" diff --git a/ch_tools/monrun_checks/ch_tls.py b/ch_tools/monrun_checks/ch_tls.py index 345623ec..9214a8fe 100644 --- a/ch_tools/monrun_checks/ch_tls.py +++ b/ch_tools/monrun_checks/ch_tls.py @@ -28,13 +28,16 @@ default=None, help="Comma separated list of ports. By default read from ClickHouse config", ) -def tls_command(crit: int, warn: int, ports: Optional[str]) -> Result: +@click.pass_context +def tls_command( + ctx: click.Context, crit: int, warn: int, ports: Optional[str] +) -> Result: """ Check TLS certificate for expiration and that actual cert from fs used. """ file_certificate, _ = read_cert_file() - for port in get_ports(ports): + for port in get_ports(ctx, ports): try: addr: Tuple[str, int] = (socket.getfqdn(), int(port)) cert: str = ssl.get_server_certificate(addr) @@ -54,10 +57,10 @@ def tls_command(crit: int, warn: int, ports: Optional[str]) -> Result: return Result(0, "OK") -def get_ports(ports: Optional[str]) -> List[str]: +def get_ports(ctx: click.Context, ports: Optional[str]) -> List[str]: if ports: return ports.split(",") - client = ClickhouseClient() + client = ClickhouseClient(ctx) return [ client.get_port(ClickhousePort.HTTPS), client.get_port(ClickhousePort.TCP_SECURE), diff --git a/ch_tools/monrun_checks/clickhouse_client.py b/ch_tools/monrun_checks/clickhouse_client.py index 956b67e1..6d7f9d67 100644 --- a/ch_tools/monrun_checks/clickhouse_client.py +++ b/ch_tools/monrun_checks/clickhouse_client.py @@ -7,6 +7,7 @@ import requests +from ch_tools.common.clickhouse.client.clickhouse_client import clickhouse_credentials from ch_tools.monrun_checks.exceptions import die @@ -43,9 +44,10 @@ class ClickhouseClient: port_settings: Dict[str, str] = {} cert_path = "/etc/clickhouse-server/ssl/allCAs.pem" - def __init__(self): + def __init__(self, ctx): self.__get_settings() self.host = socket.getfqdn() + self.user, self.password = clickhouse_credentials(ctx) def __execute_http(self, query, port=ClickhousePort.HTTPS): # Private method, we are sure that port is https or http and presents in config @@ -58,7 +60,8 @@ def __execute_http(self, query, port=ClickhousePort.HTTPS): if query else {}, headers={ - "X-ClickHouse-User": "_monitor", + "X-ClickHouse-User": self.user, + "X-ClickHouse-Key": self.password, }, timeout=10, verify=self.cert_path if port == ClickhousePort.HTTPS else None, @@ -76,9 +79,11 @@ def __execute_tcp(self, query, port=ClickhousePort.TCP_SECURE): self.host, "--port", self.port_settings[port], - "--user", - "_monitor", ] + if self.user: + cmd.extend(("--user", self.user)) + if self.password: + cmd.extend(("--password", self.password)) if port == ClickhousePort.TCP_SECURE: cmd.append("--secure") diff --git a/ch_tools/monrun_checks/clickhouse_info.py b/ch_tools/monrun_checks/clickhouse_info.py index 29b9140e..3f71060b 100644 --- a/ch_tools/monrun_checks/clickhouse_info.py +++ b/ch_tools/monrun_checks/clickhouse_info.py @@ -6,28 +6,28 @@ class ClickhouseInfo: @classmethod @functools.lru_cache(maxsize=1) - def get_versions_count(cls): + def get_versions_count(cls, ctx): """ Count different clickhouse versions in cluster. """ - ch_client = ClickhouseClient() + ch_client = ClickhouseClient(ctx) # I believe that all hosts in cluster have the same port set, so check current for security port remote_command = ( "remoteSecure" if ch_client.check_port(ClickhousePort.TCP_SECURE) else "remote" ) - replicas = ",".join(cls.get_replicas()) + replicas = ",".join(cls.get_replicas(ctx)) query = f"""SELECT uniq(version()) FROM {remote_command}('{replicas}', system.one)""" return int(ch_client.execute(query)[0][0]) @classmethod @functools.lru_cache(maxsize=1) - def get_replicas(cls): + def get_replicas(cls, ctx): """ Get hostnames of replicas. """ - cluster = cls.get_cluster() + cluster = cls.get_cluster(ctx) query = f""" SELECT host_name FROM system.clusters @@ -35,13 +35,13 @@ def get_replicas(cls): AND shard_num = (SELECT shard_num FROM system.clusters WHERE host_name = hostName() AND cluster = '{cluster}') """ - return [row[0] for row in ClickhouseClient().execute(query)] + return [row[0] for row in ClickhouseClient(ctx).execute(query)] @classmethod @functools.lru_cache(maxsize=1) - def get_cluster(cls): + def get_cluster(cls, ctx): """ Get cluster identifier. """ query = "SELECT substitution FROM system.macros WHERE macro = 'cluster'" - return ClickhouseClient().execute(query)[0][0] + return ClickhouseClient(ctx).execute(query)[0][0] diff --git a/ch_tools/monrun_checks/status.py b/ch_tools/monrun_checks/status.py index 81337eaf..18fba3d4 100644 --- a/ch_tools/monrun_checks/status.py +++ b/ch_tools/monrun_checks/status.py @@ -18,7 +18,7 @@ def status_impl(ctx): Perform all checks. """ checks_status = [] - ctx.obj = {"status_mode": True} + ctx.obj["status_mode"] = True for cmd in commands: status = ctx.invoke(cmd) checks_status.append(