Skip to content

Commit

Permalink
Peers to share their labels with other peers thru heartbeat mechanism (
Browse files Browse the repository at this point in the history
…#686)

* using heartbeat to share peer attributes
* Store remote Peer labels in the CR Status
* Add test for destination-peer attributes
* Update docs to include peer labels in the list of available attributes

Signed-off-by: Ziv Nevo <[email protected]>
  • Loading branch information
zivnevo authored Aug 22, 2024
1 parent 593d248 commit 6b80d32
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 27 deletions.
5 changes: 5 additions & 0 deletions config/crds/clusterlink.net_peers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ spec:
- type
type: object
type: array
labels:
additionalProperties:
type: string
description: Labels holds peer labels, as reported by the remote peer
type: object
type: object
required:
- spec
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/clusterlink.net/v1alpha1/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ const (
type PeerStatus struct {
// Conditions of the peer.
Conditions []metav1.Condition `json:"conditions,omitempty"`
// Labels holds peer labels, as reported by the remote peer
Labels map[string]string `json:"labels,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/clusterlink.net/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/controlplane/api/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ package api
const (
// HeartbeatPath is the path for Heartbeat requests from remote peers.
HeartbeatPath = "/healthz"
// PeerLabelsCustomHeader is a custom HTTP response header for reporting peer labels.
PeerLabelsCustomHeader = "ClusterLinkPeerLabels"
)
45 changes: 26 additions & 19 deletions pkg/controlplane/authz/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,31 @@ func (m *Manager) getSrcAttributes(req *egressAuthorizationRequest) connectivity
clientAttrs[PeerLabelsPrefix+k] = v
}

m.logger.Infof("Client attributes: %v.", clientAttrs)
m.logger.Debugf("Client attributes: %v.", clientAttrs)

return clientAttrs
}

func (m *Manager) getDstAttributes(svcName, svcNS, peerName string,
svcLabels, peerLabels map[string]string,
) connectivitypdp.WorkloadAttrs {
dstAttributes := connectivitypdp.WorkloadAttrs{
ServiceNameLabel: svcName,
ServiceNamespaceLabel: svcNS,
PeerNameLabel: peerName,
}
for k, v := range svcLabels {
dstAttributes[ServiceLabelsPrefix+k] = v
}
for k, v := range peerLabels {
dstAttributes[PeerLabelsPrefix+k] = v
}

m.logger.Debugf("dstAttributes: %v", dstAttributes)

return dstAttributes
}

// authorizeEgress authorizes a request for accessing an imported service.
func (m *Manager) authorizeEgress(ctx context.Context, req *egressAuthorizationRequest) (*egressAuthorizationResponse, error) {
m.logger.Infof("Received egress authorization request: %v.", req)
Expand All @@ -262,11 +282,6 @@ func (m *Manager) authorizeEgress(ctx context.Context, req *egressAuthorizationR
return nil, fmt.Errorf("cannot get import %v: %w", req.ImportName, err)
}

dstAttributes := connectivitypdp.WorkloadAttrs{}
for k, v := range imp.Labels { // add import labels to destination attributes
dstAttributes[ServiceLabelsPrefix+k] = v
}

lbResult := NewLoadBalancingResult(&imp)
for {
if err := m.loadBalancer.Select(lbResult); err != nil {
Expand All @@ -291,10 +306,10 @@ func (m *Manager) authorizeEgress(ctx context.Context, req *egressAuthorizationR
}
}

dstAttributes[ServiceNameLabel] = importSource.ExportName
dstAttributes[ServiceNamespaceLabel] = importSource.ExportNamespace
dstAttributes[PeerNameLabel] = importSource.Peer

dstAttributes := m.getDstAttributes(
importSource.ExportName, importSource.ExportNamespace,
importSource.Peer, imp.Labels, pr.Status.Labels,
)
decision, err := m.connectivityPDP.Decide(srcAttributes, dstAttributes, req.ImportName.Namespace)
if err != nil {
return nil, fmt.Errorf("error deciding on an egress connection: %w", err)
Expand Down Expand Up @@ -403,22 +418,14 @@ func (m *Manager) authorizeIngress(

resp.ServiceExists = true

dstAttributes := connectivitypdp.WorkloadAttrs{
ServiceNameLabel: export.Name,
ServiceNamespaceLabel: export.Namespace,
PeerNameLabel: m.getPeerName(),
}
for k, v := range export.Labels { // add export labels to destination attributes
dstAttributes[ServiceLabelsPrefix+k] = v
}

// do not allow requests from clients with no attributes if the PDP has attribute-dependent policies
if len(req.SrcAttributes) == 0 && m.connectivityPDP.DependsOnClientAttrs() {
m.logger.Infof("PDP not allowing connection: No client attributes")
resp.Allowed = false
return resp, nil
}

dstAttributes := m.getDstAttributes(export.Name, export.Namespace, m.getPeerName(), export.Labels, m.peerLabels)
decision, err := m.connectivityPDP.Decide(req.SrcAttributes, dstAttributes, req.ServiceName.Namespace)
if err != nil {
return nil, fmt.Errorf("error deciding on an ingress connection: %w", err)
Expand Down
15 changes: 13 additions & 2 deletions pkg/controlplane/authz/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package authz

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -116,13 +117,23 @@ func (s *server) checkEgress(ctx context.Context, req *authv3.CheckRequest) *aut
})
}

func (s *server) encodePeerLabels() string {
data, err := json.Marshal(&s.manager.peerLabels)
if err != nil {
data = []byte{'{', '}'} // empty map
}
return base64.StdEncoding.EncodeToString(data)
}

// check an ingress dataplane connection.
func (s *server) checkIngress(ctx context.Context, req *authv3.CheckRequest) *authv3.CheckResponse {
httpReq := req.Attributes.Request.Http
switch {
case httpReq.Method == http.MethodGet && httpReq.Path == api.HeartbeatPath:
// heartbeat request always simply allowed
return buildAllowedResponse(&authv3.OkHttpResponse{})
// heartbeat request always simply allowed. Peer labels are added to the OK response.
hv := &corev3.HeaderValue{Key: api.PeerLabelsCustomHeader, Value: s.encodePeerLabels()}
hvo := &corev3.HeaderValueOption{Header: hv, AppendAction: corev3.HeaderValueOption_APPEND_IF_EXISTS_OR_ADD}
return buildAllowedResponse(&authv3.OkHttpResponse{ResponseHeadersToAdd: []*corev3.HeaderValueOption{hvo}})
case httpReq.Method == http.MethodPost && httpReq.Path == api.RemotePeerAuthorizationPath:
return s.checkAuthorizationRequest(ctx, httpReq)
case httpReq.Method == http.MethodConnect:
Expand Down
13 changes: 12 additions & 1 deletion pkg/controlplane/control/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (m *peerMonitor) Start() {
break
}

heartbeatErr := m.getClient().GetHeartbeat()
peerLabels, heartbeatErr := m.getClient().GetHeartbeat()
heartbeatOK := heartbeatErr == nil
if healthy == heartbeatOK {
if !healthy {
Expand Down Expand Up @@ -159,6 +159,7 @@ func (m *peerMonitor) Start() {

m.lock.Lock()
meta.SetStatusCondition(&m.pr.Status.Conditions, reachableCond)
m.pr.Status.Labels = peerLabels
m.lock.Unlock()

m.statusCallback(m.pr)
Expand Down Expand Up @@ -292,6 +293,16 @@ func peerChanged(pr1, pr2 *v1alpha1.Peer) bool {
}
}

if len(pr1.Status.Labels) != len(pr2.Status.Labels) {
return true
}

for key, val := range pr1.Status.Labels {
if val != pr2.Status.Labels[key] {
return true
}
}

return false
}

Expand Down
20 changes: 16 additions & 4 deletions pkg/controlplane/peer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package peer
import (
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -117,20 +118,31 @@ func (c *Client) Authorize(ctx context.Context, req *api.AuthorizationRequest) (
}

// GetHeartbeat get a heartbeat from other peers.
func (c *Client) GetHeartbeat() error {
// If check is successful, also return the labels of the remote peer.
func (c *Client) GetHeartbeat() (map[string]string, error) {
serverResp, err := c.getResponse(func(client *jsonapi.Client) (*jsonapi.Response, error) {
return client.Get(context.Background(), api.HeartbeatPath)
})
if err != nil {
return err
return nil, err
}

if serverResp.Status != http.StatusOK {
return fmt.Errorf("unable to get heartbeat (%d), server returned: %s",
return nil, fmt.Errorf("unable to get heartbeat (%d), server returned: %s",
serverResp.Status, serverResp.Body)
}

return nil
peerLabelsHeader := serverResp.Headers.Get(api.PeerLabelsCustomHeader)
sDec, err := base64.StdEncoding.DecodeString(peerLabelsHeader)
if err != nil {
return nil, err
}
var peerLabels map[string]string
if err := json.Unmarshal(sDec, &peerLabels); err != nil {
return nil, err
}

return peerLabels, nil
}

// Peer object this client corresponds to.
Expand Down
18 changes: 18 additions & 0 deletions tests/e2e/k8s/test_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,24 @@ func (s *TestSuite) TestPeerLabels() {
_, err = cl[1].AccessService(httpecho.RunClientInPod, importedService, false, nil)
require.Nil(s.T(), err)

// 3. Creating a deny policy in cl[1] that doesn't have cl[0] label - should have no effect on connection to cl[0]
dstLabels := map[string]string{
authz.PeerLabelsPrefix + util.PeerIPLabel: "not.cl.0.ip",
}
denyNotCl0 := util.NewPolicy("deny-other-cluster", v1alpha1.AccessPolicyActionDeny, nil, dstLabels)
require.Nil(s.T(), cl[1].CreatePolicy(denyNotCl0))
_, err = cl[1].AccessService(httpecho.RunClientInPod, importedService, false, nil)
require.Nil(s.T(), err)

// 4. Creating a deny policy in cl[1] to block connections to cl[0]
dstLabels = map[string]string{
authz.PeerLabelsPrefix + util.PeerIPLabel: cl[0].Cluster().IP(),
}
denyCl0 := util.NewPolicy("deny-cl0-cluster", v1alpha1.AccessPolicyActionDeny, nil, dstLabels)
require.Nil(s.T(), cl[1].CreatePolicy(denyCl0))
_, err = cl[1].AccessService(httpecho.RunClientInPod, importedService, false, nil)
require.NotNil(s.T(), err)

// privileged policies are not namespaced, so remain after the test's namespace is deleted
require.Nil(s.T(), cl[0].DeletePrivilegedPolicy(allowCl1.Name))
}
Expand Down
3 changes: 2 additions & 1 deletion website/content/en/docs/main/concepts/policies.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ More examples are available on our repo under [examples/policies][].
### Available attributes
The following attributes (labels) are set by ClusterLink on each connection request, and can be used in access policies within a `workloadSelector`.
#### Peer attributes - set when running `clusterlink deploy peer`
* `peer.clusterlink.net/name` - Peer name
* `peer.clusterlink.net/name` - Peer name as set by the `--name` flag
* `peer.clusterlink.net/labels.<label-key>` - Peer's labels, set by using `--label` flags
#### Client attributes - derived from Pod info, as retrieved from Kubernetes API. Only relevant in the `from` section of access policies
* `client.clusterlink.net/namespace` - Pod's Namespace
* `client.clusterlink.net/service-account` - Pod's Service Account
Expand Down

0 comments on commit 6b80d32

Please sign in to comment.