From 9b333690b0334faf9d7bdf386a3af61454b02ba9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Forestier?= Date: Mon, 17 Jun 2024 12:36:43 +0200 Subject: [PATCH] Add a method to subscribe SQS queue to SNS topic An SQS queue might need to subscribe to an SNS Topic. The method generates Subscription and QueuePolicy sections for CloudFormation template --- src/e3/aws/troposphere/sns/__init__.py | 4 +- src/e3/aws/troposphere/sqs/__init__.py | 40 ++++++++- .../tests_e3_aws/troposphere/sqs/__init__.py | 0 .../tests_e3_aws/troposphere/sqs/sqs_test.py | 88 +++++++++++++++++++ 4 files changed, 129 insertions(+), 3 deletions(-) create mode 100644 tests/tests_e3_aws/troposphere/sqs/__init__.py create mode 100644 tests/tests_e3_aws/troposphere/sqs/sqs_test.py diff --git a/src/e3/aws/troposphere/sns/__init__.py b/src/e3/aws/troposphere/sns/__init__.py index 690003a7..546aa756 100644 --- a/src/e3/aws/troposphere/sns/__init__.py +++ b/src/e3/aws/troposphere/sns/__init__.py @@ -42,9 +42,11 @@ def add_lambda_subscription( "Endpoint": function.arn, "Protocol": "lambda", "TopicArn": self.arn, - "DeliveryPolicy": delivery_policy, } + if delivery_policy: + sub_params.update({"DeliveryPolicy": delivery_policy}) + self.optional_resources.extend( [ sns.SubscriptionResource( diff --git a/src/e3/aws/troposphere/sqs/__init__.py b/src/e3/aws/troposphere/sqs/__init__.py index ab6aca95..162d98f8 100644 --- a/src/e3/aws/troposphere/sqs/__init__.py +++ b/src/e3/aws/troposphere/sqs/__init__.py @@ -5,7 +5,7 @@ from e3.aws.troposphere.iam.policy_document import PolicyDocument from e3.aws.troposphere.iam.policy_statement import Allow -from troposphere import sqs, GetAtt, Ref +from troposphere import sns, sqs, GetAtt, Ref if TYPE_CHECKING: from typing import Optional @@ -27,6 +27,10 @@ def __init__( """Initialize a SQS. :param name: topic name + :param fifo: Set the queue type to fifo + :param visibility_timeout: set the length of time during which a message will be + unavailable after a message is delivered from the queue + :param dlq_name: dead letter queue name """ self.name = name self.attr = {"QueueName": name, "VisibilityTimeout": visibility_timeout} @@ -44,6 +48,7 @@ def __init__( "deadLetterTargetArn": GetAtt(name_to_id(dlq_name), "Arn"), "maxReceiveCount": "3", } + self.optional_resources: list[AWSObject] = [] def allow_service_to_write( self, service: str, name_suffix: str, condition: Optional[ConditionType] = None @@ -64,6 +69,34 @@ def allow_service_to_write( ).as_dict, ) + def subscribe_to_sns_topic( + self, topic_arn: str, delivery_policy: dict | None = None + ) -> None: + """Subscribe to SNS topic. + + :param topic_arn: ARN of the topic to subscribe + :param delivery_policy: The delivery policy to assign to the subscription + """ + sub_params = { + "Endpoint": self.arn, + "Protocol": "sqs", + "TopicArn": topic_arn, + } + + if delivery_policy: + sub_params.update({"DeliveryPolicy": delivery_policy}) + + self.optional_resources.extend( + [ + sns.SubscriptionResource(name_to_id(f"{self.name}Sub"), **sub_params), + self.allow_service_to_write( + service="sns", + name_suffix="Sub", + condition={"ArnLike": {"aws:SourceArn": topic_arn}}, + ), + ] + ) + @property def arn(self) -> GetAtt: """SQS ARN.""" @@ -76,4 +109,7 @@ def ref(self) -> Ref: def resources(self, stack: Stack) -> list[AWSObject]: """Compute AWS resources for the construct.""" - return [sqs.Queue.from_dict(name_to_id(self.name), self.attr)] + return [ + sqs.Queue.from_dict(name_to_id(self.name), self.attr), + *self.optional_resources, + ] diff --git a/tests/tests_e3_aws/troposphere/sqs/__init__.py b/tests/tests_e3_aws/troposphere/sqs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/tests_e3_aws/troposphere/sqs/sqs_test.py b/tests/tests_e3_aws/troposphere/sqs/sqs_test.py new file mode 100644 index 00000000..cf825b1d --- /dev/null +++ b/tests/tests_e3_aws/troposphere/sqs/sqs_test.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +from e3.aws.troposphere import Stack +from e3.aws.troposphere.sqs import Queue + +EXPECTED_QUEUE_DEFAULT_TEMPLATE = { + "Myqueue": { + "Properties": {"QueueName": "myqueue", "VisibilityTimeout": 30}, + "Type": "AWS::SQS::Queue", + } +} + + +EXPECTED_QUEUE_TEMPLATE = { + "Myqueue": { + "Properties": { + "ContentBasedDeduplication": True, + "FifoQueue": True, + "QueueName": "myqueue.fifo", + "RedrivePolicy": { + "deadLetterTargetArn": {"Fn::GetAtt": ["Somedlqname", "Arn"]}, + "maxReceiveCount": "3", + }, + "VisibilityTimeout": 10, + }, + "Type": "AWS::SQS::Queue", + } +} + + +EXPECTED_SQS_SUBSCRIPTION_TEMPLATE = { + "Myqueue": { + "Properties": {"QueueName": "myqueue", "VisibilityTimeout": 30}, + "Type": "AWS::SQS::Queue", + }, + "MyqueuePolicySub": { + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": "sqs:SendMessage", + "Condition": {"ArnLike": {"aws:SourceArn": "some_topic_arn"}}, + "Effect": "Allow", + "Principal": {"Service": "sns.amazonaws.com"}, + "Resource": {"Fn::GetAtt": ["Myqueue", "Arn"]}, + } + ], + "Version": "2012-10-17", + }, + "Queues": [{"Ref": "Myqueue"}], + }, + "Type": "AWS::SQS::QueuePolicy", + }, + "MyqueueSub": { + "Properties": { + "Endpoint": {"Fn::GetAtt": ["Myqueue", "Arn"]}, + "Protocol": "sqs", + "TopicArn": "some_topic_arn", + }, + "Type": "AWS::SNS::Subscription", + }, +} + + +def test_queue_default(stack: Stack) -> None: + """Test Queue default creation.""" + stack.add(Queue(name="myqueue")) + assert stack.export()["Resources"] == EXPECTED_QUEUE_DEFAULT_TEMPLATE + + +def test_queue(stack: Stack) -> None: + """Test Queue creation.""" + stack.add( + Queue( + name="myqueue", fifo=True, visibility_timeout=10, dlq_name="some_dlq_name" + ) + ) + assert stack.export()["Resources"] == EXPECTED_QUEUE_TEMPLATE + + +def test_subscribe_to_sns_topic(stack: Stack) -> None: + """Test sqs subscription to sns topic.""" + queue = Queue(name="myqueue") + queue.subscribe_to_sns_topic("some_topic_arn") + + stack.add(queue) + + assert stack.export()["Resources"] == EXPECTED_SQS_SUBSCRIPTION_TEMPLATE