Skip to content

Commit

Permalink
feat(nmap): lookup and correlate exploits with CVE ids (#489)
Browse files Browse the repository at this point in the history
  • Loading branch information
ocervell authored Nov 26, 2024
1 parent a54077d commit 988edcb
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 24 deletions.
3 changes: 2 additions & 1 deletion secator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class Runners(StrictModel):
backend_update_frequency: int = 5
poll_frequency: int = 5
skip_cve_search: bool = False
skip_cve_low_confidence: bool = True
skip_exploit_search: bool = False
skip_cve_low_confidence: bool = False
remove_duplicates: bool = False


Expand Down
6 changes: 5 additions & 1 deletion secator/output_types/exploit.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
@dataclass
class Exploit(OutputType):
name: str
id: str
provider: str
id: str
matched_at: str = ''
ip: str = ''
confidence: str = 'low'
cvss_score: float = 0
reference: str = ''
cves: list = field(default_factory=list, compare=False)
tags: list = field(default_factory=list, compare=False)
Expand Down Expand Up @@ -50,4 +52,6 @@ def __repr__(self):
if self.extra_data:
data = ', '.join([f'{k}:{v}' for k, v in self.extra_data.items()])
s += f' \[[yellow]{str(data)}[/]]'
if self.confidence == 'low':
s = f'[dim]{s}[/]'
return rich_to_ansi(s)
65 changes: 59 additions & 6 deletions secator/tasks/_categories.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import json
import os
import re

from functools import cache

import requests
from bs4 import BeautifulSoup
Expand Down Expand Up @@ -178,13 +181,57 @@ def match_cpes(fs1, fs2):
tup2 = split_fs2[3], split_fs2[4], split_fs2[5]
return tup1 == tup2

@cache
@staticmethod
def lookup_cve_from_vulners_exploit(exploit_id, *cpes):
"""Search for a CVE corresponding to an exploit by extracting the CVE id from the exploit HTML page.
Args:
exploit_id (str): Exploit ID.
cpes (tuple[str], Optional): CPEs to match for.
Returns:
dict: vulnerability data.
"""
if CONFIG.runners.skip_exploit_search:
debug(f'Skip remote query for {exploit_id} since config.runners.skip_exploit_search is set.', sub='cve')
return None
if CONFIG.offline_mode:
debug(f'Skip remote query for {exploit_id} since config.offline_mode is set.', sub='cve')
return None
try:
resp = requests.get(f'https://vulners.com/githubexploit/{exploit_id}', timeout=5)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, 'lxml')
title = soup.title.get_text(strip=True)
h1 = [h1.get_text(strip=True) for h1 in soup.find_all('h1')]
if '404' in h1:
raise requests.RequestException("404 [not found or rate limited]")
code = [code.get_text(strip=True) for code in soup.find_all('code')]
elems = [title] + h1 + code
content = '\n'.join(elems)
cve_regex = re.compile(r'(CVE(?:-|_)\d{4}(?:-|_)\d{4,7})', re.IGNORECASE)
matches = cve_regex.findall(str(content))
if not matches:
debug(f'{exploit_id}: No CVE found in https://vulners.com/githubexploit/{exploit_id}.', sub='cve')
return None
cve_id = matches[0].replace('_', '-').upper()
cve_data = Vuln.lookup_cve(cve_id, *cpes)
if cve_data:
return cve_data

except requests.RequestException as e:
debug(f'Failed remote query for {exploit_id} ({str(e)}).', sub='cve')
return None

@cache
@staticmethod
def lookup_cve(cve_id, cpes=[]):
def lookup_cve(cve_id, *cpes):
"""Search for a CVE in local db or using cve.circl.lu and return vulnerability data.
Args:
cve_id (str): CVE ID in the form CVE-*
cpes (str, Optional): CPEs to match for.
cpes (tuple[str], Optional): CPEs to match for.
Returns:
dict: vulnerability data.
Expand Down Expand Up @@ -216,15 +263,20 @@ def lookup_cve(cve_id, cpes=[]):
# The check is not executed if no CPE was passed (sometimes nmap cannot properly detect a CPE) or if the CPE
# version cannot be determined.
cpe_match = False
tags = []
tags = [cve_id]
if cpes:
for cpe in cpes:
cpe_obj = CPE(cpe)
cpe_fs = cpe_obj.as_fs()
try:
cpe_obj = CPE(cpe)
cpe_fs = cpe_obj.as_fs()
except NotImplementedError:
debug(f'{cve_id}: Failed to parse CPE {cpe} with CPE parser.', sub='cve')
cpe_fs = cpe
tags.append('cpe-invalid')
# cpe_version = cpe_obj.get_version()[0]
vulnerable_fs = cve_info['vulnerable_product']
for fs in vulnerable_fs:
# debug(f'{cve_id}: Testing {cpe_fs} against {fs}', sub='cve') # for hardcore debugging
debug(f'{cve_id}: Testing {cpe_fs} against {fs}', sub='cve.match', verbose=True)
if Vuln.match_cpes(cpe_fs, fs):
debug(f'{cve_id}: CPE match found for {cpe}.', sub='cve')
cpe_match = True
Expand Down Expand Up @@ -285,6 +337,7 @@ def lookup_cve(cve_id, cpes=[]):
}
return vuln

@cache
@staticmethod
def lookup_ghsa(ghsa_id):
"""Search for a GHSA on Github and and return associated CVE vulnerability data.
Expand Down
41 changes: 25 additions & 16 deletions secator/tasks/nmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def xml_to_json(self):
class nmapData(dict):

