From 169eaf3bd933b56de54f66af991812707ccb226a Mon Sep 17 00:00:00 2001 From: Squeaky Date: Thu, 5 Sep 2024 11:02:05 +0200 Subject: [PATCH 1/6] before_id, after_id --- bin/remove-message-attachments.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/bin/remove-message-attachments.py b/bin/remove-message-attachments.py index 6d3b6c3cc..1caad85d2 100755 --- a/bin/remove-message-attachments.py +++ b/bin/remove-message-attachments.py @@ -30,6 +30,8 @@ def find_blocks( limit: "int | None", after: "datetime.datetime | None", before: "datetime.datetime | None", + after_id: "int | None", + before_id: "int | None", ) -> "Iterable[tuple[Block, int]]": query = ( Query([Block]) @@ -42,6 +44,10 @@ def find_blocks( query = query.filter(Block.created_at >= after) if before: query = query.filter(Block.created_at < before) + if after_id: + query = query.filter(Block.id >= after_id) + if before_id: + query = query.filter(Block.id < before_id) inner_max_id_query = query.with_entities(Block.id) if limit is not None: @@ -51,12 +57,12 @@ def find_blocks( max_id = db_session.query(func.max(inner_max_id_query.subquery().c.id)).scalar() yielded = 0 - last_id = 0 + start_id = 1 if after_id is None else after_id while True: with global_session_scope() as db_session: block_batch = ( - query.filter(Block.id > last_id) + query.filter(Block.id >= start_id) .limit(min(limit, BATCH_SIZE) if limit is not None else BATCH_SIZE) .with_session(db_session) .all() @@ -72,19 +78,23 @@ def find_blocks( yield block, max_id yielded += 1 # noqa: SIM113 - last_id = block_batch[-1].id + start_id = block_batch[-1].id + 1 @click.command() @click.option("--limit", type=int, default=None) @click.option("--after", type=str, default=None) @click.option("--before", type=str, default=None) +@click.option("--after-id", type=int, default=None) +@click.option("--before-id", type=int, default=None) @click.option("--dry-run/--no-dry-run", default=True) @click.option("--check-existence/--no-check-existence", default=False) def run( limit: "int | None", after: "str | None", before: "str | None", + after_id: "int | None", + before_id: "int | None", dry_run: bool, check_existence: bool, ) -> None: @@ -92,6 +102,8 @@ def run( limit, datetime.datetime.fromisoformat(after) if after else None, datetime.datetime.fromisoformat(before) if before else None, + after_id, + before_id, ) for block, max_id in blocks: From 8494ce0a9f8a72f8b04e0e7443dca0382b6867ce Mon Sep 17 00:00:00 2001 From: Squeaky Date: Thu, 5 Sep 2024 11:20:04 +0200 Subject: [PATCH 2/6] batching --- bin/remove-message-attachments.py | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/bin/remove-message-attachments.py b/bin/remove-message-attachments.py index 1caad85d2..460a86477 100755 --- a/bin/remove-message-attachments.py +++ b/bin/remove-message-attachments.py @@ -17,7 +17,8 @@ log = get_logger() -BATCH_SIZE = 1000 +DEFAULT_DELETE_BATCH_SIZE = 100 +DEFAULT_BATCH_SIZE = 1000 class Resolution(enum.Enum): @@ -32,6 +33,7 @@ def find_blocks( before: "datetime.datetime | None", after_id: "int | None", before_id: "int | None", + batch_size: int, ) -> "Iterable[tuple[Block, int]]": query = ( Query([Block]) @@ -63,7 +65,7 @@ def find_blocks( with global_session_scope() as db_session: block_batch = ( query.filter(Block.id >= start_id) - .limit(min(limit, BATCH_SIZE) if limit is not None else BATCH_SIZE) + .limit(min(limit, batch_size) if limit is not None else batch_size) .with_session(db_session) .all() ) @@ -87,6 +89,8 @@ def find_blocks( @click.option("--before", type=str, default=None) @click.option("--after-id", type=int, default=None) @click.option("--before-id", type=int, default=None) +@click.option("--batch-size", type=int, default=DEFAULT_BATCH_SIZE) +@click.option("--delete-batch-size", type=int, default=DEFAULT_DELETE_BATCH_SIZE) @click.option("--dry-run/--no-dry-run", default=True) @click.option("--check-existence/--no-check-existence", default=False) def run( @@ -95,17 +99,25 @@ def run( before: "str | None", after_id: "int | None", before_id: "int | None", + batch_size: int, + delete_batch_size: int, dry_run: bool, check_existence: bool, ) -> None: + assert batch_size > 0 + assert delete_batch_size > 0 + blocks = find_blocks( limit, datetime.datetime.fromisoformat(after) if after else None, datetime.datetime.fromisoformat(before) if before else None, after_id, before_id, + batch_size, ) + delete_sha256s = set() + for block, max_id in blocks: if check_existence: data = blockstore.get_from_blockstore(block.data_sha256) @@ -127,7 +139,12 @@ def run( ) if resolution is Resolution.DELETE: - blockstore.delete_from_blockstore(block.data_sha256) + delete_sha256s.add(block.data_sha256) + + if len(delete_sha256s) >= delete_batch_size: + blockstore.delete_from_blockstore(*delete_sha256s) + delete_sha256s.clear() + print("Deleted batch") if __name__ == "__main__": From 3adcaf43090ae03d03289a3dcb849873e59c54f2 Mon Sep 17 00:00:00 2001 From: Squeaky Date: Thu, 5 Sep 2024 11:51:00 +0200 Subject: [PATCH 3/6] Extract delete_batch --- bin/remove-message-attachments.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/bin/remove-message-attachments.py b/bin/remove-message-attachments.py index 460a86477..4553bcdfe 100755 --- a/bin/remove-message-attachments.py +++ b/bin/remove-message-attachments.py @@ -24,7 +24,6 @@ class Resolution(enum.Enum): NOT_PRESENT = "not-present" DELETE = "delete" - WOULD_DELETE = "would-delete" def find_blocks( @@ -83,6 +82,17 @@ def find_blocks( start_id = block_batch[-1].id + 1 +def delete_batch(delete_sha256s: "set[str]", dry_run: bool) -> None: + if not delete_sha256s: + return + + if not dry_run: + blockstore.delete_from_blockstore(*delete_sha256s) + print("deleted", len(delete_sha256s), "blobs") + else: + print("would-delete", len(delete_sha256s), "blobs") + + @click.command() @click.option("--limit", type=int, default=None) @click.option("--after", type=str, default=None) @@ -127,7 +137,7 @@ def run( if data is None: resolution = Resolution.NOT_PRESENT else: - resolution = Resolution.DELETE if not dry_run else Resolution.WOULD_DELETE + resolution = Resolution.DELETE print( f"{block.id}/{max_id}", @@ -142,9 +152,10 @@ def run( delete_sha256s.add(block.data_sha256) if len(delete_sha256s) >= delete_batch_size: - blockstore.delete_from_blockstore(*delete_sha256s) + delete_batch(delete_sha256s, dry_run) delete_sha256s.clear() - print("Deleted batch") + + delete_batch(delete_sha256s, dry_run) if __name__ == "__main__": From 63a4faa6b86ec72d4cd5dc1748ec3d911609fcff Mon Sep 17 00:00:00 2001 From: Squeaky Date: Thu, 5 Sep 2024 13:00:35 +0200 Subject: [PATCH 4/6] seen_sha256 --- bin/remove-message-attachments.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/bin/remove-message-attachments.py b/bin/remove-message-attachments.py index 4553bcdfe..c580d850a 100755 --- a/bin/remove-message-attachments.py +++ b/bin/remove-message-attachments.py @@ -57,7 +57,7 @@ def find_blocks( with global_session_scope() as db_session: max_id = db_session.query(func.max(inner_max_id_query.subquery().c.id)).scalar() - yielded = 0 + offset = 0 start_id = 1 if after_id is None else after_id while True: @@ -72,12 +72,16 @@ def find_blocks( if not block_batch: return + seen_sha256s = set() for block in block_batch: - if limit is not None and yielded >= limit: + if limit is not None and offset >= limit: return - yield block, max_id - yielded += 1 # noqa: SIM113 + if block.data_sha256 not in seen_sha256s: + yield block, max_id + seen_sha256s.add(block.data_sha256) + + offset += 1 # noqa: SIM113 start_id = block_batch[-1].id + 1 From 2635756fab00c3a8c12cb4c6fb434c2b1b8915cc Mon Sep 17 00:00:00 2001 From: Squeaky Date: Mon, 9 Sep 2024 10:51:30 +0200 Subject: [PATCH 5/6] repeat, don't query parts --- bin/remove-message-attachments.py | 88 ++++++++++++++++++------------- 1 file changed, 50 insertions(+), 38 deletions(-) diff --git a/bin/remove-message-attachments.py b/bin/remove-message-attachments.py index c580d850a..04f0e38b0 100755 --- a/bin/remove-message-attachments.py +++ b/bin/remove-message-attachments.py @@ -5,7 +5,7 @@ from collections.abc import Iterable import click -from sqlalchemy.orm import Query, joinedload +from sqlalchemy.orm import Query from sqlalchemy.sql import func from inbox.logging import configure_logging, get_logger @@ -36,7 +36,6 @@ def find_blocks( ) -> "Iterable[tuple[Block, int]]": query = ( Query([Block]) - .options(joinedload(Block.parts)) .filter(Block.size > 0) # empty blocks are not stored in S3 .order_by(Block.id) ) @@ -105,6 +104,7 @@ def delete_batch(delete_sha256s: "set[str]", dry_run: bool) -> None: @click.option("--before-id", type=int, default=None) @click.option("--batch-size", type=int, default=DEFAULT_BATCH_SIZE) @click.option("--delete-batch-size", type=int, default=DEFAULT_DELETE_BATCH_SIZE) +@click.option("--repeat", type=int, default=1) @click.option("--dry-run/--no-dry-run", default=True) @click.option("--check-existence/--no-check-existence", default=False) def run( @@ -115,51 +115,63 @@ def run( before_id: "int | None", batch_size: int, delete_batch_size: int, + repeat: int, dry_run: bool, check_existence: bool, -) -> None: +) -> int: assert batch_size > 0 assert delete_batch_size > 0 - blocks = find_blocks( - limit, - datetime.datetime.fromisoformat(after) if after else None, - datetime.datetime.fromisoformat(before) if before else None, - after_id, - before_id, - batch_size, - ) - - delete_sha256s = set() - - for block, max_id in blocks: - if check_existence: - data = blockstore.get_from_blockstore(block.data_sha256) - else: - data = ... # assume it exists, it's OK to delete non-existent data - - if data is None: - resolution = Resolution.NOT_PRESENT - else: - resolution = Resolution.DELETE - - print( - f"{block.id}/{max_id}", - block.created_at.date(), - resolution.value, - block.data_sha256, - block.size if data else None, - len(block.parts), + for repetition in range(repeat): + blocks = find_blocks( + limit, + datetime.datetime.fromisoformat(after) if after else None, + datetime.datetime.fromisoformat(before) if before else None, + after_id, + before_id, + batch_size, ) - if resolution is Resolution.DELETE: - delete_sha256s.add(block.data_sha256) + delete_sha256s = set() + + max_id = None + for block, max_id in blocks: + if check_existence: + data = blockstore.get_from_blockstore(block.data_sha256) + else: + data = ... # assume it exists, it's OK to delete non-existent data + + if data is None: + resolution = Resolution.NOT_PRESENT + else: + resolution = Resolution.DELETE + + print_arguments = [ + f"{block.id}/{max_id}", + block.created_at.date(), + resolution.value, + block.data_sha256, + block.size if data else None, + ] + + if repeat != 1: + print_arguments.insert(0, repetition) - if len(delete_sha256s) >= delete_batch_size: - delete_batch(delete_sha256s, dry_run) - delete_sha256s.clear() + print(*print_arguments) + + if resolution is Resolution.DELETE: + delete_sha256s.add(block.data_sha256) + + if len(delete_sha256s) >= delete_batch_size: + delete_batch(delete_sha256s, dry_run) + delete_sha256s.clear() + + delete_batch(delete_sha256s, dry_run) + + if max_id is None: + return - delete_batch(delete_sha256s, dry_run) + after_id = max_id + 1 if __name__ == "__main__": From 92d625216238982bd47c6c7650a0ad88d0fcfc7c Mon Sep 17 00:00:00 2001 From: Squeaky Date: Mon, 9 Sep 2024 10:58:54 +0200 Subject: [PATCH 6/6] thread pool executor for deletion --- bin/remove-message-attachments.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/bin/remove-message-attachments.py b/bin/remove-message-attachments.py index 04f0e38b0..903bf4302 100755 --- a/bin/remove-message-attachments.py +++ b/bin/remove-message-attachments.py @@ -3,6 +3,7 @@ import enum import logging from collections.abc import Iterable +from concurrent.futures import ThreadPoolExecutor import click from sqlalchemy.orm import Query @@ -122,6 +123,8 @@ def run( assert batch_size > 0 assert delete_batch_size > 0 + delete_executor = ThreadPoolExecutor(max_workers=10) + for repetition in range(repeat): blocks = find_blocks( limit, @@ -163,7 +166,7 @@ def run( delete_sha256s.add(block.data_sha256) if len(delete_sha256s) >= delete_batch_size: - delete_batch(delete_sha256s, dry_run) + delete_executor.submit(delete_batch, delete_sha256s.copy(), dry_run) delete_sha256s.clear() delete_batch(delete_sha256s, dry_run) @@ -173,6 +176,8 @@ def run( after_id = max_id + 1 + delete_executor.shutdown(wait=True) + if __name__ == "__main__": run()