Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network policies tests #232

Merged
merged 8 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions configurations/system/tests_cases/network_policy_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .structures import TestConfiguration
from os.path import join


class NetworkPolicyTests(object):

@staticmethod
Expand Down Expand Up @@ -111,3 +112,11 @@ def network_policy_known_servers():
],
helm_kwargs={statics.HELM_NETWORK_POLICY_FEATURE: statics.HELM_RELEVANCY_FEATURE_ENABLED, statics.HELM_NODE_AGENT_LEARNING_PERIOD: '30s', statics.HELM_NODE_AGENT_UPDATE_PERIOD: '10s'}
)

@staticmethod
def network_policy_known_servers_cache():
from tests_scripts.helm.network_policy import NetworkPolicyKnownServersCache
return TestConfiguration(
name=inspect.currentframe().f_code.co_name,
test_obj=NetworkPolicyKnownServersCache
)
68 changes: 68 additions & 0 deletions infrastructure/backend_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ class NotExistingCustomer(Exception):

API_ATTACK_CHAINS = "/api/v1/attackchains"

API_NETWORK_POLICIES = "/api/v1/networkpolicies"
API_NETWORK_POLICIES_GENERATE = "/api/v1/networkpolicies/generate"
API_NETWORK_POLICIES_KNOWNSERVERSCACHE = "/api/v1/networkpolicies/knownserverscache"


def deco_cookie(func):

Expand Down Expand Up @@ -1961,6 +1965,70 @@ def get_attack_chains(self, cluster_name=None):
self.customer, r.status_code, r.text))
return r

def get_known_servers_cache(self) -> requests.Response:
params = {"customerGUID": self.selected_tenant_id}
r = self.get(API_NETWORK_POLICIES_KNOWNSERVERSCACHE, params=params)
if not 200 <= r.status_code < 300:
raise Exception(
'Error accessing dashboard. Request: get known servers cache "%s" (code: %d, message: %s)' % (
self.customer, r.status_code, r.text))
return r

def get_network_policies(self, cluster_name, namespace) -> (requests.Response, dict, dict):
params = {"customerGUID": self.selected_tenant_id}

payload = {
"innerFilters": [{"clusterShortName": cluster_name, "namespace": namespace}],
}

r = self.post(API_NETWORK_POLICIES, params=params, json=payload, timeout=60)
Logger.logger.info(r.text)

if not 200 <= r.status_code < 300:
raise Exception(
'Error accessing dashboard. Request: get network policies generate "%s" (code: %d, message: %s)' % (
self.customer, r.status_code, r.text))

response = json.loads(r.text)
workloads_list = response.get("response", None)

assert workloads_list is not None, "network policies response is empty '%s' (code: %d, message: %s)" % (self.customer, r.status_code, r.text)

assert len(workloads_list) > 0, "network policies workloads list is 0 '%s' (code: %d, message: %s)" % (self.customer, r.status_code, r.text)


return r, workloads_list

def get_network_policies_generate(self, cluster_name, workload_name, namespace) -> (requests.Response, dict, dict):
params = {"customerGUID": self.selected_tenant_id}

payload = {
"innerFilters": [{"cluster": cluster_name, "name": workload_name, "namespace": namespace}],
}

r = self.post(API_NETWORK_POLICIES_GENERATE, params=params, json=payload, timeout=60)
Logger.logger.info(r.text)

if not 200 <= r.status_code < 300:
raise Exception(
'Error accessing dashboard. Request: get network policies generate "%s" (code: %d, message: %s)' % (
self.customer, r.status_code, r.text))

response = json.loads(r.text)

# verify there is a response
assert len(response) > 0, "network policies generate response is empty '%s' (code: %d, message: %s)" % (self.customer, r.status_code, r.text)

np = response[0].get("networkPolicies", None).get("kubernetes", None).get("new", None)
# verify there is a 'new' network policy
assert np is not None, "no 'new' NetworkPolicy '%s' (code: %d, message: %s)" % (self.customer, r.status_code, r.text)

graph = response[0].get("graph", None)
# verify there is a 'graph'
assert graph is not None, "No 'graph' '%s' (code: %d, message: %s)" % (self.customer, r.status_code, r.text)

return r, np, graph

