From ba20cf78939bf09f27934edfe51573ffe0becb7d Mon Sep 17 00:00:00 2001 From: Aleksei Filatov Date: Fri, 26 Jan 2024 12:23:14 +0300 Subject: [PATCH] Make script output more informative --- ch_tools/chadmin/cli/object_storage_group.py | 68 ++++++++++---- .../internal/object_storage/__init__.py | 3 - .../object_storage/s3_local_metadata.py | 92 ------------------- tests/features/object_storage.feature | 16 ++-- 4 files changed, 60 insertions(+), 119 deletions(-) delete mode 100644 ch_tools/chadmin/internal/object_storage/s3_local_metadata.py diff --git a/ch_tools/chadmin/cli/object_storage_group.py b/ch_tools/chadmin/cli/object_storage_group.py index 65087c03..deb8f83b 100644 --- a/ch_tools/chadmin/cli/object_storage_group.py +++ b/ch_tools/chadmin/cli/object_storage_group.py @@ -21,7 +21,8 @@ # Prefix for a listing table name LISTING_TABLE_PREFIX = "listing_objects_from_" # Batch size for inserts in a listing table -INSERT_BATCH_SIZE = 1000 +# Kept small because ClickHouse's default 'http_max_field_value_size' setting is 128 KiB +INSERT_BATCH_SIZE = 100 @group("object-storage") @@ -97,7 +98,7 @@ def object_storage_group(ctx: Context, disk_name: str) -> None: help=("Do not delete collected paths of objects from object storage."), ) @pass_context -def clean_object_storage( +def clean_command( ctx: Context, object_name_prefix: str, from_time: Optional[timedelta], @@ -117,6 +118,49 @@ def clean_object_storage( ) disk_conf: S3DiskConfiguration = ctx.obj["disk_configuration"] + listing_table = LISTING_TABLE_PREFIX + disk_conf.name + # Create listing table for storing paths from object storage + try: + execute_query( + ctx, + f"CREATE TABLE IF NOT EXISTS {listing_table} (obj_path String) ENGINE MergeTree ORDER BY obj_path", + ) + _clean_object_storage( + ctx, + object_name_prefix, + from_time, + to_time, + on_cluster, + cluster_name, + dry_run, + listing_table, + ) + finally: + if not keep_paths: + execute_query( + ctx, f"TRUNCATE TABLE IF EXISTS 
{listing_table}", format_=None + ) + + +def _clean_object_storage( + ctx: Context, + object_name_prefix: str, + from_time: Optional[timedelta], + to_time: timedelta, + on_cluster: bool, + cluster_name: str, + dry_run: bool, + listing_table: str, +) -> None: + """ + Delete orphaned objects from object storage. + """ + disk_conf: S3DiskConfiguration = ctx.obj["disk_configuration"] + click.echo( + f"Collecting objects for S3 disk '{disk_conf.name}' with endpoint '{disk_conf.endpoint_url}' " + f"in bucket [{disk_conf.bucket_name}] with prefix '{disk_conf.prefix}'" + ) + _traverse_object_storage(ctx, listing_table, from_time, to_time, object_name_prefix) remote_data_paths_table = "system.remote_data_paths" if on_cluster: @@ -124,16 +168,6 @@ def clean_object_storage( f"clusterAllReplicas('{cluster_name}', {remote_data_paths_table})" ) - listing_table = LISTING_TABLE_PREFIX + disk_conf.name - # Create listing table for storing paths from object storage - execute_query( - ctx, - f"CREATE TABLE IF NOT EXISTS {listing_table} (obj_path String) ENGINE MergeTree ORDER BY obj_path", - ) - _traverse_object_storage( - ctx, listing_table, from_time, to_time, object_name_prefix - ) - antijoin_query = f""" SELECT obj_path FROM {listing_table} AS object_storage LEFT ANTI JOIN {remote_data_paths_table} AS object_table @@ -142,6 +176,11 @@ def clean_object_storage( """ logging.info("Antijoin query: %s", antijoin_query) + if dry_run: + click.echo("Counting orphaned objects...") + else: + click.echo("Deleting orphaned objects...") + deleted = 0 with execute_query( ctx, antijoin_query, stream=True, format_="TabSeparated" @@ -154,11 +193,8 @@ def clean_object_storage( else: deleted = cleanup_s3_object_storage(disk_conf, paths_to_delete) - if not keep_paths: - execute_query(ctx, f"TRUNCATE {listing_table}", format_=None) - click.echo( - f"{'Would delete' if dry_run else 'Deleted'} {deleted} objects from bucket [{disk_conf.bucket_name}]" + f"{'Would delete' if dry_run else 'Deleted'} 
{deleted} objects from bucket [{disk_conf.bucket_name}] with prefix {disk_conf.prefix}" ) diff --git a/ch_tools/chadmin/internal/object_storage/__init__.py b/ch_tools/chadmin/internal/object_storage/__init__.py index 22969a8f..7199e671 100644 --- a/ch_tools/chadmin/internal/object_storage/__init__.py +++ b/ch_tools/chadmin/internal/object_storage/__init__.py @@ -5,6 +5,3 @@ ObjectSummary, s3_object_storage_iterator, ) -from ch_tools.chadmin.internal.object_storage.s3_local_metadata import ( - S3ObjectLocalMetaData, -) diff --git a/ch_tools/chadmin/internal/object_storage/s3_local_metadata.py b/ch_tools/chadmin/internal/object_storage/s3_local_metadata.py deleted file mode 100644 index 44bfb477..00000000 --- a/ch_tools/chadmin/internal/object_storage/s3_local_metadata.py +++ /dev/null @@ -1,92 +0,0 @@ -import re -from dataclasses import dataclass -from pathlib import Path -from typing import List - -from typing_extensions import Self - -MAX_METADATA_FILE_SIZE = 10 * 1024 - - -@dataclass -class S3ObjectLocalInfo: - """ - Information about the S3 object stored locally in the metadata file. - """ - - key: str - size: int - - -@dataclass -class S3ObjectLocalMetaData: - """ - Parsed content of metadata file stored on the local disk. - """ - - version: int - total_size: int - objects: List[S3ObjectLocalInfo] - ref_counter: int - read_only: bool - - @classmethod - def from_string(cls, value: str) -> Self: - lines = value.splitlines() - idx = 0 - - matches = re.match(r"^[123]$", lines[idx]) - if not matches: - raise ValueError(f"Incorrect metadata version. Line: `{lines[idx]}`") - version = int(matches[0]) - idx += 1 - - matches = re.match(r"^(\d+)\s+(\d+)$", lines[idx]) - if not matches: - raise ValueError( - f"Incorrect metadata about the objects count and total size. 
Line: `{lines[idx]}`" - ) - object_count, total_size = int(matches[1]), int(matches[2]) - idx += 1 - - objects: List[S3ObjectLocalInfo] = [] - for _ in range(object_count): - matches = re.match(r"^(\d+)\s+(\S+)$", lines[idx]) - if not matches: - raise ValueError( - f"Incorrect metadata about object size and name. Line: `{lines[idx]}`" - ) - objects.append(S3ObjectLocalInfo(key=matches[2], size=int(matches[1]))) - idx += 1 - - matches = re.match(r"^\d+$", lines[idx]) - if not matches: - raise ValueError( - f"Incorrect metadata about refcounter. Line: `{lines[idx]}`" - ) - refcounter = int(lines[idx]) - idx += 1 - - matches = re.match("^[01]$", lines[idx]) - if not matches: - raise ValueError( - f"Incorrect metadata about readonly flag. Line: `{lines[idx]}`" - ) - read_only = bool(int(matches[0])) - - return cls( - version=version, - total_size=total_size, - objects=objects, - ref_counter=refcounter, - read_only=read_only, - ) - - @classmethod - def from_file(cls, path: Path) -> Self: - if path.stat().st_size > MAX_METADATA_FILE_SIZE: - raise ValueError( - f"Metadata file too large. 
Its size must not exceed {MAX_METADATA_FILE_SIZE} bytes" - ) - with path.open(encoding="latin-1") as file: - return cls.from_string(file.read()) diff --git a/tests/features/object_storage.feature b/tests/features/object_storage.feature index 74aa2b23..a8c34049 100644 --- a/tests/features/object_storage.feature +++ b/tests/features/object_storage.feature @@ -24,7 +24,7 @@ Feature: chadmin object-storage commands """ chadmin object-storage clean --dry-run """ - Then we get response + Then we get response contains """ Would delete 0 objects from bucket [cloud-storage-test] """ @@ -43,7 +43,7 @@ Feature: chadmin object-storage commands """ chadmin object-storage clean --dry-run --on-cluster """ - Then we get response + Then we get response contains """ Would delete 0 objects from bucket [cloud-storage-test] """ @@ -51,7 +51,7 @@ Feature: chadmin object-storage commands """ chadmin object-storage clean --to-time 0h --dry-run --on-cluster """ - Then we get response + Then we get response contains """ Would delete 0 objects from bucket [cloud-storage-test] """ @@ -68,7 +68,7 @@ Feature: chadmin object-storage commands """ chadmin object-storage clean --dry-run --to-time 0h --on-cluster """ - Then we get response + Then we get response contains """ Would delete 1 objects from bucket [cloud-storage-test] """ @@ -76,7 +76,7 @@ Feature: chadmin object-storage commands """ chadmin object-storage clean --to-time 0h --on-cluster """ - Then we get response + Then we get response contains """ Deleted 1 objects from bucket [cloud-storage-test] """ @@ -98,7 +98,7 @@ Feature: chadmin object-storage commands """ chadmin object-storage clean --dry-run --to-time 0h --on-cluster """ - Then we get response + Then we get response contains """ Would delete 100 objects from bucket [cloud-storage-test] """ @@ -106,7 +106,7 @@ Feature: chadmin object-storage commands """ chadmin object-storage clean --to-time 0h --on-cluster """ - Then we get response + Then we get response contains """ 
Deleted 100 objects from bucket [cloud-storage-test] """ @@ -114,7 +114,7 @@ Feature: chadmin object-storage commands """ chadmin object-storage clean --to-time 0h --dry-run --on-cluster """ - Then we get response + Then we get response contains """ Would delete 0 objects from bucket [cloud-storage-test] """