Skip to content

Commit

Permalink
Merge pull request #454 from orozery/control-peer
Browse files Browse the repository at this point in the history
controlplane/control: Change heartbeat strategy
  • Loading branch information
orozery authored Mar 27, 2024
2 parents 3bda509 + c337c9b commit 08f2167
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 24 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.20

require (
github.com/bombsimon/logrusr/v4 v4.1.0
github.com/cenkalti/backoff/v4 v4.2.1
github.com/envoyproxy/go-control-plane v0.12.0
github.com/go-chi/chi v4.1.2+incompatible
github.com/google/uuid v1.6.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bombsimon/logrusr/v4 v4.1.0 h1:uZNPbwusB0eUXlO8hIUwStE6Lr5bLN6IgYgG+75kuh4=
github.com/bombsimon/logrusr/v4 v4.1.0/go.mod h1:pjfHC5e59CvjTBIU3V3sGhFWFAnsnhOR03TRc6im0l8=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g=
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand Down
62 changes: 41 additions & 21 deletions pkg/controlplane/control/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -30,7 +29,14 @@ import (
)

const (
heartbeatInterval = 10 * time.Second
// time interval between health check requests when peer has recently responded.
healthyInterval = 1 * time.Second
// time interval between health check requests when peer has not recently responded.
unhealthyInterval = 10 * time.Second
// number of consecutive successful health checks for a peer to be declared reachable.
healthyThreshold = 3
// number of consecutive unsuccessful health checks for a peer to be declared unreachable.
unhealthyThreshold = 5
)

// peerMonitor monitors a single peer.
Expand Down Expand Up @@ -81,12 +87,12 @@ func (m *peerMonitor) SetPeer(pr *v1alpha1.Peer) {
func (m *peerMonitor) Start() {
defer m.wg.Done()

ticker := time.NewTicker(heartbeatInterval)
ticker := time.NewTicker(healthyInterval)
defer ticker.Stop()

backoffConfig := backoff.NewExponentialBackOff()

reachable := false
healthy := false
strikeCount := 0
threshold := 1 // require only a single heartbeat on startup
reachableCond := metav1.Condition{
Type: v1alpha1.PeerReachable,
Status: metav1.ConditionFalse,
Expand All @@ -101,28 +107,42 @@ func (m *peerMonitor) Start() {
break
}

err := backoff.Retry(m.client.GetHeartbeat, backoffConfig)
if heartbeatOK := err == nil; heartbeatOK != reachable {
m.logger.Infof("Heartbeat result: %v", heartbeatOK)

heartbeatOK := m.client.GetHeartbeat() == nil
if healthy == heartbeatOK {
strikeCount = 0
} else {
if heartbeatOK {
reachableCond.Status = metav1.ConditionTrue
backoffConfig.MaxElapsedTime = heartbeatInterval
} else {
reachableCond.Status = metav1.ConditionFalse
backoffConfig.MaxElapsedTime = 0
// switch to healthy interval (even though not yet declared healthy)
ticker.Reset(healthyInterval)
}
strikeCount++
}

reachable = heartbeatOK
if strikeCount < threshold {
<-ticker.C
continue
}

m.lock.Lock()
meta.SetStatusCondition(&m.pr.Status.Conditions, reachableCond)
m.lock.Unlock()
m.logger.Infof("Peer reachable status changed to: %v", heartbeatOK)

// callback for non-CRD mode, which does not watch peers/status
m.statusCallback(m.pr)
if heartbeatOK {
reachableCond.Status = metav1.ConditionTrue
threshold = unhealthyThreshold
} else {
reachableCond.Status = metav1.ConditionFalse
threshold = healthyThreshold
ticker.Reset(unhealthyInterval)
}

strikeCount = 0
healthy = heartbeatOK

m.lock.Lock()
meta.SetStatusCondition(&m.pr.Status.Conditions, reachableCond)
m.lock.Unlock()

m.statusCallback(m.pr)

// wait until it is time for the next heartbeat round
<-ticker.C
}
Expand Down

0 comments on commit 08f2167

Please sign in to comment.