Skip to content

Commit

Permalink
WIP use ratelimiter config in webapi plugins
Browse files Browse the repository at this point in the history
Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 committed Apr 5, 2024
1 parent 44c701e commit eb10118
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
2 changes: 1 addition & 1 deletion flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8
golang.org/x/net v0.22.0
golang.org/x/oauth2 v0.16.0
golang.org/x/time v0.5.0
google.golang.org/api v0.155.0
google.golang.org/grpc v1.62.1
google.golang.org/protobuf v1.33.0
Expand Down Expand Up @@ -128,7 +129,6 @@ require (
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package webapi

import (
"context"

"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"
"time"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
Expand Down Expand Up @@ -161,6 +162,7 @@ func ToPluginPhase(s core.Phase) (Phase, error) {
}

func NewResourceCache(ctx context.Context, name string, client Client, cfg webapi.CachingConfig,
rateCfg webapi.RateLimiterConfig,
scope promutils.Scope) (ResourceCache, error) {

q := ResourceCache{
Expand All @@ -169,7 +171,10 @@ func NewResourceCache(ctx context.Context, name string, client Client, cfg webap
}

autoRefreshCache, err := cache.NewAutoRefreshCache(name, q.SyncResource,
workqueue.DefaultControllerRateLimiter(), cfg.ResyncInterval.Duration, cfg.Workers, cfg.Size,
workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(rateCfg.QPS), rateCfg.Burst)},
), cfg.ResyncInterval.Duration, cfg.Workers, cfg.Size,
scope.NewSubScope("cache"))

if err != nil {
Expand Down

0 comments on commit eb10118

Please sign in to comment.