
Merge pull request go-graphite#264 from msaf1980/feat/sd_tune
sd: allow to tune degraded point
msaf1980 authored Feb 20, 2024
2 parents ac5e5a3 + a648e3e commit b866b8d
Showing 25 changed files with 240 additions and 187 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/tests.yml
@@ -97,8 +97,9 @@ jobs:

- name: Install packaging dependencies
run: |
gem install dotenv -v 2.8.1 # workaround for ruby version 2.7.8.225
gem install fpm package_cloud
GO111MODULE=off go get github.com/mitchellh/gox
go install github.com/mitchellh/gox@latest
- name: Check packaging
run: |
20 changes: 14 additions & 6 deletions config/config.go
@@ -104,12 +104,14 @@ type Common struct {
MemoryReturnInterval time.Duration `toml:"memory-return-interval" json:"memory-return-interval" comment:"daemon will return the freed memory to the OS when it>0"`
HeadersToLog []string `toml:"headers-to-log" json:"headers-to-log" comment:"additional request headers to log"`

BaseWeight int `toml:"base_weight" json:"base_weight" comment:"service discovery base weight (on idle)"`
SDType SDType `toml:"service-discovery-type" json:"service-discovery-type" comment:"service discovery type"`
SD string `toml:"service-discovery" json:"service-discovery" comment:"service discovery address (consul)"`
SDNamespace string `toml:"service-discovery-ns" json:"service-discovery-ns" comment:"service discovery namespace (graphite by default)"`
SDDc []string `toml:"service-discovery-ds" json:"service-discovery-ds" comment:"service discovery datacenters (first is primary, the others register as backup)"`
SDExpire time.Duration `toml:"service-discovery-expire" json:"service-discovery-expire" comment:"service discovery expire duration for cleanup (minimum is 24h, if enabled)"`
BaseWeight int `toml:"base_weight" json:"base_weight" comment:"service discovery base weight (on idle)"`
DegragedMultiply float64 `toml:"degraged_multiply" json:"degraged_multiply" comment:"service discovery degraded load avg multiplier (if normalized load avg > degraged_load_avg) (default 4.0)"`
DegragedLoad float64 `toml:"degraged_load_avg" json:"degraged_load_avg" comment:"service discovery normalized load avg degraded point (default 1.0)"`
SDType SDType `toml:"service-discovery-type" json:"service-discovery-type" comment:"service discovery type"`
SD string `toml:"service-discovery" json:"service-discovery" comment:"service discovery address (consul)"`
SDNamespace string `toml:"service-discovery-ns" json:"service-discovery-ns" comment:"service discovery namespace (graphite by default)"`
SDDc []string `toml:"service-discovery-ds" json:"service-discovery-ds" comment:"service discovery datacenters (first is primary, the others register as backup)"`
SDExpire time.Duration `toml:"service-discovery-expire" json:"service-discovery-expire" comment:"service discovery expire duration for cleanup (minimum is 24h, if enabled)"`

FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"`

@@ -732,6 +734,12 @@ func Unmarshal(body []byte, exactConfig bool) (cfg *Config, warns []zap.Field, e
// NeedLoadAvgColect checks if load avg collection is needed
func (c *Config) NeedLoadAvgColect() bool {
if c.Common.SD != "" {
if c.Common.DegragedMultiply <= 0 {
c.Common.DegragedMultiply = 4.0
}
if c.Common.DegragedLoad <= 0 {
c.Common.DegragedLoad = 1.0
}
if c.Common.BaseWeight <= 0 {
c.Common.BaseWeight = 100
}
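For context, a rough sketch of how these knobs could combine when a node reports its weight to service discovery. Everything below is an assumption for illustration: weightForLoad and its scaling are hypothetical, and the real formula lives in the sd code, which this hunk does not show.

// Hypothetical sketch (not from this commit): base_weight shrinks with
// normalized load avg, and crossing the degraded point applies the
// extra multiplier. The exact scaling is an assumption.
func weightForLoad(baseWeight int, normLoadAvg, degradedLoad, degradedMultiply float64) int {
	w := float64(baseWeight) / (1.0 + normLoadAvg)
	if normLoadAvg > degradedLoad {
		// past the degraded point, penalize the node much harder
		w /= degradedMultiply
	}
	if w < 1.0 {
		return 1 // never advertise a zero weight
	}
	return int(w)
}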
4 changes: 4 additions & 0 deletions doc/config.md
@@ -194,6 +194,10 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str
headers-to-log = []
# service discovery base weight (on idle)
base_weight = 0
# service discovery degraded load avg multiplier (if normalized load avg > degraged_load_avg) (default 4.0)
degraged_multiply = 0.0
# service discovery normalized load avg degraded point (default 1.0)
degraged_load_avg = 0.0
# service discovery type
service-discovery-type = 0
# service discovery address (consul)
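An illustrative [common] fragment with the new keys set explicitly; the values here are made up, and per the Unmarshal fix-ups above, anything <= 0 falls back to the defaults (4.0 and 1.0):

# illustrative values only, not defaults
base_weight = 100
degraged_multiply = 6.0
degraged_load_avg = 1.5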
2 changes: 1 addition & 1 deletion issues/daytime/test.toml
@@ -2,7 +2,7 @@
precision = "10s"

[[test.clickhouse]]
version = "latest"
version = "23.12"
dir = "tests/clickhouse/rollup"

[test.carbon_clickhouse]
63 changes: 32 additions & 31 deletions limiter/alimiter.go
@@ -13,6 +13,7 @@ var (
checkDelay = time.Second * 60
)

// calc reserved slots count based on load average (to protect against overload)
func getWeighted(n, max int) int {
if n <= 0 {
return 0
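The rest of getWeighted is collapsed in this view. A plausible reconstruction, inferred from the expected values in limiter/alimiter_test.go below — the 0.6 knee and the max-1 cap are inferences, not confirmed by the diff:

func getWeighted(n, max int) int {
	if n <= 0 {
		return 0
	}
	loadAvg := load_avg.Load() // normalized load average, updated elsewhere
	if loadAvg < 0.6 {
		return 0 // low load: reserve nothing
	}
	k := int(float64(n) * loadAvg)
	if k >= max {
		if max <= 1 {
			return 1
		}
		return max - 1 // leave at least one slot for callers
	}
	return k
}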
@@ -35,31 +36,31 @@

// ALimiter provides a limiter for the amount of requests / concurrently executing requests (adaptive with load avg)
type ALimiter struct {
l limiter
cL limiter
c int
n int
limiter limiter
concurrentLimiter limiter
concurrent int
n int

m metrics.WaitMetric
}

// NewALimiter creates an adaptive limiter for a specific servers list.
func NewALimiter(l, c, n int, enableMetrics bool, scope, sub string) ServerLimiter {
if l <= 0 && c <= 0 {
func NewALimiter(capacity, concurrent, n int, enableMetrics bool, scope, sub string) ServerLimiter {
if capacity <= 0 && concurrent <= 0 {
return NoopLimiter{}
}
if n >= c {
n = c - 1
if n >= concurrent {
n = concurrent - 1
}
if n <= 0 {
return NewWLimiter(l, c, enableMetrics, scope, sub)
return NewWLimiter(capacity, concurrent, enableMetrics, scope, sub)
}

a := &ALimiter{
m: metrics.NewWaitMetric(enableMetrics, scope, sub), c: c, n: n,
m: metrics.NewWaitMetric(enableMetrics, scope, sub), concurrent: concurrent, n: n,
}
a.cL.ch = make(chan struct{}, c)
a.cL.cap = c
a.concurrentLimiter.ch = make(chan struct{}, concurrent)
a.concurrentLimiter.cap = concurrent

go a.balance()
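A usage sketch under assumed semantics: capacity bounds requests admitted by the plain limiter, concurrent bounds in-flight ones, and up to n of the concurrent slots are reserved by balance() as load climbs. The handler code is hypothetical; only the signature comes from this diff.

lim := NewALimiter(100, 50, 10, false, "graphite", "render")
if err := lim.Enter(ctx, "render"); err != nil {
	return err // ErrTimeout when no slot frees up in time
}
defer lim.Leave(ctx, "render")
// ... serve the request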

@@ -70,17 +71,17 @@ func (sl *ALimiter) balance() int {
var last int
for {
start := time.Now()
n := getWeighted(sl.n, sl.c)
n := getWeighted(sl.n, sl.concurrent)
if n > last {
for i := 0; i < n-last; i++ {
if sl.cL.enter(ctxMain, "balance") != nil {
if sl.concurrentLimiter.enter(ctxMain, "balance") != nil {
break
}
}
last = n
} else if n < last {
for i := 0; i < last-n; i++ {
sl.cL.leave(ctxMain, "balance")
sl.concurrentLimiter.leave(ctxMain, "balance")
}
last = n
}
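In effect, balance() parks getWeighted(n, concurrent) slots of concurrentLimiter itself, so callers compete for the remainder. A small model of that arithmetic — the function name is ours, but the bounds match the concurrent-k loops in the tests below:

// effectiveConcurrency models how many slots remain for callers of
// Enter after balance() has parked the reserved ones (an inference
// from the enter/leave loop above, not code from this commit).
func effectiveConcurrency(concurrent, n int) int {
	return concurrent - getWeighted(n, concurrent)
}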
@@ -92,20 +93,20 @@ }
}

func (sl *ALimiter) Capacity() int {
return sl.l.capacity()
return sl.limiter.capacity()
}

func (sl *ALimiter) Enter(ctx context.Context, s string) (err error) {
if sl.l.cap > 0 {
if err = sl.l.tryEnter(ctx, s); err != nil {
if sl.limiter.cap > 0 {
if err = sl.limiter.tryEnter(ctx, s); err != nil {
sl.m.WaitErrors.Add(1)
return
}
}
if sl.cL.cap > 0 {
if sl.cL.enter(ctx, s) != nil {
if sl.l.cap > 0 {
sl.l.leave(ctx, s)
if sl.concurrentLimiter.cap > 0 {
if sl.concurrentLimiter.enter(ctx, s) != nil {
if sl.limiter.cap > 0 {
sl.limiter.leave(ctx, s)
}
sl.m.WaitErrors.Add(1)
err = ErrTimeout
@@ -117,16 +118,16 @@ func (sl *ALimiter) Enter(ctx context.Context, s string) (err error) {

// TryEnter claims one of the free slots without blocking.
func (sl *ALimiter) TryEnter(ctx context.Context, s string) (err error) {
if sl.l.cap > 0 {
if err = sl.l.tryEnter(ctx, s); err != nil {
if sl.limiter.cap > 0 {
if err = sl.limiter.tryEnter(ctx, s); err != nil {
sl.m.WaitErrors.Add(1)
return
}
}
if sl.cL.cap > 0 {
if sl.cL.tryEnter(ctx, s) != nil {
if sl.l.cap > 0 {
sl.l.leave(ctx, s)
if sl.concurrentLimiter.cap > 0 {
if sl.concurrentLimiter.tryEnter(ctx, s) != nil {
if sl.limiter.cap > 0 {
sl.limiter.leave(ctx, s)
}
sl.m.WaitErrors.Add(1)
err = ErrTimeout
@@ -138,10 +139,10 @@ func (sl *ALimiter) TryEnter(ctx context.Context, s string) (err error) {

// Leave frees a slot in the limiter
func (sl *ALimiter) Leave(ctx context.Context, s string) {
if sl.l.cap > 0 {
sl.l.leave(ctx, s)
if sl.limiter.cap > 0 {
sl.limiter.leave(ctx, s)
}
sl.cL.leave(ctx, s)
sl.concurrentLimiter.leave(ctx, s)
}

// SendDuration sends a StatsD duration timing
59 changes: 31 additions & 28 deletions limiter/alimiter_test.go
@@ -15,99 +15,102 @@ import (
func Test_getWeighted(t *testing.T) {
tests := []struct {
loadAvg float64
c int
n int
max int
want int
}{
{loadAvg: 0, c: 100, n: 100, want: 0},
{loadAvg: 0.2, c: 100, n: 100, want: 0},
{loadAvg: 0.999, c: 100, n: 1, want: 0},
{loadAvg: 1, c: 1, n: 100, want: 1},
{loadAvg: 1, c: 100, n: 100, want: 99},
{loadAvg: 1, c: 101, n: 100, want: 100},
{loadAvg: 1, c: 200, n: 100, want: 100},
{loadAvg: 2, c: 100, n: 200, want: 99},
{loadAvg: 2, c: 200, n: 200, want: 199},
{loadAvg: 2, c: 300, n: 200, want: 299},
{loadAvg: 2, c: 400, n: 200, want: 399},
{loadAvg: 2, c: 401, n: 200, want: 400},
{loadAvg: 2, c: 402, n: 200, want: 400},
{loadAvg: 0, max: 100, n: 100, want: 0},
{loadAvg: 0.2, max: 100, n: 100, want: 0},
{loadAvg: 0.7, max: 100, n: 100, want: 70},
{loadAvg: 0.8, max: 100, n: 100, want: 80},
{loadAvg: 0.999, max: 100, n: 100, want: 99},
{loadAvg: 0.999, max: 100, n: 1, want: 0},
{loadAvg: 1, max: 1, n: 100, want: 1},
{loadAvg: 1, max: 100, n: 100, want: 99},
{loadAvg: 1, max: 101, n: 100, want: 100},
{loadAvg: 1, max: 200, n: 100, want: 100},
{loadAvg: 2, max: 100, n: 200, want: 99},
{loadAvg: 2, max: 200, n: 200, want: 199},
{loadAvg: 2, max: 300, n: 200, want: 299},
{loadAvg: 2, max: 400, n: 200, want: 399},
{loadAvg: 2, max: 401, n: 200, want: 400},
{loadAvg: 2, max: 402, n: 200, want: 400},
}
for n, tt := range tests {
t.Run(strconv.Itoa(n), func(t *testing.T) {
load_avg.Store(tt.loadAvg)
if got := getWeighted(tt.n, tt.c); got != tt.want {
t.Errorf("load avg = %f getWeighted(%d) = %v, want %v", tt.loadAvg, tt.n, got, tt.want)
if got := getWeighted(tt.n, tt.max); got != tt.want {
t.Errorf("load avg = %f getWeighted(%d, %d) = %v, want %v", tt.loadAvg, tt.n, tt.max, got, tt.want)
}
})
}
}

func TestNewALimiter(t *testing.T) {
l := 14
c := 12
capacity := 14
concurrent := 12
n := 10
checkDelay = time.Millisecond * 10
limiter := NewALimiter(l, c, n, false, "", "")
limiter := NewALimiter(capacity, concurrent, n, false, "", "")

// initial - load not collected
load_avg.Store(0)

var i int
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)

for i = 0; i < c; i++ {
for i = 0; i < concurrent; i++ {
require.NoError(t, limiter.Enter(ctx, "render"), "try to lock with load_avg = 0 [%d]", i)
}

require.Error(t, limiter.Enter(ctx, "render"))

for i = 0; i < c; i++ {
for i = 0; i < concurrent; i++ {
limiter.Leave(ctx, "render")
}

cancel()

// load_avg 0.5
load_avg.Store(0.5)
k := getWeighted(n, c)
k := getWeighted(n, concurrent)
require.Equal(t, 0, k)

// load_avg 0.6
load_avg.Store(0.6)
k = getWeighted(n, c)
k = getWeighted(n, concurrent)
require.Equal(t, n*6/10, k)

time.Sleep(checkDelay * 2)

ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100)
for i = 0; i < c-k; i++ {
for i = 0; i < concurrent-k; i++ {
require.NoError(t, limiter.Enter(ctx, "render"), "try to lock with load_avg = 0.5 [%d]", i)
}

require.Error(t, limiter.Enter(ctx, "render"))

for i = 0; i < c-k; i++ {
for i = 0; i < concurrent-k; i++ {
limiter.Leave(ctx, "render")
}

cancel()

// load_avg 1
load_avg.Store(1)
k = getWeighted(n, c)
k = getWeighted(n, concurrent)
require.Equal(t, n, k)

time.Sleep(checkDelay * 2)

ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10)
for i = 0; i < c-n; i++ {
for i = 0; i < concurrent-n; i++ {
require.NoError(t, limiter.Enter(ctx, "render"), "try to lock with load_avg = 1 [%d]", i)
}

require.Error(t, limiter.Enter(ctx, "render"))

for i = 0; i < c-n; i++ {
for i = 0; i < concurrent-n; i++ {
limiter.Leave(ctx, "render")
}

