From 2bd19ef9be82e8a498a926ff94a8af105fe8a84a Mon Sep 17 00:00:00 2001 From: Max Cao Date: Thu, 3 Oct 2024 10:47:41 -0700 Subject: [PATCH] Add BoundServiceAccountToken trigger authentication type --- .../v1alpha1/triggerauthentication_types.go | 9 + apis/keda/v1alpha1/zz_generated.deepcopy.go | 20 ++ cmd/operator/main.go | 14 +- ...keda.sh_clustertriggerauthentications.yaml | 15 ++ .../bases/keda.sh_triggerauthentications.yaml | 15 ++ config/rbac/role.yaml | 7 + controllers/keda/scaledjob_controller.go | 7 +- controllers/keda/scaledobject_controller.go | 1 + go.mod | 4 +- pkg/eventemitter/eventemitter.go | 10 +- .../mock_executor/mock_interface.go | 2 +- .../authentication/authentication_helpers.go | 9 + pkg/scaling/resolver/scale_resolvers.go | 226 +++++++++++++++++- pkg/scaling/resolver/scale_resolvers_test.go | 8 +- pkg/scaling/scale_handler.go | 8 +- pkg/scaling/scalers_builder.go | 4 +- 16 files changed, 327 insertions(+), 32 deletions(-) diff --git a/apis/keda/v1alpha1/triggerauthentication_types.go b/apis/keda/v1alpha1/triggerauthentication_types.go index 0b0d9ffa315..aa85def04b8 100644 --- a/apis/keda/v1alpha1/triggerauthentication_types.go +++ b/apis/keda/v1alpha1/triggerauthentication_types.go @@ -95,6 +95,9 @@ type TriggerAuthenticationSpec struct { // +optional AwsSecretManager *AwsSecretManager `json:"awsSecretManager,omitempty"` + + // +optional + BoundServiceAccountToken []BoundServiceAccountToken `json:"boundServiceAccountToken,omitempty"` } // TriggerAuthenticationStatus defines the observed state of TriggerAuthentication @@ -378,6 +381,12 @@ type AwsSecretManagerSecret struct { VersionStage string `json:"versionStage,omitempty"` } +type BoundServiceAccountToken struct { + Parameter string `json:"parameter"` + ServiceAccountName string `json:"serviceAccountName"` + Expiry string `json:"expiry"` +} + func init() { SchemeBuilder.Register(&ClusterTriggerAuthentication{}, &ClusterTriggerAuthenticationList{}) SchemeBuilder.Register(&TriggerAuthentication{}, &TriggerAuthenticationList{}) diff --git a/apis/keda/v1alpha1/zz_generated.deepcopy.go b/apis/keda/v1alpha1/zz_generated.deepcopy.go index a144aeb07d3..aaef92bcdcb 100755 --- a/apis/keda/v1alpha1/zz_generated.deepcopy.go +++ b/apis/keda/v1alpha1/zz_generated.deepcopy.go @@ -354,6 +354,21 @@ func (in *AzureKeyVaultSecret) DeepCopy() *AzureKeyVaultSecret { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BoundServiceAccountToken) DeepCopyInto(out *BoundServiceAccountToken) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BoundServiceAccountToken. +func (in *BoundServiceAccountToken) DeepCopy() *BoundServiceAccountToken { + if in == nil { + return nil + } + out := new(BoundServiceAccountToken) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClusterTriggerAuthentication) DeepCopyInto(out *ClusterTriggerAuthentication) { *out = *in @@ -1177,6 +1192,11 @@ func (in *TriggerAuthenticationSpec) DeepCopyInto(out *TriggerAuthenticationSpec *out = new(AwsSecretManager) (*in).DeepCopyInto(*out) } + if in.BoundServiceAccountToken != nil { + in, out := &in.BoundServiceAccountToken, &out.BoundServiceAccountToken + *out = make([]BoundServiceAccountToken, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TriggerAuthenticationSpec. diff --git a/cmd/operator/main.go b/cmd/operator/main.go index aa81dc79fce..714320e837d 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -47,6 +47,7 @@ import ( "github.com/kedacore/keda/v2/pkg/k8s" "github.com/kedacore/keda/v2/pkg/metricscollector" "github.com/kedacore/keda/v2/pkg/metricsservice" + "github.com/kedacore/keda/v2/pkg/scalers/authentication" "github.com/kedacore/keda/v2/pkg/scaling" kedautil "github.com/kedacore/keda/v2/pkg/util" //+kubebuilder:scaffold:imports @@ -217,8 +218,14 @@ func main() { os.Exit(1) } - scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder, secretInformer.Lister()) - eventEmitter := eventemitter.NewEventEmitter(mgr.GetClient(), eventRecorder, k8sClusterName, secretInformer.Lister()) + authClientSet := &authentication.AuthClientSet{ + TokenReviewInterface: kubeClientset.AuthenticationV1().TokenReviews(), + CoreV1Interface: kubeClientset.CoreV1(), + SecretLister: secretInformer.Lister(), + } + + scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder, authClientSet) + eventEmitter := eventemitter.NewEventEmitter(mgr.GetClient(), eventRecorder, k8sClusterName, authClientSet) if err = (&kedacontrollers.ScaledObjectReconciler{ Client: mgr.GetClient(), @@ -237,8 +244,7 @@ func main() { Scheme: mgr.GetScheme(), GlobalHTTPTimeout: globalHTTPTimeout, EventEmitter: eventEmitter, - SecretsLister: secretInformer.Lister(), - SecretsSynced: secretInformer.Informer().HasSynced, + AuthClientSet: authClientSet, }).SetupWithManager(mgr, controller.Options{ MaxConcurrentReconciles: scaledJobMaxReconciles, }); err != nil { diff --git a/config/crd/bases/keda.sh_clustertriggerauthentications.yaml b/config/crd/bases/keda.sh_clustertriggerauthentications.yaml index db8dffadb28..960d2486290 100644 --- a/config/crd/bases/keda.sh_clustertriggerauthentications.yaml +++ b/config/crd/bases/keda.sh_clustertriggerauthentications.yaml @@ -302,6 +302,21 @@ spec: - secrets - vaultUri type: object + boundServiceAccountToken: + items: + properties: + expiry: + type: string + parameter: + type: string + serviceAccountName: + type: string + required: + - expiry + - parameter + - serviceAccountName + type: object + type: array configMapTargetRef: items: description: AuthConfigMapTargetRef is used to authenticate using diff --git a/config/crd/bases/keda.sh_triggerauthentications.yaml b/config/crd/bases/keda.sh_triggerauthentications.yaml index e8071ee6ff4..c9a8f483a2d 100644 --- a/config/crd/bases/keda.sh_triggerauthentications.yaml +++ b/config/crd/bases/keda.sh_triggerauthentications.yaml @@ -301,6 +301,21 @@ spec: - secrets - vaultUri type: object + boundServiceAccountToken: + items: + properties: + expiry: + type: string + parameter: + type: string + serviceAccountName: + type: string + required: + - expiry + - parameter + - serviceAccountName + type: object + type: array configMapTargetRef: items: description: AuthConfigMapTargetRef is used to authenticate using diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index fd9cf99b941..2658391bd9c 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -44,6 +44,13 @@ rules: verbs: - list - watch +- apiGroups: + - "" + resources: + - serviceaccounts/token + verbs: + - create + - get - apiGroups: - '*' resources: diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go index bb1193be8b2..9094bf88c42 100755 --- a/controllers/keda/scaledjob_controller.go +++ b/controllers/keda/scaledjob_controller.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -45,6 +44,7 @@ import ( "github.com/kedacore/keda/v2/pkg/eventemitter" "github.com/kedacore/keda/v2/pkg/eventreason" "github.com/kedacore/keda/v2/pkg/metricscollector" + "github.com/kedacore/keda/v2/pkg/scalers/authentication" "github.com/kedacore/keda/v2/pkg/scaling" kedastatus "github.com/kedacore/keda/v2/pkg/status" "github.com/kedacore/keda/v2/pkg/util" @@ -59,11 +59,10 @@ type ScaledJobReconciler struct { Scheme *runtime.Scheme GlobalHTTPTimeout time.Duration EventEmitter eventemitter.EventHandler + AuthClientSet *authentication.AuthClientSet scaledJobGenerations *sync.Map scaleHandler scaling.ScaleHandler - SecretsLister corev1listers.SecretLister - SecretsSynced cache.InformerSynced } type scaledJobMetricsData struct { @@ -83,7 +82,7 @@ func init() { // SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance. func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { - r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"), r.SecretsLister) + r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"), r.AuthClientSet) r.scaledJobGenerations = &sync.Map{} return ctrl.NewControllerManagedBy(mgr). WithOptions(options). diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index d6f9e29449a..e8148ce13c5 100755 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -58,6 +58,7 @@ import ( // +kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs="*" // +kubebuilder:rbac:groups="",resources=configmaps;configmaps/status,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=events,verbs="*" +// +kubebuilder:rbac:groups="",resources=serviceaccounts/token,verbs=create;get // +kubebuilder:rbac:groups="",resources=pods;services;services;secrets;external,verbs=get;list;watch // +kubebuilder:rbac:groups="*",resources="*/scale",verbs=get;list;watch;update;patch // +kubebuilder:rbac:groups="",resources="serviceaccounts",verbs=list;watch diff --git a/go.mod b/go.mod index b6eff2b66cc..1bd3fa1f1e4 100644 --- a/go.mod +++ b/go.mod @@ -200,7 +200,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect - github.com/aws/smithy-go v1.20.3 // indirect + github.com/aws/smithy-go v1.20.3 github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/cenkalti/backoff/v3 v3.2.2 // indirect @@ -237,7 +237,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect - github.com/golang-jwt/jwt/v5 v5.2.1 // indirect + github.com/golang-jwt/jwt/v5 v5.2.1 github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect diff --git a/pkg/eventemitter/eventemitter.go b/pkg/eventemitter/eventemitter.go index 91d57f3ca4d..4624018735f 100644 --- a/pkg/eventemitter/eventemitter.go +++ b/pkg/eventemitter/eventemitter.go @@ -36,7 +36,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -44,6 +43,7 @@ import ( eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" "github.com/kedacore/keda/v2/pkg/eventemitter/eventdata" "github.com/kedacore/keda/v2/pkg/metricscollector" + "github.com/kedacore/keda/v2/pkg/scalers/authentication" "github.com/kedacore/keda/v2/pkg/scaling/resolver" kedastatus "github.com/kedacore/keda/v2/pkg/status" ) @@ -66,7 +66,7 @@ type EventEmitter struct { eventFilterCacheLock *sync.RWMutex eventLoopContexts *sync.Map cloudEventProcessingChan chan eventdata.EventData - secretsLister corev1listers.SecretLister + authClientSet *authentication.AuthClientSet } // EventHandler defines the behavior for EventEmitter clients @@ -96,7 +96,7 @@ const ( ) // NewEventEmitter creates a new EventEmitter -func NewEventEmitter(client client.Client, recorder record.EventRecorder, clusterName string, secretsLister corev1listers.SecretLister) EventHandler { +func NewEventEmitter(client client.Client, recorder record.EventRecorder, clusterName string, authClientSet *authentication.AuthClientSet) EventHandler { return &EventEmitter{ log: logf.Log.WithName("event_emitter"), client: client, @@ -108,7 +108,7 @@ func NewEventEmitter(client client.Client, recorder record.EventRecorder, cluste eventFilterCacheLock: &sync.RWMutex{}, eventLoopContexts: &sync.Map{}, cloudEventProcessingChan: make(chan eventdata.EventData, maxChannelBuffer), - secretsLister: secretsLister, + authClientSet: authClientSet, } } @@ -188,7 +188,7 @@ func (e *EventEmitter) createEventHandlers(ctx context.Context, cloudEventSource } // Resolve auth related - authParams, podIdentity, err := resolver.ResolveAuthRefAndPodIdentity(ctx, e.client, e.log, spec.AuthenticationRef, nil, cloudEventSourceI.GetNamespace(), e.secretsLister) + authParams, podIdentity, err := resolver.ResolveAuthRefAndPodIdentity(ctx, e.client, e.log, spec.AuthenticationRef, nil, cloudEventSourceI.GetNamespace(), e.authClientSet) if err != nil { e.log.Error(err, "error resolving auth params", "cloudEventSource", cloudEventSourceI) return diff --git a/pkg/mock/mock_scaling/mock_executor/mock_interface.go b/pkg/mock/mock_scaling/mock_executor/mock_interface.go index eef9c5fc4bd..c4e2f1abf14 100644 --- a/pkg/mock/mock_scaling/mock_executor/mock_interface.go +++ b/pkg/mock/mock_scaling/mock_executor/mock_interface.go @@ -14,7 +14,7 @@ import ( reflect "reflect" v1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" - "github.com/kedacore/keda/v2/pkg/scaling/executor" + executor "github.com/kedacore/keda/v2/pkg/scaling/executor" gomock "go.uber.org/mock/gomock" ) diff --git a/pkg/scalers/authentication/authentication_helpers.go b/pkg/scalers/authentication/authentication_helpers.go index f3c4fa9c900..b3f4cd2fb70 100644 --- a/pkg/scalers/authentication/authentication_helpers.go +++ b/pkg/scalers/authentication/authentication_helpers.go @@ -13,10 +13,19 @@ import ( libs "github.com/dysnix/predictkube-libs/external/configs" "github.com/dysnix/predictkube-libs/external/http_transport" pConfig "github.com/prometheus/common/config" + authenticationv1client "k8s.io/client-go/kubernetes/typed/authentication/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + corev1listers "k8s.io/client-go/listers/core/v1" kedautil "github.com/kedacore/keda/v2/pkg/util" ) +type AuthClientSet struct { + authenticationv1client.TokenReviewInterface + corev1client.CoreV1Interface + corev1listers.SecretLister +} + const ( AuthModesKey = "authModes" ) diff --git a/pkg/scaling/resolver/scale_resolvers.go b/pkg/scaling/resolver/scale_resolvers.go index 54976381aee..f65f8369270 100644 --- a/pkg/scaling/resolver/scale_resolvers.go +++ b/pkg/scaling/resolver/scale_resolvers.go @@ -19,22 +19,31 @@ package resolver import ( "bytes" "context" + "encoding/base64" + "errors" "fmt" "strconv" "strings" + "time" + "github.com/aws/smithy-go/ptr" "github.com/go-logr/logr" + "github.com/golang-jwt/jwt/v5" appsv1 "k8s.io/api/apps/v1" + authenticationv1 "k8s.io/api/authentication/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" corev1listers "k8s.io/client-go/listers/core/v1" + "knative.dev/pkg/apis/duck" duckv1 "knative.dev/pkg/apis/duck/v1" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + "github.com/kedacore/keda/v2/pkg/scalers/authentication" "github.com/kedacore/keda/v2/pkg/util" ) @@ -47,6 +56,19 @@ const ( defaultServiceAccount = "default" ) +type triggerAuthType interface { + SetAnnotations(map[string]string) + GetAnnotations() map[string]string +} + +type tokenStatus int + +const ( + tokenStatusValid tokenStatus = iota + tokenStatusInvalid + tokenStatusUnknown +) + var ( kedaNamespace, _ = util.GetClusterObjectNamespace() restrictSecretAccess = util.GetRestrictSecretAccess() @@ -178,9 +200,9 @@ func ResolveContainerEnv(ctx context.Context, client client.Client, logger logr. // ResolveAuthRefAndPodIdentity provides authentication parameters and pod identity needed authenticate scaler with the environment. func ResolveAuthRefAndPodIdentity(ctx context.Context, client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.AuthenticationRef, podTemplateSpec *corev1.PodTemplateSpec, - namespace string, secretsLister corev1listers.SecretLister) (map[string]string, kedav1alpha1.AuthPodIdentity, error) { + namespace string, authClientSet *authentication.AuthClientSet) (map[string]string, kedav1alpha1.AuthPodIdentity, error) { if podTemplateSpec != nil { - authParams, podIdentity, err := resolveAuthRef(ctx, client, logger, triggerAuthRef, &podTemplateSpec.Spec, namespace, secretsLister) + authParams, podIdentity, err := resolveAuthRef(ctx, client, logger, triggerAuthRef, &podTemplateSpec.Spec, namespace, authClientSet) if err != nil { return authParams, podIdentity, err @@ -220,14 +242,14 @@ func ResolveAuthRefAndPodIdentity(ctx context.Context, client client.Client, log return authParams, podIdentity, nil } - return resolveAuthRef(ctx, client, logger, triggerAuthRef, nil, namespace, secretsLister) + return resolveAuthRef(ctx, client, logger, triggerAuthRef, nil, namespace, authClientSet) } // resolveAuthRef provides authentication parameters needed authenticate scaler with the environment. // based on authentication method defined in TriggerAuthentication, authParams and podIdentity is returned func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.AuthenticationRef, podSpec *corev1.PodSpec, - namespace string, secretsLister corev1listers.SecretLister) (map[string]string, kedav1alpha1.AuthPodIdentity, error) { + namespace string, authClientSet *authentication.AuthClientSet) (map[string]string, kedav1alpha1.AuthPodIdentity, error) { result := make(map[string]string) podIdentity := kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderNone} var err error @@ -246,7 +268,7 @@ func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logge result[e.Parameter] = "" continue } - env, err := ResolveContainerEnv(ctx, client, logger, podSpec, e.ContainerName, namespace, secretsLister) + env, err := ResolveContainerEnv(ctx, client, logger, podSpec, e.ContainerName, namespace, authClientSet.SecretLister) if err != nil { result[e.Parameter] = "" } else { @@ -261,7 +283,7 @@ func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logge } if triggerAuthSpec.SecretTargetRef != nil { for _, e := range triggerAuthSpec.SecretTargetRef { - result[e.Parameter] = resolveAuthSecret(ctx, client, logger, e.Name, triggerNamespace, e.Key, secretsLister) + result[e.Parameter] = resolveAuthSecret(ctx, client, logger, e.Name, triggerNamespace, e.Key, authClientSet.SecretLister) } } if triggerAuthSpec.HashiCorpVault != nil && len(triggerAuthSpec.HashiCorpVault.Secrets) > 0 { @@ -287,7 +309,7 @@ func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logge } if triggerAuthSpec.AzureKeyVault != nil && len(triggerAuthSpec.AzureKeyVault.Secrets) > 0 { vaultHandler := NewAzureKeyVaultHandler(triggerAuthSpec.AzureKeyVault) - err := vaultHandler.Initialize(ctx, client, logger, triggerNamespace, secretsLister) + err := vaultHandler.Initialize(ctx, client, logger, triggerNamespace, authClientSet.SecretLister) if err != nil { logger.Error(err, "error authenticating to Azure Key Vault", "triggerAuthRef.Name", triggerAuthRef.Name) return result, podIdentity, err @@ -306,7 +328,7 @@ func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logge } if triggerAuthSpec.GCPSecretManager != nil && len(triggerAuthSpec.GCPSecretManager.Secrets) > 0 { secretManagerHandler := NewGCPSecretManagerHandler(triggerAuthSpec.GCPSecretManager) - err := secretManagerHandler.Initialize(ctx, client, logger, triggerNamespace, secretsLister) + err := secretManagerHandler.Initialize(ctx, client, logger, triggerNamespace, authClientSet.SecretLister) if err != nil { logger.Error(err, "error authenticating to GCP Secret Manager", "triggerAuthRef.Name", triggerAuthRef.Name) } else { @@ -327,7 +349,7 @@ func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logge } if triggerAuthSpec.AwsSecretManager != nil && len(triggerAuthSpec.AwsSecretManager.Secrets) > 0 { awsSecretManagerHandler := NewAwsSecretManagerHandler(triggerAuthSpec.AwsSecretManager) - err := awsSecretManagerHandler.Initialize(ctx, client, logger, triggerNamespace, secretsLister, podSpec) + err := awsSecretManagerHandler.Initialize(ctx, client, logger, triggerNamespace, authClientSet.SecretLister, podSpec) defer awsSecretManagerHandler.Stop() if err != nil { logger.Error(err, "error authenticating to Aws Secret Manager", "triggerAuthRef.Name", triggerAuthRef.Name) @@ -343,12 +365,36 @@ func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logge } } } + if triggerAuthSpec.BoundServiceAccountToken != nil { + for _, e := range triggerAuthSpec.BoundServiceAccountToken { + result[e.Parameter] = resolveBoundServiceAccountToken(ctx, client, logger, triggerNamespace, &e, triggerAuthRef, authClientSet) + } + } } } return result, podIdentity, err } +func getTriggerAuth(ctx context.Context, client client.Client, triggerAuthRef *kedav1alpha1.AuthenticationRef, namespace string) (triggerAuthType, string, error) { + if triggerAuthRef.Kind == "" || triggerAuthRef.Kind == "TriggerAuthentication" { + triggerAuth := &kedav1alpha1.TriggerAuthentication{} + err := client.Get(ctx, types.NamespacedName{Name: triggerAuthRef.Name, Namespace: namespace}, triggerAuth) + if err != nil { + return nil, "", err + } + return triggerAuth, namespace, nil + } else if triggerAuthRef.Kind == "ClusterTriggerAuthentication" { + triggerAuth := &kedav1alpha1.ClusterTriggerAuthentication{} + err := client.Get(ctx, types.NamespacedName{Name: triggerAuthRef.Name}, triggerAuth) + if err != nil { + return nil, "", err + } + return triggerAuth, kedaNamespace, nil + } + return nil, "", fmt.Errorf("unknown trigger auth kind %s", triggerAuthRef.Kind) +} + func getTriggerAuthSpec(ctx context.Context, client client.Client, triggerAuthRef *kedav1alpha1.AuthenticationRef, namespace string) (*kedav1alpha1.TriggerAuthenticationSpec, string, error) { if triggerAuthRef.Kind == "" || triggerAuthRef.Kind == "TriggerAuthentication" { triggerAuth := &kedav1alpha1.TriggerAuthentication{} @@ -597,6 +643,168 @@ func resolveAuthSecret(ctx context.Context, client client.Client, logger logr.Lo return string(result) } +func resolveBoundServiceAccountToken(ctx context.Context, client client.Client, logger logr.Logger, namespace string, bsat *kedav1alpha1.BoundServiceAccountToken, triggerAuthRef *kedav1alpha1.AuthenticationRef, acs *authentication.AuthClientSet) string { + serviceAccountName, expiry := bsat.ServiceAccountName, bsat.Expiry + if serviceAccountName == "" { + logger.Error(fmt.Errorf("error trying to get token"), "serviceAccountName is required") + return "" + } + var err error + var expirySeconds *int64 = ptr.Int64(3600) + if expiry != "" { + duration, err := time.ParseDuration(expiry) + if err != nil { + logger.Error(err, "error trying to parse expiry duration", "expiry", expiry) + return "" + } + // convert duration to seconds + expirySeconds = ptr.Int64(int64(duration.Seconds())) + } + + triggerAuth, _, err := getTriggerAuth(ctx, client, triggerAuthRef, namespace) + if err != nil { + logger.Error(err, "error trying to get [cluster]triggerAuth", "TriggerAuth.Namespace", namespace, "TriggerAuth.Name", triggerAuthRef.Name) + return "" + } + currentAnnotations := triggerAuth.GetAnnotations() + encodedToken := currentAnnotations["keda-serviceAccountToken"] + + // check if service account exists in the namespace + serviceAccount := &corev1.ServiceAccount{} + err = client.Get(ctx, types.NamespacedName{Name: serviceAccountName, Namespace: namespace}, serviceAccount) + if err != nil { + logger.Error(err, "error trying to get service account from namespace", "ServiceAccount.Namespace", namespace, "ServiceAccount.Name", serviceAccountName) + return "" + } + + // check if token is already referenced in the TriggerAuthentication annotation + if encodedToken != "" { + tokenValid := checkTokenValidity(ctx, logger, encodedToken, expirySeconds, acs) + if tokenValid == tokenStatusInvalid { + // token is invalid, or if more than 50% of the token's expiry has passed, create new token + return generateAndAnnotateNewToken(ctx, client, logger, serviceAccountName, namespace, expirySeconds, triggerAuth, acs) + } else if tokenValid == tokenStatusValid { + return encodedToken + } else { + return "" + } + } else { + // token doesn't exist; create new token and embed it in the the TriggerAuth + logger.Info("Token doesn't exist; creating new token and embedding it in the triggerauth", "ServiceAccount.Namespace", namespace, "ServiceAccount.Name", serviceAccountName) + return generateAndAnnotateNewToken(ctx, client, logger, serviceAccountName, namespace, expirySeconds, triggerAuth, acs) + } +} + +func generateAndAnnotateNewToken(ctx context.Context, client client.Client, logger logr.Logger, serviceAccountName, namespace string, expirySeconds *int64, triggerAuth triggerAuthType, acs *authentication.AuthClientSet) string { + newToken := generateToken(ctx, serviceAccountName, namespace, expirySeconds, acs) + // encode and embed the new token in the TriggerAuth + encodedToken := base64.StdEncoding.EncodeToString([]byte(newToken)) + currentAnnotations := triggerAuth.GetAnnotations() + currentAnnotations["keda-serviceAccountToken"] = encodedToken + triggerAuth.SetAnnotations(currentAnnotations) + switch underlyingTriggerAuth := triggerAuth.(type) { + case *kedav1alpha1.TriggerAuthentication: + err := client.Update(ctx, underlyingTriggerAuth) + if err != nil { + logger.Error(err, "error trying to update TriggerAuth", "TriggerAuth.Namespace", namespace, "TriggerAuth.Name", underlyingTriggerAuth.Name) + return "" + } + return newToken + case *kedav1alpha1.ClusterTriggerAuthentication: + err := client.Update(ctx, underlyingTriggerAuth) + if err != nil { + logger.Error(err, "error trying to update ClusterTriggerAuth", "ClusterTriggerAuth.Name", underlyingTriggerAuth.Name) + return "" + } + return newToken + } + return "" +} + +func checkTokenValidity(ctx context.Context, logger logr.Logger, encodedToken string, expiry *int64, acs *authentication.AuthClientSet) tokenStatus { + byteToken, err := base64.StdEncoding.DecodeString(encodedToken) + if err != nil { + logger.Error(err, "error trying to base64 decode token", "Token", encodedToken) + return tokenStatusInvalid + } + token := string(byteToken) + + // verify the token + tr := &authenticationv1.TokenReview{ + Spec: authenticationv1.TokenReviewSpec{ + Token: token, + }, + } + result, err := acs.TokenReviewInterface.Create(ctx, tr, metav1.CreateOptions{}) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + // try again + logger.Error(err, "retrying to verify token", "Token", token) + return tokenStatusUnknown + } + + if err != nil { + logger.Error(err, "error trying to verify token", "Token", token) + return tokenStatusInvalid + } + + if result.Status.User.Username == "" || (!result.Status.Authenticated) { + logger.Error(fmt.Errorf("error trying to verify token"), "token is invalid", "Token", token) + return tokenStatusInvalid + } + + // parse the token and check the expiry + jwtToken, _ := jwt.Parse(token, nil) + // err will always be non-nil since we can't verify the token without the public key, but we already verified using TokenReview API + // so only check if the jwt token is nil + if jwtToken == nil { + logger.Error(err, "jwt token parse resulted in an error", "token", token, "expiry", expiry) + return tokenStatusInvalid + } + + claims, ok := jwtToken.Claims.(jwt.MapClaims) + if !ok { + logger.Error(nil, "jwt token claims are invalid", "token", token, "expiry", expiry) + return tokenStatusInvalid + } + + exp, err := claims.GetExpirationTime() + if err != nil { + logger.Error(err, "error trying to parse expiry time from jwt token") + return tokenStatusInvalid + } + + // rotate if more than 50% of the token's expiry has passed + expSeconds := exp.Unix() + currentTime := time.Now().Unix() + timeLeft := expSeconds - currentTime + + if timeLeft < (*expiry / 2) { + logger.Info("Rotating token as more than 50% of the token's expiry has passed", "Expires at", exp, "Time left", timeLeft) + return tokenStatusInvalid + } + + return tokenStatusValid +} + +func generateToken(ctx context.Context, serviceAccountName, namespace string, expiry *int64, acs *authentication.AuthClientSet) string { + // create new token and embed it in the secret + token, err := acs.CoreV1Interface.ServiceAccounts(namespace).CreateToken( + ctx, + serviceAccountName, + &authenticationv1.TokenRequest{ + Spec: authenticationv1.TokenRequestSpec{ + ExpirationSeconds: expiry, // kubernetes prevents token expiry to be less than 10 minutes + }, + }, + metav1.CreateOptions{}, + ) + if err != nil { + log.Error(err, "error trying to create token for service account", "ServiceAccount.Name", serviceAccountName) + return "" + } + return token.Status.Token +} + // resolveServiceAccountAnnotation retrieves the value of a specific annotation // from the annotations of a given Kubernetes ServiceAccount. func resolveServiceAccountAnnotation(ctx context.Context, client client.Client, name, namespace, annotation string, required bool) (string, error) { diff --git a/pkg/scaling/resolver/scale_resolvers_test.go b/pkg/scaling/resolver/scale_resolvers_test.go index 13f3c92c07a..951b291d6fc 100644 --- a/pkg/scaling/resolver/scale_resolvers_test.go +++ b/pkg/scaling/resolver/scale_resolvers_test.go @@ -33,6 +33,7 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" mock_v1 "github.com/kedacore/keda/v2/pkg/mock/mock_secretlister" + "github.com/kedacore/keda/v2/pkg/scalers/authentication" ) var ( @@ -622,7 +623,12 @@ func TestResolveAuthRef(t *testing.T) { test.soar, test.podSpec, namespace, - secretsLister) + &authentication.AuthClientSet{ + SecretLister: secretsLister, + CoreV1Interface: nil, + TokenReviewInterface: nil, + }, + ) if err != nil && !test.isError { t.Errorf("Expected success because %s got error, %s", test.comment, err) diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 5a955e48e66..d1bdc5d47ba 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -29,7 +29,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/scale" "k8s.io/client-go/tools/record" "k8s.io/metrics/pkg/apis/external_metrics" @@ -42,6 +41,7 @@ import ( "github.com/kedacore/keda/v2/pkg/fallback" "github.com/kedacore/keda/v2/pkg/metricscollector" "github.com/kedacore/keda/v2/pkg/scalers" + "github.com/kedacore/keda/v2/pkg/scalers/authentication" "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" "github.com/kedacore/keda/v2/pkg/scaling/cache" "github.com/kedacore/keda/v2/pkg/scaling/cache/metricscache" @@ -73,11 +73,11 @@ type scaleHandler struct { scalerCaches map[string]*cache.ScalersCache scalerCachesLock *sync.RWMutex scaledObjectsMetricCache metricscache.MetricsCache - secretsLister corev1listers.SecretLister + authClientSet *authentication.AuthClientSet } // NewScaleHandler creates a ScaleHandler object -func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration, recorder record.EventRecorder, secretsLister corev1listers.SecretLister) ScaleHandler { +func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration, recorder record.EventRecorder, authClientSet *authentication.AuthClientSet) ScaleHandler { return &scaleHandler{ client: client, scaleLoopContexts: &sync.Map{}, @@ -87,7 +87,7 @@ func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, recon scalerCaches: map[string]*cache.ScalersCache{}, scalerCachesLock: &sync.RWMutex{}, scaledObjectsMetricCache: metricscache.NewMetricsCache(), - secretsLister: secretsLister, + authClientSet: authClientSet, } } diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 1f4549c7ffa..4001361d6dd 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -48,7 +48,7 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp factory := func() (scalers.Scaler, *scalersconfig.ScalerConfig, error) { if podTemplateSpec != nil { - resolvedEnv, err = resolver.ResolveContainerEnv(ctx, h.client, logger, &podTemplateSpec.Spec, containerName, withTriggers.Namespace, h.secretsLister) + resolvedEnv, err = resolver.ResolveContainerEnv(ctx, h.client, logger, &podTemplateSpec.Spec, containerName, withTriggers.Namespace, h.authClientSet.SecretLister) if err != nil { return nil, nil, fmt.Errorf("error resolving secrets for ScaleTarget: %w", err) } @@ -69,7 +69,7 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp TriggerUniqueKey: fmt.Sprintf("%s-%s-%s-%d", withTriggers.Kind, withTriggers.Namespace, withTriggers.Name, triggerIndex), } - authParams, podIdentity, err := resolver.ResolveAuthRefAndPodIdentity(ctx, h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace, h.secretsLister) + authParams, podIdentity, err := resolver.ResolveAuthRefAndPodIdentity(ctx, h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace, h.authClientSet) switch podIdentity.Provider { case kedav1alpha1.PodIdentityProviderAwsEKS: // FIXME: Delete this for v3