From 1c09c5d05253f0f8dae47b7264b7f39c11e13bc4 Mon Sep 17 00:00:00 2001
From: Semyon
Date: Tue, 27 Aug 2024 14:57:32 +0300
Subject: [PATCH] Add concurrent tiering updates to OLAP scenario test (#8262)

---
 .../olap/scenario/helpers/data_generators.py  |  26 +++
 .../scenario/helpers/scenario_tests_helper.py |  70 +++++--
 ydb/tests/olap/scenario/test_alter_tiering.py | 178 ++++++++++++++----
 ydb/tests/olap/scenario/ya.make               |   1 +
 4 files changed, 225 insertions(+), 50 deletions(-)

diff --git a/ydb/tests/olap/scenario/helpers/data_generators.py b/ydb/tests/olap/scenario/helpers/data_generators.py
index 286f4d8347a0..dcb34b92089b 100644
--- a/ydb/tests/olap/scenario/helpers/data_generators.py
+++ b/ydb/tests/olap/scenario/helpers/data_generators.py
@@ -68,6 +68,32 @@ def generate_value(self, column: ScenarioTestHelper.Column) -> Any:
         return self._value
 
 
+class ColumnValueGeneratorLambda(IColumnValueGenerator):
+    """Arbitrary value generator.
+
+    Uses arbitrary function to generate values."""
+
+    def __init__(self, func) -> None:
+        """Constructor.
+
+        Args:
+            func: Function used to generate values.
+        Example:
+            DataGeneratorPerColumn(
+                self.schema2, 10,
+                ColumnValueGeneratorDefault(init_value=10))
+                .with_column('not_level', ColumnValueGeneratorLambda(lambda: time.now())
+            )
+        """
+
+        super().__init__()
+        self._func = func
+
+    @override
+    def generate_value(self, column: ScenarioTestHelper.Column) -> Any:
+        return self._func()
+
+
 class ColumnValueGeneratorRandom(IColumnValueGenerator):
     """Random column value generator.
 
diff --git a/ydb/tests/olap/scenario/helpers/scenario_tests_helper.py b/ydb/tests/olap/scenario/helpers/scenario_tests_helper.py
index 43aebe02cb9c..b6f42b3edacb 100644
--- a/ydb/tests/olap/scenario/helpers/scenario_tests_helper.py
+++ b/ydb/tests/olap/scenario/helpers/scenario_tests_helper.py
@@ -8,6 +8,7 @@
 from ydb.tests.olap.lib.ydb_cluster import YdbCluster
 from abc import abstractmethod, ABC
 from typing import Set, List, Dict, Any, Callable
+from time import sleep
 
 
 class TestContext:
@@ -59,6 +60,13 @@ class ScenarioTestHelper:
         sth.execute_scheme_query(DropTable(table_name))
     """
 
+    DEFAULT_RETRIABLE_ERRORS = {
+        ydb.StatusCode.OVERLOADED,
+        ydb.StatusCode.BAD_SESSION,
+        ydb.StatusCode.CONNECTION_LOST,
+        ydb.StatusCode.UNAVAILABLE,
+    }
+
     class Column:
         """A class that describes a table column."""
 
@@ -247,21 +255,37 @@ def _add_not_empty(p: str, dir: str):
         return result
 
     @staticmethod
-    def _run_with_expected_status(operation: callable, expected_status: ydb.StatusCode | Set[ydb.StatusCode]):
+    def _run_with_expected_status(
+        operation: callable,
+        expected_status: ydb.StatusCode | Set[ydb.StatusCode],
+        retriable_status: ydb.StatusCode | Set[ydb.StatusCode] = {},
+        n_retries=0,
+    ):
         if isinstance(expected_status, ydb.StatusCode):
             expected_status = {expected_status}
-        try:
-            result = operation()
-            if ydb.StatusCode.SUCCESS not in expected_status:
-                pytest.fail(
-                    f'Unexpected status: must be in {repr(expected_status)}, but get {repr(ydb.StatusCode.SUCCESS)}'
-                )
-            return result
-        except ydb.issues.Error as e:
-            allure.attach(f'{repr(e.status)}: {e}', 'request status', allure.attachment_type.TEXT)
-            if e.status not in expected_status:
-                pytest.fail(f'Unexpected status: must be in {repr(expected_status)}, but get {repr(e)}')
-            return None
+        if isinstance(retriable_status, ydb.StatusCode):
+            retriable_status = {retriable_status}
+
+        result = None
+        error = None
+        status = None
+        for _ in range(n_retries + 1):
+            try:
+                result = operation()
+                error = None
+                status = ydb.StatusCode.SUCCESS
+            except ydb.issues.Error as e:
+                result = None
+                error = e
+                status = error.status
+                allure.attach(f'{repr(status)}: {error}', 'request status', allure.attachment_type.TEXT)
+
+            if status in expected_status:
+                return result
+            if status not in retriable_status:
+                pytest.fail(f'Unexpected status: must be in {repr(expected_status)}, but get {repr(error or status)}')
+            sleep(3)
+        pytest.fail(f'Retries exceeded with unexpected status: must be in {repr(expected_status)}, but get {repr(error or status)}')
 
     def _bulk_upsert_impl(
         self, tablename: str, data_generator: ScenarioTestHelper.IDataGenerator, expected_status: ydb.StatusCode | Set[ydb.StatusCode]
@@ -302,6 +326,8 @@ def execute_scheme_query(
         self,
         yqlble: ScenarioTestHelper.IYqlble,
         expected_status: ydb.StatusCode | Set[ydb.StatusCode] = ydb.StatusCode.SUCCESS,
+        retries=0,
+        retriable_status: ydb.StatusCode | Set[ydb.StatusCode] = DEFAULT_RETRIABLE_ERRORS,
         comment: str = '',
     ) -> None:
         """Run a schema query on the database under test.
@@ -330,7 +356,7 @@ def execute_scheme_query(
         yql = yqlble.to_yql(self.test_context)
         allure.attach(yql, 'request', allure.attachment_type.TEXT)
         self._run_with_expected_status(
-            lambda: YdbCluster.get_ydb_driver().table_client.session().create().execute_scheme(yql), expected_status
+            lambda: YdbCluster.get_ydb_driver().table_client.session().create().execute_scheme(yql), expected_status, retriable_status, retries
         )
 
     @classmethod
@@ -524,6 +550,22 @@ def get_table_rows_count(self, tablename: str, comment: str = '') -> int:
         result_set = self.execute_scan_query(f'SELECT count(*) FROM `{self.get_full_path(tablename)}`')
         return result_set.result_set.rows[0][0]
 
+    @allure.step('Describe table {path}')
+    def describe_table(self, path: str, settings: ydb.DescribeTableSettings = None) -> ydb.TableSchemeEntry:
+        """Get table description.
+
+        Args:
+            path: Relative path to a table.
+            settings: DescribeTableSettings.
+
+        Returns:
+            TableSchemeEntry object.
+        """
+
+        return self._run_with_expected_status(
+            lambda: YdbCluster.get_ydb_driver().table_client.session().create().describe_table(self.get_full_path(path), settings), ydb.StatusCode.SUCCESS
+        )
+
     @allure.step('List path {path}')
     def list_path(self, path: str) -> List[ydb.SchemeEntry]:
         """Recursively describe the path in the database under test.
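
The retry behaviour added above is driven entirely by the new execute_scheme_query parameters. As a usage illustration only (not part of the patch; the ScenarioTestHelper import path is an assumption based on the helpers package layout, and the retry count of 10 mirrors the calls made in test_alter_tiering.py below), a scheme query with transient-error retries might look like this:

    from helpers.scenario_tests_helper import ScenarioTestHelper
    from helpers.table_helper import AlterTable

    def set_tiering_with_retries(ctx, table: str, tiering_rule: str) -> None:
        sth = ScenarioTestHelper(ctx)
        # Statuses in DEFAULT_RETRIABLE_ERRORS (OVERLOADED, BAD_SESSION, CONNECTION_LOST,
        # UNAVAILABLE) are retried up to 10 times with a 3-second pause between attempts;
        # any other non-success status fails the test immediately.
        sth.execute_scheme_query(AlterTable(table).set_tiering(tiering_rule), retries=10)
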
diff --git a/ydb/tests/olap/scenario/test_alter_tiering.py b/ydb/tests/olap/scenario/test_alter_tiering.py
index cae96e9da48d..d328cf4c82cc 100644
--- a/ydb/tests/olap/scenario/test_alter_tiering.py
+++ b/ydb/tests/olap/scenario/test_alter_tiering.py
@@ -5,6 +5,7 @@
     CreateTable,
     CreateTableStore,
     DropTable,
+    DropTableStore,
 )
 from helpers.tiering_helper import (
     ObjectStorageParams,
@@ -19,15 +20,20 @@
     DropTieringRule,
 )
 import helpers.data_generators as dg
-from helpers.table_helper import AlterTable
+from helpers.table_helper import (
+    AlterTable,
+    AlterTableStore
+)
 
-from ydb.tests.olap.lib.utils import get_external_param
-from ydb import PrimitiveType
+from ydb.tests.olap.lib.utils import get_external_param, external_param_is_true
+from ydb import PrimitiveType, StatusCode
+import boto3
 import datetime
 import random
 import threading
-from typing import Iterable
-import time
+from typing import Iterable, Optional
+import itertools
+from string import ascii_lowercase
 
 
 class TestAlterTiering(BaseTestSet):
@@ -37,6 +43,7 @@ class TestAlterTiering(BaseTestSet):
         .with_column(name='writer', type=PrimitiveType.Uint32, not_null=True)
         .with_column(name='value', type=PrimitiveType.Uint64, not_null=True)
         .with_column(name='data', type=PrimitiveType.String, not_null=True)
+        .with_column(name='timestamp2', type=PrimitiveType.Timestamp, not_null=True)
         .with_key_columns('timestamp', 'writer', 'value')
     )
 
@@ -59,8 +66,9 @@ def _drop_tables(self, prefix: str, count: int, ctx: TestContext):
         for i in range(count):
             sth.execute_scheme_query(DropTable(f'store/{prefix}_{i}'))
 
-    def _upsert(self, ctx: TestContext, table: str, writer_id: int, duration: datetime.timedelta):
+    def _loop_upsert(self, ctx: TestContext, table: str, writer_id: int, duration: datetime.timedelta, allow_scan_errors: bool = False):
         deadline = datetime.datetime.now() + duration
+        expected_scan_status = {StatusCode.SUCCESS, StatusCode.GENERIC_ERROR} if allow_scan_errors else {StatusCode.SUCCESS}
         sth = ScenarioTestHelper(ctx)
         rows_written = 0
         i = 0
@@ -68,33 +76,91 @@ def _upsert(self, ctx: TestContext, table: str, writer_id: int, duration: dateti
             sth.bulk_upsert(
                 table,
                 dg.DataGeneratorPerColumn(self.schema1, 1000)
-                .with_column('timestamp', dg.ColumnValueGeneratorRandom(null_probability=0))
+                .with_column('timestamp', dg.ColumnValueGeneratorLambda(lambda: int(datetime.datetime.now().timestamp() * 1000000)))
                 .with_column('writer', dg.ColumnValueGeneratorConst(writer_id))
                 .with_column('value', dg.ColumnValueGeneratorSequential(rows_written))
                 .with_column('data', dg.ColumnValueGeneratorConst(random.randbytes(1024)))
+                .with_column('timestamp2', dg.ColumnValueGeneratorRandom(null_probability=0))
             )
             rows_written += 1000
             i += 1
 
-            if rows_written > 100000 and i % 10 == 0:
-                scan_result = sth.execute_scan_query(f'SELECT COUNT(*) FROM `{sth.get_full_path('store/table')}` WHERE writer == {writer_id}')
-                assert scan_result.result_set.rows[0][0] == rows_written
+        scan_result = sth.execute_scan_query(f'SELECT COUNT(*) FROM `{sth.get_full_path(table)}` WHERE writer == {writer_id}', expected_status=expected_scan_status)
+        assert scan_result.result_set.rows[0][0] == rows_written
 
-    def _change_tiering_rule(self, ctx: TestContext, table: str, tiering_rules: Iterable[str], duration: datetime.timedelta):
+    def _loop_change_tiering_rule(self, ctx: TestContext, table: str, tiering_rules: Iterable[str], duration: datetime.timedelta):
         deadline = datetime.datetime.now() + duration
         sth = ScenarioTestHelper(ctx)
         while datetime.datetime.now() < deadline:
             for tiering_rule in tiering_rules:
-                sth.execute_scheme_query(AlterTable(table).set_tiering(tiering_rule))
-            sth.execute_scheme_query(AlterTable(table).reset_tiering())
+                if tiering_rule is not None:
+                    sth.execute_scheme_query(AlterTable(table).set_tiering(tiering_rule), retries=10)
+                else:
+                    sth.execute_scheme_query(AlterTable(table).reset_tiering(), retries=10)
+            sth.execute_scheme_query(AlterTable(table).reset_tiering(), retries=10)
+
+    def _loop_alter_tiering_rule(self, ctx: TestContext, tiering_rule: str, default_column_values: Iterable[str], config_values: Iterable[TieringPolicy], duration: datetime.timedelta):
+        deadline = datetime.datetime.now() + duration
+        sth = ScenarioTestHelper(ctx)
+        for default_column, config in zip(itertools.cycle(default_column_values), itertools.cycle(config_values)):
+            if datetime.datetime.now() >= deadline:
+                break
+            sth.execute_scheme_query(AlterTieringRule(tiering_rule, default_column, config), retries=10)
+
+    def _loop_alter_column(self, ctx: TestContext, store: str, duration: datetime.timedelta):
+        column_name = 'tmp_column_' + ''.join(random.choice(ascii_lowercase) for _ in range(8))
+        data_types = [PrimitiveType.Int8, PrimitiveType.Uint64, PrimitiveType.Datetime, PrimitiveType.Utf8]
+
+        deadline = datetime.datetime.now() + duration
+        sth = ScenarioTestHelper(ctx)
+        while datetime.datetime.now() < deadline:
+            sth.execute_scheme_query(AlterTableStore(store).add_column(sth.Column(column_name, random.choice(data_types))), retries=10)
+            sth.execute_scheme_query(AlterTableStore(store).drop_column(column_name), retries=10)
+
+    def _override_tier(self, sth, name, config):
+        sth.execute_scheme_query(CreateTierIfNotExists(name, config))
+        sth.execute_scheme_query(AlterTier(name, config))
+
+    def _override_tiering_rule(self, sth, name, default_column, config):
+        sth.execute_scheme_query(CreateTieringRuleIfNotExists(name, default_column, config))
+        sth.execute_scheme_query(AlterTieringRule(name, default_column, config))
 
-    def scenario_alter_tiering_rule_while_writing(self, ctx: TestContext):
-        test_duration = datetime.timedelta(seconds=400)
+    def _make_s3_client(self, access_key, secret_key, endpoint):
+        session = boto3.Session(
+            aws_access_key_id=(access_key),
+            aws_secret_access_key=(secret_key),
+            region_name='ru-central1',
+        )
+        return session.client('s3', endpoint_url=endpoint)
 
-        s3_endpoint = get_external_param('s3-endpoint', 'storage.yandexcloud.net')
+    def _count_objects(self, bucket_config: ObjectStorageParams):
+        s3 = self._make_s3_client(bucket_config.access_key, bucket_config.secret_key, bucket_config.endpoint)
+        paginator = s3.get_paginator('list_objects_v2')
+        page_iterator = paginator.paginate(Bucket=bucket_config.bucket)
+
+        object_count = 0
+        for page in page_iterator:
+            if 'Contents' in page:
+                object_count += len(page['Contents'])
+
+        return object_count
+
+    def scenario_many_tables(self, ctx: TestContext):
+        random.seed(42)
+        n_tables = 4
+
+        test_duration = datetime.timedelta(seconds=int(get_external_param('test-duration-seconds', '4600')))
+        n_tables = int(get_external_param('tables', '4'))
+        n_writers = int(get_external_param('writers-per-table', '4'))
+        allow_s3_unavailability = external_param_is_true('allow-s3-unavailability')
+        is_standalone_tables = external_param_is_true('test-standalone-tables')
+
+        s3_endpoint = get_external_param('s3-endpoint', 'http://storage.yandexcloud.net')
         s3_access_key = get_external_param('s3-access-key', 'YCAJEM3Pg9fMyuX9ZUOJ_fake')
         s3_secret_key = get_external_param('s3-secret-key', 'YCM7Ovup55wDkymyEtO8pw5F10_L5jtVY8w_fake')
         s3_buckets = get_external_param('s3-buckets', 'ydb-tiering-test-1,ydb-tiering-test-2').split(',')
+        assert len(s3_buckets) == 2, f'expected 2 bucket configs, got {len(s3_buckets)}'
+
         s3_configs = [
             ObjectStorageParams(
                 scheme='HTTP',
@@ -109,44 +175,84 @@ def scenario_alter_tiering_rule_while_writing(self, ctx: TestContext):
         sth = ScenarioTestHelper(ctx)
 
         tiers: list[str] = []
-        tiering_rules: list[str] = []
         for i, s3_config in enumerate(s3_configs):
             tiers.append(f'TestAlterTiering:tier{i}')
-            tiering_rules.append(f'TestAlterTiering:tiering_rule{i}')
+            self._override_tier(sth, tiers[-1], TierConfig(tiers[-1], s3_config))
+
+        tiering_policy_configs: list[TieringPolicy] = []
+        tiering_policy_configs.append(TieringPolicy().with_rule(TieringRule(tiers[0], '1s')))
+        tiering_policy_configs.append(TieringPolicy().with_rule(TieringRule(tiers[1], '1s')))
+        tiering_policy_configs.append(TieringPolicy().with_rule(TieringRule(tiers[0], '100000d')))
+        tiering_policy_configs.append(TieringPolicy().with_rule(TieringRule(tiers[1], '100000d')))
+        tiering_policy_configs.append(TieringPolicy().with_rule(TieringRule(tiers[0], '1s')).with_rule(TieringRule(tiers[1], '100000d')))
+        tiering_policy_configs.append(TieringPolicy().with_rule(TieringRule(tiers[1], '1s')).with_rule(TieringRule(tiers[0], '100000d')))
 
-            tier_config = TierConfig(tiers[-1], s3_config)
-            tiering_config = TieringPolicy().with_rule(TieringRule(tiers[-1], '1s'))
+        tiering_rules: list[Optional[str]] = []
+        for i, config in enumerate(tiering_policy_configs):
+            tiering_rules.append(f'TestAlterTiering:tiering_rule{i}')
+            self._override_tiering_rule(sth, tiering_rules[-1], 'timestamp', config)
+        tiering_rules.append(None)
 
-            sth.execute_scheme_query(CreateTierIfNotExists(tiers[-1], tier_config))
-            sth.execute_scheme_query(CreateTieringRuleIfNotExists(tiering_rules[-1], 'timestamp', tiering_config))
+        if not is_standalone_tables:
+            sth.execute_scheme_query(CreateTableStore('store').with_schema(self.schema1))
 
-            sth.execute_scheme_query(AlterTier(tiers[-1], tier_config))
-            sth.execute_scheme_query(AlterTieringRule(tiering_rules[-1], 'timestamp', tiering_config))
+        tables: list[str] = []
+        tables_for_tiering_modification: list[str] = []
+        for i in range(n_tables):
+            if is_standalone_tables:
+                tables.append(f'table{i}')
+            else:
+                tables.append(f'store/table{i}')
+            tables_for_tiering_modification.append(tables[-1])
+            sth.execute_scheme_query(CreateTable(tables[-1]).with_schema(self.schema1))
+        for i, tiering_rule in enumerate([tiering_rules[0], tiering_rules[1]]):
+            if is_standalone_tables:
+                tables.append(f'extra_table{i}')
+            else:
+                tables.append(f'store/extra_table{i}')
+            sth.execute_scheme_query(CreateTable(tables[-1]).with_schema(self.schema1))
+            sth.execute_scheme_query(AlterTable(tables[-1]).set_tiering(tiering_rule))
 
-        sth.execute_scheme_query(CreateTableStore('store').with_schema(self.schema1))
-        sth.execute_scheme_query(CreateTable('store/table').with_schema(self.schema1))
+        if any(self._count_objects(bucket) != 0 for bucket in s3_configs):
+            assert any(sth.get_table_rows_count(table) != 0 for table in tables), \
+                'unrelated data in object storage: all tables are empty, but S3 is not'
 
         threads = []
-        threads.append(self.TestThread(
-            target=self._change_tiering_rule,
-            args=[ctx, 'store/table', tiering_rules, test_duration]
-        ))
-        writer_id_offset = random.randint(0, 1 << 30)
-        for i in range(4):
-            threads.append(self.TestThread(target=self._upsert, args=[ctx, 'store/table', writer_id_offset + i, test_duration]))
+
+        # "Alter table drop column" causes scan failures
+        threads.append(self.TestThread(target=self._loop_alter_column, args=[ctx, 'store', test_duration]))
+        for table in tables_for_tiering_modification:
+            threads.append(self.TestThread(
+                target=self._loop_change_tiering_rule,
+                args=[ctx, table, random.sample(tiering_rules, len(tiering_rules)), test_duration]
+            ))
+        for i, table in enumerate(tables):
+            for writer in range(n_writers):
+                threads.append(self.TestThread(target=self._loop_upsert, args=[ctx, table, i * n_writers + writer, test_duration, allow_s3_unavailability]))
+        for tiering_rule in tiering_rules:
+            threads.append(self.TestThread(
+                target=self._loop_alter_tiering_rule,
+                args=[ctx, tiering_rule, random.sample(['timestamp', 'timestamp2'], 2), random.sample(tiering_policy_configs, len(tiering_policy_configs)), test_duration]
+            ))
 
         for thread in threads:
             thread.start()
         for thread in threads:
             thread.join()
 
+        assert any(self._count_objects(bucket) != 0 for bucket in s3_configs)
+
+        for table in tables:
+            sth.execute_scheme_query(AlterTable(table).reset_tiering())
+
         for tiering in tiering_rules:
             sth.execute_scheme_query(DropTieringRule(tiering))
         for tier in tiers:
             sth.execute_scheme_query(DropTier(tier))
 
-        sth.execute_scheme_query(AlterTable('store/table').set_ttl('P1D', 'timestamp'))
+        for table in tables:
+            sth.execute_scheme_query(DropTable(table))
+        if not is_standalone_tables:
+            sth.execute_scheme_query(DropTableStore('store'))
 
-        while sth.execute_scan_query(f'SELECT COUNT(*) FROM `{sth.get_full_path('store/table')}`').result_set.rows[0][0]:
-            time.sleep(10)
+        assert all(self._count_objects(bucket) == 0 for bucket in s3_configs)
diff --git a/ydb/tests/olap/scenario/ya.make b/ydb/tests/olap/scenario/ya.make
index 58a33a89fdba..f9eac7689943 100644
--- a/ydb/tests/olap/scenario/ya.make
+++ b/ydb/tests/olap/scenario/ya.make
@@ -17,6 +17,7 @@ PY3TEST()
 PEERDIR(
     contrib/python/allure-pytest
     contrib/python/allure-python-commons
+    contrib/python/boto3
     contrib/python/pandas
     contrib/python/requests
     ydb/public/sdk/python
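
As a closing usage note (again an illustration, not part of the patch): the new ColumnValueGeneratorLambda is what lets _loop_upsert above feed current-time values into the timestamp column instead of random ones. A minimal sketch of the same wiring, assuming the scenario class's schema1 and a ScenarioTestHelper instance sth are in scope, and using a hypothetical table name store/table0:

    import datetime
    import helpers.data_generators as dg

    # Current-time microsecond timestamps: freshly written rows age past the '1s'
    # eviction intervals configured by the tiering rules almost immediately, while
    # 'timestamp2' is filled with random values, matching the second default column
    # exercised by _loop_alter_tiering_rule.
    row_batch = (
        dg.DataGeneratorPerColumn(schema1, 1000)
        .with_column('timestamp', dg.ColumnValueGeneratorLambda(
            lambda: int(datetime.datetime.now().timestamp() * 1000000)))
        .with_column('writer', dg.ColumnValueGeneratorConst(0))
        .with_column('value', dg.ColumnValueGeneratorSequential(0))
        .with_column('data', dg.ColumnValueGeneratorConst(b'payload'))
        .with_column('timestamp2', dg.ColumnValueGeneratorRandom(null_probability=0))
    )
    sth.bulk_upsert('store/table0', row_batch)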