Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove message attachments continue #876

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 100 additions & 39 deletions bin/remove-message-attachments.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import enum
import logging
from collections.abc import Iterable
from concurrent.futures import ThreadPoolExecutor

import click
from sqlalchemy.orm import Query, joinedload
from sqlalchemy.orm import Query
from sqlalchemy.sql import func

from inbox.logging import configure_logging, get_logger
Expand All @@ -17,23 +18,25 @@
log = get_logger()


BATCH_SIZE = 1000
DEFAULT_DELETE_BATCH_SIZE = 100
DEFAULT_BATCH_SIZE = 1000


class Resolution(enum.Enum):
NOT_PRESENT = "not-present"
DELETE = "delete"
WOULD_DELETE = "would-delete"


def find_blocks(
limit: "int | None",
after: "datetime.datetime | None",
before: "datetime.datetime | None",
after_id: "int | None",
before_id: "int | None",
batch_size: int,
) -> "Iterable[tuple[Block, int]]":
query = (
Query([Block])
.options(joinedload(Block.parts))
.filter(Block.size > 0) # empty blocks are not stored in S3
.order_by(Block.id)
)
Expand All @@ -42,6 +45,10 @@ def find_blocks(
query = query.filter(Block.created_at >= after)
if before:
query = query.filter(Block.created_at < before)
if after_id:
query = query.filter(Block.id >= after_id)
if before_id:
query = query.filter(Block.id < before_id)

inner_max_id_query = query.with_entities(Block.id)
if limit is not None:
Expand All @@ -50,72 +57,126 @@ def find_blocks(
with global_session_scope() as db_session:
max_id = db_session.query(func.max(inner_max_id_query.subquery().c.id)).scalar()

yielded = 0
last_id = 0
offset = 0
start_id = 1 if after_id is None else after_id

while True:
with global_session_scope() as db_session:
block_batch = (
query.filter(Block.id > last_id)
.limit(min(limit, BATCH_SIZE) if limit is not None else BATCH_SIZE)
query.filter(Block.id >= start_id)
.limit(min(limit, batch_size) if limit is not None else batch_size)
.with_session(db_session)
.all()
)

if not block_batch:
return

seen_sha256s = set()
for block in block_batch:
if limit is not None and yielded >= limit:
if limit is not None and offset >= limit:
return

yield block, max_id
yielded += 1 # noqa: SIM113
if block.data_sha256 not in seen_sha256s:
yield block, max_id
seen_sha256s.add(block.data_sha256)

last_id = block_batch[-1].id
offset += 1 # noqa: SIM113

start_id = block_batch[-1].id + 1


def delete_batch(delete_sha256s: "set[str]", dry_run: bool) -> None:
if not delete_sha256s:
return

if not dry_run:
blockstore.delete_from_blockstore(*delete_sha256s)
print("deleted", len(delete_sha256s), "blobs")
else:
print("would-delete", len(delete_sha256s), "blobs")


@click.command()
@click.option("--limit", type=int, default=None)
@click.option("--after", type=str, default=None)
@click.option("--before", type=str, default=None)
@click.option("--after-id", type=int, default=None)
@click.option("--before-id", type=int, default=None)
@click.option("--batch-size", type=int, default=DEFAULT_BATCH_SIZE)
@click.option("--delete-batch-size", type=int, default=DEFAULT_DELETE_BATCH_SIZE)
@click.option("--repeat", type=int, default=1)
@click.option("--dry-run/--no-dry-run", default=True)
@click.option("--check-existence/--no-check-existence", default=False)
def run(
limit: "int | None",
after: "str | None",
before: "str | None",
after_id: "int | None",
before_id: "int | None",
batch_size: int,
delete_batch_size: int,
repeat: int,
dry_run: bool,
check_existence: bool,
) -> None:
blocks = find_blocks(
limit,
datetime.datetime.fromisoformat(after) if after else None,
datetime.datetime.fromisoformat(before) if before else None,
)

for block, max_id in blocks:
if check_existence:
data = blockstore.get_from_blockstore(block.data_sha256)
else:
data = ... # assume it exists, it's OK to delete non-existent data

if data is None:
resolution = Resolution.NOT_PRESENT
else:
resolution = Resolution.DELETE if not dry_run else Resolution.WOULD_DELETE

print(
f"{block.id}/{max_id}",
block.created_at.date(),
resolution.value,
block.data_sha256,
block.size if data else None,
len(block.parts),
) -> int:
assert batch_size > 0
assert delete_batch_size > 0

delete_executor = ThreadPoolExecutor(max_workers=10)

for repetition in range(repeat):
blocks = find_blocks(
limit,
datetime.datetime.fromisoformat(after) if after else None,
datetime.datetime.fromisoformat(before) if before else None,
after_id,
before_id,
batch_size,
)

if resolution is Resolution.DELETE:
blockstore.delete_from_blockstore(block.data_sha256)
delete_sha256s = set()

max_id = None
for block, max_id in blocks:
if check_existence:
data = blockstore.get_from_blockstore(block.data_sha256)
else:
data = ... # assume it exists, it's OK to delete non-existent data

if data is None:
resolution = Resolution.NOT_PRESENT
else:
resolution = Resolution.DELETE

print_arguments = [
f"{block.id}/{max_id}",
block.created_at.date(),
resolution.value,
block.data_sha256,
block.size if data else None,
]

if repeat != 1:
print_arguments.insert(0, repetition)

print(*print_arguments)

if resolution is Resolution.DELETE:
delete_sha256s.add(block.data_sha256)

if len(delete_sha256s) >= delete_batch_size:
delete_executor.submit(delete_batch, delete_sha256s.copy(), dry_run)
delete_sha256s.clear()

delete_batch(delete_sha256s, dry_run)

if max_id is None:
return

after_id = max_id + 1

delete_executor.shutdown(wait=True)


if __name__ == "__main__":
Expand Down