fix: client state lock for global maxinflight (#67)
* fix: client state lock for global maxinflight

* feat(limiter): add debug info and probabilistic log
xuqingyun authored Jul 26, 2024
1 parent eab9b9c commit a6c227c
Showing 5 changed files with 107 additions and 37 deletions.
18 changes: 13 additions & 5 deletions pkg/ratelimiter/limiter/allocation.go
@@ -1,10 +1,13 @@
package limiter

import (
"math"
mathrand "math/rand"

"k8s.io/klog"

proxyv1alpha1 "github.com/kubewharf/kubegateway/pkg/apis/proxy/v1alpha1"
"github.com/kubewharf/kubegateway/pkg/flowcontrols/flowcontrol"
"k8s.io/klog"
"math"
)

const (
@@ -160,9 +163,14 @@ func calculateNextQuota(
}
burst = math.Ceil(burst)

klog.V(2).Infof("[allocate] fc=%s, total=%v, allocated=%v, totalReq=%v%%, last=%v, used=%v (%v%%), next=%v, threshold=(%v, %v), level(expect: %v, allocate: %v), name=%s",
flowControlConfig.Name, total, allocated, upstreamUsed.RequestLevel, current, used, flowControlStatus.RequestLevel, next,
reducingThreshold, increasingThreshold, expectTotalLevel, expectedAllocatePercent, condition.Name)
// log_level >= 3: always log
// log_level == 2: probabilistic logging
// log_level <= 1: no logging
if klog.V(3) || mathrand.Float64() < AllocateLogProbability {
klog.V(2).Infof("[allocate] fc=%s, total=%v, allocated=%v, totalReq=%v%%, last=%v, used=%v (%v%%), next=%v, threshold=(%v, %v), level(expect: %v, allocate: %v), name=%s",
flowControlConfig.Name, total, allocated, upstreamUsed.RequestLevel, current, used, flowControlStatus.RequestLevel, next,
reducingThreshold, increasingThreshold, expectTotalLevel, expectedAllocatePercent, condition.Name)
}

setFlowControlLimit(&newCondition.LimitItemDetail, flowControlType, next, burst)

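For context, the log gating added above can be sketched on its own as follows. This is an illustrative standalone program, not part of the diff: the helper name shouldLogAllocate is invented, and only the 0.2 value mirrors the AllocateLogProbability default introduced in this commit.

package main

import (
	"flag"
	mathrand "math/rand"

	"k8s.io/klog"
)

// Mirrors the AllocateLogProbability default added in this commit.
const allocateLogProbability = 0.2

// shouldLogAllocate (hypothetical helper): verbosity >= 3 always logs,
// verbosity 2 logs roughly 20% of calls, and the klog.V(2) guard at the
// call site keeps verbosity <= 1 silent.
func shouldLogAllocate() bool {
	return bool(klog.V(3)) || mathrand.Float64() < allocateLogProbability
}

func main() {
	klog.InitFlags(nil) // registers -v so verbosity can be chosen at run time
	flag.Parse()

	for i := 0; i < 10; i++ {
		if shouldLogAllocate() {
			klog.V(2).Infof("[allocate] sample log line %d", i)
		}
	}
	klog.Flush()
}

Run with -v=2 to see roughly one in five lines, or with -v=3 and above to see all of them.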
73 changes: 55 additions & 18 deletions pkg/ratelimiter/limiter/ratelimter.go
@@ -3,6 +3,8 @@ package limiter
import (
"context"
"fmt"
mathrand "math/rand"
"os"
"sort"
"strings"
"sync"
@@ -36,6 +38,20 @@ const (
RateLimitConditionInstanceLabel = "proxy.kubegateway.io/ratelimitcondition.instance"
)

var (
AcquireLogProbability = 0.1
AllocateLogProbability = 0.2

EnableAcquireDebugLog = false
)

func init() {
if os.Getenv("ENABLE_ACQUIRE_DEBUG_LOG") == "true" {
klog.Infof("Enable acquire debug log")
EnableAcquireDebugLog = true
}
}

func NewRateLimiter(gatewayClient gatewayclientset.Interface, client kubernetes.Interface, limitOptions options.RateLimitOptions) (RateLimiter, error) {
leaderElector, err := elector.NewLeaderElector(limitOptions.LeaderElectionConfiguration, client, limitOptions.Identity, limitOptions.ShardingCount)
if err != nil {
@@ -126,15 +142,18 @@ func (r *rateLimiter) UpdateRateLimitConditionStatus(upstream string, condition
return nil, fmt.Errorf("limit store for upstream %s upstream shard %v not found", upstream, shardId)
}

mutex := r.upstreamLock[condition.Spec.UpstreamCluster]
mutex.Lock()
defer mutex.Unlock()

upstreamCondition, err := limitStore.Get(condition.Spec.UpstreamCluster, upstreamStateConditionName(condition.Spec.UpstreamCluster))
if err != nil {
return nil, err
}

mutex := r.upstreamLock[condition.Spec.UpstreamCluster]
if mutex == nil {
return nil, fmt.Errorf("interval error: upstreamLock not exist")
}
mutex.Lock()
defer mutex.Unlock()

oldCondition, err := limitStore.Get(condition.Spec.UpstreamCluster, condition.Name)
if errors.IsNotFound(err) {
oldCondition = &proxyv1alpha1.RateLimitCondition{
@@ -222,6 +241,8 @@ func (r *rateLimiter) DoAcquire(upstream string, acquireRequest *proxyv1alpha1.R
}

var resultLogs []string
var logging bool

result := &acquireRequest.Status
for _, limit := range acquireRequest.Spec.Requests {
rs, err := func() (proxyv1alpha1.RateLimitAcquireResult, error) {
@@ -253,12 +274,12 @@ func (r *rateLimiter) DoAcquire(upstream string, acquireRequest *proxyv1alpha1.R
}
metrics.MonitorAcquiredTokenCounter(upstream, acquireRequest.Spec.Instance, limit.FlowControl, string(flowControl.Type()), string(proxyv1alpha1.GlobalCountLimit), "acquire", rs.Limit)
case proxyv1alpha1.MaxRequestsInflight:
threshold, err := flowControl.SetState(acquireRequest.Spec.Instance, acquireRequest.Spec.RequestID, limit.Tokens)
accept, threshold, err := flowControl.SetState(acquireRequest.Spec.Instance, acquireRequest.Spec.RequestID, limit.Tokens)
switch {
case err != nil:
rs.Accept = false
rs.Error = err.Error()
case threshold < 0:
case accept:
rs.Accept = true
rs.Limit = limit.Tokens
default:
@@ -269,14 +290,25 @@ func (r *rateLimiter) DoAcquire(upstream string, acquireRequest *proxyv1alpha1.R
metrics.MonitorAllocateRequest(upstream, acquireRequest.Spec.Instance, limit.FlowControl, string(flowControl.Type()), string(proxyv1alpha1.GlobalCountLimit), "acquire", rs.Limit)
}

if !rs.Accept && EnableAcquireDebugLog {
logging = true
if info := flowControl.DebugInfo(); len(info) > 0 {
klog.V(2).Infof("[acquire][debug] upstream=%q instance=%q name=%q token=%v limit=%v (%v) err=%q, debugInfo: %q",
upstream, acquireRequest.Spec.Instance, limit.FlowControl, limit.Tokens, rs.Limit, rs.Accept, rs.Error, info)
}
}

return rs, nil
}()

var rslog string
if err != nil {
rs.Accept = false
rs.Error = err.Error()
rslog = fmt.Sprintf("[name=%q token=%v result=%v error=%v]", limit.FlowControl, limit.Tokens, rs.Accept, rs.Error)
}
if len(rs.Error) > 0 {
logging = true
rslog = fmt.Sprintf("[name=%q token=%v result=%v (%v) error=%v]", limit.FlowControl, limit.Tokens, rs.Accept, rs.Limit, rs.Error)
} else {
rslog = fmt.Sprintf("[name=%q token=%v result=%v (%v)]", limit.FlowControl, limit.Tokens, rs.Accept, rs.Limit)
}
@@ -285,7 +317,12 @@ func (r *rateLimiter) DoAcquire(upstream string, acquireRequest *proxyv1alpha1.R
result.Results = append(result.Results, rs)
}

klog.V(2).Infof("[acquire] upstream=%q instance=%q: %v", upstream, acquireRequest.Spec.Instance, strings.Join(resultLogs, ","))
// log_level >= 4 or has error: always log
// log_level == 2 or 3: probabilistic logging
// log_level <= 1: no logging
if logging || bool(klog.V(4)) || mathrand.Float64() < AcquireLogProbability {
klog.V(2).Infof("[acquire] upstream=%q instance=%q id=%v: %v", upstream, acquireRequest.Spec.Instance, acquireRequest.Spec.RequestID, strings.Join(resultLogs, ","))
}

return acquireRequest, nil
}
@@ -516,13 +553,14 @@ func (r *rateLimiter) cleanupTimeoutClient() {
if time.Now().After(lastHeartbeat.Add(ClientHeartBeatTimeout)) {
r.clientCache.Delete(c)
instance := c
reason := fmt.Sprintf("instance %s last heartbeat since %v", instance, lastHeartbeat.Format(time.RFC3339Nano))
go func() {
for _, limitStore := range r.limitStoreMap {
conditions := limitStore.List(labels.Set{RateLimitConditionInstanceLabel: instance}.AsSelector())
for _, condition := range conditions {
r.deleteCondition(limitStore, condition, true)
r.deleteCondition(limitStore, condition, reason)
}
r.deleteGlobalFlowControl(limitStore, instance)
r.deleteGlobalFlowControl(limitStore, instance, reason)
}
}()
}
@@ -562,7 +600,7 @@ func (r *rateLimiter) cleanupUnknownCondition() {
}

if len(condition.Spec.Instance) > 0 {
r.deleteCondition(limitStore, condition, false)
r.deleteCondition(limitStore, condition, fmt.Sprintf("client %s not exist", condition.Spec.Instance))
clientsToDelete[condition.Spec.Instance] = true
}

@@ -574,7 +612,7 @@

for instance, _ := range clientsToDelete {
for _, limitStore := range r.limitStoreMap {
r.deleteGlobalFlowControl(limitStore, instance)
r.deleteGlobalFlowControl(limitStore, instance, fmt.Sprintf("client %s not exist", instance))
}
}

@@ -588,24 +626,23 @@
}
}

func (r *rateLimiter) deleteCondition(limitStore _interface.LimitStore, condition *proxyv1alpha1.RateLimitCondition, logForSkip bool) {
func (r *rateLimiter) deleteCondition(limitStore _interface.LimitStore, condition *proxyv1alpha1.RateLimitCondition, reason string) {
upstream := condition.Spec.UpstreamCluster
shardId := util.GetShardID(upstream, r.shardCount)
if !r.leaderElector.IsLeader(shardId) || len(condition.Spec.Instance) == 0 {
if logForSkip {
klog.Errorf("Skip delete condition %s, leader of (upstream %s, shard %v) is %v", condition.Name, upstream, shardId, r.leaderElector.GetLeaders()[shardId])
}
klog.Errorf("Skip delete condition %s, leader of (upstream %s, shard %v) is %v", condition.Name, upstream, shardId, r.leaderElector.GetLeaders()[shardId])
return
}

klog.Infof("Clean up condition %v for unknown client %v", condition.Name, condition.Spec.Instance)
klog.Infof("Clean up condition %v for unknown client %v: %v", condition.Name, condition.Spec.Instance, reason)
err := limitStore.Delete(upstream, condition.Name)
if err != nil {
klog.Errorf("Delete error: %v", err)
}
}

func (r *rateLimiter) deleteGlobalFlowControl(limitStore _interface.LimitStore, instance string) {
func (r *rateLimiter) deleteGlobalFlowControl(limitStore _interface.LimitStore, instance string, reason string) {
klog.Infof("Clean up flowcontrol state for unknown client %v: %v", instance, reason)
limitStore.DeleteInstanceState(instance)
}

3 changes: 2 additions & 1 deletion pkg/ratelimiter/store/flowcontrol/global_flowcontrol.go
@@ -12,10 +12,11 @@ var (

type GlobalFlowControl interface {
TryAcquireN(instance string, token int32) bool
SetState(instance string, requestId int64, current int32) (int32, error)
SetState(instance string, requestId int64, current int32) (accept bool, latest int32, err error)
String() string
Resize(n int32, burst int32) bool
Type() proxyv1alpha1.FlowControlSchemaType
DebugInfo() string
}

func NewGlobalFlowControl(schema proxyv1alpha1.FlowControlSchema) GlobalFlowControl {
42 changes: 31 additions & 11 deletions pkg/ratelimiter/store/flowcontrol/maxinflight.go
@@ -2,6 +2,7 @@ package flowcontrol

import (
"fmt"
"strings"
"sync"
"sync/atomic"

@@ -63,7 +64,7 @@ func (f *globalMaxInflight) add(n int32) int32 {
return count - max
}

func (f *globalMaxInflight) SetState(instance string, requestId int64, current int32) (int32, error) {
func (f *globalMaxInflight) SetState(instance string, requestId int64, current int32) (bool, int32, error) {
f.lock.RLock()
state, ok := f.instanceStates[instance]
f.lock.RUnlock()
@@ -72,26 +73,30 @@ func (f *globalMaxInflight) SetState(instance string, requestId int64, current i
if ok {
f.lock.Lock()
delete(f.instanceStates, instance)
f.add(-state.count)
f.lock.Unlock()
current = 0
}
return -1, nil
return false, -1, nil
} else if !ok || state == nil {
f.lock.Lock()
state = &instanceState{}
f.instanceStates[instance] = state
state, ok = f.instanceStates[instance]
if !ok || state == nil {
state = &instanceState{}
f.instanceStates[instance] = state
}
f.lock.Unlock()
}

f.lock.RLock()
defer f.lock.RUnlock()

if requestId > 0 {
f.lock.RLock()
oldId := atomic.LoadInt64(&state.requestId)
if requestId <= oldId {
f.lock.RUnlock()
return -1, RequestIDTooOld
return false, current, RequestIDTooOld
}
atomic.StoreInt64(&state.requestId, requestId)
f.lock.RUnlock()
}

old := atomic.SwapInt32(&state.count, current)
@@ -101,16 +106,31 @@ func (f *globalMaxInflight) SetState(instance string, requestId int64, current i
if overflowed > 0 {
atomic.AddInt32(&state.count, -delta)
f.add(-delta)
return old, nil
return false, old, nil
}
if overflowed == 0 && current > 0 {
return current, nil
return false, current, nil
}
return -1, nil
return true, current, nil
}

func (f *globalMaxInflight) overflow() int32 {
max := atomic.LoadInt32(&f.max)
count := atomic.LoadInt32(&f.count)
return count - max
}

func (f *globalMaxInflight) DebugInfo() string {
var msgs []string
var total int32
f.lock.RLock()
for instance, state := range f.instanceStates {
msgs = append(msgs, fmt.Sprintf("[%s: %v]", instance, state.count))
total += state.count
}
f.lock.RUnlock()
count := atomic.LoadInt32(&f.count)
max := atomic.LoadInt32(&f.max)
info := fmt.Sprintf("name=%s max=%v count=%v total=%v details=%v", f.name, max, count, total, strings.Join(msgs, ","))
return info
}
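The core of the fix in SetState above is the check-lock-recheck pattern around instanceStates: the write lock is taken only on a read miss, and the map is checked again under that lock before a fresh state is installed. A generic sketch of the pattern, with invented names (registry, get, "gateway-a"), might look like this:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// instanceState is a simplified stand-in for the per-client state above.
type instanceState struct {
	count int32
}

// registry is an invented name for the map-plus-RWMutex shape used above.
type registry struct {
	lock   sync.RWMutex
	states map[string]*instanceState
}

// get returns the state for instance, creating it on first use. After the
// read miss it takes the write lock and checks the map again, so concurrent
// callers for a new instance all end up sharing a single state.
func (r *registry) get(instance string) *instanceState {
	r.lock.RLock()
	state, ok := r.states[instance]
	r.lock.RUnlock()
	if ok {
		return state
	}

	r.lock.Lock()
	defer r.lock.Unlock()
	if state, ok = r.states[instance]; !ok {
		state = &instanceState{}
		r.states[instance] = state
	}
	return state
}

func main() {
	r := &registry{states: map[string]*instanceState{}}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt32(&r.get("gateway-a").count, 1)
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadInt32(&r.get("gateway-a").count)) // prints 8
}

Without the second check under the write lock, two callers that both miss on the read path would each install their own instanceState, and the in-flight count recorded by one of them would be lost, which appears to be the accounting drift this commit addresses.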
8 changes: 6 additions & 2 deletions pkg/ratelimiter/store/flowcontrol/tokenbucket.go
@@ -53,6 +53,10 @@ func (f *globalTokenBucket) Resize(n int32, burst int32) bool {
return resized
}

func (f *globalTokenBucket) SetState(instance string, requestId int64, current int32) (int32, error) {
return -1, nil
func (f *globalTokenBucket) SetState(instance string, requestId int64, current int32) (bool, int32, error) {
return false, -1, nil
}

func (f *globalTokenBucket) DebugInfo() string {
return ""
}
