diff --git a/pkg/cloudevents/generic/agentclient.go b/pkg/cloudevents/generic/agentclient.go index 0c3a3d6e..d85c8588 100644 --- a/pkg/cloudevents/generic/agentclient.go +++ b/pkg/cloudevents/generic/agentclient.go @@ -151,8 +151,6 @@ func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...Re } func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents.Event, handlers ...ResourceHandler[T]) { - klog.V(4).Infof("Received event:\n%s", evt) - eventType, err := types.ParseCloudEventsType(evt.Type()) if err != nil { klog.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err) diff --git a/pkg/cloudevents/generic/sourceclient.go b/pkg/cloudevents/generic/sourceclient.go index c576f737..1cb9725a 100644 --- a/pkg/cloudevents/generic/sourceclient.go +++ b/pkg/cloudevents/generic/sourceclient.go @@ -145,8 +145,6 @@ func (c *CloudEventSourceClient[T]) Subscribe(ctx context.Context, handlers ...R } func (c *CloudEventSourceClient[T]) receive(ctx context.Context, evt cloudevents.Event, handlers ...ResourceHandler[T]) { - klog.V(4).Infof("Received event:\n%s", evt) - eventType, err := types.ParseCloudEventsType(evt.Type()) if err != nil { klog.Errorf("failed to parse cloud event type, %v", err) diff --git a/pkg/cloudevents/work/agent/client/manifestwork.go b/pkg/cloudevents/work/agent/client/manifestwork.go index 8890937c..17e7e395 100644 --- a/pkg/cloudevents/work/agent/client/manifestwork.go +++ b/pkg/cloudevents/work/agent/client/manifestwork.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "strconv" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -136,13 +137,39 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub if statusUpdated { eventType.Action = common.UpdateRequestAction + // publish the status update event to source, source will check the resource version + // and reject the update if its status update is outdated. 
if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil { return nil, workerrors.NewPublishError(common.ManifestWorkGR, name, err) } + // Fetch the latest work from the store and verify the resource version to avoid updating the store + // with outdated work. Return a conflict error if the resource version is outdated. + // Due to the lack of read-modify-write guarantees in the store, race conditions may occur between + // this update operation and one from the agent informer after receiving the event from the source. + latestWork, exists, err := c.watcherStore.Get(c.namespace, name) + if err != nil { + return nil, errors.NewInternalError(err) + } + if !exists { + return nil, errors.NewNotFound(common.ManifestWorkGR, name) + } + lastResourceVersion, err := strconv.ParseInt(latestWork.GetResourceVersion(), 10, 64) + if err != nil { + return nil, errors.NewInternalError(err) + } + newResourceVersion, err := strconv.ParseInt(newWork.GetResourceVersion(), 10, 64) + if err != nil { + return nil, errors.NewInternalError(err) + } + // ensure the resource version of the work is not outdated + if newResourceVersion < lastResourceVersion { + // It's safe to return a conflict error here, even if the status update event + // has already been sent. The source may reject the update due to an outdated resource version. 
+ return nil, errors.NewConflict(common.ManifestWorkGR, name, fmt.Errorf("the resource version of the work is outdated")) + } if err := c.watcherStore.Update(newWork); err != nil { return nil, errors.NewInternalError(err) - } return newWork, nil } diff --git a/pkg/cloudevents/work/store/informer.go b/pkg/cloudevents/work/store/informer.go index a087bbe2..cc307ac7 100644 --- a/pkg/cloudevents/work/store/informer.go +++ b/pkg/cloudevents/work/store/informer.go @@ -8,6 +8,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" @@ -118,6 +119,11 @@ func (s *AgentInformerWatcherStore) HandleReceivedWork(action types.ResourceActi if !exists { return fmt.Errorf("the work %s/%s does not exist", work.Namespace, work.Name) } + // prevent the work from being updated if it is deleting + if !lastWork.GetDeletionTimestamp().IsZero() { + klog.Warningf("the work %s/%s is deleting, ignore the update", work.Namespace, work.Name) + return nil + } updatedWork := work.DeepCopy()