Skip to content

Commit

Permalink
PTFE-1196 reduce impact of missed webhooks
Browse files Browse the repository at this point in the history
  • Loading branch information
tcarmet committed Dec 4, 2023
1 parent 11e6771 commit 24e056a
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
"typeCheckingMode": "basic"
},
"editor": {
"defaultFormatter": "ms-python.python"
"defaultFormatter": "ms-python.black-formatter"
}
}
}
Expand Down
21 changes: 20 additions & 1 deletion runner_manager/jobs/workflow_job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
from datetime import timedelta

from githubkit.webhooks.models import (
WorkflowJobCompleted,
Expand All @@ -9,8 +10,9 @@
)
from githubkit.webhooks.types import WorkflowJobEvent

from runner_manager import Settings
from runner_manager.clients.github import GitHub
from runner_manager.dependencies import get_github
from runner_manager.dependencies import get_github, get_settings
from runner_manager.models.runner import Runner
from runner_manager.models.runner_group import RunnerGroup

Expand All @@ -25,6 +27,11 @@ def log_workflow_job(webhook: WorkflowJobEvent) -> None:
)


def time_to_start(webhook: WorkflowJobInProgress | WorkflowJobCompleted) -> timedelta:
"""From a given webhook, calculate the time it took to start the job"""
return webhook.workflow_job.started_at - webhook.workflow_job.created_at


def completed(webhook: WorkflowJobCompleted) -> int:
log_workflow_job(webhook)
runner: Runner | None = Runner.find_from_webhook(webhook)
Expand All @@ -47,6 +54,7 @@ def completed(webhook: WorkflowJobCompleted) -> int:

def in_progress(webhook: WorkflowJobInProgress) -> str | None:
log_workflow_job(webhook)
settings: Settings = get_settings()
name: str | None = webhook.workflow_job.runner_name
runner_group: RunnerGroup | None = RunnerGroup.find_from_webhook(webhook)
if not runner_group:
Expand All @@ -55,6 +63,17 @@ def in_progress(webhook: WorkflowJobInProgress) -> str | None:
log.info(f"Updating runner {name} in group {runner_group.name}")
runner: Runner = runner_group.update_runner(webhook=webhook)
log.info(f"Runner {name} in group {runner_group.name} has been updated")
tts = time_to_start(webhook)
log.info(f"{runner} took {tts} to start")
# If the time to start is greater than settings.timeout_runner,
# create an extra runner.
# The main reason we perform this action is to ensure that
# in the case we have missed a webhook, we still have a runner
# available for the jobs that are requesting it.
if tts > settings.timeout_runner and runner_group.is_full is False:
log.info(f"Runner group {runner_group.name} needs a new runner")
github: GitHub = get_github()
runner_group.create_runner(github)
return runner.pk


Expand Down
5 changes: 5 additions & 0 deletions runner_manager/models/runner_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ def need_new_runner(self) -> bool:
count = len(runners)
return (not_active < self.min or self.queued > 0) and count < self.max

@property
def is_full(self) -> bool:
"""Return True if the max number of runners has been reached."""
return len(self.get_runners()) >= self.max

def create_github_group(self, github: GitHub) -> GitHubRunnerGroup:
"""Create a GitHub runner group."""

Expand Down
2 changes: 1 addition & 1 deletion runner_manager/models/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Settings(BaseSettings):
api_key: Optional[SecretStr] = None
log_level: Literal["INFO", "WARNING", "DEBUG", "ERROR"] = "INFO"
runner_groups: List[BaseRunnerGroup] = []
timeout_runner: Optional[timedelta] = timedelta(minutes=15)
timeout_runner: timedelta = timedelta(minutes=15)
time_to_live: Optional[timedelta] = timedelta(hours=12)
healthcheck_interval: timedelta = timedelta(minutes=15)
github_base_url: Optional[AnyHttpUrl] = Field(default="https://api.github.com")
Expand Down
3 changes: 2 additions & 1 deletion tests/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class Repo(Repository):
runner_group_id=Int,
labels=st.lists(Text, min_size=1, max_size=5),
started_at=st.datetimes(),
created_at=st.datetimes(),
)

