Skip to content

Commit

Permalink
Small refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
aalexfvk committed Jan 26, 2024
1 parent 12e1ba9 commit b6ac017
Showing 1 changed file with 42 additions and 21 deletions.
63 changes: 42 additions & 21 deletions ch_tools/chadmin/cli/object_storage_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def clean_object_storage(
)

disk_conf: S3DiskConfiguration = ctx.obj["disk_configuration"]
now = datetime.now(timezone.utc)

remote_data_paths_table = "system.remote_data_paths"
if on_cluster:
Expand All @@ -131,26 +130,9 @@ def clean_object_storage(
ctx,
f"CREATE TABLE IF NOT EXISTS {listing_table} (obj_path String) ENGINE MergeTree ORDER BY obj_path",
)

obj_paths_batch = []
counter = 0
for _, obj in s3_object_storage_iterator(
disk_conf, object_name_prefix=object_name_prefix
):
if obj.last_modified > now - to_time:
continue
if from_time is not None and obj.last_modified < now - from_time:
continue

obj_paths_batch.append(obj.key)
counter += 1
if len(obj_paths_batch) >= INSERT_BATCH_SIZE:
_insert_listing_batch(ctx, obj_paths_batch, listing_table)
obj_paths_batch.clear()

# Insert last batch (might be shorter)
if obj_paths_batch:
_insert_listing_batch(ctx, obj_paths_batch, listing_table)
_traverse_object_storage(
ctx, listing_table, from_time, to_time, object_name_prefix
)

antijoin_query = f"""
SELECT obj_path FROM {listing_table} AS object_storage
Expand Down Expand Up @@ -180,9 +162,45 @@ def clean_object_storage(
)


def _traverse_object_storage(
    ctx: Context,
    listing_table: str,
    from_time: Optional[timedelta],
    to_time: timedelta,
    prefix: str,
) -> None:
    """
    Traverse S3 disk's bucket and put object names to the ClickHouse table.

    Objects are filtered by last-modified time: an object is listed only if it
    is older than ``now - to_time`` and, when ``from_time`` is provided, newer
    than ``now - from_time``. Names are flushed to ``listing_table`` in batches
    of ``INSERT_BATCH_SIZE``.
    """
    obj_paths_batch: List[str] = []
    # Fix "now" once so all objects are filtered against the same instant.
    now = datetime.now(timezone.utc)

    for _, obj in s3_object_storage_iterator(
        ctx.obj["disk_configuration"], object_name_prefix=prefix
    ):
        # Too recent: modified within the `to_time` window from now.
        if obj.last_modified > now - to_time:
            continue
        # Too old: modified before the optional `from_time` horizon.
        if from_time is not None and obj.last_modified < now - from_time:
            continue

        obj_paths_batch.append(obj.key)
        if len(obj_paths_batch) >= INSERT_BATCH_SIZE:
            _insert_listing_batch(ctx, obj_paths_batch, listing_table)
            obj_paths_batch.clear()

    # Insert the last batch (might be shorter)
    if obj_paths_batch:
        _insert_listing_batch(ctx, obj_paths_batch, listing_table)


def _insert_listing_batch(
ctx: Context, obj_paths_batch: List[str], listing_table: str
) -> None:
"""
Insert batch of object names to the listing table.
"""
batch_values = ",".join(f"('{obj_path}')" for obj_path in obj_paths_batch)
execute_query(
ctx,
Expand All @@ -192,6 +210,9 @@ def _insert_listing_batch(


def _set_boto_log_level(level: int) -> None:
"""
Set log level for libraries involved in communications with S3.
"""
logging.getLogger("boto3").setLevel(level)
logging.getLogger("botocore").setLevel(level)
logging.getLogger("nose").setLevel(level)
Expand Down

0 comments on commit b6ac017

Please sign in to comment.