Skip to content

Commit

Permalink
Minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelawyu committed Jan 7, 2025
1 parent 08b66ee commit 2c9a14e
Show file tree
Hide file tree
Showing 10 changed files with 338 additions and 255 deletions.
71 changes: 50 additions & 21 deletions pkg/controllers/workapplier/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/resource"
)

Expand All @@ -35,7 +36,7 @@ func init() {
_ = clientgoscheme.AddToScheme(builtInScheme)
}

// applyInDryRunMode
// applyInDryRunMode dry-runs an apply op.
func (r *Reconciler) applyInDryRunMode(
ctx context.Context,
gvr *schema.GroupVersionResource,
Expand Down Expand Up @@ -74,8 +75,8 @@ func (r *Reconciler) apply(

// Compute the hash of the manifest object.
//
// Originally the manifest hash is kept only if three-way merge patch (client side apply esque
// strategy) is used; with the new drift detection and takeover capabilities, the manifest hash
// Originally the manifest hash is kept only if three-way merge patch (client side apply)
// is used; with the new drift detection and takeover capabilities, the manifest hash
// will always be kept regardless of the apply strategy in use, as it is needed for
// drift detection purposes.
//
Expand Down Expand Up @@ -141,12 +142,12 @@ func (r *Reconciler) apply(
switch {
case applyStrategy.Type == fleetv1beta1.ApplyStrategyTypeClientSideApply && isLastAppliedAnnotationSet:
// The apply strategy dictates that three-way merge patch
// (client-side apply esque patch) should be used, and the last applied annotation
// (client-side apply) should be used, and the last applied annotation
// has been set.
return r.threeWayMergePatch(ctx, gvr, manifestObjCopy, inMemberClusterObj, isOptimisticLockEnabled, false)
case applyStrategy.Type == fleetv1beta1.ApplyStrategyTypeClientSideApply:
// The apply strategy dictates that three-way merge patch
// (client-side apply esque patch) should be used, but the last applied annotation
// (client-side apply) should be used, but the last applied annotation
// cannot be set. Fleet will fall back to server-side apply.
return r.serverSideApply(
ctx,
Expand All @@ -161,8 +162,11 @@ func (r *Reconciler) apply(
applyStrategy.ServerSideApplyConfig.ForceConflicts, isOptimisticLockEnabled, false,
)
default:
// An unexpected apply strategy has been set.
return nil, fmt.Errorf("unexpected apply strategy %s is found", applyStrategy.Type)
// An unexpected apply strategy has been set. Normally this will never run as the built-in
// validation would block invalid values.
wrappedErr := fmt.Errorf("unexpected apply strategy %s is found", applyStrategy.Type)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return nil, wrappedErr
}
}

Expand All @@ -177,6 +181,7 @@ func (r *Reconciler) createManifestObject(
}
createdObj, err := r.spokeDynamicClient.Resource(*gvr).Namespace(manifestObject.GetNamespace()).Create(ctx, manifestObject, createOpts)
if err != nil {
_ = controller.NewAPIServerError(false, err)
return nil, fmt.Errorf("failed to create manifest object: %w", err)
}
return createdObj, nil
Expand Down Expand Up @@ -212,7 +217,9 @@ func (r *Reconciler) threeWayMergePatch(
data, err := patch.Data(manifestObj)
if err != nil {
// Fleet uses raw patch; this branch should never run.
return nil, fmt.Errorf("failed to get patch data: %w", err)
wrappedErr := fmt.Errorf("failed to get patch data: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return nil, wrappedErr
}

// Use three-way merge (similar to kubectl client side apply) to patch the object in the
Expand All @@ -233,6 +240,7 @@ func (r *Reconciler) threeWayMergePatch(
Resource(*gvr).Namespace(manifestObj.GetNamespace()).
Patch(ctx, manifestObj.GetName(), patch.Type(), data, patchOpts)
if err != nil {
_ = controller.NewAPIServerError(false, err)
return nil, fmt.Errorf("failed to patch the manifest object: %w", err)
}
return patchedObj, nil
Expand Down Expand Up @@ -274,6 +282,7 @@ func (r *Reconciler) serverSideApply(
Resource(*gvr).Namespace(manifestObj.GetNamespace()).
Apply(ctx, manifestObj.GetName(), manifestObj, applyOpts)
if err != nil {
_ = controller.NewAPIServerError(false, err)
return nil, fmt.Errorf("failed to apply the manifest object: %w", err)
}
return appliedObj, nil
Expand Down Expand Up @@ -314,7 +323,9 @@ func buildThreeWayMergePatch(manifestObj, liveObj *unstructured.Unstructured) (c
patchData, err = jsonmergepatch.CreateThreeWayJSONMergePatch(
lastAppliedObjJSONBytes, manifestObjJSONBytes, liveObjJSONBytes, preconditions...)
if err != nil {
return nil, err
wrappedErr := fmt.Errorf("failed to create three-way JSON merge patch: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return nil, wrappedErr
}
case err != nil:
return nil, err
Expand All @@ -323,11 +334,15 @@ func buildThreeWayMergePatch(manifestObj, liveObj *unstructured.Unstructured) (c
patchType = types.StrategicMergePatchType
lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject)
if err != nil {
return nil, err
wrappedErr := fmt.Errorf("failed to create patch meta from struct (strategic merge patch): %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return nil, wrappedErr
}
patchData, err = strategicpatch.CreateThreeWayMergePatch(lastAppliedObjJSONBytes, manifestObjJSONBytes, liveObjJSONBytes, lookupPatchMeta, true)
if err != nil {
return nil, err
wrappedErr := fmt.Errorf("failed to create three-way strategic merge patch: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return nil, wrappedErr
}
}
return client.RawPatch(patchType, patchData), nil
Expand All @@ -346,7 +361,9 @@ func setFleetLastAppliedAnnotation(manifestObj *unstructured.Unstructured) (bool

lastAppliedManifestJSONBytes, err := manifestObj.MarshalJSON()
if err != nil {
return false, fmt.Errorf("failed to marshal the manifest object into JSON: %w", err)
wrappedErr := fmt.Errorf("failed to marshal the manifest object into JSON: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return false, wrappedErr
}
annotations[fleetv1beta1.LastAppliedConfigAnnotation] = string(lastAppliedManifestJSONBytes)
isLastAppliedAnnotationSet := true
Expand Down Expand Up @@ -388,7 +405,9 @@ func setManifestHashAnnotation(manifestObj *unstructured.Unstructured) error {
cleanedManifestObj := discardFieldsIrrelevantInComparisonFrom(manifestObj)
manifestObjHash, err := resource.HashOf(cleanedManifestObj.Object)
if err != nil {
return err
wrappedErr := fmt.Errorf("failed to compute the hash of the manifest object: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return wrappedErr
}

annotations := manifestObj.GetAnnotations()
Expand All @@ -404,13 +423,13 @@ func setManifestHashAnnotation(manifestObj *unstructured.Unstructured) error {
// on a manifest to be applied.
func setOwnerRef(obj *unstructured.Unstructured, expectedAppliedWorkOwnerRef *metav1.OwnerReference) {
ownerRefs := obj.GetOwnerReferences()
if ownerRefs == nil {
ownerRefs = []metav1.OwnerReference{}
}

// Typically owner references is a system-managed field, and at this moment Fleet will
// clear owner references (if any) set in the manifest object. However, for consistency
// reasons, here Fleet will still assume that there might be some owner references set
// in the manifest object.
//
// TO-DO (chenyu1): evaluate if user-set owner references should be kept.
ownerRefs = append(ownerRefs, *expectedAppliedWorkOwnerRef)
obj.SetOwnerReferences(ownerRefs)
}
Expand All @@ -431,7 +450,9 @@ func validateOwnerReferences(
// perform sanitization on the manifest object before applying it, which removes all owner
// references.
if len(manifestObjOwnerRefs) > 0 && !applyStrategy.AllowCoOwnership {
return fmt.Errorf("manifest is set to have multiple owner references but co-ownership is disallowed")
wrappedErr := fmt.Errorf("manifest is set to have owner references but co-ownership is disallowed")
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return wrappedErr
}

// Do a sanity check to verify that no AppliedWork object is directly added as an owner
Expand All @@ -440,7 +461,9 @@ func validateOwnerReferences(
// references.
for _, ownerRef := range manifestObjOwnerRefs {
if ownerRef.APIVersion == fleetv1beta1.GroupVersion.String() && ownerRef.Kind == fleetv1beta1.AppliedWorkKind {
return fmt.Errorf("an AppliedWork object is unexpectedly added as an owner in the manifest object")
wrappedErr := fmt.Errorf("an AppliedWork object is unexpectedly added as an owner in the manifest object")
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return wrappedErr
}
}

Expand All @@ -452,7 +475,9 @@ func validateOwnerReferences(

// If the live object is co-owned but co-ownership is no longer allowed, the validation fails.
if len(inMemberClusterObjOwnerRefs) > 1 && !applyStrategy.AllowCoOwnership {
return fmt.Errorf("object is co-owned by multiple objects but co-ownership has been disallowed")
wrappedErr := fmt.Errorf("object is co-owned by multiple objects but co-ownership has been disallowed")
_ = controller.NewUserError(wrappedErr)
return wrappedErr
}

// Note that at this point of execution, one of the owner references is guaranteed to be the
Expand All @@ -465,15 +490,19 @@ func validateOwnerReferences(
}
}
if !found {
return fmt.Errorf("object is not owned by the expected AppliedWork object")
wrappedErr := fmt.Errorf("object is not owned by the expected AppliedWork object")
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return wrappedErr
}

// If the object is already owned by another AppliedWork object, the validation fails.
//
// Normally this branch will never get executed as Fleet would refuse to take over an object
// that has been owned by another AppliedWork object.
if isPlacedByFleetInDuplicate(inMemberClusterObjOwnerRefs, expectedAppliedWorkOwnerRef) {
return fmt.Errorf("object is already owned by another AppliedWork object")
wrappedErr := fmt.Errorf("object is already owned by another AppliedWork object")
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return wrappedErr
}

return nil
Expand Down
20 changes: 15 additions & 5 deletions pkg/controllers/workapplier/availability_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ func trackDeploymentAvailability(inMemberClusterObj *unstructured.Unstructured)
var deploy appv1.Deployment
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(inMemberClusterObj.Object, &deploy); err != nil {
// Normally this branch should never run.
return ManifestProcessingAvailabilityResultTypeFailed, controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to convert the unstructured object to a deployment: %w", err))
wrappedErr := fmt.Errorf("failed to convert the unstructured object to a deployment: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return ManifestProcessingAvailabilityResultTypeFailed, wrappedErr
}

// Check if the deployment is available.
Expand All @@ -119,7 +121,9 @@ func trackStatefulSetAvailability(inMemberClusterObj *unstructured.Unstructured)
var statefulSet appv1.StatefulSet
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(inMemberClusterObj.Object, &statefulSet); err != nil {
// Normally this branch should never run.
return ManifestProcessingAvailabilityResultTypeFailed, controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to convert the unstructured object to a stateful set: %w", err))
wrappedErr := fmt.Errorf("failed to convert the unstructured object to a stateful set: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return ManifestProcessingAvailabilityResultTypeFailed, wrappedErr
}

// Check if the stateful set is available.
Expand All @@ -145,8 +149,10 @@ func trackStatefulSetAvailability(inMemberClusterObj *unstructured.Unstructured)
func trackDaemonSetAvailability(inMemberClusterObj *unstructured.Unstructured) (ManifestProcessingAvailabilityResultType, error) {
var daemonSet appv1.DaemonSet
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(inMemberClusterObj.Object, &daemonSet); err != nil {
wrappedErr := fmt.Errorf("failed to convert the unstructured object to a daemon set: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
// Normally this branch should never run.
return ManifestProcessingAvailabilityResultTypeFailed, controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to convert the unstructured object to a daemon set: %w", err))
return ManifestProcessingAvailabilityResultTypeFailed, wrappedErr
}

// Check if the daemonSet is available.
Expand All @@ -168,7 +174,9 @@ func trackDaemonSetAvailability(inMemberClusterObj *unstructured.Unstructured) (
func trackServiceAvailability(inMemberClusterObj *unstructured.Unstructured) (ManifestProcessingAvailabilityResultType, error) {
var svc corev1.Service
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(inMemberClusterObj.Object, &svc); err != nil {
return ManifestProcessingAvailabilityResultTypeFailed, controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to convert the unstructured object to a service: %w", err))
wrappedErr := fmt.Errorf("failed to convert the unstructured object to a service: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return ManifestProcessingAvailabilityResultTypeFailed, wrappedErr
}
switch svc.Spec.Type {
case "":
Expand Down Expand Up @@ -205,7 +213,9 @@ func trackServiceAvailability(inMemberClusterObj *unstructured.Unstructured) (Ma
func trackCRDAvailability(inMemberClusterObj *unstructured.Unstructured) (ManifestProcessingAvailabilityResultType, error) {
var crd apiextensionsv1.CustomResourceDefinition
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(inMemberClusterObj.Object, &crd); err != nil {
return ManifestProcessingAvailabilityResultTypeFailed, controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to convert the unstructured object to a custom resource definition: %w", err))
wrappedErr := fmt.Errorf("failed to convert the unstructured object to a custom resource definition: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return ManifestProcessingAvailabilityResultTypeFailed, wrappedErr
}

// If both conditions are True, the CRD has become available.
Expand Down
17 changes: 13 additions & 4 deletions pkg/controllers/workapplier/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,17 @@ func (r *Reconciler) garbageCollectAppliedWork(ctx context.Context, work *fleetv
klog.V(2).InfoS("The appliedWork is already deleted", "appliedWork", work.Name)
case err != nil:
klog.ErrorS(err, "Failed to delete the appliedWork", "appliedWork", work.Name)
return ctrl.Result{}, err
return ctrl.Result{}, controller.NewAPIServerError(false, err)
default:
klog.InfoS("Successfully deleted the appliedWork", "appliedWork", work.Name)
}
controllerutil.RemoveFinalizer(work, fleetv1beta1.WorkFinalizer)
return ctrl.Result{}, r.hubClient.Update(ctx, work, &client.UpdateOptions{})

if err := r.hubClient.Update(ctx, work, &client.UpdateOptions{}); err != nil {
klog.ErrorS(err, "Failed to remove the finalizer from the work", "work", klog.KObj(work))
return ctrl.Result{}, controller.NewAPIServerError(false, err)
}
return ctrl.Result{}, nil
}

// ensureAppliedWork makes sure that an associated appliedWork and a finalizer on the work resource exists on the cluster.
Expand Down Expand Up @@ -377,12 +382,16 @@ func (r *Reconciler) ensureAppliedWork(ctx context.Context, work *fleetv1beta1.W
}
if err := r.spokeClient.Create(ctx, appliedWork); err != nil && !apierrors.IsAlreadyExists(err) {
klog.ErrorS(err, "AppliedWork create failed", "appliedWork", workRef.Name)
return nil, err
return nil, controller.NewAPIServerError(false, err)
}
if !hasFinalizer {
klog.InfoS("Add the finalizer to the work", "work", workRef)
work.Finalizers = append(work.Finalizers, fleetv1beta1.WorkFinalizer)
return appliedWork, r.hubClient.Update(ctx, work, &client.UpdateOptions{})

if err := r.hubClient.Update(ctx, work, &client.UpdateOptions{}); err != nil {
klog.ErrorS(err, "Failed to add the finalizer to the work", "work", workRef)
return nil, controller.NewAPIServerError(false, err)
}
}
klog.InfoS("Recreated the appliedWork resource", "appliedWork", workRef.Name)
return appliedWork, nil
Expand Down
Loading

0 comments on commit 2c9a14e

Please sign in to comment.