Skip to content

Commit

Permalink
Update cli.py
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielBusta committed Nov 21, 2023
1 parent 4c4dd71 commit 53bdae1
Showing 1 changed file with 140 additions and 10 deletions.
150 changes: 140 additions & 10 deletions src/mozilla_linux_pkg_manager/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@
import asyncio
import logging
import os
import random
from collections.abc import Awaitable, Callable, Sequence
from datetime import datetime, timedelta
from itertools import islice
from pprint import pformat
from typing import (
Any,
Optional,
Union,
)
from urllib.parse import urljoin

import aiohttp
Expand All @@ -18,28 +25,138 @@
)


def calculate_sleep_time(
attempt, delay_factor=5.0, randomization_factor=0.5, max_delay=120
):
"""Calculate the sleep time between retries, in seconds.
Based off of `taskcluster.utils.calculateSleepTime`, but with kwargs instead
of constant `delay_factor`/`randomization_factor`/`max_delay`. The taskcluster
function generally slept for less than a second, which didn't always get
past server issues.
Args:
attempt (int): the retry attempt number
delay_factor (float, optional): a multiplier for the delay time. Defaults to 5.
randomization_factor (float, optional): a randomization multiplier for the
delay time. Defaults to .5.
max_delay (float, optional): the max delay to sleep. Defaults to 120 (seconds).
Returns:
float: the time to sleep, in seconds.
"""
if attempt <= 0:
return 0

# We subtract one to get exponents: 1, 2, 3, 4, 5, ..
delay = float(2 ** (attempt - 1)) * float(delay_factor)
# Apply randomization factor. Only increase the delay here.
delay = delay * (randomization_factor * random.random() + 1)
# Always limit with a maximum delay
return min(delay, max_delay)


async def retry_async(
func: Callable[..., Awaitable[Any]],
attempts: int = 5,
sleeptime_callback: Callable[..., Any] = calculate_sleep_time,
retry_exceptions: Union[
type[BaseException], tuple[type[BaseException], ...]
] = Exception,
args: Sequence[Any] = (),
kwargs: Optional[dict[str, Any]] = None,
sleeptime_kwargs: Optional[dict[str, Any]] = None,
) -> Any:
"""Retry ``func``, where ``func`` is an awaitable.
Args:
func (function): an awaitable function.
attempts (int, optional): the number of attempts to make. Default is 5.
sleeptime_callback (function, optional): the function to use to determine
how long to sleep after each attempt. Defaults to ``calculateSleepTime``.
retry_exceptions (list or exception, optional): the exception(s) to retry on.
Defaults to ``Exception``.
args (list, optional): the args to pass to ``func``. Defaults to ()
kwargs (dict, optional): the kwargs to pass to ``func``. Defaults to
{}.
sleeptime_kwargs (dict, optional): the kwargs to pass to ``sleeptime_callback``.
If None, use {}. Defaults to None.
Returns:
object: the value from a successful ``function`` call
Raises:
Exception: the exception from a failed ``function`` call, either outside
of the retry_exceptions, or one of those if we pass the max
``attempts``.
"""
kwargs = kwargs or {}
attempt = 1
while True:
try:
return await func(*args, **kwargs)
except retry_exceptions:
attempt += 1
check_number_of_attempts(attempt, attempts, func, "retry_async")
await asyncio.sleep(
define_sleep_time(
sleeptime_kwargs, sleeptime_callback, attempt, func, "retry_async"
)
)


def check_number_of_attempts(
attempt: int, attempts: int, func: Callable[..., Any], retry_function_name: str
) -> None:
if attempt > attempts:
logging.warning(f"{retry_function_name}: {func.__name__}: too many retries!")
raise


def define_sleep_time(
sleeptime_kwargs: Optional[dict[str, Any]],
sleeptime_callback: Callable[..., int],
attempt: int,
func: Callable[..., Any],
retry_function_name: str,
) -> float:
sleeptime_kwargs = sleeptime_kwargs or {}
sleep_time = sleeptime_callback(attempt, **sleeptime_kwargs)
logging.debug(
"{}: {}: sleeping {} seconds before retry".format(
retry_function_name, func.__name__, sleep_time
)
)
return sleep_time


