-
Notifications
You must be signed in to change notification settings - Fork 0
/
ssm-s3-public-remediation
161 lines (146 loc) · 6.11 KB
/
ssm-s3-public-remediation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#
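# Description: Detect S3 buckets that are publicly exposed (bucket policy or ACL)
#   without the protectyourcloud:classification-securite-donnees=PUBLIC tag,
#   restrict them through an SSM automation and record a finding in Security Hub.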
#
# Trigger Type: Config Trigger
# Scope of Changes: S3:EncryptionConfiguration
# Accepted Parameters: None
# Output : finding in Security Hub
from datetime import datetime
import boto3
import sys
import json
from botocore.exceptions import ClientError
def create_incident(s3name, account_id, region='ca-central-1'):
    """Import a Security Hub finding for a bucket that was automatically restricted.

    Args:
        s3name: Name of the S3 bucket that was remediated.
        account_id: AWS account id that owns the finding.
        region: Region of the Security Hub client and of the ARNs/ids built
            for the finding. Defaults to the previously hard-coded
            ``ca-central-1``.

    Returns:
        None. A message is printed when Security Hub reports that the
        finding was not imported.
    """
    # Local import: the file only imports the ``datetime`` class at top level.
    from datetime import timezone

    # The timestamp is labelled "+00:00", so it must really be UTC; use an
    # explicit time format instead of the locale-dependent "%X".
    nowstr = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")

    finding = {
        "AwsAccountId": account_id,
        "Confidence": 100,
        "CreatedAt": nowstr,
        "Criticality": 10,
        "CompanyName": "ProtectyourCloud",
        "ProductName": "Security Fondation",
        "Description": "A bucket has been automatically restricted",
        "FirstObservedAt": nowstr,
        "GeneratorId": "protectyourCloud-compliance-s3-002",
        "Id": region + "/" + account_id + "/s3public/" + s3name,
        "LastObservedAt": nowstr,
        "Note": {
            "Text": "Finding generated by custom Lambda, due to an S3 which was public without the appropriate tag. The risk has been automatically remediated, no further action needed.",
            "UpdatedAt": nowstr,
            "UpdatedBy": "ProtectyourCloud automation",
        },
        "ProductArn": "arn:aws:securityhub:" + region + ":" + account_id + ":product/" + account_id + "/default",
        "Remediation": {
            "Recommendation": {
                "Text": "Tag the S3 bucket with the appriate tag protectyourcloud:classification-securite-donnees if public exposure needed",
                "Url": "https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket",
            },
        },
        "Resources": [
            {
                "Details": {
                    "Other": {
                        "string": s3name,
                    },
                },
                "Id": "arn:aws:s3:::" + s3name,
                "Partition": "aws",
                "Region": region,
                "Type": "AwsS3Bucket",
            },
        ],
        "SchemaVersion": "2018-10-08",
        "Severity": {
            "Label": "LOW",
        },
        "Title": "S3 bucket exposed without tag",
        "Types": [
            "Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark",
        ],
        "UpdatedAt": nowstr,
        "VerificationState": "TRUE_POSITIVE",
    }

    security = boto3.client('securityhub', region_name=region)
    response = security.batch_import_findings(Findings=[finding])

    # batch_import_findings does not raise for rejected findings; it reports
    # them in the response, so surface that instead of dropping it silently.
    if response.get('FailedCount'):
        print("Security Hub rejected the finding for bucket %s: %s"
              % (s3name, response.get('FailedFindings')))
def restrict_status(s3name):
    """Start the AWS-DisableS3BucketPublicReadWrite SSM automation for a bucket.

    Args:
        s3name: Name of the S3 bucket passed to the automation document.

    The function only starts the automation execution and then prints a
    confirmation message; it does not poll for the execution result.
    """
    automation_request = {
        'DocumentName': 'AWS-DisableS3BucketPublicReadWrite',
        'DocumentVersion': '$DEFAULT',
        'Parameters': {'S3BucketName': [s3name]},
    }
    ssm = boto3.client("ssm")
    ssm.start_automation_execution(**automation_request)
    print("Restriction done on bucket " + s3name)
def is_public_access_blocked(s3name):
    """Tell whether the bucket's public access block forbids public ACLs and policies.

    Args:
        s3name: Name of the S3 bucket to inspect.

    Returns:
        True when both ``BlockPublicAcls`` and ``BlockPublicPolicy`` are
        enabled on the bucket, False otherwise (including when the bucket has
        no public access block configuration at all).

    Raises:
        ClientError: For any S3 error other than a missing public access
            block configuration.
    """
    s3 = boto3.client("s3")
    try:
        publicblocks = s3.get_public_access_block(Bucket=s3name)
    except ClientError as err:
        # A bucket without any public access block is simply "not blocked".
        if err.response.get('Error', {}).get('Code') == 'NoSuchPublicAccessBlockConfiguration':
            return False
        raise

    config = publicblocks['PublicAccessBlockConfiguration']
    # NOTE(review): IgnorePublicAcls / RestrictPublicBuckets are not checked,
    # same as before - confirm whether they should also be required.
    return bool(config['BlockPublicAcls'] and config['BlockPublicPolicy'])
def is_bucket_policy_public(s3name):
    """Tell whether S3 reports the bucket policy as public.

    Args:
        s3name: Name of the S3 bucket to inspect.

    Returns:
        True when ``get_bucket_policy_status`` reports ``IsPublic``; False
        when it does not, or when the status could not be retrieved (no
        policy attached, or another S3 client error, which is printed).
    """
    s3 = boto3.client("s3")
    try:
        bucket_policy = s3.get_bucket_policy_status(Bucket=s3name)
    except ClientError as err:
        code = err.response.get('Error', {}).get('Code', 'Unknown')
        if code == 'NoSuchBucketPolicy':
            print("Bucket %s doesn't have Policy attached" % (s3name))
        else:
            # Keep the best-effort "not public" answer, but do not disguise
            # e.g. an AccessDenied as a missing policy.
            print("Problem retrieving policy status for Bucket %s (%s)" % (s3name, code))
        return False
    return bool(bucket_policy['PolicyStatus']['IsPublic'])
def is_bucket_acl_public(s3name):
    """Tell whether the bucket ACL grants access to a public group.

    A grant counts as public when its permission contains READ, WRITE or
    FULL_CONTROL (substring match, so READ_ACP / WRITE_ACP also match) and
    its grantee URI is the AllUsers or AuthenticatedUsers global group.

    Args:
        s3name: Name of the S3 bucket to inspect.

    Returns:
        True when at least one such grant exists; False otherwise, or when
        the ACL could not be retrieved (the error code is printed).
    """
    permissions_to_check = ('READ', 'WRITE', 'FULL_CONTROL')
    public_acl_indicator = (
        'http://acs.amazonaws.com/groups/global/AllUsers',
        'http://acs.amazonaws.com/groups/global/AuthenticatedUsers',
    )
    s3 = boto3.client("s3")
    try:
        bucket_acl = s3.get_bucket_acl(Bucket=s3name)
    except ClientError as err:
        code = err.response.get('Error', {}).get('Code', 'Unknown')
        print("Problem retrieving ACL for Bucket " + s3name + " (" + code + ")")
        return False

    for grant in bucket_acl.get('Grants', []):
        permission = grant.get('Permission', '')
        if not any(wanted in permission for wanted in permissions_to_check):
            continue
        # Only group grantees carry a URI; canonical users have none.
        if grant.get('Grantee', {}).get('URI') in public_acl_indicator:
            print("ACL public is in place on the bucket " + s3name)
            return True

    print("ACL public is NOT in place on the bucket " + s3name)
    return False
def is_tagged_public(s3name):
    """Tell whether the bucket carries the tag that declares it public.

    Args:
        s3name: Name of the S3 bucket to inspect.

    Returns:
        True when the bucket has the tag
        ``protectyourcloud:classification-securite-donnees`` with a value
        equal to ``PUBLIC`` (case-insensitive); False otherwise, including
        when the tagging call raises a ``ClientError``.
    """
    wanted_key = "protectyourcloud:classification-securite-donnees"
    s3_client = boto3.client("s3")
    try:
        tagging = s3_client.get_bucket_tagging(Bucket=s3name)
    except ClientError:
        print('Bucket: %s, doesn t have tags associated' % (s3name))
        return False
    return any(
        entry['Key'] == wanted_key and entry['Value'].upper() == "PUBLIC"
        for entry in tagging['TagSet']
    )
def lambda_handler(event, context):
    """Lambda entry point: restrict a public, untagged bucket and record a finding.

    The bucket name is taken from the first ARN in ``event['resources']``
    (the part after ``:::``). Nothing is changed when the bucket's public
    access is blocked, when neither its policy nor its ACL is public, or when
    it is tagged as intentionally public.

    Args:
        event: Trigger event; must contain a non-empty ``resources`` list
            whose first item is the bucket ARN.
        context: Lambda context object (unused).

    Returns:
        None.

    Raises:
        ValueError: When no bucket name can be extracted from the event.
    """
    s3arn = str(event['resources'][0])
    s3bucketname = s3arn.partition(":::")[2]
    if not s3bucketname:
        raise ValueError("Cannot extract a bucket name from resource %r" % (s3arn,))

    if is_public_access_blocked(s3bucketname):
        print("Bucket %s is protected and so not public" % (s3bucketname))
        # A protected bucket is the normal case, not a failure: return instead
        # of sys.exit(1), which would make the invocation fail.
        return

    if not (is_bucket_policy_public(s3bucketname) or is_bucket_acl_public(s3bucketname)):
        return

    if is_tagged_public(s3bucketname):
        print('Bucket: %s, is public and tagged as is' % (s3bucketname))
        return

    print('Bucket: %s, is public but without tags associated' % (s3bucketname))
    # Resolve the account id only when a finding is needed, and before the
    # remediation so a bucket is never restricted without being reported.
    account_id = boto3.client("sts").get_caller_identity()["Account"]
    restrict_status(s3bucketname)
    create_incident(s3bucketname, account_id)