Skip to content

Commit

Permalink
Rebase from master
Browse files Browse the repository at this point in the history
Signed-off-by: dttung2905 <[email protected]>
  • Loading branch information
dttung2905 committed Jan 15, 2024
1 parent 1f6b9b2 commit 8b8a4f2
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 3 deletions.
1 change: 1 addition & 0 deletions pkg/scalers/apache_kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func parseApacheKafkaAuthParams(config *ScalerConfig, meta *apacheKafkaMetadata)
if err != nil {
return fmt.Errorf("%w. No awsRegion given", err)
}
meta.awsRegion = awsRegion.(string)
auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv)
if err != nil {
return err
Expand Down
1 change: 0 additions & 1 deletion pkg/scalers/apache_kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,6 @@ func getBrokerApacheKafkaTestBase(t *testing.T, meta apacheKafkaMetadata, testDa
if er != nil {
t.Errorf("Unable to convert test data lagThreshold %s to string", testData.metadata["lagThreshold"])
}

if meta.lagThreshold != expectedLagThreshold && meta.lagThreshold != defaultKafkaLagThreshold {
t.Errorf("Expected lagThreshold to be either %v or %v got %v ", meta.lagThreshold, defaultKafkaLagThreshold, expectedLagThreshold)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata
}
meta.offsetResetPolicy = policy
}

meta.lagThreshold = defaultKafkaLagThreshold
lagThreshold, err := getParameterFromConfigV2(config, lagThresholdMetricName, true, false, false, true, defaultKafkaLagThreshold, reflect.TypeOf(64))
if err != nil {
return meta, err
Expand Down Expand Up @@ -482,6 +482,7 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata
if err != nil {
return meta, fmt.Errorf("error parsing kafka version: %w", err)
}
meta.version = version
meta.triggerIndex = config.TriggerIndex
return meta, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ var kafkaMetricIdentifiers = []kafkaMetricIdentifier{
}

func TestGetBrokers(t *testing.T) {
for idx, testData := range parseKafkaMetadataTestDataset {
for _, testData := range parseKafkaMetadataTestDataset {
meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validWithAuthParams}, logr.Discard())
getBrokerTestBase(t, meta, testData, err)

Expand Down

0 comments on commit 8b8a4f2

Please sign in to comment.