Skip to content

Commit

Permalink
Update Malwarebazaar test and comply with flake8
Browse files Browse the repository at this point in the history
  • Loading branch information
weslambert committed Dec 19, 2023
1 parent 5e71503 commit 6145891
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 158 deletions.
312 changes: 156 additions & 156 deletions salt/sensoroni/files/analyzers/malwarebazaar/malwarebazaar.py
Original file line number Diff line number Diff line change
@@ -1,156 +1,156 @@
import requests
import helpers
import json
import sys

# supports querying for hash, gimphash, tlsh, and telfhash
# usage is as follows:
# python3 malwarebazaar.py '{"artifactType":"x", "value":"y"}'


def buildReq(observ_type, observ_value):
    """Build the POST payload for a MalwareBazaar lookup.

    The similarity-hash observables (gimphash, telfhash, tlsh) each use
    a dedicated ``get_<type>`` query; every other observable type is
    looked up with the generic ``get_info`` query.

    Returns a dict holding the query name under ``'query'`` and the
    observable value under a key named after the observable type.
    """
    similarity_types = ('gimphash', 'telfhash', 'tlsh')
    query_name = (
        'get_' + observ_type
        if observ_type in similarity_types
        else 'get_info'
    )
    return {'query': query_name, observ_type: observ_value}


def sendReq(meta, query):
    """POST the compiled query to the API and return the decoded JSON.

    ``meta`` must provide the endpoint under ``'baseUrl'``; ``query`` is
    sent as the form body of the request.

    A timeout is always supplied: without one, ``requests`` waits
    indefinitely on a stalled connection and the analyzer never returns.
    On expiry ``requests.exceptions.Timeout`` propagates to the caller.
    """
    url = meta['baseUrl']
    timeout_seconds = 30
    response = requests.post(url, query, timeout=timeout_seconds)
    return response.json()


def isInJson(data, target_string, maxdepth=1000, tail=0):
    """Report whether any string value in a JSON-like object contains
    ``target_string``.

    Dict values and list items are searched; dict keys are not. Each
    candidate string is lower-cased before the substring test, so
    ``target_string`` should be passed in lower case.

    ``tail`` is the nesting depth assigned to ``data`` itself and
    ``maxdepth`` the deepest level that is still inspected; containers
    nested deeper than ``maxdepth`` are ignored. If ``data`` is neither
    a dict nor a list the result is False.

    The walk uses an explicit stack instead of recursion so that deeply
    nested input cannot raise RecursionError before the depth limit
    takes effect.
    """
    if tail > maxdepth:
        return False

    pending = [(data, tail)]
    while pending:
        node, depth = pending.pop()
        if isinstance(node, dict):
            children = node.values()
        elif isinstance(node, list):
            children = node
        else:
            continue

        for child in children:
            if isinstance(child, str):
                if target_string in child.lower():
                    return True
            elif isinstance(child, (dict, list)) and depth < maxdepth:
                # only descend while the child is still within the limit
                pending.append((child, depth + 1))

    return False


