From 7c5c4f73a94583407573032c708178e67ee6dad0 Mon Sep 17 00:00:00 2001 From: Thomas Carmet <8408330+tcarmet@users.noreply.github.com> Date: Mon, 4 Dec 2023 19:28:23 +0000 Subject: [PATCH] PTFE-1196 reduce impact of missed webhooks --- .devcontainer/devcontainer.json | 2 +- runner_manager/jobs/workflow_job.py | 21 ++++++++- runner_manager/models/runner_group.py | 5 +++ runner_manager/models/settings.py | 2 +- tests/strategies.py | 3 +- tests/unit/jobs/test_workflow_job.py | 60 +++++++++++++++++++++++++- tests/unit/models/test_runner_group.py | 11 +++++ 7 files changed, 99 insertions(+), 5 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 0d0fc11a..2fd4468c 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -56,7 +56,7 @@ "typeCheckingMode": "basic" }, "editor": { - "defaultFormatter": "ms-python.python" + "defaultFormatter": "ms-python.black-formatter" } } } diff --git a/runner_manager/jobs/workflow_job.py b/runner_manager/jobs/workflow_job.py index 673af9f2..cd6ee802 100644 --- a/runner_manager/jobs/workflow_job.py +++ b/runner_manager/jobs/workflow_job.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +from datetime import timedelta from githubkit.webhooks.models import ( WorkflowJobCompleted, @@ -9,8 +10,9 @@ ) from githubkit.webhooks.types import WorkflowJobEvent +from runner_manager import Settings from runner_manager.clients.github import GitHub -from runner_manager.dependencies import get_github +from runner_manager.dependencies import get_github, get_settings from runner_manager.models.runner import Runner from runner_manager.models.runner_group import RunnerGroup @@ -25,6 +27,11 @@ def log_workflow_job(webhook: WorkflowJobEvent) -> None: ) +def time_to_start(webhook: WorkflowJobInProgress | WorkflowJobCompleted) -> timedelta: + """From a given webhook, calculate the time it took to start the job""" + return webhook.workflow_job.started_at - webhook.workflow_job.created_at + + def completed(webhook: WorkflowJobCompleted) -> int: log_workflow_job(webhook) runner: Runner | None = Runner.find_from_webhook(webhook) @@ -47,6 +54,7 @@ def completed(webhook: WorkflowJobCompleted) -> int: def in_progress(webhook: WorkflowJobInProgress) -> str | None: log_workflow_job(webhook) + settings: Settings = get_settings() name: str | None = webhook.workflow_job.runner_name runner_group: RunnerGroup | None = RunnerGroup.find_from_webhook(webhook) if not runner_group: @@ -55,6 +63,17 @@ def in_progress(webhook: WorkflowJobInProgress) -> str | None: log.info(f"Updating runner {name} in group {runner_group.name}") runner: Runner = runner_group.update_runner(webhook=webhook) log.info(f"Runner {name} in group {runner_group.name} has been updated") + tts = time_to_start(webhook) + log.info(f"{runner} took {tts} to start") + # If the time to start is greater than settings.timeout_runner, + # create an extra runner. + # The main reason we perform this action is to ensure that + # in the case we have missed a webhook, we still have a runner + # available for the jobs that are requesting it. + if tts > settings.timeout_runner and runner_group.is_full is False: + log.info(f"Runner group {runner_group.name} needs a new runner") + github: GitHub = get_github() + runner_group.create_runner(github) return runner.pk diff --git a/runner_manager/models/runner_group.py b/runner_manager/models/runner_group.py index 0434efc0..eb1a526f 100644 --- a/runner_manager/models/runner_group.py +++ b/runner_manager/models/runner_group.py @@ -219,6 +219,11 @@ def need_new_runner(self) -> bool: count = len(runners) return (not_active < self.min or self.queued > 0) and count < self.max + @property + def is_full(self) -> bool: + """Return True if the max number of runners has been reached.""" + return len(self.get_runners()) >= self.max + def create_github_group(self, github: GitHub) -> GitHubRunnerGroup: """Create a GitHub runner group.""" diff --git a/runner_manager/models/settings.py b/runner_manager/models/settings.py index 1730bbb5..2f458c88 100644 --- a/runner_manager/models/settings.py +++ b/runner_manager/models/settings.py @@ -31,7 +31,7 @@ class Settings(BaseSettings): api_key: Optional[SecretStr] = None log_level: Literal["INFO", "WARNING", "DEBUG", "ERROR"] = "INFO" runner_groups: List[BaseRunnerGroup] = [] - timeout_runner: Optional[timedelta] = timedelta(minutes=15) + timeout_runner: timedelta = timedelta(minutes=15) time_to_live: Optional[timedelta] = timedelta(hours=12) healthcheck_interval: timedelta = timedelta(minutes=15) github_base_url: Optional[AnyHttpUrl] = Field(default="https://api.github.com") diff --git a/tests/strategies.py b/tests/strategies.py index db43babf..effd6adb 100644 --- a/tests/strategies.py +++ b/tests/strategies.py @@ -80,6 +80,7 @@ class Repo(Repository): runner_group_id=Int, labels=st.lists(Text, min_size=1, max_size=5), started_at=st.datetimes(), + created_at=st.datetimes(), ) JobPropQueuedStrategy = st.builds( @@ -127,7 +128,7 @@ class Repo(Repository): github_base_url=st.just("http://localhost:4010"), github_token=st.just("test"), time_to_live=st.integers(1, 60), - timeout_runner=st.integers(1, 10), + timeout_runner=st.integers(120, 600), ) RedisStrategy = st.builds( diff --git a/tests/unit/jobs/test_workflow_job.py b/tests/unit/jobs/test_workflow_job.py index 35916ddb..15e786c1 100644 --- a/tests/unit/jobs/test_workflow_job.py +++ b/tests/unit/jobs/test_workflow_job.py @@ -1,3 +1,4 @@ +from datetime import timedelta from time import sleep from uuid import uuid4 @@ -103,7 +104,6 @@ def test_workflow_job_completed( def test_workflow_job_in_progress( webhook: WorkflowJobInProgress, queue: Queue, settings: Settings, redis: Redis ): - # flush all keys that start with settings.name in redis init_model(Runner, redis, settings) @@ -197,3 +197,61 @@ def test_workflow_job_queued( ).first() assert runner.busy is False assert runner.status == "offline" + + +@settings(max_examples=10) +@given( + webhook=WorkflowJobInProgressStrategy, + queue=QueueStrategy, + settings=SettingsStrategy, + redis=RedisStrategy, +) +def test_workflow_job_time_to_start( + webhook: WorkflowJobInProgress, queue: Queue, settings: Settings, redis: Redis +): + """ + This test will ensure that an extra runner is created when the time to start + the given workflow was higher than settings.timeout_runners. + """ + init_model(Runner, redis, settings) + init_model(RunnerGroup, redis, settings) + runner_group: RunnerGroup = RunnerGroup( + organization=webhook.organization.login, + name=webhook.workflow_job.runner_group_name, + id=webhook.workflow_job.runner_group_id, + labels=webhook.workflow_job.labels, + manager=settings.name, + backend={"name": "base"}, + max=2, + ) + runner_group.save() + Migrator().run() + + runner: Runner = Runner( + id=webhook.workflow_job.runner_id, + name=webhook.workflow_job.runner_name, + busy=False, + status="online", + manager=settings.name, + runner_group_id=webhook.workflow_job.runner_group_id, + runner_group_name=webhook.workflow_job.runner_group_name, + ) + runner.save() + Migrator().run() + + assert len(runner_group.get_runners()) == 1 + # ensure we have only one runner if the time to start is less than timeout_runners + webhook.workflow_job.started_at = webhook.workflow_job.created_at + ( + settings.timeout_runner - timedelta(minutes=15) + ) + queue.enqueue(workflow_job.in_progress, webhook) + assert len(runner_group.get_runners()) == 1 + # ensure we have two runners if the time to start is greater than timeout_runners + webhook.workflow_job.started_at = webhook.workflow_job.created_at + ( + settings.timeout_runner + timedelta(minutes=15) + ) + queue.enqueue(workflow_job.in_progress, webhook) + assert len(runner_group.get_runners()) == 2 + # ensure we remain with two runners given that the max for the runner group is 2 + queue.enqueue(workflow_job.in_progress, webhook) + assert len(runner_group.get_runners()) == 2 diff --git a/tests/unit/models/test_runner_group.py b/tests/unit/models/test_runner_group.py index af5cde02..d86f5661 100644 --- a/tests/unit/models/test_runner_group.py +++ b/tests/unit/models/test_runner_group.py @@ -214,3 +214,14 @@ def test_find_github_group(runner_group: RunnerGroup, github: GitHub): assert exists is not None group = runner_group.save(github=github) assert exists.id == group.id + + +def test_is_full(runner_group: RunnerGroup, github: GitHub): + runner_group.max = 2 + runner_group.min = 1 + runner_group.save() + assert runner_group.is_full is False + runner_group.create_runner(github) + assert runner_group.is_full is False + runner_group.create_runner(github) + assert runner_group.is_full is True