async def batch_delete_versions(versions, dry_run):
client = artifactregistry_v1.ArtifactRegistryAsyncClient()
request = artifactregistry_v1.BatchDeleteVersionsRequest(
names=versions,
)
display_versions = [
await retry_async(
client.get_version,
kwargs={"request": artifactregistry_v1.GetVersionRequest(name=version)},
)
for version in random.sample(versions, 3)
]
if not dry_run:
logging.info(
f"Deleting {format(len(versions), ',')} expired package versions similar to:\n{str(display_versions)}"
)
operation = client.batch_delete_versions(request=request)
response = (await operation).result()
return response
result = (await operation).result()
logging.info(f"result: {str(result)}")
logging.info(
f"batch_delete_versions is a no-op in dry-run mode, 'deleting' {format(len(versions), ',')} expired package versions"
f"batch_delete_versions is a no-op in dry-run mode!\nDeleting {format(len(versions), ',')} expired package versions similar to:\n{str(display_versions)}"
)
return versions


async def get_repository(args):
client = artifactregistry_v1.ArtifactRegistryAsyncClient()
parent = f"projects/{os.environ['GOOGLE_CLOUD_PROJECT']}/locations/{args.region}/repositories/{args.repository}"
get_repo_request = artifactregistry_v1.GetRepositoryRequest(
get_repository_request = artifactregistry_v1.GetRepositoryRequest(
name=parent,
)
repository = await client.get_repository(request=get_repo_request)
repository = await client.get_repository(request=get_repository_request)
return repository


Expand Down Expand Up @@ -85,18 +202,29 @@ def parse_key_value_block(block):


async def delete_nightly_versions(args):
url = "https://packages.mozilla.org/apt/dists/mozilla"
url = f"https://{args.region}-apt.pkg.dev/projects/{os.environ['GOOGLE_CLOUD_PROJECT']}/dists/{args.repository}"
normalized_url = f"{url}/" if not url.endswith("/") else url
release_url = urljoin(normalized_url, "Release")
try:
raw_release_data = await fetch_url(release_url)
logging.info(f"Fetching raw_release_data at {url}")
raw_release_data = await retry_async(
fetch_url,
args=[release_url],
attempts=3,
)
parsed_release_data = yaml.safe_load(raw_release_data)
logging.info(f"parsed_release_data:\n{pformat(parsed_release_data)}")
architectures = parsed_release_data["Architectures"].split()
package_data_promises = []
for architecture in architectures:
pkg_url = f"{normalized_url}main/binary-{architecture}/Packages"
package_data_promises.append(fetch_url(pkg_url))
package_data_promises.append(
retry_async(
fetch_url,
args=[pkg_url],
attempts=3,
)
)
package_data_results = await asyncio.gather(*package_data_promises)
package_data = []
for architecture, package_data_result in zip(
Expand Down Expand Up @@ -127,7 +255,7 @@ async def delete_nightly_versions(args):
logging.info(f"repository:\n{str(repository)}")
batches = batched(targets, 10000)
for batch in batches:
await batch_delete_versions(batch, True)
await batch_delete_versions(batch, args.dry_run)
except Exception as e:
logging.error(f"An error occurred: {e}")

Expand Down Expand Up @@ -183,6 +311,7 @@ def main():
"--dry-run",
action="store_true",
help="Do a no-op run and print out a summary of the operations that will be executed",
default=False,
)

args = parser.parse_args()
Expand All @@ -203,5 +332,6 @@ def main():
"Retention days must be specified for the nightly channel"
)
asyncio.run(delete_nightly_versions(args))
logging.info("Done cleaning up")
else:
raise ValueError("Only the nightly channel is supported")

0 comments on commit 53bdae1

Please sign in to comment.