-
Notifications
You must be signed in to change notification settings - Fork 6
/
twitter_search.py
289 lines (218 loc) · 11.4 KB
/
twitter_search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
from twitter import *
import json
from threatresponse import ThreatResponse
from datetime import datetime
import os
import webexteamssdk
def open_config():
'''
this function opens config.json
'''
if os.path.isfile("config.json"):
global config_file
with open("config.json", 'r') as config_file:
config_file = json.loads(config_file.read())
print("\nThe config.json file was loaded.\n")
else:
print("No config.json file, please make sure config.json file is in same directory.\n")
def write_config():
'''
This function writes to config.json
'''
with open("config.json", 'w') as output_file:
json.dump(config_file, output_file, indent=4)
def return_observables(tweet_text):
'''
this function will parse raw text and return the observables and types
'''
ctr_client = ThreatResponse(
client_id=config_file["ctr"]["client_id"],
client_password=config_file["ctr"]["client_password"],
region='us', # can be change to eu or apjc
)
data = {"content":tweet_text}
response = ctr_client.inspect.inspect(data)
#check if request was succesful
if response:
return(response)
elif response == []:
print(f"No observables found for tweet:\n\n{tweet_text}\n")
return False
else:
print(f"Error occured in inspecting tweet:\n\n{tweet_text}\n")
return False
def return_non_clean_observables(returned_observables_json):
'''
this function returns only non clean observables (to remove noise)
'''
ctr_client = ThreatResponse(
client_id=config_file["ctr"]["client_id"],
client_password=config_file["ctr"]["client_password"],
region='us', # can be change to eu or apjc
)
# create empty list to store clean observables
clean_observables = []
# retrieve dispositions for observables
response = ctr_client.enrich.deliberate.observables(returned_observables_json)
disposition_observables = response
# parse through json and search for observables with clean disposition (1)
for module in disposition_observables['data']:
module_name = module['module']
if 'verdicts' in module['data'] and module['data']['verdicts']['count'] > 0:
docs = module['data']['verdicts']['docs']
for doc in docs:
observable = doc['observable']
# if the disposition is clean / 1 then add to separate list to remove from other list
if doc['disposition'] == 1:
clean_observables.append(observable)
#print(f"Clean observable, omitting: {observable}\n")
non_clean_observables = [i for i in returned_observables_json if not i in clean_observables or clean_observables.remove(i)]
non_clean_observables_json = non_clean_observables
return non_clean_observables_json
def check_for_sighting(returned_observables_json):
'''
this function checks if there is a sighting for a specific observable
'''
ctr_client = ThreatResponse(
client_id=config_file["ctr"]["client_id"],
client_password=config_file["ctr"]["client_password"],
region='us', # can be change to eu or apjc
)
data = returned_observables_json
response = ctr_client.enrich.observe.observables(data)
#check if request was succesful
if response:
returned_data = response
total_amp_sighting_count = 0
total_umbrella_sighting_count = 0
total_email_sighting_count = 0
# run through all modules to check for sightings (currently checking the amp, umbrella and SMA modules)
for module in returned_data['data']:
if module['module'] == "AMP for Endpoints":
# json key not always there, error checking...
if 'sightings' in module['data']:
# store amount of sightings
total_amp_sighting_count = module['data']['sightings']['count']
if module['module'] == "Umbrella":
# json key not always there, error checking...
if 'sightings' in module['data']:
# store amount of sightings
total_umbrella_sighting_count = module['data']['sightings']['count']
if module['module'] == "SMA Email":
# json key not always there, error checking...
if 'sightings' in module['data']:
# store amount of sightings
total_email_sighting_count = module['data']['sightings']['count']
# create dict to store information regarding the sightings
total_sighting_count = total_amp_sighting_count + total_umbrella_sighting_count + total_email_sighting_count
return_sightings = {
'total_sighting_count': total_sighting_count,
'total_amp_sighting_count': total_amp_sighting_count,
'total_umbrella_sighting_count': total_umbrella_sighting_count,
'total_email_sighting_count': total_email_sighting_count
}
return(return_sightings)
else:
print(f"Sighting check request failed...\n")
return(response)
def new_casebook(returned_observables_json,returned_sightings,user_name,tweet_text):
'''
this function post list of observables to new case in casebook
'''
ctr_client = ThreatResponse(
client_id=config_file["ctr"]["client_id"],
client_password=config_file["ctr"]["client_password"],
region='us', # can be change to eu or apjc
)
# create title and description for SOC researcher to have more context, if there are sightings, add high priority
if returned_sightings['total_sighting_count'] == 0:
casebook_title = " #opendir Tweet: " + user_name
if returned_sightings['total_sighting_count'] != 0:
casebook_title = "*HIGH PRIORITY* #opendir Tweet: " + user_name
casebook_description = f"Twitter generated casebook from #opendir by: {user_name}, Tweet: {tweet_text}"
casebook_datetime = datetime.now().isoformat() + "Z"
# create right json format to create casebook
casebook_json = {
"title": casebook_title,
"description": casebook_description,
"observables": returned_observables_json,
"type": "casebook",
"timestamp": casebook_datetime
}
# create CTR search url
search_base_url = "https://visibility.amp.cisco.com/investigate?q="
for observable in returned_observables_json:
search_string_to_append = observable["type"] + "%3A" + observable["value"] + "%0A"
search_base_url = search_base_url + search_string_to_append
# post request to create casebook
response = ctr_client.private_intel.casebook.post(casebook_json)
if response:
print(f"[201] Success, case added to Casebook added from #opendir Tweet by: {user_name}\n")
# if Webex Teams tokens set, then send message to Webex room
if config_file['webex']['access_token'] == '' or config_file['webex']['room_id'] == '':
# user feed back
print("Webex Teams not set.\n\n")
else:
# instantiate the Webex handler with the access token
teams = webexteamssdk.WebexTeamsAPI(config_file['webex']['access_token'])
# post a message to the specified Webex room
try:
if returned_sightings['total_sighting_count'] == 0:
webex_text = f"🚨🚨🚨 - **New case added to SecureX Casebook added from 🐦 *#OPENDIR*!** - 🚨🚨🚨\n\nTweet by {user_name}:\n\n---\n>{tweet_text}\n\n---\n\n**Investigate directly with SecureX threat response:** {search_base_url}"
message = teams.messages.create(config_file['webex']['room_id'], markdown=webex_text)
if returned_sightings['total_sighting_count'] != 0:
webex_text = f"🚨🚨🚨 - **New case added to SecureX Casebook added from 🐦 *#OPENDIR*!** - 🚨🚨🚨\n\nTweet by {user_name}:\n\n---\n>{tweet_text}\n\n---\n\n**HIGH PRIORITY**, Target Sightings have been identified! AMP targets: {str(returned_sightings['total_amp_sighting_count'])}, Umbrella targets: {str(returned_sightings['total_umbrella_sighting_count'])}, Email targets: {str(returned_sightings['total_email_sighting_count'])}.\n\n**Investigate directly with SecureX threat response:** {search_base_url}"
message = teams.messages.create(config_file['webex']['room_id'], markdown=webex_text)
# error handling, if for example the Webex API key expired
except Exception:
print("Webex authentication failed... Please make sure Webex Teams API key has not expired. Please review developer.webex.com for more info.\n")
else:
print(f"Something went wrong while posting the casebook to CTR...\n")
return response
def twitter_search():
'''
This function will retrieve the most recent tweets with the #opendir hashtag.
It will not return tweets that have been previously returned (using the since_id)
'''
twitter_client = Twitter(
auth=OAuth(config_file["twitter"]["token"], config_file["twitter"]["token_secret"], config_file["twitter"]["consumer_key"], config_file["twitter"]["consumer_secret"]))
# Search for the latest tweets about #opendir
results = twitter_client.search.tweets(q="#opendir", count=100, result_type="recent", since_id=config_file["twitter"]["since_id"])
if results['statuses']:
config_file['twitter']['since_id'] = results['statuses'][0]['id']
write_config()
#loop through tweets
for tweet in results['statuses']:
#user feedback
print(f"New tweet detected! ID: {tweet['id']}, Date: {tweet['created_at']}.\n\n")
# username + text from tweet
user_name = tweet['user']['name']
tweet_text = tweet['text']
# retrieve observables from text
returned_observables_json = return_observables(tweet_text)
# double check if observables were found in tweet
if returned_observables_json == False:
print("No observables found, skipping tweet...\n")
else:
# return non clean (malicious, unkown etc.) observables only
non_clean_observables_json = return_non_clean_observables(returned_observables_json)
# if observables were returned (list not empty), create a casebook
if non_clean_observables_json != "[]":
# retrieve target sightings for observables
returned_sightings = check_for_sighting(non_clean_observables_json)
# create new case in casebook
new_casebook(non_clean_observables_json,returned_sightings,user_name,tweet_text)
else:
print(f"No new case created in casebook (no observables found) from: {entry_title}\n")
else:
# no changes since last since_id
print(f"No changes to #opendir detected, please try again later. Recommendation is to schedule the script daily (or 2-3 times a day).\n")
### main script
if __name__ == "__main__":
try:
# open config json file and grab client_id and secret
open_config()
# main function: retrieve twitter results, parse, investigate and create casebook
twitter_search()
except KeyboardInterrupt:
print("\nExiting...\n")