diff --git a/scripts/native_libraries/get_emba_db.py b/scripts/native_libraries/get_emba_db.py deleted file mode 100644 index 8c37b53a..00000000 --- a/scripts/native_libraries/get_emba_db.py +++ /dev/null @@ -1,92 +0,0 @@ -import json -import os -import re - -import requests - -from surfactant.configmanager import ConfigManager - - -def load_database(url): - response = requests.get(url) - response.raise_for_status() - return response.text - - -def parse_cfg_file(content): - database = {} - lines = content.splitlines() - filtered_lines = [] - - for line in lines: - if not (line.startswith("#") or line.startswith("identifier")): - filtered_lines.append(line) - - for line in filtered_lines: - line = line.strip() - - # Split by semicolons - fields = line.split(";") - - # Name of library - lib_name = fields[0] - - # Empty filename because EMBA doesn't need filename patterns - name_patterns = [] - - # Check if it starts with one double quote and ends with two double quotes - if fields[3].startswith('"') and fields[3].endswith('""'): - filecontent = fields[3][1:-1] - elif fields[3].endswith('""'): - filecontent = fields[3][:-1] - else: - filecontent = fields[3].strip('"') - - # Create a dictionary for this entry and add it to the database - # Strict mode is deprecated so those entries will be matched just by filename - if fields[1] == "" or fields[1] == "strict": - if fields[1] == "strict": - if lib_name not in database: - database[lib_name] = { - "filename": [lib_name], - "filecontent": [], - } - else: - try: - re.search(filecontent.encode("utf-8"), b"") - if lib_name not in database: - database[lib_name] = { - "filename": name_patterns, - "filecontent": [filecontent], - } - else: - database[lib_name]["filecontent"].append(filecontent) - except re.error as e: - print(f"Error parsing file content regexp {filecontent}: {e}") - - return database - - -# Use database from this specific commit -emba_database_url = "https://raw.githubusercontent.com/e-m-b-a/emba/11d6c281189c3a14fc56f243859b0bccccce8b9a/config/bin_version_strings.cfg" -json_file_path = ConfigManager().get_data_dir_path() / "native_lib_patterns" / "emba.json" - -file_content = load_database(emba_database_url) - -parsed_data = parse_cfg_file(file_content) - -for _, value in parsed_data.items(): - filecontent_list = value["filecontent"] - - # Remove leading ^ from each string in the filecontent list - for i, pattern in enumerate(filecontent_list): # Use enumerate to get index and value - if pattern.startswith("^"): - filecontent_list[i] = pattern[1:] - - if not pattern.endswith("\\$"): - if pattern.endswith("$"): - filecontent_list[i] = pattern[:-1] - -os.makedirs(os.path.dirname(json_file_path), exist_ok=True) -with open(json_file_path, "w") as json_file: - json.dump(parsed_data, json_file, indent=4) diff --git a/surfactant/infoextractors/native_lib_file.py b/surfactant/infoextractors/native_lib_file.py index b25b9e19..30a374ef 100644 --- a/surfactant/infoextractors/native_lib_file.py +++ b/surfactant/infoextractors/native_lib_file.py @@ -1,8 +1,9 @@ import json import os import re -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union +import requests from loguru import logger import surfactant.plugin @@ -10,50 +11,55 @@ from surfactant.sbomtypes import SBOM, Software -@surfactant.plugin.hookimpl -def short_name() -> Optional[str]: - return "native_lib_patterns" +class NativeLibDatabaseManager: + def __init__(self) -> None: + self.native_lib_database: Optional[Dict[str, Any]] = None + def load_db(self) -> None: + native_lib_file = ConfigManager().get_data_dir_path() / "native_lib_patterns" / "emba.json" -def load_pattern_db(): - # Load regex patterns into database var - try: - with open(native_lib_patterns, "r") as regex: - emba_patterns = json.load(regex) - return emba_patterns - except FileNotFoundError: - logger.warning(f"File not found for native library detection: {native_lib_patterns}") - return None + try: + with open(native_lib_file, "r") as regex: + self.native_lib_database = json.load(regex) + except FileNotFoundError: + logger.warning( + "Native library pattern could not be loaded. Run `surfactant plugin update-db native_lib_patterns` to fetch the pattern database." + ) + self.native_lib_database = None + def get_database(self) -> Optional[Dict[str, Any]]: + return self.native_lib_database -# Load the pattern database once at module import -native_lib_patterns = ConfigManager().get_data_dir_path() / "native_lib_patterns" / "emba.json" -database = load_pattern_db() +native_lib_manager = NativeLibDatabaseManager() -def supports_file(filetype) -> bool: + +def supports_file(filetype: str) -> bool: return filetype in ("PE", "ELF", "MACHOFAT", "MACHOFAT64", "MACHO32", "MACHO64") @surfactant.plugin.hookimpl -def extract_file_info(sbom: SBOM, software: Software, filename: str, filetype: str) -> object: +def extract_file_info( + sbom: SBOM, software: Software, filename: str, filetype: str +) -> Optional[Dict[str, Any]]: if not supports_file(filetype): return None return extract_native_lib_info(filename) -def extract_native_lib_info(filename): +def extract_native_lib_info(filename: str) -> Optional[Dict[str, Any]]: native_lib_info: Dict[str, Any] = {"nativeLibraries": []} - if not database: + native_lib_database = native_lib_manager.get_database() + + if native_lib_database is None: return None - found_libraries = set() - library_names = [] - contains_library_names = [] + found_libraries: set = set() + library_names: List[str] = [] + contains_library_names: List[str] = [] - # Match based on filename base_filename = os.path.basename(filename) - filenames_list = match_by_attribute("filename", base_filename, database) + filenames_list = match_by_attribute("filename", base_filename, native_lib_database) if len(filenames_list) > 0: for match in filenames_list: library_name = match["isLibrary"] @@ -61,13 +67,11 @@ def extract_native_lib_info(filename): library_names.append(library_name) found_libraries.add(library_name) - # Match based on filecontent try: with open(filename, "rb") as native_file: filecontent = native_file.read() - filecontent_list = match_by_attribute("filecontent", filecontent, database) + filecontent_list = match_by_attribute("filecontent", filecontent, native_lib_database) - # Extend the list and add the new libraries found for match in filecontent_list: library_name = match["containsLibrary"] if library_name not in found_libraries: @@ -77,19 +81,19 @@ def extract_native_lib_info(filename): except FileNotFoundError: logger.warning(f"File not found: {filename}") - # Create the single entry for isLibrary if library_names: native_lib_info["nativeLibraries"].append({"isLibrary": library_names}) - # Create the single entry for containsLibrary if contains_library_names: native_lib_info["nativeLibraries"].append({"containsLibrary": contains_library_names}) return native_lib_info -def match_by_attribute(attribute: str, content: str, patterns_database: Dict) -> List[Dict]: - libs = [] +def match_by_attribute( + attribute: str, content: Union[str, bytes], patterns_database: Dict[str, Any] +) -> List[Dict[str, Any]]: + libs: List[Dict[str, str]] = [] for lib_name, lib_info in patterns_database.items(): if attribute in lib_info: for pattern in lib_info[attribute]: @@ -102,3 +106,107 @@ def match_by_attribute(attribute: str, content: str, patterns_database: Dict) -> if matches: libs.append({"containsLibrary": lib_name}) return libs + + +def download_database() -> Optional[str]: + emba_database_url = "https://raw.githubusercontent.com/e-m-b-a/emba/11d6c281189c3a14fc56f243859b0bccccce8b9a/config/bin_version_strings.cfg" + response = requests.get(emba_database_url) + if response.status_code == 200: + logger.info("Request successful!") + return response.text + + if response.status_code == 404: + logger.error("Resource not found.") + else: + logger.error("An error occurred.") + + return None + + +def parse_emba_cfg_file(content: str) -> Dict[str, Dict[str, List[str]]]: + database: Dict[str, Dict[str, List[str]]] = {} + lines = content.splitlines() + filtered_lines: List[str] = [] + + for line in lines: + if not (line.startswith("#") or line.startswith("identifier")): + filtered_lines.append(line) + + for line in filtered_lines: + line = line.strip() + + fields = line.split(";") + + lib_name = fields[0] + + name_patterns: List[str] = [] + + if fields[3].startswith('"') and fields[3].endswith('""'): + filecontent = fields[3][1:-1] + elif fields[3].endswith('""'): + filecontent = fields[3][:-1] + else: + filecontent = fields[3].strip('"') + + if fields[1] == "" or fields[1] == "strict": + if fields[1] == "strict": + if lib_name not in database: + database[lib_name] = { + "filename": [lib_name], + "filecontent": [], + } + else: + if lib_name not in database[lib_name]["filename"]: + database[lib_name]["filename"].append(lib_name) + else: + try: + re.search(filecontent.encode("utf-8"), b"") + if lib_name not in database: + database[lib_name] = { + "filename": name_patterns, + "filecontent": [filecontent], + } + else: + database[lib_name]["filecontent"].append(filecontent) + except re.error as e: + logger.error(f"Error parsing file content regexp {filecontent}: {e}") + + return database + + +@surfactant.plugin.hookimpl +def update_db() -> str: + file_content = download_database() + if file_content is not None: + parsed_data = parse_emba_cfg_file(file_content) + for _, value in parsed_data.items(): + filecontent_list = value["filecontent"] + + for i, pattern in enumerate(filecontent_list): + if pattern.startswith("^"): + filecontent_list[i] = pattern[1:] + + if not pattern.endswith("\\$"): + if pattern.endswith("$"): + filecontent_list[i] = pattern[:-1] + + path = ConfigManager().get_data_dir_path() / "native_lib_patterns" + path.mkdir(parents=True, exist_ok=True) + native_lib_file = ConfigManager().get_data_dir_path() / "native_lib_patterns" / "emba.json" + with open(native_lib_file, "w") as json_file: + json.dump(parsed_data, json_file, indent=4) + return "Update complete." + return "No update occurred." + + +@surfactant.plugin.hookimpl +def short_name() -> Optional[str]: + return "native_lib_patterns" + + +@surfactant.plugin.hookimpl +def init_hook(command_name: Optional[str] = None) -> None: + if command_name != "update-db": + logger.info("Initializing native_lib_file...") + native_lib_manager.load_db() + logger.info("Initializing native_lib_file complete.")