Skip to content

Commit

Permalink
Merge pull request #18 from FiniteStateInc/v0.1.5-2
Browse files Browse the repository at this point in the history
v0.1.5
  • Loading branch information
nickvido authored Feb 9, 2024
2 parents 605bc88 + ac2cd59 commit cdd37bf
Show file tree
Hide file tree
Showing 10 changed files with 2,746 additions and 2,358 deletions.
3,632 changes: 1,896 additions & 1,736 deletions docs/finite_state_sdk.html

Large diffs are not rendered by default.

1,110 changes: 578 additions & 532 deletions docs/finite_state_sdk/queries.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/finite_state_sdk/token_cache.html
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ <h2>API Documentation</h2>
</ul>


<footer>finite-state-sdk-python v0.1.4</footer>
<footer>finite-state-sdk-python v0.1.5</footer>

<a class="attribution" title="pdoc: Python API documentation generator" href="https://pdoc.dev" target="_blank">
built with <span class="visually-hidden">pdoc</span><img
Expand Down
2 changes: 1 addition & 1 deletion docs/search.js

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ How to make custom queries using pagination and helper methods in the SDK.

How to use the `search_sbom` function of the SDK to search for components by name and version, and specify whether the search should be case-sensitive or not.

# update_finding_status.py

How to update Finding resolutions using the SDK, which allows you to set the status and specify justifications or vendor responses and provide comments.

# upload_test_results.py

How to programmatically upload test results (e.g. SBOMs or Third Party Scanners). Basically a one-liner you can add to your CI systems.
Expand Down
77 changes: 77 additions & 0 deletions examples/update_finding_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import argparse
import datetime
import os
from dotenv import load_dotenv

import finite_state_sdk


def main():
dt = datetime.datetime.now()
dt_str = dt.strftime("%Y-%m-%d-%H%M")

parser = argparse.ArgumentParser(description='Update Finding Statuses')
parser.add_argument('--secrets-file', type=str, help='Path to the secrets file', required=True)

args = parser.parse_args()

load_dotenv(args.secrets_file, override=True)

# get CLIENT_ID and CLIENT_SECRET from env
CLIENT_ID = os.environ.get("CLIENT_ID")
CLIENT_SECRET = os.environ.get("CLIENT_SECRET")
ORGANIZATION_CONTEXT = os.environ.get("ORGANIZATION_CONTEXT")

# Get an auth token - this is a bearer token that you will use for all subsequent requests
# The token is valid for 24 hours
token = finite_state_sdk.get_auth_token(CLIENT_ID, CLIENT_SECRET)

# these are the finding IDs to update - you can specify one, or multiple
# if specifying multiple, they will all have the same resolution and comment
finding_ids = ['123456789', '234567891'] # replace with your finding IDs

# -------- NOT AFFECTED EXAMPLE --------

# The status to apply to the Findings
# These statuses align with the VEX resolution statuses: https://www.cisa.gov/sites/default/files/publications/VEX_Use_Cases_April2022.pdf
# For details about avialable statuses, see: https://docs.finitestate.io/types/finding-status-option
new_status = "NOT_AFFECTED"

# Valid Justifications for NOT_AFFECTED status, describing why the vulnerability is not present or not a concern:
# ["COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT"]
# For details, see: https://docs.finitestate.io/types/finding-status-justification-enum
justification = "COMPONENT_NOT_PRESENT"

# The comment will be applied to the Finding resolution
comment = "Updating status to NOT_AFFECTED with justification: COMPONENT_NOT_PRESENT for finding_ids: {finding_ids}"

# For more info see: https://docs.finitestate.io/mutations/update-findings-statuses
gql_response = finite_state_sdk.update_findings_status(token, ORGANIZATION_CONTEXT, status=new_status, justification=justification, comment=comment)

updated_finding_ids = gql_response["data"]["updateFindingsStatuses"]["ids"]
print(f'Updated {len(updated_finding_ids)} findings')

# -------- AFFECTED EXAMPLE --------

finding_ids = ['345678912', '456789123'] # replace with your finding IDs

# The status to apply to the Findings
new_status = "AFFECTED"

