From 5913e8b038e4bf25494eaa7b9d643cd19b387626 Mon Sep 17 00:00:00 2001 From: Tim Brown Date: Thu, 16 Jan 2025 13:41:30 +0000 Subject: [PATCH 1/6] aws-backend: add support for running instances on multiple subnets --- runner_manager/backend/aws.py | 56 ++++++++++++++++++--- runner_manager/models/backend.py | 13 ++++- tests/unit/backend/test_aws.py | 84 ++++++++++++++++++++++++++++++-- 3 files changed, 140 insertions(+), 13 deletions(-) diff --git a/runner_manager/backend/aws.py b/runner_manager/backend/aws.py index 8f3f53f8..60cafc08 100644 --- a/runner_manager/backend/aws.py +++ b/runner_manager/backend/aws.py @@ -1,4 +1,6 @@ -from typing import List, Literal, Optional +from copy import deepcopy +from typing import List, Literal, Optional, Sequence +from random import shuffle from boto3 import client from botocore.exceptions import ClientError @@ -13,6 +15,7 @@ from runner_manager.models.backend import ( AWSConfig, AwsInstance, + AwsSubnetListConfig, AWSInstanceConfig, Backends, ) @@ -31,15 +34,52 @@ def client(self) -> EC2Client: def create(self, runner: Runner) -> Runner: """Create a runner.""" - instance_resource: AwsInstance = self.instance_config.configure_instance(runner) - try: - instance = self.client.run_instances(**instance_resource) - runner.instance_id = instance["Instances"][0]["InstanceId"] - except Exception as e: - log.error(e) - raise e + if self.instance_config.subnet_id and self.instance_config.subnet_configs: + raise Exception("Instance config contains both subnet_id and subnet_configs, only one allowed.") + if len(self.instance_config.subnet_configs) > 0: + runner = self._create_from_subnet_config(runner, self.instance_config.subnet_configs) + log.warn(f"Instance id: {runner.instance_id}") + else: + instance_resource: AwsInstance = self.instance_config.configure_instance(runner) + try: + runner = self._create(runner, instance_resource) + log.warn(f"Instance id: {runner.instance_id}") + except Exception as e: + log.error(e) + raise e return super().create(runner) + def _create_from_subnet_config(self, runner: Runner, subnet_configs: Sequence[AwsSubnetListConfig]) -> Runner: + # Randomize the order of the Subnets - very coarse load balancing. + # TODO: Skip subnets that have failed recently. Maybe with an increasing backoff. + order = list(range(len(subnet_configs))) + shuffle(order) + subnet_config = self.instance_config.subnet_configs + print(f"Order: {order}") + for idx, i in enumerate(order): + subnet_config = subnet_configs[i] + try: + # Copy the object to avoid modifying the object we were passed. + count = self.instance_config.max_count - self.instance_config.min_count + log.info(f"Trying to launch {count} containers on subnet {subnet_config['subnet_id']}") + concrete_instance_config = deepcopy(self.instance_config) + concrete_instance_config.subnet_id = subnet_config["subnet_id"] + concrete_instance_config.security_group_ids.extend( + subnet_config.get("security_group_ids", []) + ) + instance_resource: AwsInstance = concrete_instance_config.configure_instance(runner) + return self._create(runner, instance_resource) + except Exception as e: + log.warn(f"Creating instance in subnet {subnet_config['subnet_id']} failed with '{e}'. Retrying with another subnet.") + if idx >= len(order) - 1: + raise e + return runner + + def _create(self, runner: Runner, instance_resource: AwsInstance) -> Runner: + instance = self.client.run_instances(**instance_resource) + runner.instance_id = instance["Instances"][0]["InstanceId"] + return runner + def delete(self, runner: Runner): """Delete a runner.""" if runner.instance_id: diff --git a/runner_manager/models/backend.py b/runner_manager/models/backend.py index 29c7e714..b8deb84c 100644 --- a/runner_manager/models/backend.py +++ b/runner_manager/models/backend.py @@ -1,7 +1,7 @@ from enum import Enum from pathlib import Path from string import Template -from typing import Dict, List, Literal, Optional, Sequence, TypedDict +from typing import Dict, List, Literal, Optional, Sequence, TypedDict, NotRequired from mypy_boto3_ec2.literals import ( InstanceMetadataTagsStateType, @@ -134,6 +134,14 @@ class AWSConfig(BackendConfig): region: str = "us-west-2" +AwsSubnetListConfig = TypedDict( + "AwsSubnetListConfig", + { + "subnet_id": str, + "security_group_ids": NotRequired[Sequence[str]], + } +) + AwsInstance = TypedDict( "AwsInstance", { @@ -157,7 +165,7 @@ class AWSInstanceConfig(InstanceConfig): image: str = "ami-0735c191cf914754d" # Ubuntu 22.04 for us-west-2 instance_type: InstanceTypeType = "t3.micro" - subnet_id: str + subnet_id: str = "" security_group_ids: Sequence[str] = [] max_count: int = 1 min_count: int = 1 @@ -167,6 +175,7 @@ class AWSInstanceConfig(InstanceConfig): disk_size_gb: int = 20 iam_instance_profile_arn: str = "" instance_metadata_tags: InstanceMetadataTagsStateType = "disabled" + subnet_configs: Sequence[AwsSubnetListConfig] = [] def configure_instance(self, runner: Runner) -> AwsInstance: """Configure instance.""" diff --git a/tests/unit/backend/test_aws.py b/tests/unit/backend/test_aws.py index b5633411..ffebebf1 100644 --- a/tests/unit/backend/test_aws.py +++ b/tests/unit/backend/test_aws.py @@ -2,6 +2,7 @@ from mypy_boto3_ec2.type_defs import TagTypeDef from pytest import fixture, mark, raises +from unittest.mock import patch from redis_om import NotFoundError from runner_manager.backend.aws import AWSBackend @@ -35,11 +36,67 @@ def aws_group(settings) -> RunnerGroup: ) return runner_group +@fixture() +def aws_multi_subnet_group(settings) -> RunnerGroup: + config = AWSConfig() + subnet_id = os.getenv("AWS_SUBNET_ID", "") + runner_group: RunnerGroup = RunnerGroup( + id=3, + name="default", + organization="test", + manager=settings.name, + backend=AWSBackend( + name=Backends.aws, + config=config, + instance_config=AWSInstanceConfig( + subnet_configs=[ + { + "subnet_id": subnet_id, + "security_group_ids": [], + } + ] + ), + ), + labels=[ + "label", + ], + ) + return runner_group + +@fixture() +def aws_multi_subnet_group_invalid_subnets(settings) -> RunnerGroup: + config = AWSConfig() + runner_group: RunnerGroup = RunnerGroup( + id=3, + name="default", + organization="test", + manager=settings.name, + backend=AWSBackend( + name=Backends.aws, + config=config, + instance_config=AWSInstanceConfig( + subnet_configs=[ + { + "subnet_id": "does-not-exist", + }, + { + "subnet_id": "also-does-not-exist", + } + ] + ), + ), + labels=[ + "label", + ], + ) + return runner_group + @fixture() def aws_runner(runner: Runner, aws_group: RunnerGroup) -> Runner: # Cleanup and return a runner for testing aws_group.backend.delete(runner) + runner.instance_id = None return runner @@ -70,7 +127,7 @@ def test_aws_instance_config(runner: Runner): assert instance["TagSpecifications"][1]["ResourceType"] == "volume" -@mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID"), reason="AWS credentials not found") +@mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), reason="AWS credentials not found") def test_create_delete(aws_runner, aws_group): runner = aws_group.backend.create(aws_runner) assert runner.instance_id is not None @@ -81,7 +138,7 @@ def test_create_delete(aws_runner, aws_group): Runner.find(Runner.instance_id == runner.instance_id).first() -@mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID"), reason="AWS credentials not found") +@mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), reason="AWS credentials not found") def test_list(aws_runner, aws_group): runner = aws_group.backend.create(aws_runner) runners = aws_group.backend.list() @@ -91,7 +148,7 @@ def test_list(aws_runner, aws_group): aws_group.backend.get(runner.instance_id) -@mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID"), reason="AWS credentials not found") +@mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), reason="AWS credentials not found") def test_update(aws_runner, aws_group): runner = aws_group.backend.create(aws_runner) runner.labels = [RunnerLabel(name="test", type="custom")] @@ -100,3 +157,24 @@ def test_update(aws_runner, aws_group): aws_group.backend.delete(runner) with raises(NotFoundError): aws_group.backend.get(runner.instance_id) + + +@mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), reason="AWS credentials not found") +def test_create_delete_multi_subnet(aws_runner, aws_multi_subnet_group): + runner = aws_multi_subnet_group.backend.create(aws_runner) + print(f"{runner.instance_id}") + assert runner.instance_id is not None + assert runner.backend == "aws" + assert Runner.find(Runner.instance_id == runner.instance_id).first() == runner + aws_multi_subnet_group.backend.delete(runner) + with raises(NotFoundError): + Runner.find(Runner.instance_id == runner.instance_id).first() + + +@mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), reason="AWS credentials not found") +def test_create_delete_multi_subnet_invalid_subnets(aws_runner, aws_multi_subnet_group_invalid_subnets): + with patch.object(AWSBackend, '_create', wraps=aws_multi_subnet_group_invalid_subnets.backend._create) as mock: + with raises(Exception): + aws_multi_subnet_group_invalid_subnets.backend.create(aws_runner) + # Check that the code tries once for each subnet. + assert mock.call_count == 2 \ No newline at end of file From 3ba3d687d475d09295990a2239fbdfd25c8c9902 Mon Sep 17 00:00:00 2001 From: Tim Brown Date: Fri, 17 Jan 2025 09:29:50 +0000 Subject: [PATCH 2/6] trunk fmt --- runner_manager/backend/aws.py | 32 +++++++++++++++++------- runner_manager/models/backend.py | 4 +-- tests/unit/backend/test_aws.py | 43 ++++++++++++++++++++++++-------- 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/runner_manager/backend/aws.py b/runner_manager/backend/aws.py index 60cafc08..2a53aa0d 100644 --- a/runner_manager/backend/aws.py +++ b/runner_manager/backend/aws.py @@ -1,6 +1,6 @@ from copy import deepcopy -from typing import List, Literal, Optional, Sequence from random import shuffle +from typing import List, Literal, Optional, Sequence from boto3 import client from botocore.exceptions import ClientError @@ -15,8 +15,8 @@ from runner_manager.models.backend import ( AWSConfig, AwsInstance, - AwsSubnetListConfig, AWSInstanceConfig, + AwsSubnetListConfig, Backends, ) from runner_manager.models.runner import Runner @@ -35,12 +35,18 @@ def client(self) -> EC2Client: def create(self, runner: Runner) -> Runner: """Create a runner.""" if self.instance_config.subnet_id and self.instance_config.subnet_configs: - raise Exception("Instance config contains both subnet_id and subnet_configs, only one allowed.") + raise Exception( + "Instance config contains both subnet_id and subnet_configs, only one allowed." + ) if len(self.instance_config.subnet_configs) > 0: - runner = self._create_from_subnet_config(runner, self.instance_config.subnet_configs) + runner = self._create_from_subnet_config( + runner, self.instance_config.subnet_configs + ) log.warn(f"Instance id: {runner.instance_id}") else: - instance_resource: AwsInstance = self.instance_config.configure_instance(runner) + instance_resource: AwsInstance = self.instance_config.configure_instance( + runner + ) try: runner = self._create(runner, instance_resource) log.warn(f"Instance id: {runner.instance_id}") @@ -49,7 +55,9 @@ def create(self, runner: Runner) -> Runner: raise e return super().create(runner) - def _create_from_subnet_config(self, runner: Runner, subnet_configs: Sequence[AwsSubnetListConfig]) -> Runner: + def _create_from_subnet_config( + self, runner: Runner, subnet_configs: Sequence[AwsSubnetListConfig] + ) -> Runner: # Randomize the order of the Subnets - very coarse load balancing. # TODO: Skip subnets that have failed recently. Maybe with an increasing backoff. order = list(range(len(subnet_configs))) @@ -61,16 +69,22 @@ def _create_from_subnet_config(self, runner: Runner, subnet_configs: Sequence[Aw try: # Copy the object to avoid modifying the object we were passed. count = self.instance_config.max_count - self.instance_config.min_count - log.info(f"Trying to launch {count} containers on subnet {subnet_config['subnet_id']}") + log.info( + f"Trying to launch {count} containers on subnet {subnet_config['subnet_id']}" + ) concrete_instance_config = deepcopy(self.instance_config) concrete_instance_config.subnet_id = subnet_config["subnet_id"] concrete_instance_config.security_group_ids.extend( subnet_config.get("security_group_ids", []) ) - instance_resource: AwsInstance = concrete_instance_config.configure_instance(runner) + instance_resource: AwsInstance = ( + concrete_instance_config.configure_instance(runner) + ) return self._create(runner, instance_resource) except Exception as e: - log.warn(f"Creating instance in subnet {subnet_config['subnet_id']} failed with '{e}'. Retrying with another subnet.") + log.warn( + f"Creating instance in subnet {subnet_config['subnet_id']} failed with '{e}'. Retrying with another subnet." + ) if idx >= len(order) - 1: raise e return runner diff --git a/runner_manager/models/backend.py b/runner_manager/models/backend.py index b8deb84c..98bb60f1 100644 --- a/runner_manager/models/backend.py +++ b/runner_manager/models/backend.py @@ -1,7 +1,7 @@ from enum import Enum from pathlib import Path from string import Template -from typing import Dict, List, Literal, Optional, Sequence, TypedDict, NotRequired +from typing import Dict, List, Literal, NotRequired, Optional, Sequence, TypedDict from mypy_boto3_ec2.literals import ( InstanceMetadataTagsStateType, @@ -139,7 +139,7 @@ class AWSConfig(BackendConfig): { "subnet_id": str, "security_group_ids": NotRequired[Sequence[str]], - } + }, ) AwsInstance = TypedDict( diff --git a/tests/unit/backend/test_aws.py b/tests/unit/backend/test_aws.py index ffebebf1..2458b4e7 100644 --- a/tests/unit/backend/test_aws.py +++ b/tests/unit/backend/test_aws.py @@ -1,8 +1,8 @@ import os +from unittest.mock import patch from mypy_boto3_ec2.type_defs import TagTypeDef from pytest import fixture, mark, raises -from unittest.mock import patch from redis_om import NotFoundError from runner_manager.backend.aws import AWSBackend @@ -36,6 +36,7 @@ def aws_group(settings) -> RunnerGroup: ) return runner_group + @fixture() def aws_multi_subnet_group(settings) -> RunnerGroup: config = AWSConfig() @@ -63,6 +64,7 @@ def aws_multi_subnet_group(settings) -> RunnerGroup: ) return runner_group + @fixture() def aws_multi_subnet_group_invalid_subnets(settings) -> RunnerGroup: config = AWSConfig() @@ -81,7 +83,7 @@ def aws_multi_subnet_group_invalid_subnets(settings) -> RunnerGroup: }, { "subnet_id": "also-does-not-exist", - } + }, ] ), ), @@ -127,7 +129,10 @@ def test_aws_instance_config(runner: Runner): assert instance["TagSpecifications"][1]["ResourceType"] == "volume" -@mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), reason="AWS credentials not found") +@mark.skipif( + not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), + reason="AWS credentials not found", +) def test_create_delete(aws_runner, aws_group): runner = aws_group.backend.create(aws_runner) assert runner.instance_id is not None @@ -138,7 +143,10 @@ def test_create_delete(aws_runner, aws_group): Runner.find(Runner.instance_id == runner.instance_id).first() -@mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), reason="AWS credentials not found") +@mark.skipif( + not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), + reason="AWS credentials not found", +) def test_list(aws_runner, aws_group): runner = aws_group.backend.create(aws_runner) runners = aws_group.backend.list() @@ -148,7 +156,10 @@ def test_list(aws_runner, aws_group): aws_group.backend.get(runner.instance_id) -@mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), reason="AWS credentials not found") +@mark.skipif( + not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), + reason="AWS credentials not found", +) def test_update(aws_runner, aws_group): runner = aws_group.backend.create(aws_runner) runner.labels = [RunnerLabel(name="test", type="custom")] @@ -159,7 +170,10 @@ def test_update(aws_runner, aws_group): aws_group.backend.get(runner.instance_id) -@mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), reason="AWS credentials not found") +@mark.skipif( + not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), + reason="AWS credentials not found", +) def test_create_delete_multi_subnet(aws_runner, aws_multi_subnet_group): runner = aws_multi_subnet_group.backend.create(aws_runner) print(f"{runner.instance_id}") @@ -171,10 +185,19 @@ def test_create_delete_multi_subnet(aws_runner, aws_multi_subnet_group): Runner.find(Runner.instance_id == runner.instance_id).first() -@mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), reason="AWS credentials not found") -def test_create_delete_multi_subnet_invalid_subnets(aws_runner, aws_multi_subnet_group_invalid_subnets): - with patch.object(AWSBackend, '_create', wraps=aws_multi_subnet_group_invalid_subnets.backend._create) as mock: +@mark.skipif( + not os.getenv("AWS_ACCESS_KEY_ID") and not os.getenv("AWS_PROFILE"), + reason="AWS credentials not found", +) +def test_create_delete_multi_subnet_invalid_subnets( + aws_runner, aws_multi_subnet_group_invalid_subnets +): + with patch.object( + AWSBackend, + "_create", + wraps=aws_multi_subnet_group_invalid_subnets.backend._create, + ) as mock: with raises(Exception): aws_multi_subnet_group_invalid_subnets.backend.create(aws_runner) # Check that the code tries once for each subnet. - assert mock.call_count == 2 \ No newline at end of file + assert mock.call_count == 2 From a623a141354164c1f08a3419a232b33c0673bed0 Mon Sep 17 00:00:00 2001 From: Tim Brown Date: Mon, 20 Jan 2025 11:05:50 +0000 Subject: [PATCH 3/6] fix extend warning and ignore type errors from existing code --- runner_manager/backend/aws.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/runner_manager/backend/aws.py b/runner_manager/backend/aws.py index 2a53aa0d..f452a6e6 100644 --- a/runner_manager/backend/aws.py +++ b/runner_manager/backend/aws.py @@ -74,9 +74,11 @@ def _create_from_subnet_config( ) concrete_instance_config = deepcopy(self.instance_config) concrete_instance_config.subnet_id = subnet_config["subnet_id"] - concrete_instance_config.security_group_ids.extend( - subnet_config.get("security_group_ids", []) - ) + subnet_security_groups = subnet_config.get("security_group_ids", []) + if subnet_security_groups: + security_groups = list(concrete_instance_config.security_group_ids) + security_groups += subnet_security_groups + concrete_instance_config.security_group_ids = security_groups instance_resource: AwsInstance = ( concrete_instance_config.configure_instance(runner) ) @@ -90,8 +92,8 @@ def _create_from_subnet_config( return runner def _create(self, runner: Runner, instance_resource: AwsInstance) -> Runner: - instance = self.client.run_instances(**instance_resource) - runner.instance_id = instance["Instances"][0]["InstanceId"] + instance = self.client.run_instances(**instance_resource) # type: ignore + runner.instance_id = instance["Instances"][0]["InstanceId"] # type: ignore return runner def delete(self, runner: Runner): From 3f60dbfb6e27deeaae4ed754c74e2f6d0be00b1a Mon Sep 17 00:00:00 2001 From: Tim Brown Date: Mon, 20 Jan 2025 11:09:05 +0000 Subject: [PATCH 4/6] add comment to describe ignore and fmt --- runner_manager/backend/aws.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/runner_manager/backend/aws.py b/runner_manager/backend/aws.py index f452a6e6..b5ab03de 100644 --- a/runner_manager/backend/aws.py +++ b/runner_manager/backend/aws.py @@ -92,8 +92,10 @@ def _create_from_subnet_config( return runner def _create(self, runner: Runner, instance_resource: AwsInstance) -> Runner: - instance = self.client.run_instances(**instance_resource) # type: ignore - runner.instance_id = instance["Instances"][0]["InstanceId"] # type: ignore + instance = self.client.run_instances(**instance_resource) + # Allow this to raise exception as we don't want to track an instance that + # doesn't have an instance ID. + runner.instance_id = instance["Instances"][0]["InstanceId"] # type: ignore return runner def delete(self, runner: Runner): From c0f4b07003606417b3c56034a25e3e46071ae181 Mon Sep 17 00:00:00 2001 From: Tim Brown Date: Mon, 27 Jan 2025 11:18:14 +0000 Subject: [PATCH 5/6] Stop using print for debug message and remove unused variable definition --- runner_manager/backend/aws.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/runner_manager/backend/aws.py b/runner_manager/backend/aws.py index b5ab03de..dc95eb9c 100644 --- a/runner_manager/backend/aws.py +++ b/runner_manager/backend/aws.py @@ -62,8 +62,7 @@ def _create_from_subnet_config( # TODO: Skip subnets that have failed recently. Maybe with an increasing backoff. order = list(range(len(subnet_configs))) shuffle(order) - subnet_config = self.instance_config.subnet_configs - print(f"Order: {order}") + log.debug(f"Order: {order}") for idx, i in enumerate(order): subnet_config = subnet_configs[i] try: From 3cec72acfc8fdf8d7c5df8ee7ead534763ec3b86 Mon Sep 17 00:00:00 2001 From: Tim Brown Date: Mon, 27 Jan 2025 13:22:16 +0000 Subject: [PATCH 6/6] remove barely useful debug message --- runner_manager/backend/aws.py | 1 - 1 file changed, 1 deletion(-) diff --git a/runner_manager/backend/aws.py b/runner_manager/backend/aws.py index dc95eb9c..1d6d9c6e 100644 --- a/runner_manager/backend/aws.py +++ b/runner_manager/backend/aws.py @@ -62,7 +62,6 @@ def _create_from_subnet_config( # TODO: Skip subnets that have failed recently. Maybe with an increasing backoff. order = list(range(len(subnet_configs))) shuffle(order) - log.debug(f"Order: {order}") for idx, i in enumerate(order): subnet_config = subnet_configs[i] try: