diff --git a/pkg/scalers/apache_kafka_scaler.go b/pkg/scalers/apache_kafka_scaler.go index 7680c9e46b4..c1757fe76b0 100644 --- a/pkg/scalers/apache_kafka_scaler.go +++ b/pkg/scalers/apache_kafka_scaler.go @@ -189,6 +189,7 @@ func parseApacheKafkaAuthParams(config *ScalerConfig, meta *apacheKafkaMetadata) if err != nil { return fmt.Errorf("%w. No awsRegion given", err) } + meta.awsRegion = awsRegion.(string) auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv) if err != nil { return err diff --git a/pkg/scalers/apache_kafka_scaler_test.go b/pkg/scalers/apache_kafka_scaler_test.go index a056c11cbe5..53a1c9111ed 100644 --- a/pkg/scalers/apache_kafka_scaler_test.go +++ b/pkg/scalers/apache_kafka_scaler_test.go @@ -288,7 +288,6 @@ func getBrokerApacheKafkaTestBase(t *testing.T, meta apacheKafkaMetadata, testDa if er != nil { t.Errorf("Unable to convert test data lagThreshold %s to string", testData.metadata["lagThreshold"]) } - if meta.lagThreshold != expectedLagThreshold && meta.lagThreshold != defaultKafkaLagThreshold { t.Errorf("Expected lagThreshold to be either %v or %v got %v ", meta.lagThreshold, defaultKafkaLagThreshold, expectedLagThreshold) } diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 8d982b5063e..7e6592a1980 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -419,7 +419,7 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata } meta.offsetResetPolicy = policy } - + meta.lagThreshold = defaultKafkaLagThreshold lagThreshold, err := getParameterFromConfigV2(config, lagThresholdMetricName, true, false, false, true, defaultKafkaLagThreshold, reflect.TypeOf(64)) if err != nil { return meta, err @@ -482,6 +482,7 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata if err != nil { return meta, fmt.Errorf("error parsing kafka version: %w", err) } + meta.version = version meta.triggerIndex = config.TriggerIndex return meta, nil } diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index 31e733e52b5..93e08e31622 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -317,7 +317,7 @@ var kafkaMetricIdentifiers = []kafkaMetricIdentifier{ } func TestGetBrokers(t *testing.T) { - for idx, testData := range parseKafkaMetadataTestDataset { + for _, testData := range parseKafkaMetadataTestDataset { meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validWithAuthParams}, logr.Discard()) getBrokerTestBase(t, meta, testData, err)