Skip to content

Commit

Permalink
Provide support for scaling based on Redis Key-Values
Browse files Browse the repository at this point in the history
Signed-off-by: wangrushen <[email protected]>
  • Loading branch information
dovics committed Oct 27, 2024
1 parent b2ce95d commit b617e98
Show file tree
Hide file tree
Showing 7 changed files with 845 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Here is an overview of all new **experimental** features:

### Improvements

- **Redis Scaler**: Provide support for scaling based on Redis Key-Values ([#5003](https://github.com/kedacore/keda/issues/5003))
- **General**: Prevent multiple ScaledObjects managing one HPA ([#6130](https://github.com/kedacore/keda/issues/6130))
- **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352))
- **Elasticsearch Scaler**: Support Query at the Elasticsearch scaler ([#6216](https://github.com/kedacore/keda/issues/6216))
Expand Down
163 changes: 100 additions & 63 deletions pkg/scalers/redis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,25 @@ const (
defaultEnableTLS = false
)

const getListLengthLuaScript = `
local listName = KEYS[1]
local listType = redis.call('type', listName).ok
local cmd = {
zset = 'zcard',
set = 'scard',
list = 'llen',
hash = 'hlen',
none = 'llen'
}
return redis.call(cmd[listType], listName)`

var (
// ErrRedisNoListName is returned when "listName" is missing from the config.
ErrRedisNoListName = errors.New("no list name given")
// ErrRedisNeitherKeyOrList is none of "listName" and "keyName" are set.
ErrRedisNeitherKeyOrList = errors.New("neither listName nor keyName are set")

// ErrRedisBothKeyAndList is both "listName" and "keyName" are set.
ErrRedisBothKeyAndList = errors.New("both listName and keyName are set")

// ErrRedisNoAddresses is returned when the "addresses" in the connection info is empty.
ErrRedisNoAddresses = errors.New("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values")
Expand All @@ -41,7 +57,8 @@ type redisScaler struct {
metricType v2.MetricTargetType
metadata *redisMetadata
closeFn func() error
getListLengthFn func(context.Context) (int64, error)
getValueFn func(context.Context) (float64, error)
activationValue float64
logger logr.Logger
}

Expand All @@ -63,14 +80,19 @@ type redisConnectionInfo struct {
}

type redisMetadata struct {
ListLength int64 `keda:"name=listLength, order=triggerMetadata, optional, default=5"`
ActivationListLength int64 `keda:"name=activationListLength, order=triggerMetadata, optional"`
ListName string `keda:"name=listName, order=triggerMetadata"`
DatabaseIndex int `keda:"name=databaseIndex, order=triggerMetadata, optional"`
MetadataEnableTLS string `keda:"name=enableTLS, order=triggerMetadata, optional"`
AuthParamEnableTLS string `keda:"name=tls, order=authParams, optional"`
ConnectionInfo redisConnectionInfo `keda:"optional"`
triggerIndex int
ListLength int64 `keda:"name=listLength, order=triggerMetadata, optional, default=5"`
ActivationListLength int64 `keda:"name=activationListLength, order=triggerMetadata, optional"`
ListName string `keda:"name=listName, order=triggerMetadata, optional"`

KeyValue float64 `keda:"name=keyValue, order=triggerMetadata, optional, default=5"`
ActivationKeyValue float64 `keda:"name=activationKeyValue, order=triggerMetadata, optional"`
KeyName string `keda:"name=keyName, order=triggerMetadata, optional"`

DatabaseIndex int `keda:"name=databaseIndex, order=triggerMetadata, optional"`
MetadataEnableTLS string `keda:"name=enableTLS, order=triggerMetadata, optional"`
AuthParamEnableTLS string `keda:"name=tls, order=authParams, optional"`
ConnectionInfo redisConnectionInfo `keda:"optional"`
triggerIndex int
}

func (rci *redisConnectionInfo) SetEnableTLS(metadataEnableTLS string, authParamEnableTLS string) error {
Expand Down Expand Up @@ -105,7 +127,6 @@ func (rci *redisConnectionInfo) SetEnableTLS(metadataEnableTLS string, authParam

func (r *redisMetadata) Validate() error {
err := validateRedisAddress(&r.ConnectionInfo)

if err != nil {
return err
}
Expand All @@ -115,25 +136,19 @@ func (r *redisMetadata) Validate() error {
r.MetadataEnableTLS, r.AuthParamEnableTLS = "", ""
}

if r.ListName == "" && r.KeyName == "" {
return ErrRedisNeitherKeyOrList
}

if r.ListName != "" && r.KeyName != "" {
return ErrRedisBothKeyAndList
}

return err
}

// NewRedisScaler creates a new redisScaler
func NewRedisScaler(ctx context.Context, isClustered, isSentinel bool, config *scalersconfig.ScalerConfig) (Scaler, error) {
luaScript := `
local listName = KEYS[1]
local listType = redis.call('type', listName).ok
local cmd = {
zset = 'zcard',
set = 'scard',
list = 'llen',
hash = 'hlen',
none = 'llen'
}
return redis.call(cmd[listType], listName)
`

metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
Expand All @@ -147,14 +162,52 @@ func NewRedisScaler(ctx context.Context, isClustered, isSentinel bool, config *s
}

if isClustered {
return createClusteredRedisScaler(ctx, meta, luaScript, metricType, logger)
return createClusteredRedisScaler(ctx, meta, metricType, logger)
} else if isSentinel {
return createSentinelRedisScaler(ctx, meta, luaScript, metricType, logger)
return createSentinelRedisScaler(ctx, meta, metricType, logger)
}
return createRedisScaler(ctx, meta, metricType, logger)
}

func getValueFn(meta *redisMetadata, client redis.Cmdable) func(ctx context.Context) (float64, error) {
switch {
case meta.KeyName != "":
return func(ctx context.Context) (float64, error) {
cmd := client.Get(ctx, meta.KeyName)
if cmd.Err() != nil {
return -1, cmd.Err()
}

return cmd.Float64()
}
case meta.ListName != "":
return func(ctx context.Context) (float64, error) {
cmd := client.Eval(ctx, getListLengthLuaScript, []string{meta.ListName})
if cmd.Err() != nil {
return -1, cmd.Err()
}

return cmd.Float64()
}
// should never happen, because we check keyName and listName in meta.Valaidate()
default:
return nil
}
}

func getActivationValue(meta *redisMetadata) float64 {
switch {
case meta.KeyName != "":
return meta.ActivationKeyValue
case meta.ListName != "":
return float64(meta.ActivationListLength)
// should never happen, because we check keyName and listName in meta.Valaidate()
default:
return 0
}
return createRedisScaler(ctx, meta, luaScript, metricType, logger)
}

func createClusteredRedisScaler(ctx context.Context, meta *redisMetadata, script string, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) {
func createClusteredRedisScaler(ctx context.Context, meta *redisMetadata, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) {
client, err := getRedisClusterClient(ctx, meta.ConnectionInfo)
if err != nil {
return nil, fmt.Errorf("connection to redis cluster failed: %w", err)
Expand All @@ -168,43 +221,37 @@ func createClusteredRedisScaler(ctx context.Context, meta *redisMetadata, script
return nil
}

listLengthFn := func(ctx context.Context) (int64, error) {
cmd := client.Eval(ctx, script, []string{meta.ListName})
if cmd.Err() != nil {
return -1, cmd.Err()
}

return cmd.Int64()
}

return &redisScaler{
scaler := &redisScaler{
metricType: metricType,
metadata: meta,
closeFn: closeFn,
getListLengthFn: listLengthFn,
getValueFn: getValueFn(meta, client),
activationValue: getActivationValue(meta),
logger: logger,
}, nil
}

return scaler, nil
}

func createSentinelRedisScaler(ctx context.Context, meta *redisMetadata, script string, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) {
func createSentinelRedisScaler(ctx context.Context, meta *redisMetadata, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) {
client, err := getRedisSentinelClient(ctx, meta.ConnectionInfo, meta.DatabaseIndex)
if err != nil {
return nil, fmt.Errorf("connection to redis sentinel failed: %w", err)
}

return createRedisScalerWithClient(client, meta, script, metricType, logger), nil
return createRedisScalerWithClient(client, meta, metricType, logger), nil
}

func createRedisScaler(ctx context.Context, meta *redisMetadata, script string, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) {
func createRedisScaler(ctx context.Context, meta *redisMetadata, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) {
client, err := getRedisClient(ctx, meta.ConnectionInfo, meta.DatabaseIndex)
if err != nil {
return nil, fmt.Errorf("connection to redis failed: %w", err)
}

return createRedisScalerWithClient(client, meta, script, metricType, logger), nil
return createRedisScalerWithClient(client, meta, metricType, logger), nil
}

func createRedisScalerWithClient(client *redis.Client, meta *redisMetadata, script string, metricType v2.MetricTargetType, logger logr.Logger) Scaler {
func createRedisScalerWithClient(client *redis.Client, meta *redisMetadata, metricType v2.MetricTargetType, logger logr.Logger) Scaler {
closeFn := func() error {
if err := client.Close(); err != nil {
logger.Error(err, "error closing redis client")
Expand All @@ -213,20 +260,12 @@ func createRedisScalerWithClient(client *redis.Client, meta *redisMetadata, scri
return nil
}

listLengthFn := func(ctx context.Context) (int64, error) {
cmd := client.Eval(ctx, script, []string{meta.ListName})
if cmd.Err() != nil {
return -1, cmd.Err()
}

return cmd.Int64()
}

return &redisScaler{
metricType: metricType,
metadata: meta,
closeFn: closeFn,
getListLengthFn: listLengthFn,
getValueFn: getValueFn(meta, client),
activationValue: getActivationValue(meta),
logger: logger,
}
}
Expand Down Expand Up @@ -262,16 +301,15 @@ func (s *redisScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {

// GetMetricsAndActivity connects to Redis and finds the length of the list
func (s *redisScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
listLen, err := s.getListLengthFn(ctx)

value, err := s.getValueFn(ctx)
if err != nil {
s.logger.Error(err, "error getting list length")
s.logger.Error(err, "error getting value")
return []external_metrics.ExternalMetricValue{}, false, err
}

metric := GenerateMetricInMili(metricName, float64(listLen))
metric := GenerateMetricInMili(metricName, value)

return []external_metrics.ExternalMetricValue{metric}, listLen > s.metadata.ActivationListLength, nil
return []external_metrics.ExternalMetricValue{metric}, value > s.activationValue, nil
}

func validateRedisAddress(c *redisConnectionInfo) error {
Expand All @@ -283,7 +321,6 @@ func validateRedisAddress(c *redisConnectionInfo) error {
c.Addresses = append(c.Addresses, net.JoinHostPort(c.Hosts[i], c.Ports[i]))
}
}
// }

if len(c.Addresses) == 0 || len(c.Addresses[0]) == 0 {
return ErrRedisNoAddresses
Expand Down
Loading

0 comments on commit b617e98

Please sign in to comment.