# Valid Vendor responses for AFFECTED status, describing what the vendor will do about the vulnerability:
#["CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", "WORKAROUND_AVAILABLE"]
# For details, see: https://docs.finitestate.io/types/finding-status-response-enum
response = "CANNOT_FIX"

# The comment will be applied to the Finding resolution
comment = "Updating status to AFFECTED with response: CANNOT_FIX for finding_ids: {finding_ids}"

# For more info see: https://docs.finitestate.io/mutations/update-findings-statuses
gql_response = finite_state_sdk.update_findings_status(token, ORGANIZATION_CONTEXT, status=new_status, response=response, comment=comment)

updated_finding_ids = gql_response["data"]["updateFindingsStatuses"]["ids"]
print(f'Updated {len(updated_finding_ids)} findings')


if __name__ == "__main__":
main()
65 changes: 60 additions & 5 deletions finite_state_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ def get_all_organizations(token, organization_context):
return get_all_paginated_results(token, organization_context, queries.ALL_ORGANIZATIONS['query'], queries.ALL_ORGANIZATIONS['variables'], 'allOrganizations')


def get_all_paginated_results(token, organization_context, query, variables=None, field=None):
def get_all_paginated_results(token, organization_context, query, variables=None, field=None, limit=None):
"""
Get all results from a paginated GraphQL query
Expand All @@ -1047,6 +1047,8 @@ def get_all_paginated_results(token, organization_context, query, variables=None
Variables to be used in the GraphQL query, by default None
field (str, required):
The field in the response JSON that contains the results
limit (int, optional):
The maximum number of results to return. If not provided, will return all results. By default None
Raises:
Exception: If the response status code is not 200, or if the field is not in the response JSON
Expand Down Expand Up @@ -1079,6 +1081,10 @@ def get_all_paginated_results(token, organization_context, query, variables=None
cursor = response_data['data'][field][len(response_data['data'][field]) - 1]['_cursor']

while cursor:
if limit is not None:
if len(results) >= limit:
break

variables['after'] = cursor

# add the next page of results to the list
Expand Down Expand Up @@ -1244,7 +1250,7 @@ def get_auth_token(client_id, client_secret, token_url=TOKEN_URL, audience=AUDIE
return auth_token


def get_findings(token, organization_context, asset_version_id=None, category=None, status=None, severity=None, count=False):
def get_findings(token, organization_context, asset_version_id=None, finding_id=None, category=None, status=None, severity=None, count=False, limit=None):
"""
Gets all the Findings for an Asset Version. Uses pagination to get all results.
Args:
Expand All @@ -1254,24 +1260,31 @@ def get_findings(token, organization_context, asset_version_id=None, category=No
Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
asset_version_id (str, optional):
Asset Version ID to get findings for. If not provided, will get all findings in the organization.
finding_id (str, optional):
The ID of a specific finding to get. If specified, will return only the finding with that ID.
category (str, optional):
The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category
The category of Findings to return. Valid values are "CONFIG_ISSUES", "CREDENTIALS", "CRYPTO_MATERIAL", "CVE", "SAST_ANALYSIS". If not specified, will return all findings. See https://docs.finitestate.io/types/finding-category.
This can be a single string, or an array of values.
status (str, optional):
The status of Findings to return.
severity (str, optional):
The severity of Findings to return. Valid values are "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO", and "UNKNOWN". If not specified, will return all findings.
count (bool, optional):
If True, will return the count of findings instead of the findings themselves. Defaults to False.
limit (int, optional):
The maximum number of findings to return. If not specified, will return all findings, up to the default of 1000.
Raises:
Exception: Raised if the query fails, required parameters are not specified, or parameters are incompatible.
Returns:
list: List of Finding Objects
"""

if count:
return send_graphql_query(token, organization_context, queries.GET_FINDINGS_COUNT['query'], queries.GET_FINDINGS_COUNT['variables'](asset_version_id=asset_version_id, category=category, status=status, severity=severity))["data"]["_allFindingsMeta"]
return send_graphql_query(token, organization_context, queries.GET_FINDINGS_COUNT['query'], queries.GET_FINDINGS_COUNT['variables'](asset_version_id=asset_version_id, finding_id=finding_id, category=category, status=status, severity=severity, limit=limit))["data"]["_allFindingsMeta"]
else:
return get_all_paginated_results(token, organization_context, queries.GET_FINDINGS['query'], queries.GET_FINDINGS['variables'](asset_version_id=asset_version_id, category=category, status=status, severity=severity), 'allFindings')
return get_all_paginated_results(token, organization_context, queries.GET_FINDINGS['query'], queries.GET_FINDINGS['variables'](asset_version_id=asset_version_id, finding_id=finding_id, category=category, status=status, severity=severity, limit=limit), 'allFindings', limit=limit)


def get_product_asset_versions(token, organization_context, product_id=None):
Expand Down Expand Up @@ -1701,6 +1714,48 @@ def send_graphql_query(token, organization_context, query, variables=None):
raise Exception(f"Error: {response.status_code} - {response.text}")


def update_finding_statuses(token, organization_context, user_id=None, finding_ids=None, status=None, justification=None, response=None, comment=None):
"""
Updates the status of a findings or multiple findings. This is a blocking call.
Args:
token (str):
Auth token. This is the token returned by get_auth_token(). Just the token, do not include "Bearer" in this string.
organization_context (str):
Organization context. This is provided by the Finite State API management. It looks like "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx".
user_id (str, required):
User ID to update the finding status for.
finding_ids (str, required):
Finding ID to update the status for.
status (str, required):
Status to update the finding to. Valid values are "AFFECTED", "FIXED", "NOT_AFFECTED", and "UNDER_INVESTIGATION". For more details, see https://docs.finitestate.io/types/finding-status-option
justification (str, optional):
Optional justification that applies to status of "NOT AFFECTED". Valid values are "COMPONENT_NOT_PRESENT", "INLINE_MITIGATIONS_ALREADY_EXIST", "VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY", "VULNERABLE_CODE_NOT_IN_EXECUTE_PATH", "VULNERABLE_CODE_NOT_PRESENT". For more details see https://docs.finitestate.io/types/finding-status-justification-enum
response (str, optional):
Optional "Vendor Responses" that applies to status of "AFFECTED". Valid values are "CANNOT_FIX", "ROLLBACK_REQUIRED", "UPDATE_REQUIRED", "WILL_NOT_FIX", and "WORKAROUND_AVAILABLE". For more details, see https://docs.finitestate.io/types/finding-status-response-enum
comment (str, optional):
Optional comment to add to the finding status update.
Raises:
ValueError: Raised if required parameters are not provided.
Exception: Raised if the query fails.
Returns:
dict: Response JSON from the GraphQL query of type UpdateFindingsStatusesResponse. For details see https://docs.finitestate.io/types/update-findings-statuses-response
"""
if not user_id:
raise ValueError("User ID is required")
if not finding_ids:
raise ValueError("Finding ID is required")
if not status:
raise ValueError("Status is required")

mutation = queries.UPDATE_FINDING_STATUSES['mutation']
variables = queries.UPDATE_FINDING_STATUSES['variables'](user_id=user_id, finding_ids=finding_ids, status=status, justification=justification, response=response, comment=comment)

return send_graphql_query(token, organization_context, mutation, variables)


def upload_file_for_binary_analysis(token, organization_context, test_id=None, file_path=None, chunk_size=1024 * 1024 * 1024 * 5, quick_scan=False):
"""
Upload a file for Binary Analysis. Will automatically chunk the file into chunks and upload each chunk. Chunk size defaults to 5GB.
Expand Down
58 changes: 52 additions & 6 deletions finite_state_sdk/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def artifact_variables(artifact_id=None, business_unit_id=None):
}


