Skip to content

Commit

Permalink
Merge branch 'mr/forestier/add-policy-statement-id' into 'master'
Browse files Browse the repository at this point in the history
Add policy statements id

See merge request it/e3-aws!6
  • Loading branch information
jeromef853 committed Jul 25, 2024
2 parents 63499e4 + 5c9b475 commit 543d806
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 7 deletions.
24 changes: 21 additions & 3 deletions src/e3/aws/troposphere/iam/policy_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class PolicyStatement:
def __init__(
self,
action: str | list[str],
sid: str | None = None,
effect: str = "Deny",
resource: ResourceType | None = None,
principal: PrincipalType | None = None,
Expand All @@ -25,11 +26,13 @@ def __init__(
"""Initialize a policy statement.
:param action: actions on which the policy has effect
:param effect: effect of the policy (Allow, Deny ..)
:param sid: unique statement identifier
:param effect: effect of the policy (Allow, Deny ...)
:param resource: resource on which the policy has effect
:param principal: principal affected by the policy
:param condition: conditions for when the policy is in effect
"""
self.sid = sid
self.action = action
self.effect = effect
self.resource = resource
Expand All @@ -38,10 +41,11 @@ def __init__(

@property
def as_dict(self) -> dict[str, Any]:
"""Return a dictionnary defining a troposphere policy statement."""
"""Return a dictionary defining a troposphere policy statement."""
return {
key: val
for key, val in {
"Sid": self.sid,
"Effect": self.effect,
"Principal": self.principal,
"Action": self.action,
Expand All @@ -56,18 +60,21 @@ class Allow(PolicyStatement):
def __init__(
self,
action: str | list[str],
sid: str | None = None,
resource: ResourceType | None = None,
principal: PrincipalType | None = None,
condition: ConditionType | None = None,
) -> None:
"""Initialize an Allow policy statement.
:param action: actions on which the policy has effect
:param sid: unique statement identifier
:param resource: resource on which the policy has effect
:param principal: principal affected by the policy
:param condition: conditions for when the policy is in effect
"""
super().__init__(
sid=sid,
action=action,
effect="Allow",
resource=resource,
Expand All @@ -80,14 +87,19 @@ class AssumeRole(PolicyStatement):
"""Define a sts:AssumeRole role policy statement."""

def __init__(
self, principal: PrincipalType, condition: ConditionType | None = None
self,
principal: PrincipalType,
sid: str | None = None,
condition: ConditionType | None = None,
):
"""Initialize an AssumeRole statement.
:param principal: principal which are allowed to assume the role
:param sid: unique statement identifier
:param condition: condition to apply
"""
super().__init__(
sid=sid,
action="sts:AssumeRole",
effect="Allow",
resource=None,
Expand All @@ -101,6 +113,7 @@ class Trust(PolicyStatement):

def __init__(
self,
sid: str | None = None,
services: list[str] | None = None,
accounts: list[str] | None = None,
users: list[tuple[str, str]] | None = None,
Expand All @@ -110,6 +123,7 @@ def __init__(
) -> None:
"""Initialize a trust policy statement.
:param sid: unique statement identifier
:param services: list of services to trust (without amazonaws.com suffix)
:param accounts: list of accounts to trust (accounts alias not allowed)
:param users: list of users as tuple (account number, user name)
Expand Down Expand Up @@ -146,6 +160,7 @@ def __init__(

self.condition = condition
self.actions = actions
self.sid = sid

@property
def as_dict(self) -> dict[str, Any]:
Expand All @@ -159,4 +174,7 @@ def as_dict(self) -> dict[str, Any]:
if self.condition is not None:
result["Condition"] = self.condition

if self.sid is not None:
result["Sid"] = self.sid

return result
2 changes: 2 additions & 0 deletions src/e3/aws/troposphere/s3/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def notification_setup(
for _, topic, _ in self.topic_configurations:
if topic:
topic_policy_name = topic.add_allow_service_to_publish_statement(
applicant=f"{name_to_id(self.name)}",
service="s3",
condition={"ArnLike": {"aws:SourceArn": self.arn}},
)
Expand All @@ -185,6 +186,7 @@ def notification_setup(
for _, queue, _ in self.queue_configurations:
if queue:
queue_policy_name = queue.add_allow_service_to_write_statement(
applicant=f"{name_to_id(self.name)}",
service="s3",
condition={"ArnLike": {"aws:SourceArn": self.arn}},
)
Expand Down
9 changes: 8 additions & 1 deletion src/e3/aws/troposphere/sns/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,18 @@ def add_lambda_subscription(
)

def add_allow_service_to_publish_statement(
self, service: str, condition: ConditionType | None = None
self, service: str, applicant: str, condition: ConditionType | None = None
) -> str:
"""Add a statement in TopicPolicy allowing a service to publish to the topic.
:param service: service allowed to publish
:param applicant: applicant name used for the Sid statement
:param condition: condition to be able to publish
:return: the TopicPolicy name for depends_on settings
"""
self.topic_policy_statements.append(
Allow(
sid=f"{applicant}PubAccess",
action="sns:Publish",
resource=self.ref,
principal={"Service": f"{service}.amazonaws.com"},
Expand Down Expand Up @@ -101,6 +103,11 @@ def resources(self, stack: Stack) -> list[AWSObject]:

# Add Topic policy to optional resources if any statement
if self.topic_policy_statements:
# Check for unique Sid
check_sid = [statement.sid for statement in self.topic_policy_statements]
if len(check_sid) != len(set(check_sid)):
raise Exception("Unique Sid is required for TopicPolicy statements")

self.optional_resources.extend(
[
sns.TopicPolicy(
Expand Down
13 changes: 11 additions & 2 deletions src/e3/aws/troposphere/sqs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,18 @@ def _get_queue_policy_name(self) -> str:
return name_to_id(f"{self.name}Policy")

def add_allow_service_to_write_statement(
self, service: str, condition: Optional[ConditionType] = None
self, service: str, applicant: str, condition: Optional[ConditionType] = None
) -> str:
"""Add a statement in QueuePolicy allowing a service to send msg to the queue.
:param service: service allowed to send message
:param applicant: applicant name used for the Sid statement
:param condition: condition to be able to send message
:return: the QueuePolicy name for depends_on settings
"""
self.queue_policy_statements.append(
Allow(
sid=f"{applicant}WriteAccess",
action="sqs:SendMessage",
resource=self.arn,
principal={"Service": f"{service}.amazonaws.com"},
Expand All @@ -75,11 +77,12 @@ def add_allow_service_to_write_statement(
return self._get_queue_policy_name()

def subscribe_to_sns_topic(
self, topic_arn: str, delivery_policy: dict | None = None
self, topic_arn: str, applicant: str, delivery_policy: dict | None = None
) -> None:
"""Subscribe to SNS topic.
:param topic_arn: ARN of the topic to subscribe
:param applicant: applicant name used for the Sid statement
:param delivery_policy: The delivery policy to assign to the subscription
"""
sub_params = {
Expand All @@ -93,6 +96,7 @@ def subscribe_to_sns_topic(
sub_params.update({"DeliveryPolicy": delivery_policy})

self.add_allow_service_to_write_statement(
applicant=applicant,
service="sns",
condition={"ArnLike": {"aws:SourceArn": topic_arn}},
)
Expand All @@ -115,6 +119,11 @@ def resources(self, stack: Stack) -> list[AWSObject]:
"""Compute AWS resources for the construct."""
# Add Queue policy to optional resources if any statement
if self.queue_policy_statements:
# Check for unique Sid
check_sid = [statement.sid for statement in self.queue_policy_statements]
if len(check_sid) != len(set(check_sid)):
raise Exception("Unique Sid is required for QueuePolicy statements")

self.optional_resources.extend(
[
sqs.QueuePolicy(
Expand Down
2 changes: 2 additions & 0 deletions tests/tests_e3_aws/troposphere/s3/bucket.json
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
"Version": "2012-10-17",
"Statement": [
{
"Sid": "TestBucketPubAccess",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
Expand All @@ -164,6 +165,7 @@
"PolicyDocument": {
"Statement": [
{
"Sid": "TestBucketWriteAccess",
"Action": "sqs:SendMessage",
"Condition": {
"ArnLike": {
Expand Down
20 changes: 20 additions & 0 deletions tests/tests_e3_aws/troposphere/sns/sns_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import pytest

from e3.aws.troposphere import Stack
from e3.aws.troposphere.sns import Topic

Expand Down Expand Up @@ -41,3 +43,21 @@ def test_topic(stack: Stack) -> None:
)
)
assert stack.export()["Resources"] == EXPECTED_TOPIC_TEMPLATE


def test_allow_service_to_publish_not_unique_sid(stack: Stack) -> None:
"""Test topic creation with same Sid statements in Access Policy."""
topic = Topic("mytopic")
topic.add_allow_service_to_publish_statement(
applicant="SomeApplicant",
service="s3",
)
topic.add_allow_service_to_publish_statement(
applicant="SomeApplicant",
service="lambda",
)

with pytest.raises(Exception) as ex:
stack.add(topic)

assert str(ex.value) == "Unique Sid is required for TopicPolicy statements"
17 changes: 16 additions & 1 deletion tests/tests_e3_aws/troposphere/sqs/sqs_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import pytest

from e3.aws.troposphere import Stack
from e3.aws.troposphere.sqs import Queue

Expand Down Expand Up @@ -38,6 +40,7 @@
"PolicyDocument": {
"Statement": [
{
"Sid": "SomeApplicantWriteAccess",
"Action": "sqs:SendMessage",
"Condition": {"ArnLike": {"aws:SourceArn": "some_topic_arn"}},
"Effect": "Allow",
Expand Down Expand Up @@ -82,8 +85,20 @@ def test_queue(stack: Stack) -> None:
def test_subscribe_to_sns_topic(stack: Stack) -> None:
"""Test sqs subscription to sns topic."""
queue = Queue(name="myqueue")
queue.subscribe_to_sns_topic("some_topic_arn")
queue.subscribe_to_sns_topic(topic_arn="some_topic_arn", applicant="SomeApplicant")

stack.add(queue)

assert stack.export()["Resources"] == EXPECTED_SQS_SUBSCRIPTION_TEMPLATE


def test_allow_service_to_write_not_unique_sid(stack: Stack) -> None:
"""Test Queue creation with same sid statements in Access Policy."""
queue = Queue(name="myqueue")
queue.add_allow_service_to_write_statement(service="sns", applicant="SomeApplicant")
queue.add_allow_service_to_write_statement(service="s3", applicant="SomeApplicant")

with pytest.raises(Exception) as ex:
stack.add(queue)

assert str(ex.value) == "Unique Sid is required for QueuePolicy statements"

0 comments on commit 543d806

Please sign in to comment.