Skip to content

Commit

Permalink
Merge branch 'mr/forestier/add-filter-policy-topic-sub' into 'master'
Browse files Browse the repository at this point in the history
Add filter policy argument to sns subscription

See merge request it/e3-aws!9
  • Loading branch information
jeromef853 committed Aug 14, 2024
2 parents f162626 + b85d494 commit 6065641
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 2 deletions.
17 changes: 15 additions & 2 deletions src/e3/aws/troposphere/sqs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,20 @@ def add_allow_service_to_write_statement(
return self._get_queue_policy_name()

def subscribe_to_sns_topic(
self, topic_arn: str, applicant: str, delivery_policy: dict | None = None
self,
topic_arn: str,
applicant: str,
delivery_policy: dict | None = None,
filter_policy: dict | None = None,
filter_policy_scope: str | None = None,
) -> None:
"""Subscribe to SNS topic.
:param topic_arn: ARN of the topic to subscribe
:param applicant: applicant name used for the Sid statement
:param delivery_policy: The delivery policy to assign to the subscription
:param filter_policy: The filter policy JSON assigned to the subscription.
:param filter_policy_scope: The filtering scope.
"""
sub_params = {
"Endpoint": self.arn,
Expand All @@ -93,7 +100,13 @@ def subscribe_to_sns_topic(
}

if delivery_policy:
sub_params.update({"DeliveryPolicy": delivery_policy})
sub_params["DeliveryPolicy"] = delivery_policy

if filter_policy:
sub_params["FilterPolicy"] = filter_policy

if filter_policy_scope:
sub_params["FilterPolicyScope"] = filter_policy_scope

self.add_allow_service_to_write_statement(
applicant=applicant,
Expand Down
61 changes: 61 additions & 0 deletions tests/tests_e3_aws/troposphere/sqs/sqs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,52 @@
},
}

EXPECTED_SQS_SUBSCRIPTION_WITH_FILTER_TEMPLATE = {
"Myqueue": {
"Properties": {"QueueName": "myqueue", "VisibilityTimeout": 30},
"Type": "AWS::SQS::Queue",
},
"MyqueuePolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Sid": "SomeApplicantWriteAccess",
"Action": "sqs:SendMessage",
"Condition": {"ArnLike": {"aws:SourceArn": "some_topic_arn"}},
"Effect": "Allow",
"Principal": {"Service": "sns.amazonaws.com"},
"Resource": {"Fn::GetAtt": ["Myqueue", "Arn"]},
}
],
"Version": "2012-10-17",
},
"Queues": [{"Ref": "Myqueue"}],
},
"Type": "AWS::SQS::QueuePolicy",
},
"MyqueueSub": {
"Properties": {
"Endpoint": {"Fn::GetAtt": ["Myqueue", "Arn"]},
"Protocol": "sqs",
"TopicArn": "some_topic_arn",
"RawMessageDelivery": True,
"FilterPolicy": {
"key_a": {
"key_b": {
"key_c": [
"value_1",
"value_2",
],
},
},
},
"FilterPolicyScope": "MessageBody",
},
"Type": "AWS::SNS::Subscription",
},
}


def test_queue_default(stack: Stack) -> None:
"""Test Queue default creation."""
Expand Down Expand Up @@ -102,3 +148,18 @@ def test_allow_service_to_write_not_unique_sid(stack: Stack) -> None:
stack.add(queue)

assert str(ex.value) == "Unique Sid is required for QueuePolicy statements"


def test_subscribe_to_sns_topic_with_policy_filter(stack: Stack) -> None:
"""Test sqs subscription to sns topic with a policy filter."""
queue = Queue(name="myqueue")
queue.subscribe_to_sns_topic(
topic_arn="some_topic_arn",
applicant="SomeApplicant",
filter_policy={"key_a": {"key_b": {"key_c": ["value_1", "value_2"]}}},
filter_policy_scope="MessageBody",
)

stack.add(queue)

assert stack.export()["Resources"] == EXPECTED_SQS_SUBSCRIPTION_WITH_FILTER_TEMPLATE

0 comments on commit 6065641

Please sign in to comment.