def _create_GET_FINDINGS_VARIABLES(asset_version_id=None, category=None, cve_id=None, status=None, severity=None, limit=1000, count=False):
def _create_GET_FINDINGS_VARIABLES(asset_version_id=None, category=None, cve_id=None, finding_id=None, status=None, severity=None, limit=1000, count=False):
variables = {
"filter": {
"mergedFindingRefId": None,
Expand All @@ -383,21 +383,30 @@ def _create_GET_FINDINGS_VARIABLES(asset_version_id=None, category=None, cve_id=
variables["first"] = limit
variables["orderBy"] = ["title_ASC"]

if finding_id is not None:
# if finding_id is a list, use the "in" operator
if isinstance(finding_id, list):
variables["filter"]["id_in"] = finding_id
else:
variables["filter"]["id"] = str(finding_id)

if asset_version_id is not None:
# if asset_version_id is a list, use the "in" operator
if isinstance(asset_version_id, list):
variables["filter"]["assetVersionRefId_in"] = asset_version_id
else:
variables["filter"]["assetVersionRefId"] = str(asset_version_id)

# if category is a string, make it a list
if isinstance(category, str):
category = [category]

if category is not None:
variables["filter"]["AND"] = [
{
"OR": [
{
"category_in": [
category
]
"category_in": category
}
]
},
Expand Down Expand Up @@ -451,7 +460,7 @@ def _create_GET_FINDINGS_VARIABLES(asset_version_id=None, category=None, cve_id=
}
}
""",
"variables": lambda asset_version_id=None, category=None, cve_id=None, status=None, severity=None, limit=None: _create_GET_FINDINGS_VARIABLES(asset_version_id=asset_version_id, category=category, cve_id=cve_id, status=status, severity=severity, limit=limit, count=True)
"variables": lambda asset_version_id=None, category=None, cve_id=None, finding_id=None, status=None, severity=None, limit=None: _create_GET_FINDINGS_VARIABLES(asset_version_id=asset_version_id, category=category, cve_id=cve_id, finding_id=finding_id, status=status, severity=severity, limit=limit, count=True)
}

GET_FINDINGS = {
Expand Down Expand Up @@ -500,6 +509,7 @@ def _create_GET_FINDINGS_VARIABLES(asset_version_id=None, category=None, cve_id=
}
id
justification
responses
status
updatedAt
__typename
Expand Down Expand Up @@ -564,7 +574,7 @@ def _create_GET_FINDINGS_VARIABLES(asset_version_id=None, category=None, cve_id=
__typename
}
}""",
"variables": lambda asset_version_id=None, category=None, cve_id=None, status=None, severity=None, limit=None: _create_GET_FINDINGS_VARIABLES(asset_version_id=asset_version_id, category=category, cve_id=cve_id, severity=severity, status=status, limit=limit)
"variables": lambda asset_version_id=None, category=None, cve_id=None, finding_id=None, status=None, severity=None, limit=None: _create_GET_FINDINGS_VARIABLES(asset_version_id=asset_version_id, category=category, cve_id=cve_id, finding_id=finding_id, severity=severity, status=status, limit=limit)
}


Expand Down Expand Up @@ -922,6 +932,42 @@ def _create_LAUNCH_SPDX_EXPORT_VARIABLES(spdx_subtype, asset_version_id):
}
}


