Skip to content

Commit

Permalink
Delete objects by path from s3 object storage
Browse files Browse the repository at this point in the history
  • Loading branch information
MedvedewEM committed Dec 13, 2023
1 parent 37f2c1b commit 68d0e27
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 21 deletions.
40 changes: 26 additions & 14 deletions ch_tools/chadmin/cli/object_storage_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ def get_disk_metadata_paths(disk_name: str) -> List[Path]:
exists=True, file_okay=True, dir_okay=False, readable=True, path_type=Path
),
default=STORAGE_POLICY_CONFIG_PATH,
help="ClickHouse storage policy config",
help="ClickHouse storage policy config.",
)
@option(
"-d",
"--disk",
"disk_name",
default="object_storage",
help="S3 disk name",
help="S3 disk name.",
)
@pass_context
def object_storage_group(ctx: Context, config_path: Path, disk_name: str) -> None:
Expand All @@ -73,42 +73,42 @@ def object_storage_group(ctx: Context, config_path: Path, disk_name: str) -> Non
"orphaned",
is_flag=True,
default=False,
help="List objects that are not referenced in the metadata",
help="List objects that are not referenced in the metadata.",
)
@option(
"-p",
"--object-name-prefix",
"object_name_prefix",
default="",
help="Additional prefix of object name using for listing",
help="Additional prefix of object name using for listing.",
)
@option(
"-l",
"--limit",
"limit",
type=int,
help="Return at most this many objects",
help="Return at most this many objects.",
)
@option(
"-f",
"--dump-file",
"dump_file",
type=click.Path(path_type=Path),
help="Dump result to the file instead of STDOUT",
help="Dump result to the file instead of STDOUT.",
)
@option(
"-c",
"--compressed",
"compressed",
is_flag=True,
help="Compress an output using GZIP format",
help="Compress an output using GZIP format.",
)
@option(
"-q",
"--quiet",
"quiet",
is_flag=True,
help="Output only newline delimited object keys",
help="Output only newline delimited object keys.",
)
@option(
"--from-time",
Expand All @@ -117,7 +117,7 @@ def object_storage_group(ctx: Context, config_path: Path, disk_name: str) -> Non
type=TimeSpanParamType(),
help=(
"Begin of inspecting interval in human-friendly format. "
"Objects with a modification time falling interval [now - from_time, now - to_time] are considered"
"Objects with a modification time falling interval [now - from_time, now - to_time] are considered."
),
)
@option(
Expand Down Expand Up @@ -159,7 +159,9 @@ def list_objects(
object_key_to_metadata = collect_metadata(
get_disk_metadata_paths(disk_conf.name)
)
for name, obj in s3_object_storage_iterator(disk_conf, object_name_prefix):
for name, obj in s3_object_storage_iterator(
disk_conf, object_name_prefix=object_name_prefix
):
if limit is not None and counter >= limit:
break

Expand All @@ -180,12 +182,17 @@ def list_objects(


@object_storage_group.command("clean")
@option(
"--prefix",
"prefix",
help="Prefix path to delete objects. If not empty it has more priority than other options.",
)
@option(
"-f",
"--file",
"file",
type=click.File("rb"),
help="File containing S3 object keys delimited by newlines",
help="File containing S3 object keys delimited by newlines.",
default=sys.stdin.buffer,
show_default="STDIN",
)
Expand All @@ -194,18 +201,23 @@ def list_objects(
"--compressed",
"compressed",
is_flag=True,
help="Input stream is compressed using GZIP format",
help="Input stream is compressed using GZIP format.",
)
@pass_context
def clean_object_storage(ctx, file, compressed):
def clean_object_storage(ctx, prefix, file, compressed):
"""
Clean up needless S3 objects.
"""
disk_conf: S3DiskConfiguration = ctx.obj["disk_configuration"]

file = TextIOWrapper(file)
if compressed:
file = GzipFile(fileobj=file)
file = TextIOWrapper(file)
if prefix:
file = (
obj.key
for _, obj in s3_object_storage_iterator(disk_conf, path_prefix=prefix, skip_ignoring=True)
)

lines_stripped = (
line.rstrip() for line in file
Expand Down
17 changes: 12 additions & 5 deletions ch_tools/chadmin/internal/object_storage/s3_iterator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Iterator, Tuple
from typing import Any, Iterator, Optional, Tuple

import boto3 # type: ignore[import]
from botocore.client import Config # type: ignore[import]
Expand All @@ -12,7 +12,11 @@


def s3_object_storage_iterator(
disk: S3DiskConfiguration, object_name_prefix: str
disk: S3DiskConfiguration,
*,
path_prefix: Optional[str] = None,
object_name_prefix: str = "",
skip_ignoring: bool = False
) -> Iterator[Tuple[str, ObjectSummary]]:
s3 = boto3.resource(
"s3",
Expand All @@ -23,10 +27,13 @@ def s3_object_storage_iterator(
)
bucket = s3.Bucket(disk.bucket_name)

for obj in bucket.objects.filter(Prefix=disk.prefix + object_name_prefix):
name: str = obj.key[len(disk.prefix) :]
if not path_prefix:
path_prefix = disk.prefix

if _is_ignored(name):
for obj in bucket.objects.filter(Prefix=path_prefix + object_name_prefix):
name: str = obj.key[len(path_prefix) :]

if not skip_ignoring and _is_ignored(name):
continue

yield name, obj
Expand Down
6 changes: 4 additions & 2 deletions ch_tools/chadmin/internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def chunked(iterable: Iterable, n: int) -> Iterator[list]:
raise ValueError("n must be at least one")
it = iter(iterable)

chunk = list(islice(it, n))
while chunk:
while True:
chunk = list(islice(it, n))
if not chunk:
break
yield chunk

0 comments on commit 68d0e27

Please sign in to comment.