Skip to content

Commit

Permalink
implement stagedUpdateRun execution
Browse files Browse the repository at this point in the history
  • Loading branch information
jwtty committed Dec 23, 2024
1 parent cb0c5f7 commit 45e5740
Show file tree
Hide file tree
Showing 9 changed files with 1,491 additions and 94 deletions.
80 changes: 72 additions & 8 deletions pkg/controllers/updaterun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -40,10 +41,6 @@ var (
// errInitializedFailed is the error when the ClusterStagedUpdateRun fails to initialize.
// It is a wrapped error of errStagedUpdatedAborted, because some initialization functions are reused in the validation step.
errInitializedFailed = fmt.Errorf("%w: failed to initialize the clusterStagedUpdateRun", errStagedUpdatedAborted)

// stageUpdatingWaitTime is the time to wait before rechecking the stage update status.
// Put it as a variable for convenient testing.
stageUpdatingWaitTime = 60 * time.Second
)

// Reconciler reconciles a ClusterStagedUpdateRun object.
Expand Down Expand Up @@ -127,10 +124,35 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
klog.V(2).InfoS("The clusterStagedUpdateRun is validated", "clusterStagedUpdateRun", runObjRef)
}

// TODO(wantjian): execute the clusterStagedUpdateRun and fix the requeue time.
klog.V(2).InfoS("Executing the clusterStagedUpdateRun", "clusterStagedUpdateRun", runObjRef, "updatingStageIndex", updatingStageIndex,
"toBeUpdatedBindings count", len(toBeUpdatedBindings), "toBeDeletedBindings count", len(toBeDeletedBindings))
return runtime.Result{RequeueAfter: stageUpdatingWaitTime}, nil
// The previous run is completed but the update to the status failed.
if updatingStageIndex == -1 {
klog.V(2).InfoS("The clusterStagedUpdateRun is completed", "clusterStagedUpdateRun", runObjRef)
return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, &updateRun)
}

// Execute the updateRun.
klog.V(2).InfoS("Continue to execute the clusterStagedUpdateRun", "updatingStageIndex", updatingStageIndex, "clusterStagedUpdateRun", runObjRef)
finished, waitTime, execErr := r.execute(ctx, &updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
if execErr != nil && errors.Is(execErr, errStagedUpdatedAborted) {
// errStagedUpdatedAborted cannot be retried.
return runtime.Result{}, r.recordUpdateRunFailed(ctx, &updateRun, execErr.Error())
}

if finished {
klog.V(2).InfoS("The clusterStagedUpdateRun is completed", "clusterStagedUpdateRun", runObjRef)
return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, &updateRun)
}

// The execution is not finished yet or it encounters a retriable error.
// We need to record the status and requeue.
if updateErr := r.recordUpdateRunStatus(ctx, &updateRun); updateErr != nil {
return runtime.Result{}, updateErr
}
klog.V(2).InfoS("The clusterStagedUpdateRun is not finished yet", "requeueWaitTime", waitTime, "execErr", execErr, "clusterStagedUpdateRun", runObjRef)
if execErr != nil {
return runtime.Result{}, execErr
}
return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil
}

// handleDelete handles the deletion of the clusterStagedUpdateRun object.
Expand Down Expand Up @@ -162,6 +184,48 @@ func (r *Reconciler) ensureFinalizer(ctx context.Context, updateRun *placementv1
return r.Update(ctx, updateRun, client.FieldOwner(utils.UpdateRunControllerFieldManagerName))
}

// recordUpdateRunSucceeded records the succeeded condition in the ClusterStagedUpdateRun status.
func (r *Reconciler) recordUpdateRunSucceeded(ctx context.Context, updateRun *placementv1alpha1.ClusterStagedUpdateRun) error {
meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{
Type: string(placementv1alpha1.StagedUpdateRunConditionSucceeded),
Status: metav1.ConditionTrue,
ObservedGeneration: updateRun.Generation,
Reason: condition.UpdateRunSucceededReason,
})
if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil {
klog.ErrorS(updateErr, "Failed to update the ClusterStagedUpdateRun status as succeeded", "clusterStagedUpdateRun", klog.KObj(updateRun))
// updateErr can be retried.
return controller.NewUpdateIgnoreConflictError(updateErr)
}
return nil
}

