Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add scaler for temporal #6191

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f5d7f78
add scaler for temporal
Prajithp Sep 26, 2024
2cfaa31
add option to filter based on build ids
Prajithp Sep 27, 2024
6018463
use typed config
Prajithp Sep 27, 2024
984d1de
support apiKey authentication
Prajithp Sep 30, 2024
58d8990
use context
Prajithp Oct 1, 2024
6766405
Merge branch 'main' into temporal
Prajithp Oct 3, 2024
9aad76c
add MTLS auth option and some fixes
Prajithp Oct 4, 2024
5159eb6
update e2e test to use official image
Prajithp Oct 7, 2024
5c8b3e6
rename metadata variables
Prajithp Oct 13, 2024
e946a54
fix temporal server override command
Prajithp Oct 13, 2024
7390335
remove namespace from cli args
Prajithp Oct 14, 2024
66a373b
Update tests/scalers/temporal/temporal_test.go
Prajithp Oct 17, 2024
7681beb
Update tests/scalers/temporal/temporal_test.go
Prajithp Oct 17, 2024
d1aa803
Update tests/scalers/temporal/temporal_test.go
Prajithp Oct 17, 2024
4cdecfb
add MinConnectTimeout option
Prajithp Oct 17, 2024
ba2049a
add test case for worker versioning
Prajithp Oct 17, 2024
9e08d57
Merge branch 'main' into temporal
Prajithp Oct 18, 2024
ec360cd
Merge branch 'main' into temporal
Prajithp Oct 30, 2024
79af8dc
add modules to vendor
Prajithp Nov 1, 2024
6e403df
Update tests/scalers/temporal/temporal_test.go
Prajithp Nov 1, 2024
4e7f89b
Update tests/scalers/temporal/temporal_test.go
Prajithp Nov 1, 2024
b716fa2
refactoring e2e test
Prajithp Nov 9, 2024
5c06b4e
update vendor modules
Prajithp Nov 9, 2024
73dae53
fix tests
Prajithp Nov 11, 2024
9dd849d
fix ci
Prajithp Nov 11, 2024
6ccc697
fix ci
Prajithp Nov 11, 2024
f65e940
fix conflict
Prajithp Dec 5, 2024
abb7ccc
update CHANGELOG
Prajithp Dec 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 243 additions & 0 deletions pkg/scalers/temporal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package scalers

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/go-logr/logr"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
kedautil "github.com/kedacore/keda/v2/pkg/util"
sdk "go.temporal.io/sdk/client"
sdklog "go.temporal.io/sdk/log"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"
)

var (
temporalDefauleQueueTypes = []sdk.TaskQueueType{
sdk.TaskQueueTypeActivity,
sdk.TaskQueueTypeWorkflow,
sdk.TaskQueueTypeNexus,
}
)

type temporalScaler struct {
metricType v2.MetricTargetType
metadata *temporalMetadata
tcl sdk.Client
logger logr.Logger
}

type temporalMetadata struct {
Endpoint string `keda:"name=endpoint, order=triggerMetadata;resolvedEnv"`
Namespace string `keda:"name=namespace, order=triggerMetadata;resolvedEnv, default=default"`
ActivationTargetQueueSize int64 `keda:"name=activationTargetQueueSize, order=triggerMetadata, default=0"`
TargetQueueSize int64 `keda:"name=targetQueueSize, order=triggerMetadata, default=5"`
TaskQueue string `keda:"name=taskQueue, order=triggerMetadata;resolvedEnv"`
QueueTypes []string `keda:"name=queueTypes, order=triggerMetadata, optional"`
BuildID string `keda:"name=buildId, order=triggerMetadata;resolvedEnv, optional"`
AllActive bool `keda:"name=selectAllActive, order=triggerMetadata, default=true"`
Unversioned bool `keda:"name=selectUnversioned, order=triggerMetadata, default=true"`
APIKey string `keda:"name=apiKey, order=authParams;resolvedEnv;triggerMetadata, optional"`

UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional"`
Cert string `keda:"name=cert, order=authParams;resolvedEnv, optional"`

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's more standard in Temporal deployments to have env vars that point to files for the key/cert rather than have them present in the environment directly. It would be good to have CertPath/KeyPath/CaPath or similar.

Copy link
Author

@Prajithp Prajithp Oct 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robholland That's currently not possible with KEDA. KEDA can only read from environment variables or secrets. Ideally, in k8s, certificates and keys are stored in secrets, so this can be easily managed using TriggerAuthentication. @JorTurFer

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file paths are normally pointed to certs that are mounted into the pods from secrets, so presumably users can just point at the secrets directly for KEDA use.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it can be done with TriggerAuthentication.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we must read them from Kubernetes API and not from file. This is because we can't restart KEDA to include new certs. Take into account that KEDA provides a self-service approach where admins can set up KEDA and users can deploy their own resources, so assuming that certs are part of the KEDA's filesystem isn't an option. If the SDK only support reading them from files, the scaler must pull the secret and store in a temporal file within the container

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our mTLS client certificates and keys auto rotate, live on the filesystem, at least every hour. Our worker code handles the rotation without restarting.

I think any code using mTLS to connect to Temporal needs to be able to handle certificates and keys that are rotated frequently. Getting the certificates and keys from the filesystem seems natural and normal, if there is a way for KEDA to support that it would be good.

We use a sidecar to write/rotate the certificates and keys. But there are other Kubernetes systems for mTLS certificate management that also use filesystems and not Kubernetes Secret resources, like cert-manager csi-driver-spiffe.

Key string `keda:"name=key, order=authParams;resolvedEnv, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams;resolvedEnv, optional"`
CA string `keda:"name=ca, order=authParams;resolvedEnv, optional"`
Prajithp marked this conversation as resolved.
Show resolved Hide resolved

triggerIndex int
}

