From 0aed528231177b560f034b4c4dc652215ef457d3 Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Mon, 2 Dec 2024 20:56:19 +0200 Subject: [PATCH] refactor aws_kinesis_stream_scaler Signed-off-by: Omer Aplatony --- pkg/scalers/aws_kinesis_stream_scaler.go | 71 +++++------------- pkg/scalers/aws_kinesis_stream_scaler_test.go | 75 ++++++++----------- 2 files changed, 51 insertions(+), 95 deletions(-) diff --git a/pkg/scalers/aws_kinesis_stream_scaler.go b/pkg/scalers/aws_kinesis_stream_scaler.go index 65fb90a4e23..4b58426514a 100644 --- a/pkg/scalers/aws_kinesis_stream_scaler.go +++ b/pkg/scalers/aws_kinesis_stream_scaler.go @@ -3,7 +3,6 @@ package scalers import ( "context" "fmt" - "strconv" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/kinesis" @@ -41,11 +40,11 @@ func (w kinesisWrapperClient) DescribeStreamSummary(ctx context.Context, params } type awsKinesisStreamMetadata struct { - targetShardCount int64 - activationTargetShardCount int64 - streamName string - awsRegion string - awsEndpoint string + TargetShardCount int64 `keda:"name=shardCount, order=triggerMetadata, default=2"` + ActivationTargetShardCount int64 `keda:"name=activationShardCount, order=triggerMetadata, default=0"` + StreamName string `keda:"name=streamName, order=triggerMetadata"` + AwsRegion string `keda:"name=awsRegion, order=triggerMetadata"` + AwsEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"` awsAuthorization awsutils.AuthorizationMetadata triggerIndex int } @@ -59,7 +58,7 @@ func NewAwsKinesisStreamScaler(ctx context.Context, config *scalersconfig.Scaler logger := InitializeLogger(config, "aws_kinesis_stream_scaler") - meta, err := parseAwsKinesisStreamMetadata(config, logger) + meta, err := parseAwsKinesisStreamMetadata(config) if err != nil { return nil, fmt.Errorf("error parsing Kinesis stream metadata: %w", err) } @@ -78,44 +77,11 @@ func NewAwsKinesisStreamScaler(ctx context.Context, config *scalersconfig.Scaler }, nil } -func parseAwsKinesisStreamMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*awsKinesisStreamMetadata, error) { - meta := awsKinesisStreamMetadata{} - meta.targetShardCount = targetShardCountDefault - - if val, ok := config.TriggerMetadata["shardCount"]; ok && val != "" { - shardCount, err := strconv.ParseInt(val, 10, 64) - if err != nil { - meta.targetShardCount = targetShardCountDefault - logger.Error(err, "Error parsing Kinesis stream metadata shardCount, using default %n", targetShardCountDefault) - } else { - meta.targetShardCount = shardCount - } - } - - if val, ok := config.TriggerMetadata["activationShardCount"]; ok && val != "" { - activationShardCount, err := strconv.ParseInt(val, 10, 64) - if err != nil { - meta.activationTargetShardCount = activationTargetShardCountDefault - logger.Error(err, "Error parsing Kinesis stream metadata activationShardCount, using default %n", activationTargetShardCountDefault) - } else { - meta.activationTargetShardCount = activationShardCount - } - } +func parseAwsKinesisStreamMetadata(config *scalersconfig.ScalerConfig) (*awsKinesisStreamMetadata, error) { + meta := &awsKinesisStreamMetadata{} - if val, ok := config.TriggerMetadata["streamName"]; ok && val != "" { - meta.streamName = val - } else { - return nil, fmt.Errorf("no streamName given") - } - - if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" { - meta.awsRegion = val - } else { - return nil, fmt.Errorf("no awsRegion given") - } - - if val, ok := config.TriggerMetadata["awsEndpoint"]; ok { - meta.awsEndpoint = val + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing Kinesis stream metadata: %w", err) } auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv) @@ -124,20 +90,19 @@ func parseAwsKinesisStreamMetadata(config *scalersconfig.ScalerConfig, logger lo } meta.awsAuthorization = auth - meta.triggerIndex = config.TriggerIndex - return &meta, nil + return meta, nil } func createKinesisClient(ctx context.Context, metadata *awsKinesisStreamMetadata) (*kinesis.Client, error) { - cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization) + cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization) if err != nil { return nil, err } return kinesis.NewFromConfig(*cfg, func(options *kinesis.Options) { - if metadata.awsEndpoint != "" { - options.BaseEndpoint = aws.String(metadata.awsEndpoint) + if metadata.AwsEndpoint != "" { + options.BaseEndpoint = aws.String(metadata.AwsEndpoint) } }), nil } @@ -150,9 +115,9 @@ func (s *awsKinesisStreamScaler) Close(context.Context) error { func (s *awsKinesisStreamScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-kinesis-%s", s.metadata.streamName))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-kinesis-%s", s.metadata.StreamName))), }, - Target: GetMetricTarget(s.metricType, s.metadata.targetShardCount), + Target: GetMetricTarget(s.metricType, s.metadata.TargetShardCount), } metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} return []v2.MetricSpec{metricSpec} @@ -169,13 +134,13 @@ func (s *awsKinesisStreamScaler) GetMetricsAndActivity(ctx context.Context, metr metric := GenerateMetricInMili(metricName, float64(shardCount)) - return []external_metrics.ExternalMetricValue{metric}, shardCount > s.metadata.activationTargetShardCount, nil + return []external_metrics.ExternalMetricValue{metric}, shardCount > s.metadata.ActivationTargetShardCount, nil } // GetAwsKinesisOpenShardCount Get Kinesis open shard count func (s *awsKinesisStreamScaler) GetAwsKinesisOpenShardCount(ctx context.Context) (int64, error) { input := &kinesis.DescribeStreamSummaryInput{ - StreamName: &s.metadata.streamName, + StreamName: &s.metadata.StreamName, } output, err := s.kinesisWrapperClient.DescribeStreamSummary(ctx, input) diff --git a/pkg/scalers/aws_kinesis_stream_scaler_test.go b/pkg/scalers/aws_kinesis_stream_scaler_test.go index c62a047d7d6..676c2d778f0 100644 --- a/pkg/scalers/aws_kinesis_stream_scaler_test.go +++ b/pkg/scalers/aws_kinesis_stream_scaler_test.go @@ -68,7 +68,8 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{ authParams: testAWSKinesisAuthentication, expected: &awsKinesisStreamMetadata{}, isError: true, - comment: "metadata empty"}, + comment: "metadata empty", + }, { metadata: map[string]string{ "streamName": testAWSKinesisStreamName, @@ -77,10 +78,10 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{ "awsRegion": testAWSRegion}, authParams: testAWSKinesisAuthentication, expected: &awsKinesisStreamMetadata{ - targetShardCount: 2, - activationTargetShardCount: 1, - streamName: testAWSKinesisStreamName, - awsRegion: testAWSRegion, + TargetShardCount: 2, + ActivationTargetShardCount: 1, + StreamName: testAWSKinesisStreamName, + AwsRegion: testAWSRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSKinesisAccessKeyID, AwsSecretAccessKey: testAWSKinesisSecretAccessKey, @@ -101,11 +102,11 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{ "awsEndpoint": testAWSEndpoint}, authParams: testAWSKinesisAuthentication, expected: &awsKinesisStreamMetadata{ - targetShardCount: 2, - activationTargetShardCount: 1, - streamName: testAWSKinesisStreamName, - awsRegion: testAWSRegion, - awsEndpoint: testAWSEndpoint, + TargetShardCount: 2, + ActivationTargetShardCount: 1, + StreamName: testAWSKinesisStreamName, + AwsRegion: testAWSRegion, + AwsEndpoint: testAWSEndpoint, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSKinesisAccessKeyID, AwsSecretAccessKey: testAWSKinesisSecretAccessKey, @@ -147,10 +148,10 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{ "awsRegion": testAWSRegion}, authParams: testAWSKinesisAuthentication, expected: &awsKinesisStreamMetadata{ - targetShardCount: targetShardCountDefault, - activationTargetShardCount: activationTargetShardCountDefault, - streamName: testAWSKinesisStreamName, - awsRegion: testAWSRegion, + TargetShardCount: targetShardCountDefault, + ActivationTargetShardCount: activationTargetShardCountDefault, + StreamName: testAWSKinesisStreamName, + AwsRegion: testAWSRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSKinesisAccessKeyID, AwsSecretAccessKey: testAWSKinesisSecretAccessKey, @@ -167,20 +168,10 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{ "streamName": testAWSKinesisStreamName, "shardCount": "a", "awsRegion": testAWSRegion}, - authParams: testAWSKinesisAuthentication, - expected: &awsKinesisStreamMetadata{ - targetShardCount: 2, - streamName: testAWSKinesisStreamName, - awsRegion: testAWSRegion, - awsAuthorization: awsutils.AuthorizationMetadata{ - AwsAccessKeyID: testAWSKinesisAccessKeyID, - AwsSecretAccessKey: testAWSKinesisSecretAccessKey, - PodIdentityOwner: true, - }, - triggerIndex: 4, - }, - isError: false, - comment: "properly formed stream name and region, wrong shard count", + authParams: testAWSKinesisAuthentication, + expected: &awsKinesisStreamMetadata{}, + isError: true, + comment: "invalid shardCount value", triggerIndex: 4, }, { @@ -221,9 +212,9 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{ "awsSessionToken": testAWSKinesisSessionToken, }, expected: &awsKinesisStreamMetadata{ - targetShardCount: 2, - streamName: testAWSKinesisStreamName, - awsRegion: testAWSRegion, + TargetShardCount: 2, + StreamName: testAWSKinesisStreamName, + AwsRegion: testAWSRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSKinesisAccessKeyID, AwsSecretAccessKey: testAWSKinesisSecretAccessKey, @@ -273,9 +264,9 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{ "awsRoleArn": testAWSKinesisRoleArn, }, expected: &awsKinesisStreamMetadata{ - targetShardCount: 2, - streamName: testAWSKinesisStreamName, - awsRegion: testAWSRegion, + TargetShardCount: 2, + StreamName: testAWSKinesisStreamName, + AwsRegion: testAWSRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsRoleArn: testAWSKinesisRoleArn, PodIdentityOwner: true, @@ -293,9 +284,9 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{ "identityOwner": "operator"}, authParams: map[string]string{}, expected: &awsKinesisStreamMetadata{ - targetShardCount: 2, - streamName: testAWSKinesisStreamName, - awsRegion: testAWSRegion, + TargetShardCount: 2, + StreamName: testAWSKinesisStreamName, + AwsRegion: testAWSRegion, awsAuthorization: awsutils.AuthorizationMetadata{ PodIdentityOwner: false, }, @@ -313,13 +304,13 @@ var awsKinesisMetricIdentifiers = []awsKinesisMetricIdentifier{ } var awsKinesisGetMetricTestData = []*awsKinesisStreamMetadata{ - {streamName: "Good"}, - {streamName: testAWSKinesisErrorStream}, + {StreamName: "Good"}, + {StreamName: testAWSKinesisErrorStream}, } func TestKinesisParseMetadata(t *testing.T) { for _, testData := range testAWSKinesisMetadata { - result, err := parseAwsKinesisStreamMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.authParams, TriggerIndex: testData.triggerIndex}, logr.Discard()) + result, err := parseAwsKinesisStreamMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.authParams, TriggerIndex: testData.triggerIndex}) if err != nil && !testData.isError { t.Errorf("Expected success because %s got error, %s", testData.comment, err) } @@ -336,7 +327,7 @@ func TestKinesisParseMetadata(t *testing.T) { func TestAWSKinesisGetMetricSpecForScaling(t *testing.T) { for _, testData := range awsKinesisMetricIdentifiers { ctx := context.Background() - meta, err := parseAwsKinesisStreamMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}, logr.Discard()) + meta, err := parseAwsKinesisStreamMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}) if err != nil { t.Fatal("Could not parse metadata:", err) } @@ -354,7 +345,7 @@ func TestAWSKinesisStreamScalerGetMetrics(t *testing.T) { for _, meta := range awsKinesisGetMetricTestData { scaler := awsKinesisStreamScaler{"", meta, &mockKinesis{}, logr.Discard()} value, _, err := scaler.GetMetricsAndActivity(context.Background(), "MetricName") - switch meta.streamName { + switch meta.StreamName { case testAWSKinesisErrorStream: assert.Error(t, err, "expect error because of kinesis api error") default: