diff --git a/flyteplugins/go.mod b/flyteplugins/go.mod index 6e5ae9670a..5464f06c89 100644 --- a/flyteplugins/go.mod +++ b/flyteplugins/go.mod @@ -28,6 +28,7 @@ require ( golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 golang.org/x/net v0.22.0 golang.org/x/oauth2 v0.16.0 + golang.org/x/time v0.5.0 google.golang.org/api v0.155.0 google.golang.org/grpc v1.62.1 google.golang.org/protobuf v1.33.0 @@ -128,7 +129,6 @@ require ( golang.org/x/sys v0.18.0 // indirect golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/time v0.5.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 // indirect diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go index a305323dca..8b0b49e993 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go @@ -2,8 +2,9 @@ package webapi import ( "context" - + "golang.org/x/time/rate" "k8s.io/client-go/util/workqueue" + "time" "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" @@ -161,6 +162,7 @@ func ToPluginPhase(s core.Phase) (Phase, error) { } func NewResourceCache(ctx context.Context, name string, client Client, cfg webapi.CachingConfig, + rateCfg webapi.RateLimiterConfig, scope promutils.Scope) (ResourceCache, error) { q := ResourceCache{ @@ -169,7 +171,10 @@ func NewResourceCache(ctx context.Context, name string, client Client, cfg webap } autoRefreshCache, err := cache.NewAutoRefreshCache(name, q.SyncResource, - workqueue.DefaultControllerRateLimiter(), cfg.ResyncInterval.Duration, cfg.Workers, cfg.Size, + workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(rateCfg.QPS), rateCfg.Burst)}, + ), cfg.ResyncInterval.Duration, cfg.Workers, cfg.Size, scope.NewSubScope("cache")) if err != nil {