From c7d074cb6f749d054a78f767173b6de5278c67e5 Mon Sep 17 00:00:00 2001 From: "J.P. Weiser" <4422366+jpweiser@users.noreply.github.com> Date: Mon, 18 Nov 2024 14:08:11 -0500 Subject: [PATCH 1/6] Create convert_nessus_csv_to_sarif.py Add script to convert Nessus CSV report to SARIF JSON --- .../tools/convert_nessus_csv_to_sarif.py | 141 ++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 scanners/generic/tools/convert_nessus_csv_to_sarif.py diff --git a/scanners/generic/tools/convert_nessus_csv_to_sarif.py b/scanners/generic/tools/convert_nessus_csv_to_sarif.py new file mode 100644 index 00000000..349b8e92 --- /dev/null +++ b/scanners/generic/tools/convert_nessus_csv_to_sarif.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +####################################### +# +# Convert a Nessus CSV report to SARIF format(stdout). +# A usage example (see options in the code): +# $ convert_nessus_csv_to_sarify.py [-f ] [--log-level=DEBUG] +# If `-f` is absent, or its value is `-`, CSV data will be read from STDIN +# +# +import argparse +import json +import logging +import sys +import csv +import re + +def map_level(risk): + """ + Map severity to match SARIF level property + """ + if risk is 'Critical' or risk is 'High': + return "error" + elif risk is 'Medium': + return "warning" + elif risk is 'Low': + return "note" + else: + return "none" + +def nessus_info(field_name, entry): + """ + Extract scan details from Nessus Plugin 19506 + """ + # Match the field name with RegEx, then split it to extract + # the value. Finally, strip all surrounding whitespace + return re.compile(field_name + '.*\n').search(entry)[0].split(':')[1].strip() + +def is_file(file_name): + """ + Bool to determine if filename was provided + """ + return file_name is not None and file_name != "-" + +def uri(host, port): + """ + Format URI from host and port + """ + uri = host + # Ignore port if 0 + if port is not '0': + uri = uri + ':' + port + return uri + +def convert_csv_to_sarif(csv_file): + """ + Convert CSV data to SARIF format. + """ + + # Start of template. Nessus and version provided as default values to be replaced + sarif_template = { + "version": "2.1.0", + "runs": [ + { + "tool": {"driver": {"name": "Nessus", "version": "10.8", "rules": []}}, + "results": [], + } + ], + } + + rule_ids = set() + + # Below used for logging purposes for file vs stdin + if is_file(csv_file): + logging.debug(f"Reading input from '{csv_file}'") + else: + logging.debug("Reading input from STDIN") + + with open(csv_file, newline='') if is_file(csv_file) else sys.stdin as report: + reader = csv.DictReader(report) + for row in reader: + if row['Plugin ID'] == '19506': + # This Plugin contains lots of details about scan to populate SARIF tool property + sarif_template["runs"][0]["tool"]["driver"]["name"] = nessus_info('Scanner edition used', row['Plugin Output']) + sarif_template["runs"][0]["tool"]["driver"]["version"] = nessus_info('Nessus version', row['Plugin Output']) + # Adding fullname to include policy + sarif_template["runs"][0]["tool"]["driver"]["fullName"] = f"{nessus_info('Scanner edition used',row['Plugin Output'])} {nessus_info('Nessus version', row['Plugin Output'])} {nessus_info('Scan policy used', row['Plugin Output'])} Policy" + + if row["Plugin ID"] not in rule_ids: + new_rule = { + "id": row["Plugin ID"], + "name": row["Name"], + "shortDescription": {"text": row["Description"]}, + } + sarif_template["runs"][0]["tool"]["driver"]["rules"].append(new_rule) + rule_ids.add(row["Plugin ID"]) + + artifact_location = uri(row["Host"],row["Port"]) + + new_report = { + "ruleId": row["Plugin ID"], + "level": map_level(row["Risk"]), + "message": {"text": f"{row["Plugin Output"]}\n\nSolution: {row["Solution"]}"}, + "locations": [{"physicalLocation": {"artifactLocation": {"uri": artifact_location}}}], + } + + sarif_template["runs"][0]["results"].append(new_report) + + return sarif_template + + +def main(): + # Parse command-line arguments + parser = argparse.ArgumentParser(description="Convert JSON data to SARIF format with JSON block added to message.") + parser.add_argument( + "-f", + "--filename", + type=str, + required=False, + default=None, + help="Path to JSON file (if absent or '-': read from STDIN)", + ) + parser.add_argument( + "--log-level", + dest="loglevel", + choices=["DEBUG", "VERBOSE", "INFO", "WARNING", "ERROR", "CRITICAL"], + default="INFO", + help="Level of verbosity", + ) + + args = parser.parse_args() + + logging.basicConfig(format="%(levelname)s:%(message)s", level=args.loglevel) + + sarif_data = convert_csv_to_sarif(args.filename) + + # Print the SARIF data + print(json.dumps(sarif_data, indent=2)) + + +if __name__ == "__main__": + main() From f228df3bf39a6398f4b608fe878f6ff32929db77 Mon Sep 17 00:00:00 2001 From: "J.P. Weiser" <4422366+jpweiser@users.noreply.github.com> Date: Mon, 18 Nov 2024 14:48:20 -0500 Subject: [PATCH 2/6] Update convert_nessus_csv_to_sarif.py per linter --- .../tools/convert_nessus_csv_to_sarif.py | 77 ++++++++++++------- 1 file changed, 51 insertions(+), 26 deletions(-) diff --git a/scanners/generic/tools/convert_nessus_csv_to_sarif.py b/scanners/generic/tools/convert_nessus_csv_to_sarif.py index 349b8e92..12b15153 100644 --- a/scanners/generic/tools/convert_nessus_csv_to_sarif.py +++ b/scanners/generic/tools/convert_nessus_csv_to_sarif.py @@ -1,39 +1,41 @@ #!/usr/bin/env python3 -####################################### -# +""" # Convert a Nessus CSV report to SARIF format(stdout). # A usage example (see options in the code): # $ convert_nessus_csv_to_sarify.py [-f ] [--log-level=DEBUG] # If `-f` is absent, or its value is `-`, CSV data will be read from STDIN # -# +""" import argparse +import csv import json import logging -import sys -import csv import re +import sys + def map_level(risk): """ Map severity to match SARIF level property """ - if risk is 'Critical' or risk is 'High': + if risk is "Critical" or risk is "High": return "error" - elif risk is 'Medium': + elif risk is "Medium": return "warning" - elif risk is 'Low': + elif risk is "Low": return "note" else: return "none" + def nessus_info(field_name, entry): """ Extract scan details from Nessus Plugin 19506 """ # Match the field name with RegEx, then split it to extract # the value. Finally, strip all surrounding whitespace - return re.compile(field_name + '.*\n').search(entry)[0].split(':')[1].strip() + return re.compile(field_name + ".*\n").search(entry)[0].split(":")[1].strip() + def is_file(file_name): """ @@ -41,15 +43,17 @@ def is_file(file_name): """ return file_name is not None and file_name != "-" + def uri(host, port): """ Format URI from host and port """ - uri = host + target = host # Ignore port if 0 - if port is not '0': - uri = uri + ':' + port - return uri + if port is not "0": + target = target + ":" + port + return target + def convert_csv_to_sarif(csv_file): """ @@ -68,22 +72,30 @@ def convert_csv_to_sarif(csv_file): } rule_ids = set() - + # Below used for logging purposes for file vs stdin if is_file(csv_file): - logging.debug(f"Reading input from '{csv_file}'") + logging.debug("Reading input from: %s", csv_file) else: logging.debug("Reading input from STDIN") - with open(csv_file, newline='') if is_file(csv_file) else sys.stdin as report: + with ( + open(csv_file, newline="", encoding="utf-8") if is_file(csv_file) else sys.stdin + ) as report: reader = csv.DictReader(report) for row in reader: - if row['Plugin ID'] == '19506': + if row["Plugin ID"] == "19506": # This Plugin contains lots of details about scan to populate SARIF tool property - sarif_template["runs"][0]["tool"]["driver"]["name"] = nessus_info('Scanner edition used', row['Plugin Output']) - sarif_template["runs"][0]["tool"]["driver"]["version"] = nessus_info('Nessus version', row['Plugin Output']) + sarif_template["runs"][0]["tool"]["driver"]["name"] = nessus_info( + "Scanner edition used", row["Plugin Output"] + ) + sarif_template["runs"][0]["tool"]["driver"]["version"] = nessus_info( + "Nessus version", row["Plugin Output"] + ) # Adding fullname to include policy - sarif_template["runs"][0]["tool"]["driver"]["fullName"] = f"{nessus_info('Scanner edition used',row['Plugin Output'])} {nessus_info('Nessus version', row['Plugin Output'])} {nessus_info('Scan policy used', row['Plugin Output'])} Policy" + sarif_template["runs"][0]["tool"]["driver"][ + "fullName" + ] = f"{nessus_info('Scanner edition used',row['Plugin Output'])} {nessus_info('Nessus version', row['Plugin Output'])} {nessus_info('Scan policy used', row['Plugin Output'])} Policy" if row["Plugin ID"] not in rule_ids: new_rule = { @@ -93,14 +105,22 @@ def convert_csv_to_sarif(csv_file): } sarif_template["runs"][0]["tool"]["driver"]["rules"].append(new_rule) rule_ids.add(row["Plugin ID"]) - - artifact_location = uri(row["Host"],row["Port"]) - + + artifact_location = uri(row["Host"], row["Port"]) + new_report = { "ruleId": row["Plugin ID"], "level": map_level(row["Risk"]), - "message": {"text": f"{row["Plugin Output"]}\n\nSolution: {row["Solution"]}"}, - "locations": [{"physicalLocation": {"artifactLocation": {"uri": artifact_location}}}], + "message": { + "text": f"{row["Plugin Output"]}\n\nSolution: {row["Solution"]}" + }, + "locations": [ + { + "physicalLocation": { + "artifactLocation": {"uri": artifact_location} + } + } + ], } sarif_template["runs"][0]["results"].append(new_report) @@ -109,8 +129,13 @@ def convert_csv_to_sarif(csv_file): def main(): + """ + Parses arguments before converting Nessus CSV report to SARIF JSON format + """ # Parse command-line arguments - parser = argparse.ArgumentParser(description="Convert JSON data to SARIF format with JSON block added to message.") + parser = argparse.ArgumentParser( + description="Convert JSON data to SARIF format with JSON block added to message." + ) parser.add_argument( "-f", "--filename", From 6dc51f7e0ff5eb3edfb0117960ddd56c35626e98 Mon Sep 17 00:00:00 2001 From: "J.P. Weiser" <4422366+jpweiser@users.noreply.github.com> Date: Mon, 18 Nov 2024 15:02:41 -0500 Subject: [PATCH 3/6] Update convert_nessus_csv_to_sarif.py per pylint --- .../tools/convert_nessus_csv_to_sarif.py | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/scanners/generic/tools/convert_nessus_csv_to_sarif.py b/scanners/generic/tools/convert_nessus_csv_to_sarif.py index 12b15153..91e4551b 100644 --- a/scanners/generic/tools/convert_nessus_csv_to_sarif.py +++ b/scanners/generic/tools/convert_nessus_csv_to_sarif.py @@ -18,14 +18,16 @@ def map_level(risk): """ Map severity to match SARIF level property """ - if risk is "Critical" or risk is "High": + if risk in ("Critical", "High"): return "error" - elif risk is "Medium": + + if risk == "Medium": return "warning" - elif risk is "Low": + + if risk == "Low": return "note" - else: - return "none" + + return "none" def nessus_info(field_name, entry): @@ -50,7 +52,7 @@ def uri(host, port): """ target = host # Ignore port if 0 - if port is not "0": + if port != "0": target = target + ":" + port return target @@ -93,9 +95,12 @@ def convert_csv_to_sarif(csv_file): "Nessus version", row["Plugin Output"] ) # Adding fullname to include policy - sarif_template["runs"][0]["tool"]["driver"][ - "fullName" - ] = f"{nessus_info('Scanner edition used',row['Plugin Output'])} {nessus_info('Nessus version', row['Plugin Output'])} {nessus_info('Scan policy used', row['Plugin Output'])} Policy" + sarif_template["runs"][0]["tool"]["driver"]["fullName"] = ( + "%s %s %s Policy", + nessus_info("Scanner edition used", row["Plugin Output"]), + nessus_info("Nessus version", row["Plugin Output"]), + nessus_info("Scan policy used", row["Plugin Output"]), + ) if row["Plugin ID"] not in rule_ids: new_rule = { From a6666dcba48bcdbb8c62bc2756eed85fa5ee618b Mon Sep 17 00:00:00 2001 From: "J.P. Weiser" <4422366+jpweiser@users.noreply.github.com> Date: Mon, 18 Nov 2024 15:25:18 -0500 Subject: [PATCH 4/6] Update convert_nessus_csv_to_sarif.py more linting --- .../generic/tools/convert_nessus_csv_to_sarif.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/scanners/generic/tools/convert_nessus_csv_to_sarif.py b/scanners/generic/tools/convert_nessus_csv_to_sarif.py index 91e4551b..07647a60 100644 --- a/scanners/generic/tools/convert_nessus_csv_to_sarif.py +++ b/scanners/generic/tools/convert_nessus_csv_to_sarif.py @@ -95,12 +95,14 @@ def convert_csv_to_sarif(csv_file): "Nessus version", row["Plugin Output"] ) # Adding fullname to include policy - sarif_template["runs"][0]["tool"]["driver"]["fullName"] = ( - "%s %s %s Policy", + full_name = ( nessus_info("Scanner edition used", row["Plugin Output"]), nessus_info("Nessus version", row["Plugin Output"]), nessus_info("Scan policy used", row["Plugin Output"]), ) + sarif_template["runs"][0]["tool"]["driver"][ + "fullName" + ] = f"{full_name[0]} {full_name[1]} {full_name[2]} Policy" if row["Plugin ID"] not in rule_ids: new_rule = { @@ -117,7 +119,7 @@ def convert_csv_to_sarif(csv_file): "ruleId": row["Plugin ID"], "level": map_level(row["Risk"]), "message": { - "text": f"{row["Plugin Output"]}\n\nSolution: {row["Solution"]}" + "text": f"{row['Plugin Output']}\n\nSolution: {row['Solution']}" }, "locations": [ { @@ -139,7 +141,7 @@ def main(): """ # Parse command-line arguments parser = argparse.ArgumentParser( - description="Convert JSON data to SARIF format with JSON block added to message." + description="Convert Nessus CSV report to SARIF JSON format." ) parser.add_argument( "-f", @@ -147,7 +149,7 @@ def main(): type=str, required=False, default=None, - help="Path to JSON file (if absent or '-': read from STDIN)", + help="Path to Nessus CSV file (if absent or '-': read from STDIN)", ) parser.add_argument( "--log-level", @@ -169,3 +171,4 @@ def main(): if __name__ == "__main__": main() + From 35de6c68a49e5861bed97ecc0e60ca239e60e44f Mon Sep 17 00:00:00 2001 From: "J.P. Weiser" <4422366+jpweiser@users.noreply.github.com> Date: Mon, 18 Nov 2024 15:49:59 -0500 Subject: [PATCH 5/6] Update convert_nessus_csv_to_sarif.py --- .../tools/convert_nessus_csv_to_sarif.py | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/scanners/generic/tools/convert_nessus_csv_to_sarif.py b/scanners/generic/tools/convert_nessus_csv_to_sarif.py index 07647a60..99b14687 100644 --- a/scanners/generic/tools/convert_nessus_csv_to_sarif.py +++ b/scanners/generic/tools/convert_nessus_csv_to_sarif.py @@ -81,9 +81,7 @@ def convert_csv_to_sarif(csv_file): else: logging.debug("Reading input from STDIN") - with ( - open(csv_file, newline="", encoding="utf-8") if is_file(csv_file) else sys.stdin - ) as report: + with open(csv_file, newline="", encoding="utf-8") if is_file(csv_file) else sys.stdin as report: reader = csv.DictReader(report) for row in reader: if row["Plugin ID"] == "19506": @@ -118,16 +116,8 @@ def convert_csv_to_sarif(csv_file): new_report = { "ruleId": row["Plugin ID"], "level": map_level(row["Risk"]), - "message": { - "text": f"{row['Plugin Output']}\n\nSolution: {row['Solution']}" - }, - "locations": [ - { - "physicalLocation": { - "artifactLocation": {"uri": artifact_location} - } - } - ], + "message": {"text": f"{row['Plugin Output']}\n\nSolution: {row['Solution']}"}, + "locations": [{"physicalLocation": {"artifactLocation": {"uri": artifact_location}}}], } sarif_template["runs"][0]["results"].append(new_report) @@ -140,9 +130,7 @@ def main(): Parses arguments before converting Nessus CSV report to SARIF JSON format """ # Parse command-line arguments - parser = argparse.ArgumentParser( - description="Convert Nessus CSV report to SARIF JSON format." - ) + parser = argparse.ArgumentParser(description="Convert Nessus CSV report to SARIF JSON format.") parser.add_argument( "-f", "--filename", From 717dcfc61cf5bffb6db6d3e0ed9e4f39dab503cc Mon Sep 17 00:00:00 2001 From: "J.P. Weiser" <4422366+jpweiser@users.noreply.github.com> Date: Mon, 18 Nov 2024 15:54:14 -0500 Subject: [PATCH 6/6] Remove newline at end of file --- scanners/generic/tools/convert_nessus_csv_to_sarif.py | 1 - 1 file changed, 1 deletion(-) diff --git a/scanners/generic/tools/convert_nessus_csv_to_sarif.py b/scanners/generic/tools/convert_nessus_csv_to_sarif.py index 99b14687..f526edd0 100644 --- a/scanners/generic/tools/convert_nessus_csv_to_sarif.py +++ b/scanners/generic/tools/convert_nessus_csv_to_sarif.py @@ -159,4 +159,3 @@ def main(): if __name__ == "__main__": main() -