Skip to content

Commit

Permalink
feat: refactor flowcontrol meter
Browse files Browse the repository at this point in the history
  • Loading branch information
xuqingyun committed Mar 20, 2024
1 parent 5046398 commit d1d2f4c
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 321 deletions.
228 changes: 18 additions & 210 deletions pkg/flowcontrols/remote/flowcontrol_wrapper.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
package remote

import (
"math"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"

"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/klog"

proxyv1alpha1 "github.com/kubewharf/kubegateway/pkg/apis/proxy/v1alpha1"
"github.com/kubewharf/kubegateway/pkg/flowcontrols/flowcontrol"
"github.com/kubewharf/kubegateway/pkg/flowcontrols/util"
)

const (
FlowControlMeterWindow = time.Second * 3
QPSMeterTickDuration = time.Second
QPSMeterBucketLen = 3
QPSMeterTickDuration = time.Second
QPSMeterBucketLen = 3

InflightMeterBucketLen = 6
InflightMeterBucketDuration = time.Millisecond * 200
Expand Down Expand Up @@ -49,36 +46,22 @@ type RemoteFlowControlWrapper interface {
}

func NewFlowControlCache(cluster, name, clientID string, globalCounterProvider GlobalCounterProvider) FlowControlCache {
tickDuration := QPSMeterTickDuration
buckets := make([]float64, QPSMeterBucketLen)

stopCh := make(chan struct{})

clientIdSlices := strings.Split(clientID, "-")
id := clientIdSlices[len(clientIdSlices)-1]

meter := util.NewMeter(fmt.Sprintf("%s/%s", cluster, name), QPSMeterBucketLen, QPSMeterTickDuration, InflightMeterBucketLen, InflightMeterBucketDuration)

f := &flowControlCache{
cluster: cluster,
name: name,
meter: &meter{
cluster: cluster,
name: name,
stopCh: stopCh,
clock: clock.RealClock{},
ticker: time.NewTicker(tickDuration),
last: time.Now(),
mu: sync.Mutex{},
counterBuckets: buckets,
inflightBuckets: make([]int32, InflightMeterBucketLen),
inflightChan: make(chan int32, 1),
},
cluster: cluster,
name: name,
meter: meter,
globalCounter: globalCounterProvider,
clientID: id,
}

f.local = &localWrapper{flowControlCache: f}

f.meter.start()
f.meter.Start()

return f
}
Expand All @@ -90,7 +73,7 @@ type flowControlCache struct {
name string

globalCounter GlobalCounterProvider
meter *meter
meter *util.Meter

clientID string
}
Expand Down Expand Up @@ -122,19 +105,19 @@ func (f *flowControlCache) Strategy() proxyv1alpha1.LimitStrategy {
}

func (f *flowControlCache) Rate() float64 {
return f.meter.rate()
return f.meter.Rate()
}

func (f *flowControlCache) Inflight() float64 {
return f.meter.avgInflight()
return f.meter.AvgInflight()
}

func (f *flowControlCache) MaxInflight() int32 {
return f.meter.maxInflight()
return f.meter.MaxInflight()
}

func (f *flowControlCache) Stop() {
close(f.meter.stopCh)
f.meter.Stop()
if f.remote != nil {
close(f.remote.stopCh)
}
Expand Down Expand Up @@ -293,197 +276,22 @@ func (f *remoteWrapper) Done() <-chan struct{} {

type meterWrapper struct {
flowcontrol.FlowControl
meter *meter
meter *util.Meter
}

func (f *meterWrapper) TryAcquire() bool {
acquire := f.FlowControl.TryAcquire()
if acquire {
f.meter.addInflight(1)
f.meter.add(1)
f.meter.StartOne()
}
return acquire
}

func (f *meterWrapper) Release() {
f.meter.addInflight(-1)
f.meter.EndOne()
f.FlowControl.Release()
}

type meter struct {
cluster string
name string

stopCh chan struct{}
clock clock.Clock
ticker *time.Ticker
mu sync.Mutex

uncounted int64
currentIndex int
rateAvg float64
last time.Time
counterBuckets []float64

inflight int32
inflightIndex int
inflightAvg float64
inflightMax int32
inflightBuckets []int32
inflightChan chan int32

debug bool
}

func (m *meter) start() {
go m.rateTick()
go m.inflightWorker()
}

func (m *meter) addInflight(add int32) {
inflight := atomic.AddInt32(&m.inflight, add)

select {
case m.inflightChan <- inflight:
default:
}
}

func (m *meter) add(add int64) {
atomic.AddInt64(&m.uncounted, add)
}

func (m *meter) rateTick() {
defer m.ticker.Stop()
for {
select {
case <-m.ticker.C:
m.calculateAvgRate()
case <-m.stopCh:
return
}
}
}

func (m *meter) calculateAvgRate() {
latestRate := m.latestRate()

m.mu.Lock()
lastRate := m.counterBuckets[m.currentIndex]
if lastRate == math.NaN() {
lastRate = 0
}

rateAvg := m.rateAvg + (latestRate-lastRate)/float64(len(m.counterBuckets))
m.rateAvg = rateAvg
m.counterBuckets[m.currentIndex] = latestRate
m.currentIndex = (m.currentIndex + 1) % len(m.counterBuckets)
m.mu.Unlock()

klog.V(6).Infof("FlowControl %s/%s tick: latestRate %v, rateAvg %v, currentIndex %v, counterBuckets %v",
m.cluster, m.name, latestRate, m.rateAvg, m.currentIndex, m.counterBuckets)
}

func (m *meter) latestRate() float64 {
count := atomic.LoadInt64(&m.uncounted)
atomic.AddInt64(&m.uncounted, -count)
m.mu.Lock()
last := m.last
now := m.clock.Now()
timeWindow := float64(now.Sub(last)) / float64(time.Second)
instantRate := float64(count) / timeWindow
m.last = now
m.mu.Unlock()

klog.V(6).Infof("FlowControl %s/%s latestRate: count %v, timeWindow %v, rate %v",
m.cluster, m.name, count, timeWindow, instantRate)

return instantRate
}

func (m *meter) rate() float64 {
return m.rateAvg
}

func (m *meter) avgInflight() float64 {
return m.inflightAvg
}

func (m *meter) maxInflight() int32 {
return m.inflightMax
}

func (m *meter) currentInflight() int32 {
return atomic.LoadInt32(&m.inflight)
}

func (m *meter) inflightWorker() {
timerDuration := InflightMeterBucketDuration * 2

timer := m.clock.NewTimer(timerDuration)
defer timer.Stop()

for {
select {
case inflight := <-m.inflightChan:
m.calInflight(inflight)
case <-timer.C():
m.calInflight(atomic.LoadInt32(&m.inflight))
case <-m.stopCh:
return
}
timer.Reset(timerDuration)
}
}

func (m *meter) calInflight(inflight int32) {
m.mu.Lock()

now := m.clock.Now()
milli := now.UnixMilli()
currentIndex := int(milli / int64(InflightMeterBucketDuration/time.Millisecond) % InflightMeterBucketLen)
lastIndex := m.inflightIndex

if currentIndex == lastIndex {
max := m.inflightBuckets[currentIndex]
if inflight > max {
m.inflightBuckets[currentIndex] = inflight
}
} else {
fakeIndex := currentIndex
if currentIndex < lastIndex {
fakeIndex += InflightMeterBucketLen
}
inflightDelta := m.inflightBuckets[lastIndex]

for i := lastIndex + 1; i < fakeIndex; i++ {
index := i % InflightMeterBucketLen
inflightDelta -= m.inflightBuckets[index]
m.inflightBuckets[index] = 0
}
inflightDelta -= m.inflightBuckets[currentIndex]
m.inflightBuckets[currentIndex] = inflight
m.inflightIndex = currentIndex

m.inflightAvg = m.inflightAvg + float64(inflightDelta)*InflightMeterBucketDuration.Seconds()

max := int32(0)
for _, ift := range m.inflightBuckets {
if ift > max {
max = ift
}
}
m.inflightMax = max

if m.debug {
klog.Infof("[debug] [%v] bucket: %v, delta: %v, max: %v, index: %v, avg: %.2f",
m.clock.Now().Format(time.RFC3339Nano), m.inflightBuckets, inflightDelta, max, currentIndex, m.inflightAvg)
}
}

m.mu.Unlock()
}

func EnableGlobalFlowControl(schema proxyv1alpha1.FlowControlSchema) bool {
switch schema.Strategy {
case proxyv1alpha1.GlobalAllocateLimit, proxyv1alpha1.GlobalCountLimit:
Expand Down
18 changes: 10 additions & 8 deletions pkg/flowcontrols/remote/global_flowcontrol.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package remote

import (
"k8s.io/klog"
"os"
"strconv"
"sync"
"sync/atomic"
"time"

"k8s.io/klog"

proxyv1alpha1 "github.com/kubewharf/kubegateway/pkg/apis/proxy/v1alpha1"
"github.com/kubewharf/kubegateway/pkg/flowcontrols/flowcontrol"
"github.com/kubewharf/kubegateway/pkg/flowcontrols/util"
"github.com/kubewharf/kubegateway/pkg/gateway/metrics"
)

Expand Down Expand Up @@ -111,7 +113,7 @@ type GlobalCounterFlowControl interface {
type maxInflightWrapper struct {
flowcontrol.FlowControl
fcc *flowControlCache
meter *meter
meter *util.Meter
counter CounterFun
lock sync.Mutex
cond *sync.Cond
Expand All @@ -127,7 +129,7 @@ type maxInflightWrapper struct {
}

func (m *maxInflightWrapper) ExpectToken() int32 {
inflight := m.meter.currentInflight()
inflight := m.meter.CurrentInflight()

acquire := int32(0)
overLimited := atomic.LoadInt32(&m.overLimited)
Expand Down Expand Up @@ -178,7 +180,7 @@ func (m *maxInflightWrapper) SetLimit(acquireResult *AcquireResult) bool {

m.lock.Lock()
if atomic.LoadUint32(&m.serverUnavailable) == 0 {
inflight := m.meter.maxInflight()
inflight := m.meter.MaxInflight()
localMax := m.fcc.local.localConfig.MaxRequestsInflight.Max
if inflight < localMax {
inflight = localMax
Expand Down Expand Up @@ -250,7 +252,7 @@ func (m *maxInflightWrapper) TryAcquire() bool {
return m.FlowControl.TryAcquire()
}

currentInflight := atomic.LoadInt32(&m.meter.inflight)
currentInflight := m.meter.CurrentInflight()
naxInflight := atomic.LoadInt32(&m.acquiredMaxInflight)
overLimited := atomic.LoadInt32(&m.overLimited)
max := atomic.LoadInt32(&m.max)
Expand Down Expand Up @@ -295,7 +297,7 @@ func (m *maxInflightWrapper) Release() {
type tokenBucketWrapper struct {
flowcontrol.FlowControl
fcc *flowControlCache
meter *meter
meter *util.Meter
counter CounterFun
lock sync.Mutex
cond *sync.Cond
Expand All @@ -316,7 +318,7 @@ func (m *tokenBucketWrapper) ExpectToken() int32 {
expect := m.reserve - token

batch := m.tokenBatch
lastQPS := m.meter.rate()
lastQPS := m.meter.Rate()
if lastQPS > float64(m.tokenBatch) {
batch = int32(lastQPS) * GlobalTokenBucketBatchAcquiredPercent / 100
if batch < GlobalTokenBucketBatchAcquireMin {
Expand Down Expand Up @@ -365,7 +367,7 @@ func (m *tokenBucketWrapper) SetLimit(acquireResult *AcquireResult) bool {
}
m.lock.Lock()
if atomic.LoadUint32(&m.serverUnavailable) == 0 {
lastQPS := m.meter.rate()
lastQPS := m.meter.Rate()
localQPS := m.fcc.local.localConfig.TokenBucket.QPS
if lastQPS < float64(localQPS) {
lastQPS = float64(localQPS)
Expand Down
Loading

0 comments on commit d1d2f4c

Please sign in to comment.