Skip to content

Commit

Permalink
controlplane/control: Introduce peer manager
Browse files Browse the repository at this point in the history
This commit adds the peerManager which is responsible for monitoring peers,
and updating peer status (reachable or not).

Signed-off-by: Or Ozeri <[email protected]>
  • Loading branch information
orozery committed Feb 29, 2024
1 parent 2ac8493 commit 6565b83
Show file tree
Hide file tree
Showing 14 changed files with 419 additions and 126 deletions.
18 changes: 11 additions & 7 deletions cmd/cl-controlplane/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,6 @@ func (o *Options) Run() error {
httpServer := utilrest.NewServer("controlplane-http", parsedCertData.ServerConfig())
grpcServer := grpc.NewServer("controlplane-grpc", parsedCertData.ServerConfig())

runnableManager := runnable.NewManager()
runnableManager.Add(controller.NewManager(mgr))
runnableManager.AddServer(httpServerAddress, httpServer)
runnableManager.AddServer(grpcServerAddress, grpcServer)
runnableManager.AddServer(controlplaneServerListenAddress, sniProxy)

authzManager, err := authz.NewManager(parsedCertData)
if err != nil {
return fmt.Errorf("cannot create authorization manager: %w", err)
Expand All @@ -175,7 +169,7 @@ func (o *Options) Run() error {
return fmt.Errorf("cannot create authz controllers: %w", err)
}

controlManager := control.NewManager(mgr.GetClient(), o.CRDMode)
controlManager := control.NewManager(mgr.GetClient(), parsedCertData, o.CRDMode)

xdsManager := xds.NewManager()
xds.RegisterService(
Expand All @@ -202,6 +196,16 @@ func (o *Options) Run() error {
}

cprest.RegisterHandlers(cp, httpServer)
controlManager.SetStatusCallback(func(pr *v1alpha1.Peer) {
authzManager.AddPeer(pr)
})

runnableManager := runnable.NewManager()
runnableManager.Add(controller.NewManager(mgr))
runnableManager.Add(controlManager)
runnableManager.AddServer(httpServerAddress, httpServer)
runnableManager.AddServer(grpcServerAddress, grpcServer)
runnableManager.AddServer(controlplaneServerListenAddress, sniProxy)

return runnableManager.Run()
}
Expand Down
73 changes: 73 additions & 0 deletions config/crds/clusterlink.net_peers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,79 @@ spec:
required:
- gateways
type: object
status:
description: Status represents the peer status.
properties:
conditions:
description: Conditions of the peer.
items:
description: "Condition contains details for one aspect of the current
state of this API Resource. --- This struct is intended for direct
use as an array at the field path .status.conditions. For example,
\n type FooStatus struct{ // Represents the observations of a
foo's current state. // Known .status.conditions.type are: \"Available\",
\"Progressing\", and \"Degraded\" // +patchMergeKey=type // +patchStrategy=merge
// +listType=map // +listMapKey=type Conditions []metav1.Condition
`json:\"conditions,omitempty\" patchStrategy:\"merge\" patchMergeKey:\"type\"
protobuf:\"bytes,1,rep,name=conditions\"` \n // other fields }"
properties:
lastTransitionTime:
description: lastTransitionTime is the last time the condition
transitioned from one status to another. This should be when
the underlying condition changed. If that is not known, then
using the time when the API field changed is acceptable.
format: date-time
type: string
message:
description: message is a human readable message indicating
details about the transition. This may be an empty string.
maxLength: 32768
type: string
observedGeneration:
description: observedGeneration represents the .metadata.generation
that the condition was set based upon. For instance, if .metadata.generation
is currently 12, but the .status.conditions[x].observedGeneration
is 9, the condition is out of date with respect to the current
state of the instance.
format: int64
minimum: 0
type: integer
reason:
description: reason contains a programmatic identifier indicating
the reason for the condition's last transition. Producers
of specific condition types may define expected values and
meanings for this field, and whether the values are considered
a guaranteed API. The value should be a CamelCase string.
This field may not be empty.
maxLength: 1024
minLength: 1
pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
type: string
status:
description: status of the condition, one of True, False, Unknown.
enum:
- "True"
- "False"
- Unknown
type: string
type:
description: type of condition in CamelCase or in foo.example.com/CamelCase.
--- Many .condition.type values are consistent across resources
like Available, but because arbitrary conditions can be useful
(see .node.status.conditions), the ability to deconflict is
important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt)
maxLength: 316
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
type: string
required:
- lastTransitionTime
- message
- reason
- status
- type
type: object
type: array
type: object
required:
- spec
type: object
Expand Down
6 changes: 6 additions & 0 deletions config/operator/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ rules:
- get
- patch
- update
- apiGroups:
- clusterlink.net
resources:
- peers/status
verbs:
- update
- apiGroups:
- rbac.authorization.k8s.io
resources:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/clusterlink-net/clusterlink
go 1.20

require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/envoyproxy/go-control-plane v0.12.0
github.com/go-chi/chi v4.1.2+incompatible
github.com/google/uuid v1.6.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/armon/go-proxyproto v0.0.0-20210323213023-7e956b284f0a/go.mod h1:QmP9
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g=
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand Down
10 changes: 9 additions & 1 deletion pkg/apis/clusterlink.net/v1alpha1/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Peer struct {

// Spec represents the peer attributes.
Spec PeerSpec `json:"spec"`
// Status represents the peer status.
Status PeerStatus `json:"status,omitempty"`
}

// Endpoint represents a network endpoint (i.e., host or IP and a port).
Expand All @@ -43,9 +45,15 @@ type PeerSpec struct {
Gateways []Endpoint `json:"gateways"`
}

const (
// PeerReachable is a condition type for indicating whether a peer is reachable (heartbeat responding).
PeerReachable string = "PeerReachable"
)

// PeerStatus represents the status of a peer.
type PeerStatus struct {
// TODO: add fields
// Conditions of the peer.
Conditions []metav1.Condition `json:"conditions,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/clusterlink.net/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/bootstrap/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ rules:
- apiGroups: ["clusterlink.net"]
resources: ["imports"]
verbs: ["update"]
- apiGroups: ["clusterlink.net"]
resources: ["peers/status"]
verbs: ["update"]
{{ end }}
---
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
24 changes: 5 additions & 19 deletions pkg/controlplane/authz/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/lestrrat-go/jwx/jwt"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/types"

"github.com/clusterlink-net/clusterlink/pkg/api"
Expand Down Expand Up @@ -113,39 +114,24 @@ func (m *Manager) AddPeer(pr *v1alpha1.Peer) {
client := peer.NewClient(pr, m.peerTLS.ClientConfig(pr.Name))

m.peerLock.Lock()
oldClient := m.peerClient[pr.Name]
m.peerClient[pr.Name] = client
m.peerLock.Unlock()

if oldClient != nil {
oldClient.StopMonitor()
}

m.policyDecider.AddPeer(pr.Name)

client.SetPeerStatusCallback(func(isActive bool) {
if isActive {
m.policyDecider.AddPeer(pr.Name)
return
}

if meta.IsStatusConditionTrue(pr.Status.Conditions, v1alpha1.PeerReachable) {
m.policyDecider.AddPeer(pr.Name)
} else {
m.policyDecider.DeletePeer(pr.Name)
})
}
}

// DeletePeer removes the possibility for egress dataplane connections to be routed to a given peer.
func (m *Manager) DeletePeer(name string) {
m.logger.Infof("Deleting peer '%s'.", name)

m.peerLock.Lock()
oldClient := m.peerClient[name]
delete(m.peerClient, name)
m.peerLock.Unlock()

if oldClient != nil {
oldClient.StopMonitor()
}

m.policyDecider.DeletePeer(name)
}

Expand Down
15 changes: 9 additions & 6 deletions pkg/controlplane/control/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package control
import (
"context"
"fmt"

"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -29,13 +28,16 @@ import (
"github.com/clusterlink-net/clusterlink/pkg/api"
"github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1"
"github.com/clusterlink-net/clusterlink/pkg/util/net"
"github.com/clusterlink-net/clusterlink/pkg/util/tls"
)

// Manager is responsible for handling control operations,
// which needs to be coordinated across all dataplane/controlplane instances.
// This includes target port generation for imported services, as well as
// k8s service creation per imported service.
type Manager struct {
peerManager

client client.Client
crdMode bool
ports *portManager
Expand Down Expand Up @@ -219,13 +221,14 @@ func serviceChanged(svc1, svc2 *v1.Service) bool {
}

// NewManager returns a new control manager.
func NewManager(cl client.Client, crdMode bool) *Manager {
func NewManager(cl client.Client, peerTLS *tls.ParsedCertData, crdMode bool) *Manager {
logger := logrus.WithField("component", "controlplane.control.manager")

return &Manager{
client: cl,
crdMode: crdMode,
ports: newPortManager(),
logger: logger,
peerManager: newPeerManager(cl, peerTLS),
client: cl,
crdMode: crdMode,
ports: newPortManager(),
logger: logger,
}
}
Loading

0 comments on commit 6565b83

Please sign in to comment.