// recordUpdateRunFailed records the failed condition in the ClusterStagedUpdateRun status.
func (r *Reconciler) recordUpdateRunFailed(ctx context.Context, updateRun *placementv1alpha1.ClusterStagedUpdateRun, message string) error {
meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{
Type: string(placementv1alpha1.StagedUpdateRunConditionSucceeded),
Status: metav1.ConditionFalse,
ObservedGeneration: updateRun.Generation,
Reason: condition.UpdateRunFailedReason,
Message: message,
})
if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil {
klog.ErrorS(updateErr, "Failed to update the ClusterStagedUpdateRun status as failed", "clusterStagedUpdateRun", klog.KObj(updateRun))
// updateErr can be retried.
return controller.NewUpdateIgnoreConflictError(updateErr)
}
return nil
}

// recordUpdateRunStatus records the ClusterStagedUpdateRun status.
func (r *Reconciler) recordUpdateRunStatus(ctx context.Context, updateRun *placementv1alpha1.ClusterStagedUpdateRun) error {
if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil {
klog.ErrorS(updateErr, "Failed to update the ClusterStagedUpdateRun status", "clusterStagedUpdateRun", klog.KObj(updateRun))
return controller.NewUpdateIgnoreConflictError(updateErr)
}
return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *Reconciler) SetupWithManager(mgr runtime.Manager) error {
r.recorder = mgr.GetEventRecorderFor("clusterresource-stagedupdaterun-controller")
Expand Down
39 changes: 34 additions & 5 deletions pkg/controllers/updaterun/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
Expand Down Expand Up @@ -327,7 +328,7 @@ func generateTestClusterStagedUpdateStrategy() *placementv1alpha1.ClusterStagedU
{
Type: placementv1alpha1.AfterStageTaskTypeTimedWait,
WaitTime: metav1.Duration{
Duration: time.Minute * 10,
Duration: time.Second * 4,
},
},
},
Expand Down Expand Up @@ -469,7 +470,7 @@ func validateApprovalRequestCount(ctx context.Context, count int) {
}, timeout, interval).Should(Equal(count), "approval requests count mismatch")
}

func generateTrueCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun, condType any) metav1.Condition {
func generateTrueCondition(obj client.Object, condType any) metav1.Condition {
reason, typeStr := "", ""
switch cond := condType.(type) {
case placementv1alpha1.StagedUpdateRunConditionType:
Expand Down Expand Up @@ -498,16 +499,38 @@ func generateTrueCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun,
reason = condition.ClusterUpdatingSucceededReason
}
typeStr = string(cond)
case placementv1alpha1.AfterStageTaskConditionType:
switch cond {
case placementv1alpha1.AfterStageTaskConditionWaitTimeElapsed:
reason = condition.AfterStageTaskWaitTimeElapsedReason
case placementv1alpha1.AfterStageTaskConditionApprovalRequestCreated:
reason = condition.AfterStageTaskApprovalRequestCreatedReason
case placementv1alpha1.AfterStageTaskConditionApprovalRequestApproved:
reason = condition.AfterStageTaskApprovalRequestApprovedReason
}
typeStr = string(cond)
case placementv1alpha1.ApprovalRequestConditionType:
switch cond {
case placementv1alpha1.ApprovalRequestConditionApproved:
reason = "LGTM"
}
typeStr = string(cond)
case placementv1beta1.ResourceBindingConditionType:
switch cond {
case placementv1beta1.ResourceBindingAvailable:
reason = condition.AvailableReason
}
typeStr = string(cond)
}
return metav1.Condition{
Status: metav1.ConditionTrue,
Type: typeStr,
ObservedGeneration: updateRun.Generation,
ObservedGeneration: obj.GetGeneration(),
Reason: reason,
}
}

func generateFalseCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun, condType any) metav1.Condition {
func generateFalseCondition(obj client.Object, condType any) metav1.Condition {
reason, typeStr := "", ""
switch cond := condType.(type) {
case placementv1alpha1.StagedUpdateRunConditionType:
Expand All @@ -530,11 +553,17 @@ func generateFalseCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun,
reason = condition.ClusterUpdatingFailedReason
}
typeStr = string(cond)
case placementv1beta1.ResourceBindingConditionType:
switch cond {
case placementv1beta1.ResourceBindingApplied:
reason = condition.ApplyFailedReason
}
typeStr = string(cond)
}
return metav1.Condition{
Status: metav1.ConditionFalse,
Type: typeStr,
ObservedGeneration: updateRun.Generation,
ObservedGeneration: obj.GetGeneration(),
Reason: reason,
}
}
Loading

0 comments on commit 45e5740

Please sign in to comment.