def prepareResults(raw):
    """Turn a raw API response into a status and a short summary.

    Returns ``{'response': raw, 'summary': ..., 'status': ...}`` where:

    * an empty response gives ``caution`` / ``internal_failure``;
    * a response whose ``query_status`` is not ``'ok'`` (or that carries
      no sample data) gives ``info`` / ``no result``;
    * otherwise the first sample in ``data`` is scored from its vendor
      intel and mapped to ``threat`` / ``caution`` / ``info`` / ``ok``.

    The summary is the first non-empty value among the sample's
    signature, its first tag and the YOROI_YOMI detection; ``'unknown'``
    is used when none of them is present, so the function never fails
    on a sparse sample.
    """
    if raw == {}:
        return {'response': raw, 'summary': 'internal_failure',
                'status': 'caution'}

    samples = raw.get('data') if raw.get('query_status') == 'ok' else None
    if not samples:
        return {'response': raw, 'summary': 'no result', 'status': 'info'}

    parsed = samples[0]
    # vendor_intel may be absent or null for samples nobody has scanned
    vendor_data = parsed.get('vendor_intel') or {}

    # get summary; null or empty fields fall through to the next source
    summary = 'unknown'
    if parsed.get('signature'):
        summary = parsed['signature']
    elif parsed.get('tags'):
        summary = str(parsed['tags'][0])
    else:
        yomi = vendor_data.get('YOROI_YOMI')
        if isinstance(yomi, dict) and yomi.get('detection'):
            summary = yomi['detection']

    def verdictScore(verdict):
        return 100 if verdict == 'MALICIOUS' else 0

    # gauge vendors to determine an approximation of status,
    # normalized to a value out of 100
    vendor_info_list = [
        ('vxCube', 'maliciousness', int),
        ('Triage', 'score', lambda x: int(x) * 10),
        ('DocGuard', 'alertlevel', lambda x: int(x) * 10),
        # scale before rounding: the fractional score would otherwise
        # be truncated to 0 for every value below 1.0
        ('YOROI_YOMI', 'score', lambda x: round(float(x) * 100)),
        ('Inquest', 'verdict', verdictScore),
        ('ReversingLabs', 'status', verdictScore),
        ('Spamhaus_HBL', 'detection', verdictScore),
    ]
    score = 0
    for vendor, key, transform in vendor_info_list:
        intel = vendor_data.get(vendor)
        if not isinstance(intel, dict) or key not in intel:
            continue
        try:
            # only keep the highest indicator value seen so far
            score = max(score, transform(intel[key]))
        except (TypeError, ValueError):
            # a null or non-numeric vendor value carries no signal
            continue

    # compute status; any string mentioning "malicious" anywhere in the
    # response is also treated as a threat
    if score >= 75 or isInJson(raw, 'malicious', 1001):
        status = 'threat'
    elif score >= 50:
        status = 'caution'
    elif score >= 25:
        status = 'info'
    else:
        status = 'ok'

    return {'response': raw, 'summary': summary, 'status': status}


def analyze(input):
    """Run a full lookup for one observable and return the result dict.

    ``input`` is a JSON string with ``artifactType`` and ``value`` keys.
    The artifact type is checked against the analyzer metadata before
    any request is sent.

    TLSH, telfhash and gimphash lookups are done in two steps: the
    similarity query is used only to find a matching sample, whose
    sha256 is then queried again for the more detailed report. If the
    first step does not succeed its response is reported as-is, which
    avoids a pointless second request.
    """
    data = json.loads(input)
    meta = helpers.loadMetadata(__file__)
    helpers.checkSupportedType(meta, data["artifactType"])

    if data['artifactType'] in ('tlsh', 'gimphash', 'telfhash'):
        initialQuery = buildReq(data['artifactType'], data['value'])
        initialRaw = sendReq(meta, initialQuery)

        # .get() keeps an empty response from raising KeyError here, so
        # prepareResults can report it instead
        if initialRaw.get('query_status') != 'ok':
            return prepareResults(initialRaw)

        # re-query by sha256 to get a more detailed report
        data['artifactType'] = 'hash'
        data['value'] = initialRaw['data'][0]['sha256_hash']

    query = buildReq(data['artifactType'], data['value'])
    response = sendReq(meta, query)
    return prepareResults(response)


def main():
    """Command-line entry point.

    Expects exactly one argument, the JSON observable description, and
    prints the analysis result as JSON. Any other argument count prints
    an error message instead.
    """
    if len(sys.argv) != 2:
        print("ERROR: Input is not in proper JSON format")
        return

    print(json.dumps(analyze(sys.argv[1])))


if __name__ == '__main__':
    main()
import requests
import helpers
import json
import sys

# supports querying for hash, gimphash, tlsh, and telfhash
# usage is as follows:
# python3 malwarebazaar.py '{"artifactType":"x", "value":"y"}'


def buildReq(observ_type, observ_value):
    """Build the POST payload for a MalwareBazaar lookup.

    The similarity-hash observables (gimphash, telfhash, tlsh) each use
    a dedicated ``get_<type>`` query; every other observable type is
    looked up with the generic ``get_info`` query.

    Returns a dict holding the query name under ``'query'`` and the
    observable value under a key named after the observable type.
    """
    similarity_types = ('gimphash', 'telfhash', 'tlsh')
    query_name = (
        'get_' + observ_type
        if observ_type in similarity_types
        else 'get_info'
    )
    return {'query': query_name, observ_type: observ_value}


