From 29400ed2816fe7388a021ff14f5cb405b7deaa85 Mon Sep 17 00:00:00 2001 From: rickbrouwer Date: Tue, 26 Nov 2024 19:29:11 +0100 Subject: [PATCH 1/3] Deprecate Stan scaler (#6363) Signed-off-by: rickbrouwer --- CHANGELOG.md | 2 +- pkg/scalers/stan_scaler.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7da6fd76866..8660a1b2123 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -82,7 +82,7 @@ You can find all deprecations in [this overview](https://github.com/kedacore/ked New deprecation(s): -- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **NATS Streaming scaler**: Deprecate NATS Streaming Server (aka Stan) ([#6362](https://github.com/kedacore/keda/issues/6362)) ### Breaking Changes diff --git a/pkg/scalers/stan_scaler.go b/pkg/scalers/stan_scaler.go index 701073e7d12..b4930d7bc31 100644 --- a/pkg/scalers/stan_scaler.go +++ b/pkg/scalers/stan_scaler.go @@ -278,6 +278,7 @@ func (s *stanScaler) GetMetricsAndActivity(ctx context.Context, metricName strin } totalLag := s.getMaxMsgLag() s.logger.V(1).Info("Stan scaler: Providing metrics based on totalLag, threshold", "totalLag", totalLag, "lagThreshold", s.metadata.lagThreshold) + s.logger.Info("The Stan scaler (NATS Streaming) is DEPRECATED and will be removed in v2.19 - Use scaler 'nats-jetstream' instead") metric := GenerateMetricInMili(metricName, float64(totalLag)) From 5c58849f2eb38409d60c418e81664c970127dbb5 Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Wed, 27 Nov 2024 23:36:44 +0200 Subject: [PATCH 2/3] refactor aws sqs queue scaler configuration (#6358) Signed-off-by: Omer Aplatony --- pkg/scalers/aws_sqs_queue_scaler.go | 115 +++++------------------ pkg/scalers/aws_sqs_queue_scaler_test.go | 22 ++--- 2 files changed, 34 insertions(+), 103 deletions(-) diff --git a/pkg/scalers/aws_sqs_queue_scaler.go b/pkg/scalers/aws_sqs_queue_scaler.go index 1de9bf7f285..6f1b6b5d0ee 100644 --- a/pkg/scalers/aws_sqs_queue_scaler.go +++ b/pkg/scalers/aws_sqs_queue_scaler.go @@ -19,14 +19,6 @@ import ( kedautil "github.com/kedacore/keda/v2/pkg/util" ) -const ( - defaultTargetQueueLength = 5 - targetQueueLengthDefault = 5 - activationTargetQueueLengthDefault = 0 - defaultScaleOnInFlight = true - defaultScaleOnDelayed = false -) - type awsSqsQueueScaler struct { metricType v2.MetricTargetType metadata *awsSqsQueueMetadata @@ -35,16 +27,16 @@ type awsSqsQueueScaler struct { } type awsSqsQueueMetadata struct { - targetQueueLength int64 - activationTargetQueueLength int64 - queueURL string + TargetQueueLength int64 `keda:"name=queueLength, order=triggerMetadata, default=5"` + ActivationTargetQueueLength int64 `keda:"name=activationQueueLength, order=triggerMetadata, default=0"` + QueueURL string `keda:"name=queueURL;queueURLFromEnv, order=triggerMetadata;resolvedEnv"` queueName string - awsRegion string - awsEndpoint string + AwsRegion string `keda:"name=awsRegion, order=triggerMetadata"` + AwsEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"` awsAuthorization awsutils.AuthorizationMetadata triggerIndex int - scaleOnInFlight bool - scaleOnDelayed bool + ScaleOnInFlight bool `keda:"name=scaleOnInFlight, order=triggerMetadata, default=true"` + ScaleOnDelayed bool `keda:"name=scaleOnDelayed, order=triggerMetadata, default=false"` awsSqsQueueMetricNames []types.QueueAttributeName } @@ -57,7 +49,7 @@ func NewAwsSqsQueueScaler(ctx context.Context, config *scalersconfig.ScalerConfi logger := InitializeLogger(config, "aws_sqs_queue_scaler") - meta, err := parseAwsSqsQueueMetadata(config, logger) + meta, err := parseAwsSqsQueueMetadata(config) if err != nil { return nil, fmt.Errorf("error parsing SQS queue metadata: %w", err) } @@ -87,77 +79,26 @@ func (w sqsWrapperClient) GetQueueAttributes(ctx context.Context, params *sqs.Ge return w.sqsClient.GetQueueAttributes(ctx, params, optFns...) } -func parseAwsSqsQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*awsSqsQueueMetadata, error) { - meta := awsSqsQueueMetadata{} - meta.targetQueueLength = defaultTargetQueueLength - meta.scaleOnInFlight = defaultScaleOnInFlight - meta.scaleOnDelayed = defaultScaleOnDelayed - - if val, ok := config.TriggerMetadata["queueLength"]; ok && val != "" { - queueLength, err := strconv.ParseInt(val, 10, 64) - if err != nil { - meta.targetQueueLength = targetQueueLengthDefault - logger.Error(err, "Error parsing SQS queue metadata queueLength, using default %n", targetQueueLengthDefault) - } else { - meta.targetQueueLength = queueLength - } - } - - if val, ok := config.TriggerMetadata["activationQueueLength"]; ok && val != "" { - activationQueueLength, err := strconv.ParseInt(val, 10, 64) - if err != nil { - meta.activationTargetQueueLength = activationTargetQueueLengthDefault - logger.Error(err, "Error parsing SQS queue metadata activationQueueLength, using default %n", activationTargetQueueLengthDefault) - } else { - meta.activationTargetQueueLength = activationQueueLength - } - } - - if val, ok := config.TriggerMetadata["scaleOnDelayed"]; ok && val != "" { - scaleOnDelayed, err := strconv.ParseBool(val) - if err != nil { - meta.scaleOnDelayed = defaultScaleOnDelayed - logger.Error(err, "Error parsing SQS queue metadata scaleOnDelayed, using default %n", defaultScaleOnDelayed) - } else { - meta.scaleOnDelayed = scaleOnDelayed - } - } +func parseAwsSqsQueueMetadata(config *scalersconfig.ScalerConfig) (*awsSqsQueueMetadata, error) { + meta := &awsSqsQueueMetadata{} - if val, ok := config.TriggerMetadata["scaleOnInFlight"]; ok && val != "" { - scaleOnInFlight, err := strconv.ParseBool(val) - if err != nil { - meta.scaleOnInFlight = defaultScaleOnInFlight - logger.Error(err, "Error parsing SQS queue metadata scaleOnInFlight, using default %n", defaultScaleOnInFlight) - } else { - meta.scaleOnInFlight = scaleOnInFlight - } + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing SQS queue metadata: %w", err) } meta.awsSqsQueueMetricNames = []types.QueueAttributeName{} meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, types.QueueAttributeNameApproximateNumberOfMessages) - if meta.scaleOnInFlight { + if meta.ScaleOnInFlight { meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, types.QueueAttributeNameApproximateNumberOfMessagesNotVisible) } - if meta.scaleOnDelayed { + if meta.ScaleOnDelayed { meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, types.QueueAttributeNameApproximateNumberOfMessagesDelayed) } - if val, ok := config.TriggerMetadata["queueURL"]; ok && val != "" { - meta.queueURL = val - } else if val, ok := config.TriggerMetadata["queueURLFromEnv"]; ok && val != "" { - if val, ok := config.ResolvedEnv[val]; ok && val != "" { - meta.queueURL = val - } else { - return nil, fmt.Errorf("queueURLFromEnv `%s` env variable value is empty", config.TriggerMetadata["queueURLFromEnv"]) - } - } else { - return nil, fmt.Errorf("no queueURL given") - } - - queueURL, err := url.ParseRequestURI(meta.queueURL) + queueURL, err := url.ParseRequestURI(meta.QueueURL) if err != nil { // queueURL is not a valid URL, using it as queueName - meta.queueName = meta.queueURL + meta.queueName = meta.QueueURL } else { queueURLPath := queueURL.Path queueURLPathParts := strings.Split(queueURLPath, "/") @@ -168,16 +109,6 @@ func parseAwsSqsQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo meta.queueName = queueURLPathParts[2] } - if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" { - meta.awsRegion = val - } else { - return nil, fmt.Errorf("no awsRegion given") - } - - if val, ok := config.TriggerMetadata["awsEndpoint"]; ok { - meta.awsEndpoint = val - } - auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv) if err != nil { return nil, err @@ -187,17 +118,17 @@ func parseAwsSqsQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo meta.triggerIndex = config.TriggerIndex - return &meta, nil + return meta, nil } func createSqsClient(ctx context.Context, metadata *awsSqsQueueMetadata) (*sqs.Client, error) { - cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization) + cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization) if err != nil { return nil, err } return sqs.NewFromConfig(*cfg, func(options *sqs.Options) { - if metadata.awsEndpoint != "" { - options.BaseEndpoint = aws.String(metadata.awsEndpoint) + if metadata.AwsEndpoint != "" { + options.BaseEndpoint = aws.String(metadata.AwsEndpoint) } }), nil } @@ -212,7 +143,7 @@ func (s *awsSqsQueueScaler) GetMetricSpecForScaling(context.Context) []v2.Metric Metric: v2.MetricIdentifier{ Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-sqs-%s", s.metadata.queueName))), }, - Target: GetMetricTarget(s.metricType, s.metadata.targetQueueLength), + Target: GetMetricTarget(s.metricType, s.metadata.TargetQueueLength), } metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} return []v2.MetricSpec{metricSpec} @@ -229,14 +160,14 @@ func (s *awsSqsQueueScaler) GetMetricsAndActivity(ctx context.Context, metricNam metric := GenerateMetricInMili(metricName, float64(queuelen)) - return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.activationTargetQueueLength, nil + return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.ActivationTargetQueueLength, nil } // Get SQS Queue Length func (s *awsSqsQueueScaler) getAwsSqsQueueLength(ctx context.Context) (int64, error) { input := &sqs.GetQueueAttributesInput{ AttributeNames: s.metadata.awsSqsQueueMetricNames, - QueueUrl: aws.String(s.metadata.queueURL), + QueueUrl: aws.String(s.metadata.QueueURL), } output, err := s.sqsWrapperClient.GetQueueAttributes(ctx, input) diff --git a/pkg/scalers/aws_sqs_queue_scaler_test.go b/pkg/scalers/aws_sqs_queue_scaler_test.go index 6e0b065ab07..fb5db401ecc 100644 --- a/pkg/scalers/aws_sqs_queue_scaler_test.go +++ b/pkg/scalers/aws_sqs_queue_scaler_test.go @@ -144,8 +144,8 @@ var testAWSSQSMetadata = []parseAWSSQSMetadataTestData{ "awsRegion": "eu-west-1"}, testAWSSQSAuthentication, testAWSSQSEmptyResolvedEnv, - false, - "properly formed queue, invalid queueLength"}, + true, + "invalid integer value for queueLength"}, {map[string]string{ "queueURL": testAWSSQSProperQueueURL, "queueLength": "1", @@ -162,8 +162,8 @@ var testAWSSQSMetadata = []parseAWSSQSMetadataTestData{ "awsRegion": "eu-west-1"}, testAWSSQSAuthentication, testAWSSQSEmptyResolvedEnv, - false, - "properly formed queue, invalid activationQueueLength"}, + true, + "invalid integer value for activationQueueLength"}, {map[string]string{ "queueURL": testAWSSQSProperQueueURL, "queueLength": "1", @@ -304,7 +304,7 @@ var testAWSSQSMetadata = []parseAWSSQSMetadataTestData{ map[string]string{ "QUEUE_URL": "", }, - true, + false, "empty QUEUE_URL env value"}, } @@ -392,7 +392,7 @@ var awsSQSGetMetricTestData = []*parseAWSSQSMetadataTestData{ func TestSQSParseMetadata(t *testing.T) { for _, testData := range testAWSSQSMetadata { - _, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams}, logr.Discard()) + _, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams}) if err != nil && !testData.isError { t.Errorf("Expected success because %s got error, %s", testData.comment, err) } @@ -405,7 +405,7 @@ func TestSQSParseMetadata(t *testing.T) { func TestAWSSQSGetMetricSpecForScaling(t *testing.T) { for _, testData := range awsSQSMetricIdentifiers { ctx := context.Background() - meta, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testData.metadataTestData.resolvedEnv, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}, logr.Discard()) + meta, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testData.metadataTestData.resolvedEnv, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}) if err != nil { t.Fatal("Could not parse metadata:", err) } @@ -421,24 +421,24 @@ func TestAWSSQSGetMetricSpecForScaling(t *testing.T) { func TestAWSSQSScalerGetMetrics(t *testing.T) { for index, testData := range awsSQSGetMetricTestData { - meta, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams, TriggerIndex: index}, logr.Discard()) + meta, err := parseAwsSqsQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams, TriggerIndex: index}) if err != nil { t.Fatal("Could not parse metadata:", err) } scaler := awsSqsQueueScaler{"", meta, &mockSqs{}, logr.Discard()} value, _, err := scaler.GetMetricsAndActivity(context.Background(), "MetricName") - switch meta.queueURL { + switch meta.QueueURL { case testAWSSQSErrorQueueURL: assert.Error(t, err, "expect error because of sqs api error") case testAWSSQSBadDataQueueURL: assert.Error(t, err, "expect error because of bad data return from sqs") default: expectedMessages := testAWSSQSApproximateNumberOfMessagesVisible - if meta.scaleOnInFlight { + if meta.ScaleOnInFlight { expectedMessages += testAWSSQSApproximateNumberOfMessagesNotVisible } - if meta.scaleOnDelayed { + if meta.ScaleOnDelayed { expectedMessages += testAWSSQSApproximateNumberOfMessagesDelayed } assert.EqualValues(t, int64(expectedMessages), value[0].Value.Value()) From 2adbc81571e5301f69fa3a591f5e397f0b564aa1 Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Thu, 28 Nov 2024 00:03:21 +0200 Subject: [PATCH 3/3] Refactor AWS DynamoDB Streams scaler configuration (#6351) * Refactor AWS DynamoDB Streams scaler configuration Signed-off-by: Omer Aplatony * fixed unit tests Signed-off-by: Omer Aplatony * Fix invalid value test Signed-off-by: Omer Aplatony * go fmt Signed-off-by: Omer Aplatony --------- Signed-off-by: Omer Aplatony --- pkg/scalers/aws_dynamodb_streams_scaler.go | 69 ++++----------- .../aws_dynamodb_streams_scaler_test.go | 84 ++++++++----------- 2 files changed, 55 insertions(+), 98 deletions(-) diff --git a/pkg/scalers/aws_dynamodb_streams_scaler.go b/pkg/scalers/aws_dynamodb_streams_scaler.go index a8448a46408..cdcd8548320 100644 --- a/pkg/scalers/aws_dynamodb_streams_scaler.go +++ b/pkg/scalers/aws_dynamodb_streams_scaler.go @@ -3,7 +3,6 @@ package scalers import ( "context" "fmt" - "strconv" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodb" @@ -31,11 +30,11 @@ type awsDynamoDBStreamsScaler struct { } type awsDynamoDBStreamsMetadata struct { - targetShardCount int64 - activationTargetShardCount int64 - tableName string - awsRegion string - awsEndpoint string + TargetShardCount int64 `keda:"name=shardCount, order=triggerMetadata, default=2"` + ActivationTargetShardCount int64 `keda:"name=activationShardCount, order=triggerMetadata, default=0"` + TableName string `keda:"name=tableName, order=triggerMetadata"` + AwsRegion string `keda:"name=awsRegion, order=triggerMetadata"` + AwsEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"` awsAuthorization awsutils.AuthorizationMetadata triggerIndex int } @@ -49,7 +48,7 @@ func NewAwsDynamoDBStreamsScaler(ctx context.Context, config *scalersconfig.Scal logger := InitializeLogger(config, "aws_dynamodb_streams_scaler") - meta, err := parseAwsDynamoDBStreamsMetadata(config, logger) + meta, err := parseAwsDynamoDBStreamsMetadata(config) if err != nil { return nil, fmt.Errorf("error parsing dynamodb stream metadata: %w", err) } @@ -58,7 +57,7 @@ func NewAwsDynamoDBStreamsScaler(ctx context.Context, config *scalersconfig.Scal if err != nil { return nil, fmt.Errorf("error when creating dynamodbstream client: %w", err) } - streamArn, err := getDynamoDBStreamsArn(ctx, dbClient, &meta.tableName) + streamArn, err := getDynamoDBStreamsArn(ctx, dbClient, &meta.TableName) if err != nil { return nil, fmt.Errorf("error dynamodb stream arn: %w", err) } @@ -74,43 +73,11 @@ func NewAwsDynamoDBStreamsScaler(ctx context.Context, config *scalersconfig.Scal }, nil } -func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*awsDynamoDBStreamsMetadata, error) { +func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig) (*awsDynamoDBStreamsMetadata, error) { meta := awsDynamoDBStreamsMetadata{} - meta.targetShardCount = defaultTargetDBStreamsShardCount - if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" { - meta.awsRegion = val - } else { - return nil, fmt.Errorf("no awsRegion given") - } - - if val, ok := config.TriggerMetadata["awsEndpoint"]; ok { - meta.awsEndpoint = val - } - - if val, ok := config.TriggerMetadata["tableName"]; ok && val != "" { - meta.tableName = val - } else { - return nil, fmt.Errorf("no tableName given") - } - - if val, ok := config.TriggerMetadata["shardCount"]; ok && val != "" { - shardCount, err := strconv.ParseInt(val, 10, 64) - if err != nil { - meta.targetShardCount = defaultTargetDBStreamsShardCount - logger.Error(err, "error parsing dyanmodb stream metadata shardCount, using default %n", defaultTargetDBStreamsShardCount) - } else { - meta.targetShardCount = shardCount - } - } - if val, ok := config.TriggerMetadata["activationShardCount"]; ok && val != "" { - shardCount, err := strconv.ParseInt(val, 10, 64) - if err != nil { - meta.activationTargetShardCount = defaultActivationTargetDBStreamsShardCount - logger.Error(err, "error parsing dyanmodb stream metadata activationTargetShardCount, using default %n", defaultActivationTargetDBStreamsShardCount) - } else { - meta.activationTargetShardCount = shardCount - } + if err := config.TypedConfig(&meta); err != nil { + return nil, fmt.Errorf("error parsing dynamodb stream metadata: %w", err) } auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv) @@ -125,18 +92,18 @@ func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig, logger } func createClientsForDynamoDBStreamsScaler(ctx context.Context, metadata *awsDynamoDBStreamsMetadata) (*dynamodb.Client, *dynamodbstreams.Client, error) { - cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization) + cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization) if err != nil { return nil, nil, err } dbClient := dynamodb.NewFromConfig(*cfg, func(options *dynamodb.Options) { - if metadata.awsEndpoint != "" { - options.BaseEndpoint = aws.String(metadata.awsEndpoint) + if metadata.AwsEndpoint != "" { + options.BaseEndpoint = aws.String(metadata.AwsEndpoint) } }) dbStreamClient := dynamodbstreams.NewFromConfig(*cfg, func(options *dynamodbstreams.Options) { - if metadata.awsEndpoint != "" { - options.BaseEndpoint = aws.String(metadata.awsEndpoint) + if metadata.AwsEndpoint != "" { + options.BaseEndpoint = aws.String(metadata.AwsEndpoint) } }) @@ -176,9 +143,9 @@ func (s *awsDynamoDBStreamsScaler) Close(_ context.Context) error { func (s *awsDynamoDBStreamsScaler) GetMetricSpecForScaling(_ context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-streams-%s", s.metadata.tableName))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-streams-%s", s.metadata.TableName))), }, - Target: GetMetricTarget(s.metricType, s.metadata.targetShardCount), + Target: GetMetricTarget(s.metricType, s.metadata.TargetShardCount), } metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} return []v2.MetricSpec{metricSpec} @@ -195,7 +162,7 @@ func (s *awsDynamoDBStreamsScaler) GetMetricsAndActivity(ctx context.Context, me metric := GenerateMetricInMili(metricName, float64(shardCount)) - return []external_metrics.ExternalMetricValue{metric}, shardCount > s.metadata.activationTargetShardCount, nil + return []external_metrics.ExternalMetricValue{metric}, shardCount > s.metadata.ActivationTargetShardCount, nil } // GetDynamoDBStreamShardCount Get DynamoDB Stream Shard Count diff --git a/pkg/scalers/aws_dynamodb_streams_scaler_test.go b/pkg/scalers/aws_dynamodb_streams_scaler_test.go index 5c87de8d87a..ce1232413a6 100644 --- a/pkg/scalers/aws_dynamodb_streams_scaler_test.go +++ b/pkg/scalers/aws_dynamodb_streams_scaler_test.go @@ -135,10 +135,10 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ "awsRegion": testAWSDynamoDBStreamsRegion}, authParams: testAWSKinesisAuthentication, expected: &awsDynamoDBStreamsMetadata{ - targetShardCount: 2, - activationTargetShardCount: 1, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TargetShardCount: 2, + ActivationTargetShardCount: 1, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, @@ -159,11 +159,11 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ "awsEndpoint": testAWSDynamoDBStreamsEndpoint}, authParams: testAWSKinesisAuthentication, expected: &awsDynamoDBStreamsMetadata{ - targetShardCount: 2, - activationTargetShardCount: 1, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, - awsEndpoint: testAWSDynamoDBStreamsEndpoint, + TargetShardCount: 2, + ActivationTargetShardCount: 1, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, + AwsEndpoint: testAWSDynamoDBStreamsEndpoint, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, @@ -204,10 +204,10 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ "awsRegion": testAWSDynamoDBStreamsRegion}, authParams: testAWSKinesisAuthentication, expected: &awsDynamoDBStreamsMetadata{ - targetShardCount: defaultTargetDBStreamsShardCount, - activationTargetShardCount: defaultActivationTargetDBStreamsShardCount, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TargetShardCount: defaultTargetDBStreamsShardCount, + ActivationTargetShardCount: defaultActivationTargetDBStreamsShardCount, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, @@ -224,20 +224,10 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ "tableName": testAWSDynamoDBSmallTable, "shardCount": "a", "awsRegion": testAWSDynamoDBStreamsRegion}, - authParams: testAWSKinesisAuthentication, - expected: &awsDynamoDBStreamsMetadata{ - targetShardCount: defaultTargetDBStreamsShardCount, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, - awsAuthorization: awsutils.AuthorizationMetadata{ - AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, - AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, - PodIdentityOwner: true, - }, - triggerIndex: 4, - }, - isError: false, - comment: "properly formed table name and region, wrong shard count", + authParams: testAWSKinesisAuthentication, + expected: &awsDynamoDBStreamsMetadata{}, + isError: true, + comment: "invalid value - should cause error", triggerIndex: 4, }, { @@ -278,9 +268,9 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ "awsSessionToken": testAWSDynamoDBStreamsSessionToken, }, expected: &awsDynamoDBStreamsMetadata{ - targetShardCount: 2, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TargetShardCount: 2, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, @@ -330,9 +320,9 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ "awsRoleArn": testAWSDynamoDBStreamsRoleArn, }, expected: &awsDynamoDBStreamsMetadata{ - targetShardCount: 2, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TargetShardCount: 2, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsRoleArn: testAWSDynamoDBStreamsRoleArn, PodIdentityOwner: true, @@ -350,9 +340,9 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ "identityOwner": "operator"}, authParams: map[string]string{}, expected: &awsDynamoDBStreamsMetadata{ - targetShardCount: 2, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TargetShardCount: 2, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ PodIdentityOwner: false, }, @@ -370,15 +360,15 @@ var awsDynamoDBStreamMetricIdentifiers = []awsDynamoDBStreamsMetricIdentifier{ } var awsDynamoDBStreamsGetMetricTestData = []*awsDynamoDBStreamsMetadata{ - {tableName: testAWSDynamoDBBigTable}, - {tableName: testAWSDynamoDBSmallTable}, - {tableName: testAWSDynamoDBErrorTable}, - {tableName: testAWSDynamoDBInvalidTable}, + {TableName: testAWSDynamoDBBigTable}, + {TableName: testAWSDynamoDBSmallTable}, + {TableName: testAWSDynamoDBErrorTable}, + {TableName: testAWSDynamoDBInvalidTable}, } func TestParseAwsDynamoDBStreamsMetadata(t *testing.T) { for _, testData := range testAwsDynamoDBStreamMetadata { - result, err := parseAwsDynamoDBStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testAwsDynamoDBStreamAuthentication, AuthParams: testData.authParams, TriggerIndex: testData.triggerIndex}, logr.Discard()) + result, err := parseAwsDynamoDBStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testAwsDynamoDBStreamAuthentication, AuthParams: testData.authParams, TriggerIndex: testData.triggerIndex}) if err != nil && !testData.isError { t.Errorf("Expected success because %s got error, %s", testData.comment, err) } @@ -395,11 +385,11 @@ func TestParseAwsDynamoDBStreamsMetadata(t *testing.T) { func TestAwsDynamoDBStreamsGetMetricSpecForScaling(t *testing.T) { for _, testData := range awsDynamoDBStreamMetricIdentifiers { ctx := context.Background() - meta, err := parseAwsDynamoDBStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAwsDynamoDBStreamAuthentication, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}, logr.Discard()) + meta, err := parseAwsDynamoDBStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAwsDynamoDBStreamAuthentication, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}) if err != nil { t.Fatal("Could not parse metadata:", err) } - streamArn, err := getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.tableName) + streamArn, err := getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.TableName) if err != nil { t.Fatal("Could not get dynamodb stream arn:", err) } @@ -418,12 +408,12 @@ func TestAwsDynamoDBStreamsScalerGetMetrics(t *testing.T) { var err error var streamArn *string ctx := context.Background() - streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.tableName) + streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.TableName) if err == nil { scaler := awsDynamoDBStreamsScaler{"", meta, streamArn, &mockAwsDynamoDBStreams{}, logr.Discard()} value, _, err = scaler.GetMetricsAndActivity(context.Background(), "MetricName") } - switch meta.tableName { + switch meta.TableName { case testAWSDynamoDBErrorTable: assert.Error(t, err, "expect error because of dynamodb stream api error") case testAWSDynamoDBInvalidTable: @@ -442,12 +432,12 @@ func TestAwsDynamoDBStreamsScalerIsActive(t *testing.T) { var err error var streamArn *string ctx := context.Background() - streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.tableName) + streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.TableName) if err == nil { scaler := awsDynamoDBStreamsScaler{"", meta, streamArn, &mockAwsDynamoDBStreams{}, logr.Discard()} _, value, err = scaler.GetMetricsAndActivity(context.Background(), "MetricName") } - switch meta.tableName { + switch meta.TableName { case testAWSDynamoDBErrorTable: assert.Error(t, err, "expect error because of dynamodb stream api error") case testAWSDynamoDBInvalidTable: