Skip to content

Commit

Permalink
Stream response from file. Fix review notes
Browse files Browse the repository at this point in the history
  • Loading branch information
aalexfvk committed Jan 30, 2024
1 parent 2c31212 commit 3dfdae2
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 42 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ test-unit: install-deps
.PHONY: test-integration
test-integration: install-deps build-python-packages
cd $(TESTS_DIR)
$(POETRY) run $(PYTHON) -m env_control create
$(POETRY) run behave --show-timings --stop --junit $(BEHAVE_ARGS)


Expand Down
2 changes: 1 addition & 1 deletion ch_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""A set of tools for administration and diagnostics of ClickHouse DBMS."""

__version__ = "2.548.182050718"
__version__ = "1.0.0"
57 changes: 40 additions & 17 deletions ch_tools/chadmin/cli/object_storage_group.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from datetime import datetime, timedelta, timezone
from tempfile import TemporaryFile
from typing import List, Optional

import click
Expand All @@ -18,12 +19,12 @@
# And for metadata for which object is not found in S3.
# These objects are not counted if their last modified time fall in the interval from the moment of starting analyzing.
DEFAULT_GUARD_INTERVAL = "24h"
# Prefix for a listing table name
LISTING_TABLE_PREFIX = "listing_objects_from_"
# Batch size for inserts in a listing table
# Set not very big value due to default ClickHouse 'http_max_field_value_size' settings value 128Kb
# TODO: streaming upload in POST body while INSERT
INSERT_BATCH_SIZE = 500
# Use big enough timeout for stream HTTP query
STREAM_TIMEOUT = 10 * 60


@group("object-storage")
Expand Down Expand Up @@ -53,7 +54,10 @@ def object_storage_group(ctx: Context, disk_name: str) -> None:
"--object_name_prefix",
"object_name_prefix",
default="",
help="Additional prefix of object name used while listing bucket.",
help=(
"Prefix of object name used while listing bucket. By default its value is attempted to parse "
"from endpoint in clickhouse S3 disk config"
),
)
@option(
"--from-time",
Expand Down Expand Up @@ -98,6 +102,12 @@ def object_storage_group(ctx: Context, disk_name: str) -> None:
is_flag=True,
help=("Do not delete collected paths of objects from object storage."),
)
@option(
"--use-saved-list",
"use_saved_list",
is_flag=True,
help=("Use saved object list without traversing object storage again."),
)
@pass_context
def clean_command(
ctx: Context,
Expand All @@ -108,6 +118,7 @@ def clean_command(
cluster_name: str,
dry_run: bool,
keep_paths: bool,
use_saved_list: bool,
) -> None:
"""
Clean orphaned S3 objects.
Expand All @@ -119,7 +130,9 @@ def clean_command(
)

disk_conf: S3DiskConfiguration = ctx.obj["disk_configuration"]
listing_table = LISTING_TABLE_PREFIX + disk_conf.name
config = ctx.obj["config"]["object_storage"]["clean"]

