Add total size counting for object-storage clean command (#104)
* Add total size counting for object-storage clean

* [rev.1]
aalexfvk authored Feb 21, 2024
1 parent deeefdf commit 4c7c9d9
Showing 5 changed files with 128 additions and 55 deletions.
ch_tools/chadmin/cli/object_storage_group.py (77 changes: 59 additions & 18 deletions)
@@ -5,13 +5,16 @@
 
 import click
 from click import Context, group, option, pass_context
+from humanfriendly import format_size
 
 from ch_tools.chadmin.cli import get_clickhouse_config
 from ch_tools.chadmin.internal.object_storage import (
+    ObjListItem,
     cleanup_s3_object_storage,
     s3_object_storage_iterator,
 )
 from ch_tools.chadmin.internal.utils import execute_query
+from ch_tools.common.cli.formatting import print_response
 from ch_tools.common.cli.parameters import TimeSpanParamType
 from ch_tools.common.clickhouse.config.storage_configuration import S3DiskConfiguration
@@ -137,7 +140,7 @@ def clean_command(
     try:
         execute_query(
             ctx,
-            f"CREATE TABLE IF NOT EXISTS {listing_table} (obj_path String) ENGINE MergeTree ORDER BY obj_path",
+            f"CREATE TABLE IF NOT EXISTS {listing_table} (obj_path String, obj_size UInt64) ENGINE MergeTree ORDER BY obj_path",
         )
         _clean_object_storage(
             ctx,
@@ -153,7 +156,7 @@ def clean_command(
     finally:
         if not keep_paths:
             execute_query(
-                ctx, f"TRUNCATE TABLE IF EXISTS {listing_table}", format_=None
+                ctx, f"DROP TABLE IF EXISTS {listing_table} SYNC", format_=None
             )


@@ -175,9 +178,12 @@ def _clean_object_storage(
     prefix = object_name_prefix or disk_conf.prefix
 
     if not use_saved_list:
-        click.echo(
-            f"Collecting objects... (Disk: '{disk_conf.name}', Endpoint '{disk_conf.endpoint_url}', "
-            f"Bucket: {disk_conf.bucket_name}, Prefix: '{prefix}')"
+        logging.info(
+            "Collecting objects... (Disk: '%s', Endpoint '%s', Bucket: '%s', Prefix: '%s')",
+            disk_conf.name,
+            disk_conf.endpoint_url,
+            disk_conf.bucket_name,
+            prefix,
         )
         _traverse_object_storage(ctx, listing_table, from_time, to_time, prefix)

@@ -188,19 +194,20 @@ def _clean_object_storage(
     )
 
     antijoin_query = f"""
-        SELECT obj_path FROM {listing_table} AS object_storage
+        SELECT obj_path, obj_size FROM {listing_table} AS object_storage
         LEFT ANTI JOIN {remote_data_paths_table} AS object_table
         ON object_table.remote_path = object_storage.obj_path
         AND object_table.disk_name = '{disk_conf.name}'
     """
     logging.info("Antijoin query: %s", antijoin_query)
 
     if dry_run:
-        click.echo("Counting orphaned objects...")
+        logging.info("Counting orphaned objects...")
     else:
-        click.echo("Deleting orphaned objects...")
+        logging.info("Deleting orphaned objects...")
 
     deleted = 0
+    total_size = 0
     with TemporaryFile() as keys_file:
         with execute_query(
             ctx, antijoin_query, stream=True, format_="TabSeparated"
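
The LEFT ANTI JOIN above selects every row of the bucket listing that has no matching remote_path for this disk among the paths ClickHouse still references, i.e. the orphaned objects. A minimal Python analogy of that set difference (illustrative only; the names are hypothetical stand-ins, not the command's actual variables):

    listing = [("cloud_storage/abc", 1024), ("cloud_storage/orphan", 2048)]
    referenced_paths = {"cloud_storage/abc"}  # paths still referenced by ClickHouse

    # Keep listed objects whose path has no match on the right-hand side,
    # which is what LEFT ANTI JOIN does in the query above.
    orphans = [(path, size) for path, size in listing if path not in referenced_paths]
    print(orphans)  # [('cloud_storage/orphan', 2048)]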
@@ -210,11 +217,45 @@ def _clean_object_storage(
                 keys_file.write(chunk)
 
         keys_file.seek(0)  # rewind file pointer to the beginning
-        keys = (line.decode().strip() for line in keys_file)
-        deleted = cleanup_s3_object_storage(disk_conf, keys, dry_run)
-
-    click.echo(
-        f"{'Would delete' if dry_run else 'Deleted'} {deleted} objects from bucket [{disk_conf.bucket_name}] with prefix {prefix}"
+        keys = (
+            ObjListItem.from_tab_separated(line.decode().strip()) for line in keys_file
+        )
+        deleted, total_size = cleanup_s3_object_storage(disk_conf, keys, dry_run)
+
+    logging.info(
+        "%s %s objects with total size %s from bucket [%s] with prefix %s",
+        "Would delete" if dry_run else "Deleted",
+        deleted,
+        format_size(total_size, binary=True),
+        disk_conf.bucket_name,
+        prefix,
     )
+    _print_response(ctx, dry_run, deleted, total_size)
+
+
+def _print_response(ctx: Context, dry_run: bool, deleted: int, total_size: int) -> None:
+    """
+    Outputs result of cleaning.
+    """
+    # List of dicts for print_response()
+    clean_stats = [
+        {"WouldDelete" if dry_run else "Deleted": deleted, "TotalSize": total_size}
+    ]
+
+    def _table_formatter(stats):
+        result = {}
+
if "Deleted" in stats:
result["Deleted"] = stats["Deleted"]
if "WouldDeleted" in stats:
result["WouldDeleted"] = stats["WouldDeleted"]
result["TotalSize"] = format_size(stats["TotalSize"], binary=True)

return result

print_response(
ctx, clean_stats, default_format="table", table_formatter=_table_formatter
)
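
The structured payload keeps TotalSize as a raw integer, while the table formatter humanizes it for display. A small sketch of the values involved, assuming humanfriendly's documented binary mode (the numbers are hypothetical):

    from humanfriendly import format_size

    stats = {"Deleted": 42, "TotalSize": 1536}
    # What the table formatter above would emit for these stats:
    formatted = {
        "Deleted": stats["Deleted"],
        "TotalSize": format_size(stats["TotalSize"], binary=True),
    }
    print(formatted)  # {'Deleted': 42, 'TotalSize': '1.5 KiB'}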


@@ -228,7 +269,7 @@ def _traverse_object_storage(
     """
     Traverse S3 disk's bucket and put object names to the ClickHouse table.
     """
-    obj_paths_batch = []
+    obj_paths_batch: List[ObjListItem] = []
     counter = 0
     now = datetime.now(timezone.utc)
@@ -240,7 +281,7 @@ def _traverse_object_storage(
         if from_time is not None and obj.last_modified < now - from_time:
             continue
 
-        obj_paths_batch.append(obj.key)
+        obj_paths_batch.append(ObjListItem(obj.key, obj.size))
         counter += 1
         if len(obj_paths_batch) >= INSERT_BATCH_SIZE:
             _insert_listing_batch(ctx, obj_paths_batch, listing_table)
@@ -250,19 +291,19 @@ def _traverse_object_storage(
     if obj_paths_batch:
         _insert_listing_batch(ctx, obj_paths_batch, listing_table)
 
-    click.echo(f"Collected {counter} objects")
+    logging.info("Collected %s objects", counter)
 
 
 def _insert_listing_batch(
-    ctx: Context, obj_paths_batch: List[str], listing_table: str
+    ctx: Context, obj_paths_batch: List[ObjListItem], listing_table: str
 ) -> None:
     """
     Insert batch of object names to the listing table.
     """
-    batch_values = ",".join(f"('{obj_path}')" for obj_path in obj_paths_batch)
+    batch_values = ",".join(f"('{item.path}',{item.size})" for item in obj_paths_batch)
     execute_query(
         ctx,
-        f"INSERT INTO {listing_table} (obj_path) VALUES {batch_values}",
+        f"INSERT INTO {listing_table} (obj_path, obj_size) VALUES {batch_values}",
         format_=None,
     )
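
Each batch is rendered into a single multi-row VALUES clause. A short sketch of the string this produces for a hypothetical two-item batch (assuming the package layout shown in this commit):

    from ch_tools.chadmin.internal.object_storage import ObjListItem

    batch = [ObjListItem("cloud_storage/abc", 1024), ObjListItem("cloud_storage/def", 2048)]
    batch_values = ",".join(f"('{item.path}',{item.size})" for item in batch)
    print(batch_values)  # ('cloud_storage/abc',1024),('cloud_storage/def',2048)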

ch_tools/chadmin/internal/object_storage/__init__.py (1 change: 1 addition & 0 deletions)
@@ -1,3 +1,4 @@
+from ch_tools.chadmin.internal.object_storage.obj_list_item import ObjListItem
 from ch_tools.chadmin.internal.object_storage.s3_cleanup import (
     cleanup_s3_object_storage,
 )
ch_tools/chadmin/internal/object_storage/obj_list_item.py (16 changes: 16 additions & 0 deletions, new file)
@@ -0,0 +1,16 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class ObjListItem:
+    """
+    Item of object storage listing.
+    """
+
+    path: str
+    size: int
+
+    @classmethod
+    def from_tab_separated(cls, value: str) -> "ObjListItem":
+        path, size = value.split("\t")
+        return cls(path, int(size))
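
Each line of the TabSeparated stream is an object path and its size joined by a tab, which from_tab_separated splits back into a typed item. A quick usage sketch with a hypothetical row:

    from ch_tools.chadmin.internal.object_storage import ObjListItem

    item = ObjListItem.from_tab_separated("cloud_storage/shard_1/abc\t1024")
    assert item.path == "cloud_storage/shard_1/abc"
    assert item.size == 1024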
ch_tools/chadmin/internal/object_storage/s3_cleanup.py (17 changes: 10 additions & 7 deletions)
@@ -1,17 +1,18 @@
-from typing import Any, Iterator, List
+from typing import Any, Iterator, List, Tuple
 
 import boto3
-from botocore.client import Config  # type: ignore[import]
+from botocore.client import Config
 
+from ch_tools.chadmin.internal.object_storage import ObjListItem
 from ch_tools.chadmin.internal.utils import chunked
 from ch_tools.common.clickhouse.config.storage_configuration import S3DiskConfiguration
 
 BULK_DELETE_CHUNK_SIZE = 1000
 
 
 def cleanup_s3_object_storage(
-    disk: S3DiskConfiguration, keys: Iterator[str], dry_run: bool = False
-) -> int:
+    disk: S3DiskConfiguration, keys: Iterator[ObjListItem], dry_run: bool = False
+) -> Tuple[int, int]:
     s3 = boto3.resource(
         "s3",
         endpoint_url=disk.endpoint_url,
@@ -21,15 +22,17 @@ def cleanup_s3_object_storage(
     )
     bucket = s3.Bucket(disk.bucket_name)
     deleted = 0
+    total_size = 0
 
     for chunk in chunked(keys, BULK_DELETE_CHUNK_SIZE):
         if not dry_run:
             _bulk_delete(bucket, chunk)
         deleted += len(chunk)
+        total_size += sum(item.size for item in chunk)
 
-    return deleted
+    return deleted, total_size
 
 
-def _bulk_delete(bucket: Any, keys: List[str]) -> None:
-    objects = [{"Key": key} for key in keys]
+def _bulk_delete(bucket: Any, items: List[ObjListItem]) -> None:
+    objects = [{"Key": item.path} for item in items]
     bucket.delete_objects(Delete={"Objects": objects, "Quiet": False})
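
A dry run walks the same loop but skips the delete call, so counts and sizes are accumulated identically either way. A self-contained sketch of that per-chunk accounting, with a simplified stand-in for the chunked helper (the real one lives in ch_tools.chadmin.internal.utils):

    from itertools import islice

    def chunked(iterable, n):
        # Simplified stand-in for ch_tools.chadmin.internal.utils.chunked.
        it = iter(iterable)
        while chunk := list(islice(it, n)):
            yield chunk

    items = [("cloud_storage/a", 1024), ("cloud_storage/b", 2048), ("cloud_storage/c", 512)]
    deleted = 0
    total_size = 0
    for chunk in chunked(items, 2):
        # In the real code, _bulk_delete(bucket, chunk) runs here unless dry_run is set.
        deleted += len(chunk)
        total_size += sum(size for _, size in chunk)

    print(deleted, total_size)  # 3 3584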