Skip to content

Commit

Permalink
operator: Allow proxy and target controllers to directly manage reconciliation results
Browse files Browse the repository at this point in the history

Signed-off-by: Aaron Wilson <[email protected]>
  • Loading branch information
aaronnw committed Oct 22, 2024
1 parent 7d947c9 commit 10064d4
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 96 deletions.
45 changes: 23 additions & 22 deletions operator/pkg/controllers/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package controllers

import (
"context"
"fmt"
"time"

aisv1 "github.com/ais-operator/api/v1beta1"
Expand Down Expand Up @@ -447,18 +448,18 @@ func (r *AIStoreReconciler) ensurePrereqs(ctx context.Context, ais *aisv1.AIStor

func (r *AIStoreReconciler) bootstrapNew(ctx context.Context, ais *aisv1.AIStore) (result ctrl.Result, err error) {
// 1. Bootstrap proxies
if result.Requeue, err = r.initProxies(ctx, ais); err != nil {
if result, err = r.initProxies(ctx, ais); err != nil {
r.recordError(ctx, ais, err, "Failed to create Proxy resources")
return result, err
} else if result.Requeue {
return
} else if !result.IsZero() {
return
}

// 2. Bootstrap targets
if result.Requeue, err = r.initTargets(ctx, ais); err != nil {
if result, err = r.initTargets(ctx, ais); err != nil {
r.recordError(ctx, ais, err, "Failed to create Target resources")
return result, err
} else if result.Requeue {
return
} else if !result.IsZero() {
return
}

Expand All @@ -481,35 +482,35 @@ func (r *AIStoreReconciler) bootstrapNew(ctx context.Context, ais *aisv1.AIStore
// 3. Check if config is properly updated in the cluster.
// 4. If expected state is not yet met we should reconcile until everything is ready.
func (r *AIStoreReconciler) handleCREvents(ctx context.Context, ais *aisv1.AIStore) (result ctrl.Result, err error) {
var proxyReady, targetReady bool
if proxyReady, err = r.handleProxyState(ctx, ais); err != nil {
logger := logf.FromContext(ctx)
if result, err = r.handleProxyState(ctx, ais); err != nil {
return
} else if !proxyReady {
} else if !result.IsZero() {
goto requeue
}

if targetReady, err = r.handleTargetState(ctx, ais); err != nil {
if result, err = r.handleTargetState(ctx, ais); err != nil {
return
} else if !targetReady {
} else if !result.IsZero() {
goto requeue
}

err = r.checkAISClusterReady(ctx, ais)
if err != nil {
result, err = r.checkAISClusterReady(ctx, ais)
if err != nil || !result.IsZero() {
return
}
// Enables the rebalance condition (still respects the spec desired rebalance.Enabled property)
err = r.enableRebalanceCondition(ctx, ais)
if err != nil {
logf.FromContext(ctx).Error(err, "Failed to enable rebalance condition")
logger.Error(err, "Failed to enable rebalance condition")
return
}

if err = r.handleConfigState(ctx, ais, false /*force*/); err != nil {
goto requeue
}

return r.handleSuccessfulReconcile(ctx, ais)
return result, r.handleSuccessfulReconcile(ctx, ais)

requeue:
// We requeue till the AIStore cluster becomes ready.
Expand Down Expand Up @@ -548,8 +549,7 @@ func (r *AIStoreReconciler) handleConfigState(ctx context.Context, ais *aisv1.AI
logger.Info("Updating cluster config to match spec via API")
err = apiClient.SetClusterConfigUsingMsg(desiredConf, false /*transient*/)
if err != nil {
logger.Error(err, "Failed to update cluster config")
return err
return fmt.Errorf("failed to update cluster config: %w", err)
}

// Finally update CRD with proper annotation.
Expand Down Expand Up @@ -653,7 +653,7 @@ func (r *AIStoreReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *AIStoreReconciler) handleSuccessfulReconcile(ctx context.Context, ais *aisv1.AIStore) (result ctrl.Result, err error) {
func (r *AIStoreReconciler) handleSuccessfulReconcile(ctx context.Context, ais *aisv1.AIStore) (err error) {
var needsUpdate bool
if !ais.IsConditionTrue(aisv1.ConditionReady) {
r.recorder.Event(ais, corev1.EventTypeNormal, EventReasonReady, "Successfully reconciled AIStore cluster")
Expand All @@ -669,16 +669,17 @@ func (r *AIStoreReconciler) handleSuccessfulReconcile(ctx context.Context, ais *
return
}

func (r *AIStoreReconciler) checkAISClusterReady(ctx context.Context, ais *aisv1.AIStore) error {
func (r *AIStoreReconciler) checkAISClusterReady(ctx context.Context, ais *aisv1.AIStore) (result ctrl.Result, err error) {
apiClient, err := r.clientManager.GetClient(ctx, ais)
if err != nil {
return err
return
}
err = apiClient.Health(true /*readyToRebalance*/)
if err != nil {
logf.FromContext(ctx).Info("Waiting for AIS to be healthy...")
logf.FromContext(ctx).Info("AIS cluster is not ready", "health_error", err.Error())
return ctrl.Result{Requeue: true}, nil
}
return err
return
}

func (r *AIStoreReconciler) recordError(ctx context.Context, ais *aisv1.AIStore, err error, msg string) {
Expand Down
8 changes: 4 additions & 4 deletions operator/pkg/controllers/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ var _ = Describe("AIStoreController", func() {
Expect(err).To(HaveOccurred())

By("Reconcile to create proxy StatefulSet")
ready, err := r.handleProxyState(ctx, ais)
result, err := r.handleProxyState(ctx, ais)
Expect(err).ToNot(HaveOccurred())
Expect(ready).To(BeFalse())
Expect(result.Requeue).To(BeTrue())

By("Ensure that proxy StatefulSet has been created")
err = c.Get(ctx, types.NamespacedName{Name: "ais-proxy", Namespace: namespace}, &ss)
Expand All @@ -179,9 +179,9 @@ var _ = Describe("AIStoreController", func() {
Expect(err).To(HaveOccurred())

By("Reconcile to create target StatefulSet")
ready, err := r.handleTargetState(ctx, ais)
result, err := r.handleTargetState(ctx, ais)
Expect(err).ToNot(HaveOccurred())
Expect(ready).To(BeFalse())
Expect(result.Requeue).To(BeTrue())

By("Ensure that target StatefulSet has been created")
err = c.Get(ctx, types.NamespacedName{Name: "ais-target", Namespace: namespace}, &ss)
Expand Down
96 changes: 57 additions & 39 deletions operator/pkg/controllers/proxy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@ import (
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const dnsLookupTimeout = 10 * time.Second
const (
proxyStartupInterval = 5 * time.Second
proxyDNSInterval = 5 * time.Second
proxyDNSTimeout = 10 * time.Second
)

func (r *AIStoreReconciler) ensureProxyPrereqs(ctx context.Context, ais *aisv1.AIStore) (err error) {
var cm *corev1.ConfigMap
Expand All @@ -48,8 +53,9 @@ func (r *AIStoreReconciler) ensureProxyPrereqs(ctx context.Context, ais *aisv1.A
return
}

func (r *AIStoreReconciler) initProxies(ctx context.Context, ais *aisv1.AIStore) (requeue bool, err error) {
func (r *AIStoreReconciler) initProxies(ctx context.Context, ais *aisv1.AIStore) (ctrl.Result, error) {
var (
err error
exists bool
changed bool
logger = logf.FromContext(ctx)
Expand All @@ -59,39 +65,38 @@ func (r *AIStoreReconciler) initProxies(ctx context.Context, ais *aisv1.AIStore)
ss := proxy.NewProxyStatefulSet(ais, 1)
if exists, err = r.k8sClient.CreateResourceIfNotExists(ctx, ais, ss); err != nil {
r.recordError(ctx, ais, err, "Failed to deploy Primary proxy")
return
return ctrl.Result{}, err
} else if !exists {
requeue = true
return
return ctrl.Result{Requeue: true}, nil
}

// Wait for primary to start-up.
if _, err := r.k8sClient.GetReadyPod(ctx, proxy.DefaultPrimaryNSName(ais)); err != nil {
logger.Info("Waiting for primary proxy to come up", "err", err)
_, err = r.k8sClient.GetReadyPod(ctx, proxy.DefaultPrimaryNSName(ais))
if err != nil {
logger.Info("Waiting for primary proxy to come up", "err", err.Error())
r.recorder.Event(ais, corev1.EventTypeNormal, EventReasonWaiting, "Waiting for primary proxy to come up")
return true /*requeue*/, nil
return ctrl.Result{RequeueAfter: proxyStartupInterval}, nil
}

// 2. Start all the proxy daemons
changed, err = r.k8sClient.UpdateStatefulSetReplicas(ctx, proxy.StatefulSetNSName(ais), ais.GetProxySize())
if err != nil {
r.recordError(ctx, ais, err, "Failed to deploy StatefulSet")
return
return ctrl.Result{}, err
}
if changed {
msg := "Successfully initialized proxy nodes"
logger.Info(msg)
r.recorder.Event(ais, corev1.EventTypeNormal, EventReasonInitialized, msg)
requeue = true
}

// Check whether the proxy service has a registered DNS entry.
if err = checkDNSEntry(ctx, ais); err != nil {
logger.Info("Failed to find any DNS entries for proxy service", "error", err)
if dnsErr := checkDNSEntry(ctx, ais); dnsErr != nil {
logger.Info("Failed to find any DNS entries for proxy service", "error", dnsErr)
r.recorder.Event(ais, corev1.EventTypeNormal, EventReasonWaiting, "Waiting for proxy service to have registered DNS entries")
return true /*requeue*/, nil
return ctrl.Result{RequeueAfter: proxyDNSInterval}, nil
}
return
return ctrl.Result{}, nil
}

var checkDNSEntry = checkDNSEntryDefault
Expand All @@ -101,7 +106,7 @@ func checkDNSEntryDefault(ctx context.Context, ais *aisv1.AIStore) error {
clusterDomain := ais.GetClusterDomain()
hostname := fmt.Sprintf("%s.%s.svc.%s", nsName.Name, nsName.Namespace, clusterDomain)

ctx, cancel := context.WithTimeout(ctx, dnsLookupTimeout)
ctx, cancel := context.WithTimeout(ctx, proxyDNSTimeout)
defer cancel()
_, err := net.DefaultResolver.LookupIPAddr(ctx, hostname)
// Log an error if we have an actual error, not just no host found
Expand All @@ -123,23 +128,19 @@ func (r *AIStoreReconciler) cleanupProxy(ctx context.Context, ais *aisv1.AIStore
)
}

func (r *AIStoreReconciler) handleProxyState(ctx context.Context, ais *aisv1.AIStore) (ready bool, err error) {
// Fetch the latest proxy StatefulSet.
func (r *AIStoreReconciler) handleProxyState(ctx context.Context, ais *aisv1.AIStore) (result ctrl.Result, err error) {
proxySSName := proxy.StatefulSetNSName(ais)
ss, err := r.k8sClient.GetStatefulSet(ctx, proxySSName)
if err != nil {
if k8serrors.IsNotFound(err) {
// FIXME: We should likely set condition that `ais-proxy` StatefulSet
// needs to be reconciled and we should invoke this function over and over
// until done.
requeue, err := r.initProxies(ctx, ais)
return !requeue, err
return r.initProxies(ctx, ais)
}
return ready, err
return
}

if hasLatest, err := r.handleProxyImage(ctx, ais, ss); !hasLatest || err != nil {
return false, err
result, err = r.handleProxyImage(ctx, ais, ss)
if err != nil || !result.IsZero() {
return
}

if *ss.Spec.Replicas != ais.GetProxySize() {
Expand All @@ -149,25 +150,32 @@ func (r *AIStoreReconciler) handleProxyState(ctx context.Context, ais *aisv1.AIS
}
// If anything was updated, we consider it not immediately ready.
updated, err := r.k8sClient.UpdateStatefulSetReplicas(ctx, proxySSName, ais.GetProxySize())
if updated || err != nil {
return false, err
if err != nil || updated {
result.Requeue = true
return result, err
}
}

// For now, state of proxy is considered ready if the number of proxy pods ready matches the size provided in AIS cluster spec.
return ss.Status.ReadyReplicas == ais.GetProxySize(), nil
// Requeue if the number of proxy pods ready does not match the size provided in AIS cluster spec.
if ss.Status.ReadyReplicas != ais.GetProxySize() {
logf.FromContext(ctx).Info("Waiting for proxy statefulset to reach desired replicas")
return ctrl.Result{RequeueAfter: proxyStartupInterval}, nil
}
return
}

func (r *AIStoreReconciler) handleProxyImage(ctx context.Context, ais *aisv1.AIStore, ss *appsv1.StatefulSet) (ready bool, err error) {
// formerly "ready"
func (r *AIStoreReconciler) handleProxyImage(ctx context.Context, ais *aisv1.AIStore, ss *appsv1.StatefulSet) (result ctrl.Result, err error) {
logger := logf.FromContext(ctx)

firstPodName := proxy.PodName(ais, 0)
updated := ss.Spec.Template.Spec.Containers[0].Image != ais.Spec.NodeImage
if updated {
if ss.Status.ReadyReplicas > 0 {
if err := r.setPrimaryTo(ctx, ais, 0); err != nil {
err = r.setPrimaryTo(ctx, ais, 0)
if err != nil {
logger.Error(err, "failed to set primary proxy")
return false, err
return
}
logger.Info("Updated primary to pod " + firstPodName)
ss.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{
Expand All @@ -178,7 +186,9 @@ func (r *AIStoreReconciler) handleProxyImage(ctx context.Context, ais *aisv1.AIS
}
}
ss.Spec.Template.Spec.Containers[0].Image = ais.Spec.NodeImage
return false, r.k8sClient.Update(ctx, ss)
result.Requeue = true
err = r.k8sClient.Update(ctx, ss)
return
}

podList, err := r.k8sClient.ListPods(ctx, ss)
Expand All @@ -203,8 +213,9 @@ func (r *AIStoreReconciler) handleProxyImage(ctx context.Context, ais *aisv1.AIS
// This implies the pod with the largest index is the oldest proxy,
// and we set it as a primary.
if toUpdate == 1 && firstYetToUpdate {
if err := r.setPrimaryTo(ctx, ais, *ss.Spec.Replicas-1); err != nil {
return false, err
err = r.setPrimaryTo(ctx, ais, *ss.Spec.Replicas-1)
if err != nil {
return
}
// Revert statefulset partition spec
ss.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{
Expand All @@ -214,19 +225,26 @@ func (r *AIStoreReconciler) handleProxyImage(ctx context.Context, ais *aisv1.AIS
},
}

if err = r.k8sClient.Update(ctx, ss); err != nil {
err = r.k8sClient.Update(ctx, ss)
if err != nil {
logger.Error(err, "failed to update proxy statefulset update policy")
return false, err
return
}

// Delete the first pod to update its docker image.
_, err = r.k8sClient.DeletePodIfExists(ctx, types.NamespacedName{
Namespace: ais.Namespace,
Name: firstPodName,
})
return false, err
if err != nil {
return
}
return ctrl.Result{Requeue: true}, nil
}
if toUpdate == 0 {
return ctrl.Result{}, nil
}
return toUpdate == 0, nil
return ctrl.Result{Requeue: true}, nil
}

func (r *AIStoreReconciler) setPrimaryTo(ctx context.Context, ais *aisv1.AIStore, podIdx int32) error {
Expand Down
Loading

0 comments on commit 10064d4

Please sign in to comment.