Skip to content

Commit

Permalink
Scaling rules for Lookup queues (#293)
Browse files Browse the repository at this point in the history
* Scaling rules for Lookup queues
* Fixing caught exception for Events
  • Loading branch information
JohnPreston authored Dec 1, 2020
1 parent c81f443 commit b9f1ec8
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 52 deletions.
2 changes: 1 addition & 1 deletion ecs_composex/ecs/ecs_scaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def handle_defined_x_aws_autoscaling(configs, service):
f"Detected both x-aws-autoscaling and x-scaling for {service.name}. Priority goes to x-scaling"
)
configs.append(service.x_scaling)
elif not keyisset("deploy", service.definition) and service.x_scaling:
elif service.x_scaling:
LOG.debug("No x-aws-autoscaling detected, proceeding as usual")
configs.append(service.x_scaling)

Expand Down
6 changes: 6 additions & 0 deletions ecs_composex/events/events_ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
SERVICE_T,
TASK_ROLE_T,
EXEC_ROLE_T,
SERVICE_SCALING_TARGET,
)
from ecs_composex.vpc.vpc_params import APP_SUBNETS, SG_ID_TYPE, SUBNETS_TYPE

Expand Down Expand Up @@ -191,11 +192,16 @@ def define_service_targets(stack, rule, cluster_arn):
if (
keyisset("DeleteDefaultService", service[4])
and SERVICE_T in service[0].template.resources
and SERVICE_SCALING_TARGET not in service[0].template.resources
):
LOG.info(
f"Deleting ECS Service definition from stack for {service[0].name}"
)
del service[0].template.resources[SERVICE_T]
elif SERVICE_SCALING_TARGET in service[0].template.resources:
LOG.warning(
f"Target for event {rule.logical_name} has others dependencies. Not altering"
)


def events_to_ecs(resources, services_stack, res_root_stack, settings):
Expand Down
8 changes: 5 additions & 3 deletions ecs_composex/sqs/sqs_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
find_aws_resource_arn_from_tags_api,
define_lookup_role_from_info,
)
from ecs_composex.sqs.sqs_params import SQS_ARN, SQS_KMS_KEY_T
from ecs_composex.sqs.sqs_params import SQS_ARN, SQS_KMS_KEY_T, SQS_NAME


