diff --git a/.github/workflows/go-presubmit.yml b/.github/workflows/go-presubmit.yml index 82de5a5f..f339c87d 100644 --- a/.github/workflows/go-presubmit.yml +++ b/.github/workflows/go-presubmit.yml @@ -144,7 +144,7 @@ jobs: go-version: ${{ env.GO_VERSION }} - name: install imagebuilder run: go install github.com/openshift/imagebuilder/cmd/imagebuilder@v1.2.3 - - name: addon-examples-image # will build and tag the examples image + - name: addon-examples-image run: imagebuilder --allow-pull -t quay.io/open-cluster-management/addon-examples:latest -t quay.io/ocm/addon-examples:latest -f ./build/Dockerfile.example . - name: setup kind uses: engineerd/setup-kind@v0.5.0 diff --git a/go.mod b/go.mod index 409a4ea3..68ef0922 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( k8s.io/klog/v2 v2.120.1 k8s.io/utils v0.0.0-20240310230437-4693a0247e57 open-cluster-management.io/api v0.13.0 - open-cluster-management.io/sdk-go v0.13.1-0.20240321032811-7dbdd1b5c63d + open-cluster-management.io/sdk-go v0.13.1-0.20240415030117-612344aae744 sigs.k8s.io/controller-runtime v0.17.2 ) @@ -42,7 +42,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995 // indirect - github.com/cloudevents/sdk-go/v2 v2.14.0 // indirect + github.com/cloudevents/sdk-go/v2 v2.15.2 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cyphar/filepath-securejoin v0.2.4 // indirect @@ -135,5 +135,3 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) - -replace open-cluster-management.io/sdk-go => github.com/morvencao/ocm-sdk-go v0.0.0-20240411073141-7692feecb557 diff --git a/go.sum b/go.sum index a0579bb7..74cb2538 100644 --- a/go.sum +++ b/go.sum @@ -30,8 +30,8 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995 h1:pXyRKZ0T5WoB6X9QnHS5cEyW0Got39bNQIECxGUKVO4= github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995/go.mod h1:mz9oS2Yhh/S7cvrrsgGMMR+6Shy0ZyL2lDN1sHQO1wE= -github.com/cloudevents/sdk-go/v2 v2.14.0 h1:Nrob4FwVgi5L4tV9lhjzZcjYqFVyJzsA56CwPaPfv6s= -github.com/cloudevents/sdk-go/v2 v2.14.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To= +github.com/cloudevents/sdk-go/v2 v2.15.2 h1:54+I5xQEnI73RBhWHxbI1XJcqOFOVJN85vb41+8mHUc= +github.com/cloudevents/sdk-go/v2 v2.15.2/go.mod h1:lL7kSWAE/V8VI4Wh0jbL2v/jvqsm6tjmaQBSvxcv4uE= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4= @@ -180,8 +180,6 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/morvencao/ocm-sdk-go v0.0.0-20240411073141-7692feecb557 h1:er9Zc4jhqxvund50yaA8LLM1J7nrS/IfjwVm21g/4cI= -github.com/morvencao/ocm-sdk-go v0.0.0-20240411073141-7692feecb557/go.mod h1:sq+amR9Ls9JzMP5dypvlCx4jIGfDg45gicS67Z/MnlI= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -443,6 +441,8 @@ k8s.io/utils v0.0.0-20240310230437-4693a0247e57 h1:gbqbevonBh57eILzModw6mrkbwM0g k8s.io/utils v0.0.0-20240310230437-4693a0247e57/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= open-cluster-management.io/api v0.13.0 h1:dlcJEZlNlE0DmSDctK2s7iWKg9l+Tgb0V78Z040nMuk= open-cluster-management.io/api v0.13.0/go.mod h1:CuCPEzXDvOyxBB0H1d1eSeajbHqaeGEKq9c63vQc63w= +open-cluster-management.io/sdk-go v0.13.1-0.20240415030117-612344aae744 h1:dBO6eK3gHSoRpl8OckW1zyOp35BOI48rYgoCznrPn40= +open-cluster-management.io/sdk-go v0.13.1-0.20240415030117-612344aae744/go.mod h1:w2OaxtCyegxeyFLU42UQ3oxUz01QdsBQkcHI17T/l48= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y= sigs.k8s.io/controller-runtime v0.17.2 h1:FwHwD1CTUemg0pW2otk7/U5/i5m2ymzvOXdbeGOUvw0= diff --git a/pkg/addonmanager/addontesting/helpers.go b/pkg/addonmanager/addontesting/helpers.go index 03ee3bcd..c33b1a01 100644 --- a/pkg/addonmanager/addontesting/helpers.go +++ b/pkg/addonmanager/addontesting/helpers.go @@ -190,8 +190,7 @@ func NewManifestWork(name, namespace string, objects ...*unstructured.Unstructur Name: name, Namespace: namespace, Annotations: map[string]string{ - common.CloudEventsDataTypeAnnotationKey: payload.ManifestBundleEventDataType.String(), - common.CloudEventsGenerationAnnotationKey: "0", + common.CloudEventsDataTypeAnnotationKey: payload.ManifestBundleEventDataType.String(), }, }, Spec: workapiv1.ManifestWorkSpec{ diff --git a/pkg/addonmanager/controllers/agentdeploy/controller.go b/pkg/addonmanager/controllers/agentdeploy/controller.go index dd3ffc5c..e5e7bed3 100644 --- a/pkg/addonmanager/controllers/agentdeploy/controller.go +++ b/pkg/addonmanager/controllers/agentdeploy/controller.go @@ -314,8 +314,7 @@ func (c *addonDeployController) sync(ctx context.Context, syncCtx factory.SyncCo // updateAddon updates finalizers and conditions of addon. // to avoid conflict updateAddon updates finalizers firstly if finalizers has change. func (c *addonDeployController) updateAddon(ctx context.Context, new, old *addonapiv1alpha1.ManagedClusterAddOn) error { - if !equality.Semantic.DeepEqual(new.GetFinalizers(), old.GetFinalizers()) || - !equality.Semantic.DeepEqual(new.GetAnnotations(), old.GetAnnotations()) { + if !equality.Semantic.DeepEqual(new.GetFinalizers(), old.GetFinalizers()) { _, err := c.addonClient.AddonV1alpha1().ManagedClusterAddOns(new.Namespace).Update(ctx, new, metav1.UpdateOptions{}) return err } @@ -331,6 +330,7 @@ func (c *addonDeployController) updateAddon(ctx context.Context, new, old *addon func (c *addonDeployController) applyWork(ctx context.Context, appliedType string, work *workapiv1.ManifestWork, addon *addonapiv1alpha1.ManagedClusterAddOn) (*workapiv1.ManifestWork, error) { + work, err := c.workApplier.Apply(ctx, work) if err != nil { meta.SetStatusCondition(&addon.Status.Conditions, metav1.Condition{ diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/doc.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/doc.go index 8fa99978..ff92f683 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/doc.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/doc.go @@ -4,7 +4,6 @@ */ /* - Package binding defines interfaces for protocol bindings. NOTE: Most applications that emit or consume events should use the ../client @@ -16,11 +15,11 @@ Receiver and a Sender belonging to different bindings. This is useful for intermediary applications that route or forward events, but not necessary for most "endpoint" applications that emit or consume events. -Protocol Bindings +# Protocol Bindings A protocol binding usually implements a Message, a Sender and Receiver, a StructuredWriter and a BinaryWriter (depending on the supported encodings of the protocol) and an Write[ProtocolMessage] method. -Read and write events +# Read and write events The core of this package is the binding.Message interface. Through binding.MessageReader It defines how to read a protocol specific message for an @@ -49,7 +48,7 @@ The binding.Write method tries to preserve the structured/binary encoding, in or Messages can be eventually wrapped to change their behaviours and binding their lifecycle, like the binding.FinishMessage. Every Message wrapper implements the MessageWrapper interface -Sender and Receiver +# Sender and Receiver A Receiver receives protocol specific messages and wraps them to into binding.Message implementations. @@ -60,9 +59,8 @@ Message and ExactlyOnceMessage provide methods to allow acknowledgments to propagate when a reliable messages is forwarded from a Receiver to a Sender. QoS 0 (unreliable), 1 (at-least-once) and 2 (exactly-once) are supported. -Transport +# Transport A binding implementation providing Sender and Receiver implementations can be used as a Transport through the BindingTransport adapter. - */ package binding diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/encoding.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/encoding.go index 5070b729..bb8f9142 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/encoding.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/encoding.go @@ -11,9 +11,9 @@ import "errors" type Encoding int const ( - // Binary encoding as specified in https://github.com/cloudevents/spec/blob/master/spec.md#message + // Binary encoding as specified in https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#message EncodingBinary Encoding = iota - // Structured encoding as specified in https://github.com/cloudevents/spec/blob/master/spec.md#message + // Structured encoding as specified in https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#message EncodingStructured // Message is an instance of EventMessage or it contains EventMessage nested (through MessageWrapper) EncodingEvent diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/event_message.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/event_message.go index f82c729c..83d613af 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/event_message.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/event_message.go @@ -22,7 +22,9 @@ const ( // EventMessage type-converts a event.Event object to implement Message. // This allows local event.Event objects to be sent directly via Sender.Send() -// s.Send(ctx, binding.EventMessage(e)) +// +// s.Send(ctx, binding.EventMessage(e)) +// // When an event is wrapped into a EventMessage, the original event could be // potentially mutated. If you need to use the Event again, after wrapping it into // an Event message, you should copy it before diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/message.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/message.go index e30e150c..2fb136c6 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/message.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/message.go @@ -66,7 +66,7 @@ type MessageMetadataReader interface { // Message is the interface to a binding-specific message containing an event. // -// Reliable Delivery +// # Reliable Delivery // // There are 3 reliable qualities of service for messages: // diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/spec/doc.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/spec/doc.go index 44c0b314..da5bc9f8 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/spec/doc.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/spec/doc.go @@ -8,6 +8,5 @@ Package spec provides spec-version metadata. For use by code that maps events using (prefixed) attribute name strings. Supports handling multiple spec versions uniformly. - */ package spec diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/client.go b/vendor/github.com/cloudevents/sdk-go/v2/client/client.go index ea8fbfbb..452304ff 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/client.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/client.go @@ -98,6 +98,7 @@ type ceClient struct { eventDefaulterFns []EventDefaulter pollGoroutines int blockingCallback bool + ackMalformedEvent bool } func (c *ceClient) applyOptions(opts ...Option) error { @@ -202,7 +203,13 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { return fmt.Errorf("client already has a receiver") } - invoker, err := newReceiveInvoker(fn, c.observabilityService, c.inboundContextDecorators, c.eventDefaulterFns...) + invoker, err := newReceiveInvoker( + fn, + c.observabilityService, + c.inboundContextDecorators, + c.eventDefaulterFns, + c.ackMalformedEvent, + ) if err != nil { return err } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/http_receiver.go b/vendor/github.com/cloudevents/sdk-go/v2/client/http_receiver.go index 94a4b4e6..672581b5 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/http_receiver.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/http_receiver.go @@ -14,7 +14,7 @@ import ( ) func NewHTTPReceiveHandler(ctx context.Context, p *thttp.Protocol, fn interface{}) (*EventReceiver, error) { - invoker, err := newReceiveInvoker(fn, noopObservabilityService{}, nil) //TODO(slinkydeveloper) maybe not nil? + invoker, err := newReceiveInvoker(fn, noopObservabilityService{}, nil, nil, false) //TODO(slinkydeveloper) maybe not nil? if err != nil { return nil, err } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go b/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go index 403fb0f5..a3080b00 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go @@ -23,11 +23,18 @@ type Invoker interface { var _ Invoker = (*receiveInvoker)(nil) -func newReceiveInvoker(fn interface{}, observabilityService ObservabilityService, inboundContextDecorators []func(context.Context, binding.Message) context.Context, fns ...EventDefaulter) (Invoker, error) { +func newReceiveInvoker( + fn interface{}, + observabilityService ObservabilityService, + inboundContextDecorators []func(context.Context, binding.Message) context.Context, + fns []EventDefaulter, + ackMalformedEvent bool, +) (Invoker, error) { r := &receiveInvoker{ eventDefaulterFns: fns, observabilityService: observabilityService, inboundContextDecorators: inboundContextDecorators, + ackMalformedEvent: ackMalformedEvent, } if fn, err := receiver(fn); err != nil { @@ -44,6 +51,7 @@ type receiveInvoker struct { observabilityService ObservabilityService eventDefaulterFns []EventDefaulter inboundContextDecorators []func(context.Context, binding.Message) context.Context + ackMalformedEvent bool } func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn protocol.ResponseFn) (err error) { @@ -58,13 +66,13 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p switch { case eventErr != nil && r.fn.hasEventIn: r.observabilityService.RecordReceivedMalformedEvent(ctx, eventErr) - return respFn(ctx, nil, protocol.NewReceipt(false, "failed to convert Message to Event: %w", eventErr)) + return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "failed to convert Message to Event: %w", eventErr)) case r.fn != nil: // Check if event is valid before invoking the receiver function if e != nil { if validationErr := e.Validate(); validationErr != nil { r.observabilityService.RecordReceivedMalformedEvent(ctx, validationErr) - return respFn(ctx, nil, protocol.NewReceipt(false, "validation error in incoming event: %w", validationErr)) + return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "validation error in incoming event: %w", validationErr)) } } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/options.go b/vendor/github.com/cloudevents/sdk-go/v2/client/options.go index 93847816..44394be3 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/options.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/options.go @@ -126,3 +126,16 @@ func WithBlockingCallback() Option { return nil } } + +// WithAckMalformedevents causes malformed events received within StartReceiver to be acknowledged +// rather than being permanently not-acknowledged. This can be useful when a protocol does not +// provide a responder implementation and would otherwise cause the receiver to be partially or +// fully stuck. +func WithAckMalformedEvent() Option { + return func(i interface{}) error { + if c, ok := i.(*ceClient); ok { + c.ackMalformedEvent = true + } + return nil + } +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go b/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go index b1ab532d..2cc0e649 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go @@ -57,7 +57,6 @@ var ( // * func(event.Event) (*event.Event, protocol.Result) // * func(context.Context, event.Event) *event.Event // * func(context.Context, event.Event) (*event.Event, protocol.Result) -// func receiver(fn interface{}) (*receiverFn, error) { fnType := reflect.TypeOf(fn) if fnType.Kind() != reflect.Func { diff --git a/vendor/github.com/cloudevents/sdk-go/v2/event/event.go b/vendor/github.com/cloudevents/sdk-go/v2/event/event.go index 94b5aa0a..52495f9a 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/event/event.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/event/event.go @@ -55,13 +55,12 @@ func New(version ...string) Event { // Use functions in the types package to convert extension values. // For example replace this: // -// var i int -// err := e.ExtensionAs("foo", &i) +// var i int +// err := e.ExtensionAs("foo", &i) // // With this: // -// i, err := types.ToInteger(e.Extensions["foo"]) -// +// i, err := types.ToInteger(e.Extensions["foo"]) func (e Event) ExtensionAs(name string, obj interface{}) error { return e.Context.ExtensionAs(name, obj) } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go b/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go index c511c81c..3f050554 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go @@ -179,7 +179,8 @@ func (ec EventContextV03) AsV1() *EventContextV1 { } // Validate returns errors based on requirements from the CloudEvents spec. -// For more details, see https://github.com/cloudevents/spec/blob/master/spec.md +// For more details, see +// https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md // As of Feb 26, 2019, commit 17c32ea26baf7714ad027d9917d03d2fff79fc7e // + https://github.com/cloudevents/spec/pull/387 -> datacontentencoding // + https://github.com/cloudevents/spec/pull/406 -> subject diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/doc.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/doc.go index f826a184..3c771fc5 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/doc.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/doc.go @@ -21,6 +21,5 @@ Available protocols: * Nats * Nats Streaming (stan) * Google PubSub - */ package protocol diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/context.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/context.go index 0eec396a..e973738c 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/context.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/context.go @@ -24,7 +24,7 @@ type RequestData struct { } // WithRequestDataAtContext uses the http.Request to add RequestData -// information to the Context. +// information to the Context. func WithRequestDataAtContext(ctx context.Context, r *nethttp.Request) context.Context { if r == nil { return ctx diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go index 5e400905..6582af3e 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go @@ -158,7 +158,6 @@ func WithMethod(method string) Option { } } -// // Middleware is a function that takes an existing http.Handler and wraps it in middleware, // returning the wrapped http.Handler. type Middleware func(next nethttp.Handler) nethttp.Handler diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go index dba6fd7b..7ee3b8fe 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go @@ -102,7 +102,10 @@ func New(opts ...Option) (*Protocol, error) { } if p.Client == nil { - p.Client = http.DefaultClient + // This is how http.DefaultClient is initialized. We do not just use + // that because when WithRoundTripper is used, it will change the client's + // transport, which would cause that transport to be used process-wide. + p.Client = &http.Client{} } if p.roundTripper != nil { diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_retry.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_retry.go index 71e7346f..21fc7e9b 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_retry.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_retry.go @@ -10,9 +10,7 @@ import ( "context" "errors" "io" - "io/ioutil" "net/http" - "net/url" "time" "go.uber.org/zap" @@ -52,7 +50,7 @@ func (p *Protocol) doOnce(req *http.Request) (binding.Message, protocol.Result) } func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParams, req *http.Request) (binding.Message, error) { - then := time.Now() + start := time.Now() retry := 0 results := make([]protocol.Result, 0) @@ -67,7 +65,7 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam cecontext.LoggerFrom(ctx).Warnw("could not close request body", zap.Error(err)) } }() - body, err = ioutil.ReadAll(req.Body) + body, err = io.ReadAll(req.Body) if err != nil { panic(err) } @@ -79,51 +77,34 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam // Fast track common case. if protocol.IsACK(result) { - return msg, NewRetriesResult(result, retry, then, results) + return msg, NewRetriesResult(result, retry, start, results) } - // Try again? - // - // Make sure the error was something we should retry. - - { - var uErr *url.Error - if errors.As(result, &uErr) { - goto DoBackoff + var httpResult *Result + if errors.As(result, &httpResult) { + sc := httpResult.StatusCode + if !p.isRetriableFunc(sc) { + cecontext.LoggerFrom(ctx).Debugw("status code not retryable, will not try again", + zap.Error(httpResult), + zap.Int("statusCode", sc)) + return msg, NewRetriesResult(result, retry, start, results) } } - { - var httpResult *Result - if errors.As(result, &httpResult) { - sc := httpResult.StatusCode - if p.isRetriableFunc(sc) { - // retry! - goto DoBackoff - } else { - // Permanent error - cecontext.LoggerFrom(ctx).Debugw("status code not retryable, will not try again", - zap.Error(httpResult), - zap.Int("statusCode", sc)) - return msg, NewRetriesResult(result, retry, then, results) - } - } - } - - DoBackoff: - resetBody(req, body) - - // Wait for the correct amount of backoff time. - // total tries = retry + 1 - if err := params.Backoff(ctx, retry+1); err != nil { + if err = params.Backoff(ctx, retry+1); err != nil { // do not try again. cecontext.LoggerFrom(ctx).Debugw("backoff error, will not try again", zap.Error(err)) - return msg, NewRetriesResult(result, retry, then, results) + return msg, NewRetriesResult(result, retry, start, results) } retry++ + resetBody(req, body) results = append(results, result) + if msg != nil { + // avoid leak, forget message, ignore error + _ = msg.Finish(nil) + } } } @@ -134,12 +115,12 @@ func resetBody(req *http.Request, body []byte) { return } - req.Body = ioutil.NopCloser(bytes.NewReader(body)) + req.Body = io.NopCloser(bytes.NewReader(body)) // do not modify existing GetBody function if req.GetBody == nil { req.GetBody = func() (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewReader(body)), nil + return io.NopCloser(bytes.NewReader(body)), nil } } } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/write_request.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/write_request.go index 43ad3618..f22259a3 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/write_request.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/write_request.go @@ -9,7 +9,6 @@ import ( "bytes" "context" "io" - "io/ioutil" "net/http" "strings" @@ -58,7 +57,7 @@ func (b *httpRequestWriter) SetData(data io.Reader) error { func (b *httpRequestWriter) setBody(body io.Reader) error { rc, ok := body.(io.ReadCloser) if !ok && body != nil { - rc = ioutil.NopCloser(body) + rc = io.NopCloser(body) } b.Body = rc if body != nil { @@ -68,21 +67,21 @@ func (b *httpRequestWriter) setBody(body io.Reader) error { buf := v.Bytes() b.GetBody = func() (io.ReadCloser, error) { r := bytes.NewReader(buf) - return ioutil.NopCloser(r), nil + return io.NopCloser(r), nil } case *bytes.Reader: b.ContentLength = int64(v.Len()) snapshot := *v b.GetBody = func() (io.ReadCloser, error) { r := snapshot - return ioutil.NopCloser(&r), nil + return io.NopCloser(&r), nil } case *strings.Reader: b.ContentLength = int64(v.Len()) snapshot := *v b.GetBody = func() (io.ReadCloser, error) { r := snapshot - return ioutil.NopCloser(&r), nil + return io.NopCloser(&r), nil } default: // This is where we'd set it to -1 (at least @@ -137,5 +136,7 @@ func (b *httpRequestWriter) SetExtension(name string, value interface{}) error { return nil } -var _ binding.StructuredWriter = (*httpRequestWriter)(nil) // Test it conforms to the interface -var _ binding.BinaryWriter = (*httpRequestWriter)(nil) // Test it conforms to the interface +var ( + _ binding.StructuredWriter = (*httpRequestWriter)(nil) // Test it conforms to the interface + _ binding.BinaryWriter = (*httpRequestWriter)(nil) // Test it conforms to the interface +) diff --git a/vendor/github.com/cloudevents/sdk-go/v2/types/doc.go b/vendor/github.com/cloudevents/sdk-go/v2/types/doc.go index cf7a94f3..3a0a595a 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/types/doc.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/types/doc.go @@ -11,25 +11,25 @@ type has a corresponding native Go type and a canonical string encoding. The native Go types used to represent the CloudEvents types are: bool, int32, string, []byte, *url.URL, time.Time - +----------------+----------------+-----------------------------------+ - |CloudEvents Type|Native Type |Convertible From | - +================+================+===================================+ - |Bool |bool |bool | - +----------------+----------------+-----------------------------------+ - |Integer |int32 |Any numeric type with value in | - | | |range of int32 | - +----------------+----------------+-----------------------------------+ - |String |string |string | - +----------------+----------------+-----------------------------------+ - |Binary |[]byte |[]byte | - +----------------+----------------+-----------------------------------+ - |URI-Reference |*url.URL |url.URL, types.URIRef, types.URI | - +----------------+----------------+-----------------------------------+ - |URI |*url.URL |url.URL, types.URIRef, types.URI | - | | |Must be an absolute URI. | - +----------------+----------------+-----------------------------------+ - |Timestamp |time.Time |time.Time, types.Timestamp | - +----------------+----------------+-----------------------------------+ + +----------------+----------------+-----------------------------------+ + |CloudEvents Type|Native Type |Convertible From | + +================+================+===================================+ + |Bool |bool |bool | + +----------------+----------------+-----------------------------------+ + |Integer |int32 |Any numeric type with value in | + | | |range of int32 | + +----------------+----------------+-----------------------------------+ + |String |string |string | + +----------------+----------------+-----------------------------------+ + |Binary |[]byte |[]byte | + +----------------+----------------+-----------------------------------+ + |URI-Reference |*url.URL |url.URL, types.URIRef, types.URI | + +----------------+----------------+-----------------------------------+ + |URI |*url.URL |url.URL, types.URIRef, types.URI | + | | |Must be an absolute URI. | + +----------------+----------------+-----------------------------------+ + |Timestamp |time.Time |time.Time, types.Timestamp | + +----------------+----------------+-----------------------------------+ Extension attributes may be stored as a native type or a canonical string. The To functions will convert to the desired from any convertible type @@ -41,6 +41,5 @@ canonical strings. Note are no Parse or Format functions for URL or string. For URL use the standard url.Parse() and url.URL.String(). The canonical string format of a string is the string itself. - */ package types diff --git a/vendor/github.com/cloudevents/sdk-go/v2/types/value.go b/vendor/github.com/cloudevents/sdk-go/v2/types/value.go index f643d0aa..14004d3e 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/types/value.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/types/value.go @@ -86,7 +86,7 @@ func Format(v interface{}) (string, error) { } // Validate v is a valid CloudEvents attribute value, convert it to one of: -// bool, int32, string, []byte, types.URI, types.URIRef, types.Timestamp +// bool, int32, string, []byte, types.URI, types.URIRef, types.Timestamp func Validate(v interface{}) (interface{}, error) { switch v := v.(type) { case bool, int32, string, []byte: @@ -151,7 +151,9 @@ func Validate(v interface{}) (interface{}, error) { } // Clone v clones a CloudEvents attribute value, which is one of the valid types: -// bool, int32, string, []byte, types.URI, types.URIRef, types.Timestamp +// +// bool, int32, string, []byte, types.URI, types.URIRef, types.Timestamp +// // Returns the same type // Panics if the type is not valid func Clone(v interface{}) interface{} { diff --git a/vendor/modules.txt b/vendor/modules.txt index 930d7c6a..63347b16 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -38,8 +38,8 @@ github.com/cespare/xxhash/v2 # github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995 ## explicit; go 1.18 github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 -# github.com/cloudevents/sdk-go/v2 v2.14.0 -## explicit; go 1.17 +# github.com/cloudevents/sdk-go/v2 v2.15.2 +## explicit; go 1.18 github.com/cloudevents/sdk-go/v2 github.com/cloudevents/sdk-go/v2/binding github.com/cloudevents/sdk-go/v2/binding/format @@ -1391,7 +1391,7 @@ open-cluster-management.io/api/utils/work/v1/workapplier open-cluster-management.io/api/utils/work/v1/workvalidator open-cluster-management.io/api/work/v1 open-cluster-management.io/api/work/v1alpha1 -# open-cluster-management.io/sdk-go v0.13.1-0.20240321032811-7dbdd1b5c63d => github.com/morvencao/ocm-sdk-go v0.0.0-20240411073141-7692feecb557 +# open-cluster-management.io/sdk-go v0.13.1-0.20240415030117-612344aae744 ## explicit; go 1.21 open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier open-cluster-management.io/sdk-go/pkg/apis/work/v1/builder @@ -1453,4 +1453,3 @@ sigs.k8s.io/structured-merge-diff/v4/value ## explicit; go 1.12 sigs.k8s.io/yaml sigs.k8s.io/yaml/goyaml.v2 -# open-cluster-management.io/sdk-go => github.com/morvencao/ocm-sdk-go v0.0.0-20240411073141-7692feecb557 diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go index f4651a39..417b3166 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go @@ -288,7 +288,17 @@ func (c *CloudEventAgentClient[T]) specAction(source string, obj T) (evt types.R return types.Modified, nil } - if obj.GetResourceVersion() <= lastObj.GetResourceVersion() { + resourceVersion, err := strconv.ParseInt(obj.GetResourceVersion(), 10, 64) + if err != nil { + return evt, err + } + + lastResourceVersion, err := strconv.ParseInt(lastObj.GetResourceVersion(), 10, 64) + if err != nil { + return evt, err + } + + if resourceVersion <= lastResourceVersion { return evt, nil } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/baseclient.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/baseclient.go index c9be382d..0ba5c718 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/baseclient.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/baseclient.go @@ -27,6 +27,7 @@ type receiveFn func(ctx context.Context, evt cloudevents.Event) type baseClient struct { sync.RWMutex cloudEventsOptions options.CloudEventsOptions + cloudEventsProtocol options.CloudEventsProtocol cloudEventsClient cloudevents.Client cloudEventsRateLimiter flowcontrol.RateLimiter receiverChan chan int @@ -35,7 +36,7 @@ type baseClient struct { func (c *baseClient) connect(ctx context.Context) error { var err error - c.cloudEventsClient, err = c.cloudEventsOptions.Client(ctx) + c.cloudEventsClient, err = c.newCloudEventsClient(ctx) if err != nil { return err } @@ -57,7 +58,8 @@ func (c *baseClient) connect(ctx context.Context) error { for { if cloudEventsClient == nil { klog.V(4).Infof("reconnecting the cloudevents client") - cloudEventsClient, err = c.cloudEventsOptions.Client(ctx) + + c.cloudEventsClient, err = c.newCloudEventsClient(ctx) // TODO enhance the cloudevents SKD to avoid wrapping the error type to distinguish the net connection // errors if err != nil { @@ -93,7 +95,12 @@ func (c *baseClient) connect(ctx context.Context) error { // client to nil and retry c.sendReceiverSignal(stopReceiverSignal) + err = c.cloudEventsProtocol.Close(ctx) + if err != nil { + runtime.HandleError(fmt.Errorf("failed to close the cloudevents protocol, %v", err)) + } cloudEventsClient = nil + c.resetClient(cloudEventsClient) <-wait.RealTimer(delayFn()).C() @@ -221,3 +228,16 @@ func (c *baseClient) sendReconnectedSignal() { defer c.RUnlock() c.reconnectedChan <- struct{}{} } + +func (c *baseClient) newCloudEventsClient(ctx context.Context) (cloudevents.Client, error) { + var err error + c.cloudEventsProtocol, err = c.cloudEventsOptions.Protocol(ctx) + if err != nil { + return nil, err + } + c.cloudEventsClient, err = cloudevents.NewClient(c.cloudEventsProtocol) + if err != nil { + return nil, err + } + return c.cloudEventsClient, nil +} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/agentoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/agentoptions.go index 293f4b17..7e94551b 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/agentoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/agentoptions.go @@ -33,8 +33,8 @@ func (o *grpcAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E return ctx, nil } -func (o *grpcAgentOptions) Client(ctx context.Context) (cloudevents.Client, error) { - receiver, err := o.GetCloudEventsClient( +func (o *grpcAgentOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) { + receiver, err := o.GetCloudEventsProtocol( ctx, func(err error) { o.errorChan <- err diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go index 403673c0..5f0f8ecd 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go @@ -14,8 +14,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "gopkg.in/yaml.v2" - cloudevents "github.com/cloudevents/sdk-go/v2" - + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol" ) @@ -119,7 +118,7 @@ func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) { return conn, nil } -func (o *GRPCOptions) GetCloudEventsClient(ctx context.Context, errorHandler func(error), clientOpts ...protocol.Option) (cloudevents.Client, error) { +func (o *GRPCOptions) GetCloudEventsProtocol(ctx context.Context, errorHandler func(error), clientOpts ...protocol.Option) (options.CloudEventsProtocol, error) { conn, err := o.GetGRPCClientConn() if err != nil { return nil, err @@ -146,10 +145,5 @@ func (o *GRPCOptions) GetCloudEventsClient(ctx context.Context, errorHandler fun opts := []protocol.Option{} opts = append(opts, clientOpts...) - p, err := protocol.NewProtocol(conn, opts...) - if err != nil { - return nil, err - } - - return cloudevents.NewClient(p) + return protocol.NewProtocol(conn, opts...) } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/sourceoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/sourceoptions.go index e9a65ff0..1d71cfe7 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/sourceoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/sourceoptions.go @@ -31,8 +31,8 @@ func (o *gRPCSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents. return ctx, nil } -func (o *gRPCSourceOptions) Client(ctx context.Context) (cloudevents.Client, error) { - receiver, err := o.GetCloudEventsClient( +func (o *gRPCSourceOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) { + receiver, err := o.GetCloudEventsProtocol( ctx, func(err error) { o.errorChan <- err diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go index 526c7feb..9d1ef702 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go @@ -82,7 +82,7 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E return cloudeventscontext.WithTopic(ctx, eventsTopic), nil } -func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, error) { +func (o *mqttAgentOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) { subscribe := &paho.Subscribe{ Subscriptions: map[string]paho.SubscribeOptions{ // TODO support multiple sources, currently the client require the source events topic has a sourceID, in @@ -97,7 +97,7 @@ func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, erro subscribe.Subscriptions[o.Topics.SourceBroadcast] = paho.SubscribeOptions{QoS: byte(o.SubQoS)} } - receiver, err := o.GetCloudEventsClient( + return o.GetCloudEventsProtocol( ctx, fmt.Sprintf("%s-client", o.agentID), func(err error) { @@ -106,10 +106,6 @@ func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, erro cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}), cloudeventsmqtt.WithSubscribe(subscribe), ) - if err != nil { - return nil, err - } - return receiver, nil } func (o *mqttAgentOptions) ErrorChan() <-chan error { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go index 56b13be9..cfce3004 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go @@ -12,11 +12,11 @@ import ( "time" cloudeventsmqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2" - cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/eclipse/paho.golang/packets" "github.com/eclipse/paho.golang/paho" "gopkg.in/yaml.v2" "k8s.io/apimachinery/pkg/util/errors" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" ) @@ -198,12 +198,12 @@ func (o *MQTTOptions) GetMQTTConnectOption(clientID string) *paho.Connect { return connect } -func (o *MQTTOptions) GetCloudEventsClient( +func (o *MQTTOptions) GetCloudEventsProtocol( ctx context.Context, clientID string, errorHandler func(error), clientOpts ...cloudeventsmqtt.Option, -) (cloudevents.Client, error) { +) (options.CloudEventsProtocol, error) { netConn, err := o.GetNetConn() if err != nil { return nil, err @@ -217,12 +217,7 @@ func (o *MQTTOptions) GetCloudEventsClient( opts := []cloudeventsmqtt.Option{cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID))} opts = append(opts, clientOpts...) - protocol, err := cloudeventsmqtt.New(ctx, config, opts...) - if err != nil { - return nil, err - } - - return cloudevents.NewClient(protocol) + return cloudeventsmqtt.New(ctx, config, opts...) } func validateTopics(topics *types.Topics) error { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go index 1f51e378..9c245809 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go @@ -70,7 +70,7 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents. return cloudeventscontext.WithTopic(ctx, eventsTopic), nil } -func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, error) { +func (o *mqttSourceOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) { topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents) if err != nil { return nil, err @@ -93,7 +93,7 @@ func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, err subscribe.Subscriptions[o.Topics.AgentBroadcast] = paho.SubscribeOptions{QoS: byte(o.SubQoS)} } - receiver, err := o.GetCloudEventsClient( + receiver, err := o.GetCloudEventsProtocol( ctx, o.clientID, func(err error) { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/options.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/options.go index 57fbf14e..9bd231c8 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/options.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/options.go @@ -4,6 +4,7 @@ import ( "context" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/protocol" ) // CloudEventsOptions provides cloudevents clients to send/receive cloudevents based on different event protocol. @@ -16,14 +17,22 @@ type CloudEventsOptions interface { // the MQTT topic, for Kafka, the context should contain the message key, etc. WithContext(ctx context.Context, evtContext cloudevents.EventContext) (context.Context, error) - // Client returns a cloudevents client for sending and receiving cloudevents - Client(ctx context.Context) (cloudevents.Client, error) + // Protocol returns a specific protocol to initialize the cloudevents client + Protocol(ctx context.Context) (CloudEventsProtocol, error) // ErrorChan returns a chan which will receive the cloudevents connection error. The source/agent client will try to // reconnect the when this error occurs. ErrorChan() <-chan error } +// CloudEventsProtocol is a set of interfaces for a specific binding need to implemented +// Reference: https://cloudevents.github.io/sdk-go/protocol_implementations.html#protocol-interfaces +type CloudEventsProtocol interface { + protocol.Sender + protocol.Receiver + protocol.Closer +} + // EventRateLimit for limiting the event sending rate. type EventRateLimit struct { // QPS indicates the maximum QPS to send the event. diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler/resourcehandler.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler/resourcehandler.go index 9f8d53b7..983c7634 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler/resourcehandler.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler/resourcehandler.go @@ -2,11 +2,9 @@ package handler import ( "fmt" - "strconv" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/watch" - "k8s.io/klog/v2" workv1lister "open-cluster-management.io/api/client/work/listers/work/v1" workv1 "open-cluster-management.io/api/work/v1" @@ -29,25 +27,6 @@ func NewManifestWorkAgentHandler(lister workv1lister.ManifestWorkNamespaceLister return err } - resourceVersion, err := strconv.ParseInt(work.ResourceVersion, 10, 64) - if err != nil { - return fmt.Errorf("failed to parse the resourceVersion of the manifestwork %s, %v", work.Name, err) - } - - lastResourceVersion, err := strconv.ParseInt(lastWork.ResourceVersion, 10, 64) - if err != nil { - return fmt.Errorf("failed to parse the resourceVersion of the manifestwork %s, %v", lastWork.Name, err) - } - - // if both the current and the last object have the resource version "0", then object - // is considered as changed, the message broker guarantees the order of the messages - if resourceVersion != 0 || lastResourceVersion != 0 { - if resourceVersion <= lastResourceVersion { - klog.Infof("The work %s resource version is less than or equal to cached, ignore", work.Name) - return nil - } - } - updatedWork := work.DeepCopy() // restore the fields that are maintained by local agent diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go index 6b3b97fc..c9d6083e 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go @@ -209,7 +209,7 @@ func getWorkGeneration(work *workv1.ManifestWork) (int64, error) { generationInt, err := strconv.Atoi(generation) if err != nil { - return -1, fmt.Errorf("failed to convert generation %s to int: %v", generation, err) + return 0, fmt.Errorf("failed to convert generation %s to int: %v", generation, err) } return int64(generationInt), nil