diff --git a/Makefile b/Makefile
index 7fcf3cea..24a5e13a 100644
--- a/Makefile
+++ b/Makefile
@@ -184,7 +184,6 @@ test-unit: install-deps
 .PHONY: test-integration
 test-integration: install-deps build-python-packages
 	cd $(TESTS_DIR)
-	$(POETRY) run $(PYTHON) -m env_control create
 	$(POETRY) run behave --show-timings --stop --junit $(BEHAVE_ARGS)


diff --git a/ch_tools/__init__.py b/ch_tools/__init__.py
index 69866380..8526f52a 100644
--- a/ch_tools/__init__.py
+++ b/ch_tools/__init__.py
@@ -1,3 +1,3 @@
 """A set of tools for administration and diagnostics of ClickHouse DBMS."""

-__version__ = "2.548.182050718"
+__version__ = "1.0.0"
diff --git a/ch_tools/chadmin/cli/object_storage_group.py b/ch_tools/chadmin/cli/object_storage_group.py
index 09ceead4..e50e4d79 100644
--- a/ch_tools/chadmin/cli/object_storage_group.py
+++ b/ch_tools/chadmin/cli/object_storage_group.py
@@ -1,5 +1,6 @@
 import logging
 from datetime import datetime, timedelta, timezone
+from tempfile import TemporaryFile
 from typing import List, Optional

 import click
@@ -18,12 +19,12 @@
 # And for metadata for which object is not found in S3.
 # These objects are not counted if their last modified time fall in the interval from the moment of starting analyzing.
 DEFAULT_GUARD_INTERVAL = "24h"
-# Prefix for a listing table name
-LISTING_TABLE_PREFIX = "listing_objects_from_"
 # Batch size for inserts in a listing table
 # Set not very big value due to default ClickHouse 'http_max_field_value_size' settings value 128Kb
 # TODO: streaming upload in POST body while INSERT
 INSERT_BATCH_SIZE = 500
+# Use a big enough timeout for streaming HTTP queries
+STREAM_TIMEOUT = 10 * 60


 @group("object-storage")
@@ -53,7 +54,10 @@ def object_storage_group(ctx: Context, disk_name: str) -> None:
     "--object_name_prefix",
     "object_name_prefix",
     default="",
-    help="Additional prefix of object name used while listing bucket.",
+    help=(
+        "Prefix of object names used while listing the bucket. By default it is parsed "
+        "from the endpoint in the ClickHouse S3 disk config."
+    ),
 )
 @option(
     "--from-time",
@@ -98,6 +102,12 @@ def object_storage_group(ctx: Context, disk_name: str) -> None:
     is_flag=True,
     help=("Do not delete collected paths of objects from object storage."),
 )
+@option(
+    "--use-saved-list",
+    "use_saved_list",
+    is_flag=True,
+    help=("Use saved object list without traversing object storage again."),
+)
 @pass_context
 def clean_command(
     ctx: Context,
@@ -108,6 +118,7 @@ def clean_command(
     cluster_name: str,
     dry_run: bool,
     keep_paths: bool,
+    use_saved_list: bool,
 ) -> None:
     """
     Clean orphaned S3 objects.
@@ -119,7 +130,9 @@ def clean_command(
        )

     disk_conf: S3DiskConfiguration = ctx.obj["disk_configuration"]
-    listing_table = LISTING_TABLE_PREFIX + disk_conf.name
+    config = ctx.obj["config"]["object_storage"]["clean"]
+
+    listing_table = f"{config['listing_table_database']}.{config['listing_table_prefix']}{disk_conf.name}"
     # Create listing table for storing paths from object storage
     try:
         execute_query(
@@ -135,6 +148,7 @@ def clean_command(
             cluster_name,
             dry_run,
             listing_table,
+            use_saved_list,
         )
     finally:
         if not keep_paths:
@@ -152,16 +166,20 @@ def _clean_object_storage(
     cluster_name: str,
     dry_run: bool,
     listing_table: str,
+    use_saved_list: bool,
 ) -> None:
     """
     Delete orphaned objects from object storage.
""" disk_conf: S3DiskConfiguration = ctx.obj["disk_configuration"] - click.echo( - f"Collecting objects for S3 disk '{disk_conf.name}' with endpoint '{disk_conf.endpoint_url}' " - f"in bucket [{disk_conf.bucket_name}] with prefix '{disk_conf.prefix}'" - ) - _traverse_object_storage(ctx, listing_table, from_time, to_time, object_name_prefix) + prefix = object_name_prefix or disk_conf.prefix + + if not use_saved_list: + click.echo( + f"Collecting objects... (Disk: '{disk_conf.name}', Endpoint '{disk_conf.endpoint_url}', " + f"Bucket: {disk_conf.bucket_name}, Prefix: '{prefix}')" + ) + _traverse_object_storage(ctx, listing_table, from_time, to_time, prefix) remote_data_paths_table = "system.remote_data_paths" if on_cluster: @@ -183,15 +201,20 @@ def _clean_object_storage( click.echo("Deleting orphaned objects...") deleted = 0 - with execute_query( - ctx, antijoin_query, stream=True, format_="TabSeparated" - ) as resp: - # make generator for lazy iterating - paths_to_delete = (line.decode() for line in resp.iter_lines()) - deleted = cleanup_s3_object_storage(disk_conf, paths_to_delete, dry_run) + with TemporaryFile() as keys_file: + with execute_query( + ctx, antijoin_query, stream=True, format_="TabSeparated" + ) as resp: + # Save response to the file by chunks + for chunk in resp.iter_content(chunk_size=8192): + keys_file.write(chunk) + + keys_file.seek(0) # rewind file pointer to the beginning + keys = (line.decode().strip() for line in keys_file) + deleted = cleanup_s3_object_storage(disk_conf, keys, dry_run) click.echo( - f"{'Would delete' if dry_run else 'Deleted'} {deleted} objects from bucket [{disk_conf.bucket_name}] with prefix {disk_conf.prefix}" + f"{'Would delete' if dry_run else 'Deleted'} {deleted} objects from bucket [{disk_conf.bucket_name}] with prefix {prefix}" ) @@ -209,7 +232,7 @@ def _traverse_object_storage( counter = 0 now = datetime.now(timezone.utc) - for _, obj in s3_object_storage_iterator( + for obj in s3_object_storage_iterator( ctx.obj["disk_configuration"], object_name_prefix=prefix ): if obj.last_modified > now - to_time: diff --git a/ch_tools/chadmin/internal/object_storage/s3_cleanup.py b/ch_tools/chadmin/internal/object_storage/s3_cleanup.py index 0bb40de7..358f8abc 100644 --- a/ch_tools/chadmin/internal/object_storage/s3_cleanup.py +++ b/ch_tools/chadmin/internal/object_storage/s3_cleanup.py @@ -6,7 +6,7 @@ from ch_tools.chadmin.internal.utils import chunked from ch_tools.common.clickhouse.config.storage_configuration import S3DiskConfiguration -BULK_DELETE_CHUNK_SIZE = 100 +BULK_DELETE_CHUNK_SIZE = 1000 def cleanup_s3_object_storage( diff --git a/ch_tools/chadmin/internal/object_storage/s3_iterator.py b/ch_tools/chadmin/internal/object_storage/s3_iterator.py index 53dde89f..252c7dfc 100644 --- a/ch_tools/chadmin/internal/object_storage/s3_iterator.py +++ b/ch_tools/chadmin/internal/object_storage/s3_iterator.py @@ -1,4 +1,4 @@ -from typing import Any, Iterator, Optional, Tuple +from typing import Any, Iterator import boto3 # type: ignore[import] from botocore.client import Config @@ -12,10 +12,9 @@ def s3_object_storage_iterator( disk: S3DiskConfiguration, *, - path_prefix: Optional[str] = None, object_name_prefix: str = "", skip_ignoring: bool = False -) -> Iterator[Tuple[str, ObjectSummary]]: +) -> Iterator[ObjectSummary]: s3 = boto3.resource( "s3", endpoint_url=disk.endpoint_url, @@ -25,17 +24,11 @@ def s3_object_storage_iterator( ) bucket = s3.Bucket(disk.bucket_name) - if not path_prefix: - path_prefix = disk.prefix - - for obj in 
-        name: str = obj.key[len(path_prefix) :]
-
-        if not skip_ignoring and _is_ignored(name):
+    for obj in bucket.objects.filter(Prefix=object_name_prefix):
+        if not skip_ignoring and _is_ignored(obj.key):
             continue
-
-        yield name, obj
+        yield obj


 def _is_ignored(name: str) -> bool:
-    return any(name.startswith(p) for p in IGNORED_OBJECT_NAME_PREFIXES)
+    return any(p in name for p in IGNORED_OBJECT_NAME_PREFIXES)
diff --git a/ch_tools/chadmin/internal/utils.py b/ch_tools/chadmin/internal/utils.py
index 07ef9c62..4f70ae35 100644
--- a/ch_tools/chadmin/internal/utils.py
+++ b/ch_tools/chadmin/internal/utils.py
@@ -16,6 +16,7 @@ def execute_query(
     dry_run=False,
     format_="default",
     stream=False,
+    settings=None,
     **kwargs
 ):
     """
@@ -32,6 +33,7 @@ def execute_query(
         echo=echo,
         dry_run=dry_run,
         stream=stream,
+        settings=settings,
     )
diff --git a/ch_tools/common/clickhouse/client/clickhouse_client.py b/ch_tools/common/clickhouse/client/clickhouse_client.py
index c49b3b6e..9093e019 100644
--- a/ch_tools/common/clickhouse/client/clickhouse_client.py
+++ b/ch_tools/common/clickhouse/client/clickhouse_client.py
@@ -65,6 +65,7 @@ def query(
         echo: bool = False,
         dry_run: bool = False,
         stream: bool = False,
+        settings: Optional[dict] = None,
     ) -> Any:
         """
         Execute query.
@@ -82,6 +83,7 @@ def query(
             return None

         timeout = max(self._timeout, timeout or 0)
+        per_query_settings = settings or {}

         logging.debug("Executing query: %s", query)
         try:
@@ -90,6 +92,7 @@ def query(
                 params={
                     **self._settings,
                     "query": query,
+                    **per_query_settings,  # overrides the client-level settings
                 },
                 json=post_data,
                 timeout=timeout,
diff --git a/ch_tools/common/config.py b/ch_tools/common/config.py
index d2d35746..b5c012b6 100644
--- a/ch_tools/common/config.py
+++ b/ch_tools/common/config.py
@@ -19,6 +19,12 @@
         "monitoring_user": None,
         "monitoring_password": None,
     },
+    "object_storage": {
+        "clean": {
+            "listing_table_prefix": "listing_objects_from_",
+            "listing_table_database": "default",
+        }
+    },
 }


diff --git a/pyproject.toml b/pyproject.toml
index 7e9aa2b6..37e4ccdb 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

 [tool.poetry]
 name = "clickhouse-tools"
-version = "2.548.182050718"
+version = "1.0.0"
 license = "MIT"
 description = "clickhouse-tools is a set of tools for administration and diagnostics of ClickHouse DBMS."
diff --git a/tests/environment.py b/tests/environment.py
index 9e243a55..b4daad62 100644
--- a/tests/environment.py
+++ b/tests/environment.py
@@ -8,8 +8,6 @@
 from modules.logs import save_logs
 from modules.utils import version_ge, version_lt

-REQUIRE_VERSION_PREFIX_LEN = len("require_version_")
-
 try:
     import ipdb as pdb
 except ImportError:
diff --git a/tests/features/object_storage.feature b/tests/features/object_storage.feature
index a8c34049..019e369f 100644
--- a/tests/features/object_storage.feature
+++ b/tests/features/object_storage.feature
@@ -18,7 +18,6 @@ Feature: chadmin object-storage commands
     """
     Then S3 contains greater than 0 objects

-  @require_version_22.8
   Scenario: Dry-run clean on one replica with guard period
     When we execute command on clickhouse01
     """
@@ -37,7 +36,6 @@ Feature: chadmin object-storage commands
     Would delete [1-9][0-9]* objects from bucket \[cloud-storage-test\]
     """

-  @require_version_22.8
   Scenario: Dry-run clean on cluster with guard period
     When we execute command on clickhouse01
     """
@@ -56,7 +54,6 @@ Feature: chadmin object-storage commands
     Would delete 0 objects from bucket [cloud-storage-test]
     """

-  @require_version_22.8
   Scenario: Clean orphaned objects
     When we put object in S3
     """
@@ -86,12 +83,11 @@ Feature: chadmin object-storage commands
     path: /data/orpaned_object.tsv
     """

-  @require_version_22.8
   Scenario: Clean many orphaned objects
     When we put 100 objects in S3
     """
     bucket: cloud-storage-test
-    path: /data/orpaned_object.tsv
+    path: /data/orpaned_object-{}
     data: '1'
     """
     When we execute command on clickhouse01
@@ -118,3 +114,46 @@ Feature: chadmin object-storage commands
     """
     Would delete 0 objects from bucket [cloud-storage-test]
     """
+
+  Scenario: Clean orphaned objects with prefix
+    When we put object in S3
+    """
+    bucket: cloud-storage-test
+    path: /data_1/orpaned_object.tsv
+    data: '1'
+    """
+    When we put object in S3
+    """
+    bucket: cloud-storage-test
+    path: /data_2/orpaned_object.tsv
+    data: '1'
+    """
+    When we execute command on clickhouse01
+    """
+    chadmin object-storage clean --dry-run --to-time 0h --on-cluster --prefix "data_1"
+    """
+    Then we get response contains
+    """
+    Would delete 1 objects from bucket [cloud-storage-test]
+    """
+    When we execute command on clickhouse01
+    """
+    chadmin object-storage clean --to-time 0h --on-cluster --prefix "data_1"
+    """
+    Then we get response contains
+    """
+    Deleted 1 objects from bucket [cloud-storage-test]
+    """
+    And path does not exist in S3
+    """
+    bucket: cloud-storage-test
+    path: /data_1/orpaned_object.tsv
+    """
+    When we execute command on clickhouse01
+    """
+    chadmin object-storage clean --dry-run --to-time 0h --on-cluster --prefix "data_2"
+    """
+    Then we get response contains
+    """
+    Would delete 1 objects from bucket [cloud-storage-test]
+    """
\ No newline at end of file
diff --git a/tests/steps/s3.py b/tests/steps/s3.py
index 56817950..be62a740 100644
--- a/tests/steps/s3.py
+++ b/tests/steps/s3.py
@@ -62,7 +62,7 @@
 def step_put_file_count_in_s3(context, count):
     conf = get_step_data(context)
     s3_client = s3.S3Client(context, conf["bucket"])
     for i in range(count):
-        path = f"{conf['path']}-{i}"
+        path = f"{conf['path'].format(i)}"
         s3_client.upload_data(conf["data"], path)
         assert s3_client.path_exists(path)
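Usage sketch (not part of the patch): the per-query `settings` argument threaded through `execute_query` and `ClickhouseClient.query` above could be exercised as below; the helper name `count_listed_objects` and the chosen `max_execution_time` value are illustrative assumptions only.

    # Hypothetical example: pass a per-query ClickHouse setting through the new
    # `settings` argument; ClickhouseClient.query() merges it over the
    # client-level settings when building the HTTP request params.
    from ch_tools.chadmin.internal.utils import execute_query

    def count_listed_objects(ctx, listing_table):
        # `max_execution_time` is just an example setting supported by the
        # ClickHouse HTTP interface; any per-query setting could go here.
        return execute_query(
            ctx,
            f"SELECT count() FROM {listing_table}",
            format_="TabSeparated",
            settings={"max_execution_time": 600},
        )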