Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Policy engine with new api func for exposure analysis #307

30 changes: 21 additions & 9 deletions pkg/netpol/connlist/connlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,19 +190,31 @@ func (ca *ConnlistAnalyzer) hasFatalError() error {
return nil
}

func (ca *ConnlistAnalyzer) connslistFromParsedResources(objectsList []parser.K8sObject) ([]Peer2PeerConnection, []Peer, error) {
// getPolicyEngine returns a new policy engine considering the exposure analysis option
func (ca *ConnlistAnalyzer) getPolicyEngine(objectsList []parser.K8sObject) (*eval.PolicyEngine, error) {
// TODO: do we need logger in policyEngine?
pe, err := eval.NewPolicyEngineWithObjects(objectsList)
if !ca.exposureAnalysis {
return eval.NewPolicyEngineWithObjects(objectsList)
}
// else build new policy engine with exposure analysis
pe := eval.NewPolicyEngineWithOptions(ca.exposureAnalysis)
// add objects from real resources
err := pe.AddObjects(objectsList)
if err != nil {
return nil, err
}
// TODO: this will be eliminated when adding representative peers while policies upsert
// add representative resources
err = pe.SetExposureAnalysisResources()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have a planned item to avoid re-iterating the netpols here at SetExposureAnalysisResources, and add the representative peers while adding policies? if so add a todo here as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we do :

    • optimize fake-pods generations
      - first add fake pods for all non-empty rules while policies upsert
      - refine pods that has a match in the resources

added TODO

return pe, err
}

func (ca *ConnlistAnalyzer) connslistFromParsedResources(objectsList []parser.K8sObject) ([]Peer2PeerConnection, []Peer, error) {
pe, err := ca.getPolicyEngine(objectsList)
if err != nil {
ca.errors = append(ca.errors, newResourceEvaluationError(err))
return nil, nil, err
}
if ca.exposureAnalysis {
err = pe.SetExposureAnalysisResources()
if err != nil {
return nil, nil, err
}
}
ia, err := ingressanalyzer.NewIngressAnalyzerWithObjects(objectsList, pe, ca.logger, ca.muteErrsAndWarns)
if err != nil {
ca.errors = append(ca.errors, newResourceEvaluationError(err))
Expand Down Expand Up @@ -491,7 +503,7 @@ func (ca *ConnlistAnalyzer) existsFocusWorkload(peers []Peer, excludeIngressAnal
func (ca *ConnlistAnalyzer) getConnectionsBetweenPeers(pe *eval.PolicyEngine, peers []Peer) ([]Peer2PeerConnection, exposureMap, error) {
connsRes := make([]Peer2PeerConnection, 0)
exposuresMap := exposureMap{}
// sets for marking peer checked for ingress/egress exposure to entire cluster data once
// for exposure-analysis use: sets for marking peer checked for ingress/egress exposure to entire cluster data once
ingressSet := make(map[Peer]bool, 0)
egressSet := make(map[Peer]bool, 0)

Expand Down
69 changes: 46 additions & 23 deletions pkg/netpol/eval/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (pe *PolicyEngine) AllAllowedConnectionsBetweenWorkloadPeers(srcPeer, dstPe
func (pe *PolicyEngine) allAllowedConnectionsBetweenPeers(srcPeer, dstPeer Peer) (*common.ConnectionSet, error) {
srcK8sPeer := srcPeer.(k8s.Peer)
dstK8sPeer := dstPeer.(k8s.Peer)
// sets of peers which their exposure to entire cluster is already checked
// for exposure-analysis use: sets of peers which their exposure to entire cluster is already checked
ingressSet := make(map[k8s.Peer]bool, 0)
egressSet := make(map[k8s.Peer]bool, 0)
var res *common.ConnectionSet
Expand Down Expand Up @@ -179,7 +179,7 @@ func (pe *PolicyEngine) getPoliciesSelectingPod(p *k8s.Pod, direction netv1.Poli
res = append(res, policy)
}
}
if len(res) > 0 {
if pe.exposureAnalysisFlag && len(res) > 0 {
p.UpdatePodXgressProtectedFlag(direction == netv1.PolicyTypeIngress)
}
return res, nil
Expand Down Expand Up @@ -268,45 +268,68 @@ func (pe *PolicyEngine) allallowedXgressConnections(src, dst k8s.Peer, isIngress

// iterate relevant network policies (that capture the required pod)
for _, policy := range netpols {
// if isIngress: check for ingress rules that capture src within 'from'
// if not isIngress: check for egress rules that capture dst within 'to'
// collect the allowed connectivity from the relevant rules into allowedConns
var policyAllowedConnectionsPerDirection *common.ConnectionSet
var err error
if isIngress {
// policy selecting dst
policyAllowedConnectionsPerDirection, err = policy.GetIngressAllowedConns(src, dst)
if !ingressSet[dst] {
// in case of exposure-analysis: update exposure data for relevant pod
if pe.exposureAnalysisFlag {
if isIngress && !ingressSet[dst] {
// policy selecting dst (dst pod is real)
// if this is first time scanning policies selecting this dst peer,
// update its ingress entire cluster connection relying on policy data
dst.GetPeerPod().UpdatePodXgressExposureToEntireClusterData(policy.IngressGeneralConns.EntireClusterConns, isIngress)
}
} else {
// policy selecting src
policyAllowedConnectionsPerDirection, err = policy.GetEgressAllowedConns(dst)
if !egressSet[src] {
if !isIngress && !egressSet[src] {
// policy selecting src
// if this is first time scanning policies selecting this src peer,
// update its egress entire cluster connection relying on policy data
src.GetPeerPod().UpdatePodXgressExposureToEntireClusterData(policy.EgressGeneralConns.EntireClusterConns, isIngress)
}
}
// determine policy's allowed connections between the peers for the direction
// if isIngress: check for ingress rules that capture src within 'from'
// if not isIngress: check for egress rules that capture dst within 'to'
// collect the allowed connectivity from the relevant rules into allowedConns
policyAllowedConnectionsPerDirection, err := determineAllowedConnsPerDirection(policy, src, dst, isIngress)
if err != nil {
return allowedConns, err
}
allowedConns.Union(policyAllowedConnectionsPerDirection)
}
if isIngress {
// after looping the policies selecting this dst for first time, the exposure to entire cluster on ingress is computed;
// no need to compute again
ingressSet[dst] = true
} else {
// after looping the policies selecting this src for first time, the exposure to entire cluster on egress is computed;
// no need to compute again
egressSet[src] = true
if pe.exposureAnalysisFlag {
if isIngress {
// after looping the policies selecting this dst for first time, the exposure to entire cluster on ingress is computed;
// no need to compute again
ingressSet[dst] = true
} else {
// after looping the policies selecting this src for first time, the exposure to entire cluster on egress is computed;
// no need to compute again
egressSet[src] = true
}
}
return allowedConns, nil
}

// determineAllowedConnsPerDirection - helping func, determine policy's allowed connections between the peers for the direction
func determineAllowedConnsPerDirection(policy *k8s.NetworkPolicy, src, dst k8s.Peer, isIngress bool) (*common.ConnectionSet, error) {
if isIngress {
switch {
case policy.IngressGeneralConns.AllDestinationsConns.AllowAll:
return policy.IngressGeneralConns.AllDestinationsConns, nil
case policy.IngressGeneralConns.EntireClusterConns.AllowAll && src.PeerType() == k8s.PodType:
return policy.IngressGeneralConns.EntireClusterConns, nil
default:
return policy.GetIngressAllowedConns(src, dst)
}
}
// else egress
switch {
case policy.EgressGeneralConns.AllDestinationsConns.AllowAll:
return policy.EgressGeneralConns.AllDestinationsConns, nil
case policy.EgressGeneralConns.EntireClusterConns.AllowAll && dst.PeerType() == k8s.PodType:
return policy.EgressGeneralConns.EntireClusterConns, nil
default:
return policy.GetEgressAllowedConns(dst)
}
}

// isPeerNodeIP returns true if peer1 is an IP address of a node and peer2 is a pod on that node
func isPeerNodeIP(peer1, peer2 k8s.Peer) bool {
return peer2.PeerType() == k8s.PodType && peer1.PeerType() == k8s.IPBlockType &&
Expand Down
12 changes: 8 additions & 4 deletions pkg/netpol/eval/internal/k8s/netpol.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
// -> might help to preprocess and store peers that match policy selectors + selectors in rules + set of allowed connections per rule
type NetworkPolicy struct {
*netv1.NetworkPolicy // embedding k8s network policy object
// following data stored in preprocessing;
// following data stored in preprocessing when exposure-analysis is on;
// IngressGeneralConns contains:
// - the maximal connection-set which the policy's rules allow to all destinations on ingress direction
// - the maximal connection-set which the policy's rules allow to all namespaces in the cluster on ingress direction
Expand All @@ -48,6 +48,10 @@ type NetworkPolicy struct {
EgressGeneralConns PolicyGeneralRulesConns
}

// @todo might help if while pre-process, to check containment of all rules' connections; if all "specific" rules
// connections are contained in the "general" rules connections, then we can avoid iterating policy rules for computing
// connections between two peers

type PolicyGeneralRulesConns struct {
// AllDestinationsConns contains the maximal connection-set which the policy's rules allow to all destinations
// (all namespaces, pods and IP addresses)
Expand Down Expand Up @@ -288,7 +292,7 @@ func (np *NetworkPolicy) EgressAllowedConn(dst Peer, protocol, port string) (boo
return false, nil
}

// GetEgressAllowedConns returns the set of allowed connetions from any captured pod to the destination peer
// GetEgressAllowedConns returns the set of allowed connections from any captured pod to the destination peer
func (np *NetworkPolicy) GetEgressAllowedConns(dst Peer) (*common.ConnectionSet, error) {
res := common.MakeConnectionSet(false)
for _, rule := range np.Spec.Egress {
Expand Down Expand Up @@ -438,7 +442,7 @@ func (np *NetworkPolicy) fullName() string {
}

// /////////////////////////////////////////////////////////////////////////////////////////////
// pre-processing computations - currently performed for exposure-analysis goals;
// pre-processing computations - currently performed for exposure-analysis goals only;

// DetermineGeneralConnectionsOfPolicy scans policy rules and updates if it allows conns with all destinations or/ and with entire cluster
// for ingress and/ or egress directions
Expand Down Expand Up @@ -499,7 +503,7 @@ func (np *NetworkPolicy) scanRulesForGeneralEgressConns() error {
return nil
}

// doRulesExposeToAllDestOrEntireCluster checks if the given rules list is exposed to entire cluster;
// doRulesExposeToAllDestOrEntireCluster checks if the given rules list is exposed to entire world or entire cluster;
// i.e. if the rules list is empty or if there is a rule with empty namespaceSelector
// this func assumes rules are legal (rules correctness check occurs later)
func (np *NetworkPolicy) doRulesExposeToAllDestOrEntireCluster(rules []netv1.NetworkPolicyPeer) (alldests, entireCluster bool) {
Expand Down
32 changes: 21 additions & 11 deletions pkg/netpol/eval/internal/k8s/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,9 @@ func (pod *Pod) UpdatePodXgressExposureToEntireClusterData(ruleConns *common.Con
if isIngress {
// for a dst pod check if the given ruleConns contains namedPorts; if yes replace them with pod's
// matching port number
connNamedPorts := ruleConns.GetNamedPorts()
if len(connNamedPorts) > 0 {
ruleConnsCopy := ruleConns.Copy() // copying the connectionSet; in order to replace
// the named ports with pod's port numbers
for protocol, namedPorts := range connNamedPorts {
for _, namedPort := range namedPorts {
portNum := pod.ConvertPodNamedPort(namedPort)
ruleConnsCopy.ReplaceNamedPortWithMatchingPortNum(protocol, namedPort, portNum)
}
}
pod.IngressExposureData.EntireClusterConnection.Union(ruleConnsCopy)
convertedConns := pod.checkAndConvertNamedPortsInConnection(ruleConns)
if convertedConns != nil {
pod.IngressExposureData.EntireClusterConnection.Union(convertedConns)
} else {
pod.IngressExposureData.EntireClusterConnection.Union(ruleConns)
}
Expand All @@ -299,6 +291,24 @@ func (pod *Pod) UpdatePodXgressExposureToEntireClusterData(ruleConns *common.Con
}
}

// checkAndConvertNamedPortsInConnection returns the copy of the given connectionSet where named ports are converted;
// returns nil if the given connectionSet has no named ports
func (pod *Pod) checkAndConvertNamedPortsInConnection(conns *common.ConnectionSet) *common.ConnectionSet {
connNamedPorts := conns.GetNamedPorts()
if len(connNamedPorts) == 0 {
return nil
} // else - found named ports
connsCopy := conns.Copy() // copying the connectionSet; in order to replace
// the named ports with pod's port numbers
for protocol, namedPorts := range connNamedPorts {
for _, namedPort := range namedPorts {
portNum := pod.ConvertPodNamedPort(namedPort)
connsCopy.ReplaceNamedPortWithMatchingPortNum(protocol, namedPort, portNum)
}
}
return connsCopy
}

// updatePodXgressProtectedFlag updates to true the relevant ingress/egress protected flag of the pod
func (pod *Pod) UpdatePodXgressProtectedFlag(isIngress bool) {
if isIngress {
Expand Down
31 changes: 23 additions & 8 deletions pkg/netpol/eval/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,21 @@ func NewPolicyEngine() *PolicyEngine {

func NewPolicyEngineWithObjects(objects []parser.K8sObject) (*PolicyEngine, error) {
pe := NewPolicyEngine()
err := pe.AddObjects(objects)
return pe, err
}

// NewPolicyEngineWithOptions returns a new policy engine with an empty state but updating the exposure analysis flag,
// TBD: currently exposure-analysis is the only option supported by policy-engine, so no need for options param,
// should have the exposureFlag or just assign to true in the func?
func NewPolicyEngineWithOptions(exposureFlag bool) *PolicyEngine {
pe := NewPolicyEngine()
pe.exposureAnalysisFlag = exposureFlag
return pe
}

// AddObjects adds k8s objects from parsed resources to the policy engine
func (pe *PolicyEngine) AddObjects(objects []parser.K8sObject) error {
var err error
for _, obj := range objects {
switch obj.Kind {
Expand Down Expand Up @@ -85,17 +100,15 @@ func NewPolicyEngineWithObjects(objects []parser.K8sObject) (*PolicyEngine, erro
fmt.Printf("ignoring resource kind %s", obj.Kind)
}
if err != nil {
return nil, err
return err
}
}
err = pe.resolveMissingNamespaces()
return pe, err
return pe.resolveMissingNamespaces()
}

// SetExposureAnalysisResources sets the new representative peers needed for exposure analysis
// TODO: changes may be done on optimizing PR (will the flag be relevant after optimize? , should have these both funcs?..)
// TODO: changes may be done on optimizing PR (should have these both funcs?)
func (pe *PolicyEngine) SetExposureAnalysisResources() error {
pe.exposureAnalysisFlag = true
// scan policies' rules for new pods in (unmatched) namespaces (TODO : and unmatched pods in un/matched namespaces)
return pe.addRepresentativePods()
}
Expand Down Expand Up @@ -335,10 +348,12 @@ func (pe *PolicyEngine) upsertNetworkPolicy(np *netv1.NetworkPolicy) error {
}
pe.netpolsMap[netpolNamespace][np.Name] = newNetpol

// scan policy ingress and egress rules to store allowed connections
// for exposure analysis only: scan policy ingress and egress rules to store allowed connections
// to entire cluster and to all destinations (if such connections are allowed by the policy)
err := newNetpol.DetermineGeneralConnectionsOfPolicy()

var err error
if pe.exposureAnalysisFlag {
err = newNetpol.DetermineGeneralConnectionsOfPolicy()
}
// clear the cache on netpols changes
pe.cache.clear()
return err
Expand Down
Loading