Skip to content

Commit

Permalink
Policy engine with new api func for exposure analysis (#307)
Browse files Browse the repository at this point in the history
* task1 add new api func to policy-engine; so pre-process runs only for exposure-analysis

* task2 on exposure analysis benefit from the stored data

* eliminate isRuleGeneral; skip general rules in ruleSelectsPeer

* missing func doc

* Update pkg/netpol/eval/internal/k8s/netpol.go

Co-authored-by: Adi Sosnovich <[email protected]>

* todo comment

* revert initiating conns between two peers

* avoid iterating policy if its general conns are all conns

* todo comment

---------

Co-authored-by: Adi Sosnovich <[email protected]>
  • Loading branch information
shireenf-ibm and adisos authored Feb 15, 2024
1 parent db2c484 commit da21e1b
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 55 deletions.
30 changes: 21 additions & 9 deletions pkg/netpol/connlist/connlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,19 +190,31 @@ func (ca *ConnlistAnalyzer) hasFatalError() error {
return nil
}

func (ca *ConnlistAnalyzer) connslistFromParsedResources(objectsList []parser.K8sObject) ([]Peer2PeerConnection, []Peer, error) {
// getPolicyEngine returns a new policy engine considering the exposure analysis option
func (ca *ConnlistAnalyzer) getPolicyEngine(objectsList []parser.K8sObject) (*eval.PolicyEngine, error) {
// TODO: do we need logger in policyEngine?
pe, err := eval.NewPolicyEngineWithObjects(objectsList)
if !ca.exposureAnalysis {
return eval.NewPolicyEngineWithObjects(objectsList)
}
// else build new policy engine with exposure analysis
pe := eval.NewPolicyEngineWithOptions(ca.exposureAnalysis)
// add objects from real resources
err := pe.AddObjects(objectsList)
if err != nil {
return nil, err
}
// TODO: this will be eliminated when adding representative peers while policies upsert
// add representative resources
err = pe.SetExposureAnalysisResources()
return pe, err
}

func (ca *ConnlistAnalyzer) connslistFromParsedResources(objectsList []parser.K8sObject) ([]Peer2PeerConnection, []Peer, error) {
pe, err := ca.getPolicyEngine(objectsList)
if err != nil {
ca.errors = append(ca.errors, newResourceEvaluationError(err))
return nil, nil, err
}
if ca.exposureAnalysis {
err = pe.SetExposureAnalysisResources()
if err != nil {
return nil, nil, err
}
}
ia, err := ingressanalyzer.NewIngressAnalyzerWithObjects(objectsList, pe, ca.logger, ca.muteErrsAndWarns)
if err != nil {
ca.errors = append(ca.errors, newResourceEvaluationError(err))
Expand Down Expand Up @@ -491,7 +503,7 @@ func (ca *ConnlistAnalyzer) existsFocusWorkload(peers []Peer, excludeIngressAnal
func (ca *ConnlistAnalyzer) getConnectionsBetweenPeers(pe *eval.PolicyEngine, peers []Peer) ([]Peer2PeerConnection, exposureMap, error) {
connsRes := make([]Peer2PeerConnection, 0)
exposuresMap := exposureMap{}
// sets for marking peer checked for ingress/egress exposure to entire cluster data once
// for exposure-analysis use: sets for marking peer checked for ingress/egress exposure to entire cluster data once
ingressSet := make(map[Peer]bool, 0)
egressSet := make(map[Peer]bool, 0)

Expand Down
69 changes: 46 additions & 23 deletions pkg/netpol/eval/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (pe *PolicyEngine) AllAllowedConnectionsBetweenWorkloadPeers(srcPeer, dstPe
func (pe *PolicyEngine) allAllowedConnectionsBetweenPeers(srcPeer, dstPeer Peer) (*common.ConnectionSet, error) {
srcK8sPeer := srcPeer.(k8s.Peer)
dstK8sPeer := dstPeer.(k8s.Peer)
// sets of peers which their exposure to entire cluster is already checked
// for exposure-analysis use: sets of peers which their exposure to entire cluster is already checked
ingressSet := make(map[k8s.Peer]bool, 0)
egressSet := make(map[k8s.Peer]bool, 0)
var res *common.ConnectionSet
Expand Down Expand Up @@ -179,7 +179,7 @@ func (pe *PolicyEngine) getPoliciesSelectingPod(p *k8s.Pod, direction netv1.Poli
res = append(res, policy)
}
}
if len(res) > 0 {
if pe.exposureAnalysisFlag && len(res) > 0 {
p.UpdatePodXgressProtectedFlag(direction == netv1.PolicyTypeIngress)
}
return res, nil
Expand Down Expand Up @@ -268,45 +268,68 @@ func (pe *PolicyEngine) allallowedXgressConnections(src, dst k8s.Peer, isIngress

// iterate relevant network policies (that capture the required pod)
for _, policy := range netpols {
// if isIngress: check for ingress rules that capture src within 'from'
// if not isIngress: check for egress rules that capture dst within 'to'
// collect the allowed connectivity from the relevant rules into allowedConns
var policyAllowedConnectionsPerDirection *common.ConnectionSet
var err error
if isIngress {
// policy selecting dst
policyAllowedConnectionsPerDirection, err = policy.GetIngressAllowedConns(src, dst)
if !ingressSet[dst] {
// in case of exposure-analysis: update exposure data for relevant pod
if pe.exposureAnalysisFlag {
if isIngress && !ingressSet[dst] {
// policy selecting dst (dst pod is real)
// if this is first time scanning policies selecting this dst peer,
// update its ingress entire cluster connection relying on policy data
dst.GetPeerPod().UpdatePodXgressExposureToEntireClusterData(policy.IngressGeneralConns.EntireClusterConns, isIngress)
}
} else {
// policy selecting src
policyAllowedConnectionsPerDirection, err = policy.GetEgressAllowedConns(dst)
if !egressSet[src] {
if !isIngress && !egressSet[src] {
// policy selecting src
// if this is first time scanning policies selecting this src peer,
// update its egress entire cluster connection relying on policy data
src.GetPeerPod().UpdatePodXgressExposureToEntireClusterData(policy.EgressGeneralConns.EntireClusterConns, isIngress)
}
}
// determine policy's allowed connections between the peers for the direction
// if isIngress: check for ingress rules that capture src within 'from'
// if not isIngress: check for egress rules that capture dst within 'to'
// collect the allowed connectivity from the relevant rules into allowedConns
policyAllowedConnectionsPerDirection, err := determineAllowedConnsPerDirection(policy, src, dst, isIngress)
if err != nil {
return allowedConns, err
}
allowedConns.Union(policyAllowedConnectionsPerDirection)
}
if isIngress {
// after looping the policies selecting this dst for first time, the exposure to entire cluster on ingress is computed;
// no need to compute again
ingressSet[dst] = true
} else {
// after looping the policies selecting this src for first time, the exposure to entire cluster on egress is computed;
// no need to compute again
egressSet[src] = true
if pe.exposureAnalysisFlag {
if isIngress {
// after looping the policies selecting this dst for first time, the exposure to entire cluster on ingress is computed;
// no need to compute again
ingressSet[dst] = true
} else {
// after looping the policies selecting this src for first time, the exposure to entire cluster on egress is computed;
// no need to compute again
egressSet[src] = true
}
}
return allowedConns, nil
}

// determineAllowedConnsPerDirection - helping func, determine policy's allowed connections between the peers for the direction
func determineAllowedConnsPerDirection(policy *k8s.NetworkPolicy, src, dst k8s.Peer, isIngress bool) (*common.ConnectionSet, error) {
if isIngress {
switch {
case policy.IngressGeneralConns.AllDestinationsConns.AllowAll:
return policy.IngressGeneralConns.AllDestinationsConns, nil
case policy.IngressGeneralConns.EntireClusterConns.AllowAll && src.PeerType() == k8s.PodType:
return policy.IngressGeneralConns.EntireClusterConns, nil
default:
return policy.GetIngressAllowedConns(src, dst)
}
}
// else egress
switch {
case policy.EgressGeneralConns.AllDestinationsConns.AllowAll:
return policy.EgressGeneralConns.AllDestinationsConns, nil
case policy.EgressGeneralConns.EntireClusterConns.AllowAll && dst.PeerType() == k8s.PodType:
return policy.EgressGeneralConns.EntireClusterConns, nil
default:
return policy.GetEgressAllowedConns(dst)
}
}

// isPeerNodeIP returns true if peer1 is an IP address of a node and peer2 is a pod on that node
func isPeerNodeIP(peer1, peer2 k8s.Peer) bool {
return peer2.PeerType() == k8s.PodType && peer1.PeerType() == k8s.IPBlockType &&
Expand Down
12 changes: 8 additions & 4 deletions pkg/netpol/eval/internal/k8s/netpol.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
// -> might help to preprocess and store peers that match policy selectors + selectors in rules + set of allowed connections per rule
type NetworkPolicy struct {
*netv1.NetworkPolicy // embedding k8s network policy object
// following data stored in preprocessing;
// following data stored in preprocessing when exposure-analysis is on;
// IngressGeneralConns contains:
// - the maximal connection-set which the policy's rules allow to all destinations on ingress direction
// - the maximal connection-set which the policy's rules allow to all namespaces in the cluster on ingress direction
Expand All @@ -48,6 +48,10 @@ type NetworkPolicy struct {
EgressGeneralConns PolicyGeneralRulesConns
}

// @todo might help if while pre-process, to check containment of all rules' connections; if all "specific" rules
// connections are contained in the "general" rules connections, then we can avoid iterating policy rules for computing
// connections between two peers

type PolicyGeneralRulesConns struct {
// AllDestinationsConns contains the maximal connection-set which the policy's rules allow to all destinations
// (all namespaces, pods and IP addresses)
Expand Down Expand Up @@ -288,7 +292,7 @@ func (np *NetworkPolicy) EgressAllowedConn(dst Peer, protocol, port string) (boo
return false, nil
}

// GetEgressAllowedConns returns the set of allowed connetions from any captured pod to the destination peer
// GetEgressAllowedConns returns the set of allowed connections from any captured pod to the destination peer
func (np *NetworkPolicy) GetEgressAllowedConns(dst Peer) (*common.ConnectionSet, error) {
res := common.MakeConnectionSet(false)
for _, rule := range np.Spec.Egress {
Expand Down Expand Up @@ -438,7 +442,7 @@ func (np *NetworkPolicy) fullName() string {
}

// /////////////////////////////////////////////////////////////////////////////////////////////
// pre-processing computations - currently performed for exposure-analysis goals;
// pre-processing computations - currently performed for exposure-analysis goals only;

// DetermineGeneralConnectionsOfPolicy scans policy rules and updates if it allows conns with all destinations or/ and with entire cluster
// for ingress and/ or egress directions
Expand Down Expand Up @@ -499,7 +503,7 @@ func (np *NetworkPolicy) scanRulesForGeneralEgressConns() error {
return nil
}

// doRulesExposeToAllDestOrEntireCluster checks if the given rules list is exposed to entire cluster;
// doRulesExposeToAllDestOrEntireCluster checks if the given rules list is exposed to entire world or entire cluster;
// i.e. if the rules list is empty or if there is a rule with empty namespaceSelector
// this func assumes rules are legal (rules correctness check occurs later)
func (np *NetworkPolicy) doRulesExposeToAllDestOrEntireCluster(rules []netv1.NetworkPolicyPeer) (alldests, entireCluster bool) {
Expand Down
32 changes: 21 additions & 11 deletions pkg/netpol/eval/internal/k8s/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,9 @@ func (pod *Pod) UpdatePodXgressExposureToEntireClusterData(ruleConns *common.Con
if isIngress {
// for a dst pod check if the given ruleConns contains namedPorts; if yes replace them with pod's
// matching port number
connNamedPorts := ruleConns.GetNamedPorts()
if len(connNamedPorts) > 0 {
ruleConnsCopy := ruleConns.Copy() // copying the connectionSet; in order to replace
// the named ports with pod's port numbers
for protocol, namedPorts := range connNamedPorts {
for _, namedPort := range namedPorts {
portNum := pod.ConvertPodNamedPort(namedPort)
ruleConnsCopy.ReplaceNamedPortWithMatchingPortNum(protocol, namedPort, portNum)
}
}
pod.IngressExposureData.EntireClusterConnection.Union(ruleConnsCopy)
convertedConns := pod.checkAndConvertNamedPortsInConnection(ruleConns)
if convertedConns != nil {
pod.IngressExposureData.EntireClusterConnection.Union(convertedConns)
} else {
pod.IngressExposureData.EntireClusterConnection.Union(ruleConns)
}
Expand All @@ -299,6 +291,24 @@ func (pod *Pod) UpdatePodXgressExposureToEntireClusterData(ruleConns *common.Con
}
}

// checkAndConvertNamedPortsInConnection returns the copy of the given connectionSet where named ports are converted;
// returns nil if the given connectionSet has no named ports
func (pod *Pod) checkAndConvertNamedPortsInConnection(conns *common.ConnectionSet) *common.ConnectionSet {
connNamedPorts := conns.GetNamedPorts()
if len(connNamedPorts) == 0 {
return nil
} // else - found named ports
connsCopy := conns.Copy() // copying the connectionSet; in order to replace
// the named ports with pod's port numbers
for protocol, namedPorts := range connNamedPorts {
for _, namedPort := range namedPorts {
portNum := pod.ConvertPodNamedPort(namedPort)
connsCopy.ReplaceNamedPortWithMatchingPortNum(protocol, namedPort, portNum)
}
}
return connsCopy
}

// updatePodXgressProtectedFlag updates to true the relevant ingress/egress protected flag of the pod
func (pod *Pod) UpdatePodXgressProtectedFlag(isIngress bool) {
if isIngress {
Expand Down
31 changes: 23 additions & 8 deletions pkg/netpol/eval/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,21 @@ func NewPolicyEngine() *PolicyEngine {

func NewPolicyEngineWithObjects(objects []parser.K8sObject) (*PolicyEngine, error) {
pe := NewPolicyEngine()
err := pe.AddObjects(objects)
return pe, err
}

// NewPolicyEngineWithOptions returns a new policy engine with an empty state but updating the exposure analysis flag,
// TBD: currently exposure-analysis is the only option supported by policy-engine, so no need for options param,
// should have the exposureFlag or just assign to true in the func?
func NewPolicyEngineWithOptions(exposureFlag bool) *PolicyEngine {
pe := NewPolicyEngine()
pe.exposureAnalysisFlag = exposureFlag
return pe
}

// AddObjects adds k8s objects from parsed resources to the policy engine
func (pe *PolicyEngine) AddObjects(objects []parser.K8sObject) error {
var err error
for _, obj := range objects {
switch obj.Kind {
Expand Down Expand Up @@ -85,17 +100,15 @@ func NewPolicyEngineWithObjects(objects []parser.K8sObject) (*PolicyEngine, erro
fmt.Printf("ignoring resource kind %s", obj.Kind)
}
if err != nil {
return nil, err
return err
}
}
err = pe.resolveMissingNamespaces()
return pe, err
return pe.resolveMissingNamespaces()
}

// SetExposureAnalysisResources sets the new representative peers needed for exposure analysis
// TODO: changes may be done on optimizing PR (will the flag be relevant after optimize? , should have these both funcs?..)
// TODO: changes may be done on optimizing PR (should have these both funcs?)
func (pe *PolicyEngine) SetExposureAnalysisResources() error {
pe.exposureAnalysisFlag = true
// scan policies' rules for new pods in (unmatched) namespaces (TODO : and unmatched pods in un/matched namespaces)
return pe.addRepresentativePods()
}
Expand Down Expand Up @@ -335,10 +348,12 @@ func (pe *PolicyEngine) upsertNetworkPolicy(np *netv1.NetworkPolicy) error {
}
pe.netpolsMap[netpolNamespace][np.Name] = newNetpol

// scan policy ingress and egress rules to store allowed connections
// for exposure analysis only: scan policy ingress and egress rules to store allowed connections
// to entire cluster and to all destinations (if such connections are allowed by the policy)
err := newNetpol.DetermineGeneralConnectionsOfPolicy()

var err error
if pe.exposureAnalysisFlag {
err = newNetpol.DetermineGeneralConnectionsOfPolicy()
}
// clear the cache on netpols changes
pe.cache.clear()
return err
Expand Down

0 comments on commit da21e1b

Please sign in to comment.