listing_table = f"{config['listing_table_database']}.{config['listing_table_prefix']}{disk_conf.name}"
# Create listing table for storing paths from object storage
try:
execute_query(
Expand All @@ -135,6 +148,7 @@ def clean_command(
cluster_name,
dry_run,
listing_table,
use_saved_list,
)
finally:
if not keep_paths:
Expand All @@ -152,16 +166,20 @@ def _clean_object_storage(
cluster_name: str,
dry_run: bool,
listing_table: str,
use_saved_list: bool,
) -> None:
"""
Delete orphaned objects from object storage.
"""
disk_conf: S3DiskConfiguration = ctx.obj["disk_configuration"]
click.echo(
f"Collecting objects for S3 disk '{disk_conf.name}' with endpoint '{disk_conf.endpoint_url}' "
f"in bucket [{disk_conf.bucket_name}] with prefix '{disk_conf.prefix}'"
)
_traverse_object_storage(ctx, listing_table, from_time, to_time, object_name_prefix)
prefix = object_name_prefix or disk_conf.prefix

if not use_saved_list:
click.echo(
f"Collecting objects... (Disk: '{disk_conf.name}', Endpoint '{disk_conf.endpoint_url}', "
f"Bucket: {disk_conf.bucket_name}], Prefix: '{prefix}')"
)
_traverse_object_storage(ctx, listing_table, from_time, to_time, prefix)

remote_data_paths_table = "system.remote_data_paths"
if on_cluster:
Expand All @@ -183,15 +201,20 @@ def _clean_object_storage(
click.echo("Deleting orphaned objects...")

deleted = 0
with execute_query(
ctx, antijoin_query, stream=True, format_="TabSeparated"
) as resp:
# make generator for lazy iterating
paths_to_delete = (line.decode() for line in resp.iter_lines())
deleted = cleanup_s3_object_storage(disk_conf, paths_to_delete, dry_run)
with TemporaryFile() as keys_file:
with execute_query(
ctx, antijoin_query, stream=True, format_="TabSeparated"
) as resp:
# Save response to the file by chunks
for chunk in resp.iter_content(chunk_size=8192):
keys_file.write(chunk)

keys_file.seek(0) # rewind file pointer to the beginning
keys = (line.decode().strip() for line in keys_file)
deleted = cleanup_s3_object_storage(disk_conf, keys, dry_run)

click.echo(
f"{'Would delete' if dry_run else 'Deleted'} {deleted} objects from bucket [{disk_conf.bucket_name}] with prefix {disk_conf.prefix}"
f"{'Would delete' if dry_run else 'Deleted'} {deleted} objects from bucket [{disk_conf.bucket_name}] with prefix {prefix}"
)


Expand All @@ -209,7 +232,7 @@ def _traverse_object_storage(
counter = 0
now = datetime.now(timezone.utc)

for _, obj in s3_object_storage_iterator(
for obj in s3_object_storage_iterator(
ctx.obj["disk_configuration"], object_name_prefix=prefix
):
if obj.last_modified > now - to_time:
Expand Down
2 changes: 1 addition & 1 deletion ch_tools/chadmin/internal/object_storage/s3_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ch_tools.chadmin.internal.utils import chunked
from ch_tools.common.clickhouse.config.storage_configuration import S3DiskConfiguration

BULK_DELETE_CHUNK_SIZE = 100
BULK_DELETE_CHUNK_SIZE = 1000


def cleanup_s3_object_storage(
Expand Down
19 changes: 6 additions & 13 deletions ch_tools/chadmin/internal/object_storage/s3_iterator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Iterator, Optional, Tuple
from typing import Any, Iterator

import boto3 # type: ignore[import]
from botocore.client import Config
Expand All @@ -12,10 +12,9 @@
def s3_object_storage_iterator(
disk: S3DiskConfiguration,
*,
path_prefix: Optional[str] = None,
object_name_prefix: str = "",
skip_ignoring: bool = False
) -> Iterator[Tuple[str, ObjectSummary]]:
) -> Iterator[ObjectSummary]:
s3 = boto3.resource(
"s3",
endpoint_url=disk.endpoint_url,
Expand All @@ -25,17 +24,11 @@ def s3_object_storage_iterator(
)
bucket = s3.Bucket(disk.bucket_name)

if not path_prefix:
path_prefix = disk.prefix

for obj in bucket.objects.filter(Prefix=path_prefix + object_name_prefix):
name: str = obj.key[len(path_prefix) :]

if not skip_ignoring and _is_ignored(name):
for obj in bucket.objects.filter(Prefix=object_name_prefix):
if not skip_ignoring and _is_ignored(obj.key):
continue

yield name, obj
yield obj


def _is_ignored(name: str) -> bool:
return any(name.startswith(p) for p in IGNORED_OBJECT_NAME_PREFIXES)
return any(p in name for p in IGNORED_OBJECT_NAME_PREFIXES)
2 changes: 2 additions & 0 deletions ch_tools/chadmin/internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def execute_query(
dry_run=False,
format_="default",
stream=False,
settings=None,
**kwargs
):
"""
Expand All @@ -32,6 +33,7 @@ def execute_query(
echo=echo,
dry_run=dry_run,
stream=stream,
settings=settings,
)


Expand Down
3 changes: 3 additions & 0 deletions ch_tools/common/clickhouse/client/clickhouse_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def query(
echo: bool = False,
dry_run: bool = False,
stream: bool = False,
settings: Optional[dict] = None,
) -> Any:
"""
Execute query.
Expand All @@ -82,6 +83,7 @@ def query(
return None

timeout = max(self._timeout, timeout or 0)
per_query_settings = settings or {}

logging.debug("Executing query: %s", query)
try:
Expand All @@ -90,6 +92,7 @@ def query(
params={
**self._settings,
"query": query,
**per_query_settings, # overwrites previous settings
},
json=post_data,
timeout=timeout,
Expand Down
6 changes: 6 additions & 0 deletions ch_tools/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
"monitoring_user": None,
"monitoring_password": None,
},
"object_storage": {
"clean": {
"listing_table_prefix": "listing_objects_from_",
"listing_table_database": "default",
}
},
}


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "clickhouse-tools"
version = "2.548.182050718"
version = "1.0.0"
license = "MIT"
description = "clickhouse-tools is a set of tools for administration and diagnostics of ClickHouse DBMS."

Expand Down
2 changes: 0 additions & 2 deletions tests/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
from modules.logs import save_logs
from modules.utils import version_ge, version_lt

REQUIRE_VERSION_PREFIX_LEN = len("require_version_")

try:
import ipdb as pdb
except ImportError:
Expand Down
49 changes: 44 additions & 5 deletions tests/features/object_storage.feature
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ Feature: chadmin object-storage commands
"""
Then S3 contains greater than 0 objects

@require_version_22.8
Scenario: Dry-run clean on one replica with guard period
When we execute command on clickhouse01
"""
Expand All @@ -37,7 +36,6 @@ Feature: chadmin object-storage commands
Would delete [1-9][0-9]* objects from bucket \[cloud-storage-test\]
"""

@require_version_22.8
Scenario: Dry-run clean on cluster with guard period
When we execute command on clickhouse01
"""
Expand All @@ -56,7 +54,6 @@ Feature: chadmin object-storage commands
Would delete 0 objects from bucket [cloud-storage-test]
"""

@require_version_22.8
Scenario: Clean orphaned objects
When we put object in S3
"""
Expand Down Expand Up @@ -86,12 +83,11 @@ Feature: chadmin object-storage commands
path: /data/orpaned_object.tsv
"""

@require_version_22.8
Scenario: Clean many orphaned objects
When we put 100 objects in S3
"""
bucket: cloud-storage-test
path: /data/orpaned_object.tsv
path: /data/orpaned_object-{}
data: '1'
"""
When we execute command on clickhouse01
Expand All @@ -118,3 +114,46 @@ Feature: chadmin object-storage commands
"""
Would delete 0 objects from bucket [cloud-storage-test]
"""

Scenario: Clean orphaned objects with prefix
When we put object in S3
"""
bucket: cloud-storage-test
path: /data_1/orpaned_object.tsv
data: '1'
"""
When we put object in S3
"""
bucket: cloud-storage-test
path: /data_2/orpaned_object.tsv
data: '1'
"""
When we execute command on clickhouse01
"""
chadmin object-storage clean --dry-run --to-time 0h --on-cluster --prefix "/data_1"
"""
Then we get response contains
"""
Would delete 1 objects from bucket [cloud-storage-test]
"""
When we execute command on clickhouse01
"""
chadmin object-storage clean --to-time 0h --on-cluster --prefix "/data_1"
"""
Then we get response contains
"""
Deleted 1 objects from bucket [cloud-storage-test]
"""
And path does not exist in S3
"""
bucket: cloud-storage-test
path: /data_1/orpaned_object.tsv
"""
When we execute command on clickhouse01
"""
chadmin object-storage clean --dry-run --to-time 0h --on-cluster --prefix "/data_2"
"""
Then we get response contains
"""
Would delete 1 objects from bucket [cloud-storage-test]
"""
2 changes: 1 addition & 1 deletion tests/steps/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def step_put_file_count_in_s3(context, count):
conf = get_step_data(context)
s3_client = s3.S3Client(context, conf["bucket"])
for i in range(count):
path = f"{conf['path']}-{i}"
path = f"{conf['path'].format(i)}"
s3_client.upload_data(conf["data"], path)
assert s3_client.path_exists(path)

Expand Down

0 comments on commit 3dfdae2

Please sign in to comment.