Skip to content

Commit

Permalink
Merge pull request #425 from armosec/fix_ac_tests_compare
Browse files Browse the repository at this point in the history
fix attack chain comparison
  • Loading branch information
kooomix authored Jul 28, 2024
2 parents 62e10f1 + accabf8 commit 0f58e98
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 28 deletions.
12 changes: 6 additions & 6 deletions system_test_mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@
"Backend"
],
"target_repositories": [
"cadashboardbe",
"event-ingester-service"
"cadashboardbe-dummy",
"event-ingester-service-dummy"
],
"description": "This test checks the scenario of fixing an AC by fixing the image without relevancy info with cronjob.",
"skip_on_environment": "",
Expand Down Expand Up @@ -677,10 +677,10 @@
"Backend"
],
"target_repositories": [
"cadashboardbe",
"careportsreceiver",
"event-ingester-service",
"gateway"
"cadashboardbe-dummy",
"careportsreceiver-dummy",
"event-ingester-service-dummy",
"gateway-dummy"
],
"description": "",
"skip_on_environment": "",
Expand Down
46 changes: 25 additions & 21 deletions systest_utils/scenarios_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,12 @@ class AttackChainsScenarioManager(ScenarioManager):
def __init__(self, test_obj, backend: backend_api.ControlPanelAPI, cluster, namespace):
super().__init__(test_obj, backend, cluster, namespace, SCENARIOS_TEST_PATH)

def verify_scenario(self):
def verify_scenario(self, current_datetime=None):
"""
verify_scenario validate the attack chains results on the backend
"""
current_datetime = datetime.now(timezone.utc)
if current_datetime == None:
current_datetime = datetime.now(timezone.utc)
Logger.logger.info("wait for response from BE")
r, t = self.wait_for_report(
self.backend.get_active_attack_chains,
Expand All @@ -130,8 +131,10 @@ def verify_scenario(self):
response = json.loads(r.text)

Logger.logger.info('comparing attack-chains result with expected ones')
assert self.check_attack_chains_results(response, expected), f"Attack chain response differs from the expected one. Response: {response}, Expected: {expected}"
return True
try:
self.check_attack_chains_results(response, expected)
except Exception as e:
raise Exception(f"Failed to validate attack chains scenario: {e}, response: {response}, expected: {expected}")

def verify_fix(self):
"""
Expand All @@ -151,7 +154,7 @@ def verify_fix(self):



def compare_nodes(self, obj1, obj2) -> bool:
def compare_nodes(self, obj1, obj2):
"""Walk 2 dictionary object to compare their values.
:param obj1: dictionary one to be compared.
Expand All @@ -161,26 +164,26 @@ def compare_nodes(self, obj1, obj2) -> bool:
# check at first if we are managin dictionaries
if isinstance(obj1, dict) and isinstance(obj2, dict):
if 'relatedResources' in obj1 and 'relatedResources' in obj2 and obj1['relatedResources'] != None and obj2['relatedResources'] != None and obj1['relatedResources'] != "None" and obj2['relatedResources'] != "None":
if len(obj1['relatedResources']) != len(obj2['relatedResources']):
Logger.logger.error(f"Length mismatch: result: {len(obj1['relatedResources'])} != expected: {len(obj2['relatedResources'])}")
return False
assert len(obj1['relatedResources']) == len(obj2['relatedResources']), f"Length mismatch: result: {len(obj1['relatedResources'])} != expected: {len(obj2['relatedResources'])}"
# check if key 'nextNodes' is present in the dictionaries
if 'nextNodes' in obj1 and 'nextNodes' in obj2:
# check if length of the items is the same
if len(obj1['nextNodes']) != len(obj2['nextNodes']):
return False
assert len(obj1['nextNodes']) == len(obj2['nextNodes']), f"Length mismatch: result: {len(obj1['nextNodes'])} != expected: {len(obj2['nextNodes'])}"

# sort the nextNodes by name
obj1['nextNodes'] = sorted(obj1['nextNodes'], key=lambda x: x['name'])
obj2['nextNodes'] = sorted(obj2['nextNodes'], key=lambda x: x['name'])

# loop over the new nextNodes
for node1, node2 in zip(obj1['nextNodes'], obj2['nextNodes']):
if not self.compare_nodes(node1, node2):
return False
return True
self.compare_nodes(node1, node2)

else:
if 'name' in obj1 and 'name' in obj2:
return obj1['name'] == obj2['name']
assert obj1['name'] == obj2['name'], f"Node name mismatch: result: {obj1['name']} != expected: {obj2['name']}"
return all(self.compare_nodes(obj1[key], obj2[key]) for key in obj1.keys())
return False

def check_attack_chains_results(self, result, expected) -> bool:
def check_attack_chains_results(self, result, expected):
"""Validate the input content with the expected one.
:param result: content retrieved from backend.
Expand All @@ -190,11 +193,12 @@ def check_attack_chains_results(self, result, expected) -> bool:
for acid, ac in enumerate(result['response']['attackChains']):
ac_node_result = result['response']['attackChains'][acid]['attackChainNodes']
ac_node_expected = expected['response']['attackChains'][acid]['attackChainNodes']
if ac_node_result['name'] != ac_node_expected['name']:
return False
if not self.compare_nodes(ac_node_result, ac_node_expected):
return False
return True

# comparing the 'name' (type: attack track) of the attack chain
assert ac_node_result['name'] == ac_node_expected['name'], f"Attack chain name mismatch: result: {ac_node_result['name']} != expected: {ac_node_expected['name']}"

self.compare_nodes(ac_node_result, ac_node_expected)



class SecurityRisksScenarioManager(ScenarioManager):
Expand Down
5 changes: 4 additions & 1 deletion tests_scripts/helm/ks_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ def __init__(self, test_obj=None, backend=None, kubernetes_obj=None, test_driver
super(ScanAttackChainsWithKubescapeHelmChart, self).__init__(test_obj=test_obj, backend=backend,
kubernetes_obj=kubernetes_obj,
test_driver=test_driver)
self.wait_for_agg_to_end = False

def start(self):
"""
Expand All @@ -343,6 +344,8 @@ def start(self):
self.ignore_agent = True
cluster, namespace = self.setup(apply_services=False)

current_datetime = datetime.now(timezone.utc)

Logger.logger.info('1. Install attack-chains scenario manifests in the cluster')
Logger.logger.info(
f"1.1 construct AttackChainsScenarioManager with test_scenario: {self.test_obj[('test_scenario', None)]} and cluster {cluster}")
Expand All @@ -364,7 +367,7 @@ def start(self):
time.sleep(10)

Logger.logger.info("3. Verify scenario on backend")
scenarios_manager.verify_scenario()
scenarios_manager.verify_scenario(current_datetime)
Logger.logger.info("attack chains detected, applying fix command")

Logger.logger.info("4. Apply attack chain fix")
Expand Down

0 comments on commit 0f58e98

Please sign in to comment.