diff --git a/src/mozilla_linux_pkg_manager/cli.py b/src/mozilla_linux_pkg_manager/cli.py
index 2135e65..3887f50 100644
--- a/src/mozilla_linux_pkg_manager/cli.py
+++ b/src/mozilla_linux_pkg_manager/cli.py
@@ -2,9 +2,16 @@
 import asyncio
 import logging
 import os
+import random
+from collections.abc import Awaitable, Callable, Sequence
 from datetime import datetime, timedelta
 from itertools import islice
 from pprint import pformat
+from typing import (
+    Any,
+    Optional,
+    Union,
+)
 from urllib.parse import urljoin
 
 import aiohttp
@@ -18,28 +25,138 @@
 )
 
 
+def calculate_sleep_time(
+    attempt, delay_factor=5.0, randomization_factor=0.5, max_delay=120
+):
+    """Calculate the sleep time between retries, in seconds.
+
+    Based on ``taskcluster.utils.calculateSleepTime``, but with kwargs instead
+    of constant ``delay_factor``/``randomization_factor``/``max_delay``. The
+    taskcluster function generally slept for less than a second, which didn't
+    always get past server issues.
+
+    Args:
+        attempt (int): the retry attempt number.
+        delay_factor (float, optional): a multiplier for the delay time. Defaults to 5.
+        randomization_factor (float, optional): a randomization multiplier for the
+            delay time. Defaults to .5.
+        max_delay (float, optional): the max delay to sleep. Defaults to 120 (seconds).
+    Returns:
+        float: the time to sleep, in seconds.
+    """
+    if attempt <= 0:
+        return 0
+
+    # We subtract one to get exponents: 1, 2, 3, 4, 5, ...
+    delay = float(2 ** (attempt - 1)) * float(delay_factor)
+    # Apply the randomization factor. Only increase the delay here.
+    delay = delay * (randomization_factor * random.random() + 1)
+    # Always cap the delay at max_delay.
+    return min(delay, max_delay)
+
+
+async def retry_async(
+    func: Callable[..., Awaitable[Any]],
+    attempts: int = 5,
+    sleeptime_callback: Callable[..., float] = calculate_sleep_time,
+    retry_exceptions: Union[
+        type[BaseException], tuple[type[BaseException], ...]
+    ] = Exception,
+    args: Sequence[Any] = (),
+    kwargs: Optional[dict[str, Any]] = None,
+    sleeptime_kwargs: Optional[dict[str, Any]] = None,
+) -> Any:
+    """Retry ``func``, where ``func`` is an awaitable.
+
+    Args:
+        func (function): an awaitable function.
+        attempts (int, optional): the number of attempts to make. Defaults to 5.
+        sleeptime_callback (function, optional): the function to use to determine
+            how long to sleep after each attempt. Defaults to ``calculate_sleep_time``.
+        retry_exceptions (exception or tuple of exceptions, optional): the
+            exception(s) to retry on. Defaults to ``Exception``.
+        args (list, optional): the args to pass to ``func``. Defaults to ().
+        kwargs (dict, optional): the kwargs to pass to ``func``. Defaults to {}.
+        sleeptime_kwargs (dict, optional): the kwargs to pass to ``sleeptime_callback``.
+            If None, use {}. Defaults to None.
+    Returns:
+        object: the value from a successful ``func`` call.
+    Raises:
+        Exception: the exception from a failed ``func`` call, either one outside
+            of ``retry_exceptions``, or one of those once we exceed the max
+            ``attempts``.
+ """ + kwargs = kwargs or {} + attempt = 1 + while True: + try: + return await func(*args, **kwargs) + except retry_exceptions: + attempt += 1 + check_number_of_attempts(attempt, attempts, func, "retry_async") + await asyncio.sleep( + define_sleep_time( + sleeptime_kwargs, sleeptime_callback, attempt, func, "retry_async" + ) + ) + + +def check_number_of_attempts( + attempt: int, attempts: int, func: Callable[..., Any], retry_function_name: str +) -> None: + if attempt > attempts: + logging.warning(f"{retry_function_name}: {func.__name__}: too many retries!") + raise + + +def define_sleep_time( + sleeptime_kwargs: Optional[dict[str, Any]], + sleeptime_callback: Callable[..., int], + attempt: int, + func: Callable[..., Any], + retry_function_name: str, +) -> float: + sleeptime_kwargs = sleeptime_kwargs or {} + sleep_time = sleeptime_callback(attempt, **sleeptime_kwargs) + logging.debug( + "{}: {}: sleeping {} seconds before retry".format( + retry_function_name, func.__name__, sleep_time + ) + ) + return sleep_time + + async def batch_delete_versions(versions, dry_run): client = artifactregistry_v1.ArtifactRegistryAsyncClient() request = artifactregistry_v1.BatchDeleteVersionsRequest( names=versions, ) + display_versions = [ + await retry_async( + client.get_version, + kwargs={"request": artifactregistry_v1.GetVersionRequest(name=version)}, + ) + for version in random.sample(versions, 3) + ] if not dry_run: + logging.info( + f"Deleting {format(len(versions), ',')} expired package versions similar to:\n{str(display_versions)}" + ) operation = client.batch_delete_versions(request=request) - response = (await operation).result() - return response + result = (await operation).result() + logging.info(f"result: {str(result)}") logging.info( - f"batch_delete_versions is a no-op in dry-run mode, 'deleting' {format(len(versions), ',')} expired package versions" + f"batch_delete_versions is a no-op in dry-run mode!\nDeleting {format(len(versions), ',')} expired package versions similar to:\n{str(display_versions)}" ) - return versions async def get_repository(args): client = artifactregistry_v1.ArtifactRegistryAsyncClient() parent = f"projects/{os.environ['GOOGLE_CLOUD_PROJECT']}/locations/{args.region}/repositories/{args.repository}" - get_repo_request = artifactregistry_v1.GetRepositoryRequest( + get_repository_request = artifactregistry_v1.GetRepositoryRequest( name=parent, ) - repository = await client.get_repository(request=get_repo_request) + repository = await client.get_repository(request=get_repository_request) return repository @@ -85,18 +202,29 @@ def parse_key_value_block(block): async def delete_nightly_versions(args): - url = "https://packages.mozilla.org/apt/dists/mozilla" + url = f"https://{args.region}-apt.pkg.dev/projects/{os.environ['GOOGLE_CLOUD_PROJECT']}/dists/{args.repository}" normalized_url = f"{url}/" if not url.endswith("/") else url release_url = urljoin(normalized_url, "Release") try: - raw_release_data = await fetch_url(release_url) + logging.info(f"Fetching raw_release_data at {url}") + raw_release_data = await retry_async( + fetch_url, + args=[release_url], + attempts=3, + ) parsed_release_data = yaml.safe_load(raw_release_data) logging.info(f"parsed_release_data:\n{pformat(parsed_release_data)}") architectures = parsed_release_data["Architectures"].split() package_data_promises = [] for architecture in architectures: pkg_url = f"{normalized_url}main/binary-{architecture}/Packages" - package_data_promises.append(fetch_url(pkg_url)) + package_data_promises.append( + 
+                retry_async(
+                    fetch_url,
+                    args=[pkg_url],
+                    attempts=3,
+                )
+            )
         package_data_results = await asyncio.gather(*package_data_promises)
         package_data = []
         for architecture, package_data_result in zip(
@@ -127,7 +255,7 @@ async def delete_nightly_versions(args):
             logging.info(f"repository:\n{str(repository)}")
         batches = batched(targets, 10000)
         for batch in batches:
-            await batch_delete_versions(batch, True)
+            await batch_delete_versions(batch, args.dry_run)
     except Exception as e:
         logging.error(f"An error occurred: {e}")
 
@@ -183,6 +311,7 @@ def main():
         "--dry-run",
         action="store_true",
         help="Do a no-op run and print out a summary of the operations that will be executed",
+        default=False,
     )
 
     args = parser.parse_args()
@@ -203,5 +332,6 @@ def main():
                 "Retention days must be specified for the nightly channel"
             )
             asyncio.run(delete_nightly_versions(args))
+            logging.info("Done cleaning up")
         else:
             raise ValueError("Only the nightly channel is supported")
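
For reviewers: a minimal sketch of how the new retry_async helper is meant to be driven, mirroring the fetch_url call sites above. The fetch_text coroutine and the URL are hypothetical stand-ins (not part of this patch); only retry_async and calculate_sleep_time come from cli.py.

import asyncio

import aiohttp

from mozilla_linux_pkg_manager.cli import retry_async


async def fetch_text(url: str) -> str:
    # Hypothetical stand-in for the module's fetch_url helper.
    # raise_for_status=True turns 4xx/5xx responses into ClientResponseError,
    # a subclass of ClientError, so retry_async can catch and retry them.
    async with aiohttp.ClientSession() as session:
        async with session.get(url, raise_for_status=True) as response:
            return await response.text()


async def example() -> None:
    body = await retry_async(
        fetch_text,
        args=["https://example.com/dists/mozilla/Release"],  # hypothetical URL
        attempts=3,
        retry_exceptions=aiohttp.ClientError,
    )
    print(body[:200])


asyncio.run(example())

With calculate_sleep_time's defaults, the two possible retries here sleep roughly 10-15 and then 20-30 seconds: exponential backoff with up to 50% random inflation, capped at 120 seconds.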