From de10c8a3a1e1745683fdba00559ce7a313353293 Mon Sep 17 00:00:00 2001 From: Jussi Nummelin Date: Tue, 21 Jan 2025 11:24:14 +0200 Subject: [PATCH] Do full EtcdMember re-sync on leader changes Also changes the logic a bit to do full re-sync on any EtcdMember changes. This ensures that we get the EtcdMember states correct even when removing leader nodes. The actual removal is now wrapped in retry loop to ensure we are not hit by Etcd ["cooldown" limits](https://github.com/etcd-io/etcd/blob/5ccbeec769685826424aa147aed452ea4ec52673/server/etcdserver/server.go#L91-L93) when removing peers. Signed-off-by: Jussi Nummelin --- inttest/etcdmember/etcdmember_test.go | 57 +++++++-- pkg/apis/etcd/v1beta1/types.go | 5 + .../controller/etcd_member_reconciler.go | 115 +++++++++++++++--- 3 files changed, 152 insertions(+), 25 deletions(-) diff --git a/inttest/etcdmember/etcdmember_test.go b/inttest/etcdmember/etcdmember_test.go index 9d96f6dcc42b..1f898143877c 100644 --- a/inttest/etcdmember/etcdmember_test.go +++ b/inttest/etcdmember/etcdmember_test.go @@ -20,17 +20,21 @@ import ( "context" "encoding/json" "fmt" + "strings" "testing" "time" "github.com/k0sproject/k0s/inttest/common" "github.com/stretchr/testify/suite" "golang.org/x/sync/errgroup" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" etcdv1beta1 "github.com/k0sproject/k0s/pkg/apis/etcd/v1beta1" "github.com/k0sproject/k0s/pkg/kubernetes/watch" ) +const basePath = "apis/etcd.k0sproject.io/v1beta1/etcdmembers/%s" + type EtcdMemberSuite struct { common.BootlooseSuite } @@ -81,12 +85,10 @@ func (s *EtcdMemberSuite) TestDeregistration() { etcdMemberClient, err := s.EtcdMemberClient(s.ControllerNode(0)) // Check each node is present in the etcd cluster and reports joined state - expectedObjects := []string{"controller0", "controller1", "controller2"} - // Use errorgroup to wait for all the statuses to be updated eg := errgroup.Group{} - for i, obj := range expectedObjects { + for i, obj := range nodes { eg.Go(func() error { s.T().Logf("verifying initial status of %s", obj) em := &etcdv1beta1.EtcdMember{} @@ -136,7 +138,7 @@ func (s *EtcdMemberSuite) TestDeregistration() { // Make sure the EtcdMember CR status is successfully updated em := s.getMember(ctx, "controller2") - s.Require().Equal("Success", em.Status.ReconcileStatus) + s.Require().Equal(etcdv1beta1.ReconcileStatusSuccess, em.Status.ReconcileStatus) s.Require().Equal(etcdv1beta1.ConditionFalse, em.Status.GetCondition(etcdv1beta1.ConditionTypeJoined).Status) // Stop k0s and reset the node @@ -165,12 +167,46 @@ func (s *EtcdMemberSuite) TestDeregistration() { s.Require().NoError(err) s.Require().Equal(em.Status.PeerAddress, s.GetControllerIPAddress(2)) + // Figure out what node is the leader and mark it as leaving + leader := s.getLeader(ctx) + s.leaveNode(ctx, leader) + } -const basePath = "apis/etcd.k0sproject.io/v1beta1/etcdmembers/%s" +// getLeader returns the name of the current k0s leader node by comparing +// the holder identity of the "k0s-endpoint-reconciler" lease to the per node leases +func (s *EtcdMemberSuite) getLeader(ctx context.Context) string { + // First we need to get all leases in "kube-node-lease" NS + kc, err := s.KubeClient(s.ControllerNode(0)) + s.Require().NoError(err) + leases, err := kc.CoordinationV1().Leases("kube-node-lease").List(ctx, metav1.ListOptions{}) + s.Require().NoError(err) + leaseIDs := make(map[string]string) + for _, l := range leases.Items { + if strings.Contains(l.Name, "k0s-ctrl") { + node := strings.ReplaceAll(l.Name, "k0s-ctrl-", "") + leaseID := l.Spec.HolderIdentity + leaseIDs[*leaseID] = node + } + } + // Next we need to match the "k0s-endpoint-reconciler" lease holder identity to a node name + leaderLease, err := kc.CoordinationV1().Leases("kube-node-lease").Get(ctx, "k0s-endpoint-reconciler", metav1.GetOptions{}) + s.Require().NoError(err) + return leaseIDs[*leaderLease.Spec.HolderIdentity] + +} func (s *EtcdMemberSuite) leaveNode(ctx context.Context, name string) { - kc, err := s.KubeClient(s.ControllerNode(0)) + // Get kube client to some other node that we're marking to leave + n := "" + for _, node := range nodes { + if node != name { + n = node + break + } + } + s.T().Logf("using %s as API server to mark %s for leaving", n, name) + kc, err := s.KubeClient(n) s.Require().NoError(err) // Patch the EtcdMember CR to set the Leave flag @@ -183,13 +219,16 @@ func (s *EtcdMemberSuite) leaveNode(ctx context.Context, name string) { err = common.Poll(ctx, func(ctx context.Context) (done bool, err error) { em := &etcdv1beta1.EtcdMember{} err = kc.RESTClient().Get().AbsPath(fmt.Sprintf(basePath, name)).Do(ctx).Into(em) - s.Require().NoError(err) + if err != nil { + // We need to retry on errors since it's very common to hit "etcd leader changed" errors when we're messing with the cluster + s.T().Logf("error getting EtcdMember %s, gonna retry: %v", name, err) + return false, nil + } c := em.Status.GetCondition(etcdv1beta1.ConditionTypeJoined) if c == nil { return false, nil } - s.T().Logf("JoinStatus = %s, waiting for %s", c.Status, etcdv1beta1.ConditionFalse) return c.Status == etcdv1beta1.ConditionFalse, nil }) @@ -208,6 +247,8 @@ func (s *EtcdMemberSuite) getMember(ctx context.Context, name string) *etcdv1bet return em } +var nodes = []string{"controller0", "controller1", "controller2"} + func TestEtcdMemberSuite(t *testing.T) { s := EtcdMemberSuite{ common.BootlooseSuite{ diff --git a/pkg/apis/etcd/v1beta1/types.go b/pkg/apis/etcd/v1beta1/types.go index 76f9359f1f66..85b117423338 100644 --- a/pkg/apis/etcd/v1beta1/types.go +++ b/pkg/apis/etcd/v1beta1/types.go @@ -51,6 +51,11 @@ const ( JoinStatusLeft JoinStatus = "Left" ) +const ( + ReconcileStatusSuccess = "Success" + ReconcileStatusFailed = "Failed" +) + // EtcdMemberSpec defines the desired state of EtcdMember type EtcdMemberSpec struct { // Leave is a flag to indicate that the member should be removed from the cluster diff --git a/pkg/component/controller/etcd_member_reconciler.go b/pkg/component/controller/etcd_member_reconciler.go index 81a0eb38ed8d..83ef66f9f464 100644 --- a/pkg/component/controller/etcd_member_reconciler.go +++ b/pkg/component/controller/etcd_member_reconciler.go @@ -21,8 +21,12 @@ import ( "errors" "fmt" "strconv" + "strings" + "sync" + "sync/atomic" "time" + "github.com/avast/retry-go" etcdv1beta1 "github.com/k0sproject/k0s/pkg/apis/etcd/v1beta1" "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" etcdmemberclient "github.com/k0sproject/k0s/pkg/client/clientset/typed/etcd/v1beta1" @@ -56,14 +60,43 @@ type EtcdMemberReconciler struct { etcdConfig *v1beta1.EtcdConfig etcdMemberClient etcdmemberclient.EtcdMemberInterface leaderElector leaderelector.Interface + mux sync.Mutex + started atomic.Bool } func (e *EtcdMemberReconciler) Init(_ context.Context) error { return nil } +// resync does a full resync of the etcd members when the leader changes +// This is needed to ensure all the member objects are in sync with the actual etcd cluster +// We might get stale state if we remove the current leader as the leader will essentially +// remove itself from the etcd cluster and after that tries to update the member object. +func (e *EtcdMemberReconciler) resync(ctx context.Context) error { + e.mux.Lock() + defer e.mux.Unlock() + + if !e.started.Load() { + logrus.WithField("component", "EtcdMemberReconciler").Debug("Not started yet!!?!?") + return nil + } + + // Loop through all the members and run reconcile on them + // Use high timeout as etcd/api could be a bit slow when the leader changes + ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + members, err := e.etcdMemberClient.List(ctx, metav1.ListOptions{}) + if err != nil { + return err + } + for _, member := range members.Items { + e.reconcileMember(ctx, &member) + } + return nil +} + func (e *EtcdMemberReconciler) Start(ctx context.Context) error { - log := logrus.WithField("component", "etcdMemberReconciler") + log := logrus.WithField("component", "EtcdMemberReconciler") etcdMemberClient, err := e.clientFactory.GetEtcdMemberClient() if err != nil { @@ -71,6 +104,16 @@ func (e *EtcdMemberReconciler) Start(ctx context.Context) error { } e.etcdMemberClient = etcdMemberClient + e.leaderElector.AddAcquiredLeaseCallback(func() { + // Spin up resync in separate routine to not block the leader election + go func() { + log.Info("leader lease acquired, starting resync") + if err := e.resync(ctx); err != nil { + log.WithError(err).Error("failed to resync etcd members") + } + }() + }) + // Run the watch in go routine so it keeps running till the context ends go func() { err = e.waitForCRD(ctx) @@ -80,10 +123,25 @@ func (e *EtcdMemberReconciler) Start(ctx context.Context) error { } // Create the object for this node - err = e.createMemberObject(ctx) + // Need to be done in retry loop as during the initial startup the etcd might not be stable + err = retry.Do( + func() error { + return e.createMemberObject(ctx) + }, + retry.Delay(3*time.Second), + retry.Attempts(5), + retry.Context(ctx), + retry.LastErrorOnly(true), + retry.RetryIf(func(retryErr error) bool { + log.Debugf("retrying createMemberObject: %v", retryErr) + // During etcd cluster bootstrap, it's common to see k8s giving 500 errors due to etcd timeouts + return apierrors.IsInternalError(retryErr) + }), + ) if err != nil { - log.WithError(err).Error("failed to create EtcdMember object") + log.WithError(err).Error("failed to create EtcdMember object for this controller") } + e.started.Store(true) var lastObservedVersion string err = watch.EtcdMembers(etcdMemberClient). WithErrorCallback(func(err error) (time.Duration, error) { @@ -102,7 +160,14 @@ func (e *EtcdMemberReconciler) Start(ctx context.Context) error { }). Until(ctx, func(member *etcdv1beta1.EtcdMember) (bool, error) { lastObservedVersion = member.ResourceVersion - e.reconcileMember(ctx, member) + log.Debugf("watch triggered on %s", member.Name) + if e.leaderElector.IsLeader() { + if err := e.resync(ctx); err != nil { + log.WithError(err).Error("failed to resync etcd members") + } + } else { + log.Debug("Not the leader, skipping") + } // Never stop the watch return false, nil }) @@ -225,8 +290,6 @@ func (e *EtcdMemberReconciler) createMemberObject(ctx context.Context) error { } } - em.Status.PeerAddress = e.etcdConfig.PeerAddress - em.Status.MemberID = memberIDStr em.Spec.Leave = false log.Debug("EtcdMember object already exists, updating it") @@ -235,6 +298,8 @@ func (e *EtcdMemberReconciler) createMemberObject(ctx context.Context) error { if err != nil { return err } + em.Status.PeerAddress = e.etcdConfig.PeerAddress + em.Status.MemberID = memberIDStr em.Status.SetCondition(etcdv1beta1.ConditionTypeJoined, etcdv1beta1.ConditionTrue, "Member joined", time.Now()) _, err = e.etcdMemberClient.UpdateStatus(ctx, em, metav1.UpdateOptions{}) if err != nil { @@ -253,12 +318,7 @@ func (e *EtcdMemberReconciler) reconcileMember(ctx context.Context, member *etcd "peerAddress": member.Status.PeerAddress, }) - if !e.leaderElector.IsLeader() { - log.Debug("not the leader, skipping reconcile") - return - } - - log.Debugf("reconciling EtcdMember: %+v", member) + log.Debugf("reconciling EtcdMember: %s", member.Name) if !member.Spec.Leave { log.Debug("member not marked for leave, no action needed") @@ -268,7 +328,7 @@ func (e *EtcdMemberReconciler) reconcileMember(ctx context.Context, member *etcd etcdClient, err := etcd.NewClient(e.k0sVars.CertRootDir, e.k0sVars.EtcdCertDir, e.etcdConfig) if err != nil { log.WithError(err).Warn("failed to create etcd client") - member.Status.ReconcileStatus = "Failed" + member.Status.ReconcileStatus = etcdv1beta1.ReconcileStatusFailed member.Status.Message = err.Error() if _, err = e.etcdMemberClient.UpdateStatus(ctx, member, metav1.UpdateOptions{}); err != nil { log.WithError(err).Error("failed to update member state") @@ -283,7 +343,7 @@ func (e *EtcdMemberReconciler) reconcileMember(ctx context.Context, member *etcd // Verify that the member is actually still present in etcd members, err := etcdClient.ListMembers(ctx) if err != nil { - member.Status.ReconcileStatus = "Failed" + member.Status.ReconcileStatus = etcdv1beta1.ReconcileStatusFailed member.Status.Message = err.Error() if _, err = e.etcdMemberClient.UpdateStatus(ctx, member, metav1.UpdateOptions{}); err != nil { log.WithError(err).Error("failed to update member state") @@ -297,6 +357,7 @@ func (e *EtcdMemberReconciler) reconcileMember(ctx context.Context, member *etcd if !ok { log.Debug("member marked for leave but not in actual member list, updating state to reflect that") member.Status.SetCondition(etcdv1beta1.ConditionTypeJoined, etcdv1beta1.ConditionFalse, member.Status.Message, time.Now()) + member.Status.ReconcileStatus = etcdv1beta1.ReconcileStatusSuccess member, err = e.etcdMemberClient.UpdateStatus(ctx, member, metav1.UpdateOptions{}) if err != nil { log.WithError(err).Error("failed to update EtcdMember status") @@ -316,11 +377,31 @@ func (e *EtcdMemberReconciler) reconcileMember(ctx context.Context, member *etcd return } - if err = etcdClient.DeleteMember(ctx, memberID); err != nil { + err = retry.Do(func() error { + return etcdClient.DeleteMember(ctx, memberID) + }, + retry.Delay(5*time.Second), + retry.LastErrorOnly(true), + retry.Attempts(5), + retry.Context(ctx), + retry.RetryIf(func(err error) bool { + // In case etcd reports unhealthy cluster, retry + msg := err.Error() + switch { + case strings.Contains(msg, "unhealthy cluster"): + return true + case strings.Contains(msg, "leader changed"): + return true + } + return false + }), + ) + + if err != nil { logrus. WithError(err). Errorf("Failed to delete etcd peer from cluster") - member.Status.ReconcileStatus = "Failed" + member.Status.ReconcileStatus = etcdv1beta1.ReconcileStatusFailed member.Status.Message = err.Error() _, err = e.etcdMemberClient.UpdateStatus(ctx, member, metav1.UpdateOptions{}) if err != nil { @@ -331,7 +412,7 @@ func (e *EtcdMemberReconciler) reconcileMember(ctx context.Context, member *etcd // Peer removed successfully, update status log.Info("reconcile succeeded") - member.Status.ReconcileStatus = "Success" + member.Status.ReconcileStatus = etcdv1beta1.ReconcileStatusSuccess member.Status.Message = "Member removed from cluster" member.Status.SetCondition(etcdv1beta1.ConditionTypeJoined, etcdv1beta1.ConditionFalse, member.Status.Message, time.Now()) _, err = e.etcdMemberClient.UpdateStatus(ctx, member, metav1.UpdateOptions{})