Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Security Hub - get_findings and batch_import_findings #8518

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions moto/backend_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
("scheduler", re.compile("https?://scheduler\\.(.+)\\.amazonaws\\.com")),
("sdb", re.compile("https?://sdb\\.(.+)\\.amazonaws\\.com")),
("secretsmanager", re.compile("https?://secretsmanager\\.(.+)\\.amazonaws\\.com")),
("securityhub", re.compile("https?://securityhub\\.(.+)\\.amazonaws\\.com")),
(
"servicediscovery",
re.compile("https?://(data-)?servicediscovery\\.(.+)\\.amazonaws\\.com"),
Expand Down
1 change: 1 addition & 0 deletions moto/securityhub/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .models import securityhub_backends # noqa: F401
21 changes: 21 additions & 0 deletions moto/securityhub/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""Exceptions raised by the securityhub service."""

from moto.core.exceptions import JsonRESTError


class SecurityHubClientError(JsonRESTError):
code = 400


class _InvalidOperationException(SecurityHubClientError):
def __init__(self, error_type: str, op: str, msg: str):
super().__init__(
error_type,
"An error occurred (%s) when calling the %s operation: %s"
% (error_type, op, msg),
)


class InvalidInputException(_InvalidOperationException):
def __init__(self, op: str, msg: str):
super().__init__("InvalidInputException", op, msg)
118 changes: 118 additions & 0 deletions moto/securityhub/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""SecurityHubBackend class with methods for supported APIs."""

from typing import Any, Dict, List, Optional, Tuple

from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.securityhub.exceptions import InvalidInputException
from moto.utilities.paginator import Paginator


class Finding(BaseModel):
def __init__(self, finding_id: str, finding_data: Dict[str, Any]):
self.id = finding_id
self.data = finding_data

def as_dict(self) -> Dict[str, Any]:
return self.data


class SecurityHubBackend(BaseBackend):
"""Implementation of SecurityHub APIs."""

def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.findings: List[Finding] = []

def get_findings(
self,
filters: Optional[Dict[str, Any]] = None,
sort_criteria: Optional[List[Dict[str, str]]] = None,
next_token: Optional[str] = None,
max_results: Optional[int] = None,
) -> Dict[str, Any]:
findings = self.findings

# Max Results Parameter
if max_results is not None:
try:
max_results = int(max_results)
if max_results < 1 or max_results > 100:
raise InvalidInputException(
op="GetFindings",
msg="MaxResults must be a number between 1 and 100",
)
except ValueError:
raise InvalidInputException(

Check warning on line 46 in moto/securityhub/models.py

View check run for this annotation

Codecov / codecov/patch

moto/securityhub/models.py#L46

Added line #L46 was not covered by tests
op="GetFindings", msg="MaxResults must be a number greater than 0"
)

paginator = Paginator(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we can just use the @paginate-decorator on the method here, that should simplify the implementation a bit.

Please see the docs here: https://docs.getmoto.org/en/latest/docs/contributing/development_tips/utilities.html#paginator

max_results=max_results or 100,
unique_attribute=["id"],
starting_token=next_token,
fail_on_invalid_token=True,
)

paginated_findings, next_token = paginator.paginate(findings)

return {
"Findings": [f.as_dict() for f in paginated_findings],
"NextToken": next_token,
}

def batch_import_findings(
self, findings: List[Dict[str, Any]]
) -> Tuple[int, int, List[Dict[str, Any]]]:
"""
Import findings in batch to SecurityHub.

Args:
findings: List of finding dictionaries to import

Returns:
Tuple of (failed_count, success_count, failed_findings)
"""
failed_count = 0
success_count = 0
failed_findings = []

for finding_data in findings:
try:
if (
not isinstance(finding_data["Resources"], list)
or len(finding_data["Resources"]) == 0
):
raise InvalidInputException(

Check warning on line 86 in moto/securityhub/models.py

View check run for this annotation

Codecov / codecov/patch

moto/securityhub/models.py#L86

Added line #L86 was not covered by tests
op="BatchImportFindings",
msg="Finding must contain at least one resource in the Resources array",
)

finding_id = finding_data["Id"]

existing_finding = next(
(f for f in self.findings if f.id == finding_id), None
)

if existing_finding:
existing_finding.data.update(finding_data)

Check warning on line 98 in moto/securityhub/models.py

View check run for this annotation

Codecov / codecov/patch

moto/securityhub/models.py#L98

Added line #L98 was not covered by tests
else:
new_finding = Finding(finding_id, finding_data)
self.findings.append(new_finding)

success_count += 1

except Exception as e:
failed_count += 1
failed_findings.append(

Check warning on line 107 in moto/securityhub/models.py

View check run for this annotation

Codecov / codecov/patch

moto/securityhub/models.py#L105-L107

Added lines #L105 - L107 were not covered by tests
{
"Id": finding_data.get("Id", ""),
"ErrorCode": "InvalidInput",
"ErrorMessage": str(e),
}
)

return failed_count, success_count, failed_findings


securityhub_backends = BackendDict(SecurityHubBackend, "securityhub")
64 changes: 64 additions & 0 deletions moto/securityhub/responses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Handles incoming securityhub requests, invokes methods, returns responses."""

import json

from moto.core.responses import BaseResponse

from .models import SecurityHubBackend, securityhub_backends


class SecurityHubResponse(BaseResponse):
def __init__(self) -> None:
super().__init__(service_name="securityhub")

@property
def securityhub_backend(self) -> SecurityHubBackend:
return securityhub_backends[self.current_account][self.region]

def get_findings(self) -> str:
raw_params = self._get_params()

params = json.loads(next(iter(raw_params.keys()), "{}"))

sort_criteria = params.get("SortCriteria", [])
filters = params.get("Filters", {})
next_token = params.get("NextToken", None)
max_results = params.get("MaxResults", 100)

result = self.securityhub_backend.get_findings(
filters=filters,
sort_criteria=sort_criteria,
next_token=next_token,
max_results=max_results,
)

return json.dumps(result)

def batch_import_findings(self) -> str:
raw_body = self.body
if isinstance(raw_body, bytes):
raw_body = raw_body.decode("utf-8")

Check warning on line 40 in moto/securityhub/responses.py

View check run for this annotation

Codecov / codecov/patch

moto/securityhub/responses.py#L40

Added line #L40 was not covered by tests
body = json.loads(raw_body)

findings = body.get("Findings", [])

failed_count, success_count, failed_findings = (
self.securityhub_backend.batch_import_findings(
findings=findings,
)
)

return json.dumps(
{
"FailedCount": failed_count,
"FailedFindings": [
{
"ErrorCode": finding.get("ErrorCode"),
"ErrorMessage": finding.get("ErrorMessage"),
"Id": finding.get("Id"),
}
for finding in failed_findings
],
"SuccessCount": success_count,
}
)
12 changes: 12 additions & 0 deletions moto/securityhub/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""securityhub base URL and path."""

from .responses import SecurityHubResponse

url_bases = [
r"https?://securityhub\.(.+)\.amazonaws\.com",
]

url_paths = {
"{0}/findings$": SecurityHubResponse.dispatch,
"{0}/findings/import$": SecurityHubResponse.dispatch,
}
Empty file.
156 changes: 156 additions & 0 deletions tests/test_securityhub/test_securityhub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""Unit tests for securityhub-supported APIs."""

import boto3
import pytest
from botocore.exceptions import ClientError

from moto import mock_aws
from moto.core import DEFAULT_ACCOUNT_ID


@mock_aws
def test_get_findings():
client = boto3.client("securityhub", region_name="us-east-1")

test_finding = {
"AwsAccountId": DEFAULT_ACCOUNT_ID,
"CreatedAt": "2024-01-01T00:00:00.001Z",
"UpdatedAt": "2024-01-01T00:00:00.000Z",
"Description": "Test finding description",
"GeneratorId": "test-generator",
"Id": "test-finding-001",
"ProductArn": f"arn:aws:securityhub:{client.meta.region_name}:{DEFAULT_ACCOUNT_ID}:product/{DEFAULT_ACCOUNT_ID}/default",
"Resources": [{"Id": "test-resource", "Type": "AwsEc2Instance"}],
"SchemaVersion": "2018-10-08",
"Severity": {"Label": "HIGH"},
"Title": "Test Finding",
"Types": ["Software and Configuration Checks"],
}

import_response = client.batch_import_findings(Findings=[test_finding])
assert import_response["SuccessCount"] == 1

response = client.get_findings()

assert "Findings" in response
assert isinstance(response["Findings"], list)
assert len(response["Findings"]) == 1
finding = response["Findings"][0]
assert finding["Id"] == "test-finding-001"
assert finding["SchemaVersion"] == "2018-10-08"


@mock_aws
def test_batch_import_findings():
client = boto3.client("securityhub", region_name="us-east-2")

valid_finding = {
"AwsAccountId": DEFAULT_ACCOUNT_ID,
"CreatedAt": "2024-01-01T00:00:00.000Z",
"UpdatedAt": "2024-01-01T00:00:00.000Z",
"Description": "Test finding description",
"GeneratorId": "test-generator",
"Id": "test-finding-001",
"ProductArn": f"arn:aws:securityhub:{client.meta.region_name}:{DEFAULT_ACCOUNT_ID}:product/{DEFAULT_ACCOUNT_ID}/default",
"Resources": [{"Id": "test-resource", "Type": "AwsEc2Instance"}],
"SchemaVersion": "2018-10-08",
"Severity": {"Label": "HIGH"},
"Title": "Test Finding",
"Types": ["Software and Configuration Checks"],
}

response = client.batch_import_findings(Findings=[valid_finding])
assert response["SuccessCount"] == 1
assert response["FailedCount"] == 0
assert response["FailedFindings"] == []

invalid_finding = valid_finding.copy()
invalid_finding["Id"] = "test-finding-002"
invalid_finding["Severity"]["Label"] = "INVALID_LABEL"

response = client.batch_import_findings(Findings=[invalid_finding])

assert response["SuccessCount"] == 1
assert response["FailedCount"] == 0
assert len(response["FailedFindings"]) == 0


@mock_aws
def test_get_findings_invalid_parameters():
client = boto3.client("securityhub", region_name="us-east-1")

with pytest.raises(ClientError) as exc:
client.get_findings(MaxResults=101)

err = exc.value.response["Error"]
assert err["Code"] == "InvalidInputException"
assert "MaxResults must be a number between 1 and 100" in err["Message"]


@mock_aws
def test_batch_import_multiple_findings():
client = boto3.client("securityhub", region_name="us-east-1")

findings = [
{
"AwsAccountId": DEFAULT_ACCOUNT_ID,
"CreatedAt": "2024-01-01T00:00:00.000Z",
"UpdatedAt": "2024-01-01T00:00:00.000Z",
"Description": f"Test finding description {i}",
"GeneratorId": "test-generator",
"Id": f"test-finding-{i:03d}",
"ProductArn": f"arn:aws:securityhub:{client.meta.region_name}:{DEFAULT_ACCOUNT_ID}:product/{DEFAULT_ACCOUNT_ID}/default",
"Resources": [{"Id": f"test-resource-{i}", "Type": "AwsEc2Instance"}],
"SchemaVersion": "2018-10-08",
"Severity": {"Label": "HIGH"},
"Title": f"Test Finding {i}",
"Types": ["Software and Configuration Checks"],
}
for i in range(1, 4)
]

import_response = client.batch_import_findings(Findings=findings)
assert import_response["SuccessCount"] == 3
assert import_response["FailedCount"] == 0
assert import_response["FailedFindings"] == []

get_response = client.get_findings()
assert "Findings" in get_response
assert isinstance(get_response["Findings"], list)
assert len(get_response["Findings"]) == 3

imported_ids = {finding["Id"] for finding in get_response["Findings"]}
expected_ids = {f"test-finding-{i:03d}" for i in range(1, 4)}
assert imported_ids == expected_ids


@mock_aws
def test_get_findings_max_results():
client = boto3.client("securityhub", region_name="us-east-1")

findings = [
{
"AwsAccountId": DEFAULT_ACCOUNT_ID,
"CreatedAt": "2024-01-01T00:00:00.000Z",
"UpdatedAt": "2024-01-01T00:00:00.000Z",
"Description": f"Test finding description {i}",
"GeneratorId": "test-generator",
"Id": f"test-finding-{i:03d}",
"ProductArn": f"arn:aws:securityhub:{client.meta.region_name}:{DEFAULT_ACCOUNT_ID}:product/{DEFAULT_ACCOUNT_ID}/default",
"Resources": [{"Id": f"test-resource-{i}", "Type": "AwsEc2Instance"}],
"SchemaVersion": "2018-10-08",
"Severity": {"Label": "HIGH"},
"Title": f"Test Finding {i}",
"Types": ["Software and Configuration Checks"],
}
for i in range(1, 4)
]

import_response = client.batch_import_findings(Findings=findings)
assert import_response["SuccessCount"] == 3

get_response = client.get_findings(MaxResults=1)
assert "Findings" in get_response
assert isinstance(get_response["Findings"], list)
assert len(get_response["Findings"]) == 1
assert "NextToken" in get_response
Loading