diff --git a/cmd/cl-controlplane/app/server.go b/cmd/cl-controlplane/app/server.go index 416bf4311..719a69921 100644 --- a/cmd/cl-controlplane/app/server.go +++ b/cmd/cl-controlplane/app/server.go @@ -159,12 +159,6 @@ func (o *Options) Run() error { httpServer := utilrest.NewServer("controlplane-http", parsedCertData.ServerConfig()) grpcServer := grpc.NewServer("controlplane-grpc", parsedCertData.ServerConfig()) - runnableManager := runnable.NewManager() - runnableManager.Add(controller.NewManager(mgr)) - runnableManager.AddServer(httpServerAddress, httpServer) - runnableManager.AddServer(grpcServerAddress, grpcServer) - runnableManager.AddServer(controlplaneServerListenAddress, sniProxy) - authzManager, err := authz.NewManager(parsedCertData) if err != nil { return fmt.Errorf("cannot create authorization manager: %w", err) @@ -175,7 +169,7 @@ func (o *Options) Run() error { return fmt.Errorf("cannot create authz controllers: %w", err) } - controlManager := control.NewManager(mgr.GetClient(), o.CRDMode) + controlManager := control.NewManager(mgr.GetClient(), parsedCertData, o.CRDMode) xdsManager := xds.NewManager() xds.RegisterService( @@ -202,6 +196,16 @@ func (o *Options) Run() error { } cprest.RegisterHandlers(cp, httpServer) + controlManager.SetStatusCallback(func(pr *v1alpha1.Peer) { + authzManager.AddPeer(pr) + }) + + runnableManager := runnable.NewManager() + runnableManager.Add(controller.NewManager(mgr)) + runnableManager.Add(controlManager) + runnableManager.AddServer(httpServerAddress, httpServer) + runnableManager.AddServer(grpcServerAddress, grpcServer) + runnableManager.AddServer(controlplaneServerListenAddress, sniProxy) return runnableManager.Run() } diff --git a/config/crds/clusterlink.net_peers.yaml b/config/crds/clusterlink.net_peers.yaml index a4abb9171..826226474 100644 --- a/config/crds/clusterlink.net_peers.yaml +++ b/config/crds/clusterlink.net_peers.yaml @@ -55,6 +55,79 @@ spec: required: - gateways type: object + status: + description: Status represents the peer status. + properties: + conditions: + description: Conditions of the peer. + items: + description: "Condition contains details for one aspect of the current + state of this API Resource. --- This struct is intended for direct + use as an array at the field path .status.conditions. For example, + \n type FooStatus struct{ // Represents the observations of a + foo's current state. // Known .status.conditions.type are: \"Available\", + \"Progressing\", and \"Degraded\" // +patchMergeKey=type // +patchStrategy=merge + // +listType=map // +listMapKey=type Conditions []metav1.Condition + `json:\"conditions,omitempty\" patchStrategy:\"merge\" patchMergeKey:\"type\" + protobuf:\"bytes,1,rep,name=conditions\"` \n // other fields }" + properties: + lastTransitionTime: + description: lastTransitionTime is the last time the condition + transitioned from one status to another. This should be when + the underlying condition changed. If that is not known, then + using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: message is a human readable message indicating + details about the transition. This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: observedGeneration represents the .metadata.generation + that the condition was set based upon. For instance, if .metadata.generation + is currently 12, but the .status.conditions[x].observedGeneration + is 9, the condition is out of date with respect to the current + state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: reason contains a programmatic identifier indicating + the reason for the condition's last transition. Producers + of specific condition types may define expected values and + meanings for this field, and whether the values are considered + a guaranteed API. The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + --- Many .condition.type values are consistent across resources + like Available, but because arbitrary conditions can be useful + (see .node.status.conditions), the ability to deconflict is + important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + type: object required: - spec type: object diff --git a/config/operator/rbac/role.yaml b/config/operator/rbac/role.yaml index 30b65199d..3840e1a33 100644 --- a/config/operator/rbac/role.yaml +++ b/config/operator/rbac/role.yaml @@ -87,6 +87,12 @@ rules: - get - patch - update +- apiGroups: + - clusterlink.net + resources: + - peers/status + verbs: + - update - apiGroups: - rbac.authorization.k8s.io resources: diff --git a/go.mod b/go.mod index 6e2e88d87..3d4bbf7ce 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/clusterlink-net/clusterlink go 1.20 require ( + github.com/cenkalti/backoff/v4 v4.2.1 github.com/envoyproxy/go-control-plane v0.12.0 github.com/go-chi/chi v4.1.2+incompatible github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index 35af0da88..63282dc4b 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/armon/go-proxyproto v0.0.0-20210323213023-7e956b284f0a/go.mod h1:QmP9 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= diff --git a/pkg/apis/clusterlink.net/v1alpha1/peer.go b/pkg/apis/clusterlink.net/v1alpha1/peer.go index 5dfcfd703..74df49000 100644 --- a/pkg/apis/clusterlink.net/v1alpha1/peer.go +++ b/pkg/apis/clusterlink.net/v1alpha1/peer.go @@ -27,6 +27,8 @@ type Peer struct { // Spec represents the peer attributes. Spec PeerSpec `json:"spec"` + // Status represents the peer status. + Status PeerStatus `json:"status,omitempty"` } // Endpoint represents a network endpoint (i.e., host or IP and a port). @@ -43,9 +45,15 @@ type PeerSpec struct { Gateways []Endpoint `json:"gateways"` } +const ( + // PeerReachable is a condition type for indicating whether a peer is reachable (heartbeat responding). + PeerReachable string = "PeerReachable" +) + // PeerStatus represents the status of a peer. type PeerStatus struct { - // TODO: add fields + // Conditions of the peer. + Conditions []metav1.Condition `json:"conditions,omitempty"` } // +kubebuilder:object:root=true diff --git a/pkg/apis/clusterlink.net/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/clusterlink.net/v1alpha1/zz_generated.deepcopy.go index b65552fe5..40188a81d 100644 --- a/pkg/apis/clusterlink.net/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/clusterlink.net/v1alpha1/zz_generated.deepcopy.go @@ -494,6 +494,7 @@ func (in *Peer) DeepCopyInto(out *Peer) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Peer. @@ -569,6 +570,13 @@ func (in *PeerSpec) DeepCopy() *PeerSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PeerStatus) DeepCopyInto(out *PeerStatus) { *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]v1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PeerStatus. diff --git a/pkg/bootstrap/platform/k8s.go b/pkg/bootstrap/platform/k8s.go index 1b86cb5cb..401ec923e 100644 --- a/pkg/bootstrap/platform/k8s.go +++ b/pkg/bootstrap/platform/k8s.go @@ -280,6 +280,9 @@ rules: - apiGroups: ["clusterlink.net"] resources: ["imports"] verbs: ["update"] +- apiGroups: ["clusterlink.net"] + resources: ["peers/status"] + verbs: ["update"] {{ end }} --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/pkg/controlplane/authz/manager.go b/pkg/controlplane/authz/manager.go index 855148557..eb37c00aa 100644 --- a/pkg/controlplane/authz/manager.go +++ b/pkg/controlplane/authz/manager.go @@ -24,6 +24,7 @@ import ( "github.com/lestrrat-go/jwx/jwt" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/types" "github.com/clusterlink-net/clusterlink/pkg/api" @@ -113,24 +114,14 @@ func (m *Manager) AddPeer(pr *v1alpha1.Peer) { client := peer.NewClient(pr, m.peerTLS.ClientConfig(pr.Name)) m.peerLock.Lock() - oldClient := m.peerClient[pr.Name] m.peerClient[pr.Name] = client m.peerLock.Unlock() - if oldClient != nil { - oldClient.StopMonitor() - } - - m.policyDecider.AddPeer(pr.Name) - - client.SetPeerStatusCallback(func(isActive bool) { - if isActive { - m.policyDecider.AddPeer(pr.Name) - return - } - + if meta.IsStatusConditionTrue(pr.Status.Conditions, v1alpha1.PeerReachable) { + m.policyDecider.AddPeer(pr.Name) + } else { m.policyDecider.DeletePeer(pr.Name) - }) + } } // DeletePeer removes the possibility for egress dataplane connections to be routed to a given peer. @@ -138,14 +129,9 @@ func (m *Manager) DeletePeer(name string) { m.logger.Infof("Deleting peer '%s'.", name) m.peerLock.Lock() - oldClient := m.peerClient[name] delete(m.peerClient, name) m.peerLock.Unlock() - if oldClient != nil { - oldClient.StopMonitor() - } - m.policyDecider.DeletePeer(name) } diff --git a/pkg/controlplane/control/manager.go b/pkg/controlplane/control/manager.go index b124209d7..7b6838723 100644 --- a/pkg/controlplane/control/manager.go +++ b/pkg/controlplane/control/manager.go @@ -16,7 +16,6 @@ package control import ( "context" "fmt" - "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -29,6 +28,7 @@ import ( "github.com/clusterlink-net/clusterlink/pkg/api" "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" "github.com/clusterlink-net/clusterlink/pkg/util/net" + "github.com/clusterlink-net/clusterlink/pkg/util/tls" ) // Manager is responsible for handling control operations, @@ -36,6 +36,8 @@ import ( // This includes target port generation for imported services, as well as // k8s service creation per imported service. type Manager struct { + peerManager + client client.Client crdMode bool ports *portManager @@ -219,13 +221,14 @@ func serviceChanged(svc1, svc2 *v1.Service) bool { } // NewManager returns a new control manager. -func NewManager(cl client.Client, crdMode bool) *Manager { +func NewManager(cl client.Client, peerTLS *tls.ParsedCertData, crdMode bool) *Manager { logger := logrus.WithField("component", "controlplane.control.manager") return &Manager{ - client: cl, - crdMode: crdMode, - ports: newPortManager(), - logger: logger, + peerManager: newPeerManager(cl, peerTLS), + client: cl, + crdMode: crdMode, + ports: newPortManager(), + logger: logger, } } diff --git a/pkg/controlplane/control/peer.go b/pkg/controlplane/control/peer.go new file mode 100644 index 000000000..2550a55e7 --- /dev/null +++ b/pkg/controlplane/control/peer.go @@ -0,0 +1,276 @@ +// Copyright 2023 The ClusterLink Authors. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package control + +import ( + "context" + "sync" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" + "github.com/clusterlink-net/clusterlink/pkg/controlplane/peer" + "github.com/clusterlink-net/clusterlink/pkg/util/tls" +) + +const ( + heartbeatInterval = 10 * time.Second +) + +// peerMonitor monitors a single peer. +type peerMonitor struct { + lock sync.Mutex + pr *v1alpha1.Peer + client *peer.Client + statusCallback func(*v1alpha1.Peer) + + wg *sync.WaitGroup + stopCh chan struct{} + + logger *logrus.Entry +} + +// peerManager manages peers status. +type peerManager struct { + client client.Client + peerTLS *tls.ParsedCertData + statusCallback func(*v1alpha1.Peer) + + lock sync.Mutex + monitors map[string]*peerMonitor + + stopped bool + monitorWG sync.WaitGroup + updaterWG sync.WaitGroup + stopCh chan struct{} + statusUpdatesCh chan *v1alpha1.Peer + + logger *logrus.Entry +} + +func (m *peerMonitor) Peer() v1alpha1.Peer { + m.lock.Lock() + defer m.lock.Unlock() + + return *m.pr +} + +func (m *peerMonitor) Start() { + defer m.wg.Done() + + ticker := time.NewTicker(heartbeatInterval) + defer ticker.Stop() + + backoffConfig := backoff.NewExponentialBackOff() + + reachable := false + reachableCond := metav1.Condition{ + Type: v1alpha1.PeerReachable, + Status: metav1.ConditionFalse, + Reason: "Heartbeat", + } + + for { + select { + case <-m.stopCh: + return + default: + break + } + + err := backoff.Retry(m.client.GetHeartbeat, backoffConfig) + if heartbeatOK := err == nil; heartbeatOK != reachable { + m.logger.Infof("Heartbeat result: %v", heartbeatOK) + + if heartbeatOK { + reachableCond.Status = metav1.ConditionTrue + backoffConfig.MaxElapsedTime = heartbeatInterval + } else { + reachableCond.Status = metav1.ConditionFalse + backoffConfig.MaxElapsedTime = 0 + } + + reachable = heartbeatOK + + m.lock.Lock() + meta.SetStatusCondition(&m.pr.Status.Conditions, reachableCond) + m.lock.Unlock() + + m.statusCallback(m.pr) + } + + // wait till it's time for next heartbeat round + <-ticker.C + } +} + +func (m *peerMonitor) Stop() { + close(m.stopCh) +} + +// AddPeer defines a new route target for egress dataplane connections. +func (m *peerManager) AddPeer(pr *v1alpha1.Peer) { + m.logger.Infof("Adding peer '%s'.", pr.Name) + + m.lock.Lock() + defer m.lock.Unlock() + + if m.stopped { + return + } + + monitor, ok := m.monitors[pr.Name] + if !ok || peerChanged(monitor.pr, pr) { + m.monitors[pr.Name] = newPeerMonitor(pr, m) + } +} + +// DeletePeer removes the possibility for egress dataplane connections to be routed to a given peer. +func (m *peerManager) DeletePeer(name string) { + m.logger.Infof("Deleting peer '%s'.", name) + + m.lock.Lock() + defer m.lock.Unlock() + delete(m.monitors, name) +} + +// Name of the peer monitor runnable. +func (m *peerManager) Name() string { + return "peerManager" +} + +func (m *peerManager) SetStatusCallback(callback func(*v1alpha1.Peer)) { + m.statusCallback = callback +} + +// Start the peer manager. +func (m *peerManager) Start() error { + m.updaterWG.Add(1) + defer m.updaterWG.Done() + + for { + select { + case <-m.stopCh: + return nil + case pr := <-m.statusUpdatesCh: + // retry loop + for { + m.lock.Lock() + monitor, ok := m.monitors[pr.Name] + m.lock.Unlock() + + if !ok { + continue + } + + currPeer := monitor.Peer() + + if m.statusCallback != nil { + m.statusCallback(&currPeer) + } else { + // CRD-mode + err := m.client.Status().Update(context.Background(), &currPeer) + if err != nil { + m.logger.Warnf("Cannot update peer '%s' status: %v", pr.Name, err) + continue + } + } + + break + } + } + } +} + +// Stop the peer manager. +func (m *peerManager) Stop() error { + m.lock.Lock() + defer m.lock.Unlock() + + for _, monitor := range m.monitors { + monitor.Stop() + } + m.monitorWG.Wait() + + close(m.stopCh) + m.updaterWG.Wait() + + m.stopped = true + return nil +} + +// GracefulStop does a graceful stop of the peer manager. +func (m *peerManager) GracefulStop() error { + return m.Stop() +} + +func (m *peerManager) queueStatusUpdate(pr *v1alpha1.Peer) { + m.statusUpdatesCh <- pr +} + +func peerChanged(pr1, pr2 *v1alpha1.Peer) bool { + if len(pr1.Spec.Gateways) != len(pr2.Spec.Gateways) { + return true + } + + for i := 0; i < len(pr1.Spec.Gateways); i++ { + if pr1.Spec.Gateways[i].Host != pr2.Spec.Gateways[i].Host { + return true + } + if pr1.Spec.Gateways[i].Port != pr2.Spec.Gateways[i].Port { + return true + } + } + + return false +} + +func newPeerMonitor(pr *v1alpha1.Peer, manager *peerManager) *peerMonitor { + logger := logrus.WithFields(logrus.Fields{ + "component": "controlplane.control.peerMonitor", + "peer": pr.Name, + }) + + monitor := &peerMonitor{ + pr: pr, + client: peer.NewClient(pr, manager.peerTLS.ClientConfig(pr.Name)), + statusCallback: manager.queueStatusUpdate, + wg: &manager.monitorWG, + stopCh: make(chan struct{}), + logger: logger, + } + + manager.monitorWG.Add(1) + go monitor.Start() + return monitor +} + +// newPeerManager returns a new empty peerManager. +func newPeerManager(cl client.Client, peerTLS *tls.ParsedCertData) peerManager { + logger := logrus.WithField("component", "controlplane.control.peerManager") + + return peerManager{ + client: cl, + peerTLS: peerTLS, + monitors: make(map[string]*peerMonitor), + stopCh: make(chan struct{}), + statusUpdatesCh: make(chan *v1alpha1.Peer), + logger: logger, + } +} diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index a86695e0a..1f4914beb 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -100,6 +100,7 @@ func (cp *Instance) CreatePeer(peer *cpstore.Peer) error { k8sPeer := toK8SPeer(peer) cp.authzManager.AddPeer(k8sPeer) + cp.controlManager.AddPeer(k8sPeer) return cp.xdsManager.AddPeer(k8sPeer) } @@ -116,6 +117,7 @@ func (cp *Instance) UpdatePeer(peer *cpstore.Peer) error { k8sPeer := toK8SPeer(peer) cp.authzManager.AddPeer(k8sPeer) + cp.controlManager.AddPeer(k8sPeer) return cp.xdsManager.AddPeer(k8sPeer) } @@ -138,6 +140,7 @@ func (cp *Instance) DeletePeer(name string) (*cpstore.Peer, error) { } cp.authzManager.DeletePeer(name) + cp.controlManager.DeletePeer(name) err = cp.xdsManager.DeletePeer(name) if err != nil { diff --git a/pkg/controlplane/peer/client.go b/pkg/controlplane/peer/client.go index 0c05ebefa..fa66bc684 100644 --- a/pkg/controlplane/peer/client.go +++ b/pkg/controlplane/peer/client.go @@ -19,8 +19,6 @@ import ( "errors" "fmt" "net/http" - "sync" - "time" "github.com/sirupsen/logrus" @@ -29,23 +27,11 @@ import ( "github.com/clusterlink-net/clusterlink/pkg/util/jsonapi" ) -const ( - // heartbeatInterval is the time lapse between consecutive heartbeat requests to a responding peer. - heartbeatInterval = 10 * time.Second - // heartbeatRetransmissionTime is the time lapse between consecutive heartbeat requests to a non-responding peer. - heartbeatRetransmissionTime = 60 * time.Second -) - // Client for accessing a remote peer. type Client struct { // jsonapi clients for connecting to the remote peer (one per each gateway) - clients []*jsonapi.Client - lastSeen time.Time - active bool - stopSignal chan struct{} - lock sync.RWMutex - logger *logrus.Entry - peerStatusCallback func(bool) // Callback function for notifying changes in peer + clients []*jsonapi.Client + logger *logrus.Entry } // RemoteServerAuthorizationResponse represents an authorization response received from a remote controlplane server. @@ -105,39 +91,10 @@ func (c *Client) Authorize(req *api.AuthorizationRequest) (*RemoteServerAuthoriz return resp, nil } -// IsActive returns if the peer is active or not. -func (c *Client) IsActive() bool { - c.lock.RLock() - defer c.lock.RUnlock() - return c.active -} - -// setActive the peer status (active or not). -func (c *Client) setActive(active bool) { - c.lock.Lock() - activePrevState := c.active - c.active = active - if active || c.lastSeen.IsZero() { - c.lastSeen = time.Now() - } - c.lock.Unlock() - - // Update other components like the policy engine with the peer status. - if active != activePrevState && c.peerStatusCallback != nil { - c.peerStatusCallback(active) - } -} - // GetHeartbeat get a heartbeat from other peers. -func (c *Client) getHeartbeat() error { +func (c *Client) GetHeartbeat() error { var retErr error - // copy peer clients array aside - peerClients := make([]*jsonapi.Client, len(c.clients)) - c.lock.RLock() - copy(peerClients, c.clients) - c.lock.RUnlock() - - for _, client := range peerClients { + for _, client := range c.clients { serverResp, err := client.Get(api.HeartbeatPath) if err != nil { retErr = errors.Join(retErr, err) @@ -155,61 +112,18 @@ func (c *Client) getHeartbeat() error { return retErr // Return an error if all client targets are unreachable } -// StopMonitor send signal to stop heartbeat monitor. -func (c *Client) StopMonitor() { - close(c.stopSignal) -} - -// heartbeatMonitor checks all peers for responsiveness, every fixed amount of time. -func (c *Client) heartbeatMonitor() { - c.logger.Info("Start sending heartbeat requests to peer") - ticker := time.NewTicker(heartbeatInterval) - defer ticker.Stop() - for { - select { - case <-c.stopSignal: - return - default: - t := time.Now() - if c.IsActive() || (!c.IsActive() && (t.Sub(c.lastSeen) > heartbeatRetransmissionTime)) || - c.lastSeen.IsZero() { - if err := c.getHeartbeat(); err != nil { - if c.IsActive() { - c.logger.Errorf("Unable to get heartbeat from peer error: %v", err.Error()) - c.setActive(false) - } - } else { - c.setActive(true) - } - } - } - // wait till it's time for next heartbeat round - <-ticker.C - } -} - -// SetPeerStatusCallback set the peerStatusCallback. -func (c *Client) SetPeerStatusCallback(callback func(bool)) { - c.peerStatusCallback = callback -} - // NewClient returns a new Peer API client. func NewClient(peer *v1alpha1.Peer, tlsConfig *tls.Config) *Client { clients := make([]*jsonapi.Client, len(peer.Spec.Gateways)) for i, endpoint := range peer.Spec.Gateways { clients[i] = jsonapi.NewClient(endpoint.Host, endpoint.Port, tlsConfig) } - clnt := &Client{ - clients: clients, - active: false, - lastSeen: time.Time{}, - stopSignal: make(chan struct{}), + + return &Client{ + clients: clients, logger: logrus.WithFields(logrus.Fields{ "component": "controlplane.peer.client", "peer": peer, }), } - - go clnt.heartbeatMonitor() - return clnt } diff --git a/pkg/operator/controller/instance_controller.go b/pkg/operator/controller/instance_controller.go index 3dcfa0205..37e4ab6d5 100644 --- a/pkg/operator/controller/instance_controller.go +++ b/pkg/operator/controller/instance_controller.go @@ -72,6 +72,7 @@ type InstanceReconciler struct { // +kubebuilder:rbac:groups="",resources=nodes,verbs=list;get;watch // +kubebuilder:rbac:groups="",resources=pods,verbs=list;get;watch // +kubebuilder:rbac:groups=clusterlink.net,resources=imports,verbs=update +// +kubebuilder:rbac:groups=clusterlink.net,resources=peers/status,verbs=update // +kubebuilder:rbac:groups="apps",resources=deployments,verbs=list;get;watch;create;update;patch;delete //nolint:lll // Ignore long line warning for Kubebuilder command. // +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=clusterroles;clusterrolebindings,verbs=list;get;watch;create;update;patch;delete @@ -462,6 +463,11 @@ func (r *InstanceReconciler) createAccessControl(ctx context.Context, name, name Resources: []string{"imports"}, Verbs: []string{"update"}, }, + { + APIGroups: []string{"clusterlink.net"}, + Resources: []string{"peers/status"}, + Verbs: []string{"update"}, + }, }, }