Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a method to subscribe SQS Queue to SNS Topic #251

Merged
merged 1 commit into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/e3/aws/troposphere/sns/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ def add_lambda_subscription(
"Endpoint": function.arn,
"Protocol": "lambda",
"TopicArn": self.arn,
"DeliveryPolicy": delivery_policy,
}

if delivery_policy:
sub_params.update({"DeliveryPolicy": delivery_policy})

self.optional_resources.extend(
[
sns.SubscriptionResource(
Expand Down
40 changes: 38 additions & 2 deletions src/e3/aws/troposphere/sqs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from e3.aws.troposphere.iam.policy_document import PolicyDocument
from e3.aws.troposphere.iam.policy_statement import Allow

from troposphere import sqs, GetAtt, Ref
from troposphere import sns, sqs, GetAtt, Ref

if TYPE_CHECKING:
from typing import Optional
Expand All @@ -27,6 +27,10 @@ def __init__(
"""Initialize a SQS.

:param name: topic name
:param fifo: Set the queue type to fifo
:param visibility_timeout: set the length of time during which a message will be
unavailable after a message is delivered from the queue
:param dlq_name: dead letter queue name
"""
self.name = name
self.attr = {"QueueName": name, "VisibilityTimeout": visibility_timeout}
Expand All @@ -44,6 +48,7 @@ def __init__(
"deadLetterTargetArn": GetAtt(name_to_id(dlq_name), "Arn"),
"maxReceiveCount": "3",
}
self.optional_resources: list[AWSObject] = []

def allow_service_to_write(
self, service: str, name_suffix: str, condition: Optional[ConditionType] = None
Expand All @@ -64,6 +69,34 @@ def allow_service_to_write(
).as_dict,
)

def subscribe_to_sns_topic(
self, topic_arn: str, delivery_policy: dict | None = None
) -> None:
"""Subscribe to SNS topic.

:param topic_arn: ARN of the topic to subscribe
:param delivery_policy: The delivery policy to assign to the subscription
"""
sub_params = {
"Endpoint": self.arn,
"Protocol": "sqs",
"TopicArn": topic_arn,
}

if delivery_policy:
sub_params.update({"DeliveryPolicy": delivery_policy})

self.optional_resources.extend(
[
sns.SubscriptionResource(name_to_id(f"{self.name}Sub"), **sub_params),
self.allow_service_to_write(
service="sns",
name_suffix="Sub",
condition={"ArnLike": {"aws:SourceArn": topic_arn}},
),
]
)

@property
def arn(self) -> GetAtt:
"""SQS ARN."""
Expand All @@ -76,4 +109,7 @@ def ref(self) -> Ref:

def resources(self, stack: Stack) -> list[AWSObject]:
"""Compute AWS resources for the construct."""
return [sqs.Queue.from_dict(name_to_id(self.name), self.attr)]
return [
grouigrokon marked this conversation as resolved.
Show resolved Hide resolved
sqs.Queue.from_dict(name_to_id(self.name), self.attr),
*self.optional_resources,
]
Empty file.
88 changes: 88 additions & 0 deletions tests/tests_e3_aws/troposphere/sqs/sqs_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from __future__ import annotations

from e3.aws.troposphere import Stack
from e3.aws.troposphere.sqs import Queue

EXPECTED_QUEUE_DEFAULT_TEMPLATE = {
"Myqueue": {
"Properties": {"QueueName": "myqueue", "VisibilityTimeout": 30},
"Type": "AWS::SQS::Queue",
}
}


EXPECTED_QUEUE_TEMPLATE = {
"Myqueue": {
"Properties": {
"ContentBasedDeduplication": True,
"FifoQueue": True,
"QueueName": "myqueue.fifo",
"RedrivePolicy": {
"deadLetterTargetArn": {"Fn::GetAtt": ["Somedlqname", "Arn"]},
"maxReceiveCount": "3",
},
"VisibilityTimeout": 10,
},
"Type": "AWS::SQS::Queue",
}
}


EXPECTED_SQS_SUBSCRIPTION_TEMPLATE = {
"Myqueue": {
"Properties": {"QueueName": "myqueue", "VisibilityTimeout": 30},
"Type": "AWS::SQS::Queue",
},
"MyqueuePolicySub": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "sqs:SendMessage",
"Condition": {"ArnLike": {"aws:SourceArn": "some_topic_arn"}},
"Effect": "Allow",
"Principal": {"Service": "sns.amazonaws.com"},
"Resource": {"Fn::GetAtt": ["Myqueue", "Arn"]},
}
],
"Version": "2012-10-17",
},
"Queues": [{"Ref": "Myqueue"}],
},
"Type": "AWS::SQS::QueuePolicy",
},
"MyqueueSub": {
"Properties": {
"Endpoint": {"Fn::GetAtt": ["Myqueue", "Arn"]},
"Protocol": "sqs",
"TopicArn": "some_topic_arn",
},
"Type": "AWS::SNS::Subscription",
},
}


def test_queue_default(stack: Stack) -> None:
"""Test Queue default creation."""
stack.add(Queue(name="myqueue"))
assert stack.export()["Resources"] == EXPECTED_QUEUE_DEFAULT_TEMPLATE


def test_queue(stack: Stack) -> None:
"""Test Queue creation."""
stack.add(
Queue(
name="myqueue", fifo=True, visibility_timeout=10, dlq_name="some_dlq_name"
)
)
assert stack.export()["Resources"] == EXPECTED_QUEUE_TEMPLATE


def test_subscribe_to_sns_topic(stack: Stack) -> None:
"""Test sqs subscription to sns topic."""
queue = Queue(name="myqueue")
queue.subscribe_to_sns_topic("some_topic_arn")

stack.add(queue)

assert stack.export()["Resources"] == EXPECTED_SQS_SUBSCRIPTION_TEMPLATE
Loading