Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/target rebalance #69

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
84 changes: 63 additions & 21 deletions cmd/kvass/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,38 @@ import (
)

var cdCfg = struct {
shardType string
shardStaticFile string
shardNamespace string
shardSelector string
shardPort int
shardMaxSeries int64
shardMinShard int32
shardMaxShard int32
shardMaxIdleTime time.Duration
shardDeletePVC bool
exploreMaxCon int
webAddress string
configFile string
syncInterval time.Duration
sdInitTimeout time.Duration
configInject configInjectOption
shardType string
shardStaticFile string
shardNamespace string
shardSelector string
shardPort int
shardMaxSeries int64
shardMinShard int32
shardMaxShard int32
shardMaxIdleTime time.Duration
shardDeletePVC bool
exploreMaxCon int
webAddress string
configFile string
syncInterval time.Duration
sdInitTimeout time.Duration
configInject configInjectOption
logLevel string
rebalanceEnable bool
rebalanceInterval time.Duration
rebalanceHealthRateWatermark float64
forceRebalanceInterval time.Duration
}{}

type LocalFormatter struct {
logrus.Formatter
}

func (u LocalFormatter) Format(e *logrus.Entry) ([]byte, error) {
e.Time = e.Time.In(time.FixedZone("UTC+8", 8*60*60))
return u.Formatter.Format(e)
}

func init() {
coordinatorCmd.Flags().StringVar(&cdCfg.shardType, "shard.type", "k8s",
"type of shard deploy: 'k8s'(default), 'static'")
Expand Down Expand Up @@ -103,6 +117,16 @@ func init() {
"ckube-apiserver proxy url to inject to all kubernetes sd")
coordinatorCmd.Flags().StringVar(&cdCfg.configInject.kubernetes.serviceAccountPath, "inject.kubernetes-sa-path", "",
"change default service account token path")
coordinatorCmd.Flags().StringVar(&cdCfg.logLevel, "log.level", "info",
"log level")
coordinatorCmd.Flags().BoolVar(&cdCfg.rebalanceEnable, "coordinator.rebalance-enable", false,
"coordinator will rebalance the targets of every job")
coordinatorCmd.Flags().DurationVar(&cdCfg.rebalanceInterval, "coordinator.rebalance-interval", time.Minute*5,
"the interval of coordinator rebalance loop")
coordinatorCmd.Flags().Float64Var(&cdCfg.rebalanceHealthRateWatermark, "coordinator.rebalance-health-rate-watermark", 0.5,
"the watermark of healthRate, only run rebalance when healthRate of targets is greater than watermark")
coordinatorCmd.Flags().DurationVar(&cdCfg.forceRebalanceInterval, "coordinator.force-rebalance-interval", time.Hour*24,
"the interval of coordinator force to run rebalance, default 1 day")
rootCmd.AddCommand(coordinatorCmd)
}

Expand All @@ -116,6 +140,11 @@ distribution targets to shards`,
return err
}

logLevel, err := logrus.ParseLevel(cdCfg.logLevel)
if err != nil {
return err
}

level := &promlog.AllowedLevel{}
level.Set("info")
format := &promlog.AllowedFormat{}
Expand All @@ -136,11 +165,14 @@ distribution targets to shards`,

cd = coordinator.NewCoordinator(
&coordinator.Option{
MaxSeries: cdCfg.shardMaxSeries,
MaxShard: cdCfg.shardMaxShard,
MinShard: cdCfg.shardMinShard,
MaxIdleTime: cdCfg.shardMaxIdleTime,
Period: cdCfg.syncInterval,
MaxSeries: cdCfg.shardMaxSeries,
MaxShard: cdCfg.shardMaxShard,
MinShard: cdCfg.shardMinShard,
MaxIdleTime: cdCfg.shardMaxIdleTime,
Period: cdCfg.syncInterval,
RebalancePeriod: cdCfg.rebalanceInterval,
RebalanceHealthRateWatermark: cdCfg.rebalanceHealthRateWatermark,
ForceRebalanceInterval: cdCfg.forceRebalanceInterval,
},
getReplicasManager(lg),
cfgManager.ConfigInfo,
Expand Down Expand Up @@ -174,6 +206,9 @@ distribution targets to shards`,
lg.WithField("component", "web"),
)

lg.SetFormatter(LocalFormatter{&logrus.TextFormatter{}})
lg.Level = logLevel

if err := cfgManager.ReloadFromFile(cdCfg.configFile); err != nil {
panic(err)
}
Expand Down Expand Up @@ -213,6 +248,13 @@ distribution targets to shards`,
return cd.Run(ctx)
})

