diff --git a/bin/remove-message-attachments.py b/bin/remove-message-attachments.py
index 6d3b6c3cc..903bf4302 100755
--- a/bin/remove-message-attachments.py
+++ b/bin/remove-message-attachments.py
@@ -3,9 +3,10 @@
 import enum
 import logging
 from collections.abc import Iterable
+from concurrent.futures import ThreadPoolExecutor
 
 import click
-from sqlalchemy.orm import Query, joinedload
+from sqlalchemy.orm import Query
 from sqlalchemy.sql import func
 
 from inbox.logging import configure_logging, get_logger
@@ -17,23 +18,25 @@
 log = get_logger()
 
 
-BATCH_SIZE = 1000
+DEFAULT_DELETE_BATCH_SIZE = 100
+DEFAULT_BATCH_SIZE = 1000
 
 
 class Resolution(enum.Enum):
     NOT_PRESENT = "not-present"
     DELETE = "delete"
-    WOULD_DELETE = "would-delete"
 
 
 def find_blocks(
     limit: "int | None",
     after: "datetime.datetime | None",
     before: "datetime.datetime | None",
+    after_id: "int | None",
+    before_id: "int | None",
+    batch_size: int,
 ) -> "Iterable[tuple[Block, int]]":
     query = (
         Query([Block])
-        .options(joinedload(Block.parts))
         .filter(Block.size > 0)  # empty blocks are not stored in S3
         .order_by(Block.id)
     )
@@ -42,6 +45,10 @@ def find_blocks(
         query = query.filter(Block.created_at >= after)
     if before:
         query = query.filter(Block.created_at < before)
+    if after_id:
+        query = query.filter(Block.id >= after_id)
+    if before_id:
+        query = query.filter(Block.id < before_id)
 
     inner_max_id_query = query.with_entities(Block.id)
     if limit is not None:
@@ -50,14 +57,14 @@ def find_blocks(
     with global_session_scope() as db_session:
         max_id = db_session.query(func.max(inner_max_id_query.subquery().c.id)).scalar()
 
-    yielded = 0
-    last_id = 0
+    offset = 0
+    start_id = 1 if after_id is None else after_id
 
     while True:
         with global_session_scope() as db_session:
             block_batch = (
-                query.filter(Block.id > last_id)
-                .limit(min(limit, BATCH_SIZE) if limit is not None else BATCH_SIZE)
+                query.filter(Block.id >= start_id)
+                .limit(min(limit, batch_size) if limit is not None else batch_size)
                 .with_session(db_session)
                 .all()
             )
@@ -65,57 +72,111 @@ def find_blocks(
         if not block_batch:
             return
 
+        seen_sha256s = set()
         for block in block_batch:
-            if limit is not None and yielded >= limit:
+            if limit is not None and offset >= limit:
                 return
 
-            yield block, max_id
-            yielded += 1  # noqa: SIM113
+            if block.data_sha256 not in seen_sha256s:
+                yield block, max_id
+                seen_sha256s.add(block.data_sha256)
 
-        last_id = block_batch[-1].id
+            offset += 1  # noqa: SIM113
+
+        start_id = block_batch[-1].id + 1
+
+
+def delete_batch(delete_sha256s: "set[str]", dry_run: bool) -> None:
+    if not delete_sha256s:
+        return
+
+    if not dry_run:
+        blockstore.delete_from_blockstore(*delete_sha256s)
+        print("deleted", len(delete_sha256s), "blobs")
+    else:
+        print("would-delete", len(delete_sha256s), "blobs")
 
 
 @click.command()
 @click.option("--limit", type=int, default=None)
 @click.option("--after", type=str, default=None)
 @click.option("--before", type=str, default=None)
+@click.option("--after-id", type=int, default=None)
+@click.option("--before-id", type=int, default=None)
+@click.option("--batch-size", type=int, default=DEFAULT_BATCH_SIZE)
+@click.option("--delete-batch-size", type=int, default=DEFAULT_DELETE_BATCH_SIZE)
+@click.option("--repeat", type=int, default=1)
 @click.option("--dry-run/--no-dry-run", default=True)
 @click.option("--check-existence/--no-check-existence", default=False)
 def run(
     limit: "int | None",
     after: "str | None",
     before: "str | None",
+    after_id: "int | None",
+    before_id: "int | None",
+    batch_size: int,
+    delete_batch_size: int,
+    repeat: int,
     dry_run: bool,
     check_existence: bool,
-) -> None:
-    blocks = find_blocks(
-        limit,
-        datetime.datetime.fromisoformat(after) if after else None,
-        datetime.datetime.fromisoformat(before) if before else None,
-    )
-
-    for block, max_id in blocks:
-        if check_existence:
-            data = blockstore.get_from_blockstore(block.data_sha256)
-        else:
-            data = ...  # assume it exists, it's OK to delete non-existent data
-
-        if data is None:
-            resolution = Resolution.NOT_PRESENT
-        else:
-            resolution = Resolution.DELETE if not dry_run else Resolution.WOULD_DELETE
-
-        print(
-            f"{block.id}/{max_id}",
-            block.created_at.date(),
-            resolution.value,
-            block.data_sha256,
-            block.size if data else None,
-            len(block.parts),
+) -> int:
+    assert batch_size > 0
+    assert delete_batch_size > 0
+
+    delete_executor = ThreadPoolExecutor(max_workers=10)
+
+    for repetition in range(repeat):
+        blocks = find_blocks(
+            limit,
+            datetime.datetime.fromisoformat(after) if after else None,
+            datetime.datetime.fromisoformat(before) if before else None,
+            after_id,
+            before_id,
+            batch_size,
         )
 
-        if resolution is Resolution.DELETE:
-            blockstore.delete_from_blockstore(block.data_sha256)
+        delete_sha256s = set()
+
+        max_id = None
+        for block, max_id in blocks:
+            if check_existence:
+                data = blockstore.get_from_blockstore(block.data_sha256)
+            else:
+                data = ...  # assume it exists, it's OK to delete non-existent data
+
+            if data is None:
+                resolution = Resolution.NOT_PRESENT
+            else:
+                resolution = Resolution.DELETE
+
+            print_arguments = [
+                f"{block.id}/{max_id}",
+                block.created_at.date(),
+                resolution.value,
+                block.data_sha256,
+                block.size if data else None,
+            ]
+
+            if repeat != 1:
+                print_arguments.insert(0, repetition)
+
+            print(*print_arguments)
+
+            if resolution is Resolution.DELETE:
+                delete_sha256s.add(block.data_sha256)
+
+            if len(delete_sha256s) >= delete_batch_size:
+                delete_executor.submit(delete_batch, delete_sha256s.copy(), dry_run)
+                delete_sha256s.clear()
+
+        delete_batch(delete_sha256s, dry_run)
+
+        if max_id is None:
+            return
+
+        after_id = max_id + 1
+
+    delete_executor.shutdown(wait=True)
 
 
 if __name__ == "__main__":