Skip to content

Commit

Permalink
fix(limiter): fix store data inconsistency
Browse files Browse the repository at this point in the history
  • Loading branch information
xuqingyun committed Mar 20, 2024
1 parent ea0032d commit 5046398
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 17 deletions.
8 changes: 6 additions & 2 deletions pkg/ratelimiter/limiter/controller/upstream_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package controller

import (
"k8s.io/apimachinery/pkg/api/errors"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

Expand All @@ -30,6 +30,10 @@ import (
"github.com/kubewharf/kubegateway/pkg/syncqueue"
)

const (
HandlerResyncPeriod = time.Minute * 10
)

type UpstreamController interface {
Run(stopCh <-chan struct{})
UpstreamClusterLister() proxylisters.UpstreamClusterLister
Expand All @@ -48,7 +52,7 @@ type upstreamController struct {
}

func NewUpstreamController(gatewayClient gatewayclientset.Interface, handlers ...UpstreamClusterHandler) UpstreamController {
gatewayInformerFactory := gatewayinformers.NewSharedInformerFactory(gatewayClient, 0)
gatewayInformerFactory := gatewayinformers.NewSharedInformerFactory(gatewayClient, HandlerResyncPeriod)
upstreamClusterInformer := gatewayInformerFactory.Proxy().V1alpha1().UpstreamClusters()
m := &upstreamController{
handlers: handlers,
Expand Down
36 changes: 26 additions & 10 deletions pkg/ratelimiter/limiter/ratelimter.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (r *rateLimiter) UpdateRateLimitConditionStatus(upstream string, condition
return nil, fmt.Errorf("limit store for upstream %s upstream shard %v not found", upstream, shardId)
}

mutex := r.upstreamLock[condition.Spec.UpstreamCluster]
mutex.Lock()
defer mutex.Unlock()

upstreamCondition, err := limitStore.Get(condition.Spec.UpstreamCluster, upstreamStateConditionName(condition.Spec.UpstreamCluster))
if err != nil {
return nil, err
Expand All @@ -150,10 +154,6 @@ func (r *rateLimiter) UpdateRateLimitConditionStatus(upstream string, condition
}
condition.Labels["proxy.kubegateway.io/ratelimitcondition.instance"] = oldCondition.Spec.Instance

mutex := r.upstreamLock[condition.Spec.UpstreamCluster]
mutex.Lock()
defer mutex.Unlock()

clients, err := r.clientCache.AllClients()
if err != nil {
return nil, err
Expand Down Expand Up @@ -355,6 +355,9 @@ func (r *rateLimiter) UpstreamConditionHandler(cluster *proxyv1alpha1.UpstreamCl
if _, ok := r.upstreamLock[cluster.Name]; !ok {
r.upstreamLock[cluster.Name] = &sync.Mutex{}
}
mutex := r.upstreamLock[cluster.Name]
mutex.Lock()
defer mutex.Unlock()

upstreamCondition, err := limitStore.Get(cluster.Name, upstreamStateConditionName(cluster.Name))
if err != nil && !errors.IsNotFound(err) {
Expand Down Expand Up @@ -461,6 +464,7 @@ func (r *rateLimiter) startLeading(shardId int) {
delete(r.limitStoreMap, shardId)
}
r.limitStoreLock.Unlock()
stopLimitStoreWithRetry(limitStore, shardId)
}
}
}
Expand Down Expand Up @@ -491,11 +495,7 @@ func (r *rateLimiter) stopLeading(shardId int) {
r.limitStoreLock.Unlock()

if limitStore != nil {
err := limitStore.Stop()
if err != nil {
// TODO retry
klog.Errorf("Stop and flush limit store for shard %v error: %v", shardId, err)
}
stopLimitStoreWithRetry(limitStore, shardId)
}
}

Expand Down Expand Up @@ -687,7 +687,10 @@ func (r *rateLimiter) calculateUpstreamCondition(limitStore _interface.LimitStor
for _, status := range condition.Status.LimitItemStatuses {
level, _ := requestLevelMap[status.Name]

flowControlConfig := upstreamFlowControlMap[status.Name]
flowControlConfig, ok := upstreamFlowControlMap[status.Name]
if !ok {
continue
}
switch {
case status.MaxRequestsInflight != nil:
level += float64(status.MaxRequestsInflight.Max) / float64(flowControlConfig.MaxRequestsInflight.Max)
Expand Down Expand Up @@ -777,3 +780,16 @@ func toFlowControlLimit(schema proxyv1alpha1.FlowControlSchema) proxyv1alpha1.Li
func upstreamStateConditionName(cluster string) string {
return fmt.Sprintf("%s.state", cluster)
}

const stopLimitStoreRetryInterval = time.Second * 2

func stopLimitStoreWithRetry(limitStore _interface.LimitStore, shardId int) {
for i := 0; i < 10; i++ {
err := limitStore.Stop()
if err == nil {
break
}
klog.Errorf("Stop and flush limit store for shard %v error: %v, retry after %v", shardId, err, stopLimitStoreRetryInterval)
time.Sleep(stopLimitStoreRetryInterval)
}
}
33 changes: 28 additions & 5 deletions pkg/ratelimiter/store/k8s/cache_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package k8s
import (
"context"
"fmt"
"math/rand"
"sync"
"time"

Expand All @@ -25,6 +26,7 @@ func NewK8sCacheStore(gatewayClient gatewayclientset.Interface, syncPeriod time.
localStore := local.NewLocalStore()

store := &objectStore{
id: rand.Intn(10000),
shard: shard,
shardCount: shardCount,
stopCh: make(chan struct{}),
Expand All @@ -41,9 +43,11 @@ func NewK8sCacheStore(gatewayClient gatewayclientset.Interface, syncPeriod time.

type objectStore struct {
sync.Mutex
id int
shard int
shardCount int
stopCh chan struct{}
stopped bool
syncPeriod time.Duration
localStore _interface.LimitStore
gatewayClient gatewayclientset.Interface
Expand Down Expand Up @@ -123,7 +127,7 @@ func (s *objectStore) Load() error {
continue
}

klog.V(2).Infof("Load condition %v for shard %v", newItem.Name, s.shard)
klog.V(2).Infof("Load condition %v to store [%s]", newItem.Name, s.string())
_ = s.localStore.Save(item.Spec.UpstreamCluster, newItem)
}
return nil
Expand All @@ -146,12 +150,27 @@ func (s *objectStore) Flush() error {
}

func (s *objectStore) Stop() error {
if s.stopped {
return nil
}
err := s.doSyncLocked()
if err != nil {
return err
}
close(s.stopCh)
return s.localStore.Stop()

select {
case <-s.stopCh:
default:
close(s.stopCh)
}

err = s.localStore.Stop()
if err == nil {
s.stopped = true
}

klog.V(0).Infof("Store [%s] stopped: %v", s.string(), err)
return err
}

func (s *objectStore) createOrUpdate(condition *proxyv1alpha1.RateLimitCondition) (*proxyv1alpha1.RateLimitCondition, error) {
Expand Down Expand Up @@ -183,12 +202,12 @@ func (s *objectStore) createOrUpdate(condition *proxyv1alpha1.RateLimitCondition
func (s *objectStore) sync() {
err := s.doSyncLocked()
if err != nil {
klog.Errorf("Sync store error: %v", err)
klog.Errorf("Sync store [%s] error: %v", s.string(), err)
}
}

func (s *objectStore) doSyncLocked() error {
klog.V(2).Infof("Sync local data to k8s for shard %v", s.shard)
klog.V(2).Infof("Sync store [%s] local data to k8s", s.string())

s.Lock()
defer s.Unlock()
Expand All @@ -206,3 +225,7 @@ func (s *objectStore) doSyncLocked() error {
}
return nil
}

func (s *objectStore) string() string {
return fmt.Sprintf("shard=%v id=%v", s.shard, s.id)
}

0 comments on commit 5046398

Please sign in to comment.