From d44a9b49dd9025f2529276ad1d4cb54a821e5800 Mon Sep 17 00:00:00 2001
From: Aleksei Filatov
Date: Wed, 21 Feb 2024 11:46:32 +0300
Subject: [PATCH 1/2] Add total size counting for object-storage clean

---
 ch_tools/chadmin/cli/object_storage_group.py | 94 ++++++++++++++++----
 tests/features/object_storage.feature        | 72 ++++++++-------
 2 files changed, 118 insertions(+), 48 deletions(-)

diff --git a/ch_tools/chadmin/cli/object_storage_group.py b/ch_tools/chadmin/cli/object_storage_group.py
index 881e661e..718b7dfc 100644
--- a/ch_tools/chadmin/cli/object_storage_group.py
+++ b/ch_tools/chadmin/cli/object_storage_group.py
@@ -1,10 +1,12 @@
 import logging
+from dataclasses import dataclass
 from datetime import datetime, timedelta, timezone
 from tempfile import TemporaryFile
 from typing import List, Optional
 
 import click
 from click import Context, group, option, pass_context
+from humanfriendly import format_size
 
 from ch_tools.chadmin.cli import get_clickhouse_config
 from ch_tools.chadmin.internal.object_storage import (
@@ -12,6 +14,7 @@
     s3_object_storage_iterator,
 )
 from ch_tools.chadmin.internal.utils import execute_query
+from ch_tools.common.cli.formatting import print_response
 from ch_tools.common.cli.parameters import TimeSpanParamType
 from ch_tools.common.clickhouse.config.storage_configuration import S3DiskConfiguration
 
@@ -27,6 +30,17 @@
 STREAM_TIMEOUT = 10 * 60
 
 
+@dataclass
+class ObjListItem:
+    path: str
+    size: int
+
+    @classmethod
+    def from_tab_separated(cls, value: str) -> "ObjListItem":
+        path, size = value.split("\t")
+        return cls(path, int(size))
+
+
 @group("object-storage")
 @option(
     "-d",
@@ -137,7 +151,7 @@ def clean_command(
     try:
         execute_query(
             ctx,
-            f"CREATE TABLE IF NOT EXISTS {listing_table} (obj_path String) ENGINE MergeTree ORDER BY obj_path",
+            f"CREATE TABLE IF NOT EXISTS {listing_table} (obj_path String, obj_size UInt64) ENGINE MergeTree ORDER BY obj_path",
         )
         _clean_object_storage(
             ctx,
@@ -153,7 +167,7 @@ def clean_command(
     finally:
         if not keep_paths:
            execute_query(
-                ctx, f"TRUNCATE TABLE IF EXISTS {listing_table}", format_=None
+                ctx, f"DROP TABLE IF EXISTS {listing_table} SYNC", format_=None
            )
 
 
@@ -175,9 +189,12 @@ def _clean_object_storage(
     prefix = object_name_prefix or disk_conf.prefix
 
     if not use_saved_list:
-        click.echo(
-            f"Collecting objects... (Disk: '{disk_conf.name}', Endpoint '{disk_conf.endpoint_url}', "
-            f"Bucket: {disk_conf.bucket_name}, Prefix: '{prefix}')"
+        logging.info(
+            "Collecting objects... (Disk: '%s', Endpoint '%s', Bucket: '%s', Prefix: '%s')",
+            disk_conf.name,
+            disk_conf.endpoint_url,
+            disk_conf.bucket_name,
+            prefix,
         )
         _traverse_object_storage(ctx, listing_table, from_time, to_time, prefix)
 
@@ -188,7 +205,7 @@ def _clean_object_storage(
     )
 
     antijoin_query = f"""
-        SELECT obj_path FROM {listing_table} AS object_storage
+        SELECT obj_path, obj_size FROM {listing_table} AS object_storage
        LEFT ANTI JOIN {remote_data_paths_table} AS object_table
        ON object_table.remote_path = object_storage.obj_path
        AND object_table.disk_name = '{disk_conf.name}'
     """
     logging.info("Antijoin query: %s", antijoin_query)
 
     if dry_run:
-        click.echo("Counting orphaned objects...")
+        logging.info("Counting orphaned objects...")
     else:
-        click.echo("Deleting orphaned objects...")
+        logging.info("Deleting orphaned objects...")
 
     deleted = 0
+    total_size = 0
     with TemporaryFile() as keys_file:
         with execute_query(
             ctx, antijoin_query, stream=True, format_="TabSeparated"
@@ -210,11 +228,51 @@ def _clean_object_storage(
                 keys_file.write(chunk)
 
         keys_file.seek(0)  # rewind file pointer to the beginning
 
-        keys = (line.decode().strip() for line in keys_file)
-        deleted = cleanup_s3_object_storage(disk_conf, keys, dry_run)
+        # Generator producing keys from the temporary file while counting statistics
+        def keys():
+            nonlocal deleted, total_size
+            for line in keys_file:
+                obj = ObjListItem.from_tab_separated(line.decode().strip())
+                yield obj.path
+                deleted += 1
+                total_size += obj.size
+
+        cleanup_s3_object_storage(disk_conf, keys(), dry_run)
 
-    click.echo(
-        f"{'Would delete' if dry_run else 'Deleted'} {deleted} objects from bucket [{disk_conf.bucket_name}] with prefix {prefix}"
+    logging.info(
+        "%s %s objects with total size %s from bucket [%s] with prefix %s",
+        "Would delete" if dry_run else "Deleted",
+        deleted,
+        format_size(total_size, binary=True),
+        disk_conf.bucket_name,
+        prefix,
     )
+    _print_response(ctx, dry_run, deleted, total_size)
+
+
+def _print_response(ctx: Context, dry_run: bool, deleted: int, total_size: int) -> None:
+    """
+    Output the result of the cleanup.
+    """
+    # List of dicts for print_response()
+    clean_stats = [
+        {"WouldDelete" if dry_run else "Deleted": deleted, "TotalSize": total_size}
+    ]
+
+    def _table_formatter(stats):
+        result = {}
+
+        if "Deleted" in stats:
+            result["Deleted"] = stats["Deleted"]
+        if "WouldDelete" in stats:
+            result["WouldDelete"] = stats["WouldDelete"]
+        result["TotalSize"] = format_size(stats["TotalSize"], binary=True)
+
+        return result
+
+    print_response(
+        ctx, clean_stats, default_format="table", table_formatter=_table_formatter
+    )
 
 
@@ -228,7 +286,7 @@ def _traverse_object_storage(
     """
     Traverse S3 disk's bucket and put object names to the ClickHouse table.
""" - obj_paths_batch = [] + obj_paths_batch: List[ObjListItem] = [] counter = 0 now = datetime.now(timezone.utc) @@ -240,7 +298,7 @@ def _traverse_object_storage( if from_time is not None and obj.last_modified < now - from_time: continue - obj_paths_batch.append(obj.key) + obj_paths_batch.append(ObjListItem(obj.key, obj.size)) counter += 1 if len(obj_paths_batch) >= INSERT_BATCH_SIZE: _insert_listing_batch(ctx, obj_paths_batch, listing_table) @@ -250,19 +308,19 @@ def _traverse_object_storage( if obj_paths_batch: _insert_listing_batch(ctx, obj_paths_batch, listing_table) - click.echo(f"Collected {counter} objects") + logging.info("Collected %s objects", counter) def _insert_listing_batch( - ctx: Context, obj_paths_batch: List[str], listing_table: str + ctx: Context, obj_paths_batch: List[ObjListItem], listing_table: str ) -> None: """ Insert batch of object names to the listing table. """ - batch_values = ",".join(f"('{obj_path}')" for obj_path in obj_paths_batch) + batch_values = ",".join(f"('{item.path}',{item.size})" for item in obj_paths_batch) execute_query( ctx, - f"INSERT INTO {listing_table} (obj_path) VALUES {batch_values}", + f"INSERT INTO {listing_table} (obj_path, obj_size) VALUES {batch_values}", format_=None, ) diff --git a/tests/features/object_storage.feature b/tests/features/object_storage.feature index 019e369f..ea113c77 100644 --- a/tests/features/object_storage.feature +++ b/tests/features/object_storage.feature @@ -21,37 +21,41 @@ Feature: chadmin object-storage commands Scenario: Dry-run clean on one replica with guard period When we execute command on clickhouse01 """ - chadmin object-storage clean --dry-run + chadmin --format yaml object-storage clean --dry-run """ Then we get response contains """ - Would delete 0 objects from bucket [cloud-storage-test] + - WouldDelete: 0 + TotalSize: 0 """ When we execute command on clickhouse01 """ - chadmin object-storage clean --to-time 0h --dry-run + chadmin --format yaml object-storage clean --to-time 0h --dry-run """ Then we get response matches """ - Would delete [1-9][0-9]* objects from bucket \[cloud-storage-test\] + - WouldDelete: [1-9][0-9]* + TotalSize: [1-9][0-9]* """ Scenario: Dry-run clean on cluster with guard period When we execute command on clickhouse01 """ - chadmin object-storage clean --dry-run --on-cluster + chadmin --format yaml object-storage clean --dry-run --on-cluster """ Then we get response contains """ - Would delete 0 objects from bucket [cloud-storage-test] + - WouldDelete: 0 + TotalSize: 0 """ When we execute command on clickhouse01 """ - chadmin object-storage clean --to-time 0h --dry-run --on-cluster + chadmin --format yaml object-storage clean --to-time 0h --dry-run --on-cluster """ Then we get response contains """ - Would delete 0 objects from bucket [cloud-storage-test] + - WouldDelete: 0 + TotalSize: 0 """ Scenario: Clean orphaned objects @@ -63,86 +67,93 @@ Feature: chadmin object-storage commands """ When we execute command on clickhouse01 """ - chadmin object-storage clean --dry-run --to-time 0h --on-cluster + chadmin --format yaml object-storage clean --dry-run --to-time 0h --on-cluster """ Then we get response contains """ - Would delete 1 objects from bucket [cloud-storage-test] + - WouldDelete: 1 + TotalSize: 1 """ When we execute command on clickhouse01 """ - chadmin object-storage clean --to-time 0h --on-cluster + chadmin --format yaml object-storage clean --to-time 0h --on-cluster """ Then we get response contains """ - Deleted 1 objects from bucket [cloud-storage-test] + - 
+      TotalSize: 1
     """
     And path does not exist in S3
     """
     bucket: cloud-storage-test
     path: /data/orpaned_object.tsv
     """
-    
+
   Scenario: Clean many orphaned objects
     When we put 100 objects in S3
     """
     bucket: cloud-storage-test
     path: /data/orpaned_object-{}
-    data: '1'
+    data: '10'
     """
     When we execute command on clickhouse01
     """
-    chadmin object-storage clean --dry-run --to-time 0h --on-cluster
+    chadmin --format yaml object-storage clean --dry-run --to-time 0h --on-cluster
     """
     Then we get response contains
     """
-    Would delete 100 objects from bucket [cloud-storage-test]
+    - WouldDelete: 100
+      TotalSize: 200
     """
     When we execute command on clickhouse01
     """
-    chadmin object-storage clean --to-time 0h --on-cluster
+    chadmin --format yaml object-storage clean --to-time 0h --on-cluster
     """
     Then we get response contains
     """
-    Deleted 100 objects from bucket [cloud-storage-test]
+    - Deleted: 100
+      TotalSize: 200
     """
     When we execute command on clickhouse01
     """
-    chadmin object-storage clean --to-time 0h --dry-run --on-cluster
+    chadmin --format yaml object-storage clean --to-time 0h --dry-run --on-cluster
     """
     Then we get response contains
     """
-    Would delete 0 objects from bucket [cloud-storage-test]
+    - WouldDelete: 0
+      TotalSize: 0
     """
-    
+
   Scenario: Clean orphaned objects with prefix
     When we put object in S3
     """
     bucket: cloud-storage-test
     path: /data_1/orpaned_object.tsv
-    data: '1'
+    data: '10'
     """
     When we put object in S3
     """
     bucket: cloud-storage-test
     path: /data_2/orpaned_object.tsv
-    data: '1'
+    data: '100'
     """
     When we execute command on clickhouse01
     """
-    chadmin object-storage clean --dry-run --to-time 0h --on-cluster --prefix "data_1"
+    chadmin --format yaml object-storage clean --dry-run --to-time 0h --on-cluster --prefix "data_1"
     """
     Then we get response contains
     """
-    Would delete 1 objects from bucket [cloud-storage-test]
+    - WouldDelete: 1
+      TotalSize: 2
     """
     When we execute command on clickhouse01
     """
-    chadmin object-storage clean --to-time 0h --on-cluster --prefix "data_1"
+    chadmin --format yaml object-storage clean --to-time 0h --on-cluster --prefix "data_1"
     """
     Then we get response contains
     """
-    Deleted 1 objects from bucket [cloud-storage-test]
+    - Deleted: 1
+      TotalSize: 2
     """
     And path does not exist in S3
     """
     bucket: cloud-storage-test
     path: /data_1/orpaned_object.tsv
     """
     When we execute command on clickhouse01
     """
-    chadmin object-storage clean --dry-run --to-time 0h --on-cluster --prefix "data_2"
+    chadmin --format yaml object-storage clean --dry-run --to-time 0h --on-cluster --prefix "data_2"
     """
     Then we get response contains
     """
-    Would delete 1 objects from bucket [cloud-storage-test]
-    """
\ No newline at end of file
+    - WouldDelete: 1
+      TotalSize: 3
+    """

From 9ed73898cf8a35b6866aefa3b3c8646620afddc7 Mon Sep 17 00:00:00 2001
From: Aleksei Filatov
Date: Wed, 21 Feb 2024 15:07:13 +0300
Subject: [PATCH 2/2] [rev.1]

---
 ch_tools/chadmin/cli/object_storage_group.py  | 27 ++++---------------
 .../internal/object_storage/__init__.py       |  1 +
 .../internal/object_storage/obj_list_item.py  | 16 +++++++++++
 .../internal/object_storage/s3_cleanup.py     | 17 +++++++-----
 4 files changed, 32 insertions(+), 29 deletions(-)
 create mode 100644 ch_tools/chadmin/internal/object_storage/obj_list_item.py

diff --git a/ch_tools/chadmin/cli/object_storage_group.py b/ch_tools/chadmin/cli/object_storage_group.py
index 718b7dfc..7540191c 100644
--- a/ch_tools/chadmin/cli/object_storage_group.py
+++ b/ch_tools/chadmin/cli/object_storage_group.py
@@ -1,5 +1,4 @@
 import logging
-from dataclasses import dataclass
 from datetime import datetime, timedelta, timezone
 from tempfile import TemporaryFile
 from typing import List, Optional
@@ -10,6 +9,7 @@
 
 from ch_tools.chadmin.cli import get_clickhouse_config
 from ch_tools.chadmin.internal.object_storage import (
+    ObjListItem,
     cleanup_s3_object_storage,
     s3_object_storage_iterator,
 )
@@ -30,17 +30,6 @@
 STREAM_TIMEOUT = 10 * 60
 
 
-@dataclass
-class ObjListItem:
-    path: str
-    size: int
-
-    @classmethod
-    def from_tab_separated(cls, value: str) -> "ObjListItem":
-        path, size = value.split("\t")
-        return cls(path, int(size))
-
-
 @group("object-storage")
 @option(
     "-d",
@@ -229,16 +218,10 @@ def _clean_object_storage(
 
         keys_file.seek(0)  # rewind file pointer to the beginning
 
-        # Generator producing keys from the temporary file while counting statistics
-        def keys():
-            nonlocal deleted, total_size
-            for line in keys_file:
-                obj = ObjListItem.from_tab_separated(line.decode().strip())
-                yield obj.path
-                deleted += 1
-                total_size += obj.size
-
-        cleanup_s3_object_storage(disk_conf, keys(), dry_run)
+        keys = (
+            ObjListItem.from_tab_separated(line.decode().strip()) for line in keys_file
+        )
+        deleted, total_size = cleanup_s3_object_storage(disk_conf, keys, dry_run)
 
     logging.info(
         "%s %s objects with total size %s from bucket [%s] with prefix %s",
diff --git a/ch_tools/chadmin/internal/object_storage/__init__.py b/ch_tools/chadmin/internal/object_storage/__init__.py
index 7199e671..19c719fd 100644
--- a/ch_tools/chadmin/internal/object_storage/__init__.py
+++ b/ch_tools/chadmin/internal/object_storage/__init__.py
@@ -1,3 +1,4 @@
+from ch_tools.chadmin.internal.object_storage.obj_list_item import ObjListItem
 from ch_tools.chadmin.internal.object_storage.s3_cleanup import (
     cleanup_s3_object_storage,
 )
diff --git a/ch_tools/chadmin/internal/object_storage/obj_list_item.py b/ch_tools/chadmin/internal/object_storage/obj_list_item.py
new file mode 100644
index 00000000..dfa97e1e
--- /dev/null
+++ b/ch_tools/chadmin/internal/object_storage/obj_list_item.py
@@ -0,0 +1,16 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class ObjListItem:
+    """
+    Item of an object storage listing.
+ """ + + path: str + size: int + + @classmethod + def from_tab_separated(cls, value: str) -> "ObjListItem": + path, size = value.split("\t") + return cls(path, int(size)) diff --git a/ch_tools/chadmin/internal/object_storage/s3_cleanup.py b/ch_tools/chadmin/internal/object_storage/s3_cleanup.py index 358f8abc..5f5178b4 100644 --- a/ch_tools/chadmin/internal/object_storage/s3_cleanup.py +++ b/ch_tools/chadmin/internal/object_storage/s3_cleanup.py @@ -1,8 +1,9 @@ -from typing import Any, Iterator, List +from typing import Any, Iterator, List, Tuple import boto3 -from botocore.client import Config # type: ignore[import] +from botocore.client import Config +from ch_tools.chadmin.internal.object_storage import ObjListItem from ch_tools.chadmin.internal.utils import chunked from ch_tools.common.clickhouse.config.storage_configuration import S3DiskConfiguration @@ -10,8 +11,8 @@ def cleanup_s3_object_storage( - disk: S3DiskConfiguration, keys: Iterator[str], dry_run: bool = False -) -> int: + disk: S3DiskConfiguration, keys: Iterator[ObjListItem], dry_run: bool = False +) -> Tuple[int, int]: s3 = boto3.resource( "s3", endpoint_url=disk.endpoint_url, @@ -21,15 +22,17 @@ def cleanup_s3_object_storage( ) bucket = s3.Bucket(disk.bucket_name) deleted = 0 + total_size = 0 for chunk in chunked(keys, BULK_DELETE_CHUNK_SIZE): if not dry_run: _bulk_delete(bucket, chunk) deleted += len(chunk) + total_size += sum(item.size for item in chunk) - return deleted + return deleted, total_size -def _bulk_delete(bucket: Any, keys: List[str]) -> None: - objects = [{"Key": key} for key in keys] +def _bulk_delete(bucket: Any, items: List[ObjListItem]) -> None: + objects = [{"Key": item.path} for item in items] bucket.delete_objects(Delete={"Objects": objects, "Quiet": False})