From a261a76f57d5b7fdb848c02cf7695e053c07ff53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=B3?= <150132506+iddqdex@users.noreply.github.com> Date: Sat, 16 Nov 2024 11:30:11 +0300 Subject: [PATCH] Refactoring parsing of nodes info response from cluster (#11681) --- ydb/tests/olap/lib/allure_utils.py | 5 +- ydb/tests/olap/lib/ydb_cli.py | 10 ++- ydb/tests/olap/lib/ydb_cluster.py | 106 +++++++++++++++++++---------- 3 files changed, 75 insertions(+), 46 deletions(-) diff --git a/ydb/tests/olap/lib/allure_utils.py b/ydb/tests/olap/lib/allure_utils.py index a0555f5123ca..e9fb587ad6c4 100644 --- a/ydb/tests/olap/lib/allure_utils.py +++ b/ydb/tests/olap/lib/allure_utils.py @@ -52,9 +52,8 @@ def _set_results_plot(test_info: dict[str, str], suite: str, test: str, refferen def _set_logs_command(test_info: dict[str, str], start_time: float, end_time: float): hosts = [] for node in YdbCluster.get_cluster_nodes(): - ss = node.get('SystemState', {}) - if 'Storage' in ss.get('Roles', []): - hosts.append(ss.get('Host')) + if node.role == YdbCluster.Node.Role.STORAGE: + hosts.append(node.host) hosts_cmd = ' '.join([f'-H {h}' for h in hosts]) start = datetime.fromtimestamp(start_time, UTC).isoformat() end = datetime.fromtimestamp(end_time, UTC).isoformat() diff --git a/ydb/tests/olap/lib/ydb_cli.py b/ydb/tests/olap/lib/ydb_cli.py index b47f2f79b727..ef3dfa2c00fa 100644 --- a/ydb/tests/olap/lib/ydb_cli.py +++ b/ydb/tests/olap/lib/ydb_cli.py @@ -7,7 +7,6 @@ from ydb.tests.olap.lib.ydb_cluster import YdbCluster from ydb.tests.olap.lib.utils import get_external_param from enum import StrEnum -from time import time from types import TracebackType @@ -152,13 +151,12 @@ def _load_query_out(self) -> None: self.result.query_out = r.read() @staticmethod - def _get_nodes_info() -> dict[str, dict[str, int]]: - nodes = YdbCluster.get_cluster_nodes(db_only=True) + def _get_nodes_info() -> dict[str, dict[str, Any]]: return { - n['SystemState']['Host']: { - 'start_time': int(int(n['SystemState'].get('StartTime', time() * 1000)) / 1000) + n.host: { + 'start_time': n.start_time } - for n in nodes + for n in YdbCluster.get_cluster_nodes(db_only=True) } def _check_nodes(self): diff --git a/ydb/tests/olap/lib/ydb_cluster.py b/ydb/tests/olap/lib/ydb_cluster.py index 4dd3c48175fd..b939998babd5 100644 --- a/ydb/tests/olap/lib/ydb_cluster.py +++ b/ydb/tests/olap/lib/ydb_cluster.py @@ -8,6 +8,7 @@ from copy import deepcopy from time import sleep, time from typing import List, Optional +from enum import Enum LOGGER = logging.getLogger() @@ -20,6 +21,33 @@ def __init__(self, url: str, caption: str = 'link') -> None: self.url = f'https://{self.url}' self.caption = caption + class Node: + class Role(Enum): + UNKNOWN = 0 + STORAGE = 1 + COMPUTE = 2 + + class Tablet: + def __init__(self, desc: dict): + self.state: str = desc.get('State', 'Red') + self.type: str = desc.get('Type', 'Unknown') + self.count: int = desc.get('Count', 0) + + def __init__(self, desc: dict): + ss = desc.get('SystemState', {}) + self.host: str = ss.get('Host', '') + self.disconnected: bool = desc.get('Disconnected', False) + self.cluster_name: str = ss.get('ClusterName', '') + self.version: str = ss.get('Version', '') + self.start_time: float = 0.001 * int(ss.get('StartTime', time() * 1000)) + if 'Storage' in ss.get('Roles', []): + self.role = YdbCluster.Node.Role.STORAGE + elif 'Tenants' in ss.get('Roles', []): + self.role = YdbCluster.Node.Role.COMPUTE + else: + self.role = YdbCluster.Node.Role.UNKNOWN + self.tablets = [YdbCluster.Node.Tablet(t) for t in desc.get('Tablets', [])] + _ydb_driver = None _results_driver = None _cluster_info = None @@ -53,7 +81,7 @@ def _get_service_url(cls): return f'http://{host}:{port}' @classmethod - def get_cluster_nodes(cls, path: Optional[str] = None, db_only: bool = False) -> list[dict[str:any]]: + def get_cluster_nodes(cls, path: Optional[str] = None, db_only: bool = False) -> list[YdbCluster.Node]: try: url = f'{cls._get_service_url()}/viewer/json/nodes?' if db_only or path is not None: @@ -64,12 +92,17 @@ def get_cluster_nodes(cls, path: Optional[str] = None, db_only: bool = False) -> # token = os.getenv('OLAP_YDB_OAUTH', None) # if token is not None: # headers['Authorization'] = token - data = requests.get(url, headers=headers).json() - nodes = data.get('Nodes', []) - return nodes + response = requests.get(url, headers=headers) + response.raise_for_status() + data = response.json() + if not isinstance(data, dict): + raise Exception(f'Incorrect response type: {data}') + return [YdbCluster.Node(n) for n in data.get('Nodes', [])] + except requests.HTTPError as e: + LOGGER.error(f'{e.strerror}: {e.response.content}') except Exception as e: LOGGER.error(e) - return [], 0 + return [] @classmethod def get_cluster_info(cls): @@ -77,14 +110,14 @@ def get_cluster_info(cls): version = '' cluster_name = '' nodes_wilcard = '' - nodes = cls.get_cluster_nodes() + nodes = cls.get_cluster_nodes(db_only=True) for node in nodes: - n = node.get('SystemState', {}) - cluster_name = n.get('ClusterName', cluster_name) - version = n.get('Version', version) - for tenant in n.get('Tenants', []): - if tenant.endswith(cls.ydb_database): - nodes_wilcard = n.get('Host', nodes_wilcard).split('.')[0].rstrip('0123456789') + if not cluster_name: + cluster_name = node.cluster_name + if not version: + version = node.version + if not nodes_wilcard and node.role == YdbCluster.Node.Role.COMPUTE: + nodes_wilcard = node.host.split('.')[0].rstrip('0123456789') cls._cluster_info = { 'database': cls.ydb_database, 'version': version, @@ -162,6 +195,14 @@ def _get_tables(cls, path): result.append(full_path) return result + @staticmethod + def _join_errors(log_level: int, errors: list[str]): + if len(errors) > 0: + error = ', '.join(errors) + LOGGER.log(log_level, error) + return error + return None + @classmethod @allure.step('Execute scan query') def execute_single_result_query(cls, query, timeout=10): @@ -180,21 +221,14 @@ def execute_single_result_query(cls, query, timeout=10): @classmethod @allure.step('Check if YDB alive') def check_if_ydb_alive(cls, timeout=10, balanced_paths=None) -> tuple[str, str]: - def _check_node(n): - name = 'UnknownNode' - error = None - try: - ss = n.get('SystemState', {}) - name = ss.get("Host") - start_time = int(ss.get('StartTime', int(time()) * 1000)) / 1000 - uptime = int(time()) - start_time - if uptime < 15: - error = f'Node {name} too yong: {uptime}' - except BaseException as ex: - error = f"Error while process node {name}: {ex}" - if error: - LOGGER.error(error) - return error + def _check_node(n: YdbCluster.Node): + errors = [] + if n.disconnected: + errors.append(f'Node {n.host} disconnected') + uptime = time() - n.start_time + if uptime < 15: + errors.append(f'Node {n.host} too yong: {uptime}') + return cls._join_errors(logging.ERROR, errors) errors = [] warnings = [] @@ -216,7 +250,7 @@ def _check_node(n): else: ok_node_count += 1 if ok_node_count < nodes_count: - errors.append(f'Only {ok_node_count} from {ok_node_count} dynnodes are ok: {",".join(node_errors)}') + errors.append(f'Only {ok_node_count} from {nodes_count} dynnodes are ok: {",".join(node_errors)}') paths_to_balance = [] if isinstance(balanced_paths, str): paths_to_balance += cls._get_tables(balanced_paths) @@ -232,11 +266,11 @@ def _check_node(n): min = 0 for tn in table_nodes: tablet_count = 0 - for tablet in tn.get("Tablets", []): - if tablet.get("State") != "Green": - warnings.append(f'Node {tn.get("SystemState", {}).get("Host")}: {tablet.get("Count")} tablets of type {tablet.get("Type")} in {tablet.get("State")} state') - if tablet.get("Type") in {"ColumnShard", "DataShard"}: - tablet_count += tablet.get("Count") + for tablet in tn.tablets: + if tablet.count > 0 and tablet.state != "Green": + warnings.append(f'Node {tn.host}: {tablet.count} tablets of type {tablet.type} in {tablet.state} state') + if tablet.type in {"ColumnShard", "DataShard"}: + tablet_count += tablet.count if tablet_count > 0: if min is None or tablet_count < min: min = tablet_count @@ -251,10 +285,8 @@ def _check_node(n): cls.execute_single_result_query("select 1", timeout) except BaseException as ex: errors.append(f"Cannot connect to YDB: {ex}") - error = ', '.join(errors) if len(errors) > 0 else None - warning = ', '.join(warnings) if len(warnings) > 0 else None - LOGGER.error(f'Errors: {error}, warnings: {warning}') - return error, warning + + return cls._join_errors(logging.ERROR, errors), cls._join_errors(logging.WARNING, warnings) @classmethod @allure.step('Wait YDB alive')