Skip to content

Commit

Permalink
Add concurrent tiering updates to OLAP scenario test (ydb-platform#8262)
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Aug 27, 2024
1 parent ac92ce0 commit 1c09c5d
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 50 deletions.
26 changes: 26 additions & 0 deletions ydb/tests/olap/scenario/helpers/data_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,32 @@ def generate_value(self, column: ScenarioTestHelper.Column) -> Any:
return self._value


class ColumnValueGeneratorLambda(IColumnValueGenerator):
"""Arbitrary value generator.
Uses arbitrary function to generate values."""

def __init__(self, func) -> None:
"""Constructor.
Args:
func: Function used to generate values.
Example:
DataGeneratorPerColumn(
self.schema2, 10,
ColumnValueGeneratorDefault(init_value=10))
.with_column('not_level', ColumnValueGeneratorLambda(lambda: time.now())
)
"""

super().__init__()
self._func = func

@override
def generate_value(self, column: ScenarioTestHelper.Column) -> Any:
return self._func()


class ColumnValueGeneratorRandom(IColumnValueGenerator):
"""Random column value generator.
Expand Down
70 changes: 56 additions & 14 deletions ydb/tests/olap/scenario/helpers/scenario_tests_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ydb.tests.olap.lib.ydb_cluster import YdbCluster
from abc import abstractmethod, ABC
from typing import Set, List, Dict, Any, Callable
from time import sleep


class TestContext:
Expand Down Expand Up @@ -59,6 +60,13 @@ class ScenarioTestHelper:
sth.execute_scheme_query(DropTable(table_name))
"""

DEFAULT_RETRIABLE_ERRORS = {
ydb.StatusCode.OVERLOADED,
ydb.StatusCode.BAD_SESSION,
ydb.StatusCode.CONNECTION_LOST,
ydb.StatusCode.UNAVAILABLE,
}

class Column:
"""A class that describes a table column."""

Expand Down Expand Up @@ -247,21 +255,37 @@ def _add_not_empty(p: str, dir: str):
return result

@staticmethod
def _run_with_expected_status(operation: callable, expected_status: ydb.StatusCode | Set[ydb.StatusCode]):
def _run_with_expected_status(
operation: callable,
expected_status: ydb.StatusCode | Set[ydb.StatusCode],
retriable_status: ydb.StatusCode | Set[ydb.StatusCode] = {},
n_retries=0,
):
if isinstance(expected_status, ydb.StatusCode):
expected_status = {expected_status}
try:
result = operation()
if ydb.StatusCode.SUCCESS not in expected_status:
pytest.fail(
f'Unexpected status: must be in {repr(expected_status)}, but get {repr(ydb.StatusCode.SUCCESS)}'
)
return result
except ydb.issues.Error as e:
allure.attach(f'{repr(e.status)}: {e}', 'request status', allure.attachment_type.TEXT)
if e.status not in expected_status:
pytest.fail(f'Unexpected status: must be in {repr(expected_status)}, but get {repr(e)}')
return None
if isinstance(retriable_status, ydb.StatusCode):
retriable_status = {retriable_status}

result = None
error = None
status = None
for _ in range(n_retries + 1):
try:
result = operation()
error = None
status = ydb.StatusCode.SUCCESS
except ydb.issues.Error as e:
result = None
error = e
status = error.status
allure.attach(f'{repr(status)}: {error}', 'request status', allure.attachment_type.TEXT)

if status in expected_status:
return result
if status not in retriable_status:
pytest.fail(f'Unexpected status: must be in {repr(expected_status)}, but get {repr(error or status)}')
sleep(3)
pytest.fail(f'Retries exceeded with unexpected status: must be in {repr(expected_status)}, but get {repr(error or status)}')

def _bulk_upsert_impl(
self, tablename: str, data_generator: ScenarioTestHelper.IDataGenerator, expected_status: ydb.StatusCode | Set[ydb.StatusCode]
Expand Down Expand Up @@ -302,6 +326,8 @@ def execute_scheme_query(
self,
yqlble: ScenarioTestHelper.IYqlble,
expected_status: ydb.StatusCode | Set[ydb.StatusCode] = ydb.StatusCode.SUCCESS,
retries=0,
retriable_status: ydb.StatusCode | Set[ydb.StatusCode] = DEFAULT_RETRIABLE_ERRORS,
comment: str = '',
) -> None:
"""Run a schema query on the database under test.
Expand Down Expand Up @@ -330,7 +356,7 @@ def execute_scheme_query(
yql = yqlble.to_yql(self.test_context)
allure.attach(yql, 'request', allure.attachment_type.TEXT)
self._run_with_expected_status(
lambda: YdbCluster.get_ydb_driver().table_client.session().create().execute_scheme(yql), expected_status
lambda: YdbCluster.get_ydb_driver().table_client.session().create().execute_scheme(yql), expected_status, retriable_status, retries
)

@classmethod
Expand Down Expand Up @@ -524,6 +550,22 @@ def get_table_rows_count(self, tablename: str, comment: str = '') -> int:
result_set = self.execute_scan_query(f'SELECT count(*) FROM `{self.get_full_path(tablename)}`')
return result_set.result_set.rows[0][0]

@allure.step('Describe table {path}')
def describe_table(self, path: str, settings: ydb.DescribeTableSettings = None) -> ydb.TableSchemeEntry:
"""Get table description.
Args:
path: Relative path to a table.
settings: DescribeTableSettings.
Returns:
TableSchemeEntry object.
"""

return self._run_with_expected_status(
lambda: YdbCluster.get_ydb_driver().table_client.session().create().describe_table(self.get_full_path(path), settings), ydb.StatusCode.SUCCESS
)

@allure.step('List path {path}')
def list_path(self, path: str) -> List[ydb.SchemeEntry]:
"""Recursively describe the path in the database under test.
Expand Down
Loading

0 comments on commit 1c09c5d

Please sign in to comment.