def get_active_attack_chains(self, current_datetime=datetime, cluster_name=None) -> requests.Response:
r = self.get_attack_chains(cluster_name)
# checks if respose met conditions to be considered valid:
Expand Down
12 changes: 12 additions & 0 deletions system_test_mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,18 @@
"description": "",
"skip_on_environment": ""
},
"network_policy_known_servers_cache": {
"target": [
"In cluster"
],
"target_repositories": [
"helm-chart",
"node-agent",
"storage"
],
"description": "",
"skip_on_environment": ""
},
"vulnerability_scanning_trigger_scan_private_quay_registry": {
"target": [
"In cluster",
Expand Down
179 changes: 172 additions & 7 deletions tests_scripts/helm/base_network_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from tests_scripts.helm.base_helm import BaseHelm
from pkg_resources import parse_version
from tests_scripts.kubernetes.base_k8s import BaseK8S
from deepdiff import DeepDiff
import copy


class BaseNetworkPolicy(BaseHelm):
Expand Down Expand Up @@ -141,6 +141,159 @@ def validate_network_neighbor_entry(self, expected_entries, actual_entries):

assert len(verified_entries) == len(expected_entries), f"verified_entries length is not equal to expected_entries length, actual: {verified_entries}, expected: {expected_entries}"

def validate_expected_network_neighbors_and_generated_network_policies_lists(self, namespace, expected_network_neighbors_list, expected_generated_network_policy_list):
"""
Validate expected network neighbors and generated network policies lists. It validates the expected network neighbors list and the expected generated network policies list
param namespace: namespace of the object
param expected_network_neighbors_list: list of expected network neighbors
param expected_generated_network_policy_list: list of expected generated network policies
"""
Logger.logger.info("validating expected network neighbors")
self.validate_expected_network_neighbors_list(namespace=namespace, expected_network_neighbors_list=expected_network_neighbors_list)
Logger.logger.info("validated expected network neighbors")

Logger.logger.info("validating expected generated network policies")
self.validate_expected_generated_network_policy_list(namespace=namespace, expected_generated_network_policy_list=expected_generated_network_policy_list)
Logger.logger.info("validated expected generated network policies")


def validate_expected_backend_results(self, cluster, namespace, expected_workloads_list, expected_network_neighbors_list, expected_generated_network_policy_list):
"""
Validate expected backend results. It validates the expected backend workloads list and the expected backend generated network policies list
param cluster: cluster name
param namespace: namespace of the object
param expected_workloads_list: list of expected workloads
param expected_network_neighbors_list: list of expected network neighbors
"""
Logger.logger.info("validating expected backend workloads list")
self.validate_expected_backend_workloads_list(cluster=cluster, namespace=namespace, expected_workloads_list=expected_workloads_list)
Logger.logger.info("validated expected backend workloads list")

Logger.logger.info("validating expected backend generated network policies")
self.validate_expected_backend_generated_network_policy_list(cluster=cluster, namespace=namespace, expected_network_policy_list=expected_generated_network_policy_list, expected_network_neighbors_list=expected_network_neighbors_list)
Logger.logger.info("validated expected backend generated network policies")


def is_workload_deleted_from_backend(self, cluster, workload_name, namespace) -> bool:
"""
Is workload deleted from backend. It pulls the actual network neighbors and validates that the workload is not in the list
param cluster: cluster name
param workload_name: workload name
param namespace: namespace of the object
"""
try:
r, np, graph = self.backend.get_network_policies_generate(cluster_name=cluster, workload_name=workload_name, namespace=namespace)
except Exception as e:
return True
return False



def validate_workload_deleted_from_backend(self, cluster, workload_name, namespace):
"""
Validate workload deleted from backend. It pulls the actual network neighbors and validates that the workload is not in the list
param cluster: cluster name
param workload_name: workload name
param namespace: namespace of the object
"""

deleted, t = self.wait_for_report(timeout=100,
sleep_interval=5,
report_type=self.is_workload_deleted_from_backend,
cluster=cluster,
workload_name=workload_name,
namespace=namespace)


assert deleted == True, f"workload {workload_name} is not deleted from backend"


def validate_expected_backend_workloads_list(self, cluster, namespace, expected_workloads_list):
"""
validate_expected_backend_workloads_list validates the expected backend workloads list. It pulls the actual workloads and validates each one of them
param cluster: cluster name
param namespace: namespace of the object
param expected_workloads_list: list of expected workloads
"""
res, t = self.wait_for_report(timeout=100,
sleep_interval=5,
report_type=self.backend.get_network_policies,
cluster_name=cluster,
namespace=namespace)
workloads_list = res[1]
assert len(workloads_list) == len(expected_workloads_list), f"workloads_list length is not equal to expected_workloads_list length, actual: len:{len(workloads_list)}, expected: len:{len(expected_workloads_list)}; actual results: {workloads_list}, expected results: {expected_workloads_list}"


def validate_expected_backend_generated_network_policy_list(self, cluster, namespace, expected_network_policy_list, expected_network_neighbors_list):
"""
validate_expected_backend_generated_network_policy_list validates the expected backend generated network policies list. It pulls the actual generated network policies and validates each one of them
param cluster: cluster name
param namespace: namespace of the object
param expected_network_policy_list: list of expected backend generated network policies
param expected_network_neighbors_list: list of expected network neighbors
"""

errors = []
for i in range(0, len(expected_network_policy_list)):
workload_name = expected_network_policy_list[i]['metadata']['labels']['kubescape.io/workload-name']
try:
res, t = self.wait_for_report(timeout=100,
sleep_interval=5,
report_type=self.backend.get_network_policies_generate,
cluster_name=cluster,
workload_name=workload_name,
namespace=namespace)
except Exception as e:
errors.append(e)
continue

backend_generated_network_policy = res[1]
graph = res[2]

self.validate_expected_backend_network_policy(expected_network_policy_list[i],backend_generated_network_policy, namespace)

self.validate_expected_network_neighbors(namespace=namespace, actual_network_neighbors=graph, expected_network_neighbors=expected_network_neighbors_list[i])

assert len(errors) == 0, f"Errors in validate_expected_backend_generated_network_policy_list: {errors}"

def convert_backend_network_policy_to_generated_network_policy(self, backend_network_policy) -> dict:
"""
convert_backend_network_policy_to_generated_network_policy converts backend network policy to generated network policy.
param backend_network_policy: backend network policy
"""

new_backend_policy = copy.deepcopy(backend_network_policy)
new_backend_policy["kind"] = "GeneratedNetworkPolicy"
new_backend_policy["apiVersion"] = "spdx.softwarecomposition.kubescape.io/v1beta1"
new_backend_policy["spec"]["apiVersion"] = "networking.k8s.io/v1"
new_backend_policy["spec"]["kind"] = "NetworkPolicy"
new_backend_policy["spec"]["metadata"] = new_backend_policy["metadata"]
del(new_backend_policy["spec"]["apiVersion"])
del(new_backend_policy["spec"]["kind"])

return new_backend_policy



def validate_expected_backend_network_policy(self, expected_network_policy, actual_network_policy, namespace: str):
"""
Validate expected backend network policy. It validates the basic metadata and then validates the policy refs and the network policy
param expected_network_policy: expected backend network policy
param actual_network_policy: actual backend network policy
"""

converted_actual_network_policy = self.convert_backend_network_policy_to_generated_network_policy(actual_network_policy)
self.validate_basic_metadata( actual_obj=converted_actual_network_policy,expected_obj= expected_network_policy, namespace= namespace)

if 'policyRef' in converted_actual_network_policy and 'policyRef' in expected_network_policy:
actual_policies_refs = converted_actual_network_policy['policyRef']
expected_policies_refs = expected_network_policy['policyRef']
self.validate_policy_refs(actual_policy_refs=actual_policies_refs, expected_policy_refs=expected_policies_refs)

actual_policy = converted_actual_network_policy['spec']
expected_policy = expected_network_policy['spec']['spec']
self.validate_network_policy_spec(actual_network_policy_spec=actual_policy, expected_network_policy_spec=expected_policy, namespace=namespace)


def validate_expected_generated_network_policy_list(self, namespace, expected_generated_network_policy_list):
"""
Expand Down Expand Up @@ -182,14 +335,26 @@ def validate_network_policy(self, actual_network_policy, expected_network_policy

self.validate_basic_metadata(actual_obj=actual_network_policy, expected_obj=expected_network_policy, namespace=namespace)

if 'Ingress' in expected_network_policy['spec']['policyTypes']:
expected_network_policy_entries = expected_network_policy['spec']['ingress']
actual_network_policy_entries = actual_network_policy['spec']['ingress']
self.validate_network_policy_spec(actual_network_policy_spec=actual_network_policy['spec'], expected_network_policy_spec=expected_network_policy['spec'], namespace=namespace)


def validate_network_policy_spec(self, actual_network_policy_spec, expected_network_policy_spec, namespace: str):
"""
Validate network policy. It validates the basic metadata and then validates the network policy entries
param actual_network_policy: actual network policy
param expected_network_policy: expected network policy
param namespace: namespace of the object
"""


if 'Ingress' in expected_network_policy_spec['policyTypes']:
expected_network_policy_entries = expected_network_policy_spec['ingress']
actual_network_policy_entries = actual_network_policy_spec['ingress']
self.validate_network_policy_entry(expected_network_policy_entries=expected_network_policy_entries, actual_network_policy_entries=actual_network_policy_entries)

if 'Egress' in expected_network_policy['spec']['policyTypes']:
expected_network_policy_entries = expected_network_policy['spec']['egress']
actual_network_policy_entries = actual_network_policy['spec']['egress']
if 'Egress' in expected_network_policy_spec['policyTypes']:
expected_network_policy_entries = expected_network_policy_spec['egress']
actual_network_policy_entries = actual_network_policy_spec['egress']
self.validate_network_policy_entry(expected_network_policy_entries=expected_network_policy_entries,actual_network_policy_entries=actual_network_policy_entries)


Expand Down
Loading
Loading