From dbc2ce11cfc91ee4f09c37931b6fc3404b80a0b3 Mon Sep 17 00:00:00 2001 From: kunwooy Date: Thu, 19 Sep 2024 16:41:07 +0900 Subject: [PATCH] feat: add activation feature for CPU/Memory scaler Signed-off-by: kunwooy --- CHANGELOG.md | 2 +- .../keda/scaledobject_controller_test.go | 5 +- pkg/scalers/cpu_memory_scaler.go | 112 +++++++++++- pkg/scalers/cpu_memory_scaler_test.go | 164 +++++++++++++++++- pkg/scaling/scale_handler.go | 105 ++++++----- pkg/scaling/scalers_builder.go | 4 +- tests/scalers/cpu/cpu_test.go | 14 +- 7 files changed, 340 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 71cef1b99cf..4f0b279f950 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,11 +70,11 @@ Here is an overview of all new **experimental** features: ### Improvements - **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352)) +- **CPU/Memory Scaler**: Add activation feature ([#6057](https://github.com/kedacore/keda/issues/6057)) - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) - **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) - **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689)) - **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958)) -- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) ### Fixes diff --git a/controllers/keda/scaledobject_controller_test.go b/controllers/keda/scaledobject_controller_test.go index 059025fc2ab..a09ca0c1f4f 100644 --- a/controllers/keda/scaledobject_controller_test.go +++ b/controllers/keda/scaledobject_controller_test.go @@ -790,13 +790,16 @@ var _ = Describe("ScaledObjectController", func() { Eventually(func() error { return k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa) }).ShouldNot(HaveOccurred()) + + averageUtilization := int32(100) hpa.Status.CurrentMetrics = []autoscalingv2.MetricStatus{ { Type: autoscalingv2.ResourceMetricSourceType, Resource: &autoscalingv2.ResourceMetricStatus{ Name: corev1.ResourceCPU, Current: autoscalingv2.MetricValueStatus{ - Value: resource.NewQuantity(int64(100), resource.DecimalSI), + Value: resource.NewQuantity(int64(100), resource.DecimalSI), + AverageUtilization: &averageUtilization, }, }, }, diff --git a/pkg/scalers/cpu_memory_scaler.go b/pkg/scalers/cpu_memory_scaler.go index da5119f3ec0..f6d4aa9a6a1 100644 --- a/pkg/scalers/cpu_memory_scaler.go +++ b/pkg/scalers/cpu_memory_scaler.go @@ -9,8 +9,11 @@ import ( v2 "k8s.io/api/autoscaling/v2" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" "k8s.io/metrics/pkg/apis/external_metrics" + "sigs.k8s.io/controller-runtime/pkg/client" + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" ) @@ -18,17 +21,23 @@ type cpuMemoryScaler struct { metadata *cpuMemoryMetadata resourceName v1.ResourceName logger logr.Logger + kubeClient client.Client } type cpuMemoryMetadata struct { - Type v2.MetricTargetType - AverageValue *resource.Quantity - AverageUtilization *int32 - ContainerName string + Type v2.MetricTargetType + AverageValue *resource.Quantity + AverageUtilization *int32 + ContainerName string + ActivationAverageValue *resource.Quantity + ActivationAverageUtilization *int32 + ScalableObjectName string + ScalableObjectType string + ScalableObjectNamespace string } // NewCPUMemoryScaler creates a new cpuMemoryScaler -func NewCPUMemoryScaler(resourceName v1.ResourceName, config *scalersconfig.ScalerConfig) (Scaler, error) { +func NewCPUMemoryScaler(resourceName v1.ResourceName, config *scalersconfig.ScalerConfig, kubeClient client.Client) (Scaler, error) { logger := InitializeLogger(config, "cpu_memory_scaler") meta, parseErr := parseResourceMetadata(config, logger) @@ -40,12 +49,13 @@ func NewCPUMemoryScaler(resourceName v1.ResourceName, config *scalersconfig.Scal metadata: meta, resourceName: resourceName, logger: logger, + kubeClient: kubeClient, }, nil } func parseResourceMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*cpuMemoryMetadata, error) { meta := &cpuMemoryMetadata{} - var value string + var value, activationValue string var ok bool value, ok = config.TriggerMetadata["type"] switch { @@ -63,10 +73,17 @@ func parseResourceMetadata(config *scalersconfig.ScalerConfig, logger logr.Logge if value, ok = config.TriggerMetadata["value"]; !ok || value == "" { return nil, fmt.Errorf("no value given") } + if activationValue, ok = config.TriggerMetadata["activationValue"]; !ok || activationValue == "" { + activationValue = "0" + } + switch meta.Type { case v2.AverageValueMetricType: averageValueQuantity := resource.MustParse(value) meta.AverageValue = &averageValueQuantity + + activationValueQuantity := resource.MustParse(activationValue) + meta.ActivationAverageValue = &activationValueQuantity case v2.UtilizationMetricType: valueNum, err := strconv.ParseInt(value, 10, 32) if err != nil { @@ -74,6 +91,13 @@ func parseResourceMetadata(config *scalersconfig.ScalerConfig, logger logr.Logge } utilizationNum := int32(valueNum) meta.AverageUtilization = &utilizationNum + + valueNum, err = strconv.ParseInt(activationValue, 10, 32) + if err != nil { + return nil, err + } + activationAverageUtilization := int32(valueNum) + meta.ActivationAverageUtilization = &activationAverageUtilization default: return nil, fmt.Errorf("unsupported metric type, allowed values are 'Utilization' or 'AverageValue'") } @@ -82,6 +106,10 @@ func parseResourceMetadata(config *scalersconfig.ScalerConfig, logger logr.Logge meta.ContainerName = value } + meta.ScalableObjectName = config.ScalableObjectName + meta.ScalableObjectNamespace = config.ScalableObjectNamespace + meta.ScalableObjectType = config.ScalableObjectType + return meta, nil } @@ -90,6 +118,34 @@ func (s *cpuMemoryScaler) Close(context.Context) error { return nil } +func (s *cpuMemoryScaler) getHPA(ctx context.Context) (*v2.HorizontalPodAutoscaler, error) { + if s.metadata.ScalableObjectType == "ScaledObject" { + scaledObject := &kedav1alpha1.ScaledObject{} + err := s.kubeClient.Get(ctx, types.NamespacedName{ + Name: s.metadata.ScalableObjectName, + Namespace: s.metadata.ScalableObjectNamespace, + }, scaledObject) + + if err != nil { + return nil, err + } + + hpa := &v2.HorizontalPodAutoscaler{} + err = s.kubeClient.Get(ctx, types.NamespacedName{ + Name: scaledObject.Status.HpaName, + Namespace: s.metadata.ScalableObjectNamespace, + }, hpa) + + if err != nil { + return nil, err + } + + return hpa, nil + } + + return nil, nil +} + // GetMetricSpecForScaling returns the metric spec for the HPA func (s *cpuMemoryScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { var metricSpec v2.MetricSpec @@ -120,7 +176,45 @@ func (s *cpuMemoryScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSp return []v2.MetricSpec{metricSpec} } -// GetMetricsAndActivity no need for cpu/memory scaler and always active for cpu/memory scaler -func (s *cpuMemoryScaler) GetMetricsAndActivity(_ context.Context, _ string) ([]external_metrics.ExternalMetricValue, bool, error) { - return nil, true, nil +// GetMetricsAndActivity only returns the activity of the cpu/memory scaler +func (s *cpuMemoryScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + hpa, err := s.getHPA(ctx) + if err != nil { + return nil, false, err + } + + if hpa == nil { + if s.metadata.ScalableObjectType == "ScaledJob" { + return nil, false, nil + } + return nil, false, fmt.Errorf("HPA not found") + } + + for _, metric := range hpa.Status.CurrentMetrics { + if metric.Resource == nil { + continue + } + + if string(metric.Resource.Name) != metricName { + continue + } + + if s.metadata.Type == v2.AverageValueMetricType { + averageValue := metric.Resource.Current.AverageValue + if averageValue == nil { + return nil, false, fmt.Errorf("HPA has no average value") + } + + return nil, averageValue.Cmp(*s.metadata.ActivationAverageValue) == 1, nil + } else if s.metadata.Type == v2.UtilizationMetricType { + averageUtilization := metric.Resource.Current.AverageUtilization + if averageUtilization == nil { + return nil, false, fmt.Errorf("HPA has no average utilization") + } + + return nil, *averageUtilization > *s.metadata.ActivationAverageUtilization, nil + } + } + + return nil, false, fmt.Errorf("no matching resource metric found for %s", s.resourceName) } diff --git a/pkg/scalers/cpu_memory_scaler_test.go b/pkg/scalers/cpu_memory_scaler_test.go index 81f7ea9df9a..e1d83826f9c 100644 --- a/pkg/scalers/cpu_memory_scaler_test.go +++ b/pkg/scalers/cpu_memory_scaler_test.go @@ -2,13 +2,19 @@ package scalers import ( "context" + "fmt" "testing" "github.com/go-logr/logr" "github.com/stretchr/testify/assert" v2 "k8s.io/api/autoscaling/v2" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" ) @@ -20,8 +26,9 @@ type parseCPUMemoryMetadataTestData struct { // A complete valid metadata example for reference var validCPUMemoryMetadata = map[string]string{ - "type": "Utilization", - "value": "50", + "type": "Utilization", + "value": "50", + "activationValue": "40", } var validContainerCPUMemoryMetadata = map[string]string{ "type": "Utilization", @@ -37,6 +44,7 @@ var testCPUMemoryMetadata = []parseCPUMemoryMetadataTestData{ {v2.UtilizationMetricType, map[string]string{"value": "50"}, false}, {"", map[string]string{"type": "AverageValue", "value": "50"}, false}, {v2.AverageValueMetricType, map[string]string{"value": "50"}, false}, + {"", map[string]string{"type": "AverageValue", "value": "50", "activationValue": "40"}, false}, {"", map[string]string{"type": "Value", "value": "50"}, true}, {v2.ValueMetricType, map[string]string{"value": "50"}, true}, {"", map[string]string{"type": "AverageValue"}, true}, @@ -64,7 +72,8 @@ func TestGetMetricSpecForScaling(t *testing.T) { config := &scalersconfig.ScalerConfig{ TriggerMetadata: validCPUMemoryMetadata, } - scaler, _ := NewCPUMemoryScaler(v1.ResourceCPU, config) + kubeClient := fake.NewFakeClient() + scaler, _ := NewCPUMemoryScaler(v1.ResourceCPU, config, kubeClient) metricSpec := scaler.GetMetricSpecForScaling(context.Background()) assert.Equal(t, metricSpec[0].Type, v2.ResourceMetricSourceType) @@ -76,7 +85,7 @@ func TestGetMetricSpecForScaling(t *testing.T) { TriggerMetadata: map[string]string{"value": "50"}, MetricType: v2.UtilizationMetricType, } - scaler, _ = NewCPUMemoryScaler(v1.ResourceCPU, config) + scaler, _ = NewCPUMemoryScaler(v1.ResourceCPU, config, kubeClient) metricSpec = scaler.GetMetricSpecForScaling(context.Background()) assert.Equal(t, metricSpec[0].Type, v2.ResourceMetricSourceType) @@ -89,7 +98,8 @@ func TestGetContainerMetricSpecForScaling(t *testing.T) { config := &scalersconfig.ScalerConfig{ TriggerMetadata: validContainerCPUMemoryMetadata, } - scaler, _ := NewCPUMemoryScaler(v1.ResourceCPU, config) + kubeClient := fake.NewFakeClient() + scaler, _ := NewCPUMemoryScaler(v1.ResourceCPU, config, kubeClient) metricSpec := scaler.GetMetricSpecForScaling(context.Background()) assert.Equal(t, metricSpec[0].Type, v2.ContainerResourceMetricSourceType) @@ -102,7 +112,7 @@ func TestGetContainerMetricSpecForScaling(t *testing.T) { TriggerMetadata: map[string]string{"value": "50", "containerName": "bar"}, MetricType: v2.UtilizationMetricType, } - scaler, _ = NewCPUMemoryScaler(v1.ResourceCPU, config) + scaler, _ = NewCPUMemoryScaler(v1.ResourceCPU, config, kubeClient) metricSpec = scaler.GetMetricSpecForScaling(context.Background()) assert.Equal(t, metricSpec[0].Type, v2.ContainerResourceMetricSourceType) @@ -110,3 +120,145 @@ func TestGetContainerMetricSpecForScaling(t *testing.T) { assert.Equal(t, metricSpec[0].ContainerResource.Target.Type, v2.UtilizationMetricType) assert.Equal(t, metricSpec[0].ContainerResource.Container, "bar") } + +func createScaledObject() *kedav1alpha1.ScaledObject { + maxReplicas := int32(3) + minReplicas := int32(0) + pollingInterval := int32(10) + return &kedav1alpha1.ScaledObject{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "keda.sh/v1alpha1", + Kind: "ScaledObject", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-name", + Namespace: "test-namespace", + }, + Spec: kedav1alpha1.ScaledObjectSpec{ + MaxReplicaCount: &maxReplicas, + MinReplicaCount: &minReplicas, + PollingInterval: &pollingInterval, + ScaleTargetRef: &kedav1alpha1.ScaleTarget{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "test-deployment", + }, + Triggers: []kedav1alpha1.ScaleTriggers{ + { + Type: "cpu", + Metadata: map[string]string{ + "activationValue": "500", + "value": "800", + }, + MetricType: v2.UtilizationMetricType, + }, + }, + }, + Status: kedav1alpha1.ScaledObjectStatus{ + HpaName: "keda-hpa-test-name", + }, + } +} + +func createHPAWithAverageUtilization(averageUtilization int32) (*v2.HorizontalPodAutoscaler, error) { + minReplicas := int32(1) + averageValue, err := resource.ParseQuantity("800m") + if err != nil { + return nil, fmt.Errorf("error parsing quantity: %s", err) + } + + return &v2.HorizontalPodAutoscaler{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "autoscaling/v2", + Kind: "HorizontalPodAutoscaler", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "keda-hpa-test-name", + Namespace: "test-namespace", + }, + Spec: v2.HorizontalPodAutoscalerSpec{ + MaxReplicas: 3, + MinReplicas: &minReplicas, + Metrics: []v2.MetricSpec{ + + { + Type: v2.ResourceMetricSourceType, + Resource: &v2.ResourceMetricSource{ + Name: v1.ResourceCPU, + Target: v2.MetricTarget{ + AverageUtilization: &averageUtilization, + Type: v2.UtilizationMetricType, + }, + }, + }, + }, + }, + Status: v2.HorizontalPodAutoscalerStatus{ + CurrentMetrics: []v2.MetricStatus{ + { + Type: v2.ResourceMetricSourceType, + Resource: &v2.ResourceMetricStatus{ + Name: v1.ResourceCPU, + Current: v2.MetricValueStatus{ + AverageUtilization: &averageUtilization, + AverageValue: &averageValue, + }, + }, + }, + }, + }, + }, nil +} + +func TestGetMetricsAndActivity_IsActive(t *testing.T) { + config := &scalersconfig.ScalerConfig{ + TriggerMetadata: validCPUMemoryMetadata, + ScalableObjectType: "ScaledObject", + ScalableObjectName: "test-name", + ScalableObjectNamespace: "test-namespace", + } + + hpa, err := createHPAWithAverageUtilization(50) + if err != nil { + t.Errorf("Error creating HPA: %s", err) + return + } + + err = kedav1alpha1.AddToScheme(scheme.Scheme) + if err != nil { + t.Errorf("Error adding to scheme: %s", err) + return + } + + kubeClient := fake.NewClientBuilder().WithObjects(hpa, createScaledObject()).WithScheme(scheme.Scheme).Build() + scaler, _ := NewCPUMemoryScaler(v1.ResourceCPU, config, kubeClient) + + _, isActive, _ := scaler.GetMetricsAndActivity(context.Background(), "cpu") + assert.Equal(t, isActive, true) +} + +func TestGetMetricsAndActivity_IsNotActive(t *testing.T) { + config := &scalersconfig.ScalerConfig{ + TriggerMetadata: validCPUMemoryMetadata, + ScalableObjectType: "ScaledObject", + ScalableObjectName: "test-name", + ScalableObjectNamespace: "test-namespace", + } + + hpa, err := createHPAWithAverageUtilization(30) + if err != nil { + t.Errorf("Error creating HPA: %s", err) + } + + err = kedav1alpha1.AddToScheme(scheme.Scheme) + if err != nil { + t.Errorf("Error adding to scheme: %s", err) + return + } + + kubeClient := fake.NewClientBuilder().WithObjects(hpa, createScaledObject()).WithScheme(scheme.Scheme).Build() + scaler, _ := NewCPUMemoryScaler(v1.ResourceCPU, config, kubeClient) + + _, isActive, _ := scaler.GetMetricsAndActivity(context.Background(), "cpu") + assert.Equal(t, isActive, false) +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 5a955e48e66..6fb2b38dd0a 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -18,6 +18,7 @@ package scaling import ( "context" + "errors" "fmt" "strconv" "strings" @@ -748,60 +749,74 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler, } for _, spec := range metricSpecs { - if spec.External == nil { - continue - } + switch { + case spec.Resource != nil: + metricName := spec.Resource.Name.String() + _, isMetricActive, _, err := cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName) + if err != nil { + result.Err = err + logger.Error(err, "error getting metric source", "source", result.TriggerName, "metricName", metricName) + cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAMetricSourceFailed, err.Error()) + continue + } - metricName := spec.External.Metric.Name + metricscollector.RecordScalerError(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, err) + metricscollector.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, isMetricActive) + result.IsActive = isMetricActive + case spec.External != nil: + metricName := spec.External.Metric.Name - var latency time.Duration - metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName) - metricscollector.RecordScalerError(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, err) - if latency != -1 { - metricscollector.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, latency) - } - result.Metrics = append(result.Metrics, metrics...) - logger.V(1).Info("Getting metrics and activity from scaler", "scaler", result.TriggerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err) - - if scalerConfig.TriggerUseCachedMetrics { - result.Records[metricName] = metricscache.MetricsRecord{ - IsActive: isMetricActive, - Metric: metrics, - ScalerError: err, + var latency time.Duration + metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName) + metricscollector.RecordScalerError(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, err) + if latency != -1 { + metricscollector.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, latency) + } + result.Metrics = append(result.Metrics, metrics...) + logger.V(1).Info("Getting metrics and activity from scaler", "scaler", result.TriggerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err) + + if scalerConfig.TriggerUseCachedMetrics { + result.Records[metricName] = metricscache.MetricsRecord{ + IsActive: isMetricActive, + Metric: metrics, + ScalerError: err, + } } - } - if err != nil { - result.Err = err - if scaledObject.IsUsingModifiers() { - logger.Error(err, "error getting metric source", "source", result.TriggerName) - cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAMetricSourceFailed, err.Error()) + if err != nil { + result.Err = err + if scaledObject.IsUsingModifiers() { + logger.Error(err, "error getting metric source", "source", result.TriggerName) + cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAMetricSourceFailed, err.Error()) + } else { + logger.Error(err, "error getting scale decision", "scaler", result.TriggerName) + cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + } } else { - logger.Error(err, "error getting scale decision", "scaler", result.TriggerName) - cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - } - } else { - result.IsActive = isMetricActive - for _, metric := range metrics { - metricValue := metric.Value.AsApproximateFloat64() - metricscollector.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metric.MetricName, true, metricValue) - } - if !scaledObject.IsUsingModifiers() { - if isMetricActive { - if spec.External != nil { - logger.V(1).Info("Scaler for scaledObject is active", "scaler", result.TriggerName, "metricName", metricName) - } - if spec.Resource != nil { - logger.V(1).Info("Scaler for scaledObject is active", "scaler", result.TriggerName, "metricName", spec.Resource.Name) + result.IsActive = isMetricActive + for _, metric := range metrics { + metricValue := metric.Value.AsApproximateFloat64() + metricscollector.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metric.MetricName, true, metricValue) + } + if !scaledObject.IsUsingModifiers() { + if isMetricActive { + if spec.External != nil { + logger.V(1).Info("Scaler for scaledObject is active", "scaler", result.TriggerName, "metricName", metricName) + } + if spec.Resource != nil { + logger.V(1).Info("Scaler for scaledObject is active", "scaler", result.TriggerName, "metricName", spec.Resource.Name) + } } + metricscollector.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, isMetricActive) } - metricscollector.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, isMetricActive) } - } - result.Pairs, err = modifiers.GetPairTriggerAndMetric(scaledObject, metricName, scalerConfig.TriggerName) - if err != nil { - logger.Error(err, "error pairing triggers & metrics for compositeScaler") + result.Pairs, err = modifiers.GetPairTriggerAndMetric(scaledObject, metricName, scalerConfig.TriggerName) + if err != nil { + logger.Error(err, "error pairing triggers & metrics for compositeScaler") + } + default: + logger.Error(errors.New("error parsing metric for the scaler"), "both resource and external metrics are nil", "scaler", result.TriggerName) } } return result diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 1f4549c7ffa..330bf6d0c31 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -157,7 +157,7 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, case "couchdb": return scalers.NewCouchDBScaler(ctx, config) case "cpu": - return scalers.NewCPUMemoryScaler(corev1.ResourceCPU, config) + return scalers.NewCPUMemoryScaler(corev1.ResourceCPU, config, client) case "cron": return scalers.NewCronScaler(config) case "datadog": @@ -202,7 +202,7 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, case "loki": return scalers.NewLokiScaler(config) case "memory": - return scalers.NewCPUMemoryScaler(corev1.ResourceMemory, config) + return scalers.NewCPUMemoryScaler(corev1.ResourceMemory, config, client) case "metrics-api": return scalers.NewMetricsAPIScaler(config) case "mongodb": diff --git a/tests/scalers/cpu/cpu_test.go b/tests/scalers/cpu/cpu_test.go index f24922dc61d..a7f55bdef86 100644 --- a/tests/scalers/cpu/cpu_test.go +++ b/tests/scalers/cpu/cpu_test.go @@ -135,7 +135,8 @@ spec: - type: cpu metadata: type: Utilization - value: "50" + value: "10" + activationValue: "5" - type: kubernetes-workload metadata: podSelector: 'pod={{.WorkloadDeploymentName}}' @@ -245,9 +246,18 @@ func scaleToZero(t *testing.T, kc *kubernetes.Clientset, data templateData) { assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicas, 60, 1), "Replica count should be %v", maxReplicas) - // scale external trigger in (expect replicas back to 0 -- external trigger not active) + // activate cpu trigger + KubectlReplaceWithTemplate(t, data, "triggerJobTemplate", triggerJob) + + // replica count should not change from maxReplicas + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, maxReplicas, 60) + + // scale external trigger in (expect replicas to stay at maxReplicas -- external trigger not active) KubernetesScaleDeployment(t, kc, workloadDeploymentName, int64(minReplicas), testNamespace) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, maxReplicas, 60) + // remove trigger job to deactivate cpu trigger + KubectlDeleteWithTemplate(t, data, "triggerJobTemplate", triggerJob) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicas, 60, 1), "Replica count should be %v", minReplicas) }