diff --git a/flyteplugins/go.mod b/flyteplugins/go.mod index 6e5ae9670a7..5464f06c89a 100644 --- a/flyteplugins/go.mod +++ b/flyteplugins/go.mod @@ -28,6 +28,7 @@ require ( golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 golang.org/x/net v0.22.0 golang.org/x/oauth2 v0.16.0 + golang.org/x/time v0.5.0 google.golang.org/api v0.155.0 google.golang.org/grpc v1.62.1 google.golang.org/protobuf v1.33.0 @@ -128,7 +129,6 @@ require ( golang.org/x/sys v0.18.0 // indirect golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/time v0.5.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 // indirect diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go index a305323dca8..b13070534e3 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go @@ -2,7 +2,9 @@ package webapi import ( "context" + "time" + "golang.org/x/time/rate" "k8s.io/client-go/util/workqueue" "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors" @@ -161,6 +163,7 @@ func ToPluginPhase(s core.Phase) (Phase, error) { } func NewResourceCache(ctx context.Context, name string, client Client, cfg webapi.CachingConfig, + rateCfg webapi.RateLimiterConfig, scope promutils.Scope) (ResourceCache, error) { q := ResourceCache{ @@ -169,7 +172,10 @@ func NewResourceCache(ctx context.Context, name string, client Client, cfg webap } autoRefreshCache, err := cache.NewAutoRefreshCache(name, q.SyncResource, - workqueue.DefaultControllerRateLimiter(), cfg.ResyncInterval.Duration, cfg.Workers, cfg.Size, + workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(rateCfg.QPS), rateCfg.Burst)}, + ), cfg.ResyncInterval.Duration, cfg.Workers, cfg.Size, scope.NewSubScope("cache")) if err != nil { diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache_test.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache_test.go index 53e80cd391b..d228e7954e6 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache_test.go @@ -21,13 +21,14 @@ func TestNewResourceCache(t *testing.T) { t.Run("Simple", func(t *testing.T) { c, err := NewResourceCache(context.Background(), "Cache1", &mocks.Client{}, webapi.CachingConfig{ Size: 10, - }, promutils.NewTestScope()) + }, webapi.RateLimiterConfig{QPS: 1, Burst: 1}, promutils.NewTestScope()) assert.NoError(t, err) assert.NotNil(t, c) }) t.Run("Error", func(t *testing.T) { _, err := NewResourceCache(context.Background(), "Cache1", &mocks.Client{}, webapi.CachingConfig{}, + webapi.RateLimiterConfig{}, promutils.NewTestScope()) assert.Error(t, err) }) diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go index a23f985fc6f..9c985218978 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go @@ -191,7 +191,7 @@ func createRemotePlugin(pluginEntry webapi.PluginEntry, c clock.Clock) core.Plug } resourceCache, err := NewResourceCache(ctx, pluginEntry.ID, p, p.GetConfig().Caching, - iCtx.MetricsScope().NewSubScope("cache")) + p.GetConfig().ReadRateLimiter, iCtx.MetricsScope().NewSubScope("cache")) if err != nil { return nil, err