def __create_UPDATE_FINDING_STATUSES_VARIABLES(user_id=None, finding_ids=None, status=None, justification=None, response=None, comment=None):
if not isinstance(finding_ids, list):
finding_ids = [finding_ids]

if status == "AFFECTED":
if justification is not None:
raise Exception("justification pertains to status NOT AFFECTED. Specify response instead.")
elif status == "NOT_AFFECTED":
if response is not None:
raise Exception("response pertains to status AFFECTED. Specify justification instead.")

return {
"ids": finding_ids,
"updateStatusInput": {
"comment": comment,
"status": status,
"justification": justification,
"responses": response
},
"userId": user_id
}


UPDATE_FINDING_STATUSES = {
"mutation": """
mutation UpdateFindingsStatuses($ids: [ID!]!, $updateStatusInput: UpdateFindingStatusesInput!, $userId: ID!) {
updateFindingsStatuses(ids: $ids, updateStatusInput: $updateStatusInput, userId: $userId) {
ids
}
}
""",
"variables": lambda user_id=None, finding_ids=None, status=None, justification=None, response=None, comment=None: __create_UPDATE_FINDING_STATUSES_VARIABLES(user_id=user_id, finding_ids=finding_ids, status=status, justification=justification, response=response, comment=comment)
}


__all__ = [
"ALL_BUSINESS_UNITS",
"ALL_USERS",
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "finite-state-sdk"
version = "0.1.4"
version = "0.1.5"
authors = [
"Finite State, Inc. <[email protected]>"
]
Expand Down
Loading

0 comments on commit cdd37bf

Please sign in to comment.