From 68d0e27ad2120fa7548c5c801245d93080540210 Mon Sep 17 00:00:00 2001 From: Egor Medvedev Date: Wed, 13 Dec 2023 07:45:55 +0000 Subject: [PATCH] Delete objects by path from s3 object storage --- ch_tools/chadmin/cli/object_storage_group.py | 40 ++++++++++++------- .../internal/object_storage/s3_iterator.py | 17 +++++--- ch_tools/chadmin/internal/utils.py | 6 ++- 3 files changed, 42 insertions(+), 21 deletions(-) diff --git a/ch_tools/chadmin/cli/object_storage_group.py b/ch_tools/chadmin/cli/object_storage_group.py index 18e288cb..dabd8b63 100644 --- a/ch_tools/chadmin/cli/object_storage_group.py +++ b/ch_tools/chadmin/cli/object_storage_group.py @@ -45,14 +45,14 @@ def get_disk_metadata_paths(disk_name: str) -> List[Path]: exists=True, file_okay=True, dir_okay=False, readable=True, path_type=Path ), default=STORAGE_POLICY_CONFIG_PATH, - help="ClickHouse storage policy config", + help="ClickHouse storage policy config.", ) @option( "-d", "--disk", "disk_name", default="object_storage", - help="S3 disk name", + help="S3 disk name.", ) @pass_context def object_storage_group(ctx: Context, config_path: Path, disk_name: str) -> None: @@ -73,42 +73,42 @@ def object_storage_group(ctx: Context, config_path: Path, disk_name: str) -> Non "orphaned", is_flag=True, default=False, - help="List objects that are not referenced in the metadata", + help="List objects that are not referenced in the metadata.", ) @option( "-p", "--object-name-prefix", "object_name_prefix", default="", - help="Additional prefix of object name using for listing", + help="Additional prefix of object name using for listing.", ) @option( "-l", "--limit", "limit", type=int, - help="Return at most this many objects", + help="Return at most this many objects.", ) @option( "-f", "--dump-file", "dump_file", type=click.Path(path_type=Path), - help="Dump result to the file instead of STDOUT", + help="Dump result to the file instead of STDOUT.", ) @option( "-c", "--compressed", "compressed", is_flag=True, - help="Compress an output using GZIP format", + help="Compress an output using GZIP format.", ) @option( "-q", "--quiet", "quiet", is_flag=True, - help="Output only newline delimited object keys", + help="Output only newline delimited object keys.", ) @option( "--from-time", @@ -117,7 +117,7 @@ def object_storage_group(ctx: Context, config_path: Path, disk_name: str) -> Non type=TimeSpanParamType(), help=( "Begin of inspecting interval in human-friendly format. " - "Objects with a modification time falling interval [now - from_time, now - to_time] are considered" + "Objects with a modification time falling interval [now - from_time, now - to_time] are considered." ), ) @option( @@ -159,7 +159,9 @@ def list_objects( object_key_to_metadata = collect_metadata( get_disk_metadata_paths(disk_conf.name) ) - for name, obj in s3_object_storage_iterator(disk_conf, object_name_prefix): + for name, obj in s3_object_storage_iterator( + disk_conf, object_name_prefix=object_name_prefix + ): if limit is not None and counter >= limit: break @@ -180,12 +182,17 @@ def list_objects( @object_storage_group.command("clean") +@option( + "--prefix", + "prefix", + help="Prefix path to delete objects. If not empty it has more priority than other options.", +) @option( "-f", "--file", "file", type=click.File("rb"), - help="File containing S3 object keys delimited by newlines", + help="File containing S3 object keys delimited by newlines.", default=sys.stdin.buffer, show_default="STDIN", ) @@ -194,18 +201,23 @@ def list_objects( "--compressed", "compressed", is_flag=True, - help="Input stream is compressed using GZIP format", + help="Input stream is compressed using GZIP format.", ) @pass_context -def clean_object_storage(ctx, file, compressed): +def clean_object_storage(ctx, prefix, file, compressed): """ Clean up needless S3 objects. """ disk_conf: S3DiskConfiguration = ctx.obj["disk_configuration"] + file = TextIOWrapper(file) if compressed: file = GzipFile(fileobj=file) - file = TextIOWrapper(file) + if prefix: + file = ( + obj.key + for _, obj in s3_object_storage_iterator(disk_conf, path_prefix=prefix, skip_ignoring=True) + ) lines_stripped = ( line.rstrip() for line in file diff --git a/ch_tools/chadmin/internal/object_storage/s3_iterator.py b/ch_tools/chadmin/internal/object_storage/s3_iterator.py index fe37ba3a..a81a200d 100644 --- a/ch_tools/chadmin/internal/object_storage/s3_iterator.py +++ b/ch_tools/chadmin/internal/object_storage/s3_iterator.py @@ -1,4 +1,4 @@ -from typing import Any, Iterator, Tuple +from typing import Any, Iterator, Optional, Tuple import boto3 # type: ignore[import] from botocore.client import Config # type: ignore[import] @@ -12,7 +12,11 @@ def s3_object_storage_iterator( - disk: S3DiskConfiguration, object_name_prefix: str + disk: S3DiskConfiguration, + *, + path_prefix: Optional[str] = None, + object_name_prefix: str = "", + skip_ignoring: bool = False ) -> Iterator[Tuple[str, ObjectSummary]]: s3 = boto3.resource( "s3", @@ -23,10 +27,13 @@ def s3_object_storage_iterator( ) bucket = s3.Bucket(disk.bucket_name) - for obj in bucket.objects.filter(Prefix=disk.prefix + object_name_prefix): - name: str = obj.key[len(disk.prefix) :] + if not path_prefix: + path_prefix = disk.prefix - if _is_ignored(name): + for obj in bucket.objects.filter(Prefix=path_prefix + object_name_prefix): + name: str = obj.key[len(path_prefix) :] + + if not skip_ignoring and _is_ignored(name): continue yield name, obj diff --git a/ch_tools/chadmin/internal/utils.py b/ch_tools/chadmin/internal/utils.py index c9dfff0d..726fbbc1 100644 --- a/ch_tools/chadmin/internal/utils.py +++ b/ch_tools/chadmin/internal/utils.py @@ -47,6 +47,8 @@ def chunked(iterable: Iterable, n: int) -> Iterator[list]: raise ValueError("n must be at least one") it = iter(iterable) - chunk = list(islice(it, n)) - while chunk: + while True: + chunk = list(islice(it, n)) + if not chunk: + break yield chunk