func (a *temporalMetadata) Validate() error {
if a.TargetQueueSize <= 0 {
return fmt.Errorf("targetQueueSize must be a positive number")
}
if a.ActivationTargetQueueSize < 0 {
return fmt.Errorf("activationTargetQueueSize must be a positive number")
}

if (a.Cert == "") != (a.Key == "") {
return fmt.Errorf("both cert and key must be provided when using TLS")
}

return nil
}

func NewTemporalScaler(ctx context.Context, config *scalersconfig.ScalerConfig) (Scaler, error) {
logger := InitializeLogger(config, "temporal_scaler")

metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("failed to get scaler metric type: %w", err)
}

meta, err := parseTemporalMetadata(config, logger)
if err != nil {
return nil, fmt.Errorf("failed to parse Temporal metadata: %w", err)
}

c, err := getTemporalClient(ctx, meta, logger)
if err != nil {
return nil, fmt.Errorf("failed to create Temporal client connection: %w", err)
}

return &temporalScaler{
metricType: metricType,
metadata: meta,
tcl: c,
logger: logger,
}, nil
}

func (s *temporalScaler) Close(_ context.Context) error {
if s.tcl != nil {
s.tcl.Close()
}
return nil
}

func (s *temporalScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("temporal-%s-%s", s.metadata.Namespace, s.metadata.TaskQueue))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.TargetQueueSize),
}

metricSpec := v2.MetricSpec{
External: externalMetric,
Type: externalMetricType,
}

return []v2.MetricSpec{metricSpec}
}

func (s *temporalScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
queueSize, err := s.getQueueSize(ctx)
if err != nil {
return nil, false, fmt.Errorf("failed to get Temporal queue size: %w", err)
}

metric := GenerateMetricInMili(metricName, float64(queueSize))

return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.ActivationTargetQueueSize, nil
}

func (s *temporalScaler) getQueueSize(ctx context.Context) (int64, error) {
var selection *sdk.TaskQueueVersionSelection
if s.metadata.AllActive || s.metadata.Unversioned || s.metadata.BuildID != "" {
selection = &sdk.TaskQueueVersionSelection{
AllActive: s.metadata.AllActive,
Unversioned: s.metadata.Unversioned,
BuildIDs: []string{s.metadata.BuildID},
}
}

queueType := getQueueTypes(s.metadata.QueueTypes)

resp, err := s.tcl.DescribeTaskQueueEnhanced(ctx, sdk.DescribeTaskQueueEnhancedOptions{
TaskQueue: s.metadata.TaskQueue,
ReportStats: true,
Versions: selection,
TaskQueueTypes: queueType,
})
if err != nil {
return 0, fmt.Errorf("failed to get Temporal queue size: %w", err)
}

return getCombinedBacklogCount(resp), nil
}

func getQueueTypes(queueTypes []string) []sdk.TaskQueueType {
var taskQueueTypes []sdk.TaskQueueType
for _, t := range queueTypes {
var taskQueueType sdk.TaskQueueType
switch t {
case "workflow":
taskQueueType = sdk.TaskQueueTypeWorkflow
case "activity":
taskQueueType = sdk.TaskQueueTypeActivity
case "nexus":
taskQueueType = sdk.TaskQueueTypeNexus
}
taskQueueTypes = append(taskQueueTypes, taskQueueType)
}

if len(taskQueueTypes) == 0 {
return temporalDefauleQueueTypes
}
return taskQueueTypes
}

func getCombinedBacklogCount(description sdk.TaskQueueDescription) int64 {
var count int64
for _, versionInfo := range description.VersionsInfo {
for _, typeInfo := range versionInfo.TypesInfo {
if typeInfo.Stats != nil {
count += typeInfo.Stats.ApproximateBacklogCount
}
}
}
return count
}

func getTemporalClient(ctx context.Context, meta *temporalMetadata, log logr.Logger) (sdk.Client, error) {
logHandler := logr.ToSlogHandler(log)
options := sdk.Options{
HostPort: meta.Endpoint,
Namespace: meta.Namespace,
Logger: sdklog.NewStructuredLogger(slog.New(logHandler)),
}

dialOptions := []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{
MinConnectTimeout: 5 * time.Second,
Prajithp marked this conversation as resolved.
Show resolved Hide resolved
}),
}

if meta.APIKey != "" {
dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(
func(ctx context.Context, method string, req any, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(
metadata.AppendToOutgoingContext(ctx, "temporal-namespace", meta.Namespace),
method,
req,
reply,
cc,
opts...,
)
},
))
options.Credentials = sdk.NewAPIKeyStaticCredentials(meta.APIKey)
}

options.ConnectionOptions = sdk.ConnectionOptions{
DialOptions: dialOptions,
}

if meta.Cert != "" && meta.Key != "" {
tlsConfig, err := kedautil.NewTLSConfigWithPassword(meta.Cert, meta.Key, meta.KeyPassword, meta.CA, meta.UnsafeSsl)
if err != nil {
return nil, err
}
options.ConnectionOptions.TLS = tlsConfig
}

return sdk.DialContext(ctx, options)
}

func parseTemporalMetadata(config *scalersconfig.ScalerConfig, _ logr.Logger) (*temporalMetadata, error) {
Prajithp marked this conversation as resolved.
Show resolved Hide resolved
meta := &temporalMetadata{triggerIndex: config.TriggerIndex}
Prajithp marked this conversation as resolved.
Show resolved Hide resolved
if err := config.TypedConfig(meta); err != nil {
return meta, fmt.Errorf("error parsing temporal metadata: %w", err)
}

return meta, nil
}
Loading
Loading