
More Pod attributes for PDP: SA and labels #665

Merged · 10 commits · Jul 17, 2024
55 changes: 41 additions & 14 deletions pkg/controlplane/authz/manager.go
@@ -40,6 +40,10 @@ const (
// the number of seconds a JWT access token is valid before it expires.
jwtExpirySeconds = 5

ClientNameLabel = "clusterlink/metadata.clientName"
ClientNamespaceLabel = "clusterlink/metadata.clientNamespace"
ClientSALabel = "clusterlink/metadata.clientServiceAccount"
ClientLabelsPrefix = "client/metadata."
ServiceNameLabel = "clusterlink/metadata.serviceName"
ServiceNamespaceLabel = "clusterlink/metadata.serviceNamespace"
ServiceLabelsPrefix = "service/metadata."
@@ -84,9 +88,10 @@ type ingressAuthorizationResponse struct {
}

type podInfo struct {
name string
namespace string
labels map[string]string
name string
namespace string
serviceAccount string
labels map[string]string
}

// Manager manages the authorization dataplane connections.
@@ -167,7 +172,12 @@ func (m *Manager) addPod(pod *v1.Pod) {
defer m.podLock.Unlock()

podID := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
m.podList[podID] = podInfo{name: pod.Name, namespace: pod.Namespace, labels: pod.Labels}
m.podList[podID] = podInfo{
name: pod.Name,
namespace: pod.Namespace,
labels: pod.Labels,
serviceAccount: pod.Spec.ServiceAccountName,
}
for _, ip := range pod.Status.PodIPs {
// ignoring host-networked Pod IPs
if ip.IP != pod.Status.HostIP {
@@ -218,20 +228,37 @@ func (m *Manager) getPodInfoByIP(ip string) *podInfo {
return nil
}

func (m *Manager) getClientAttributes(req *egressAuthorizationRequest) connectivitypdp.WorkloadAttrs {
clientAttrs := connectivitypdp.WorkloadAttrs{GatewayNameLabel: m.getPeerName()}
podInfo := m.getPodInfoByIP(req.IP)
if podInfo == nil {
m.logger.Infof("Pod has no info: IP=%v.", req.IP)
return clientAttrs // better return an error here?
Collaborator:

Returning an error will simply yield a RST to the client connection.
I think it's better to keep it the way it is (which still allows for allow-all policies to proceed).

Collaborator (Author):

This is a bit problematic for policies with negative selectors (using the NotIn and DoesNotExist operators).
E.g., say I have a policy to deny all connections except those originating from namespace foo.
Connections from IPs for which CL cannot link a Pod will always be allowed.

apiVersion: clusterlink.net/v1alpha1
kind: AccessPolicy
metadata:
  name: deny-except-from-foo
spec:
  action: deny
  from:
  - workloadSelector:
      matchExpressions:
      - key: clusterlink/metadata.clientNamespace
        operator: NotIn
        values: [foo]
  to:
  - workloadSelector: {}

Collaborator:

Right.
I think the right thing to do is a non-cached GET from the API server to fetch the pod info in this case.
I did not find a way to do this via the given client, but it would be the best option.

Collaborator:

Found this:
kubernetes-sigs/controller-runtime#585 (comment)

So there's a non-cached reader, but it's not available to authz.Manager.
Need to propagate it from cl-controlplane/app/server.go.

Collaborator (Author):

Apparently, even this is not good enough. A Pod may start running and send requests even before its IP is updated in etcd. From what I see, some non-cached List() calls return the Pod with its Node IP but without its Pod IP; it only gets updated later.

Collaborator:

How long is this window of a working pod with etcd not yet updated?
If not long, maybe we can stall a bit to wait for the update.

Collaborator:

Or deny and let the client retry?

Collaborator (Author):

I changed the implementation: now, if the client has no attributes (no pod info) and the PDP has at least one policy that depends on client attributes, the request is denied and the client has to retry.
Note that even if the PDP has no attribute-dependent policies, attribute-less requests can still be denied, depending on the policies (e.g., deny policies take precedence, or there are no allow policies).
This is enforced both on egress and on ingress.

}

clientAttrs[ServiceNamespaceLabel] = podInfo.namespace // deprecated
clientAttrs[ClientNamespaceLabel] = podInfo.namespace
clientAttrs[ClientSALabel] = podInfo.serviceAccount

if src, ok := podInfo.labels["app"]; ok {
clientAttrs[ServiceNameLabel] = src // deprecated
Collaborator:

What does it take to remove the deprecated attributes?

Collaborator (Author):

I think they are currently being used in some experiments, so it's better to wait a bit.
They will not appear in the documentation.

clientAttrs[ClientNameLabel] = src
}

for k, v := range podInfo.labels {
clientAttrs[ClientLabelsPrefix+k] = v
}

m.logger.Infof("Client attributes: %v.", clientAttrs)

return clientAttrs
}
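The deny-on-missing-attributes behavior the author describes in the thread above can be sketched as a small decision helper. The names and return values are illustrative assumptions, not the actual ClusterLink implementation:

```go
package main

import "fmt"

// decideOnMissingAttrs sketches the agreed behavior: if the client's pod info
// could not be resolved and at least one loaded policy references client
// attributes, deny the request so the client retries once the pod cache
// catches up; otherwise, let the PDP evaluate whatever attributes exist
// (deny policies and the absence of allow policies can still reject it).
func decideOnMissingAttrs(haveClientAttrs, policiesUseClientAttrs bool) string {
	if !haveClientAttrs && policiesUseClientAttrs {
		return "deny"
	}
	return "evaluate"
}

func main() {
	fmt.Println(decideOnMissingAttrs(false, true))  // attribute-less client, attribute-dependent policy
	fmt.Println(decideOnMissingAttrs(false, false)) // still subject to normal PDP evaluation
}
```

This enforcement applies on both egress and ingress, per the comment above.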

// authorizeEgress authorizes a request for accessing an imported service.
func (m *Manager) authorizeEgress(ctx context.Context, req *egressAuthorizationRequest) (*egressAuthorizationResponse, error) {
m.logger.Infof("Received egress authorization request: %v.", req)

srcAttributes := connectivitypdp.WorkloadAttrs{GatewayNameLabel: m.getPeerName()}
podInfo := m.getPodInfoByIP(req.IP)
if podInfo != nil {
srcAttributes[ServiceNamespaceLabel] = podInfo.namespace

if src, ok := podInfo.labels["app"]; ok { // TODO: Add support for labels other than just the "app" key.
m.logger.Infof("Received egress authorization srcLabels[app]: %v.", podInfo.labels["app"])
srcAttributes[ServiceNameLabel] = src
}
}
srcAttributes := m.getClientAttributes(req)

var imp v1alpha1.Import
if err := m.client.Get(ctx, req.ImportName, &imp); err != nil {
16 changes: 15 additions & 1 deletion tests/e2e/k8s/services/httpecho/client.go
@@ -29,6 +29,8 @@ import (
"github.com/clusterlink-net/clusterlink/tests/e2e/k8s/util"
)

const EchoClientPodName = "echo-client"

func GetEchoValue(cluster *util.KindCluster, server *util.Service) (string, error) {
port, err := cluster.ExposeNodeport(server)
if err != nil {
@@ -77,10 +79,22 @@ func GetEchoValue(cluster *util.KindCluster, server *util.Service) (string, error) {

func RunClientInPod(cluster *util.KindCluster, server *util.Service) (string, error) {
body, err := cluster.RunPod(&util.Pod{
Name: "echo-client",
Name: EchoClientPodName,
Namespace: server.Namespace,
Image: "curlimages/curl",
Args: []string{"curl", "-s", "-m", "1", "http://" + server.Name},
})
return strings.TrimSpace(body), err
}

// RunClientInPodWithSleep sleeps before running the client, allowing more time for CL to be notified of the new Pod so CL can retrieve the Pod's info.
func RunClientInPodWithSleep(cluster *util.KindCluster, server *util.Service) (string, error) {
Collaborator:

I don't think you need this function. If the non-sleep function fails, you will be covered by the allowRetry=true for the AccessService function.

Collaborator (Author):

How does the retry work when we run the client in a Pod?
Aren't we just creating Pods again and again?

Collaborator:

Right, my bad.
So maybe add a flag that removes the RestartPolicy: v1.RestartPolicyNever.

Collaborator:

> Right, my bad. So maybe add a flag that removes the RestartPolicy: v1.RestartPolicyNever.

This won't work.

How about using
--retry-delay 1 --retry-all-errors
in the curl pod command line?

Collaborator (Author):

yes, this works.

body, err := cluster.RunPod(&util.Pod{
Name: EchoClientPodName,
Namespace: server.Namespace,
Image: "curlimages/curl",
Command: []string{"sh", "-c"},
Args: []string{"sleep 3 && curl -s -m 1 http://" + server.Name},
})
return strings.TrimSpace(body), err
}
92 changes: 68 additions & 24 deletions tests/e2e/k8s/test_policy.go
@@ -15,6 +15,7 @@ package k8s

import (
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1"
"github.com/clusterlink-net/clusterlink/pkg/controlplane/authz"
@@ -24,19 +25,7 @@
)

func (s *TestSuite) TestPolicyLabels() {
cl, err := s.fabric.DeployClusterlinks(2, nil)
require.Nil(s.T(), err)

require.Nil(s.T(), cl[0].CreateService(&httpEchoService))
require.Nil(s.T(), cl[0].CreateExport(&httpEchoService))
require.Nil(s.T(), cl[1].CreatePeer(cl[0]))

importedService := &util.Service{
Name: httpEchoService.Name,
Port: 80,
Labels: httpEchoService.Labels,
}
require.Nil(s.T(), cl[1].CreateImport(importedService, cl[0], httpEchoService.Name))
cl, importedService := s.createTwoClustersWithEchoSvc()

// 1. Create a policy that allows traffic only to the echo service at cl[0] - apply in cl[1] (on egress)
// In addition, create a policy to only allow traffic from cl[1] - apply in cl[0] (on ingress)
@@ -143,20 +132,58 @@ func (s *TestSuite) TestPolicyLabels() {
require.Equal(s.T(), cl[0].Name(), data)
}

func (s *TestSuite) TestPrivilegedPolicies() {
cl, err := s.fabric.DeployClusterlinks(2, nil)
func (s *TestSuite) TestPodAttributes() {
cl, importedService := s.createTwoClustersWithEchoSvc()
require.Nil(s.T(), cl[0].CreatePolicy(util.PolicyAllowAll))
require.Nil(s.T(), cl[1].CreatePolicy(util.PolicyAllowAll))

// 1. Sanity - just test that a pod can connect to echo service
data, err := cl[1].AccessService(httpecho.RunClientInPodWithSleep, importedService, true, nil)
require.Nil(s.T(), err)
require.Equal(s.T(), cl[0].Name(), data)

require.Nil(s.T(), cl[0].CreateService(&httpEchoService))
require.Nil(s.T(), cl[0].CreateExport(&httpEchoService))
require.Nil(s.T(), cl[0].CreatePolicy(util.PolicyAllowAll))
require.Nil(s.T(), cl[1].CreatePeer(cl[0]))
// 2. Denying clients with a service account which is different from the Pod's SA - connection should work
srcLabels := map[string]string{
authz.ClientSALabel: "non-default",
}
denyNonDefaultSA := util.NewPolicy("deny-non-default-sa", v1alpha1.AccessPolicyActionDeny, srcLabels, nil)
require.Nil(s.T(), cl[0].CreatePolicy(denyNonDefaultSA))
require.Nil(s.T(), cl[1].CreatePolicy(denyNonDefaultSA))
_, err = cl[1].AccessService(httpecho.RunClientInPodWithSleep, importedService, true, nil)
require.Nil(s.T(), err)

importedService := &util.Service{
Name: httpEchoService.Name,
Port: 80,
// 3. Egress only - denying clients with a SA which equals the Pod's SA - connection should fail
srcLabels = map[string]string{
authz.ClientSALabel: "default",
}
require.Nil(s.T(), cl[1].CreateImport(importedService, cl[0], httpEchoService.Name))
denyDefaultSA := util.NewPolicy("deny-default-sa", v1alpha1.AccessPolicyActionDeny, srcLabels, nil)
require.Nil(s.T(), cl[1].CreatePolicy(denyDefaultSA))
_, err = cl[1].AccessService(httpecho.RunClientInPodWithSleep, importedService, true, nil)
require.NotNil(s.T(), err)
require.Nil(s.T(), cl[1].DeletePolicy(denyDefaultSA.Name)) // revert

// 4. Ingress only - denying clients with a SA which equals the Pod's SA - connection should fail
require.Nil(s.T(), cl[0].CreatePolicy(denyDefaultSA))
_, err = cl[1].AccessService(httpecho.RunClientInPodWithSleep, importedService, true, nil)
require.NotNil(s.T(), err)
require.Nil(s.T(), cl[0].DeletePolicy(denyDefaultSA.Name)) // revert

// 5. Egress only - denying client-pods with a label different from the client pod - connection should work
selRequirement := metav1.LabelSelectorRequirement{
Key: authz.ClientLabelsPrefix + "app",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{httpecho.EchoClientPodName},
}
labelSelector := metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{selRequirement}}
denyOthers := util.NewPolicyFromLabelSelectors("deny-others", v1alpha1.AccessPolicyActionDeny, &labelSelector, nil)
require.Nil(s.T(), cl[1].CreatePolicy(denyOthers))
_, err = cl[1].AccessService(httpecho.RunClientInPodWithSleep, importedService, true, nil)
require.Nil(s.T(), err)
}

func (s *TestSuite) TestPrivilegedPolicies() {
cl, importedService := s.createTwoClustersWithEchoSvc()
require.Nil(s.T(), cl[0].CreatePolicy(util.PolicyAllowAll))

dstLabels := map[string]string{
authz.ServiceNameLabel: httpEchoService.Name,
@@ -180,7 +207,7 @@
require.Nil(s.T(), cl[1].CreatePolicy(regAllowPolicy))

// 1. privileged deny has highest priority -> connection is denied
_, err = cl[1].AccessService(httpecho.GetEchoValue, importedService, true, &services.ConnectionResetError{})
_, err := cl[1].AccessService(httpecho.GetEchoValue, importedService, true, &services.ConnectionResetError{})
require.ErrorIs(s.T(), err, &services.ConnectionResetError{})

// 2. deleting privileged deny -> privileged allow now has highest priority -> connection is allowed
@@ -209,3 +236,20 @@
_, err = cl[1].AccessService(httpecho.GetEchoValue, importedService, true, &services.ConnectionResetError{})
require.ErrorIs(s.T(), err, &services.ConnectionResetError{})
}

func (s *TestSuite) createTwoClustersWithEchoSvc() ([]*util.ClusterLink, *util.Service) {
cl, err := s.fabric.DeployClusterlinks(2, nil)
require.Nil(s.T(), err)

require.Nil(s.T(), cl[0].CreateService(&httpEchoService))
require.Nil(s.T(), cl[0].CreateExport(&httpEchoService))
require.Nil(s.T(), cl[1].CreatePeer(cl[0]))

importedService := &util.Service{
Name: httpEchoService.Name,
Port: 80,
Labels: httpEchoService.Labels,
}
require.Nil(s.T(), cl[1].CreateImport(importedService, cl[0], httpEchoService.Name))
return cl, importedService
}
9 changes: 6 additions & 3 deletions tests/e2e/k8s/util/kind.go
@@ -83,6 +83,8 @@ type Pod struct {
Namespace string
// Image is the container image.
Image string
// Command is the command to execute in the container.
Command []string
// Args are the container command line arguments.
Args []string
}
@@ -300,9 +302,10 @@ func (c *KindCluster) RunPod(podSpec *Pod) (string, error) {
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{{
Name: podSpec.Name,
Image: podSpec.Image,
Args: podSpec.Args,
Name: podSpec.Name,
Image: podSpec.Image,
Command: podSpec.Command,
Args: podSpec.Args,
}},
},
})
Expand Down
24 changes: 22 additions & 2 deletions tests/e2e/k8s/util/policies.go
@@ -26,17 +26,37 @@ func NewPolicy(
action v1alpha1.AccessPolicyAction,
from, to map[string]string,
) *v1alpha1.AccessPolicy {
return NewPolicyFromLabelSelectors(
name,
action,
&metav1.LabelSelector{MatchLabels: from},
&metav1.LabelSelector{MatchLabels: to},
)
}

func NewPolicyFromLabelSelectors(
name string,
action v1alpha1.AccessPolicyAction,
from, to *metav1.LabelSelector,
) *v1alpha1.AccessPolicy {
if from == nil {
from = &metav1.LabelSelector{MatchLabels: nil}
}
if to == nil {
to = &metav1.LabelSelector{MatchLabels: nil}
}

return &v1alpha1.AccessPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1alpha1.AccessPolicySpec{
Action: action,
From: v1alpha1.WorkloadSetOrSelectorList{{
WorkloadSelector: &metav1.LabelSelector{MatchLabels: from},
WorkloadSelector: from,
}},
To: v1alpha1.WorkloadSetOrSelectorList{{
WorkloadSelector: &metav1.LabelSelector{MatchLabels: to},
WorkloadSelector: to,
}},
},
}