Rework clean command to use remote_data_paths table
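A rough usage sketch of the reworked command. The `chadmin` entry point and the `object_storage` disk name are assumptions inferred from the package layout, not part of this commit; adjust them to your installation.

# Count the orphaned S3 objects that would be removed, resolving metadata on all
# replicas via clusterAllReplicas(), without deleting anything.
chadmin object-storage --disk object_storage clean --on-cluster --dry-run

# Delete the orphans and keep the temporary listing table (it is truncated by default).
chadmin object-storage --disk object_storage clean --on-cluster --keep-paths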
aalexfvk committed Jan 25, 2024
1 parent 1746a7a commit 75f431a
Showing 17 changed files with 808 additions and 633 deletions.
299 changes: 112 additions & 187 deletions ch_tools/chadmin/cli/object_storage_group.py
@@ -1,52 +1,30 @@
import contextlib
import json
import logging
import sys
from datetime import datetime, timedelta, timezone
from gzip import GzipFile
from io import TextIOWrapper
from pathlib import Path
from typing import BinaryIO, Dict, List, Optional, Union
from typing import List, Optional

import click
from click import Context, group, option, pass_context

from ch_tools.chadmin.cli import get_clickhouse_config
from ch_tools.chadmin.internal.object_storage import (
ObjectSummary,
S3DiskConfiguration,
S3ObjectLocalMetaData,
cleanup_s3_object_storage,
collect_metadata,
s3_object_storage_iterator,
)
from ch_tools.chadmin.internal.utils import execute_query
from ch_tools.common.cli.parameters import TimeSpanParamType
from ch_tools.common.clickhouse.config.storage_configuration import S3DiskConfiguration

STORAGE_POLICY_CONFIG_PATH = Path("/etc/clickhouse-server/config.d/storage_policy.xml")
# The guard interval is used for S3 objects for which metadata is not found,
# and for metadata for which the object is not found in S3.
# Such objects are not counted if their last modified time falls within the guard interval preceding the start of the analysis.
DEFAULT_GUARD_INTERVAL = "24h"


def get_disk_metadata_paths(disk_name: str) -> List[Path]:
return [
Path(f"/var/lib/clickhouse/disks/{disk_name}/store"), # Atomic database engine
Path(f"/var/lib/clickhouse/disks/{disk_name}/data"), # Ordinary database engine
Path(f"/var/lib/clickhouse/disks/{disk_name}/shadow"), # Backups
]
# Prefix for a listing table name
LISTING_TABLE_PREFIX = "listing_objects_from_"
# Batch size for inserts in a listing table
INSERT_BATCH_SIZE = 1000


@group("object-storage")
@option(
"-c",
"--config",
"config_path",
type=click.Path(
exists=True, file_okay=True, dir_okay=False, readable=True, path_type=Path
),
default=STORAGE_POLICY_CONFIG_PATH,
help="ClickHouse storage policy config.",
)
@option(
"-d",
"--disk",
@@ -55,60 +33,25 @@ def get_disk_metadata_paths(disk_name: str) -> List[Path]:
help="S3 disk name.",
)
@pass_context
def object_storage_group(ctx: Context, config_path: Path, disk_name: str) -> None:
def object_storage_group(ctx: Context, disk_name: str) -> None:
"""Commands to manage S3 objects and their metadata."""

# Restrict excessive boto logging
_set_boto_log_level(logging.WARNING)

ctx.obj["disk_configuration"] = S3DiskConfiguration.from_config(
config_path, disk_name
)
ch_config = get_clickhouse_config(ctx)
ctx.obj[
"disk_configuration"
] = ch_config.storage_configuration.s3_disk_configuaration(disk_name)


@object_storage_group.command("list")
@option(
"-o",
"--orphaned",
"orphaned",
is_flag=True,
default=False,
help="List objects that are not referenced in the metadata.",
)
@object_storage_group.command("clean")
@option(
"-p",
"--object-name-prefix",
"--prefix",
"--object_name_prefix",
"object_name_prefix",
default="",
help="Additional prefix of object name using for listing.",
)
@option(
"-l",
"--limit",
"limit",
type=int,
help="Return at most this many objects.",
)
@option(
"-f",
"--dump-file",
"dump_file",
type=click.Path(path_type=Path),
help="Dump result to the file instead of STDOUT.",
)
@option(
"-c",
"--compressed",
"compressed",
is_flag=True,
help="Compress an output using GZIP format.",
)
@option(
"-q",
"--quiet",
"quiet",
is_flag=True,
help="Output only newline delimited object keys.",
help="Additional prefix of object name used while listing bucket.",
)
@option(
"--from-time",
@@ -129,119 +72,123 @@ def object_storage_group(ctx: Context, config_path: Path, disk_name: str) -> None:
type=TimeSpanParamType(),
help=("End of inspecting interval in human-friendly format."),
)
@option(
"--on-cluster",
"on_cluster",
is_flag=True,
help=("List objects on all hosts in a cluster."),
)
@option(
"--cluster",
"cluster_name",
default="{cluster}",
help=("Cluster to be cleaned. Default value is macro."),
)
@option(
"--dry-run",
"dry_run",
is_flag=True,
help=("Do not delete objects. Show only statistics."),
)
@option(
"--keep-paths",
"keep_paths",
is_flag=True,
help=("Do not delete collected paths of objects from object storage."),
)
@pass_context
def list_objects(
def clean_object_storage(
ctx: Context,
orphaned: bool,
object_name_prefix: str,
dump_file: Optional[Path],
compressed: bool,
quiet: bool,
from_time: Optional[timedelta],
to_time: timedelta,
limit: Optional[int],
on_cluster: bool,
cluster_name: str,
dry_run: bool,
keep_paths: bool,
) -> None:
"""
List S3 objects.
Clean orphaned S3 objects.
"""
if from_time is not None and from_time <= to_time:
if from_time is not None and to_time <= from_time:
raise click.BadParameter(
"from_time parameter must be greater than to_time",
"'to_time' parameter must be greater than 'from_time'",
param_hint="--from-time",
)

disk_conf: S3DiskConfiguration = ctx.obj["disk_configuration"]
now = datetime.now(timezone.utc)

counter = 0

with dump_writer(compressed, dump_file) as writer:
object_key_to_metadata = collect_metadata(
get_disk_metadata_paths(disk_conf.name)
remote_data_paths_table = "system.remote_data_paths"
if on_cluster:
remote_data_paths_table = (
f"clusterAllReplicas('{cluster_name}', {remote_data_paths_table})"
)
for name, obj in s3_object_storage_iterator(
disk_conf, object_name_prefix=object_name_prefix
):
if limit is not None and counter >= limit:
break

metadata = object_key_to_metadata.get(name)

if obj.last_modified > now - to_time:
continue
if from_time is not None and obj.last_modified < now - from_time:
continue

if orphaned and metadata:
continue
if not orphaned and not metadata:
continue

writer.write(_get_dump_line(obj, metadata, quiet))
counter += 1

listing_table = LISTING_TABLE_PREFIX + disk_conf.name
# Create listing table for storing paths from object storage
execute_query(
ctx,
f"CREATE TABLE IF NOT EXISTS {listing_table} (obj_path String) ENGINE MergeTree ORDER BY obj_path",
)

@object_storage_group.command("clean")
@option(
"--prefix",
"prefix",
help="Prefix path to delete objects. If not empty it has more priority than other options.",
)
@option(
"-f",
"--file",
"file",
type=click.File("rb"),
help="File containing S3 object keys delimited by newlines.",
default=sys.stdin.buffer,
show_default="STDIN",
)
@option(
"-c",
"--compressed",
"compressed",
is_flag=True,
help="Input stream is compressed using GZIP format.",
)
@pass_context
def clean_object_storage(ctx, prefix, file, compressed):
"""
Clean up needless S3 objects.
obj_paths_batch = []
counter = 0
for _, obj in s3_object_storage_iterator(
disk_conf, object_name_prefix=object_name_prefix
):
if obj.last_modified > now - to_time:
continue
if from_time is not None and obj.last_modified < now - from_time:
continue

obj_paths_batch.append(obj.key)
counter += 1
if len(obj_paths_batch) >= INSERT_BATCH_SIZE:
_insert_listing_batch(ctx, obj_paths_batch, listing_table)
obj_paths_batch.clear()

# Insert last batch (might be shorter)
if obj_paths_batch:
_insert_listing_batch(ctx, obj_paths_batch, listing_table)

antijoin_query = f"""
SELECT obj_path FROM {listing_table} AS object_storage
LEFT ANTI JOIN {remote_data_paths_table} AS object_table
ON object_table.remote_path = object_storage.obj_path
AND object_table.disk_name = '{disk_conf.name}'
"""
disk_conf: S3DiskConfiguration = ctx.obj["disk_configuration"]

if compressed:
file = GzipFile(fileobj=file)
file = TextIOWrapper(file)

if prefix:
file = (
obj.key
for _, obj in s3_object_storage_iterator(
disk_conf, path_prefix=prefix, skip_ignoring=True
)
)

lines_stripped = (
line.rstrip() for line in file
) # lazily iterate over file stripping newline
deleted = cleanup_s3_object_storage(disk_conf, lines_stripped)

click.echo(f"Deleted {deleted} objects from bucket [{disk_conf.bucket_name}]")
logging.info("Antijoin query: %s", antijoin_query)

deleted = 0
with execute_query(
ctx, antijoin_query, stream=True, format_="TabSeparated"
) as resp:
# make generator for lazy iterating
paths_to_delete = (line.decode() for line in resp.iter_lines())
if dry_run:
# just count items
deleted = sum(1 for _ in paths_to_delete)
else:
deleted = cleanup_s3_object_storage(disk_conf, paths_to_delete)

if not keep_paths:
execute_query(ctx, f"TRUNCATE {listing_table}", format_=None)

click.echo(
f"{'Would delete' if dry_run else 'Deleted'} {deleted} objects from bucket [{disk_conf.bucket_name}]"
)


@contextlib.contextmanager
def dump_writer(compressed, file_path=None):
out_file = open(file_path, "wb") if file_path is not None else sys.stdout.buffer
writer: Union[GzipFile, BinaryIO] = (
GzipFile(mode="wb", fileobj=out_file) if compressed else out_file
def _insert_listing_batch(
ctx: Context, obj_paths_batch: List[str], listing_table: str
) -> None:
batch_values = ",".join(f"('{obj_path}')" for obj_path in obj_paths_batch)
execute_query(
ctx,
f"INSERT INTO {listing_table} (obj_path) VALUES {batch_values}",
format_=None,
)
try:
yield writer
finally:
writer.flush()
if file_path is not None or compressed:
writer.close()


def _set_boto_log_level(level: int) -> None:
@@ -250,25 +197,3 @@ def _set_boto_log_level(level: int) -> None:
logging.getLogger("nose").setLevel(level)
logging.getLogger("s3transfer").setLevel(level)
logging.getLogger("urllib3").setLevel(level)


def _get_dump_line(
obj: ObjectSummary,
metadata_files: Optional[Dict[Path, S3ObjectLocalMetaData]],
quiet: bool,
) -> bytes:
if quiet:
res = obj.key
else:
res = json.dumps(
{
"object": {
"key": obj.key,
"size": obj.size,
"last_modified": str(obj.last_modified),
},
"files": list(metadata_files) if metadata_files else [],
},
default=str,
)
return f"{res}\n".encode()
2 changes: 1 addition & 1 deletion ch_tools/chadmin/internal/diagnostics/formatter.py
@@ -19,7 +19,7 @@ def format_storage(dbaas_config, ch_config):
disk_size = format_size(dbaas_config.disk_size, binary=True)

storage = f"{disk_size} {disk_type}"
if ch_config.has_disk("object_storage"):
if ch_config.storage_configuration.has_disk("object_storage"):
storage += " + S3"

return storage
4 changes: 0 additions & 4 deletions ch_tools/chadmin/internal/object_storage/__init__.py
@@ -1,10 +1,6 @@
from ch_tools.chadmin.internal.object_storage.collect_metadata import collect_metadata
from ch_tools.chadmin.internal.object_storage.s3_cleanup import (
cleanup_s3_object_storage,
)
from ch_tools.chadmin.internal.object_storage.s3_disk_configuration import (
S3DiskConfiguration,
)
from ch_tools.chadmin.internal.object_storage.s3_iterator import (
ObjectSummary,
s3_object_storage_iterator,
