Skip to content

Commit

Permalink
Merge pull request #8 from FiniteStateInc/v0.0.8
Browse files Browse the repository at this point in the history
v0.0.8 RC
  • Loading branch information
nickvido authored Oct 10, 2023
2 parents 70310e8 + 8af61ef commit 9f9ea92
Show file tree
Hide file tree
Showing 9 changed files with 4,474 additions and 3,568 deletions.
6,413 changes: 3,442 additions & 2,971 deletions docs/finite_state_sdk.html

Large diffs are not rendered by default.

1,127 changes: 620 additions & 507 deletions docs/finite_state_sdk/queries.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/finite_state_sdk/token_cache.html
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ <h2>API Documentation</h2>
</ul>


<footer>finite-state-sdk-python v0.0.7</footer>
<footer>finite-state-sdk-python v0.0.8</footer>

<a class="attribution" title="pdoc: Python API documentation generator" href="https://pdoc.dev" target="_blank">
built with <span class="visually-hidden">pdoc</span><img
Expand Down
2 changes: 1 addition & 1 deletion docs/search.js

Large diffs are not rendered by default.

39 changes: 39 additions & 0 deletions examples/download_sboms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import finite_state_sdk
import requests


def custom_download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None, output_filename=None):
"""
Demonstration of a method for getting a download URL. Downloads an SBOM from the Finite State Platform and saves it to the specified output_filename.
You could build your own method to do something else with the URL, or you can use the built-in finite_state_sdk.download_sbom() method.
:param token: Finite State API token
:param organization_context: Finite State API organization context
:param sbom_type: The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX"
:param sbom_subtype: The subtype of SBOM to download. Valid values are "SBOM_ONLY", "SBOM_WITH_VDR", and "VDR_ONLY"
:param asset_version_id: The asset version ID to download the SBOM for
:param output_filename: The filename to save the SBOM to
"""
url = finite_state_sdk.generate_sbom_download_url(token, organization_context, sbom_type=sbom_type, sbom_subtype=sbom_subtype, asset_version_id=asset_version_id)

# Send an HTTP GET request to the URL
response = requests.get(url)

# Check if the request was successful (status code 200)
if response.status_code == 200:
# Open a local file in binary write mode and write the content to it
print("File downloaded successfully.")
with open(output_filename, 'wb') as file:
file.write(response.content)
print(f'Wrote file to {output_filename}')
else:
print("Failed to download the file. Status code:", response.status_code)


def example_download_sboms(token, organization_context):
custom_download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id='123456789', output_filename='sbom.cyclonedx.sbom_only.json')
custom_download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_WITH_VDR", asset_version_id='123456789', output_filename='sbom.cyclonedx.sbom_with_vdr.json')
custom_download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="VDR_ONLY", asset_version_id='123456789', output_filename='sbom.cyclonedx.vdr_only.json')
custom_download_sbom(token, organization_context, sbom_type="SPDX", sbom_subtype="SBOM_ONLY", asset_version_id='123456789', output_filename='sbom.spdx.sbom_only.json')

finite_state_sdk.download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id='123456789', output_filename='sbom.cyclonedx.sbom_only.json', verbose=True)

178 changes: 174 additions & 4 deletions finite_state_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import json
import requests
import time
import finite_state_sdk.queries as queries

API_URL = 'https://platform.finitestate.io/api/v1/graphql'
AUDIENCE = "https://platform.finitestate.io/api/v1/graphql"
TOKEN_URL = "https://finitestate.auth0.com/oauth/token"
TOKEN_URL = "https://platform.finitestate.io/api/v1/auth/token"


