Skip to content

Commit

Permalink
Refactor kubernetes workload (#6226)
Browse files Browse the repository at this point in the history
Signed-off-by: rickbrouwer <[email protected]>
  • Loading branch information
rickbrouwer authored Oct 21, 2024
1 parent cf4026f commit efe4d7c
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 55 deletions.
93 changes: 47 additions & 46 deletions pkg/scalers/kubernetes_workload_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scalers
import (
"context"
"fmt"
"strconv"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand All @@ -18,16 +17,13 @@ import (

// kubernetesWorkloadScaler scales on the number of pods matching a label
// selector in the ScaledObject's namespace.
type kubernetesWorkloadScaler struct {
	metricType v2.MetricTargetType
	// metadata is held by value: it is parsed once at construction and never
	// mutated afterwards, so no pointer indirection is needed.
	metadata   kubernetesWorkloadMetadata
	kubeClient client.Client
	logger     logr.Logger
}

const (
	// kubernetesWorkloadMetricType is the HPA metric source type used for
	// this scaler; workload counts are exposed as External metrics.
	kubernetesWorkloadMetricType = "External"
)

var phasesCountedAsTerminated = []corev1.PodPhase{
Expand All @@ -36,11 +32,22 @@ var phasesCountedAsTerminated = []corev1.PodPhase{
}

// kubernetesWorkloadMetadata is the trigger configuration for the kubernetes
// workload scaler. Exported fields are populated declaratively from the
// trigger metadata via the `keda` struct tags; unexported fields are derived
// from the ScalerConfig after parsing.
type kubernetesWorkloadMetadata struct {
	PodSelector     string  `keda:"name=podSelector, order=triggerMetadata"`
	Value           float64 `keda:"name=value, order=triggerMetadata"`
	ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, default=0"`

	namespace      string
	triggerIndex   int
	podSelector    labels.Selector // parsed form of PodSelector
	asMetricSource bool            // true when the trigger is only a metric source; relaxes Value validation
}

// Validate checks the parsed metadata. A non-positive target value is only
// acceptable when the scaler is used purely as a metric source, where no
// scaling target is required.
func (m *kubernetesWorkloadMetadata) Validate() error {
	if m.asMetricSource {
		return nil
	}
	if m.Value > 0 {
		return nil
	}
	return fmt.Errorf("value must be a float greater than 0")
}

// NewKubernetesWorkloadScaler creates a new kubernetesWorkloadScaler
Expand All @@ -50,9 +57,9 @@ func NewKubernetesWorkloadScaler(kubeClient client.Client, config *scalersconfig
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

meta, parseErr := parseWorkloadMetadata(config)
if parseErr != nil {
return nil, fmt.Errorf("error parsing kubernetes workload metadata: %w", parseErr)
meta, err := parseKubernetesWorkloadMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing kubernetes workload metadata: %w", err)
}

return &kubernetesWorkloadScaler{
Expand All @@ -63,50 +70,46 @@ func NewKubernetesWorkloadScaler(kubeClient client.Client, config *scalersconfig
}, nil
}

func parseWorkloadMetadata(config *scalersconfig.ScalerConfig) (*kubernetesWorkloadMetadata, error) {
meta := &kubernetesWorkloadMetadata{}
var err error
func parseKubernetesWorkloadMetadata(config *scalersconfig.ScalerConfig) (kubernetesWorkloadMetadata, error) {
meta := kubernetesWorkloadMetadata{}
err := config.TypedConfig(&meta)
if err != nil {
return meta, fmt.Errorf("error parsing kubernetes workload metadata: %w", err)
}

meta.namespace = config.ScalableObjectNamespace
podSelector, err := labels.Parse(config.TriggerMetadata[podSelectorKey])
if err != nil || podSelector.String() == "" {
return nil, fmt.Errorf("invalid pod selector")
meta.triggerIndex = config.TriggerIndex
meta.asMetricSource = config.AsMetricSource

if meta.asMetricSource {
meta.Value = 0
}
meta.podSelector = podSelector
value, err := strconv.ParseFloat(config.TriggerMetadata[valueKey], 64)
if err != nil || value == 0 {
if config.AsMetricSource {
value = 0
} else {
return nil, fmt.Errorf("value must be a float greater than 0")
}

selector, err := labels.Parse(meta.PodSelector)
if err != nil {
return meta, fmt.Errorf("error parsing pod selector: %w", err)
}
meta.value = value
meta.podSelector = selector

meta.activationValue = 0
if val, ok := config.TriggerMetadata[activationValueKey]; ok {
activationValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("value must be a float")
}
meta.activationValue = activationValue
if err := meta.Validate(); err != nil {
return meta, err
}

meta.triggerIndex = config.TriggerIndex
return meta, nil
}

// Close is a no-op: the kubernetes workload scaler holds no connections or
// background resources that need releasing.
func (s *kubernetesWorkloadScaler) Close(context.Context) error {
	return nil
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *kubernetesWorkloadScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("workload-%s", s.metadata.namespace))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("workload-%s", s.metadata.namespace))),
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.value),
Target: GetMetricTargetMili(s.metricType, s.metadata.Value),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: kubernetesWorkloadMetricType}
return []v2.MetricSpec{metricSpec}
Expand All @@ -121,19 +124,17 @@ func (s *kubernetesWorkloadScaler) GetMetricsAndActivity(ctx context.Context, me

metric := GenerateMetricInMili(metricName, float64(pods))

return []external_metrics.ExternalMetricValue{metric}, float64(pods) > s.metadata.activationValue, nil
return []external_metrics.ExternalMetricValue{metric}, float64(pods) > s.metadata.ActivationValue, nil
}

func (s *kubernetesWorkloadScaler) getMetricValue(ctx context.Context) (int64, error) {
podList := &corev1.PodList{}
listOptions := client.ListOptions{}
listOptions.LabelSelector = s.metadata.podSelector
listOptions.Namespace = s.metadata.namespace
opts := []client.ListOption{
&listOptions,
listOptions := client.ListOptions{
LabelSelector: s.metadata.podSelector,
Namespace: s.metadata.namespace,
}

err := s.kubeClient.List(ctx, podList, opts...)
err := s.kubeClient.List(ctx, podList, &listOptions)
if err != nil {
return 0, err
}
Expand Down
29 changes: 20 additions & 9 deletions pkg/scalers/kubernetes_workload_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ var parseWorkloadMetadataTestDataset = []workloadMetadataTestData{

func TestParseWorkloadMetadata(t *testing.T) {
for _, testData := range parseWorkloadMetadataTestDataset {
_, err := parseWorkloadMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ScalableObjectNamespace: testData.namespace})
_, err := NewKubernetesWorkloadScaler(
fake.NewClientBuilder().Build(),
&scalersconfig.ScalerConfig{
TriggerMetadata: testData.metadata,
ScalableObjectNamespace: testData.namespace,
},
)
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
Expand Down Expand Up @@ -68,7 +74,7 @@ var isActiveWorkloadTestDataset = []workloadIsActiveTestData{

func TestWorkloadIsActive(t *testing.T) {
for _, testData := range isActiveWorkloadTestDataset {
s, _ := NewKubernetesWorkloadScaler(
s, err := NewKubernetesWorkloadScaler(
fake.NewClientBuilder().WithRuntimeObjects(createPodlist(testData.podCount)).Build(),
&scalersconfig.ScalerConfig{
TriggerMetadata: testData.metadata,
Expand All @@ -77,6 +83,10 @@ func TestWorkloadIsActive(t *testing.T) {
ScalableObjectNamespace: testData.namespace,
},
)
if err != nil {
t.Error("Error creating scaler", err)
continue
}
_, isActive, _ := s.GetMetricsAndActivity(context.TODO(), "Metric")
if testData.active && !isActive {
t.Error("Expected active but got inactive")
Expand Down Expand Up @@ -107,7 +117,7 @@ var getMetricSpecForScalingTestDataset = []workloadGetMetricSpecForScalingTestDa

func TestWorkloadGetMetricSpecForScaling(t *testing.T) {
for _, testData := range getMetricSpecForScalingTestDataset {
s, _ := NewKubernetesWorkloadScaler(
s, err := NewKubernetesWorkloadScaler(
fake.NewClientBuilder().Build(),
&scalersconfig.ScalerConfig{
TriggerMetadata: testData.metadata,
Expand All @@ -117,6 +127,10 @@ func TestWorkloadGetMetricSpecForScaling(t *testing.T) {
TriggerIndex: testData.triggerIndex,
},
)
if err != nil {
t.Error("Error creating scaler", err)
continue
}
metric := s.GetMetricSpecForScaling(context.Background())

if metric[0].External.Metric.Name != testData.name {
Expand Down Expand Up @@ -145,14 +159,11 @@ func createPodlist(count int) *v1.PodList {

func TestWorkloadPhase(t *testing.T) {
phases := map[v1.PodPhase]bool{
v1.PodRunning: true,
// succeeded and failed clearly count as terminated
v1.PodRunning: true,
v1.PodSucceeded: false,
v1.PodFailed: false,
// unknown could be for example a temporarily unresponsive node; count the pod
v1.PodUnknown: true,
// count pre-Running to avoid an additional delay on top of the poll interval
v1.PodPending: true,
v1.PodUnknown: true,
v1.PodPending: true,
}
for phase, active := range phases {
list := &v1.PodList{}
Expand Down

0 comments on commit efe4d7c

Please sign in to comment.