
Commit 81f2d30
Merge branch 'main' into main
VietND96 authored Nov 2, 2024
2 parents da13edf + 5062262
Showing 30 changed files with 109 additions and 92 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-e2e.yml
@@ -181,7 +181,7 @@ jobs:
- name: Scale cluster
run: make scale-node-pool
env:
- NODE_POOL_SIZE: 2
+ NODE_POOL_SIZE: 3
TEST_CLUSTER_NAME: keda-e2e-cluster-pr

- name: Run end to end tests
2 changes: 1 addition & 1 deletion .github/workflows/template-main-e2e-test.yml
@@ -26,7 +26,7 @@ jobs:
- name: Scale cluster
run: make scale-node-pool
env:
- NODE_POOL_SIZE: 2
+ NODE_POOL_SIZE: 3

- name: Run end to end tests
env:
11 changes: 0 additions & 11 deletions pkg/scalers/cpu_memory_scaler.go
@@ -29,10 +29,6 @@ type cpuMemoryMetadata struct {
MetricType v2.MetricTargetType
}

- func (m *cpuMemoryMetadata) Validate() error {
- return nil
- }

// NewCPUMemoryScaler creates a new cpuMemoryScaler
func NewCPUMemoryScaler(resourceName v1.ResourceName, config *scalersconfig.ScalerConfig) (Scaler, error) {
logger := InitializeLogger(config, "cpu_memory_scaler")
@@ -42,13 +38,6 @@ func NewCPUMemoryScaler(resourceName v1.ResourceName, config *scalersconfig.Scal
return nil, fmt.Errorf("error parsing %s metadata: %w", resourceName, err)
}

- if err := meta.Validate(); err != nil {
- if meta.MetricType == "" {
- return nil, fmt.Errorf("metricType is required")
- }
- return nil, fmt.Errorf("validation error: %w", err)
- }

return &cpuMemoryScaler{
metadata: meta,
resourceName: resourceName,
16 changes: 4 additions & 12 deletions pkg/scalers/kubernetes_workload_scaler.go
@@ -33,7 +33,7 @@ var phasesCountedAsTerminated = []corev1.PodPhase{

type kubernetesWorkloadMetadata struct {
PodSelector string `keda:"name=podSelector, order=triggerMetadata"`
- Value float64 `keda:"name=value, order=triggerMetadata"`
+ Value float64 `keda:"name=value, order=triggerMetadata, default=0"`
ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, default=0"`

namespace string
@@ -72,17 +72,13 @@ func NewKubernetesWorkloadScaler(kubeClient client.Client, config *scalersconfig

func parseKubernetesWorkloadMetadata(config *scalersconfig.ScalerConfig) (kubernetesWorkloadMetadata, error) {
meta := kubernetesWorkloadMetadata{}
- err := config.TypedConfig(&meta)
- if err != nil {
- return meta, fmt.Errorf("error parsing kubernetes workload metadata: %w", err)
- }

meta.namespace = config.ScalableObjectNamespace
meta.triggerIndex = config.TriggerIndex
meta.asMetricSource = config.AsMetricSource

- if meta.asMetricSource {
- meta.Value = 0
- }
+ err := config.TypedConfig(&meta)
+ if err != nil {
+ return meta, fmt.Errorf("error parsing kubernetes workload metadata: %w", err)
+ }

selector, err := labels.Parse(meta.PodSelector)
@@ -91,10 +87,6 @@ func parseKubernetesWorkloadMetadata(config *scalersconfig.ScalerConfig) (kubern
}
meta.podSelector = selector

- if err := meta.Validate(); err != nil {
- return meta, err
- }

return meta, nil
}

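The two scaler cleanups above follow the same reasoning: metadata parsing is declarative, driven by `keda` struct tags, so a hand-written Validate() that can only return nil, or that duplicates checks the parser already performs, is dead code, and a manual `meta.Value = 0` is replaced by `default=0` on the field. A minimal sketch of the pattern, using only the tag options visible in this diff (name, order, default); the struct, function name, and placement are illustrative, not part of the commit:

```go
package scalers // illustrative placement, mirroring the files in this commit

import (
	"fmt"

	"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)

// exampleMetadata is a hypothetical metadata struct: TypedConfig populates
// and defaults the fields from the trigger metadata, so no separate
// Validate() pass is needed afterwards.
type exampleMetadata struct {
	Value           float64 `keda:"name=value, order=triggerMetadata, default=0"`
	ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, default=0"`
}

func parseExampleMetadata(config *scalersconfig.ScalerConfig) (exampleMetadata, error) {
	meta := exampleMetadata{}
	if err := config.TypedConfig(&meta); err != nil {
		return meta, fmt.Errorf("error parsing example metadata: %w", err)
	}
	return meta, nil
}
```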
12 changes: 12 additions & 0 deletions tests/helper/helper.go
@@ -253,6 +253,7 @@ func DeleteNamespace(t *testing.T, nsName string) {
err = nil
}
assert.NoErrorf(t, err, "cannot delete kubernetes namespace - %s", err)
+ DeletePodsInNamespace(t, nsName)
}

func WaitForJobSuccess(t *testing.T, kc *kubernetes.Clientset, jobName, namespace string, iterations, interval int) bool {
@@ -744,6 +745,17 @@ func DeletePodsInNamespaceBySelector(t *testing.T, kc *kubernetes.Clientset, sel
assert.NoErrorf(t, err, "cannot delete pods - %s", err)
}

+ // Delete all pods in namespace
+ func DeletePodsInNamespace(t *testing.T, namespace string) {
+ err := GetKubernetesClient(t).CoreV1().Pods(namespace).DeleteCollection(context.Background(), metav1.DeleteOptions{
+ GracePeriodSeconds: ptr.To(int64(0)),
+ }, metav1.ListOptions{})
+ if errors.IsNotFound(err) {
+ err = nil
+ }
+ assert.NoErrorf(t, err, "cannot delete pods - %s", err)
+ }

// Wait for Pods identified by selector to complete termination
func WaitForPodsTerminated(t *testing.T, kc *kubernetes.Clientset, selector, namespace string, iterations, intervalSeconds int) bool {
for i := 0; i < iterations; i++ {
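The new DeletePodsInNamespace helper force-deletes every pod in a namespace with a zero grace period and treats NotFound as success, so DeleteNamespace can call it unconditionally. A sketch of how an e2e test benefits (the test body and namespace name are hypothetical; the dot-import pattern is assumed from how these helpers are typically consumed):

```go
package example_test // hypothetical e2e test file

import (
	"testing"

	. "github.com/kedacore/keda/v2/tests/helper" // assumed import pattern
)

// Hypothetical test: the deferred DeleteNamespace now also force-removes any
// leftover pods (zero grace period), so slow-terminating pods cannot leak
// into the next test run.
func TestExample(t *testing.T) {
	nsName := "example-test-ns" // hypothetical namespace
	kc := GetKubernetesClient(t)
	CreateNamespace(t, kc, nsName)
	defer DeleteNamespace(t, nsName)

	// ... exercise the scaler under test ...
}
```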
8 changes: 4 additions & 4 deletions tests/internals/cache_metrics/cache_metrics_test.go
@@ -160,8 +160,8 @@ func testCacheMetricsOnPollingInterval(t *testing.T, kc *kubernetes.Clientset, d

// Metric Value = 8, DesiredAverageMetricValue = 2
// should scale in to 8/2 = 4 replicas, irrespective of current replicas
- assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 1),
- "replica count should be 4 after 1 minute")
+ assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 3),
+ "replica count should be 4 after 3 minutes")

// Changing Metric Value to 4, but because we have a long polling interval, the replicas number should remain the same
data.MonitoredDeploymentReplicas = 4
@@ -196,8 +196,8 @@ func testDirectQuery(t *testing.T, kc *kubernetes.Clientset, data templateData)

// Metric Value = 8, DesiredAverageMetricValue = 2
// should scale in to 8/2 = 4 replicas, irrespective of current replicas
- assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 1),
- "replica count should be 4 after 1 minute")
+ assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 3),
+ "replica count should be 4 after 3 minutes")

// Changing Metric Value to 4, deployment should scale to 2
data.MonitoredDeploymentReplicas = 4
(changed file, path not shown)
@@ -265,21 +265,25 @@ func checkMessage(t *testing.T, count int, client *azservicebus.Client) {
if err != nil {
assert.NoErrorf(t, err, "cannot create receiver - %s", err)
}
- defer receiver.Close(context.TODO())
-
- messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
- assert.NoErrorf(t, err, "cannot receive messages - %s", err)
- assert.NotEmpty(t, messages)
+ defer receiver.Close(context.Background())
+
+ // We try to read the messages 3 times with a second of delay
+ tries := 3
found := false
- for _, message := range messages {
- event := messaging.CloudEvent{}
- err = json.Unmarshal(message.Body, &event)
- assert.NoErrorf(t, err, "cannot retrieve message - %s", err)
- if expectedSubject == *event.Subject &&
- expectedSource == event.Source &&
- expectedType == event.Type {
- found = true
+ for i := 0; i < tries && !found; i++ {
+ messages, err := receiver.ReceiveMessages(context.Background(), count, nil)
+ assert.NoErrorf(t, err, "cannot receive messages - %s", err)
+ assert.NotEmpty(t, messages)
+
+ for _, message := range messages {
+ event := messaging.CloudEvent{}
+ err = json.Unmarshal(message.Body, &event)
+ assert.NoErrorf(t, err, "cannot retrieve message - %s", err)
+ if expectedSubject == *event.Subject &&
+ expectedSource == event.Source &&
+ expectedType == event.Type {
+ found = true
+ }
}
}

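The rewritten checkMessage replaces a single ReceiveMessages call with a bounded retry loop that stops as soon as a matching CloudEvent is found; the comment mentions a one-second delay between attempts, which presumably lives outside the shown hunk. The general shape is the standard bounded-retry pattern, sketched here as a standalone helper (an assumption for illustration, not code from this commit):

```go
package helper // illustrative placement only

import "time"

// retryUntil runs check up to tries times, sleeping delay between attempts,
// and reports whether check ever succeeded. Assumed helper, not from this
// commit.
func retryUntil(tries int, delay time.Duration, check func() bool) bool {
	for i := 0; i < tries; i++ {
		if check() {
			return true
		}
		time.Sleep(delay)
	}
	return false
}
```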
4 changes: 2 additions & 2 deletions tests/internals/idle_replicas/idle_replicas_test.go
@@ -147,8 +147,8 @@ func testScaleOut(t *testing.T, kc *kubernetes.Clientset) {

t.Log("--- scale to max replicas ---")
KubernetesScaleDeployment(t, kc, monitoredDeploymentName, 4, testNamespace)
- assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 1),
- "replica count should be 4 after 1 minute")
+ assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 3),
+ "replica count should be 4 after 3 minutes")
}

func testScaleIn(t *testing.T, kc *kubernetes.Clientset) {
4 changes: 2 additions & 2 deletions tests/internals/restore_original/restore_original_test.go
@@ -138,8 +138,8 @@ func testScale(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing scaling ---")
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)

- assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 1),
- "replica count should be 4 after 1 minute")
+ assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 3),
+ "replica count should be 4 after 3 minutes")
}

func testRestore(t *testing.T, kc *kubernetes.Clientset, data templateData) {
(changed file, path not shown)
@@ -13,7 +13,7 @@ import (
)

const (
testName = "scaled-object-validation-test"
testName = "scaled-job-validation-test"
)

var (
(changed file, path not shown)
@@ -100,8 +100,11 @@ func TestScalingStrategy(t *testing.T) {
})

RMQInstall(t, kc, rmqNamespace, user, password, vhost, WithoutOAuth())
- CreateKubernetesResources(t, kc, testNamespace, data, templates)
+ // Publish 0 messages but create the queue
+ RMQPublishMessages(t, rmqNamespace, connectionString, queueName, 0)
+ WaitForAllJobsSuccess(t, kc, rmqNamespace, 60, 1)
+
+ CreateKubernetesResources(t, kc, testNamespace, data, templates)
testEagerScaling(t, kc)
}

@@ -121,14 +124,17 @@ func getTemplateData() (templateData, []Template) {
func testEagerScaling(t *testing.T, kc *kubernetes.Clientset) {
iterationCount := 20
RMQPublishMessages(t, rmqNamespace, connectionString, queueName, 4)
+ WaitForAllJobsSuccess(t, kc, rmqNamespace, 60, 1)
assert.True(t, WaitForScaledJobCount(t, kc, scaledJobName, testNamespace, 4, iterationCount, 1),
"job count should be %d after %d iterations", 4, iterationCount)

RMQPublishMessages(t, rmqNamespace, connectionString, queueName, 4)
+ WaitForAllJobsSuccess(t, kc, rmqNamespace, 60, 1)
assert.True(t, WaitForScaledJobCount(t, kc, scaledJobName, testNamespace, 8, iterationCount, 1),
"job count should be %d after %d iterations", 8, iterationCount)

- RMQPublishMessages(t, rmqNamespace, connectionString, queueName, 4)
+ RMQPublishMessages(t, rmqNamespace, connectionString, queueName, 8)
+ WaitForAllJobsSuccess(t, kc, rmqNamespace, 60, 1)
assert.True(t, WaitForScaledJobCount(t, kc, scaledJobName, testNamespace, 10, iterationCount, 1),
"job count should be %d after %d iterations", 10, iterationCount)
}
4 changes: 2 additions & 2 deletions tests/internals/value_metric_type/value_metric_type_test.go
@@ -149,8 +149,8 @@ func testScaleByAverageValue(t *testing.T, kc *kubernetes.Clientset, data templa

// Metric Value = 8, DesiredAverageMetricValue = 2
// should scale in to 8/2 = 4 replicas, irrespective of current replicas
- assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 1),
- "replica count should be 4 after 1 minute")
+ assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 3),
+ "replica count should be 4 after 3 minutes")

KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
}
4 changes: 2 additions & 2 deletions tests/run-all.go
@@ -25,11 +25,11 @@ import (
)

var (
- concurrentTests = 15
+ concurrentTests = 25
regularTestsTimeout = "20m"
regularTestsRetries = 3
sequentialTestsTimeout = "20m"
- sequentialTestsRetries = 1
+ sequentialTestsRetries = 2
)

type TestResult struct {
21 changes: 15 additions & 6 deletions tests/scalers/artemis/artemis_test.go
@@ -40,6 +40,7 @@ type templateData struct {
SecretName string
ArtemisPasswordBase64 string
ArtemisUserBase64 string
+ MessageCount int
}

const (
@@ -87,8 +88,8 @@ spec:
spec:
containers:
- name: kedartemis-consumer
- image: balchu/kedartemis-consumer
- imagePullPolicy: Always
+ image: ghcr.io/kedacore/tests-artemis
+ args: ["consumer"]
env:
- name: ARTEMIS_PASSWORD
valueFrom:
@@ -100,10 +101,12 @@ spec:
secretKeyRef:
name: {{.SecretName}}
key: artemis-username
- - name: ARTEMIS_HOST
+ - name: ARTEMIS_SERVER_HOST
value: "artemis-activemq.{{.TestNamespace}}"
- - name: ARTEMIS_PORT
+ - name: ARTEMIS_SERVER_PORT
value: "61616"
+ - name: ARTEMIS_MESSAGE_SLEEP_MS
+ value: "70"
`

artemisDeploymentTemplate = `apiVersion: apps/v1
@@ -260,7 +263,7 @@ spec:
managementEndpoint: "artemis-activemq.{{.TestNamespace}}:8161"
queueName: "test"
queueLength: "50"
- activationQueueLength: "1500"
+ activationQueueLength: "5"
brokerName: "artemis-activemq"
brokerAddress: "test"
authenticationRef:
@@ -279,7 +282,8 @@ spec:
spec:
containers:
- name: artemis-producer
- image: balchu/artemis-producer:0.0.1
+ image: ghcr.io/kedacore/tests-artemis
+ args: ["producer"]
env:
- name: ARTEMIS_PASSWORD
valueFrom:
@@ -295,6 +299,8 @@ spec:
value: "artemis-activemq.{{.TestNamespace}}"
- name: ARTEMIS_SERVER_PORT
value: "61616"
+ - name: ARTEMIS_MESSAGE_COUNT
+ value: "{{.MessageCount}}"
restartPolicy: Never
backoffLimit: 4
`
@@ -321,13 +327,15 @@ func TestArtemisScaler(t *testing.T) {

func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing activation ---")
+ data.MessageCount = 1
KubectlReplaceWithTemplate(t, data, "triggerJobTemplate", producerJob)

AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60)
}

func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing scale out ---")
+ data.MessageCount = 1000
KubectlReplaceWithTemplate(t, data, "triggerJobTemplate", producerJob)

assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3),
@@ -349,6 +357,7 @@ func getTemplateData() (templateData, []Template) {
SecretName: secretName,
ArtemisPasswordBase64: base64.StdEncoding.EncodeToString([]byte(artemisPassword)),
ArtemisUserBase64: base64.StdEncoding.EncodeToString([]byte(artemisUser)),
+ MessageCount: 0,
}, []Template{
{Name: "secretTemplate", Config: secretTemplate},
{Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate},
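The Artemis test now threads a MessageCount field through templateData, so the same producer-job manifest drives both the activation check (1 message) and the scale-out check (1000 messages). A rough stand-in for what the template helpers do with that field, using Go's text/template (illustrative only, not the helpers' actual implementation):

```go
package main

import (
	"os"
	"text/template"
)

func main() {
	// Render the ARTEMIS_MESSAGE_COUNT fragment the way the producer job
	// template above consumes {{.MessageCount}}.
	manifest := `- name: ARTEMIS_MESSAGE_COUNT
  value: "{{.MessageCount}}"`
	tmpl := template.Must(template.New("producerJob").Parse(manifest))
	_ = tmpl.Execute(os.Stdout, struct{ MessageCount int }{MessageCount: 1000})
	// Output:
	// - name: ARTEMIS_MESSAGE_COUNT
	//   value: "1000"
}
```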
(changed file, path not shown)
@@ -26,7 +26,7 @@ import (
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-event-hub-dapr"
testName = "azure-event-hub-dapr-wi"
eventhubConsumerGroup = "$Default"
)

(changed file, path not shown)
@@ -202,8 +202,8 @@ func testScale(t *testing.T, kc *kubernetes.Clientset, client *azservicebus.Clie
// check different aggregation operations
data.Operation = "max"
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
- assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 1),
- "replica count should be 4 after 1 minute")
+ assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 3),
+ "replica count should be 4 after 3 minutes")

data.Operation = "avg"
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
(changed file, path not shown)
@@ -225,8 +225,8 @@ func testScale(t *testing.T, kc *kubernetes.Clientset, client *azservicebus.Clie
// check different aggregation operations
data.Operation = "max"
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
- assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 1),
- "replica count should be 4 after 1 minute")
+ assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 3),
+ "replica count should be 4 after 3 minutes")

data.Operation = "avg"
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
4 changes: 2 additions & 2 deletions tests/scalers/elasticsearch/elasticsearch_test.go
@@ -81,7 +81,7 @@ metadata:
labels:
app: {{.DeploymentName}}
spec:
- replicas: 1
+ replicas: 0
selector:
matchLabels:
app: {{.DeploymentName}}
@@ -397,7 +397,7 @@ func testElasticsearchScaler(t *testing.T, kc *kubernetes.Clientset) {
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60)

t.Log("--- testing scale out ---")
- addElements(t, 5)
+ addElements(t, 10)

assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3),
"replica count should be %d after 3 minutes", maxReplicaCount)
(remaining changed files not shown)