def sendReq(meta, query):
    """POST the compiled query to the API and return the decoded JSON.

    ``meta`` must provide the endpoint under ``'baseUrl'``; ``query`` is
    sent as the form body of the request.

    A timeout is always supplied: without one, ``requests`` waits
    indefinitely on a stalled connection and the analyzer never returns.
    On expiry ``requests.exceptions.Timeout`` propagates to the caller.
    """
    url = meta['baseUrl']
    timeout_seconds = 30
    response = requests.post(url, query, timeout=timeout_seconds)
    return response.json()


def isInJson(data, target_string, maxdepth=1000, tail=0):
    """Report whether any string value in a JSON-like object contains
    ``target_string``.

    Dict values and list items are searched; dict keys are not. Each
    candidate string is lower-cased before the substring test, so
    ``target_string`` should be passed in lower case.

    ``tail`` is the nesting depth assigned to ``data`` itself and
    ``maxdepth`` the deepest level that is still inspected; containers
    nested deeper than ``maxdepth`` are ignored. If ``data`` is neither
    a dict nor a list the result is False.

    The walk uses an explicit stack instead of recursion so that deeply
    nested input cannot raise RecursionError before the depth limit
    takes effect.
    """
    if tail > maxdepth:
        return False

    pending = [(data, tail)]
    while pending:
        node, depth = pending.pop()
        if isinstance(node, dict):
            children = node.values()
        elif isinstance(node, list):
            children = node
        else:
            continue

        for child in children:
            if isinstance(child, str):
                if target_string in child.lower():
                    return True
            elif isinstance(child, (dict, list)) and depth < maxdepth:
                # only descend while the child is still within the limit
                pending.append((child, depth + 1))

    return False


def prepareResults(raw):
    """Turn a raw API response into a status and a short summary.

    Returns ``{'response': raw, 'summary': ..., 'status': ...}`` where:

    * an empty response gives ``caution`` / ``internal_failure``;
    * a response whose ``query_status`` is not ``'ok'`` (or that carries
      no sample data) gives ``info`` / ``no result``;
    * otherwise the first sample in ``data`` is scored from its vendor
      intel and mapped to ``threat`` / ``caution`` / ``info`` / ``ok``.

    The summary is the first non-empty value among the sample's
    signature, its first tag and the YOROI_YOMI detection; ``'unknown'``
    is used when none of them is present, so the function never fails
    on a sparse sample.
    """
    if raw == {}:
        return {'response': raw, 'summary': 'internal_failure',
                'status': 'caution'}

    samples = raw.get('data') if raw.get('query_status') == 'ok' else None
    if not samples:
        return {'response': raw, 'summary': 'no result', 'status': 'info'}

    parsed = samples[0]
    # vendor_intel may be absent or null for samples nobody has scanned
    vendor_data = parsed.get('vendor_intel') or {}

    # get summary; null or empty fields fall through to the next source
    summary = 'unknown'
    if parsed.get('signature'):
        summary = parsed['signature']
    elif parsed.get('tags'):
        summary = str(parsed['tags'][0])
    else:
        yomi = vendor_data.get('YOROI_YOMI')
        if isinstance(yomi, dict) and yomi.get('detection'):
            summary = yomi['detection']

    def verdictScore(verdict):
        return 100 if verdict == 'MALICIOUS' else 0

    # gauge vendors to determine an approximation of status,
    # normalized to a value out of 100
    vendor_info_list = [
        ('vxCube', 'maliciousness', int),
        ('Triage', 'score', lambda x: int(x) * 10),
        ('DocGuard', 'alertlevel', lambda x: int(x) * 10),
        # scale before rounding: the fractional score would otherwise
        # be truncated to 0 for every value below 1.0
        ('YOROI_YOMI', 'score', lambda x: round(float(x) * 100)),
        ('Inquest', 'verdict', verdictScore),
        ('ReversingLabs', 'status', verdictScore),
        ('Spamhaus_HBL', 'detection', verdictScore),
    ]
    score = 0
    for vendor, key, transform in vendor_info_list:
        intel = vendor_data.get(vendor)
        if not isinstance(intel, dict) or key not in intel:
            continue
        try:
            # only keep the highest indicator value seen so far
            score = max(score, transform(intel[key]))
        except (TypeError, ValueError):
            # a null or non-numeric vendor value carries no signal
            continue

    # compute status; any string mentioning "malicious" anywhere in the
    # response is also treated as a threat
    if score >= 75 or isInJson(raw, 'malicious', 1001):
        status = 'threat'
    elif score >= 50:
        status = 'caution'
    elif score >= 25:
        status = 'info'
    else:
        status = 'ok'

    return {'response': raw, 'summary': summary, 'status': status}


