-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy path__init__.py
204 lines (163 loc) · 6.96 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import re
from functools import lru_cache
from google.cloud.securitycenter import Asset, Finding, SecurityCenterClient
import google.api_core.exceptions
from ...log import log
from . import api
class APIDataError(Exception):
pass
class AssetNotFound(APIDataError):
pass
class Cache():
def __init__(self):
self._projects = {}
self._sources = {}
self._assets = {}
self._findings = {}
def asset(self, event):
asset_id = event.instance_id
if asset_id not in self._assets:
project_number = event.cloud_provider_account_id
assets = self.scc.get_asset(project_number, asset_id)
if len(assets) == 1:
self._assets[asset_id] = assets[0].asset
elif len(assets) == 0:
raise AssetNotFound("Asset {} not found in GCP Project {}".format(asset_id, project_number))
else:
raise APIDataError(
"Multiple assets found with ID={} within GCP Project {}".format(asset_id, project_number))
return self._assets[asset_id]
def source(self, org_id):
if org_id not in self._sources:
self._sources[org_id] = self.scc.get_or_create_fig_source(org_id)
return self._sources[org_id]
def organization_parent_of(self, project_id):
project = self.project(project_id)
return api.project_get_parent_org(project)
def project_number_accesible(self, project_number: int) -> bool:
try:
return self.project(project_number) is not None
except google.api_core.exceptions.PermissionDenied:
return False
def project(self, project_number: int):
if project_number not in self._projects:
self._projects[project_number] = api.project(project_number)
return self._projects[project_number]
def submit_finding(self, finding_id, finding: Finding, org_id: str):
if org_id not in self._findings:
self._findings[org_id] = {}
if finding_id in self._findings[org_id]:
log.debug("Finding %s already exists in GCP SCC", finding_id)
return None
finding = self.scc.get_or_create_finding(finding_id, finding, self.source(org_id))
self._findings[org_id][finding_id] = finding
return finding
@property
@lru_cache
def scc(self):
return api.SecurityCommandCenter()
class Submitter():
def __init__(self, cache, event):
self.cache = cache
self.event = event
def submit(self):
log.info("Processing detection: %s", self.event.detect_description)
if not self.cache.project_number_accesible(self.gcp_project_number):
log.warning(
"Cannot access GCP project (number=%s) to report malicious behaviour to GCP Security Command Center. Please grant 'roles/securitycenter.admin' role to this service account in every GCP organization that needs to have CrowdStrike detections forwarded to SCC.",
self.gcp_project_number)
return
try:
finding = self.finding()
except AssetNotFound:
log.warning("Corresponding asset not found in GCP Project")
return
self.submit_finding(finding)
def finding(self):
return Finding(
name=self.finding_path,
parent=self.source_path,
resource_name=self.asset.security_center_properties.resource_name,
state=Finding.State.ACTIVE,
external_uri=self.event.falcon_link,
event_time=self.event.time,
category=self.event_category,
severity=self.severity,
source_properties={
'FalconEventId': self.event.event_id,
'ComputerName': self.event.original_event.computer_name,
'Description': self.event.detect_description,
'Severity': self.severity,
'Title': 'Falcon Alert. Instance {}'.format(self.event.instance_id),
'Category': self.event_category,
'ProcessInformation': {
'ProcessName': self.event.original_event['event']['FileName'],
'ProcessPath': self.event.original_event['event']['FilePath'],
'CommandLine': self.event.original_event['event']['CommandLine']
},
}
)
# ART Uncomment these fields to force behavior parity with the prior art. Don't.
# ART finding.severity=self.original_event['event']['Severity']
# ART finding.source_properties.severity = self.original_event['event']['Severity']
# ART del(finding.source_properties.CommandLine)
# ART del(finding.source_properties.FalconEventId)
@property
def event_category(self):
# ART Uncomment the following lines to force behavior parity with the prior art. Don't.
# ART tactic = self.event.original_event['event']['Tactic']
# ART technique = self.event.original_event['event']['Technique']
# ART if tactic and technique:
# return 'Namespace: TTPs, Category: {}, Classifier: {}'.format(tactic, technique)
# ART return self.event.detect_description
return self.event.detect_name
@property
def severity(self):
sev = self.event.severity.upper()
return 'LOW' if sev == 'INFORMATIONAL' else sev
def submit_finding(self, finding):
return self.cache.submit_finding(self.finding_id, finding, self.org_id)
@property
def asset_path(self):
return self.asset.name
@property
def asset(self) -> Asset:
return self.cache.asset(self.event)
@property
def finding_path(self):
return SecurityCenterClient.finding_path(self.org_id, self.source_id, self.finding_id)
@property
@lru_cache
def finding_id(self):
event_id = re.sub('[^0-9a-zA-Z]+', '', self.event.event_id)
creation_time = str(hex(int(self.event.original_event['metadata']['eventCreationTime'])))[-6:]
return (event_id + creation_time)[-32:]
@property
def source_path(self):
return SecurityCenterClient.source_path(self.org_id, self.source_id)
@property
def source_id(self):
parsed = SecurityCenterClient.parse_source_path(self.source.name)
return parsed['source']
@property
@lru_cache
def source(self):
return self.cache.source(self.org_id)
@property
@lru_cache
def org_id(self):
return self.cache.organization_parent_of(self.gcp_project_number)
@property
@lru_cache
def gcp_project_number(self):
return self.event.cloud_provider_account_id
class Runtime():
RELEVANT_EVENT_TYPES = ['DetectionSummaryEvent']
def __init__(self):
log.info("GCP Backend is enabled.")
self.cache = Cache()
def is_relevant(self, falcon_event):
return falcon_event.cloud_provider == 'GCP'
def process(self, falcon_event):
Submitter(self.cache, falcon_event).submit()
__all__ = ['Runtime']