Merge pull request #5451 from jnummelin/fix/etcd-member-full-resync
EtcdMember full resync on leader change
jnummelin authored Jan 22, 2025
2 parents cae4b5f + de10c8a commit 8b01a4b
Showing 3 changed files with 152 additions and 25 deletions.
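
In short, the fix hooks a full resync of all EtcdMember objects into the leader-election flow; below is a condensed Go sketch of that pattern, simplified from the diff that follows rather than the verbatim k0s code:

    // Kick off a full resync in the background whenever this controller
    // acquires the leader lease, so the callback never blocks leader election.
    e.leaderElector.AddAcquiredLeaseCallback(func() {
        go func() {
            if err := e.resync(ctx); err != nil {
                log.WithError(err).Error("failed to resync etcd members")
            }
        }()
    })

    // resync lists every EtcdMember and reconciles each one, serialized by a
    // mutex and bounded by a generous timeout because etcd and the API server
    // can be slow right after a leader change.
    func (e *EtcdMemberReconciler) resync(ctx context.Context) error {
        e.mux.Lock()
        defer e.mux.Unlock()
        ctx, cancel := context.WithTimeout(ctx, time.Minute)
        defer cancel()
        members, err := e.etcdMemberClient.List(ctx, metav1.ListOptions{})
        if err != nil {
            return err
        }
        for _, member := range members.Items {
            e.reconcileMember(ctx, &member)
        }
        return nil
    }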
57 changes: 49 additions & 8 deletions inttest/etcdmember/etcdmember_test.go
@@ -20,17 +20,21 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"

"github.com/k0sproject/k0s/inttest/common"
"github.com/stretchr/testify/suite"
"golang.org/x/sync/errgroup"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

etcdv1beta1 "github.com/k0sproject/k0s/pkg/apis/etcd/v1beta1"
"github.com/k0sproject/k0s/pkg/kubernetes/watch"
)

const basePath = "apis/etcd.k0sproject.io/v1beta1/etcdmembers/%s"

type EtcdMemberSuite struct {
common.BootlooseSuite
}
@@ -81,12 +85,10 @@ func (s *EtcdMemberSuite) TestDeregistration() {
etcdMemberClient, err := s.EtcdMemberClient(s.ControllerNode(0))

// Check each node is present in the etcd cluster and reports joined state
expectedObjects := []string{"controller0", "controller1", "controller2"}

// Use errorgroup to wait for all the statuses to be updated
eg := errgroup.Group{}

for i, obj := range expectedObjects {
for i, obj := range nodes {
eg.Go(func() error {
s.T().Logf("verifying initial status of %s", obj)
em := &etcdv1beta1.EtcdMember{}
@@ -136,7 +138,7 @@ func (s *EtcdMemberSuite) TestDeregistration() {

// Make sure the EtcdMember CR status is successfully updated
em := s.getMember(ctx, "controller2")
s.Require().Equal("Success", em.Status.ReconcileStatus)
s.Require().Equal(etcdv1beta1.ReconcileStatusSuccess, em.Status.ReconcileStatus)
s.Require().Equal(etcdv1beta1.ConditionFalse, em.Status.GetCondition(etcdv1beta1.ConditionTypeJoined).Status)

// Stop k0s and reset the node
@@ -165,12 +167,46 @@ func (s *EtcdMemberSuite) TestDeregistration() {
s.Require().NoError(err)
s.Require().Equal(em.Status.PeerAddress, s.GetControllerIPAddress(2))

// Figure out what node is the leader and mark it as leaving
leader := s.getLeader(ctx)
s.leaveNode(ctx, leader)

}

const basePath = "apis/etcd.k0sproject.io/v1beta1/etcdmembers/%s"
// getLeader returns the name of the current k0s leader node by comparing
// the holder identity of the "k0s-endpoint-reconciler" lease to the per node leases
func (s *EtcdMemberSuite) getLeader(ctx context.Context) string {
// First we need to get all leases in "kube-node-lease" NS
kc, err := s.KubeClient(s.ControllerNode(0))
s.Require().NoError(err)
leases, err := kc.CoordinationV1().Leases("kube-node-lease").List(ctx, metav1.ListOptions{})
s.Require().NoError(err)
leaseIDs := make(map[string]string)
for _, l := range leases.Items {
if strings.Contains(l.Name, "k0s-ctrl") {
node := strings.ReplaceAll(l.Name, "k0s-ctrl-", "")
leaseID := l.Spec.HolderIdentity
leaseIDs[*leaseID] = node
}
}
// Next we need to match the "k0s-endpoint-reconciler" lease holder identity to a node name
leaderLease, err := kc.CoordinationV1().Leases("kube-node-lease").Get(ctx, "k0s-endpoint-reconciler", metav1.GetOptions{})
s.Require().NoError(err)
return leaseIDs[*leaderLease.Spec.HolderIdentity]

}

func (s *EtcdMemberSuite) leaveNode(ctx context.Context, name string) {
kc, err := s.KubeClient(s.ControllerNode(0))
// Get kube client to some other node that we're marking to leave
n := ""
for _, node := range nodes {
if node != name {
n = node
break
}
}
s.T().Logf("using %s as API server to mark %s for leaving", n, name)
kc, err := s.KubeClient(n)
s.Require().NoError(err)

// Patch the EtcdMember CR to set the Leave flag
@@ -183,13 +219,16 @@ func (s *EtcdMemberSuite) leaveNode(ctx context.Context, name string) {
err = common.Poll(ctx, func(ctx context.Context) (done bool, err error) {
em := &etcdv1beta1.EtcdMember{}
err = kc.RESTClient().Get().AbsPath(fmt.Sprintf(basePath, name)).Do(ctx).Into(em)
s.Require().NoError(err)
if err != nil {
// We need to retry on errors since it's very common to hit "etcd leader changed" errors when we're messing with the cluster
s.T().Logf("error getting EtcdMember %s, gonna retry: %v", name, err)
return false, nil
}

c := em.Status.GetCondition(etcdv1beta1.ConditionTypeJoined)
if c == nil {
return false, nil
}
s.T().Logf("JoinStatus = %s, waiting for %s", c.Status, etcdv1beta1.ConditionFalse)
return c.Status == etcdv1beta1.ConditionFalse, nil

})
@@ -208,6 +247,8 @@ func (s *EtcdMemberSuite) getMember(ctx context.Context, name string) *etcdv1beta1.EtcdMember {
return em
}

var nodes = []string{"controller0", "controller1", "controller2"}

func TestEtcdMemberSuite(t *testing.T) {
s := EtcdMemberSuite{
common.BootlooseSuite{
5 changes: 5 additions & 0 deletions pkg/apis/etcd/v1beta1/types.go
@@ -51,6 +51,11 @@ const (
JoinStatusLeft JoinStatus = "Left"
)

const (
ReconcileStatusSuccess = "Success"
ReconcileStatusFailed = "Failed"
)

// EtcdMemberSpec defines the desired state of EtcdMember
type EtcdMemberSpec struct {
// Leave is a flag to indicate that the member should be removed from the cluster
115 changes: 98 additions & 17 deletions pkg/component/controller/etcd_member_reconciler.go
@@ -21,8 +21,12 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/avast/retry-go"
etcdv1beta1 "github.com/k0sproject/k0s/pkg/apis/etcd/v1beta1"
"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
etcdmemberclient "github.com/k0sproject/k0s/pkg/client/clientset/typed/etcd/v1beta1"
@@ -56,21 +60,60 @@ type EtcdMemberReconciler struct {
etcdConfig *v1beta1.EtcdConfig
etcdMemberClient etcdmemberclient.EtcdMemberInterface
leaderElector leaderelector.Interface
mux sync.Mutex
started atomic.Bool
}

func (e *EtcdMemberReconciler) Init(_ context.Context) error {
return nil
}

// resync does a full resync of the etcd members when the leader changes
// This is needed to ensure all the member objects are in sync with the actual etcd cluster
// We might get stale state if we remove the current leader as the leader will essentially
// remove itself from the etcd cluster and after that tries to update the member object.
func (e *EtcdMemberReconciler) resync(ctx context.Context) error {
e.mux.Lock()
defer e.mux.Unlock()

if !e.started.Load() {
logrus.WithField("component", "EtcdMemberReconciler").Debug("Not started yet!!?!?")
return nil
}

// Loop through all the members and run reconcile on them
// Use high timeout as etcd/api could be a bit slow when the leader changes
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
members, err := e.etcdMemberClient.List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
for _, member := range members.Items {
e.reconcileMember(ctx, &member)
}
return nil
}

func (e *EtcdMemberReconciler) Start(ctx context.Context) error {
log := logrus.WithField("component", "etcdMemberReconciler")
log := logrus.WithField("component", "EtcdMemberReconciler")

etcdMemberClient, err := e.clientFactory.GetEtcdMemberClient()
if err != nil {
return err
}
e.etcdMemberClient = etcdMemberClient

e.leaderElector.AddAcquiredLeaseCallback(func() {
// Spin up resync in separate routine to not block the leader election
go func() {
log.Info("leader lease acquired, starting resync")
if err := e.resync(ctx); err != nil {
log.WithError(err).Error("failed to resync etcd members")
}
}()
})

// Run the watch in go routine so it keeps running till the context ends
go func() {
err = e.waitForCRD(ctx)
@@ -80,10 +123,25 @@ func (e *EtcdMemberReconciler) Start(ctx context.Context) error {
}

// Create the object for this node
err = e.createMemberObject(ctx)
// Need to be done in retry loop as during the initial startup the etcd might not be stable
err = retry.Do(
func() error {
return e.createMemberObject(ctx)
},
retry.Delay(3*time.Second),
retry.Attempts(5),
retry.Context(ctx),
retry.LastErrorOnly(true),
retry.RetryIf(func(retryErr error) bool {
log.Debugf("retrying createMemberObject: %v", retryErr)
// During etcd cluster bootstrap, it's common to see k8s giving 500 errors due to etcd timeouts
return apierrors.IsInternalError(retryErr)
}),
)
if err != nil {
log.WithError(err).Error("failed to create EtcdMember object")
log.WithError(err).Error("failed to create EtcdMember object for this controller")
}
e.started.Store(true)
var lastObservedVersion string
err = watch.EtcdMembers(etcdMemberClient).
WithErrorCallback(func(err error) (time.Duration, error) {
@@ -102,7 +160,14 @@ func (e *EtcdMemberReconciler) Start(ctx context.Context) error {
}).
Until(ctx, func(member *etcdv1beta1.EtcdMember) (bool, error) {
lastObservedVersion = member.ResourceVersion
e.reconcileMember(ctx, member)
log.Debugf("watch triggered on %s", member.Name)
if e.leaderElector.IsLeader() {
if err := e.resync(ctx); err != nil {
log.WithError(err).Error("failed to resync etcd members")
}
} else {
log.Debug("Not the leader, skipping")
}
// Never stop the watch
return false, nil
})
@@ -225,8 +290,6 @@ func (e *EtcdMemberReconciler) createMemberObject(ctx context.Context) error {
}
}

em.Status.PeerAddress = e.etcdConfig.PeerAddress
em.Status.MemberID = memberIDStr
em.Spec.Leave = false

log.Debug("EtcdMember object already exists, updating it")
@@ -235,6 +298,8 @@ func (e *EtcdMemberReconciler) createMemberObject(ctx context.Context) error {
if err != nil {
return err
}
em.Status.PeerAddress = e.etcdConfig.PeerAddress
em.Status.MemberID = memberIDStr
em.Status.SetCondition(etcdv1beta1.ConditionTypeJoined, etcdv1beta1.ConditionTrue, "Member joined", time.Now())
_, err = e.etcdMemberClient.UpdateStatus(ctx, em, metav1.UpdateOptions{})
if err != nil {
@@ -253,12 +318,7 @@ func (e *EtcdMemberReconciler) reconcileMember(ctx context.Context, member *etcdv1beta1.EtcdMember) {
"peerAddress": member.Status.PeerAddress,
})

if !e.leaderElector.IsLeader() {
log.Debug("not the leader, skipping reconcile")
return
}

log.Debugf("reconciling EtcdMember: %+v", member)
log.Debugf("reconciling EtcdMember: %s", member.Name)

if !member.Spec.Leave {
log.Debug("member not marked for leave, no action needed")
Expand All @@ -268,7 +328,7 @@ func (e *EtcdMemberReconciler) reconcileMember(ctx context.Context, member *etcd
etcdClient, err := etcd.NewClient(e.k0sVars.CertRootDir, e.k0sVars.EtcdCertDir, e.etcdConfig)
if err != nil {
log.WithError(err).Warn("failed to create etcd client")
member.Status.ReconcileStatus = "Failed"
member.Status.ReconcileStatus = etcdv1beta1.ReconcileStatusFailed
member.Status.Message = err.Error()
if _, err = e.etcdMemberClient.UpdateStatus(ctx, member, metav1.UpdateOptions{}); err != nil {
log.WithError(err).Error("failed to update member state")
@@ -283,7 +343,7 @@ func (e *EtcdMemberReconciler) reconcileMember(ctx context.Context, member *etcdv1beta1.EtcdMember) {
// Verify that the member is actually still present in etcd
members, err := etcdClient.ListMembers(ctx)
if err != nil {
member.Status.ReconcileStatus = "Failed"
member.Status.ReconcileStatus = etcdv1beta1.ReconcileStatusFailed
member.Status.Message = err.Error()
if _, err = e.etcdMemberClient.UpdateStatus(ctx, member, metav1.UpdateOptions{}); err != nil {
log.WithError(err).Error("failed to update member state")
@@ -297,6 +357,7 @@ func (e *EtcdMemberReconciler) reconcileMember(ctx context.Context, member *etcdv1beta1.EtcdMember) {
if !ok {
log.Debug("member marked for leave but not in actual member list, updating state to reflect that")
member.Status.SetCondition(etcdv1beta1.ConditionTypeJoined, etcdv1beta1.ConditionFalse, member.Status.Message, time.Now())
member.Status.ReconcileStatus = etcdv1beta1.ReconcileStatusSuccess
member, err = e.etcdMemberClient.UpdateStatus(ctx, member, metav1.UpdateOptions{})
if err != nil {
log.WithError(err).Error("failed to update EtcdMember status")
@@ -316,11 +377,31 @@ func (e *EtcdMemberReconciler) reconcileMember(ctx context.Context, member *etcdv1beta1.EtcdMember) {
return
}

if err = etcdClient.DeleteMember(ctx, memberID); err != nil {
err = retry.Do(func() error {
return etcdClient.DeleteMember(ctx, memberID)
},
retry.Delay(5*time.Second),
retry.LastErrorOnly(true),
retry.Attempts(5),
retry.Context(ctx),
retry.RetryIf(func(err error) bool {
// In case etcd reports unhealthy cluster, retry
msg := err.Error()
switch {
case strings.Contains(msg, "unhealthy cluster"):
return true
case strings.Contains(msg, "leader changed"):
return true
}
return false
}),
)

if err != nil {
logrus.
WithError(err).
Errorf("Failed to delete etcd peer from cluster")
member.Status.ReconcileStatus = "Failed"
member.Status.ReconcileStatus = etcdv1beta1.ReconcileStatusFailed
member.Status.Message = err.Error()
_, err = e.etcdMemberClient.UpdateStatus(ctx, member, metav1.UpdateOptions{})
if err != nil {
@@ -331,7 +412,7 @@ func (e *EtcdMemberReconciler) reconcileMember(ctx context.Context, member *etcdv1beta1.EtcdMember) {

// Peer removed successfully, update status
log.Info("reconcile succeeded")
member.Status.ReconcileStatus = "Success"
member.Status.ReconcileStatus = etcdv1beta1.ReconcileStatusSuccess
member.Status.Message = "Member removed from cluster"
member.Status.SetCondition(etcdv1beta1.ConditionTypeJoined, etcdv1beta1.ConditionFalse, member.Status.Message, time.Now())
_, err = e.etcdMemberClient.UpdateStatus(ctx, member, metav1.UpdateOptions{})
