Skip to content

Commit

Permalink
🐛 fix race condition between work status update and deletion. (#74)
Browse files Browse the repository at this point in the history
* fix race condition between work status update and deletion.

Signed-off-by: morvencao <[email protected]>

* add comments.

Signed-off-by: morvencao <[email protected]>

---------

Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao authored Aug 29, 2024
1 parent 1afb1ea commit 7bd852f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 5 deletions.
2 changes: 0 additions & 2 deletions pkg/cloudevents/generic/agentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...Re
}

func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents.Event, handlers ...ResourceHandler[T]) {
klog.V(4).Infof("Received event:\n%s", evt)

eventType, err := types.ParseCloudEventsType(evt.Type())
if err != nil {
klog.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
Expand Down
2 changes: 0 additions & 2 deletions pkg/cloudevents/generic/sourceclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,6 @@ func (c *CloudEventSourceClient[T]) Subscribe(ctx context.Context, handlers ...R
}

func (c *CloudEventSourceClient[T]) receive(ctx context.Context, evt cloudevents.Event, handlers ...ResourceHandler[T]) {
klog.V(4).Infof("Received event:\n%s", evt)

eventType, err := types.ParseCloudEventsType(evt.Type())
if err != nil {
klog.Errorf("failed to parse cloud event type, %v", err)
Expand Down
29 changes: 28 additions & 1 deletion pkg/cloudevents/work/agent/client/manifestwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"strconv"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -136,13 +137,39 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub

if statusUpdated {
eventType.Action = common.UpdateRequestAction
// publish the status update event to the source; the source will check the resource version
// and reject the update if its status update is outdated.
if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil {
return nil, workerrors.NewPublishError(common.ManifestWorkGR, name, err)
}

// Fetch the latest work from the store and verify the resource version to avoid updating the store
// with outdated work. Return a conflict error if the resource version is outdated.
// Due to the lack of read-modify-write guarantees in the store, race conditions may occur between
// this update operation and one from the agent informer after receiving the event from the source.
latestWork, exists, err := c.watcherStore.Get(c.namespace, name)
if err != nil {
return nil, errors.NewInternalError(err)
}
if !exists {
return nil, errors.NewNotFound(common.ManifestWorkGR, name)
}
lastResourceVersion, err := strconv.ParseInt(latestWork.GetResourceVersion(), 10, 64)
if err != nil {
return nil, errors.NewInternalError(err)
}
newResourceVersion, err := strconv.ParseInt(newWork.GetResourceVersion(), 10, 64)
if err != nil {
return nil, errors.NewInternalError(err)
}
// ensure the resource version of the work is not outdated
if newResourceVersion < lastResourceVersion {
// It's safe to return a conflict error here, even if the status update event
// has already been sent. The source may reject the update due to an outdated resource version.
return nil, errors.NewConflict(common.ManifestWorkGR, name, fmt.Errorf("the resource version of the work is outdated"))
}
if err := c.watcherStore.Update(newWork); err != nil {
return nil, errors.NewInternalError(err)

}
return newWork, nil
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/cloudevents/work/store/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
Expand Down Expand Up @@ -118,6 +119,11 @@ func (s *AgentInformerWatcherStore) HandleReceivedWork(action types.ResourceActi
if !exists {
return fmt.Errorf("the work %s/%s does not exist", work.Namespace, work.Name)
}
// prevent the work from being updated while it is being deleted
if !lastWork.GetDeletionTimestamp().IsZero() {
klog.Warningf("the work %s/%s is deleting, ignore the update", work.Namespace, work.Name)
return nil
}

updatedWork := work.DeepCopy()

Expand Down

0 comments on commit 7bd852f

Please sign in to comment.