def create_artifact(token, organization_context, business_unit_id=None, created_by_user_id=None, asset_version_id=None, artifact_name=None, product_id=None):
Expand Down Expand Up @@ -746,6 +747,51 @@ def create_test_as_third_party_scanner(token, organization_context, business_uni
return create_test(token, organization_context, business_unit_id=business_unit_id, created_by_user_id=created_by_user_id, asset_id=asset_id, artifact_id=artifact_id, test_name=test_name, product_id=product_id, test_type=test_type)


def download_sbom(token, organization_context, sbom_type="CYCLONEDX", sbom_subtype="SBOM_ONLY", asset_version_id=None, output_filename="sbom.json", verbose=False):
"""
Download an SBOM for an Asset Version and save it to a local file. This is a blocking call, and can sometimes take minutes to return if the SBOM is very large.
Args:
token (str):
Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
organization_context (str):
Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
sbom_type (str, required):
The type of SBOM to download. Valid values are "CYCLONEDX" and "SPDX". Defaults to "CYCLONEDX".
sbom_subtype (str, required):
The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY. For SPDX valid values are "SBOM_ONLY". Defaults to "SBOM_ONLY".
asset_version_id (str, required):
The Asset Version ID to download the SBOM for.
output_filename (str, required):
The local filename to save the SBOM to. If not provided, the SBOM will be saved to a file named "sbom.json" in the current directory.
verbose (bool, optional):
If True, will print additional information to the console. Defaults to False.
Raises:
ValueError: Raised if required parameters are not provided.
Exception: Raised if the query fails.
Returns:
None
"""
url = generate_sbom_download_url(token, organization_context, sbom_type=sbom_type, sbom_subtype=sbom_subtype, asset_version_id=asset_version_id, verbose=verbose)

# Send an HTTP GET request to the URL
response = requests.get(url)

# Check if the request was successful (status code 200)
if response.status_code == 200:
# Open a local file in binary write mode and write the content to it
if verbose:
print("File downloaded successfully.")
with open(output_filename, 'wb') as file:
file.write(response.content)
if verbose:
print(f'Wrote file to {output_filename}')
else:
raise Exception(f"Failed to download the file. Status code: {response.status_code}")


def file_chunks(file_path, chunk_size=1024 * 1024 * 1024 * 5):
"""
Helper method to read a file in chunks.
Expand Down Expand Up @@ -1036,6 +1082,31 @@ def get_assets(token, organization_context, asset_id=None, business_unit_id=None
return get_all_paginated_results(token, organization_context, queries.ALL_ASSETS['query'], queries.ALL_ASSETS['variables'](asset_id, business_unit_id), 'allAssets')


def get_asset_versions(token, organization_context, asset_version_id=None, asset_id=None, business_unit_id=None):
"""
Gets asset versions in the organization. Uses pagination to get all results.
Args:
token (str):
Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
organization_context (str):
Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
asset_version_id (str, optional):
Asset Version ID to get, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Version with that ID.
asset_id (str, optional):
Asset ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions for the specified Asset.
business_unit_id (str, optional):
Business Unit ID to filter by, by default None. If None specified, will get all Asset Versions. If specified, will get only the Asset Versions in the specified Business Unit.
Raises:
Exception: Raised if the query fails.
Returns:
list: List of AssetVersion Objects
"""
return get_all_paginated_results(token, organization_context, queries.ALL_ASSET_VERSIONS['query'], queries.ALL_ASSET_VERSIONS['variables'](asset_version_id=asset_version_id, asset_id=asset_id, business_unit_id=business_unit_id), 'allAssetVersions')


def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIENCE):
"""
Get an auth token for use with the API using CLIENT_ID and CLIENT_SECRET
Expand Down Expand Up @@ -1120,7 +1191,7 @@ def get_product_asset_versions(token, organization_context, product_id=None):
return get_all_paginated_results(token, organization_context, queries.GET_PRODUCT_ASSET_VERSIONS['query'], queries.GET_PRODUCT_ASSET_VERSIONS['variables'](product_id), 'allProducts')


def get_products(token, organization_context, business_unit_id=None):
def get_products(token, organization_context, business_unit_id=None) -> list:
"""
Gets all the products for the specified business unit.
Args:
Expand All @@ -1142,7 +1213,105 @@ def get_products(token, organization_context, business_unit_id=None):
return get_all_paginated_results(token, organization_context, queries.GET_PRODUCTS_BUSINESS_UNIT['query'], queries.GET_PRODUCTS_BUSINESS_UNIT['variables'](business_unit_id), 'allProducts')


def get_software_components(token, organization_context, asset_version_id=None, type=None):
def generate_sbom_download_url(token, organization_context, sbom_type=None, sbom_subtype=None, asset_version_id=None, verbose=False) -> str:
"""
Blocking call: Initiates generation of an SBOM for the asset_version_id, and return a pre-signed URL for downloading the SBOM.
This may take several minutes to complete, depending on the size of SBOM.
Args:
token (str):
Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
organization_context (str):
Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
sbom_type (str, required):
The type of SBOM to download. Valid values are "CYCLONEDX" or "SPDX".
sbom_subtype (str, required):
The subtype of SBOM to download. Valid values for CycloneDX are "SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"; valid values for SPDX are "SBOM_ONLY".
asset_version_id (str, required):
Asset Version ID to download the SBOM for.
verbose (bool, optional):
If True, print additional information to the console. Defaults to False.
Raises:
ValueError: Raised if sbom_type, sbom_subtype, or asset_version_id are not provided.
Exception: Raised if the query fails.
Returns:
str: URL to download the SBOM from.
"""

if not sbom_type:
raise ValueError("SBOM Type is required")
if not sbom_subtype:
raise ValueError("SBOM Subtype is required")
if not asset_version_id:
raise ValueError("Asset Version ID is required")

if sbom_type not in ["CYCLONEDX", "SPDX"]:
raise Exception(f"SBOM Type {sbom_type} not supported")

if sbom_type == "CYCLONEDX":
if sbom_subtype not in ["SBOM_ONLY", "SBOM_WITH_VDR", "VDR_ONLY"]:
raise Exception(f"SBOM Subtype {sbom_subtype} not supported")

mutation = queries.LAUNCH_CYCLONEDX_EXPORT['mutation']
variables = queries.LAUNCH_CYCLONEDX_EXPORT['variables'](sbom_subtype, asset_version_id)

response_data = send_graphql_query(token, organization_context, mutation, variables)
if verbose:
print(f'Response Data: {json.dumps(response_data, indent=4)}')

# get exportJobId from the result
export_job_id = response_data['data']['launchCycloneDxExport']['exportJobId']
if verbose:
print(f'Export Job ID: {export_job_id}')

if sbom_type == "SPDX":
if sbom_subtype not in ["SBOM_ONLY"]:
raise Exception(f"SBOM Subtype {sbom_subtype} not supported")

mutation = queries.LAUNCH_SPDX_EXPORT['mutation']
variables = queries.LAUNCH_SPDX_EXPORT['variables'](sbom_subtype, asset_version_id)

response_data = send_graphql_query(token, organization_context, mutation, variables)
if verbose:
print(f'Response Data: {json.dumps(response_data, indent=4)}')

# get exportJobId from the result
export_job_id = response_data['data']['launchSpdxExport']['exportJobId']
if verbose:
print(f'Export Job ID: {export_job_id}')

if not export_job_id:
raise Exception("Error: Export Job ID not found - this should not happen, please contact your Finite State representative")

# poll the API until the export job is complete
if verbose:
print('Polling every 5 seconds for export job to complete')
total_time = 0
sleep_time = 5
while True:
time.sleep(sleep_time)
total_time += sleep_time
if verbose:
print(f'Total time elapsed: {total_time} seconds')

query = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['query']
variables = queries.GENERATE_EXPORT_DOWNLOAD_PRESIGNED_URL['variables'](export_job_id)

response_data = send_graphql_query(token, organization_context, query, variables)

if verbose:
print(f'Response Data: {json.dumps(response_data, indent=4)}')

if response_data['data']['generateExportDownloadPresignedUrl']['status'] == "COMPLETED":
if response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']:
if verbose:
print(f'Export Job Complete. Download URL: {response_data["data"]["generateExportDownloadPresignedUrl"]["downloadLink"]}')
return response_data['data']['generateExportDownloadPresignedUrl']['downloadLink']


def get_software_components(token, organization_context, asset_version_id=None, type=None) -> list:
"""
Gets all the Software Components for an Asset Version. Uses pagination to get all results.
Args:
Expand All @@ -1165,12 +1334,13 @@ def get_software_components(token, organization_context, asset_version_id=None,
return get_all_paginated_results(token, organization_context, queries.GET_SOFTWARE_COMPONENTS['query'], queries.GET_SOFTWARE_COMPONENTS['variables'](asset_version_id=asset_version_id, type=type), 'allSoftwareComponentInstances')


def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT', case_sensitive=False):
def search_sbom(token, organization_context, name=None, version=None, asset_version_id=None, search_method='EXACT', case_sensitive=False) -> list:
"""
Searches the SBOM of a specific asset version or the entire organization for matching software components.
Search Methods: EXACT or CONTAINS
An exact match will return only the software component whose name matches the name exactly.
A contains match will return all software components whose name contains the search string.
Args:
token (str):
Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string, that is handled inside the method.
Expand Down
Loading

0 comments on commit 9f9ea92

Please sign in to comment.