Not full walk before reattach partition (#249)
* init

* fix style

* all update
k-morozov authored Nov 5, 2024
1 parent d49b7c9 commit 707e6ff
Showing 2 changed files with 109 additions and 92 deletions.
198 changes: 109 additions & 89 deletions ch_tools/chadmin/cli/data_store_group.py
@@ -2,7 +2,7 @@
import shutil
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from typing import List, NamedTuple, Optional, Set, Tuple

import boto3
from click import Context, group, option, pass_context
@@ -33,6 +33,11 @@
ATTACH_DETACH_QUERY_RETRY = 10


class TablePartition(NamedTuple):
table: str
partition: str


@group("data-store", cls=Chadmin)
def data_store_group():
"""
@@ -350,34 +355,6 @@ def collect_orphaned_sql_objects_recursive(
@constraint(AcceptAtMost(1), ["detach", "reattach"])
@pass_context
def detect_broken_partitions(ctx, root_path, reattach, detach):
parts_paths_with_lost_keys = find_paths_to_part_with_lost_keys(ctx, root_path)
partition_list = get_partitions_by_path(ctx, parts_paths_with_lost_keys)

print_response(ctx, partition_list, default_format="table")

if reattach or detach:
for partition_info in partition_list:
if reattach:
handle_partition(
ctx, partition_info["table"], partition_info["partition"]
)
if detach:
handle_partition(
ctx,
partition_info["table"],
partition_info["partition"],
attach=False,
detach=True,
)


def find_paths_to_part_with_lost_keys(ctx: Context, root_path: str) -> List[str]:
"""
Find paths of parts with keys that doesn't have objects in s3.
"""

result = []

ch_config = get_clickhouse_config(ctx)

disk_conf: S3DiskConfiguration = (
@@ -391,6 +368,7 @@ def find_paths_to_part_with_lost_keys(ctx: Context, root_path: str) -> List[str]:
aws_access_key_id=disk_conf.access_key_id,
aws_secret_access_key=disk_conf.secret_access_key,
)
repaired_partitions = set()

for path, _, files in os.walk(root_path):
objects: List[S3ObjectLocalInfo] = []
@@ -400,19 +378,59 @@
objects.extend(S3ObjectLocalMetaData.from_file(file_full_path).objects)

for s3_object in objects:
full_key = os.path.join(disk_conf.prefix, s3_object.key)
if not check_key_in_object_storage(
s3_client, disk_conf.bucket_name, full_key
object_storage_key = os.path.join(disk_conf.prefix, s3_object.key)

if check_key_in_object_storage(
s3_client, disk_conf.bucket_name, object_storage_key
):
logging.debug("Not found key {}", full_key)
result.append(path)
continue

logging.debug("Not found key {}", object_storage_key)

table_partition = get_partition_by_path(ctx, path)

if table_partition is None:
logging.warning("Skip failed path {}.", path)
break

if table_partition not in repaired_partitions:
repaired_partitions.add(table_partition)

logging.debug(
"Add part to check with path", result[-1][0], result[-1][1]
"Add part to check with path={}, table={}, partition={}",
path,
table_partition.table,
table_partition.partition,
)
try_repair_partition(ctx, table_partition, detach, reattach)
else:
logging.debug(
"Partition {} for table {} was already repared. Skip.",
table_partition.partition,
table_partition.table,
)
break

logging.debug("Found parts with missing s3 keys. Local paths of parts: {}", result)
return result
break

print_partitions(ctx, repaired_partitions)

logging.debug(
"Found parts with missing s3 keys. Repaired partitions: {}",
repaired_partitions,
)


def try_repair_partition(
ctx: Context, table_partition: TablePartition, detach: bool, attach: bool
) -> None:
"""
Try to repair a broken partition with DETACH and ATTACH.
"""
if detach:
detach_partition(ctx, table_partition)

if attach:
detach_partition(ctx, table_partition)
attach_partition(ctx, table_partition)


def check_key_in_object_storage(s3_client: boto3.client, bucket: str, key: str) -> bool:
@@ -431,37 +449,41 @@ def check_key_in_object_storage(s3_client: boto3.client, bucket: str, key: str) -> bool:
return res


def get_partitions_by_path(
ctx: Context, parts_paths: List[str]
) -> List[Dict[str, str]]:
def get_partition_by_path(ctx: Context, path: str) -> Optional[TablePartition]:
"""
For each path of part match corresponding table and partition.
Get partition from path
"""
partitions = set()
for path in parts_paths:
query_string = (
f"SELECT database, table, partition FROM system.parts WHERE path='{path}/'"
)
res = execute_query(ctx, query_string, format_=OutputFormat.JSONCompact)
if "data" not in res:
logging.warning("Not found data for part with path {}", path)
continue
query_string = (
f"SELECT database, table, partition FROM system.parts WHERE path='{path}/'"
)
res = execute_query(ctx, query_string, format_=OutputFormat.JSONCompact)
if "data" not in res:
logging.warning("Not found data for part with path {}", path)
return None

if len(res["data"]) != 1 or len(res["data"][0]) != 3:
continue
table = f"`{res['data'][0][0]}`.`{res['data'][0][1]}`"
partition = res["data"][0][2]
partitions.add((table, partition))
if len(res["data"]) != 1 or len(res["data"][0]) != 3:
return None

table = f"`{res['data'][0][0]}`.`{res['data'][0][1]}`"
partition = res["data"][0][2]
return TablePartition(table, partition)


def print_partitions(ctx: Context, repaired_partitions: Set[TablePartition]) -> None:
"""
Print the table and partition for each repaired part.
"""

# It's not really necessary, just to make output stable for tests.
partitions_list: List[Tuple[str, str]] = list(partitions)
partitions_list: List[Tuple[str, str]] = list(repaired_partitions)
partitions_list.sort()

result = [
{"table": partition[0], "partition": partition[1]}
for partition in partitions_list
]
return result

print_response(ctx, result, default_format="table")


def query_with_retry(ctx: Context, query: str, timeout: int, retries: int) -> bool:
@@ -484,37 +506,35 @@ def query_with_retry(ctx: Context, query: str, timeout: int, retries: int) -> bool:
return True


def handle_partition(
ctx: Context, table: str, partition: str, detach: bool = True, attach: bool = True
) -> bool:
def detach_partition(ctx: Context, table_partition: TablePartition) -> bool:
"""
Handle the partition.
Run DETACH, ATTACH if needed.
Run DETACH for the partition.
"""
if not detach and not attach:
return False

if detach:
logging.debug(f"Going to attach partition {partition} for table {table}")
detach_query = f"ALTER TABLE {table} DETACH PARTITION '{partition}'"
res = query_with_retry(
ctx,
detach_query,
timeout=ATTACH_DETTACH_TIMEOUT,
retries=ATTACH_DETACH_QUERY_RETRY,
)
if not res:
return res
if attach:
logging.debug(f"Going to detach partition {partition} for table {table}")
attach_query = f"ALTER TABLE {table} ATTACH PARTITION '{partition}'"
# To avoid keeping detached partitions, perform the attach query with double attempts.
res = query_with_retry(
ctx,
attach_query,
timeout=ATTACH_DETTACH_TIMEOUT,
retries=2 * ATTACH_DETACH_QUERY_RETRY,
)
if not res:
return res
return True
logging.debug(
f"Going to detach partition {table_partition.partition} for table {table_partition.table}"
)
detach_query = f"ALTER TABLE {table_partition.table} DETACH PARTITION '{table_partition.partition}'"
return query_with_retry(
ctx,
detach_query,
timeout=ATTACH_DETTACH_TIMEOUT,
retries=ATTACH_DETACH_QUERY_RETRY,
)


def attach_partition(ctx: Context, table_partition: TablePartition) -> bool:
"""
Run ATTACH for the partition.
"""
logging.debug(
f"Going to attach partition {table_partition.partition} for table {table_partition.table}"
)
attach_query = f"ALTER TABLE {table_partition.table} ATTACH PARTITION '{table_partition.partition}'"
# To avoid keeping detached partitions, perform the attach query with double attempts.
return query_with_retry(
ctx,
attach_query,
timeout=ATTACH_DETTACH_TIMEOUT,
retries=2 * ATTACH_DETACH_QUERY_RETRY,
)
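
For readers who want the new flow outside the diff: the sketch below is a simplified, self-contained reconstruction of the single-pass logic this commit introduces (walk the local disk once and repair each affected partition at most once, tracked in a repaired-partitions set), rather than first collecting every broken path and only then reattaching. The S3 head request, the system.parts lookup, and the DETACH/ATTACH queries are replaced by hypothetical stand-ins (check_key_exists, lookup_partition, repair), so this is an illustration under those assumptions, not the project's actual API.

import os
from typing import NamedTuple, Optional, Set


class TablePartition(NamedTuple):
    table: str
    partition: str


def check_key_exists(key: str) -> bool:
    # Hypothetical stand-in for the real S3 head-object check; pretend every key is missing.
    return False


def lookup_partition(path: str) -> Optional[TablePartition]:
    # Hypothetical stand-in for the system.parts lookup; derive a dummy partition from the path.
    return TablePartition(table="`db`.`table`", partition=os.path.basename(path))


def repair(partition: TablePartition, detach: bool, attach: bool) -> None:
    # Hypothetical stand-in for the DETACH/ATTACH queries issued against ClickHouse.
    if detach:
        print(f"DETACH PARTITION '{partition.partition}' on {partition.table}")
    if attach:
        print(f"ATTACH PARTITION '{partition.partition}' on {partition.table}")


def detect_broken_partitions(root_path: str, reattach: bool, detach: bool) -> Set[TablePartition]:
    repaired: Set[TablePartition] = set()
    for path, _, files in os.walk(root_path):
        for file_name in files:
            key = os.path.join(path, file_name)
            if check_key_exists(key):
                continue
            partition = lookup_partition(path)
            if partition is None or partition in repaired:
                # Unknown part, or its partition was already handled: skip the rest of this part.
                break
            repaired.add(partition)
            repair(partition, detach=detach, attach=reattach)
            # One missing key is enough to repair the whole partition; move on to the next part.
            break
    return repaired


if __name__ == "__main__":
    # Example run over a path that may not exist locally; os.walk simply yields nothing then.
    detect_broken_partitions("/var/lib/clickhouse/disks/object_storage", reattach=True, detach=False)

The set-based deduplication reflects the commit title: one missing key is enough to schedule the whole partition for repair, so later parts of the same partition are skipped instead of being checked and repaired again.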
3 changes: 0 additions & 3 deletions tests/features/data_storage_group.feature
@@ -208,9 +208,6 @@ Feature: chadmin data-store commands
SELECT * FROM test_db.table1;
SELECT * FROM test_db.table2;
"""
Then we get response contains
"""
"""

@require_version_24.4
Examples:
