diff --git a/secator/tasks/grype.py b/secator/tasks/grype.py index 6b354205..af6fe77b 100644 --- a/secator/tasks/grype.py +++ b/secator/tasks/grype.py @@ -1,13 +1,14 @@ import os import yaml +from secator.config import CONFIG from secator.decorators import task from secator.definitions import (DELAY, FOLLOW_REDIRECT, HEADER, OPT_NOT_SUPPORTED, PROXY, RATE_LIMIT, RETRIES, - THREADS, TIMEOUT, USER_AGENT) -from secator.output_types import Vulnerability + THREADS, TIMEOUT, USER_AGENT, OUTPUT_PATH) +from secator.output_types import Vulnerability, Info, Error from secator.tasks._categories import VulnCode -from secator.definitions import (OUTPUT_PATH) +from secator.utils import debug @task() @@ -29,23 +30,23 @@ class grype(VulnCode): TIMEOUT: OPT_NOT_SUPPORTED, USER_AGENT: OPT_NOT_SUPPORTED } - output_types = [Vulnerability] - install_cmd = ( - '$(curl -sSfL https://raw.githubusercontent.com/anchore/grype/main/install.sh | sudo sh -s -- -b /usr/local/bin) || exit 1' - ) - install_github_handle = 'anchore/grype' - output_map = { - Vulnerability: { - 'name': lambda x: x['vulnerability']['id'], + Vulnerability: { + 'name': lambda x: x['vulnerability']['id'], + 'id': lambda x: x['vulnerability']['id'], 'severity': lambda x: x['vulnerability']['severity'].lower(), 'cvss_score': lambda x: x['vulnerability']['cvss_score'], 'references': lambda x: x['vulnerability']['urls'], 'description': lambda x: x['vulnerability']['description'] - } - } + } + } + output_types = [Vulnerability] + install_cmd = ( + '$(curl -sSfL https://raw.githubusercontent.com/anchore/grype/main/install.sh | sudo sh -s -- -b /usr/local/bin) || exit 1' + ) + install_github_handle = 'anchore/grype' - @staticmetho + @staticmethod def on_cmd(self): output_path = self.get_opt_value(OUTPUT_PATH) if not output_path: @@ -53,19 +54,62 @@ def on_cmd(self): self.output_path = output_path self.cmd = f'{self.cmd} --file {self.output_path}' - - def yielder(self): - prev = self.print_item_count - self.print_item_count = False - list(super().yielder()) - if self.return_code != 0: - return - self.results = [] - if not self.output_json: + @staticmethod + def on_cmd_done(self): + if not os.path.exists(self.output_path): + yield Error(message=f'Could not find JSON results in {self.output_path}') return - note = f'Trivy JSON result saved to {self.output_path} - item = self._process_item(item) - if not item: - continue - yield item - self.print_item_count = prev + + yield Info(message=f'JSON results saved to {self.output_path}') + with open(self.output_path, 'r') as f: + results = yaml.safe_load(f.read()) + for item in results['matches']: + vulns = [item['vulnerability']] + item['relatedVulnerabilities'] + details = item['matchDetails'][0] + searchedBy = details['searchedBy'] + for vuln_data in vulns: + vuln_id = vuln_data['id'] + cvss = None + if len(vuln_data['cvss']) > 0: + cvss = vuln_data['cvss'][0]['metrics']['baseScore'] + description = vuln_data['description'] + references = vuln_data['urls'] + severity = vuln_data['severity'].lower() + match_type = details['type'] + if severity == 'negligible': + severity = 'low' + confidence_to_match = { + 'cpe-match': 'high', + 'exact-direct-match': 'medium', + 'exact-indirect-match': 'low' + } + confidence = confidence_to_match.get(match_type, 'low') + if (CONFIG.runners.skip_cve_low_confidence and confidence == 'low'): + debug(f'{vuln_id}: ignored (low confidence).', sub='cve') + continue + data = { + 'id': vuln_id, + 'name': vuln_id, + 'description': description, + 'matched_at': self.inputs[0], + 'confidence': confidence, + 'provider': 'grype', + 'severity': severity, + 'cvss_score': cvss, + 'tags': [details['type']], + 'references': references, + 'extra_data': {} + } + if 'language' in searchedBy: + data['extra_data']['lang'] = searchedBy['language'] + if 'package' in searchedBy: + data['extra_data']['product'] = searchedBy['package']['name'] + data['extra_data']['version'] = searchedBy['package']['version'] + if 'namespace' in searchedBy: + data['extra_data']['namespace'] = searchedBy['namespace'] + is_ghsa = vuln_id.startswith('GHSA') + if is_ghsa: + data['tags'].append('ghsa') + else: + data['tags'].append('cve') + yield Vulnerability(**data) diff --git a/tests/integration/outputs.py b/tests/integration/outputs.py index c80c8e24..a43ab5ff 100644 --- a/tests/integration/outputs.py +++ b/tests/integration/outputs.py @@ -159,10 +159,10 @@ id='CVE-2024-24790', matched_at='redis:7.4.1', ip='', - confidence='medium', + confidence='high', severity='critical', - cvss_score=-1, - tags=[], + cvss_score=9.8, + tags=['cpe-match', 'cve'], _source='grype', ) ],