JobPropQueuedStrategy = st.builds(
Expand Down Expand Up @@ -127,7 +128,7 @@ class Repo(Repository):
github_base_url=st.just("http://localhost:4010"),
github_token=st.just("test"),
time_to_live=st.integers(1, 60),
timeout_runner=st.integers(1, 10),
timeout_runner=st.integers(120, 600),
)

RedisStrategy = st.builds(
Expand Down
60 changes: 59 additions & 1 deletion tests/unit/jobs/test_workflow_job.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import timedelta
from time import sleep
from uuid import uuid4

Expand Down Expand Up @@ -103,7 +104,6 @@ def test_workflow_job_completed(
def test_workflow_job_in_progress(
webhook: WorkflowJobInProgress, queue: Queue, settings: Settings, redis: Redis
):

# flush all keys that start with settings.name in redis

init_model(Runner, redis, settings)
Expand Down Expand Up @@ -197,3 +197,61 @@ def test_workflow_job_queued(
).first()
assert runner.busy is False
assert runner.status == "offline"


@settings(max_examples=10)
@given(
webhook=WorkflowJobInProgressStrategy,
queue=QueueStrategy,
settings=SettingsStrategy,
redis=RedisStrategy,
)
def test_workflow_job_time_to_start(
webhook: WorkflowJobInProgress, queue: Queue, settings: Settings, redis: Redis
):
"""
This test will ensure that an extra runner is created when the time to start
the given workflow was higher than settings.timeout_runners.
"""
init_model(Runner, redis, settings)
init_model(RunnerGroup, redis, settings)
runner_group: RunnerGroup = RunnerGroup(
organization=webhook.organization.login,
name=webhook.workflow_job.runner_group_name,
id=webhook.workflow_job.runner_group_id,
labels=webhook.workflow_job.labels,
manager=settings.name,
backend={"name": "base"},
max=2,
)
runner_group.save()
Migrator().run()

runner: Runner = Runner(
id=webhook.workflow_job.runner_id,
name=webhook.workflow_job.runner_name,
busy=False,
status="online",
manager=settings.name,
runner_group_id=webhook.workflow_job.runner_group_id,
runner_group_name=webhook.workflow_job.runner_group_name,
)
runner.save()
Migrator().run()

assert len(runner_group.get_runners()) == 1
# ensure we have only one runner if the time to start is less than timeout_runners
webhook.workflow_job.started_at = webhook.workflow_job.created_at + (
settings.timeout_runner - timedelta(minutes=15)
)
queue.enqueue(workflow_job.in_progress, webhook)
assert len(runner_group.get_runners()) == 1
# ensure we have two runners if the time to start is greater than 15 minutes
webhook.workflow_job.started_at = webhook.workflow_job.created_at + (
settings.timeout_runner + timedelta(minutes=15)
)
queue.enqueue(workflow_job.in_progress, webhook)
assert len(runner_group.get_runners()) == 2
# ensure we remain with two runners given that the max for the runner group is 2
queue.enqueue(workflow_job.in_progress, webhook)
assert len(runner_group.get_runners()) == 2
11 changes: 11 additions & 0 deletions tests/unit/models/test_runner_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,14 @@ def test_find_github_group(runner_group: RunnerGroup, github: GitHub):
assert exists is not None
group = runner_group.save(github=github)
assert exists.id == group.id


def test_is_full(runner_group: RunnerGroup, github: GitHub):
runner_group.max = 2
runner_group.min = 1
runner_group.save()
assert runner_group.is_full is False
runner_group.create_runner(github)
assert runner_group.is_full is False
runner_group.create_runner(github)
assert runner_group.is_full is True

0 comments on commit 24e056a

Please sign in to comment.