diff --git a/docs/configuration.md b/docs/configuration.md index 5e53f34c..02cc15e1 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -126,6 +126,9 @@ namespace的配置格式为json,包含分表、非分表、实例等配置信 | rw_flag | int | 读写标识, 只读=1, 读写=2 | | rw_split | int | 是否读写分离, 非读写分离=0, 读写分离=1 | | other_property | int | 目前用来标识是否走统计从实例, 普通用户=0, 统计用户=1 | +| open_rate_limit | bool | 是否开启流控 | +| max_rate_limit | int | 最大流控速率 | +| max_token_wait_time | int | 流控时获取不到流控颁发的令牌,最大等待时间,单位ms | ### 全局序列号配置 diff --git a/models/user.go b/models/user.go index 56bb5739..2e7fa3fd 100644 --- a/models/user.go +++ b/models/user.go @@ -41,12 +41,15 @@ const ( // User meand user struct type User struct { - UserName string `json:"user_name"` - Password string `json:"password"` - Namespace string `json:"namespace"` - RWFlag int `json:"rw_flag"` //1: 只读 2:读写 - RWSplit int `json:"rw_split"` //0: 不采用读写分离 1:读写分离 - OtherProperty int `json:"other_property"` // 1:统计用户 + UserName string `json:"user_name"` + Password string `json:"password"` + Namespace string `json:"namespace"` + RWFlag int `json:"rw_flag"` //1: 只读 2:读写 + RWSplit int `json:"rw_split"` //0: 不采用读写分离 1:读写分离 + OtherProperty int `json:"other_property"` // 1:统计用户 + OpenRateLimit bool `json:"open_rate_limit"` // false:非流控;true:流控 + MaxRateLimit int `json:"max_rate_limit"` // 流控,最大QPS速率,默认值是0. + MaxTokenWaitTime int `json:"max_token_wait_time"` //流控,获取流控token默认最大等待时间。默认值0,表示不等待 } func (p *User) verify() error { @@ -77,5 +80,13 @@ func (p *User) verify() error { return fmt.Errorf("invalid other property, user: %s, %d", p.UserName, p.OtherProperty) } + if p.MaxRateLimit < 0 { + return fmt.Errorf("invalid MaxRateLimit, MaxRateLimit: %d, max_rate_limit should >= 0", p.MaxRateLimit) + } + + if p.MaxTokenWaitTime < 0 { + return fmt.Errorf("invalid MaxTokenWaitTime, MaxTokenWaitTime: %d, max_token_wait_time should >= 0", p.MaxTokenWaitTime) + } + return nil } diff --git a/proxy/server/executor.go b/proxy/server/executor.go index f4f74be4..5d234629 100644 --- a/proxy/server/executor.go +++ b/proxy/server/executor.go @@ -267,8 +267,22 @@ func (se *SessionExecutor) GetDatabase() string { return se.db } +func (se *SessionExecutor) checkFlowControl() (bool, error) { + ns := se.GetNamespace() + if ns.userProperties != nil && ns.userProperties[se.user] != nil { + return ns.userProperties[se.user].GetRateLimiterToken() + } else { + return true, nil + } +} + // ExecuteCommand execute command func (se *SessionExecutor) ExecuteCommand(cmd byte, data []byte) Response { + if ok, err := se.checkFlowControl(); !ok { + log.Warn("flow control error: %v", err) + return CreateErrorResponse(se.status, err) + } + switch cmd { case mysql.ComQuit: se.handleRollback(nil) diff --git a/proxy/server/namespace.go b/proxy/server/namespace.go index 5aff93c2..89edba56 100644 --- a/proxy/server/namespace.go +++ b/proxy/server/namespace.go @@ -15,11 +15,13 @@ package server import ( + "context" "errors" "fmt" "net" "strconv" "strings" + "sync" "time" "github.com/XiaoMi/Gaea/backend" @@ -31,6 +33,7 @@ import ( "github.com/XiaoMi/Gaea/proxy/sequence" "github.com/XiaoMi/Gaea/util" "github.com/XiaoMi/Gaea/util/cache" + "golang.org/x/time/rate" ) const ( @@ -51,6 +54,46 @@ type UserProperty struct { RWFlag int RWSplit int OtherProperty int + rateLimiter *rate.Limiter + maxTokenWaitTimeMs int64 //流控,获取流控token默认最大等待时间。默认值0,表示不等待 + lock sync.RWMutex +} +func (cc *UserProperty) initRateLimiter(openRateLimit bool, maxRateLimit int, maxTokenWaitTime int) { + cc.lock.Lock() + defer cc.lock.Unlock() + if !openRateLimit { + cc.rateLimiter = nil + return + } + + //cc.maxRateLimit = maxRateLimit + cc.rateLimiter = rate.NewLimiter(rate.Limit(maxRateLimit), maxRateLimit) + cc.maxTokenWaitTimeMs = int64(maxTokenWaitTime) +} + +func (cc *UserProperty) GetRateLimiterToken() (ok bool, err error) { + cc.lock.RLock() + defer cc.lock.RUnlock() + if cc.rateLimiter == nil { + return true, nil + } + + ok = true + if cc.maxTokenWaitTimeMs > 0 { + getCtx, cancel := context.WithTimeout(context.Background(), time.Duration(cc.maxTokenWaitTimeMs)*time.Millisecond) + defer cancel() + err = cc.rateLimiter.Wait(getCtx) + if err != nil { + ok = false + } + } else { + ok = cc.rateLimiter.Allow() + if !ok { + err = fmt.Errorf("rate limit %v,can't get rate limit token now", cc.rateLimiter.Limit()) + } + } + + return ok, err } // Namespace is struct driected used by server @@ -160,7 +203,8 @@ func NewNamespace(namespaceConfig *models.Namespace) (*Namespace, error) { // init user properties for _, user := range namespaceConfig.Users { up := &UserProperty{RWFlag: user.RWFlag, RWSplit: user.RWSplit, OtherProperty: user.OtherProperty} - namespace.userProperties[user.UserName] = up + up.initRateLimiter(user.OpenRateLimit, user.MaxRateLimit, user.MaxTokenWaitTime) + namespace.userProperties[user.UserName] = up } // init backend slices