if cdCfg.rebalanceEnable {
g.Go(func() error {
lg.Infof("rebalance start")
return cd.RunRebalance(ctx)
})
}

g.Go(func() error {
lg.Infof("api start at %s", cdCfg.webAddress)
return svc.Run(cdCfg.webAddress)
Expand Down
18 changes: 17 additions & 1 deletion cmd/kvass/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,23 @@ package main
import (
"fmt"
"github.com/gin-gonic/gin"
_ "github.com/prometheus/prometheus/discovery/install"
_ "github.com/prometheus/prometheus/discovery/aws" // register aws
_ "github.com/prometheus/prometheus/discovery/azure" // register azure
_ "github.com/prometheus/prometheus/discovery/consul" // register consul
_ "github.com/prometheus/prometheus/discovery/digitalocean" // register digitalocean
_ "github.com/prometheus/prometheus/discovery/dns" // register dns
_ "github.com/prometheus/prometheus/discovery/eureka" // register eureka
_ "github.com/prometheus/prometheus/discovery/file" // register file
_ "github.com/prometheus/prometheus/discovery/hetzner" // register hetzner
_ "github.com/prometheus/prometheus/discovery/http" // register http
_ "github.com/prometheus/prometheus/discovery/kubernetes" // register kubernetes
_ "github.com/prometheus/prometheus/discovery/linode" // register linode
_ "github.com/prometheus/prometheus/discovery/marathon" // register marathon
_ "github.com/prometheus/prometheus/discovery/moby" // register moby
_ "github.com/prometheus/prometheus/discovery/openstack" // register openstack
_ "github.com/prometheus/prometheus/discovery/scaleway" // register scaleway
_ "github.com/prometheus/prometheus/discovery/triton" // register triton
_ "github.com/prometheus/prometheus/discovery/zookeeper" // register zookeeper
"github.com/spf13/cobra"
"math/rand"
"os"
Expand Down
22 changes: 13 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,27 @@ go 1.15

require (
github.com/cssivision/reverseproxy v0.0.1
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gin-contrib/pprof v1.3.0
github.com/gin-gonic/gin v1.6.3
github.com/go-kit/kit v0.10.0
github.com/go-kit/log v0.2.1
github.com/gobuffalo/packr/v2 v2.2.0
github.com/mitchellh/hashstructure/v2 v2.0.1
github.com/grd/statistics v0.0.0-20130405091615-5af75da930c9
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/mroth/weightedrand v0.4.1
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.14.0
github.com/prometheus/prometheus v1.8.2-0.20201015110737-0a7fdd3b7696
github.com/prometheus/common v0.42.0
github.com/prometheus/prometheus v0.0.0-20210701113011-642722e5d01a
github.com/sirupsen/logrus v1.6.0
github.com/spf13/cobra v1.0.0
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.8.0
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738
golang.org/x/sync v0.0.0-20200930132711-30421366ff76
golang.org/x/sys v0.0.0-20210309040221-94ec62e08169 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.19.2
k8s.io/apimachinery v0.19.2
k8s.io/client-go v0.19.2
k8s.io/api v0.21.1
k8s.io/apimachinery v0.21.1
k8s.io/client-go v0.21.1
)

replace github.com/prometheus/prometheus => ./staging/src/github.com/promethues/prometheus
Loading