def get_queue_config(logical_name, queue_arn, session):
Expand All @@ -41,13 +41,15 @@ def get_queue_config(logical_name, queue_arn, session):
queue_parts = re.compile(r"(?:^arn:aws(?:-[a-z]+)?:sqs:)([\S]+):([0-9]+):([\S]+)$")
queue_name = queue_parts.match(queue_arn).groups()[2]
queue_owner = queue_parts.match(queue_arn).groups()[1]
queue_config = {logical_name: queue_name, SQS_ARN.title: queue_arn}
queue_config = {SQS_ARN.title: queue_arn}
client = session.client("sqs")
try:
url_r = client.get_queue_url(
QueueName=queue_name, QueueOwnerAWSAccountId=queue_owner
)
queue_config.update({logical_name: url_r["QueueUrl"]})
queue_config.update(
{logical_name: url_r["QueueUrl"], SQS_NAME.title: queue_name}
)
try:
encryption_config_r = client.get_queue_attributes(
QueueUrl=url_r["QueueUrl"], AttributeNames=["KmsMasterKeyId"]
Expand Down
118 changes: 77 additions & 41 deletions ecs_composex/sqs/sqs_ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"""

from troposphere import Parameter
from troposphere import Ref, GetAtt
from troposphere import Ref, GetAtt, FindInMap
from troposphere.cloudwatch import Alarm, MetricDimension

from ecs_composex.common import LOG, keyisset, add_parameters
Expand All @@ -34,35 +34,32 @@
handle_lookup_resource,
)
from ecs_composex.sqs.sqs_aws import lookup_queue_config
from ecs_composex.sqs.sqs_params import SQS_NAME, SQS_KMS_KEY_T
from ecs_composex.sqs.sqs_params import SQS_NAME, SQS_KMS_KEY_T, MOD_KEY


def handle_service_scaling(resource, res_root_stack):
"""
Function to assign resource to services stack
Function to define and prepare settings for scaling rules based for SQS Queues discovered through lookup
:param resource:
:type resource: ecs_composex.common.compose_resources.XResource
:param res_root_stack:
:type res_root_stack: ecs_composex.common.stacks.ComposeXStack
:param ecs_composex.common.compose_resources.XResource resource:
:param ecs_composex.common.stacks.ComposeXStack res_root_stack:
:raises KeyError: if the service name is not a listed service in docker-compose.
:return:
"""
resource_attribute = SQS_NAME.title
resource_parameter = Parameter(
f"{resource.logical_name}{resource_attribute}", Type="String"
)
resource_value = GetAtt(
res_root_stack.title, f"Outputs.{resource.logical_name}{SQS_NAME.title}"
)
if not resource.lookup:
resource_value = GetAtt(
res_root_stack.title, f"Outputs.{resource.logical_name}{SQS_NAME.title}"
)
else:
resource_value = FindInMap(MOD_KEY, resource.logical_name, resource_attribute)
for target in resource.families_scaling:
if SERVICE_SCALING_TARGET not in target[0].template.resources:
LOG.warning(
f"No Scalable target defined for {target[0].name}."
" You need to define `scaling.range` in x-configs first. No scaling applied"
)
return
add_parameters(target[0].template, [resource_parameter])
target[0].stack.Parameters.update({resource_parameter.title: resource_value})
scaling_out_policy = generate_alarm_scaling_out_policy(
target[0].logical_name,
target[0].template,
Expand All @@ -75,31 +72,69 @@ def handle_service_scaling(resource, res_root_stack):
target[1],
scaling_source=resource.logical_name,
)
Alarm(
f"SqsScalingAlarm{resource.logical_name}To{target[0].logical_name}",
template=target[0].template,
ActionsEnabled=True,
AlarmActions=[Ref(scaling_out_policy)],
AlarmDescription=f"MessagesProcessingWatchFor{resource.logical_name}To{target[0].logical_name}",
ComparisonOperator="GreaterThanOrEqualToThreshold",
DatapointsToAlarm=1,
Dimensions=[
MetricDimension(Name="QueueName", Value=Ref(resource_parameter)),
],
EvaluationPeriods=1,
InsufficientDataActions=[Ref(scaling_in_policy)],
MetricName="ApproximateNumberOfMessagesVisible",
Namespace="AWS/SQS",
OKActions=[Ref(scaling_in_policy)],
Period="60",
Statistic="Sum",
TreatMissingData="notBreaching",
Threshold=float(
scaling_out_policy.StepScalingPolicyConfiguration.StepAdjustments[
0
].MetricIntervalLowerBound
),
)
if not resource.lookup:
resource_parameter = Parameter(
f"{resource.logical_name}{resource_attribute}", Type="String"
)
add_parameters(target[0].template, [resource_parameter])
target[0].stack.Parameters.update(
{resource_parameter.title: resource_value}
)
add_alarm_for_resource(
resource,
target,
scaling_out_policy,
scaling_in_policy,
Ref(resource_parameter),
)
else:
add_alarm_for_resource(
resource,
target,
scaling_out_policy,
scaling_in_policy,
resource_value,
)


def add_alarm_for_resource(
resource, target, scaling_out_policy, scaling_in_policy, resource_parameter
):
"""
Function to add the Alarm for SQS resource to the service template
:param ecs_composex.common.compose_resources.XResource resource:
:param tuple target:
:param scaling_out_policy:
:param scaling_in_policy:
:param resource_parameter:
:return:
"""
Alarm(
f"SqsScalingAlarm{resource.logical_name}To{target[0].logical_name}",
template=target[0].template,
ActionsEnabled=True,
AlarmActions=[Ref(scaling_out_policy)],
AlarmDescription=f"MessagesProcessingWatchFor{resource.logical_name}To{target[0].logical_name}",
ComparisonOperator="GreaterThanOrEqualToThreshold",
DatapointsToAlarm=1,
Dimensions=[
MetricDimension(Name="QueueName", Value=resource_parameter),
],
EvaluationPeriods=1,
InsufficientDataActions=[Ref(scaling_in_policy)],
MetricName="ApproximateNumberOfMessagesVisible",
Namespace="AWS/SQS",
OKActions=[Ref(scaling_in_policy)],
Period="60",
Statistic="Sum",
TreatMissingData="notBreaching",
Threshold=float(
scaling_out_policy.StepScalingPolicyConfiguration.StepAdjustments[
0
].MetricIntervalLowerBound
),
)


def create_sqs_mappings(mapping, resources, settings):
Expand Down Expand Up @@ -140,4 +175,5 @@ def sqs_to_ecs(resources, services_stack, res_root_stack, settings):
handle_service_scaling(new_res, res_root_stack)
create_sqs_mappings(resource_mappings, lookup_resources, settings)
for lookup_res in lookup_resources:
handle_lookup_resource(resource_mappings, "sqs", lookup_res)
handle_lookup_resource(resource_mappings, MOD_KEY, lookup_res)
handle_service_scaling(lookup_res, None)
3 changes: 2 additions & 1 deletion ecs_composex/sqs/sqs_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@
SQS_KMS_KEY_T = "QueueKmsKey"
SQS_KMS_KEY = Parameter(SQS_KMS_KEY_T, Type="String")

RES_KEY = f"x-{path.basename(path.dirname(path.abspath(__file__)))}"
MOD_KEY = f"{path.basename(path.dirname(path.abspath(__file__)))}"
RES_KEY = f"x-{MOD_KEY}"
SQS_SSM_PREFIX = f"/{RES_KEY}/"
1 change: 1 addition & 0 deletions features/features/sqs.feature
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ Feature: ecs_composex.sqs

Examples:
| file_path | override_file |
| use-cases/blog.features.yml | use-cases/sqs/simple_queue.yml |
| use-cases/blog.features.yml | use-cases/sqs/create_and_lookup.yml |
7 changes: 3 additions & 4 deletions use-cases/blog.features.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ services:
- '*'
Sid: AllowPublishMetricsToCw
x-xray: false
x-scaling:
range: "1-4"
app02:
deploy:
labels:
Expand Down Expand Up @@ -99,11 +101,10 @@ services:
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AdministratorAccess
x-scaling:
range: 1-5
range: "1-5"
target_scaling:
cpu_target: 88
disable_scale_in: true
memory_target: 30
x-xray: false
app03:
deploy:
Expand Down Expand Up @@ -138,8 +139,6 @@ services:
RetentionInDays: 42
x-scaling:
range: 1-10
target_scaling:
cpu_target: 50
rproxy:
depends_on:
- app01
Expand Down
2 changes: 1 addition & 1 deletion use-cases/events/simple.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ x-events:
Services:
- name: app03
TaskCount: 1
DeleteDefaultService: True
DeleteDefaultService: False
15 changes: 14 additions & 1 deletion use-cases/sqs/create_and_lookup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,21 @@ x-sqs:
queueE:
Properties: {}
Services:
- name: app03
- name: bignicefamily
access: RWMessages
scaling:
scale_in_cooldown: 120
scale_out_cooldown: 60
steps:
- lower_bound: 0
upper_bound: 10
count: 1
- lower_bound: 10
upper_bound: 20
count: 2
- lower_bound: 20
count: 5



queueF:
Expand Down

0 comments on commit b9f1ec8

Please sign in to comment.