Skip to content

Commit

Permalink
restore
Browse files Browse the repository at this point in the history
  • Loading branch information
britaniar committed Oct 30, 2024
1 parent 8657b1a commit c5e7835
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 21 deletions.
8 changes: 4 additions & 4 deletions cmd/hubagent/options/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (o *RateLimitOptions) AddFlags(fs *flag.FlagSet) {
}

// DefaultControllerRateLimiter provide a default rate limiter for controller, and users can tune it by corresponding flags.
func DefaultControllerRateLimiter(opts RateLimitOptions) workqueue.TypedRateLimiter[any] {
func DefaultControllerRateLimiter(opts RateLimitOptions) workqueue.RateLimiter {
// set defaults
if opts.RateLimiterBaseDelay <= 0 {
opts.RateLimiterBaseDelay = 5 * time.Millisecond
Expand All @@ -51,8 +51,8 @@ func DefaultControllerRateLimiter(opts RateLimitOptions) workqueue.TypedRateLimi
if opts.RateLimiterBucketSize <= 0 {
opts.RateLimiterBucketSize = 100
}
return workqueue.NewTypedMaxOfRateLimiter[any](
workqueue.NewTypedItemExponentialFailureRateLimiter[any](opts.RateLimiterBaseDelay, opts.RateLimiterMaxDelay),
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(opts.RateLimiterQPS), opts.RateLimiterBucketSize)},
return workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(opts.RateLimiterBaseDelay, opts.RateLimiterMaxDelay),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(opts.RateLimiterQPS), opts.RateLimiterBucketSize)},
)
}
14 changes: 7 additions & 7 deletions pkg/controllers/rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,25 +623,25 @@ func (r *Reconciler) SetupWithManager(mgr runtime.Manager) error {
return runtime.NewControllerManagedBy(mgr).Named("rollout-controller").
WithOptions(ctrl.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}). // set the max number of concurrent reconciles
Watches(&fleetv1beta1.ClusterResourceSnapshot{}, handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
klog.V(2).InfoS("Handling a resourceSnapshot create event", "resourceSnapshot", klog.KObj(e.Object))
handleResourceSnapshot(e.Object, q)
},
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
klog.V(2).InfoS("Handling a resourceSnapshot generic event", "resourceSnapshot", klog.KObj(e.Object))
handleResourceSnapshot(e.Object, q)
},
}).
Watches(&fleetv1beta1.ClusterResourceBinding{}, handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
klog.V(2).InfoS("Handling a resourceBinding create event", "resourceBinding", klog.KObj(e.Object))
handleResourceBinding(e.Object, q)
},
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
klog.V(2).InfoS("Handling a resourceBinding update event", "resourceBinding", klog.KObj(e.ObjectNew))
handleResourceBinding(e.ObjectNew, q)
},
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
klog.V(2).InfoS("Handling a resourceBinding generic event", "resourceBinding", klog.KObj(e.Object))
handleResourceBinding(e.Object, q)
},
Expand All @@ -650,7 +650,7 @@ func (r *Reconciler) SetupWithManager(mgr runtime.Manager) error {
}

