Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix attack chain comparison #425

Merged
merged 3 commits into from
Jul 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions system_test_mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@
"Backend"
],
"target_repositories": [
"cadashboardbe",
"event-ingester-service"
"cadashboardbe-dummy",
"event-ingester-service-dummy"
],
"description": "This test checks the scenario of fixing an AC by fixing the image without relevancy info with cronjob.",
"skip_on_environment": "",
Expand Down Expand Up @@ -677,10 +677,10 @@
"Backend"
],
"target_repositories": [
"cadashboardbe",
"careportsreceiver",
"event-ingester-service",
"gateway"
"cadashboardbe-dummy",
"careportsreceiver-dummy",
"event-ingester-service-dummy",
"gateway-dummy"
],
"description": "",
"skip_on_environment": "",
Expand Down
46 changes: 25 additions & 21 deletions systest_utils/scenarios_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,12 @@ class AttackChainsScenarioManager(ScenarioManager):
def __init__(self, test_obj, backend: backend_api.ControlPanelAPI, cluster, namespace):
super().__init__(test_obj, backend, cluster, namespace, SCENARIOS_TEST_PATH)

def verify_scenario(self):
def verify_scenario(self, current_datetime=None):
"""
verify_scenario validate the attack chains results on the backend
"""
current_datetime = datetime.now(timezone.utc)
if current_datetime == None:
current_datetime = datetime.now(timezone.utc)
Logger.logger.info("wait for response from BE")
r, t = self.wait_for_report(
self.backend.get_active_attack_chains,
Expand All @@ -130,8 +131,10 @@ def verify_scenario(self):
response = json.loads(r.text)

Logger.logger.info('comparing attack-chains result with expected ones')
assert self.check_attack_chains_results(response, expected), f"Attack chain response differs from the expected one. Response: {response}, Expected: {expected}"
return True
try:
self.check_attack_chains_results(response, expected)
except Exception as e:
raise Exception(f"Failed to validate attack chains scenario: {e}, response: {response}, expected: {expected}")

def verify_fix(self):
"""
Expand All @@ -151,7 +154,7 @@ def verify_fix(self):



def compare_nodes(self, obj1, obj2) -> bool:
def compare_nodes(self, obj1, obj2):
"""Walk 2 dictionary object to compare their values.

:param obj1: dictionary one to be compared.
Expand All @@ -161,26 +164,26 @@ def compare_nodes(self, obj1, obj2) -> bool:
# check at first if we are managin dictionaries
if isinstance(obj1, dict) and isinstance(obj2, dict):
if 'relatedResources' in obj1 and 'relatedResources' in obj2 and obj1['relatedResources'] != None and obj2['relatedResources'] != None and obj1['relatedResources'] != "None" and obj2['relatedResources'] != "None":
if len(obj1['relatedResources']) != len(obj2['relatedResources']):
Logger.logger.error(f"Length mismatch: result: {len(obj1['relatedResources'])} != expected: {len(obj2['relatedResources'])}")
return False
assert len(obj1['relatedResources']) == len(obj2['relatedResources']), f"Length mismatch: result: {len(obj1['relatedResources'])} != expected: {len(obj2['relatedResources'])}"
# check if key 'nextNodes' is present in the dictionaries
if 'nextNodes' in obj1 and 'nextNodes' in obj2:
# check if length of the items is the same
if len(obj1['nextNodes']) != len(obj2['nextNodes']):
return False
assert len(obj1['nextNodes']) == len(obj2['nextNodes']), f"Length mismatch: result: {len(obj1['nextNodes'])} != expected: {len(obj2['nextNodes'])}"

# sort the nextNodes by name
obj1['nextNodes'] = sorted(obj1['nextNodes'], key=lambda x: x['name'])
obj2['nextNodes'] = sorted(obj2['nextNodes'], key=lambda x: x['name'])

# loop over the new nextNodes
for node1, node2 in zip(obj1['nextNodes'], obj2['nextNodes']):
if not self.compare_nodes(node1, node2):
return False
return True
self.compare_nodes(node1, node2)

else:
if 'name' in obj1 and 'name' in obj2:
return obj1['name'] == obj2['name']
assert obj1['name'] == obj2['name'], f"Node name mismatch: result: {obj1['name']} != expected: {obj2['name']}"
return all(self.compare_nodes(obj1[key], obj2[key]) for key in obj1.keys())
return False

def check_attack_chains_results(self, result, expected) -> bool:
def check_attack_chains_results(self, result, expected):
"""Validate the input content with the expected one.

:param result: content retrieved from backend.
Expand All @@ -190,11 +193,12 @@ def check_attack_chains_results(self, result, expected) -> bool:
for acid, ac in enumerate(result['response']['attackChains']):
ac_node_result = result['response']['attackChains'][acid]['attackChainNodes']
ac_node_expected = expected['response']['attackChains'][acid]['attackChainNodes']
if ac_node_result['name'] != ac_node_expected['name']:
return False
if not self.compare_nodes(ac_node_result, ac_node_expected):
return False
return True

# comparing the 'name' (type: attack track) of the attack chain
assert ac_node_result['name'] == ac_node_expected['name'], f"Attack chain name mismatch: result: {ac_node_result['name']} != expected: {ac_node_expected['name']}"

self.compare_nodes(ac_node_result, ac_node_expected)



class SecurityRisksScenarioManager(ScenarioManager):
Expand Down
5 changes: 4 additions & 1 deletion tests_scripts/helm/ks_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ def __init__(self, test_obj=None, backend=None, kubernetes_obj=None, test_driver
super(ScanAttackChainsWithKubescapeHelmChart, self).__init__(test_obj=test_obj, backend=backend,
kubernetes_obj=kubernetes_obj,
test_driver=test_driver)
self.wait_for_agg_to_end = False

def start(self):
"""
Expand All @@ -343,6 +344,8 @@ def start(self):
self.ignore_agent = True
cluster, namespace = self.setup(apply_services=False)

current_datetime = datetime.now(timezone.utc)

Logger.logger.info('1. Install attack-chains scenario manifests in the cluster')
Logger.logger.info(
f"1.1 construct AttackChainsScenarioManager with test_scenario: {self.test_obj[('test_scenario', None)]} and cluster {cluster}")
Expand All @@ -364,7 +367,7 @@ def start(self):
time.sleep(10)

Logger.logger.info("3. Verify scenario on backend")
scenarios_manager.verify_scenario()
scenarios_manager.verify_scenario(current_datetime)
Logger.logger.info("attack chains detected, applying fix command")

Logger.logger.info("4. Apply attack chain fix")
Expand Down
Loading