def __iter__(self):
datas = []
for host in self._get_hosts():
hostname = self._get_hostname(host)
ip = self._get_ip(host)
Expand Down Expand Up @@ -177,16 +178,19 @@ def __iter__(self):
if not func:
debug(f'Script output parser for "{script_id}" is not supported YET.', sub='cve')
continue
for vuln in func(output, cpes=cpes):
vuln.update(metadata)
for data in func(output, cpes=cpes):
data.update(metadata)
confidence = 'low'
if 'cpe-match' in vuln[TAGS]:
if 'cpe-match' in data[TAGS]:
confidence = 'high' if version_exact else 'medium'
vuln[CONFIDENCE] = confidence
if (CONFIG.runners.skip_cve_low_confidence and vuln[CONFIDENCE] == 'low'):
debug(f'{vuln[ID]}: ignored (low confidence).', sub='cve')
data[CONFIDENCE] = confidence
if (CONFIG.runners.skip_cve_low_confidence and data[CONFIDENCE] == 'low'):
debug(f'{data[ID]}: ignored (low confidence).', sub='cve')
continue
yield vuln
if data in datas:
continue
yield data
datas.append(data)

#---------------------#
# XML FILE EXTRACTORS #
Expand Down Expand Up @@ -339,7 +343,7 @@ def _parse_vulscan_output(self, out, cpes=[]):
TAGS: [vuln_id, provider_name]
}
if provider_name == 'MITRE CVE':
data = VulnMulti.lookup_cve(vuln['id'], cpes=cpes)
data = VulnMulti.lookup_cve(vuln['id'], *cpes)
if data:
vuln.update(data)
yield vuln
Expand All @@ -358,30 +362,35 @@ def _parse_vulners_output(self, out, **kwargs):
cpes.append(line.rstrip(':'))
continue
elems = tuple(line.split('\t'))
vuln = {}

if len(elems) == 4: # exploit
# TODO: Implement exploit processing
exploit_id, cvss_score, reference_url, _ = elems
name = exploit_id
# edb_id = name.split(':')[-1] if 'EDB-ID' in name else None
vuln = {
exploit = {
ID: exploit_id,
NAME: name,
PROVIDER: provider_name,
REFERENCE: reference_url,
TAGS: [exploit_id, provider_name],
CVSS_SCORE: cvss_score,
CONFIDENCE: 'low',
'_type': 'exploit',
TAGS: [exploit_id, provider_name]
# CVSS_SCORE: cvss_score,
# CONFIDENCE: 'low'
}
# TODO: lookup exploit in ExploitDB to find related CVEs
# if edb_id:
# print(edb_id)
# vuln_data = VulnMulti.lookup_exploitdb(edb_id)
yield vuln
# exploit_data = VulnMulti.lookup_exploitdb(edb_id)
vuln = VulnMulti.lookup_cve_from_vulners_exploit(exploit_id, *cpes)
if vuln:
yield vuln
exploit[TAGS].extend(vuln[TAGS])
exploit[CONFIDENCE] = vuln[CONFIDENCE]
yield exploit

elif len(elems) == 3: # vuln
vuln = {}
vuln_id, vuln_cvss, reference_url = tuple(line.split('\t'))
vuln_cvss = float(vuln_cvss)
vuln_id = vuln_id.split(':')[-1]
Expand All @@ -398,7 +407,7 @@ def _parse_vulners_output(self, out, **kwargs):
}
if vuln_type == 'CVE' or vuln_type == 'PRION:CVE':
vuln[TAGS].append('cve')
data = VulnMulti.lookup_cve(vuln_id, cpes=cpes)
data = VulnMulti.lookup_cve(vuln_id, *cpes)
if data:
vuln.update(data)
yield vuln
Expand Down

0 comments on commit 988edcb

Please sign in to comment.