From 5046398b9918f7b4e141812b50a55a6dadeec170 Mon Sep 17 00:00:00 2001
From: xuqingyun
Date: Mon, 22 Jan 2024 21:22:21 +0800
Subject: [PATCH] fix(limiter): fix store data inconsistency

---
 .../limiter/controller/upstream_controller.go |  8 +++--
 pkg/ratelimiter/limiter/ratelimter.go         | 36 +++++++++++++------
 pkg/ratelimiter/store/k8s/cache_store.go      | 33 ++++++++++++++---
 3 files changed, 60 insertions(+), 17 deletions(-)

diff --git a/pkg/ratelimiter/limiter/controller/upstream_controller.go b/pkg/ratelimiter/limiter/controller/upstream_controller.go
index ebad340..885de49 100644
--- a/pkg/ratelimiter/limiter/controller/upstream_controller.go
+++ b/pkg/ratelimiter/limiter/controller/upstream_controller.go
@@ -15,10 +15,10 @@
 package controller
 
 import (
-	"k8s.io/apimachinery/pkg/api/errors"
 	"sync"
 	"time"
 
+	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog"
 
@@ -30,6 +30,10 @@ import (
 	"github.com/kubewharf/kubegateway/pkg/syncqueue"
 )
 
+const (
+	HandlerResyncPeriod = time.Minute * 10
+)
+
 type UpstreamController interface {
 	Run(stopCh <-chan struct{})
 	UpstreamClusterLister() proxylisters.UpstreamClusterLister
@@ -48,7 +52,7 @@ type upstreamController struct {
 }
 
 func NewUpstreamController(gatewayClient gatewayclientset.Interface, handlers ...UpstreamClusterHandler) UpstreamController {
-	gatewayInformerFactory := gatewayinformers.NewSharedInformerFactory(gatewayClient, 0)
+	gatewayInformerFactory := gatewayinformers.NewSharedInformerFactory(gatewayClient, HandlerResyncPeriod)
 	upstreamClusterInformer := gatewayInformerFactory.Proxy().V1alpha1().UpstreamClusters()
 	m := &upstreamController{
 		handlers: handlers,
diff --git a/pkg/ratelimiter/limiter/ratelimter.go b/pkg/ratelimiter/limiter/ratelimter.go
index f0b1c3f..5be8b4e 100644
--- a/pkg/ratelimiter/limiter/ratelimter.go
+++ b/pkg/ratelimiter/limiter/ratelimter.go
@@ -126,6 +126,10 @@ func (r *rateLimiter) UpdateRateLimitConditionStatus(upstream string, condition
 		return nil, fmt.Errorf("limit store for upstream %s upstream shard %v not found", upstream, shardId)
 	}
 
+	mutex := r.upstreamLock[condition.Spec.UpstreamCluster]
+	mutex.Lock()
+	defer mutex.Unlock()
+
 	upstreamCondition, err := limitStore.Get(condition.Spec.UpstreamCluster, upstreamStateConditionName(condition.Spec.UpstreamCluster))
 	if err != nil {
 		return nil, err
@@ -150,10 +154,6 @@ func (r *rateLimiter) UpdateRateLimitConditionStatus(upstream string, condition
 	}
 	condition.Labels["proxy.kubegateway.io/ratelimitcondition.instance"] = oldCondition.Spec.Instance
 
-	mutex := r.upstreamLock[condition.Spec.UpstreamCluster]
-	mutex.Lock()
-	defer mutex.Unlock()
-
 	clients, err := r.clientCache.AllClients()
 	if err != nil {
 		return nil, err
@@ -355,6 +355,9 @@ func (r *rateLimiter) UpstreamConditionHandler(cluster *proxyv1alpha1.UpstreamCl
 	if _, ok := r.upstreamLock[cluster.Name]; !ok {
 		r.upstreamLock[cluster.Name] = &sync.Mutex{}
 	}
+	mutex := r.upstreamLock[cluster.Name]
+	mutex.Lock()
+	defer mutex.Unlock()
 
 	upstreamCondition, err := limitStore.Get(cluster.Name, upstreamStateConditionName(cluster.Name))
 	if err != nil && !errors.IsNotFound(err) {
@@ -461,6 +464,7 @@ func (r *rateLimiter) startLeading(shardId int) {
 				delete(r.limitStoreMap, shardId)
 			}
 			r.limitStoreLock.Unlock()
+			stopLimitStoreWithRetry(limitStore, shardId)
 		}
 	}
 }
@@ -491,11 +495,7 @@ func (r *rateLimiter) stopLeading(shardId int) {
 	r.limitStoreLock.Unlock()
 
 	if limitStore != nil {
-		err := limitStore.Stop()
-		if err != nil {
-			// TODO retry
-			klog.Errorf("Stop and flush limit store for shard %v error: %v", shardId, err)
-		}
+		stopLimitStoreWithRetry(limitStore, shardId)
 	}
 }
 
@@ -687,7 +687,10 @@ func (r *rateLimiter) calculateUpstreamCondition(limitStore _interface.LimitStor
 
 	for _, status := range condition.Status.LimitItemStatuses {
 		level, _ := requestLevelMap[status.Name]
-		flowControlConfig := upstreamFlowControlMap[status.Name]
+		flowControlConfig, ok := upstreamFlowControlMap[status.Name]
+		if !ok {
+			continue
+		}
 		switch {
 		case status.MaxRequestsInflight != nil:
 			level += float64(status.MaxRequestsInflight.Max) / float64(flowControlConfig.MaxRequestsInflight.Max)
@@ -777,3 +780,16 @@ func toFlowControlLimit(schema proxyv1alpha1.FlowControlSchema) proxyv1alpha1.Li
 func upstreamStateConditionName(cluster string) string {
 	return fmt.Sprintf("%s.state", cluster)
 }
+
+const stopLimitStoreRetryInterval = time.Second * 2
+
+func stopLimitStoreWithRetry(limitStore _interface.LimitStore, shardId int) {
+	for i := 0; i < 10; i++ {
+		err := limitStore.Stop()
+		if err == nil {
+			break
+		}
+		klog.Errorf("Stop and flush limit store for shard %v error: %v, retry after %v", shardId, err, stopLimitStoreRetryInterval)
+		time.Sleep(stopLimitStoreRetryInterval)
+	}
+}
diff --git a/pkg/ratelimiter/store/k8s/cache_store.go b/pkg/ratelimiter/store/k8s/cache_store.go
index b2fdf34..5a8b355 100644
--- a/pkg/ratelimiter/store/k8s/cache_store.go
+++ b/pkg/ratelimiter/store/k8s/cache_store.go
@@ -3,6 +3,7 @@ package k8s
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"sync"
 	"time"
 
@@ -25,6 +26,7 @@ func NewK8sCacheStore(gatewayClient gatewayclientset.Interface, syncPeriod time.
 	localStore := local.NewLocalStore()
 
 	store := &objectStore{
+		id:         rand.Intn(10000),
 		shard:      shard,
 		shardCount: shardCount,
 		stopCh:     make(chan struct{}),
@@ -41,9 +43,11 @@ func NewK8sCacheStore(gatewayClient gatewayclientset.Interface, syncPeriod time.
 type objectStore struct {
 	sync.Mutex
+	id            int
 	shard         int
 	shardCount    int
 	stopCh        chan struct{}
+	stopped       bool
 	syncPeriod    time.Duration
 	localStore    _interface.LimitStore
 	gatewayClient gatewayclientset.Interface
@@ -123,7 +127,7 @@ func (s *objectStore) Load() error {
 			continue
 		}
 
-		klog.V(2).Infof("Load condition %v for shard %v", newItem.Name, s.shard)
+		klog.V(2).Infof("Load condition %v to store [%s]", newItem.Name, s.string())
 		_ = s.localStore.Save(item.Spec.UpstreamCluster, newItem)
 	}
 	return nil
@@ -146,12 +150,27 @@
 }
 
 func (s *objectStore) Stop() error {
+	if s.stopped {
+		return nil
+	}
 	err := s.doSyncLocked()
 	if err != nil {
 		return err
 	}
-	close(s.stopCh)
-	return s.localStore.Stop()
+
+	select {
+	case <-s.stopCh:
+	default:
+		close(s.stopCh)
+	}
+
+	err = s.localStore.Stop()
+	if err == nil {
+		s.stopped = true
+	}
+
+	klog.V(0).Infof("Store [%s] stopped: %v", s.string(), err)
+	return err
 }
 
 func (s *objectStore) createOrUpdate(condition *proxyv1alpha1.RateLimitCondition) (*proxyv1alpha1.RateLimitCondition, error) {
@@ -183,12 +202,12 @@ func (s *objectStore) createOrUpdate(condition *proxyv1alpha1.RateLimitCondition
 func (s *objectStore) sync() {
 	err := s.doSyncLocked()
 	if err != nil {
-		klog.Errorf("Sync store error: %v", err)
+		klog.Errorf("Sync store [%s] error: %v", s.string(), err)
 	}
 }
 
 func (s *objectStore) doSyncLocked() error {
-	klog.V(2).Infof("Sync local data to k8s for shard %v", s.shard)
+	klog.V(2).Infof("Sync store [%s] local data to k8s", s.string())
 	s.Lock()
 	defer s.Unlock()
 
@@ -206,3 +225,7 @@ func (s *objectStore) doSyncLocked() error {
 	}
 	return nil
 }
+
+func (s *objectStore) string() string {
+	return fmt.Sprintf("shard=%v id=%v", s.shard, s.id)
+}