diff --git a/pkg/flowcontrols/remote/flowcontrol_wrapper.go b/pkg/flowcontrols/remote/flowcontrol_wrapper.go index d3746fc..c7e25e7 100644 --- a/pkg/flowcontrols/remote/flowcontrol_wrapper.go +++ b/pkg/flowcontrols/remote/flowcontrol_wrapper.go @@ -1,24 +1,21 @@ package remote import ( - "math" + "fmt" "reflect" "strings" - "sync" - "sync/atomic" "time" - "k8s.io/apimachinery/pkg/util/clock" "k8s.io/klog" proxyv1alpha1 "github.com/kubewharf/kubegateway/pkg/apis/proxy/v1alpha1" "github.com/kubewharf/kubegateway/pkg/flowcontrols/flowcontrol" + "github.com/kubewharf/kubegateway/pkg/flowcontrols/util" ) const ( - FlowControlMeterWindow = time.Second * 3 - QPSMeterTickDuration = time.Second - QPSMeterBucketLen = 3 + QPSMeterTickDuration = time.Second + QPSMeterBucketLen = 3 InflightMeterBucketLen = 6 InflightMeterBucketDuration = time.Millisecond * 200 @@ -49,36 +46,22 @@ type RemoteFlowControlWrapper interface { } func NewFlowControlCache(cluster, name, clientID string, globalCounterProvider GlobalCounterProvider) FlowControlCache { - tickDuration := QPSMeterTickDuration - buckets := make([]float64, QPSMeterBucketLen) - - stopCh := make(chan struct{}) - clientIdSlices := strings.Split(clientID, "-") id := clientIdSlices[len(clientIdSlices)-1] + meter := util.NewMeter(fmt.Sprintf("%s/%s", cluster, name), QPSMeterBucketLen, QPSMeterTickDuration, InflightMeterBucketLen, InflightMeterBucketDuration) + f := &flowControlCache{ - cluster: cluster, - name: name, - meter: &meter{ - cluster: cluster, - name: name, - stopCh: stopCh, - clock: clock.RealClock{}, - ticker: time.NewTicker(tickDuration), - last: time.Now(), - mu: sync.Mutex{}, - counterBuckets: buckets, - inflightBuckets: make([]int32, InflightMeterBucketLen), - inflightChan: make(chan int32, 1), - }, + cluster: cluster, + name: name, + meter: meter, globalCounter: globalCounterProvider, clientID: id, } f.local = &localWrapper{flowControlCache: f} - f.meter.start() + f.meter.Start() return f } @@ -90,7 +73,7 @@ type flowControlCache struct { name string globalCounter GlobalCounterProvider - meter *meter + meter *util.Meter clientID string } @@ -122,19 +105,19 @@ func (f *flowControlCache) Strategy() proxyv1alpha1.LimitStrategy { } func (f *flowControlCache) Rate() float64 { - return f.meter.rate() + return f.meter.Rate() } func (f *flowControlCache) Inflight() float64 { - return f.meter.avgInflight() + return f.meter.AvgInflight() } func (f *flowControlCache) MaxInflight() int32 { - return f.meter.maxInflight() + return f.meter.MaxInflight() } func (f *flowControlCache) Stop() { - close(f.meter.stopCh) + f.meter.Stop() if f.remote != nil { close(f.remote.stopCh) } @@ -293,197 +276,22 @@ func (f *remoteWrapper) Done() <-chan struct{} { type meterWrapper struct { flowcontrol.FlowControl - meter *meter + meter *util.Meter } func (f *meterWrapper) TryAcquire() bool { acquire := f.FlowControl.TryAcquire() if acquire { - f.meter.addInflight(1) - f.meter.add(1) + f.meter.StartOne() } return acquire } func (f *meterWrapper) Release() { - f.meter.addInflight(-1) + f.meter.EndOne() f.FlowControl.Release() } -type meter struct { - cluster string - name string - - stopCh chan struct{} - clock clock.Clock - ticker *time.Ticker - mu sync.Mutex - - uncounted int64 - currentIndex int - rateAvg float64 - last time.Time - counterBuckets []float64 - - inflight int32 - inflightIndex int - inflightAvg float64 - inflightMax int32 - inflightBuckets []int32 - inflightChan chan int32 - - debug bool -} - -func (m *meter) start() { - go m.rateTick() - go m.inflightWorker() -} - -func (m *meter) addInflight(add int32) { - inflight := atomic.AddInt32(&m.inflight, add) - - select { - case m.inflightChan <- inflight: - default: - } -} - -func (m *meter) add(add int64) { - atomic.AddInt64(&m.uncounted, add) -} - -func (m *meter) rateTick() { - defer m.ticker.Stop() - for { - select { - case <-m.ticker.C: - m.calculateAvgRate() - case <-m.stopCh: - return - } - } -} - -func (m *meter) calculateAvgRate() { - latestRate := m.latestRate() - - m.mu.Lock() - lastRate := m.counterBuckets[m.currentIndex] - if lastRate == math.NaN() { - lastRate = 0 - } - - rateAvg := m.rateAvg + (latestRate-lastRate)/float64(len(m.counterBuckets)) - m.rateAvg = rateAvg - m.counterBuckets[m.currentIndex] = latestRate - m.currentIndex = (m.currentIndex + 1) % len(m.counterBuckets) - m.mu.Unlock() - - klog.V(6).Infof("FlowControl %s/%s tick: latestRate %v, rateAvg %v, currentIndex %v, counterBuckets %v", - m.cluster, m.name, latestRate, m.rateAvg, m.currentIndex, m.counterBuckets) -} - -func (m *meter) latestRate() float64 { - count := atomic.LoadInt64(&m.uncounted) - atomic.AddInt64(&m.uncounted, -count) - m.mu.Lock() - last := m.last - now := m.clock.Now() - timeWindow := float64(now.Sub(last)) / float64(time.Second) - instantRate := float64(count) / timeWindow - m.last = now - m.mu.Unlock() - - klog.V(6).Infof("FlowControl %s/%s latestRate: count %v, timeWindow %v, rate %v", - m.cluster, m.name, count, timeWindow, instantRate) - - return instantRate -} - -func (m *meter) rate() float64 { - return m.rateAvg -} - -func (m *meter) avgInflight() float64 { - return m.inflightAvg -} - -func (m *meter) maxInflight() int32 { - return m.inflightMax -} - -func (m *meter) currentInflight() int32 { - return atomic.LoadInt32(&m.inflight) -} - -func (m *meter) inflightWorker() { - timerDuration := InflightMeterBucketDuration * 2 - - timer := m.clock.NewTimer(timerDuration) - defer timer.Stop() - - for { - select { - case inflight := <-m.inflightChan: - m.calInflight(inflight) - case <-timer.C(): - m.calInflight(atomic.LoadInt32(&m.inflight)) - case <-m.stopCh: - return - } - timer.Reset(timerDuration) - } -} - -func (m *meter) calInflight(inflight int32) { - m.mu.Lock() - - now := m.clock.Now() - milli := now.UnixMilli() - currentIndex := int(milli / int64(InflightMeterBucketDuration/time.Millisecond) % InflightMeterBucketLen) - lastIndex := m.inflightIndex - - if currentIndex == lastIndex { - max := m.inflightBuckets[currentIndex] - if inflight > max { - m.inflightBuckets[currentIndex] = inflight - } - } else { - fakeIndex := currentIndex - if currentIndex < lastIndex { - fakeIndex += InflightMeterBucketLen - } - inflightDelta := m.inflightBuckets[lastIndex] - - for i := lastIndex + 1; i < fakeIndex; i++ { - index := i % InflightMeterBucketLen - inflightDelta -= m.inflightBuckets[index] - m.inflightBuckets[index] = 0 - } - inflightDelta -= m.inflightBuckets[currentIndex] - m.inflightBuckets[currentIndex] = inflight - m.inflightIndex = currentIndex - - m.inflightAvg = m.inflightAvg + float64(inflightDelta)*InflightMeterBucketDuration.Seconds() - - max := int32(0) - for _, ift := range m.inflightBuckets { - if ift > max { - max = ift - } - } - m.inflightMax = max - - if m.debug { - klog.Infof("[debug] [%v] bucket: %v, delta: %v, max: %v, index: %v, avg: %.2f", - m.clock.Now().Format(time.RFC3339Nano), m.inflightBuckets, inflightDelta, max, currentIndex, m.inflightAvg) - } - } - - m.mu.Unlock() -} - func EnableGlobalFlowControl(schema proxyv1alpha1.FlowControlSchema) bool { switch schema.Strategy { case proxyv1alpha1.GlobalAllocateLimit, proxyv1alpha1.GlobalCountLimit: diff --git a/pkg/flowcontrols/remote/global_flowcontrol.go b/pkg/flowcontrols/remote/global_flowcontrol.go index 9ca287a..c46d1f5 100644 --- a/pkg/flowcontrols/remote/global_flowcontrol.go +++ b/pkg/flowcontrols/remote/global_flowcontrol.go @@ -1,15 +1,17 @@ package remote import ( - "k8s.io/klog" "os" "strconv" "sync" "sync/atomic" "time" + "k8s.io/klog" + proxyv1alpha1 "github.com/kubewharf/kubegateway/pkg/apis/proxy/v1alpha1" "github.com/kubewharf/kubegateway/pkg/flowcontrols/flowcontrol" + "github.com/kubewharf/kubegateway/pkg/flowcontrols/util" "github.com/kubewharf/kubegateway/pkg/gateway/metrics" ) @@ -111,7 +113,7 @@ type GlobalCounterFlowControl interface { type maxInflightWrapper struct { flowcontrol.FlowControl fcc *flowControlCache - meter *meter + meter *util.Meter counter CounterFun lock sync.Mutex cond *sync.Cond @@ -127,7 +129,7 @@ type maxInflightWrapper struct { } func (m *maxInflightWrapper) ExpectToken() int32 { - inflight := m.meter.currentInflight() + inflight := m.meter.CurrentInflight() acquire := int32(0) overLimited := atomic.LoadInt32(&m.overLimited) @@ -178,7 +180,7 @@ func (m *maxInflightWrapper) SetLimit(acquireResult *AcquireResult) bool { m.lock.Lock() if atomic.LoadUint32(&m.serverUnavailable) == 0 { - inflight := m.meter.maxInflight() + inflight := m.meter.MaxInflight() localMax := m.fcc.local.localConfig.MaxRequestsInflight.Max if inflight < localMax { inflight = localMax @@ -250,7 +252,7 @@ func (m *maxInflightWrapper) TryAcquire() bool { return m.FlowControl.TryAcquire() } - currentInflight := atomic.LoadInt32(&m.meter.inflight) + currentInflight := m.meter.CurrentInflight() naxInflight := atomic.LoadInt32(&m.acquiredMaxInflight) overLimited := atomic.LoadInt32(&m.overLimited) max := atomic.LoadInt32(&m.max) @@ -295,7 +297,7 @@ func (m *maxInflightWrapper) Release() { type tokenBucketWrapper struct { flowcontrol.FlowControl fcc *flowControlCache - meter *meter + meter *util.Meter counter CounterFun lock sync.Mutex cond *sync.Cond @@ -316,7 +318,7 @@ func (m *tokenBucketWrapper) ExpectToken() int32 { expect := m.reserve - token batch := m.tokenBatch - lastQPS := m.meter.rate() + lastQPS := m.meter.Rate() if lastQPS > float64(m.tokenBatch) { batch = int32(lastQPS) * GlobalTokenBucketBatchAcquiredPercent / 100 if batch < GlobalTokenBucketBatchAcquireMin { @@ -365,7 +367,7 @@ func (m *tokenBucketWrapper) SetLimit(acquireResult *AcquireResult) bool { } m.lock.Lock() if atomic.LoadUint32(&m.serverUnavailable) == 0 { - lastQPS := m.meter.rate() + lastQPS := m.meter.Rate() localQPS := m.fcc.local.localConfig.TokenBucket.QPS if lastQPS < float64(localQPS) { lastQPS = float64(localQPS) diff --git a/pkg/flowcontrols/remote/remote_counter.go b/pkg/flowcontrols/remote/remote_counter.go index 32bf2d4..1a5a7da 100644 --- a/pkg/flowcontrols/remote/remote_counter.go +++ b/pkg/flowcontrols/remote/remote_counter.go @@ -2,7 +2,6 @@ package remote import ( "context" - "github.com/kubewharf/kubegateway/pkg/gateway/metrics" "os" "sync" "sync/atomic" @@ -13,7 +12,7 @@ import ( "k8s.io/klog" proxyv1alpha1 "github.com/kubewharf/kubegateway/pkg/apis/proxy/v1alpha1" - "github.com/kubewharf/kubegateway/pkg/flowcontrols/util" + "github.com/kubewharf/kubegateway/pkg/gateway/metrics" "github.com/kubewharf/kubegateway/pkg/ratelimiter/clientsets" ) @@ -86,7 +85,6 @@ func (g *globalCounterManager) Add(name string, typ proxyv1alpha1.FlowControlSch g.counterMap[name] = counter go counter.resetCheck(internalStopCh) - go counter.debugInfo(internalStopCh) stopCh := flowControl.Done() go func() { @@ -297,12 +295,6 @@ type globalCounter struct { eventCh chan struct{} manager *globalCounterManager flowControl RemoteFlowControlWrapper - - // debug - requiredRate util.RateMeter - acquireRate util.RateMeter - reqRate util.RateMeter - limitedRate util.RateMeter } type AcquireResult struct { @@ -329,17 +321,6 @@ func (g *globalCounter) send(response *AcquireResult) { g.manager.cluster, g.name, response.result.Error) } - // debug - { - g.requiredRate.RecordN(int64(response.request.Tokens)) - g.reqRate.Record() - if response.result.Accept { - g.acquireRate.RecordN(int64(response.result.Limit)) - } else { - g.limitedRate.Record() - } - } - if g.flowControl.SetLimit(response) { go func() { <-time.NewTimer(time.Millisecond * 200).C @@ -385,22 +366,3 @@ func (g *globalCounter) resetCheck(stopCh <-chan struct{}) { } } } - -func (g *globalCounter) debugInfo(stopCh <-chan struct{}) { - ticker := time.NewTicker(time.Second * 1) - defer ticker.Stop() - for { - select { - case <-ticker.C: - if os.Getenv("DEBUG_LIMITER_COUNTER") == "true" { - klog.Infof("[debug] remote counter [%v/%v/%v]: token [require: %v, acquire: %v], request: [total: %v, limited: %v]", - g.manager.cluster, g.name, g.manager.clientID, - g.requiredRate.Delta(), g.acquireRate.Delta(), - g.reqRate.Delta(), g.limitedRate.Delta(), - ) - } - case <-stopCh: - return - } - } -} diff --git a/pkg/flowcontrols/util/meter.go b/pkg/flowcontrols/util/meter.go new file mode 100644 index 0000000..c234b68 --- /dev/null +++ b/pkg/flowcontrols/util/meter.go @@ -0,0 +1,241 @@ +package util + +import ( + "math" + "sync" + "sync/atomic" + "time" + + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/klog" +) + +type Meter struct { + rateBucketLen int + rateBucketDuration time.Duration + + inflightBucketLen int + inflightBucketDuration time.Duration + + name string + + stopCh chan struct{} + clock clock.Clock + mu sync.Mutex + started bool + + uncounted int64 + currentIndex int + rateAvg float64 + last time.Time + counterBuckets []float64 + + inflight int32 + inflightIndex int + inflightAvg float64 + inflightMax int32 + inflightBuckets []int32 + inflightChan chan int32 + + debug bool +} + +func NewMeter(name string, + rateBucketLen int, + rateBucketDuration time.Duration, + inflightBucketLen int, + inflightBucketDuration time.Duration, +) *Meter { + stopCh := make(chan struct{}) + m := &Meter{ + name: name, + stopCh: stopCh, + clock: clock.RealClock{}, + last: time.Now(), + mu: sync.Mutex{}, + counterBuckets: make([]float64, rateBucketLen), + rateBucketLen: rateBucketLen, + rateBucketDuration: rateBucketDuration, + inflightBuckets: make([]int32, inflightBucketLen), + inflightBucketLen: inflightBucketLen, + inflightBucketDuration: inflightBucketDuration, + inflightChan: make(chan int32, 1), + } + return m +} + +func (m *Meter) Start() { + if m.started { + return + } + + go m.rateTick() + if m.inflightBucketDuration > 0 { + go m.inflightWorker() + } + + m.started = true +} + +func (m *Meter) Stop() { + close(m.stopCh) +} + +func (m *Meter) Rate() float64 { + return m.rateAvg +} + +func (m *Meter) AvgInflight() float64 { + return m.inflightAvg +} + +func (m *Meter) MaxInflight() int32 { + return m.inflightMax +} + +func (m *Meter) CurrentInflight() int32 { + return atomic.LoadInt32(&m.inflight) +} + +func (m *Meter) AddN(n int64) { + m.add(n) +} + +func (m *Meter) StartOne() { + m.addInflight(1) + m.add(1) +} + +func (m *Meter) EndOne() { + m.addInflight(-1) +} + +func (m *Meter) addInflight(add int32) { + inflight := atomic.AddInt32(&m.inflight, add) + + select { + case m.inflightChan <- inflight: + default: + } +} + +func (m *Meter) add(add int64) { + atomic.AddInt64(&m.uncounted, add) +} + +func (m *Meter) rateTick() { + ticker := time.NewTicker(m.rateBucketDuration) + defer ticker.Stop() + for { + select { + case <-ticker.C: + m.calculateAvgRate() + case <-m.stopCh: + return + } + } +} + +func (m *Meter) calculateAvgRate() { + latestRate := m.latestRate() + + m.mu.Lock() + lastRate := m.counterBuckets[m.currentIndex] + if lastRate == math.NaN() { + lastRate = 0 + } + + rateAvg := m.rateAvg + (latestRate-lastRate)/float64(len(m.counterBuckets)) + m.rateAvg = rateAvg + m.counterBuckets[m.currentIndex] = latestRate + m.currentIndex = (m.currentIndex + 1) % len(m.counterBuckets) + m.mu.Unlock() + + klog.V(6).Infof("FlowControl %s tick: latestRate %v, rateAvg %v, currentIndex %v, counterBuckets %v", + m.name, latestRate, m.rateAvg, m.currentIndex, m.counterBuckets) +} + +func (m *Meter) latestRate() float64 { + count := atomic.LoadInt64(&m.uncounted) + atomic.AddInt64(&m.uncounted, -count) + m.mu.Lock() + last := m.last + now := m.clock.Now() + timeWindow := float64(now.Sub(last)) / float64(time.Second) + instantRate := float64(count) / timeWindow + m.last = now + m.mu.Unlock() + + klog.V(6).Infof("FlowControl %s latestRate: count %v, timeWindow %v,rate %v", + m.name, count, timeWindow, instantRate) + + return instantRate +} + +func (m *Meter) inflightWorker() { + + timerDuration := m.inflightBucketDuration * 2 + + timer := m.clock.NewTimer(timerDuration) + defer timer.Stop() + + for { + select { + case inflight := <-m.inflightChan: + m.calInflight(inflight) + case <-timer.C(): + m.calInflight(atomic.LoadInt32(&m.inflight)) + case <-m.stopCh: + return + } + timer.Reset(timerDuration) + } +} + +func (m *Meter) calInflight(inflight int32) { + m.mu.Lock() + + now := m.clock.Now() + milli := now.UnixMilli() + currentIndex := int(milli / int64(m.inflightBucketDuration/time.Millisecond) % int64(m.inflightBucketLen)) + lastIndex := m.inflightIndex + + if currentIndex == lastIndex { + max := m.inflightBuckets[currentIndex] + if inflight > max { + m.inflightBuckets[currentIndex] = inflight + } + } else { + fakeIndex := currentIndex + if currentIndex < lastIndex { + fakeIndex += m.inflightBucketLen + } + inflightDelta := m.inflightBuckets[lastIndex] + + for i := lastIndex + 1; i < fakeIndex; i++ { + index := i % m.inflightBucketLen + inflightDelta -= m.inflightBuckets[index] + m.inflightBuckets[index] = 0 + } + inflightDelta -= m.inflightBuckets[currentIndex] + m.inflightBuckets[currentIndex] = inflight + m.inflightIndex = currentIndex + + m.inflightAvg = m.inflightAvg + float64(inflightDelta)*m.inflightBucketDuration.Seconds() + + max := int32(0) + for _, ift := range m.inflightBuckets { + if ift > max { + max = ift + } + } + m.inflightMax = max + + if m.debug { + klog.Infof("[debug] [%v] bucket: %v, delta: %v, max: %v, index: %v, avg: %.2f", + m.clock.Now().Format(time.RFC3339Nano), m.inflightBuckets, inflightDelta, max, currentIndex, m.inflightAvg) + } + } + + m.mu.Unlock() +} diff --git a/pkg/flowcontrols/remote/flowcontrol_wrapper_test.go b/pkg/flowcontrols/util/meter_test.go similarity index 61% rename from pkg/flowcontrols/remote/flowcontrol_wrapper_test.go rename to pkg/flowcontrols/util/meter_test.go index b280bde..6ca5fe9 100644 --- a/pkg/flowcontrols/remote/flowcontrol_wrapper_test.go +++ b/pkg/flowcontrols/util/meter_test.go @@ -1,16 +1,19 @@ -package remote +package util import ( - "k8s.io/apimachinery/pkg/util/clock" "sync" "testing" "time" + + "k8s.io/apimachinery/pkg/util/clock" ) func Test_meter_inflight(t *testing.T) { - cluster := "fake-cluster" name := "fake-flowcontrol" + inflightBucketSecond := 0.1 + inflightBucketLen := 6 + type fields struct { mockInflightPerBuckets []int32 debugLog bool @@ -27,7 +30,7 @@ func Test_meter_inflight(t *testing.T) { mockInflightPerBuckets: []int32{1, 3, 5, 4, 2, 3}, debugLog: false, }, - wantAvg: 3, // sum(1, 3, 5, 4, 2)*0.2 + wantAvg: 15 * inflightBucketSecond, // sum(1, 3, 5, 4, 2)*0.2 wantMax: 5, }, { @@ -36,7 +39,7 @@ func Test_meter_inflight(t *testing.T) { mockInflightPerBuckets: []int32{1, 3, 5, 4, 2, 1, 6, 3}, debugLog: false, }, - wantAvg: 3.6, // sum(5, 4, 2, 1, 6)*0.2 + wantAvg: 18 * inflightBucketSecond, // sum(5, 4, 2, 1, 6)*0.2 wantMax: 6, }, { @@ -45,7 +48,7 @@ func Test_meter_inflight(t *testing.T) { mockInflightPerBuckets: []int32{1, 3, 5, 4, 2, 3, 8, 4, 3, 6, 2, 4, 3}, debugLog: false, }, - wantAvg: 3.8, + wantAvg: 19 * inflightBucketSecond, wantMax: 6, }, { @@ -54,7 +57,7 @@ func Test_meter_inflight(t *testing.T) { mockInflightPerBuckets: []int32{1, 3, 5, 4, 2, 3, 6, 4, 3, 8, 2, 4, 3}, debugLog: false, }, - wantAvg: 4.2, + wantAvg: 21 * inflightBucketSecond, wantMax: 8, }, { @@ -63,23 +66,27 @@ func Test_meter_inflight(t *testing.T) { mockInflightPerBuckets: []int32{1, 3, 5, 4, 2, 3, 7, 4, 3, 8, 2, 4, 3, 9, 4, 6, 3, 8, 6, 3, 5, 6, 7, 5, 3, 1, 9, 4}, debugLog: false, }, - wantAvg: 5, + wantAvg: 25 * inflightBucketSecond, wantMax: 9, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) - m := &meter{ - cluster: cluster, - name: name, - stopCh: make(chan struct{}), - clock: fakeClock, - last: time.Now(), - mu: sync.Mutex{}, - counterBuckets: make([]float64, QPSMeterBucketLen), - inflightBuckets: make([]int32, InflightMeterBucketLen), - inflightChan: make(chan int32, 100), + m := &Meter{ + name: name, + stopCh: make(chan struct{}), + clock: fakeClock, + last: time.Now(), + mu: sync.Mutex{}, + counterBuckets: make([]float64, 3), + rateBucketLen: 3, + rateBucketDuration: time.Second, + + inflightBuckets: make([]int32, inflightBucketLen), + inflightBucketLen: inflightBucketLen, + inflightBucketDuration: time.Millisecond * time.Duration(1000*inflightBucketSecond), + inflightChan: make(chan int32, 100), debug: tt.fields.debugLog, } @@ -92,13 +99,13 @@ func Test_meter_inflight(t *testing.T) { if tt.fields.debugLog { t.Logf("clock: %v", m.clock.Now().Format(time.RFC3339Nano)) } - fakeClock.Sleep(InflightMeterBucketDuration) + fakeClock.Sleep(m.inflightBucketDuration) } - if delta := m.avgInflight() - tt.wantAvg; delta > 0.001 || delta < -0.001 { - t.Errorf("avgInflight() = %v, want %v", m.avgInflight(), tt.wantAvg) + if delta := m.AvgInflight() - tt.wantAvg; delta > 0.001 || delta < -0.001 { + t.Errorf("avgInflight() = %v, want %v", m.AvgInflight(), tt.wantAvg) } - if got := m.maxInflight(); got != tt.wantMax { + if got := m.MaxInflight(); got != tt.wantMax { t.Errorf("maxInflight() = %v, want %v", got, tt.wantMax) } }) diff --git a/pkg/flowcontrols/util/rate.go b/pkg/flowcontrols/util/rate.go deleted file mode 100644 index 3f71c1f..0000000 --- a/pkg/flowcontrols/util/rate.go +++ /dev/null @@ -1,42 +0,0 @@ -package util - -import ( - "sync" - "sync/atomic" - "time" -) - -type RateMeter struct { - lastCount int64 - lastTime time.Time - count int64 - lock sync.Mutex -} - -func (r *RateMeter) Record() { - r.RecordN(1) -} - -func (r *RateMeter) RecordN(n int64) { - atomic.AddInt64(&r.count, n) -} - -func (r *RateMeter) Delta() int64 { - r.lock.Lock() - count := atomic.LoadInt64(&r.count) - lastCount := atomic.LoadInt64(&r.lastCount) - now := time.Now() - //lastTime := r.lastTime - - r.lastCount = count - r.lastTime = now - r.lock.Unlock() - - delta := count - lastCount - //qps = float64(delta) / now.Sub(lastTime).Seconds() - return delta -} - -func (r *RateMeter) Count() int64 { - return atomic.LoadInt64(&r.count) -}