forked from panther-labs/panther-analysis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaws_resource_made_public.py
85 lines (64 loc) · 2.64 KB
/
aws_resource_made_public.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import json
from panther import aws_cloudtrail_success
from panther_base_helpers import deep_get
from policyuniverse.policy import Policy
def policy_is_internet_accessible(json_policy):
    """Report whether an IAM policy document allows access from the Internet.

    Args:
        json_policy: The policy document handed to policyuniverse's ``Policy``,
            or ``None`` when the event carried no policy.

    Returns:
        ``False`` when ``json_policy`` is ``None``; otherwise whatever
        ``Policy(json_policy).is_internet_accessible()`` returns.
    """
    # A missing policy cannot expose anything, so skip building a Policy for it.
    return False if json_policy is None else Policy(json_policy).is_internet_accessible()
# Normally this check helps avoid overly complex functions that are doing too many things,
# but in this case we explicitly want to handle 10 different cases in 10 different ways.
# Any solution that avoids too many return statements only increases the complexity of this rule.
# pylint: disable=too-many-return-statements
def rule(event):
    """Return True when a successful API call attached an Internet-accessible policy.

    The policy is pulled from ``requestParameters`` at a location that depends on
    ``eventName`` (S3, ECR, Elasticsearch, KMS, S3 Glacier, SNS, SQS and
    SecretsManager calls are recognised) and handed to
    ``policy_is_internet_accessible``.

    Args:
        event: The log event; must support ``.get()`` and is also passed to
            ``aws_cloudtrail_success``.

    Returns:
        ``False`` when the call did not succeed, has no request parameters, is
        not one of the recognised events, or carries no policy; otherwise the
        result of ``policy_is_internet_accessible`` for the extracted policy.

    Raises:
        json.JSONDecodeError: If a string policy is not valid JSON.
    """
    if not aws_cloudtrail_success(event):
        return False

    parameters = event.get("requestParameters", {})
    # Ignore events that are missing request params
    if not parameters:
        return False

    # Read the name once with .get() so an event lacking "eventName" falls through
    # to the "no policy" return below instead of raising KeyError.
    event_name = event.get("eventName")
    policy = ""

    # S3 - the bucket policy is evaluated as-is, without JSON decoding
    if event_name == "PutBucketPolicy":
        return policy_is_internet_accessible(parameters.get("bucketPolicy"))

    # The event names below are mutually exclusive, so at most one branch assigns a policy.
    # ECR
    if event_name == "SetRepositoryPolicy":
        policy = parameters.get("policyText", {})
    # Elasticsearch
    elif event_name in ["CreateElasticsearchDomain", "UpdateElasticsearchDomainConfig"]:
        policy = parameters.get("accessPolicies", {})
    # KMS
    elif event_name in ["CreateKey", "PutKeyPolicy"]:
        policy = parameters.get("policy", {})
    # S3 Glacier
    elif event_name == "SetVaultAccessPolicy":
        policy = deep_get(parameters, "policy", "policy", default={})
    # SNS & SQS
    elif event_name in ["SetQueueAttributes", "CreateTopic"]:
        policy = deep_get(parameters, "attributes", "Policy", default={})
    # SNS
    elif event_name == "SetTopicAttributes" and parameters.get("attributeName", "") == "Policy":
        policy = parameters.get("attributeValue", {})
    # SecretsManager
    elif event_name == "PutResourcePolicy":
        policy = parameters.get("resourcePolicy", {})

    if not policy:
        return False

    # Only decode JSON text. A policy that is already a decoded object would make
    # json.loads raise TypeError, so it is passed through unchanged instead.
    if isinstance(policy, (str, bytes, bytearray)):
        policy = json.loads(policy)

    return policy_is_internet_accessible(policy)
def title(event):
    """Build the alert title naming the affected resource and the acting user.

    The user is ``userIdentity.userName`` when that is truthy, otherwise
    ``userIdentity.sessionContext.sessionIssuer.userName`` with
    ``<MISSING_USER>`` as the default. The title names the ``arn`` of the first
    entry in ``Resources`` when that field is non-empty, and falls back to the
    event's ``eventSource`` otherwise.
    """
    # TODO(): Update this rule to use data models
    actor = deep_get(event, "userIdentity", "userName")
    if not actor:
        # No direct user name on the identity; try the session issuer instead.
        actor = deep_get(
            event,
            "userIdentity",
            "sessionContext",
            "sessionIssuer",
            "userName",
            default="<MISSING_USER>",
        )

    resources = event.get("Resources")
    if resources:
        arn = resources[0].get("arn", "MISSING")
        return f"Resource {arn} made public by {actor}"

    source = event.get("eventSource", "MISSING SOURCE")
    return f"{source} resource made public by {actor}"