Skip to content

Commit

Permalink
adding representativePeer struct (#309)
Browse files Browse the repository at this point in the history
* adding representativePeer struct

* Update pkg/netpol/eval/internal/k8s/peer.go

Co-authored-by: Adi Sosnovich <[email protected]>

* Update pkg/netpol/eval/resources.go

Co-authored-by: Adi Sosnovich <[email protected]>

* Update pkg/netpol/eval/resources.go

Co-authored-by: Adi Sosnovich <[email protected]>

* gofmt

* connlist if-else fix

* fixes to exposure_map.go

* fixing connlist includePairOfWorkloads

* fix GetPeerList() - separate GetRepresentativePeersList

* comment how Pod of representative peer is originated

---------

Co-authored-by: Adi Sosnovich <[email protected]>
  • Loading branch information
shireenf-ibm and adisos authored Feb 21, 2024
1 parent a7a9eb2 commit 71349cc
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 88 deletions.
6 changes: 5 additions & 1 deletion pkg/internal/netpolerrors/netpol_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ func MissingNamespaceErrStr(peerStr string) string {

// NotPeerErrStr returns error string of a peer that is not workload peer
func NotPeerErrStr(peerStr string) string {
return "peer: " + peerStr + ",is not a WorkloadPeer"
return "peer: " + peerStr + ", is not a WorkloadPeer"
}

func NotRepresentativePeerErrStr(peerStr string) string {
return peerStr + ", is not a Representative peer"
}

// BothSrcAndDstIPsErrStr returns error string that conn from ip to ip is not supported
Expand Down
62 changes: 32 additions & 30 deletions pkg/netpol/connlist/connlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func GetConnectionSetFromP2PConnection(c Peer2PeerConnection) *common.Connection

//////////////////////////////////////////////////////////////////////////////////////////////

func (ca *ConnlistAnalyzer) includePairOfWorkloads(src, dst eval.Peer) bool {
func (ca *ConnlistAnalyzer) includePairOfWorkloads(pe *eval.PolicyEngine, src, dst eval.Peer) bool {
if src.IsPeerIPType() && dst.IsPeerIPType() {
return false
}
Expand All @@ -375,8 +375,8 @@ func (ca *ConnlistAnalyzer) includePairOfWorkloads(src, dst eval.Peer) bool {
if src.String() == dst.String() {
return false
}
// when exposure-analysis, skip conns between fake pods (exposure or ingress-controller) or ip-peer and fake pods
if ca.exposureAnalysis && ca.hasFakePodsAndIPs(src, dst) {
// when exposure-analysis, skip conns between fake pods or ip-peer and fake pods
if ca.exposureAnalysis && ca.hasFakePodsAndIPs(pe, src, dst) {
return false
}
if ca.focusWorkload == "" {
Expand All @@ -386,21 +386,20 @@ func (ca *ConnlistAnalyzer) includePairOfWorkloads(src, dst eval.Peer) bool {
return ca.isPeerFocusWorkload(src) || ca.isPeerFocusWorkload(dst)
}

// TODO : enhance this after implementing representative peers
func (ca *ConnlistAnalyzer) hasFakePodsAndIPs(src, dst eval.Peer) bool {
if src.IsPeerIPType() && dst.Name() == common.PodInRepNs {
func (ca *ConnlistAnalyzer) hasFakePodsAndIPs(pe *eval.PolicyEngine, src, dst eval.Peer) bool {
// if both peers are Representative
if pe.IsRepresentativePeer(src) && pe.IsRepresentativePeer(dst) {
return true
}
if src.Name() == common.PodInRepNs && dst.IsPeerIPType() {
// if one peer is IP and the other is a representative peer
if (src.IsPeerIPType() || dst.IsPeerIPType()) &&
(pe.IsRepresentativePeer(src) || pe.IsRepresentativePeer(dst)) {
return true
}
if src.Name() == common.PodInRepNs && dst.Name() == common.PodInRepNs {
return true
}
if src.Name() == common.PodInRepNs && dst.Name() == common.IngressPodName {
return true
}
if src.Name() == common.IngressPodName && dst.Name() == common.PodInRepNs {
// if one peer is fake ingress-pod and the other is a representative peer
// todo: might check if peer is a fake ingress-controller by checking name and fakePod flag (within new pe func)
if (src.Name() == common.IngressPodName || dst.Name() == common.IngressPodName) &&
(pe.IsRepresentativePeer(src) || pe.IsRepresentativePeer(dst)) {
return true
}
return false
Expand Down Expand Up @@ -435,6 +434,15 @@ func (ca *ConnlistAnalyzer) getConnectionsList(pe *eval.PolicyEngine, ia *ingres
peers[i] = p
}

// connPeers represents []connlist.Peer to be sent to ca.getConnectionsBetweenPeers
connPeers := peers
if ca.exposureAnalysis {
representativePeers := pe.GetRepresentativePeersList()
for _, p := range representativePeers {
connPeers = append(connPeers, p)
}
}

excludeIngressAnalysis := (ia == nil || ia.IsEmpty())

// if ca.focusWorkload is not empty, check if it exists in the peers before proceeding
Expand All @@ -447,7 +455,7 @@ func (ca *ConnlistAnalyzer) getConnectionsList(pe *eval.PolicyEngine, ia *ingres

// compute connections between peers based on pe analysis of network policies
// if exposure-analysis is on, also compute and return the exposures-map
peersAllowedConns, exposuresMap, err := ca.getConnectionsBetweenPeers(pe, peers)
peersAllowedConns, exposuresMap, err := ca.getConnectionsBetweenPeers(pe, connPeers)
if err != nil {
ca.errors = append(ca.errors, newResourceEvaluationError(err))
return nil, nil, err
Expand Down Expand Up @@ -511,7 +519,7 @@ func (ca *ConnlistAnalyzer) getConnectionsBetweenPeers(pe *eval.PolicyEngine, pe
srcPeer := peers[i]
for j := range peers {
dstPeer := peers[j]
if !ca.includePairOfWorkloads(srcPeer, dstPeer) {
if !ca.includePairOfWorkloads(pe, srcPeer, dstPeer) {
continue
}
allowedConnections, err := pe.AllAllowedConnectionsBetweenWorkloadPeers(srcPeer, dstPeer)
Expand Down Expand Up @@ -555,7 +563,7 @@ func (ca *ConnlistAnalyzer) getIngressAllowedConnections(ia *ingressanalyzer.Ing
}
for peerStr, peerAndConn := range ingressConns {
// refines to only relevant connections if ca.focusWorkload is not empty
if !ca.includePairOfWorkloads(ingressControllerPod, peerAndConn.Peer) {
if !ca.includePairOfWorkloads(pe, ingressControllerPod, peerAndConn.Peer) {
continue
}
// compute allowed connections based on pe.policies to the peer, then intersect the conns with
Expand Down Expand Up @@ -606,21 +614,15 @@ func (ca *ConnlistAnalyzer) checkIfP2PConnOrExposureConn(pe *eval.PolicyEngine,
}
// else exposure analysis is on

// TODO : enhance this if condition after implementing eval.RepresentativePeer
if src.Name() != common.PodInRepNs && dst.Name() != common.PodInRepNs {
if !pe.IsRepresentativePeer(src) && !pe.IsRepresentativePeer(dst) {
// both src and dst are peers are found in the parsed resources
return createConnectionObject(allowedConnections, src, dst), nil
}
// else: one of the peers is inferred from a netpol-rule , and the other is a peer from the parsed resources
// else: one of the peers is a representative peer (inferred from a netpol-rule) ,
// and the other is a peer from the parsed resources
// an exposure analysis connection
var err error
if src.Name() != common.PodInRepNs {
// dst is the inferred from netpol peer, we have an exposed egress for the src peer
err = exposuresMap.addConnToExposureMap(pe, allowedConnections, src, dst, false)
} else {
// src is the inferred from netpol peer, we have an exposed ingress to the dst peer
err = exposuresMap.addConnToExposureMap(pe, allowedConnections, src, dst, true)
}
isIngress := pe.IsRepresentativePeer(src)
err := exposuresMap.addConnToExposureMap(pe, allowedConnections, src, dst, isIngress)
return nil, err
}

Expand All @@ -646,15 +648,15 @@ func updatePeersGeneralExposureData(pe *eval.PolicyEngine, src, dst Peer, ingres
// (e.g. only one peer with one netpol exposing the peer to entire cluster, no netpols)
var err error
// 1. only on first time : add general exposure data for the src peer (on egress)
if !src.IsPeerIPType() && src.Name() != common.PodInRepNs && !egressSet[src] {
if !src.IsPeerIPType() && !pe.IsRepresentativePeer(src) && !egressSet[src] {
err = exMap.addPeerGeneralExposure(pe, src, false)
if err != nil {
return err
}
}
egressSet[src] = true
// 2. only on first time : add general exposure data for the dst peer (on ingress)
if !dst.IsPeerIPType() && dst.Name() != common.PodInRepNs && !ingressSet[dst] {
if !dst.IsPeerIPType() && !pe.IsRepresentativePeer(dst) && !ingressSet[dst] {
err = exMap.addPeerGeneralExposure(pe, dst, true)
}
ingressSet[dst] = true
Expand Down
19 changes: 10 additions & 9 deletions pkg/netpol/connlist/exposure_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ func (ex exposureMap) addPeerXgressEntireClusterExp(pe *eval.PolicyEngine, peer
// finally the map will include refined lists of ingress and egress exposure connections for each workload peer
func (ex exposureMap) addConnToExposureMap(pe *eval.PolicyEngine, allowedConnections common.Connection, src, dst Peer,
isIngress bool) error {
peer := src // real peer
inferredPeer := dst // inferred from netpol rule
peer := src // real peer
representativePeer := dst // inferred from netpol rule
if isIngress {
peer = dst
inferredPeer = src
representativePeer = src
}
// if peer is not protected return
protected, err := pe.IsPeerProtected(peer, isIngress)
Expand Down Expand Up @@ -122,18 +122,19 @@ func (ex exposureMap) addConnToExposureMap(pe *eval.PolicyEngine, allowedConnect
if _, ok := ex[peer]; !ok {
ex.initiatePeerEntry(peer)
}

nsLabels, err := pe.GetPeerNsLabels(representativePeer)
if err != nil {
return err
}
// store connection data
expData := &xgressExposure{
exposedToEntireCluster: false,
namespaceLabels: pe.GetPeerNsLabels(inferredPeer),
namespaceLabels: nsLabels,
podLabels: map[string]string{}, // will be empty since in this branch rules with namespaceSelectors only supported
potentialConn: allowedConnSet,
}
if isIngress {
ex.appendPeerXgressExposureData(peer, expData, true)
} else { // egress
ex.appendPeerXgressExposureData(peer, expData, false)
}
ex.appendPeerXgressExposureData(peer, expData, isIngress)
return nil
}

Expand Down
49 changes: 33 additions & 16 deletions pkg/netpol/eval/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,33 @@ func (pe *PolicyEngine) CheckIfAllowed(src, dst, protocol, port string) (bool, e
return ingressRes, nil
}

func (pe *PolicyEngine) convertWorkloadPeerToPodPeer(peer Peer) (*k8s.PodPeer, error) {
if workloadPeer, ok := peer.(*k8s.WorkloadPeer); ok {
podNamespace, ok := pe.namspacesMap[workloadPeer.Pod.Namespace]
if !ok {
return nil, errors.New(netpolerrors.MissingNamespaceErrStr(workloadPeer.String()))
}
podPeer := &k8s.PodPeer{Pod: workloadPeer.Pod, NamespaceObject: podNamespace}
return podPeer, nil
func (pe *PolicyEngine) convertPeerToPodPeer(peer Peer) (*k8s.PodPeer, error) {
var podObj *k8s.Pod
var podNamespace *k8s.Namespace
var err error
switch currPeer := peer.(type) {
case *k8s.WorkloadPeer:
podObj = currPeer.Pod
podNamespace, err = pe.getPeerNamespaceObject(currPeer.Pod.Namespace, currPeer.String())
case *k8s.RepresentativePeer:
podObj = currPeer.Pod
podNamespace, err = pe.getPeerNamespaceObject(currPeer.Pod.Namespace, currPeer.String())
default: // should not get here
return nil, errors.New(netpolerrors.InvalidPeerErrStr(peer.String()))
}
if err != nil {
return nil, err
}
podPeer := &k8s.PodPeer{Pod: podObj, NamespaceObject: podNamespace}
return podPeer, nil
}

func (pe *PolicyEngine) getPeerNamespaceObject(namespaceName, peerStr string) (*k8s.Namespace, error) {
podNamespace, ok := pe.namspacesMap[namespaceName]
if !ok {
return nil, errors.New(netpolerrors.MissingNamespaceErrStr(peerStr))
}
return nil, errors.New(netpolerrors.NotPeerErrStr(peer.String()))
return podNamespace, nil
}

// for connectivity considerations, when requesting allowed connections between 2 workload peers which are the same,
Expand All @@ -91,28 +108,28 @@ func (pe *PolicyEngine) changePodPeerToAnotherPodObject(peer *k8s.PodPeer) {
// expecting that srcPeer and dstPeer are in level of workloads (WorkloadPeer)
func (pe *PolicyEngine) AllAllowedConnectionsBetweenWorkloadPeers(srcPeer, dstPeer Peer) (common.Connection, error) {
if srcPeer.IsPeerIPType() && !dstPeer.IsPeerIPType() {
// assuming dstPeer is WorkloadPeer, should be converted to k8s.Peer
dstPodPeer, err := pe.convertWorkloadPeerToPodPeer(dstPeer)
// assuming dstPeer is WorkloadPeer/RepresentativePeer, should be converted to k8s.Peer
dstPodPeer, err := pe.convertPeerToPodPeer(dstPeer)
if err != nil {
return nil, err
}
return pe.allAllowedConnectionsBetweenPeers(srcPeer, dstPodPeer)
}
if dstPeer.IsPeerIPType() && !srcPeer.IsPeerIPType() {
// assuming srcPeer is WorkloadPeer, should be converted to k8s.Peer
srcPodPeer, err := pe.convertWorkloadPeerToPodPeer(srcPeer)
// assuming srcPeer is WorkloadPeer/RepresentativePeer, should be converted to k8s.Peer
srcPodPeer, err := pe.convertPeerToPodPeer(srcPeer)
if err != nil {
return nil, err
}
return pe.allAllowedConnectionsBetweenPeers(srcPodPeer, dstPeer)
}
if !dstPeer.IsPeerIPType() && !srcPeer.IsPeerIPType() {
// assuming srcPeer and dstPeer are WorkloadPeer, should be converted to k8s.Peer
srcPodPeer, err := pe.convertWorkloadPeerToPodPeer(srcPeer)
// assuming srcPeer and dstPeer are WorkloadPeer/RepresentativePeer, should be converted to k8s.Peer
srcPodPeer, err := pe.convertPeerToPodPeer(srcPeer)
if err != nil {
return nil, err
}
dstPodPeer, err := pe.convertWorkloadPeerToPodPeer(dstPeer)
dstPodPeer, err := pe.convertPeerToPodPeer(dstPeer)
if err != nil {
return nil, err
}
Expand Down
25 changes: 20 additions & 5 deletions pkg/netpol/eval/exposure.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func (pe *PolicyEngine) addPodsForUnmatchedRules(policyName string, policy *k8s.
return err
}

// any fake namespace added will start with following prefix for ns name and following pod name
const repNsNamePrefix = "representative-namespace-"

// gets a list of policy xgress rules consisted only from namespaceSelector.
// adds new pod for each selector that does not have a matching namespace in the resources
func (pe *PolicyEngine) addPodsForUnmatchedNamespaceSelectors(nsSelectors []*metav1.LabelSelector, policyName string) error {
Expand All @@ -64,7 +67,7 @@ func (pe *PolicyEngine) addPodsForUnmatchedNamespaceSelectors(nsSelectors []*met
}
foundNs := pe.checkNamespaceSelectorsMatch(selectorMap)
if !foundNs {
_, err = pe.AddPodByNameAndNamespace(common.PodInRepNs, common.NsNamePrefix+policyName+fmt.Sprint(i), selectorMap)
_, err = pe.AddPodByNameAndNamespace(k8s.RepresentativePodName, repNsNamePrefix+policyName+fmt.Sprint(i), selectorMap)
if err != nil {
return err
}
Expand Down Expand Up @@ -127,8 +130,20 @@ func (pe *PolicyEngine) GetPeerXgressEntireClusterConn(p Peer, isIngress bool) (
return peer.Pod.EgressExposureData.EntireClusterConnection, nil
}

// TODO these functions will be changed in a later PR (when implementing RepresentativePeer)
func (pe *PolicyEngine) GetPeerNsLabels(p Peer) map[string]string {
peer := p.(*k8s.WorkloadPeer)
return peer.Pod.ExposureNsLabels
/////////////////////////////////////////////

// IsRepresentativePeer returns whether the peer is representative peer (inferred from netpol rule)
func (pe *PolicyEngine) IsRepresentativePeer(peer Peer) bool {
_, ok := peer.(*k8s.RepresentativePeer)
return ok
}

// GetPeerNsLabels returns namespace labels defining the given representative peer
// relevant only for RepresentativePeer
func (pe *PolicyEngine) GetPeerNsLabels(p Peer) (map[string]string, error) {
peer, ok := p.(*k8s.RepresentativePeer)
if !ok { // should not get here
return nil, errors.New(netpolerrors.NotRepresentativePeerErrStr(p.String()))
}
return peer.PotentialNamespaceLabels, nil
}
14 changes: 11 additions & 3 deletions pkg/netpol/eval/internal/k8s/netpol.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ func (np *NetworkPolicy) ruleConnections(rulePorts []netv1.NetworkPolicyPort, ds
if err != nil {
return res, err
}
if dst == nil && portName != "" {
if (dst == nil || isRepresentativePod(dst)) && portName != "" {
// adding namedPort to connectionSet in case of :
// - dst is nil - for general connections;
// - TODO: if dst is a fake pod (representative)
// - dst is nil - for general connections;
// - if dst is a representative pod (its namedPorts are unknown)
ports.AddPort(intstr.FromString(portName))
}
if !isEmptyPortRange(startPort, endPort) {
Expand All @@ -146,6 +146,14 @@ func (np *NetworkPolicy) ruleConnections(rulePorts []netv1.NetworkPolicyPort, ds
return res, nil
}

// isRepresentativePod determines if the peer's source is representativePeer; i.e. its pod fake and has RepresentativePodName
func isRepresentativePod(peer Peer) bool {
if peer.GetPeerPod() == nil {
return false
}
return peer.GetPeerPod().FakePod && peer.GetPeerPod().Name == RepresentativePodName
}

// ruleConnsContain returns true if the given protocol and port are contained in connections allowed by rulePorts
func (np *NetworkPolicy) ruleConnsContain(rulePorts []netv1.NetworkPolicyPort, protocol, port string, dst Peer) (bool, error) {
if len(rulePorts) == 0 {
Expand Down
Loading

0 comments on commit 71349cc

Please sign in to comment.