// handleResourceSnapshot parse the resourceBinding label and annotation and enqueue the CRP name associated with the resource resourceBinding
func handleResourceSnapshot(snapshot client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
func handleResourceSnapshot(snapshot client.Object, q workqueue.RateLimitingInterface) {
snapshotKRef := klog.KObj(snapshot)
// check if it is the first resource resourceBinding which is supposed to have NumberOfResourceSnapshotsAnnotation
_, exist := snapshot.GetAnnotations()[fleetv1beta1.ResourceGroupHashAnnotation]
Expand Down Expand Up @@ -687,7 +687,7 @@ func handleResourceSnapshot(snapshot client.Object, q workqueue.TypedRateLimitin
}

// handleResourceBinding parse the binding label and enqueue the CRP name associated with the resource binding
func handleResourceBinding(binding client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
func handleResourceBinding(binding client.Object, q workqueue.RateLimitingInterface) {
bindingRef := klog.KObj(binding)
// get the CRP name from the label
crp := binding.GetLabels()[fleetv1beta1.CRPTrackingLabel]
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/workgenerator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ func (r *Reconciler) SetupWithManager(mgr controllerruntime.Manager) error {
Watches(&fleetv1beta1.Work{}, &handler.Funcs{
// we care about work delete event as we want to know when a work is deleted so that we can
// delete the corresponding resource binding fast.
DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, queue workqueue.RateLimitingInterface) {
if evt.Object == nil {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("deleteEvent %v received with no metadata", evt)),
"Failed to process a delete event for work object")
Expand All @@ -909,7 +909,7 @@ func (r *Reconciler) SetupWithManager(mgr controllerruntime.Manager) error {
},
// we care about work update event as we want to know when a work is applied so that we can
// update the corresponding resource binding status fast.
UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, queue workqueue.RateLimitingInterface) {
if evt.ObjectOld == nil || evt.ObjectNew == nil {
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("updateEvent %v received with no metadata", evt)),
"Failed to process an update event for work object")
Expand Down
10 changes: 5 additions & 5 deletions pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type ClusterResourcePlacementSchedulingQueue interface {
// In the future, when more features, e.g., inter-placement affinity/anti-affinity, are added,
// more queues, such as a backoff queue, might become necessary.
type simpleClusterResourcePlacementSchedulingQueue struct {
active workqueue.TypedRateLimitingInterface[any]
active workqueue.RateLimitingInterface
}

// Verify that simpleClusterResourcePlacementSchedulingQueue implements
Expand All @@ -69,20 +69,20 @@ var _ ClusterResourcePlacementSchedulingQueue = &simpleClusterResourcePlacementS
// simpleClusterResourcePlacementSchedulingQueueOptions are the options for the
// simpleClusterResourcePlacementSchedulingQueue.
type simpleClusterResourcePlacementSchedulingQueueOptions struct {
rateLimiter workqueue.TypedRateLimiter[any]
rateLimiter workqueue.RateLimiter
name string
}

// Option is the function that configures the simpleClusterResourcePlacementSchedulingQueue.
type Option func(*simpleClusterResourcePlacementSchedulingQueueOptions)

var defaultSimpleClusterResourcePlacementSchedulingQueueOptions = simpleClusterResourcePlacementSchedulingQueueOptions{
rateLimiter: workqueue.DefaultTypedControllerRateLimiter[any](),
rateLimiter: workqueue.DefaultControllerRateLimiter(),
name: "clusterResourcePlacementSchedulingQueue",
}

// WithRateLimiter sets a rate limiter for the workqueue.
func WithRateLimiter(rateLimiter workqueue.TypedRateLimiter[any]) Option {
func WithRateLimiter(rateLimiter workqueue.RateLimiter) Option {
return func(o *simpleClusterResourcePlacementSchedulingQueueOptions) {
o.rateLimiter = rateLimiter
}
Expand Down Expand Up @@ -166,7 +166,7 @@ func NewSimpleClusterResourcePlacementSchedulingQueue(opts ...Option) ClusterRes
}

return &simpleClusterResourcePlacementSchedulingQueue{
active: workqueue.NewTypedRateLimitingQueueWithConfig(options.rateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{
active: workqueue.NewRateLimitingQueueWithConfig(options.rateLimiter, workqueue.RateLimitingQueueConfig{
Name: options.name,
}),
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/utils/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,17 @@ type controller struct {
reconcileFunc ReconcileFunc

// queue allowing parallel processing of resources.
queue workqueue.TypedRateLimitingInterface[any]
queue workqueue.RateLimitingInterface
}

// NewController returns a controller which can process resource periodically. We create the queue during the creation
// of the controller which means it can only be run once. We can move that to the run if we need to run it multiple times
func NewController(Name string, KeyFunc KeyFunc, ReconcileFunc ReconcileFunc, rateLimiter workqueue.TypedRateLimiter[any]) Controller {
func NewController(Name string, KeyFunc KeyFunc, ReconcileFunc ReconcileFunc, rateLimiter workqueue.RateLimiter) Controller {
return &controller{
name: Name,
keyFunc: KeyFunc,
reconcileFunc: ReconcileFunc,
queue: workqueue.NewTypedRateLimitingQueue(rateLimiter),
queue: workqueue.NewRateLimitingQueue(rateLimiter),
}
}

Expand Down

0 comments on commit c5e7835

Please sign in to comment.