feat: enable clusterStagedUpdateRun controller #1004

Merged · 2 commits · Jan 4, 2025
Changes from all commits
11 changes: 10 additions & 1 deletion apis/placement/v1alpha1/common.go
@@ -12,9 +12,18 @@ const (
// ResourceOverrideKind is the kind of the ResourceOverride.
ResourceOverrideKind = "ResourceOverride"

// ResourceOverrideSnapshotKind is the kind of the ResourceOverrideSnapshotKind.
// ResourceOverrideSnapshotKind is the kind of the ResourceOverrideSnapshot.
ResourceOverrideSnapshotKind = "ResourceOverrideSnapshot"

// ClusterStagedUpdateRunKind is the kind of the ClusterStagedUpdateRun.
ClusterStagedUpdateRunKind = "ClusterStagedUpdateRun"

// ClusterStagedUpdateStrategyKind is the kind of the ClusterStagedUpdateStrategy.
ClusterStagedUpdateStrategyKind = "ClusterStagedUpdateStrategy"

// ClusterApprovalRequestKind is the kind of the ClusterApprovalRequest.
ClusterApprovalRequestKind = "ClusterApprovalRequest"

// ClusterStagedUpdateRunFinalizer is used by the ClusterStagedUpdateRun controller to make sure that the ClusterStagedUpdateRun
// object is not deleted until all its dependent resources are deleted.
ClusterStagedUpdateRunFinalizer = fleetPrefix + "stagedupdaterun-finalizer"
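The three new Kind constants follow the existing override-kind pattern and feed the CRD checks in setup.go below. The finalizer constant is the usual deletion guard; as a rough, hypothetical sketch (not code from this PR) of how such a finalizer gets attached with controller-runtime:

```go
// Package sketch shows the finalizer pattern; the names here are illustrative.
package sketch

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// ensureFinalizer adds the stagedupdaterun finalizer the first time the object
// is reconciled, so the API server blocks deletion until dependents are cleaned up.
func ensureFinalizer(ctx context.Context, c client.Client, obj client.Object, finalizer string) error {
	if controllerutil.ContainsFinalizer(obj, finalizer) {
		return nil // already guarded
	}
	controllerutil.AddFinalizer(obj, finalizer)
	return c.Update(ctx, obj)
}
```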
1 change: 1 addition & 0 deletions charts/hub-agent/templates/deployment.yaml
@@ -31,6 +31,7 @@ spec:
- --enable-v1alpha1-apis={{ .Values.enableV1Alpha1APIs }}
- --enable-v1beta1-apis={{ .Values.enableV1Beta1APIs }}
- --enable-cluster-inventory-apis={{ .Values.enableClusterInventoryAPI }}
- --enable-staged-update-run-apis={{ .Values.enableStagedUpdateRunAPIs }}
- --max-concurrent-cluster-placement={{ .Values.MaxConcurrentClusterPlacement }}
- --concurrent-resource-change-syncs={{ .Values.ConcurrentResourceChangeSyncs }}
- --log_file_max_size={{ .Values.logFileMaxSize }}
1 change: 1 addition & 0 deletions charts/hub-agent/values.yaml
@@ -36,6 +36,7 @@ affinity: {}
enableV1Alpha1APIs: false
enableV1Beta1APIs: true
enableClusterInventoryAPI: true
enableStagedUpdateRunAPIs: true

hubAPIQPS: 250
hubAPIBurst: 1000
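Note the asymmetry: the chart defaults enableStagedUpdateRunAPIs to true while the binary flag (below) defaults to false, so chart users get the controller unless they opt out. A minimal hypothetical override file to disable it:

```yaml
# my-values.yaml (hypothetical) — opt out of the staged update run controller
enableStagedUpdateRunAPIs: false
```

This would be applied with something like `helm upgrade hub-agent <chart> -f my-values.yaml`.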
4 changes: 4 additions & 0 deletions cmd/hubagent/options/options.go
@@ -83,6 +83,8 @@ type Options struct {
EnableClusterInventoryAPIs bool
// ForceDeleteWaitTime is the duration the hub agent waits before force deleting a member cluster.
ForceDeleteWaitTime metav1.Duration
// EnableStagedUpdateRunAPIs enables the agents to watch the clusterStagedUpdateRun CRs.
EnableStagedUpdateRunAPIs bool
}

// NewOptions builds an empty options.
@@ -99,6 +101,7 @@ func NewOptions() *Options {
MaxFleetSizeSupported: 100,
EnableV1Alpha1APIs: false,
EnableClusterInventoryAPIs: false,
EnableStagedUpdateRunAPIs: false,
}
}

@@ -140,6 +143,7 @@ func (o *Options) AddFlags(flags *flag.FlagSet) {
flags.BoolVar(&o.EnableV1Beta1APIs, "enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.")
flags.BoolVar(&o.EnableClusterInventoryAPIs, "enable-cluster-inventory-apis", false, "If set, the agents will watch for the ClusterInventory APIs.")
flags.DurationVar(&o.ForceDeleteWaitTime.Duration, "force-delete-wait-time", 15*time.Minute, "The duration the hub agent waits before force deleting a member cluster.")
flags.BoolVar(&o.EnableStagedUpdateRunAPIs, "enable-staged-update-run-apis", false, "If set, the agents will watch for the ClusterStagedUpdateRun APIs.")

o.RateLimiterOpts.AddFlags(flags)
}
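The new flag keeps the conservative default-off convention of enable-cluster-inventory-apis. A self-contained sketch of the wiring, using a pared-down stand-in for the real Options struct:

```go
package main

import (
	"flag"
	"fmt"
)

// options is a simplified stand-in for the hub agent's Options struct.
type options struct {
	EnableStagedUpdateRunAPIs bool
}

func (o *options) addFlags(fs *flag.FlagSet) {
	fs.BoolVar(&o.EnableStagedUpdateRunAPIs, "enable-staged-update-run-apis", false,
		"If set, the agents will watch for the ClusterStagedUpdateRun APIs.")
}

func main() {
	opts := &options{}
	opts.addFlags(flag.CommandLine)
	flag.Parse() // e.g. ./hubagent --enable-staged-update-run-apis=true
	fmt.Println("staged update run APIs enabled:", opts.EnableStagedUpdateRunAPIs)
}
```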
32 changes: 29 additions & 3 deletions cmd/hubagent/workload/setup.go
@@ -35,6 +35,7 @@ import (
"go.goms.io/fleet/pkg/controllers/overrider"
"go.goms.io/fleet/pkg/controllers/resourcechange"
"go.goms.io/fleet/pkg/controllers/rollout"
"go.goms.io/fleet/pkg/controllers/updaterun"
"go.goms.io/fleet/pkg/controllers/workgenerator"
"go.goms.io/fleet/pkg/resourcewatcher"
"go.goms.io/fleet/pkg/scheduler"
@@ -85,13 +86,20 @@ var (
placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ResourceOverrideSnapshotKind),
}

clusterStagedUpdateRunGVKs = []schema.GroupVersionKind{
placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ClusterStagedUpdateRunKind),
placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ClusterStagedUpdateStrategyKind),
placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ClusterApprovalRequestKind),
}

clusterInventoryGVKs = []schema.GroupVersionKind{
clusterinventory.GroupVersion.WithKind("ClusterProfile"),
}
)

// SetupControllers set up the customized controllers we developed
func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager, config *rest.Config, opts *options.Options) error {
func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager, config *rest.Config, opts *options.Options) error { //nolint:gocyclo
// TODO: Try to reduce the complexity of this function, last measured at 33 (failing at > 30), and remove the //nolint:gocyclo
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
klog.ErrorS(err, "unable to create the dynamic client")
@@ -195,7 +203,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
return err
}

// Set up a new controller to do rollout resources according to CRP rollout strategy
klog.Info("Setting up rollout controller")
if err := (&rollout.Reconciler{
Client: mgr.GetClient(),
@@ -215,6 +223,24 @@
return err
}

// Set up a controller to execute staged update runs, rolling out resources to clusters stage by stage.
if opts.EnableStagedUpdateRunAPIs {
for _, gvk := range clusterStagedUpdateRunGVKs {
if err = utils.CheckCRDInstalled(discoverClient, gvk); err != nil {
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
return err
}
}
klog.Info("Setting up clusterStagedUpdateRun controller")
if err = (&updaterun.Reconciler{
Client: mgr.GetClient(),
InformerManager: dynamicInformerManager,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "unable to set up clusterStagedUpdateRun controller")
return err
}
}

// Set up the work generator
klog.Info("Setting up work generator")
if err := (&workgenerator.Reconciler{
@@ -327,7 +353,7 @@
}
}

// Set up a new controller to reconcile any resources in the cluster
klog.Info("Setting up resource change controller")
rcr := &resourcechange.Reconciler{
DynamicClient: dynamicClient,
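The gate above — verify every CRD in the group before wiring the controller — keeps the manager from crash-looping on watches for types the API server doesn't serve. A hypothetical sketch of what a CheckCRDInstalled-style helper can look like via the discovery client (an assumption about its shape, not fleet's actual implementation):

```go
package sketch

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
)

// checkCRDInstalled errors out unless the API server advertises the given kind,
// e.g. ClusterStagedUpdateRun in placement.kubernetes-fleet.io/v1alpha1.
func checkCRDInstalled(dc discovery.DiscoveryInterface, gvk schema.GroupVersionKind) error {
	resources, err := dc.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
	if err != nil {
		return fmt.Errorf("group version %s is not served: %w", gvk.GroupVersion(), err)
	}
	for _, r := range resources.APIResources {
		if r.Kind == gvk.Kind {
			return nil
		}
	}
	return fmt.Errorf("kind %s not found in %s", gvk.Kind, gvk.GroupVersion())
}
```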
4 changes: 2 additions & 2 deletions examples/stagedupdaterun/clusterStagedUpdateRun.yaml
@@ -4,10 +4,10 @@ metadata:
name: example-run
spec:
placementName: example-placement
resourceSnapshotIndex: "1"
resourceSnapshotIndex: example-placement-0-snapshot
stagedRolloutStrategyName: example-strategy
status:
policySnapshotIndexUsed: "1"
policySnapshotIndexUsed: example-placement-0
policyObservedClusterCount: 3
appliedStrategy:
type: Immediate
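The run above names a stagedRolloutStrategyName that isn't shown in this diff. For orientation, a companion ClusterStagedUpdateStrategy might look roughly like the sketch below — the stage and task fields are assumptions from the v1alpha1 staged-update API, not part of this PR:

```yaml
apiVersion: placement.kubernetes-fleet.io/v1alpha1
kind: ClusterStagedUpdateStrategy
metadata:
  name: example-strategy
spec:
  stages:
    - name: staging
      labelSelector:
        matchLabels:
          environment: staging
      afterStageTasks:
        - type: TimedWait
          waitTime: 1h
    - name: production
      labelSelector:
        matchLabels:
          environment: production
      afterStageTasks:
        - type: Approval   # fulfilled via a ClusterApprovalRequest
```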
2 changes: 1 addition & 1 deletion pkg/controllers/updaterun/controller.go
@@ -133,7 +133,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
// Execute the updateRun.
klog.V(2).InfoS("Continue to execute the clusterStagedUpdateRun", "updatingStageIndex", updatingStageIndex, "clusterStagedUpdateRun", runObjRef)
finished, waitTime, execErr := r.execute(ctx, &updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
if execErr != nil && errors.Is(execErr, errStagedUpdatedAborted) {
if errors.Is(execErr, errStagedUpdatedAborted) {
// errStagedUpdatedAborted cannot be retried.
return runtime.Result{}, r.recordUpdateRunFailed(ctx, &updateRun, execErr.Error())
}
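The dropped `execErr != nil &&` guard is redundant because errors.Is already returns false for a nil error. A runnable sketch of the stdlib semantics this change relies on:

```go
package main

import (
	"errors"
	"fmt"
)

var errStagedUpdatedAborted = errors.New("clusterStagedUpdateRun aborted")

func main() {
	var execErr error // nil: a successful execute()
	fmt.Println(errors.Is(execErr, errStagedUpdatedAborted)) // false — nil never matches a non-nil target

	execErr = fmt.Errorf("%w: binding changed during update", errStagedUpdatedAborted)
	fmt.Println(errors.Is(execErr, errStagedUpdatedAborted)) // true — matches through the wrap
}
```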
4 changes: 2 additions & 2 deletions pkg/controllers/updaterun/controller_integration_test.go
@@ -268,7 +268,7 @@ func generateTestClusterSchedulingPolicySnapshot(idx int) *placementv1beta1.Clus
}
}

func generateTestClusterResourceBinding(policySnapshotName, targetCluster string) *placementv1beta1.ClusterResourceBinding {
func generateTestClusterResourceBinding(policySnapshotName, targetCluster string, state placementv1beta1.BindingState) *placementv1beta1.ClusterResourceBinding {
binding := &placementv1beta1.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "binding-" + testResourceSnapshotName + "-" + targetCluster,
@@ -277,7 +277,7 @@ func generateTestClusterResourceBinding(policySnapshotName, targetCluster string
},
},
Spec: placementv1beta1.ResourceBindingSpec{
State: placementv1beta1.BindingStateScheduled,
State: state,
TargetCluster: targetCluster,
SchedulingPolicySnapshotName: policySnapshotName,
},
41 changes: 37 additions & 4 deletions pkg/controllers/updaterun/execution.go
@@ -115,6 +115,9 @@ func (r *Reconciler) executeUpdatingStage(
return 0, controller.NewUpdateIgnoreConflictError(err)
}
klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
return 0, err
}
} else {
klog.V(2).InfoS("Found the first binding that is updating but the cluster status has not been updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
if binding.Spec.State != placementv1beta1.BindingStateBound {
@@ -124,6 +127,14 @@
return 0, controller.NewUpdateIgnoreConflictError(err)
}
klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
return 0, err
}
} else if !condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.Status.Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.Generation) {
klog.V(2).InfoS("The binding is bound and up-to-date but the generation is updated by the scheduler, update rolloutStarted status again", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
return 0, err
}
} else {
if _, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun); updateErr != nil {
return clusterUpdatingWaitTime, updateErr
@@ -139,8 +150,10 @@
}

// Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound.
if !isBindingSyncedWithClusterStatus(updateRun, binding, clusterStatus) || binding.Spec.State != placementv1beta1.BindingStateBound {
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the updating cluster `%s` in the stage %s does not match the cluster status: %+v, binding: %+v", clusterStatus.ClusterName, updatingStageStatus.StageName, clusterStatus, binding.Spec))
if !isBindingSyncedWithClusterStatus(updateRun, binding, clusterStatus) || binding.Spec.State != placementv1beta1.BindingStateBound ||
!condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.Status.Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.Generation) {
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the updating cluster `%s` in the stage %s does not match the cluster status: %+v, binding: %+v, condition: %+v",
clusterStatus.ClusterName, updatingStageStatus.StageName, clusterStatus, binding.Spec, binding.GetCondition(string(placementv1beta1.ResourceBindingRolloutStarted))))
klog.ErrorS(unexpectedErr, "The binding has been changed during updating, please check if there's concurrent clusterStagedUpdateRun", "clusterStagedUpdateRun", updateRunRef)
markClusterUpdatingFailed(clusterStatus, updateRun.Generation, unexpectedErr.Error())
return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
@@ -314,6 +327,26 @@ func (r *Reconciler) checkAfterStageTasksStatus(ctx context.Context, updatingSta
return true, nil
}

// updateBindingRolloutStarted updates the binding status to indicate the rollout has started.
func (r *Reconciler) updateBindingRolloutStarted(ctx context.Context, binding *placementv1beta1.ClusterResourceBinding, updateRun *placementv1alpha1.ClusterStagedUpdateRun) error {
// first reset the condition to reflect the latest lastTransitionTime
binding.RemoveCondition(string(placementv1beta1.ResourceBindingRolloutStarted))
cond := metav1.Condition{
Type: string(placementv1beta1.ResourceBindingRolloutStarted),
Status: metav1.ConditionTrue,
ObservedGeneration: binding.Generation,
Reason: condition.RolloutStartedReason,
Message: fmt.Sprintf("Detected the new changes on the resources and started the rollout process, resourceSnapshotName: %s, clusterStagedUpdateRun: %s", updateRun.Spec.ResourceSnapshotIndex, updateRun.Name),
}
binding.SetConditions(cond)
if err := r.Client.Status().Update(ctx, binding); err != nil {
klog.ErrorS(err, "Failed to update binding status", "clusterResourceBinding", klog.KObj(binding), "condition", cond)
return controller.NewUpdateIgnoreConflictError(err)
}
klog.V(2).InfoS("Updated binding as rolloutStarted", "clusterResourceBinding", klog.KObj(binding), "condition", cond)
return nil
}

// isBindingSyncedWithClusterStatus checks if the binding is up-to-date with the cluster status.
func isBindingSyncedWithClusterStatus(updateRun *placementv1alpha1.ClusterStagedUpdateRun, binding *placementv1beta1.ClusterResourceBinding, cluster *placementv1alpha1.ClusterUpdatingStatus) bool {
if binding.Spec.ResourceSnapshotName != updateRun.Spec.ResourceSnapshotIndex {
@@ -335,8 +368,8 @@ func isBindingSyncedWithClusterStatus(updateRun *placementv1alpha1.ClusterStaged
return true
}

// checkClusterUpdateResult checks if the cluster has been updated successfully.
// It returns if the cluster has been updated successfully and the error if the cluster update failed.
// checkClusterUpdateResult checks if the resources have been updated successfully on a given cluster.
// It returns true if the resources have been updated successfully or any error if the update failed.
func checkClusterUpdateResult(
binding *placementv1beta1.ClusterResourceBinding,
clusterStatus *placementv1alpha1.ClusterUpdatingStatus,
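The remove-then-set dance in updateBindingRolloutStarted exists because condition helpers keep the old lastTransitionTime when the status value doesn't change; deleting the condition first forces a fresh timestamp. A runnable sketch using the apimachinery meta helpers (fleet's own RemoveCondition/SetConditions wrappers are assumed to behave the same way):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	conds := []metav1.Condition{{
		Type:               "RolloutStarted",
		Status:             metav1.ConditionTrue,
		ObservedGeneration: 1,
		Reason:             "RolloutStarted",
		LastTransitionTime: metav1.NewTime(time.Now().Add(-time.Hour)),
	}}

	// Same status: SetStatusCondition keeps the hour-old lastTransitionTime.
	meta.SetStatusCondition(&conds, metav1.Condition{
		Type: "RolloutStarted", Status: metav1.ConditionTrue, ObservedGeneration: 2, Reason: "RolloutStarted",
	})
	fmt.Println("without reset:", conds[0].LastTransitionTime.Time)

	// Remove first: the re-set condition gets a fresh lastTransitionTime.
	meta.RemoveStatusCondition(&conds, "RolloutStarted")
	meta.SetStatusCondition(&conds, metav1.Condition{
		Type: "RolloutStarted", Status: metav1.ConditionTrue, ObservedGeneration: 2, Reason: "RolloutStarted",
	})
	fmt.Println("with reset:   ", conds[0].LastTransitionTime.Time)
}
```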
11 changes: 8 additions & 3 deletions pkg/controllers/updaterun/execution_integration_test.go
@@ -64,14 +64,14 @@ var _ = Describe("UpdateRun execution tests", func() {
}
// reverse the order of the clusters by index
targetClusters[i] = generateTestMemberCluster(numTargetClusters-1-i, "cluster-"+strconv.Itoa(i), map[string]string{"group": "prod", "region": region})
resourceBindings[i] = generateTestClusterResourceBinding(policySnapshot.Name, targetClusters[i].Name)
resourceBindings[i] = generateTestClusterResourceBinding(policySnapshot.Name, targetClusters[i].Name, placementv1beta1.BindingStateScheduled)
}

unscheduledCluster = make([]*clusterv1beta1.MemberCluster, numUnscheduledClusters)
for i := range unscheduledCluster {
unscheduledCluster[i] = generateTestMemberCluster(i, "unscheduled-cluster-"+strconv.Itoa(i), map[string]string{"group": "staging"})
// update the policySnapshot name so that these clusters are considered to-be-deleted
resourceBindings[numTargetClusters+i] = generateTestClusterResourceBinding(policySnapshot.Name+"a", unscheduledCluster[i].Name)
resourceBindings[numTargetClusters+i] = generateTestClusterResourceBinding(policySnapshot.Name+"a", unscheduledCluster[i].Name, placementv1beta1.BindingStateUnscheduled)
}

var err error
@@ -215,7 +215,7 @@ var _ = Describe("UpdateRun execution tests", func() {
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")
})

It("Should mark the 3rd cluster in the 1st stage as succeeded", func() {
It("Should mark the 3rd cluster in the 1st stage as succeeded after marking the binding available", func() {
By("Validating the 3rd clusterResourceBinding is updated to Bound")
binding := resourceBindings[numTargetClusters-5] // cluster-5
validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0)
@@ -494,6 +494,11 @@ func validateBindingState(ctx context.Context, binding *placementv1beta1.Cluster
if diff := cmp.Diff(binding.Spec.ApplyStrategy, updateRun.Status.ApplyStrategy); diff != "" {
return fmt.Errorf("binding %s has different applyStrategy (-want +got):\n%s", binding.Name, diff)
}

rolloutStartedCond := binding.GetCondition(string(placementv1beta1.ResourceBindingRolloutStarted))
if !condition.IsConditionStatusTrue(rolloutStartedCond, binding.Generation) {
return fmt.Errorf("binding %s does not have RolloutStarted condition", binding.Name)
}
return nil
}, timeout, interval).Should(Succeed(), "failed to validate the binding state")
}
6 changes: 6 additions & 0 deletions pkg/controllers/updaterun/initialization.go
@@ -183,6 +183,12 @@ func (r *Reconciler) collectScheduledClusters(
klog.V(2).InfoS("Found a scheduled binding", "binding", binding.Name, "clusterResourcePlacement", placementName, "latestPolicySnapshot", latestPolicySnapshot.Name, "clusterStagedUpdateRun", updateRunRef)
selectedBindings = append(selectedBindings, &bindingList.Items[i])
} else {
if binding.Spec.State != placementv1beta1.BindingStateUnscheduled {
stateErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("binding `%s` with old policy snapshot %s has state %s, not unscheduled", binding.Name, binding.Spec.SchedulingPolicySnapshotName, binding.Spec.State))
klog.ErrorS(stateErr, "Failed to collect clusterResourceBindings", "clusterResourcePlacement", placementName, "latestPolicySnapshot", latestPolicySnapshot.Name, "clusterStagedUpdateRun", updateRunRef)
// no more retries here.
return nil, nil, fmt.Errorf("%w: %s", errInitializedFailed, stateErr.Error())
}
klog.V(2).InfoS("Found a to-be-deleted binding", "binding", binding.Name, "clusterResourcePlacement", placementName, "latestPolicySnapshot", latestPolicySnapshot.Name, "clusterStagedUpdateRun", updateRunRef)
toBeDeletedBindings = append(toBeDeletedBindings, &bindingList.Items[i])
}
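The new check makes initialization fail fast on an impossible state: a binding pointing at an old policy snapshot should always be Unscheduled. A self-contained sketch of that invariant (simplified types, not fleet's actual structs):

```go
package sketch

import "fmt"

// binding is a simplified stand-in for a ClusterResourceBinding.
type binding struct {
	Name           string
	PolicySnapshot string
	State          string // "Scheduled", "Bound", or "Unscheduled"
}

// classify mirrors collectScheduledClusters' split: bindings on the latest
// policy snapshot are selected; anything else must be Unscheduled (to be deleted).
func classify(bindings []binding, latestSnapshot string) (selected, toDelete []binding, err error) {
	for _, b := range bindings {
		if b.PolicySnapshot == latestSnapshot {
			selected = append(selected, b)
			continue
		}
		if b.State != "Unscheduled" {
			return nil, nil, fmt.Errorf("binding %q with old policy snapshot %q has state %q, not Unscheduled", b.Name, b.PolicySnapshot, b.State)
		}
		toDelete = append(toDelete, b)
	}
	return selected, toDelete, nil
}
```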