Skip to content

Commit

Permalink
Merge branch 'main' into init-nativelib
Browse files Browse the repository at this point in the history
  • Loading branch information
willis89pr authored Jan 13, 2025
2 parents 44fe268 + 0901d8f commit 51f82d7
Show file tree
Hide file tree
Showing 2 changed files with 258 additions and 48 deletions.
104 changes: 83 additions & 21 deletions surfactant/cmd/merge.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import json
import uuid as uuid_module
from collections import deque
from typing import Dict, List
from typing import Dict, List, Tuple

import click
from loguru import logger

from surfactant.configmanager import ConfigManager
from surfactant.plugin.manager import find_io_plugin, get_plugin_manager
from surfactant.sbomtypes._relationship import Relationship
from surfactant.sbomtypes._sbom import SBOM
Expand All @@ -14,11 +15,18 @@

@click.argument("sbom_outfile", envvar="SBOM_OUTPUT", type=click.File("w"), required=True)
@click.argument("input_sboms", type=click.File("r"), required=True, nargs=-1)
@click.option("--config_file", type=click.File("r"), required=False)
@click.option(
"--config_file",
type=click.File("r"),
required=False,
help="Config file for controlling some aspects of the merged SBOM, primarily the creation of a new top-level system object (settings here will typically take precedence over command line options)",
)
@click.option(
"--output_format",
is_flag=False,
default="surfactant.output.cytrics_writer",
default=ConfigManager().get(
"core", "output_format", fallback="surfactant.output.cytrics_writer"
),
help="SBOM output format, options=surfactant.output.[cytrics|csv|spdx]_writer",
)
@click.option(
Expand All @@ -27,8 +35,36 @@
default="surfactant.input_readers.cytrics_reader",
help="SBOM input format, assumes that all input SBOMs being merged have the same format, options=surfactant.input_readers.[cytrics|cyclonedx|spdx]_reader",
)
@click.option(
"--system_uuid",
is_flag=False,
help="System UUID to use for relationships to a top-level system object",
)
@click.option(
"--system_relationship",
is_flag=False,
default="Contains",
show_default=True,
help="Relationship type between merged SBOM contents to a top-level system object",
)
@click.option(
"--add_system/--no_add_system",
default=False,
show_default=True,
help="Create a top-level system entry for tying together the merged SBOM components. When disabled, relationships will still be created to a provided system UUID",
)
@click.command("merge")
def merge_command(input_sboms, sbom_outfile, config_file, output_format, input_format):
# pylint: disable-next=too-many-positional-arguments
def merge_command(
input_sboms,
sbom_outfile,
config_file,
output_format,
input_format,
system_uuid,
system_relationship,
add_system,
):
"""Merge two or more INPUT_SBOMS together into SBOM_OUTFILE.
An optional CONFIG_FILE can be supplied to specify a root system entry
Expand All @@ -43,10 +79,19 @@ def merge_command(input_sboms, sbom_outfile, config_file, output_format, input_f
config = None
if config_file:
config = json.load(config_file)
merge(sboms, sbom_outfile, config, output_writer)


def merge(input_sboms, sbom_outfile, config, output_writer):
merge(sboms, sbom_outfile, config, output_writer, system_uuid, system_relationship, add_system)


# pylint: disable-next=too-many-positional-arguments
def merge(
input_sboms,
sbom_outfile,
config,
output_writer,
system_uuid=None,
system_relationship="Contains", # remember: keep in-sync with click arg default
add_system=False,
):
"""Merge two or more SBOMs."""
merged_sbom = input_sboms[0]
for sbom_m in input_sboms[1:]:
Expand All @@ -56,22 +101,28 @@ def merge(input_sboms, sbom_outfile, config, output_writer):
roots = get_roots_check_cycles(rel_graph)

# Check if provided UUID for a system object already exists to avoid creating a duplicate
add_system = True
if config and "system" in config:
if "UUID" in config["system"]:
for s in merged_sbom.systems:
if config["system"]["UUID"] == s.UUID:
add_system = False
break
# Even if not adding the system, create a dummy/placeholder with the UUID for creating relationships
system = create_system_object(merged_sbom, config)
system, using_random_uuid = create_system_object(merged_sbom, config, system_uuid)
if add_system:
merged_sbom.systems.append(system)

# Add a system relationship to each root software/systems entry identified
for r in roots:
merged_sbom.relationships.add(
Relationship(xUUID=system["UUID"], yUUID=r, relationship="Includes")
if not using_random_uuid or add_system:
if config and "systemRelationship" in config:
system_relationship = config["systemRelationship"]
for r in roots:
merged_sbom.relationships.add(
Relationship(xUUID=system.UUID, yUUID=r, relationship=system_relationship)
)
else:
logger.warning(
"No top-level system relationships added; enable the add system option to randomly generate a UUID, or specify a system UUID"
)

output_writer.write_sbom(merged_sbom, sbom_outfile)
Expand All @@ -92,10 +143,10 @@ def construct_relationship_graph(sbom: SBOM):
rel_graph[sw.UUID] = []
# iterate through all relationships, adding edges to the adjacency list
for rel in sbom.relationships:
# check case where xUUID doesn't exist (and error if yUUID doesn't exist) in the graph
if rel.xUUID not in rel_graph or rel.yUUID not in rel_graph:
logger.error("====ERROR xUUID or yUUID doesn't exist====")
logger.error(f"{rel = }")
logger.error(
f"Either the xUUID or yUUID for the relationship does not exist in the graph: {rel = }"
)
continue
# consider also including relationship type for the edge
# treat as directed graph, with inverted edges (pointing to parents) so dfs will eventually lead to the root parent node for a (sub)graph
Expand Down Expand Up @@ -151,23 +202,34 @@ def dfs(rel):
return roots


def create_system_object(sbom: SBOM, config=None) -> System:
def create_system_object(sbom: SBOM, config=None, system_uuid=None) -> Tuple[System, bool]:
"""Function to create an accurate system object
Positional arguments:
sbom (SBOM): The SBOM the system object is being created for.
config: The user specified config json (Optional).
Returns:
System: The created system object.
Tuple[System, bool]: The created system object and a boolean indicating if a random UUID was used.
"""

system = {}
using_random_uuid = False
if config and "system" in config:
system = config["system"]
# make sure the required fields are present and at least mostly valid
if ("UUID" not in system) or not sbom.is_valid_uuid4(system["UUID"]):

# system_uuid supplied via command line overrides config file UUID
if system_uuid:
system["UUID"] = system_uuid
elif "UUID" not in system:
# No UUID, generate a random one...
using_random_uuid = True
system["UUID"] = str(uuid_module.uuid4())
# check if the UUID appears valid based on the CyTRICS schema
elif not sbom.is_valid_uuid4(system["UUID"]):
invalid_uuid = system["UUID"]
logger.error(f"Invalid uuid4 given ({invalid_uuid}) for the system")

if "name" not in system:
system["name"] = ""
captureStart = -1
Expand All @@ -183,4 +245,4 @@ def create_system_object(sbom: SBOM, config=None) -> System:
system["captureStart"] = captureStart
if "captureEnd" not in system or not system["captureEnd"]:
system["captureEnd"] = captureEnd
return System(**system)
return System(**system), using_random_uuid
Loading

0 comments on commit 51f82d7

Please sign in to comment.