def analyze(input):
    """Run a full lookup for one observable and return the result dict.

    ``input`` is a JSON string with ``artifactType`` and ``value`` keys.
    The artifact type is checked against the analyzer metadata before
    any request is sent.

    TLSH, telfhash and gimphash lookups are done in two steps: the
    similarity query is used only to find a matching sample, whose
    sha256 is then queried again for the more detailed report. If the
    first step does not succeed its response is reported as-is, which
    avoids a pointless second request.
    """
    data = json.loads(input)
    meta = helpers.loadMetadata(__file__)
    helpers.checkSupportedType(meta, data["artifactType"])

    if data['artifactType'] in ('tlsh', 'gimphash', 'telfhash'):
        initialQuery = buildReq(data['artifactType'], data['value'])
        initialRaw = sendReq(meta, initialQuery)

        # .get() keeps an empty response from raising KeyError here, so
        # prepareResults can report it instead
        if initialRaw.get('query_status') != 'ok':
            return prepareResults(initialRaw)

        # re-query by sha256 to get a more detailed report
        data['artifactType'] = 'hash'
        data['value'] = initialRaw['data'][0]['sha256_hash']

    query = buildReq(data['artifactType'], data['value'])
    response = sendReq(meta, query)
    return prepareResults(response)


def main():
    """Command-line entry point.

    Expects exactly one argument, the JSON observable description, and
    prints the analysis result as JSON. Any other argument count prints
    an error message instead.
    """
    if len(sys.argv) != 2:
        print("ERROR: Input is not in proper JSON format")
        return

    print(json.dumps(analyze(sys.argv[1])))


if __name__ == '__main__':
    main()
27 changes: 25 additions & 2 deletions salt/sensoroni/files/analyzers/malwarebazaar/malwarebazaar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ def test_main_success(self):
self.assertEqual(mock_cmd.getvalue(), expected)
mock.assert_called_once()

def test_isInJson_tail_greater_than_max_depth(self):
    """A starting depth beyond the limit must report no match, even
    though the searched string is present in the nested dict."""
    needle = "helo"
    haystack = {
        "value": "test",
        "test": "value",
        "arr": ["Foo", "Bar", "Hello"],
        "dict1": {"key1": "val", "key2": "helo"}
    }
    depth_limit = 1000
    start_depth = 2000
    found = malwarebazaar.isInJson(
        haystack, needle, depth_limit, start_depth)
    self.assertEqual(found, False)

def test_isInJson_string_found_in_dict(self):
test_string = "helo"
input_json = {
Expand All @@ -33,6 +45,18 @@ def test_isInJson_string_found_in_dict(self):
}
self.assertEqual(malwarebazaar.isInJson(input_json, test_string), True)

def test_isInJson_dict_in_list(self):
    """A string held by a dict that is itself a list item must be
    found when the search starts within the depth limit."""
    needle = "helo"
    haystack = {
        "key1": "test",
        "key2": "value",
        "key3": ["Foo", "Bar", "Hello"],
        "nested_list": [{"key1": "val", "key2": "helo"}]
    }
    depth_limit = 1000
    start_depth = 1
    found = malwarebazaar.isInJson(
        haystack, needle, depth_limit, start_depth)
    self.assertEqual(found, True)

def test_isInJson_string_found_in_arr(self):
test_string = "helo"
input_json = {
Expand All @@ -51,8 +75,7 @@ def test_isInJson_string_not_found(self):
"arr": ["Foo", "Bar", "helo"],
"dict1": {"Hello": "val", "key": "val"}
}
self.assertEqual(malwarebazaar.isInJson(
input_json, test_string), False)
self.assertEqual(malwarebazaar.isInJson(input_json, test_string), False)

def test_analyze(self):
"""simulated sendReq and prepareResults with 2 mock objects
Expand Down

0 comments on commit 6145891

Please sign in to comment.