Skip to content

Commit

Permalink
handle resource snapshot missing but work already synced and add cro/…
Browse files Browse the repository at this point in the history
…ro annotation
  • Loading branch information
Ryan Zhang committed Oct 24, 2024
1 parent 6d5ca02 commit 176f75d
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 11 deletions.
6 changes: 6 additions & 0 deletions apis/placement/v1beta1/commons.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ const (
// The format is {workPrefix}-configMap-uuid
WorkNameWithConfigEnvelopeFmt = "%s-configmap-%s"

// ParentClusterResourceOverrideSnapshotHashAnnotation is the annotation applied to work that contains the hash of the parent cluster resource override snapshot list.
ParentClusterResourceOverrideSnapshotHashAnnotation = fleetPrefix + "parent-cluster-resource-override-snapshot-hash"

// ParentResourceOverrideSnapshotHashAnnotation is the annotation applied to work that contains the hash of the parent resource override snapshot list.
ParentResourceOverrideSnapshotHashAnnotation = fleetPrefix + "parent-resource-override-snapshot-hash"

// ParentResourceSnapshotIndexLabel is the label applied to work that contains the index of the resource snapshot that generates the work.
ParentResourceSnapshotIndexLabel = fleetPrefix + "parent-resource-snapshot-index"

Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func createUpdateInfo(binding *fleetv1beta1.ClusterResourceBinding, crp *fleetv1
desiredBinding.Spec.ResourceSnapshotName = latestResourceSnapshot.Name
// update the resource apply strategy when controller rolls out the new changes
desiredBinding.Spec.ApplyStrategy = crp.Spec.Strategy.ApplyStrategy
// TODO: check the size of the cro and ro to not exceed the limit
desiredBinding.Spec.ClusterResourceOverrideSnapshots = cro
desiredBinding.Spec.ResourceOverrideSnapshots = ro
return toBeUpdatedBinding{
Expand Down
71 changes: 60 additions & 11 deletions pkg/controllers/workgenerator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,14 @@ import (
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/informer"
"go.goms.io/fleet/pkg/utils/labels"
"go.goms.io/fleet/pkg/utils/resource"
)

var (
// maxFailedResourcePlacementLimit indicates the max number of failed resource placements to include in the status.
maxFailedResourcePlacementLimit = 100

errResourceSnapshotNotFound = errors.New("the master resource snapshot is not found")
errResourceSnapshotNotFound = fmt.Errorf("the master resource snapshot is not found")
)

// Reconciler watches binding objects and generate work objects in the designated cluster namespace
Expand Down Expand Up @@ -375,10 +376,28 @@ func (r *Reconciler) listAllWorksAssociated(ctx context.Context, resourceBinding
func (r *Reconciler) syncAllWork(ctx context.Context, resourceBinding *fleetv1beta1.ClusterResourceBinding, existingWorks map[string]*fleetv1beta1.Work, cluster clusterv1beta1.MemberCluster) (bool, bool, error) {
updateAny := atomic.NewBool(false)
resourceBindingRef := klog.KObj(resourceBinding)
// the hash256 function can handle an empty list https://go.dev/play/p/_4HW17fooXM
resourceOverrideSnapshotHash, err := resource.HashOf(resourceBinding.Spec.ResourceOverrideSnapshots)
if err != nil {
return false, false, controller.NewUnexpectedBehaviorError(err)
}
clusterResourceOverrideSnapshotHash, err := resource.HashOf(resourceBinding.Spec.ClusterResourceOverrideSnapshots)
if err != nil {
return false, false, controller.NewUnexpectedBehaviorError(err)
}
// TODO: check all work synced first before fetching the snapshots after we put ParentResourceOverrideSnapshotHashAnnotation and ParentClusterResourceOverrideSnapshotHashAnnotation in all the work objects

// Gather all the resource snapshots
resourceSnapshots, err := r.fetchAllResourceSnapshots(ctx, resourceBinding)
if err != nil {
if errors.Is(err, errResourceSnapshotNotFound) {
// the resourceIndex is deleted but the works might still be up to date with the binding.
if areAllWorkSynced(existingWorks, resourceBinding.Spec.ResourceSnapshotName, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash) {
klog.V(2).InfoS("All the works are synced with the resourceBinding even if the resource snapshot index is removed", "resourceBinding", resourceBindingRef)
return false, false, nil
}
return false, false, controller.NewUserError(err)
}
// TODO(RZ): handle errResourceNotFullyCreated error so we don't need to wait for all the snapshots to be created
return false, false, err
}
Expand Down Expand Up @@ -422,7 +441,7 @@ func (r *Reconciler) syncAllWork(ctx context.Context, resourceBinding *fleetv1be
if uResource.GetObjectKind().GroupVersionKind() == utils.ConfigMapGVK &&
len(uResource.GetAnnotations()[fleetv1beta1.EnvelopeConfigMapAnnotation]) != 0 {
// get a work object for the enveloped configMap
work, err := r.getConfigMapEnvelopWorkObj(ctx, workNamePrefix, resourceBinding, snapshot, &uResource)
work, err := r.getConfigMapEnvelopWorkObj(ctx, workNamePrefix, resourceBinding, snapshot, &uResource, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash)
if err != nil {
return true, false, err
}
Expand All @@ -438,7 +457,7 @@ func (r *Reconciler) syncAllWork(ctx context.Context, resourceBinding *fleetv1be
// generate a work object for the manifests even if there is nothing to place
// to allow CRP to collect the status of the placement
// TODO (RZ): revisit to see if we need this hack
work := generateSnapshotWorkObj(workNamePrefix, resourceBinding, snapshot, simpleManifests)
work := generateSnapshotWorkObj(workNamePrefix, resourceBinding, snapshot, simpleManifests, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash)
activeWork[work.Name] = work
newWork = append(newWork, work)

Expand Down Expand Up @@ -485,6 +504,17 @@ func (r *Reconciler) syncAllWork(ctx context.Context, resourceBinding *fleetv1be
return true, updateAny.Load(), nil
}

// areAllWorkSynced reports whether every work in existingWorks was generated from the
// expected resource snapshot.
//
// A work counts as synced when its ParentResourceSnapshotIndexLabel value equals
// resourceSnapshotIndex. The function returns false as soon as one work does not match,
// and returns true when existingWorks is empty or nil because there is nothing to compare.
// The two trailing string parameters (the resource override and cluster resource override
// snapshot hashes) are accepted but not inspected yet; see the TODO below.
//
// NOTE(review): the visible caller in syncAllWork passes resourceBinding.Spec.ResourceSnapshotName
// as resourceSnapshotIndex - confirm that value is comparable with the index stored in the label.
func areAllWorkSynced(existingWorks map[string]*fleetv1beta1.Work, resourceSnapshotIndex, _, _ string) bool {
	// TODO: check resourceOverrideSnapshotHash and clusterResourceOverrideSnapshotHash after all the work has the ParentResourceOverrideSnapshotHashAnnotation and ParentClusterResourceOverrideSnapshotHashAnnotation
	synced := true
	for workName := range existingWorks {
		recordedIndex := existingWorks[workName].GetLabels()[fleetv1beta1.ParentResourceSnapshotIndexLabel]
		if recordedIndex != resourceSnapshotIndex {
			// one stale work is enough to declare the whole set out of sync
			synced = false
			break
		}
	}
	return synced
}

// fetchAllResourceSnapshots gathers all the resource snapshots for the resource binding.
func (r *Reconciler) fetchAllResourceSnapshots(ctx context.Context, resourceBinding *fleetv1beta1.ClusterResourceBinding) (map[string]*fleetv1beta1.ClusterResourceSnapshot, error) {
// fetch the master snapshot first
Expand All @@ -504,7 +534,7 @@ func (r *Reconciler) fetchAllResourceSnapshots(ctx context.Context, resourceBind
// getConfigMapEnvelopWorkObj first try to locate a work object for the corresponding envelopObj of type configMap.
// we create a new one if the work object doesn't exist. We do this to avoid repeatedly delete and create the same work object.
func (r *Reconciler) getConfigMapEnvelopWorkObj(ctx context.Context, workNamePrefix string, resourceBinding *fleetv1beta1.ClusterResourceBinding,
resourceSnapshot *fleetv1beta1.ClusterResourceSnapshot, envelopeObj *unstructured.Unstructured) (*fleetv1beta1.Work, error) {
resourceSnapshot *fleetv1beta1.ClusterResourceSnapshot, envelopeObj *unstructured.Unstructured, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash string) (*fleetv1beta1.Work, error) {
// we group all the resources in one configMap to one work
manifest, err := extractResFromConfigMap(envelopeObj)
if err != nil {
Expand All @@ -514,6 +544,7 @@ func (r *Reconciler) getConfigMapEnvelopWorkObj(ctx context.Context, workNamePre
}
klog.V(2).InfoS("Successfully extract the enveloped resources from the configMap", "numOfResources", len(manifest),
"snapshot", klog.KObj(resourceSnapshot), "resourceBinding", klog.KObj(resourceBinding), "configMapWrapper", klog.KObj(envelopeObj))

// Try to see if we already have a work that represents the same enveloped object for this CRP in the same cluster
// The ParentResourceSnapshotIndexLabel can change between snapshots so we have to exclude that label in the match
envelopWorkLabelMatcher := client.MatchingLabels{
Expand Down Expand Up @@ -544,6 +575,10 @@ func (r *Reconciler) getConfigMapEnvelopWorkObj(ctx context.Context, workNamePre
fleetv1beta1.EnvelopeNameLabel: envelopeObj.GetName(),
fleetv1beta1.EnvelopeNamespaceLabel: envelopeObj.GetNamespace(),
},
Annotations: map[string]string{
fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation: resourceOverrideSnapshotHash,
fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation: clusterResourceOverrideSnapshotHash,
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: fleetv1beta1.GroupVersion.String(),
Expand All @@ -567,16 +602,18 @@ func (r *Reconciler) getConfigMapEnvelopWorkObj(ctx context.Context, workNamePre
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("find %d work representing configMap", len(workList.Items))),
"snapshot", klog.KObj(resourceSnapshot), "resourceBinding", klog.KObj(resourceBinding), "configMapWrapper", klog.KObj(envelopeObj))
}
// we just pick the first one if there are more than one.
work := workList.Items[0]
work.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel] = resourceSnapshot.Labels[fleetv1beta1.ResourceIndexLabel]
work.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] = resourceOverrideSnapshotHash
work.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] = clusterResourceOverrideSnapshotHash
work.Spec.Workload.Manifests = manifest
work.Spec.ApplyStrategy = resourceBinding.Spec.ApplyStrategy
return &work, nil
}

// generateSnapshotWorkObj generates the work object for the corresponding snapshot
func generateSnapshotWorkObj(workName string, resourceBinding *fleetv1beta1.ClusterResourceBinding, resourceSnapshot *fleetv1beta1.ClusterResourceSnapshot, manifest []fleetv1beta1.Manifest) *fleetv1beta1.Work {
func generateSnapshotWorkObj(workName string, resourceBinding *fleetv1beta1.ClusterResourceBinding, resourceSnapshot *fleetv1beta1.ClusterResourceSnapshot,
manifest []fleetv1beta1.Manifest, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash string) *fleetv1beta1.Work {
return &fleetv1beta1.Work{
ObjectMeta: metav1.ObjectMeta{
Name: workName,
Expand All @@ -586,6 +623,10 @@ func generateSnapshotWorkObj(workName string, resourceBinding *fleetv1beta1.Clus
fleetv1beta1.CRPTrackingLabel: resourceBinding.Labels[fleetv1beta1.CRPTrackingLabel],
fleetv1beta1.ParentResourceSnapshotIndexLabel: resourceSnapshot.Labels[fleetv1beta1.ResourceIndexLabel],
},
Annotations: map[string]string{
fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation: resourceOverrideSnapshotHash,
fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation: clusterResourceOverrideSnapshotHash,
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: fleetv1beta1.GroupVersion.String(),
Expand Down Expand Up @@ -619,6 +660,7 @@ func (r *Reconciler) upsertWork(ctx context.Context, newWork, existingWork *flee
"resourceSnapshot", resourceSnapshotObj, "work", workObj)
return true, nil
}
// TODO: remove the compare after we did the check on all work in the sync all
// check if we need to update the existing work object
workResourceIndex, err := labels.ExtractResourceSnapshotIndexFromWork(existingWork)
if err != nil {
Expand All @@ -628,12 +670,19 @@ func (r *Reconciler) upsertWork(ctx context.Context, newWork, existingWork *flee
// we already checked the label in fetchAllResourceSnapShots function so no need to check again
resourceIndex, _ := labels.ExtractResourceIndexFromClusterResourceSnapshot(resourceSnapshot)
if workResourceIndex == resourceIndex {
// no need to do anything if the work is generated from the same resource snapshot group since the resource snapshot is immutable.
klog.V(2).InfoS("Work is already associated with the desired resourceSnapshot", "resourceIndex", resourceIndex, "work", workObj, "resourceSnapshot", resourceSnapshotObj)
return false, nil
// no need to do anything if the work is generated from the same snapshots
if existingWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] == newWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] &&
existingWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] == newWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] {
klog.V(2).InfoS("Work is not associated with the desired override snapshots", "existingROHash", existingWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation],
"existingCROHash", existingWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation], "work", workObj)
return false, nil
}
klog.V(2).InfoS("Work is already associated with the desired resourceSnapshot but still not having the right override snapshots", "resourceIndex", resourceIndex, "work", workObj, "resourceSnapshot", resourceSnapshotObj)
}
// need to update the existing work, only two possible changes:
existingWork.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel] = resourceSnapshot.Labels[fleetv1beta1.ResourceIndexLabel]
// need to copy the new work to the existing work, only 4 possible changes:
existingWork.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel] = newWork.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel]
existingWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] = newWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation]
existingWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] = newWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation]
existingWork.Spec.Workload.Manifests = newWork.Spec.Workload.Manifests
if err := r.Client.Update(ctx, existingWork); err != nil {
klog.ErrorS(err, "Failed to update the work associated with the resourceSnapshot", "resourceSnapshot", resourceSnapshotObj, "work", workObj)
Expand Down

0 comments on commit 176f75d

Please sign in to comment.