feat: add scaler for temporal #4863
@@ -0,0 +1,269 @@
package scalers

import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/go-logr/logr"
	kedautil "github.com/kedacore/keda/v2/pkg/util"
	tclfilter "go.temporal.io/api/filter/v1"
	workflowservice "go.temporal.io/api/workflowservice/v1"
	sdk "go.temporal.io/sdk/client"
	"google.golang.org/grpc"
	v2 "k8s.io/api/autoscaling/v2"
	"k8s.io/metrics/pkg/apis/external_metrics"
)

const (
	defaultTargetWorkflowLength           = 5
	defaultActivationTargetWorkflowLength = 0
	temporalClientTimeOut                 = 30
)

type executionInfo struct {
	workflowId string
	runId      string
}

type temporalWorkflowScaler struct {
	metricType v2.MetricTargetType
	metadata   *temporalWorkflowMetadata
	tcl        sdk.Client
	logger     logr.Logger
}

type temporalWorkflowMetadata struct {
	activationTargetWorkflowLength int64
	endpoint                       string
	namespace                      string
	workflowName                   string
	activities                     []string
	scalerIndex                    int
	targetQueueSize                int64
	metricName                     string
}

// NewTemporalWorkflowScaler creates a new instance of temporalWorkflowScaler.
func NewTemporalWorkflowScaler(config *ScalerConfig) (Scaler, error) {
	metricType, err := GetMetricTargetType(config)
	if err != nil {
		return nil, fmt.Errorf("failed to get scaler metric type: %w", err)
	}

	meta, err := parseTemporalMetadata(config)
	if err != nil {
		return nil, fmt.Errorf("failed to parse Temporal metadata: %w", err)
	}

	logger := InitializeLogger(config, "temporal_workflow_scaler")

	c, err := sdk.Dial(sdk.Options{
		HostPort: meta.endpoint,
		ConnectionOptions: sdk.ConnectionOptions{
			DialOptions: []grpc.DialOption{
				grpc.WithTimeout(time.Duration(temporalClientTimeOut) * time.Second),
			},
		},
	})

	if err != nil {
		return nil, fmt.Errorf("failed to create Temporal client: %w", err)
	}

	return &temporalWorkflowScaler{
		metricType: metricType,
		metadata:   meta,
		tcl:        c,
		logger:     logger,
	}, nil
}

// Close closes the Temporal client connection.
func (s *temporalWorkflowScaler) Close(context.Context) error {
	if s.tcl != nil {
		s.tcl.Close()
	}
	return nil
}

// GetMetricSpecForScaling returns the metric specification for scaling.
func (s *temporalWorkflowScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
	externalMetric := &v2.ExternalMetricSource{
		Metric: v2.MetricIdentifier{
			Name: s.metadata.metricName,
		},
		Target: GetMetricTarget(s.metricType, s.metadata.targetQueueSize),
	}
	metricSpec := v2.MetricSpec{
		External: externalMetric,
		Type:     externalMetricType,
	}
	return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity returns metrics and activity for the scaler.
func (s *temporalWorkflowScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
	queueSize, err := s.getQueueSize(ctx)
	if err != nil {
		return nil, false, fmt.Errorf("failed to get Temporal queue size: %w", err)
	}

	metric := GenerateMetricInMili(metricName, float64(queueSize))

	return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.activationTargetWorkflowLength, nil
}

// getQueueSize returns the queue size of open workflows.
func (s *temporalWorkflowScaler) getQueueSize(ctx context.Context) (int64, error) {
Reviewer:
This does not get an accurate queue size (this paginates). You can use the workflow count API instead. The proper way to scale Temporal workers is to use the worker task slot metrics.

Author:
It would make more sense if we considered multiple activities within a single workflow and deployed workers for each activity. However, the current scaling mechanism relies on pending workflows rather than individual activities. I plan to review the SDK documentation to explore the possibility of integrating activities into the scaling process. Notably, "temporal_worker_task_slots_available" is a Prometheus metric, which could be used together with the Prometheus scaler by anyone interested in scaling on that metric.

Reviewer:
I don't think it's a reasonable scaler if you don't consider activities, and I don't think the scaler works that well if it's only for a single workflow type. Pending activities matter too (maybe more). Even if you were only counting pending workflows, listing open workflows is paginated, so you are not getting full counts. Regardless, scaling a worker based on a single workflow is not the best way to write a scaler. temporal_worker_task_slots_available is the metric that should be scaled on and is the one Temporal recommends scaling on (assuming you've configured individual worker resources properly based on your workflows/activities), see https://docs.temporal.io/dev-guide/worker-performance. The current scaler, which doesn't include activities and only works for a single workflow type, is not sufficient IMO.

Author:
I have moved this to a paginated approach. Unfortunately, I haven't found a way to integrate activity counts into the current setup; further research is needed to explore possible solutions.

Reviewer:
You should just use the count API.

Author:
@cretz Can you please review the recent changes?

Reviewer:
The idea of a target queue size and listing workflows is not the recommended approach to deciding whether to scale up or down (not to mention it would be better to use a count with a query that checks whether the workflow is running). We recommend using the worker task slot metric instead.

Maintainer:
@cretz is one of the main contributors of the temporal.io SDK repo.

Author:
@JorTurFer I am unsure about the feasibility of achieving this through the CountWorkflow method, as it might not provide visibility into whether an activity is currently running or not. As he recommended, anyone aiming to scale based on Prometheus data can make use of the query specified above. Can we please leave this pull request open for a while? That would allow for additional suggestions from others. In the meantime, we will maintain our own fork and deploy it in our production environment.

Maintainer:
Yes, sure, no problem at all. I'd suggest using the external scaler / metrics API scaler instead of maintaining your own fork. KEDA can be extended through those scalers from a 3rd-party service that you can develop with whatever code you prefer. Using this approach instead of maintaining your own fork gives you the option to upgrade KEDA without the hard effort of rebasing it and adapting the code (as you have developed just a scaler this shouldn't be a drama, but extending is always better than modifying).
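For reference, a minimal sketch of the count-based approach recommended above. It assumes the Temporal Go SDK's CountWorkflow call (which wraps CountWorkflowExecutions) and a cluster with advanced visibility enabled so the query filter is supported; the countRunningWorkflows helper name is illustrative and not part of this PR, and it reuses the imports already present in this file.

```go
// countRunningWorkflows is an illustrative helper (not part of this PR): ask the
// server for a single count instead of listing and paging through every open execution.
func countRunningWorkflows(ctx context.Context, c sdk.Client, namespace, workflowName string) (int64, error) {
	// Visibility query: count only running executions of the given workflow type.
	query := fmt.Sprintf("WorkflowType = '%s' AND ExecutionStatus = 'Running'", workflowName)
	resp, err := c.CountWorkflow(ctx, &workflowservice.CountWorkflowExecutionsRequest{
		Namespace: namespace,
		Query:     query,
	})
	if err != nil {
		return 0, fmt.Errorf("failed to count workflows: %w", err)
	}
	return resp.GetCount(), nil
}
```

A single RPC like this avoids both the pagination and the per-execution DescribeWorkflowExecution calls discussed below, at the cost of not inspecting pending activities.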
	var executions []executionInfo
	var nextPageToken []byte
	for {
		listOpenWorkflowExecutionsRequest := &workflowservice.ListOpenWorkflowExecutionsRequest{
			Namespace:       s.metadata.namespace,
			MaximumPageSize: 1000,
			NextPageToken:   nextPageToken,
			Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_TypeFilter{
				TypeFilter: &tclfilter.WorkflowTypeFilter{
					Name: s.metadata.workflowName,
				},
			},
		}
		ws, err := s.tcl.ListOpenWorkflow(ctx, listOpenWorkflowExecutionsRequest)
		if err != nil {
			return 0, fmt.Errorf("failed to get workflows: %w", err)
		}

		for _, exec := range ws.GetExecutions() {
			execution := executionInfo{
				workflowId: exec.Execution.GetWorkflowId(),
				runId:      exec.Execution.RunId,
			}
			executions = append(executions, execution)
		}

		if nextPageToken = ws.NextPageToken; len(nextPageToken) == 0 {
			break
		}
	}
Comment on lines +126 to +153:
I'm afraid about the performance impact of this. Could we face an infinite (or almost infinite) loop? If the backend responds slowly and we have to browse, say, 50 pages, what will happen?
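One way to address the concern above, sketched under the assumption that a bounded count is acceptable for a scaling signal: cap the number of pages fetched per poll and bail out when the polling context expires. The maxPages constant is illustrative and not defined by this PR.

```go
const maxPages = 10 // illustrative cap, not part of this PR

for page := 0; page < maxPages; page++ {
	// Stop early if KEDA's polling context has been cancelled or has timed out.
	if err := ctx.Err(); err != nil {
		return 0, fmt.Errorf("aborted while paging open workflows: %w", err)
	}

	ws, err := s.tcl.ListOpenWorkflow(ctx, &workflowservice.ListOpenWorkflowExecutionsRequest{
		Namespace:       s.metadata.namespace,
		MaximumPageSize: 1000,
		NextPageToken:   nextPageToken,
		Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_TypeFilter{
			TypeFilter: &tclfilter.WorkflowTypeFilter{Name: s.metadata.workflowName},
		},
	})
	if err != nil {
		return 0, fmt.Errorf("failed to get workflows: %w", err)
	}

	for _, exec := range ws.GetExecutions() {
		executions = append(executions, executionInfo{
			workflowId: exec.Execution.GetWorkflowId(),
			runId:      exec.Execution.GetRunId(),
		})
	}

	if nextPageToken = ws.NextPageToken; len(nextPageToken) == 0 {
		break
	}
}
```

The trade-off is that the reported queue size saturates at maxPages * MaximumPageSize open workflows, which is usually acceptable for an HPA signal.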
	pendingCh := make(chan string, len(executions))
	var wg sync.WaitGroup

	for _, execInfo := range executions {
		wg.Add(1)
		go func(e executionInfo) {
			defer wg.Done()

			workflowId := e.workflowId
			runId := e.runId

			if !s.isActivityRunning(ctx, workflowId, runId) {
				executionId := workflowId + "__" + runId
				pendingCh <- executionId
			}
		}(execInfo)
	}
	wg.Wait()
	close(pendingCh)
Comment on lines +158 to +174:
Same as above: if there are thousands of pending executions, what will happen?
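A possible answer to the question above: replace the one-goroutine-per-execution fan-out with a fixed worker pool so the number of concurrent DescribeWorkflowExecution calls stays bounded regardless of how many executions come back. The maxWorkers value is illustrative, not part of this PR.

```go
const maxWorkers = 50 // illustrative bound, not part of this PR

jobs := make(chan executionInfo)
pendingCh := make(chan string, len(executions))
var wg sync.WaitGroup

// Start a fixed pool of workers instead of one goroutine per execution.
for i := 0; i < maxWorkers; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for e := range jobs {
			if !s.isActivityRunning(ctx, e.workflowId, e.runId) {
				pendingCh <- e.workflowId + "__" + e.runId
			}
		}
	}()
}

for _, execInfo := range executions {
	jobs <- execInfo
}
close(jobs)
wg.Wait()
close(pendingCh)
```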
	var queueLength int64
	for range pendingCh {
		queueLength++
	}
	return queueLength, nil
}

// isActivityRunning checks if there are running activities associated with a specific workflow execution.
func (s *temporalWorkflowScaler) isActivityRunning(ctx context.Context, workflowId, runId string) bool {
	resp, err := s.tcl.DescribeWorkflowExecution(ctx, workflowId, runId)
	if err != nil {
		s.logger.Error(err, "error describing workflow execution", "workflowId", workflowId, "runId", runId)
		return false
	}

	// If there is no activityName and there are running activities, return true.
	if len(s.metadata.activities) == 0 && len(resp.GetPendingActivities()) > 0 {
		return true
	}

	// Store the IDs of running activities in a set so duplicates are not counted.
	runningActivities := make(map[string]struct{})
	for _, pendingActivity := range resp.GetPendingActivities() {
		activityName := pendingActivity.ActivityType.GetName()
		if s.hasMatchingActivityName(activityName) {
			runningActivities[pendingActivity.ActivityId] = struct{}{}
		}
	}

	// Return true if there are any running activities, otherwise false.
	return len(runningActivities) > 0
}

// hasMatchingActivityName checks if the provided activity name matches any of the defined activity names in the metadata.
func (s *temporalWorkflowScaler) hasMatchingActivityName(activityName string) bool {
	for _, activity := range s.metadata.activities {
		if activityName == activity {
			return true
		}
	}
	return false
}

// parseTemporalMetadata parses the Temporal metadata from the ScalerConfig.
func parseTemporalMetadata(config *ScalerConfig) (*temporalWorkflowMetadata, error) {
	meta := &temporalWorkflowMetadata{}
	meta.activationTargetWorkflowLength = defaultActivationTargetWorkflowLength
	meta.targetQueueSize = defaultTargetWorkflowLength

	if config.TriggerMetadata["endpoint"] == "" {
		return nil, errors.New("no Temporal gRPC endpoint provided")
	}
	meta.endpoint = config.TriggerMetadata["endpoint"]

	if config.TriggerMetadata["namespace"] == "" {
		meta.namespace = "default"
	} else {
		meta.namespace = config.TriggerMetadata["namespace"]
	}

	if config.TriggerMetadata["workflowName"] == "" {
		return nil, errors.New("no workflow name provided")
	}
	meta.workflowName = config.TriggerMetadata["workflowName"]

	if activities := config.TriggerMetadata["activityName"]; activities != "" {
		meta.activities = strings.Split(activities, ",")
	}

	if size, ok := config.TriggerMetadata["targetQueueSize"]; ok {
		queueSize, err := strconv.ParseInt(size, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("invalid targetQueueSize - must be an integer")
		}
		meta.targetQueueSize = queueSize
	}

	if size, ok := config.TriggerMetadata["activationTargetQueueSize"]; ok {
		activationTargetQueueSize, err := strconv.ParseInt(size, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("invalid activationTargetQueueSize - must be an integer")
		}
		meta.activationTargetWorkflowLength = activationTargetQueueSize
	}

	meta.metricName = GenerateMetricNameWithIndex(
		config.ScalerIndex, kedautil.NormalizeString(
			fmt.Sprintf("temporal-%s-%s", meta.namespace, meta.workflowName),
		),
	)
	meta.scalerIndex = config.ScalerIndex
Review comment:
I think this line is not necessary because you are already generating the metric name above (and that's the only reason to use the scalerIndex, IIRC).
	return meta, nil
}
Review comment:
As gRPC is HTTP at the end of the day, I think we should use the timeout from the environment variable that is exposed as config.GlobalHTTPTimeout.

Review comment:
Please add the TLS options too. There is a helper you should use that unifies all TLS configs, like minVersion or custom CAs.
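A rough sketch of what both suggestions could look like when dialing the client. It assumes the ConnectionOptions.TLS field of the Temporal Go SDK and the GlobalHTTPTimeout value that KEDA exposes on ScalerConfig; dialTemporal is a hypothetical helper, and a real implementation would build the *tls.Config through KEDA's shared TLS helper (custom CAs, client certs, minVersion) rather than inline.

```go
import (
	"crypto/tls"
	"time"

	sdk "go.temporal.io/sdk/client"
	"google.golang.org/grpc"
)

// dialTemporal is illustrative only: it wires the scaler's endpoint, KEDA's global
// HTTP timeout, and an optional TLS config into the Temporal client dial options.
func dialTemporal(endpoint string, timeout time.Duration, tlsCfg *tls.Config) (sdk.Client, error) {
	return sdk.Dial(sdk.Options{
		HostPort: endpoint,
		ConnectionOptions: sdk.ConnectionOptions{
			TLS: tlsCfg, // nil keeps plaintext; a *tls.Config enables TLS
			DialOptions: []grpc.DialOption{
				// Replaces the hard-coded 30s constant; note grpc.WithTimeout is
				// deprecated upstream, so a dial context with a deadline is an alternative.
				grpc.WithTimeout(timeout),
			},
		},
	})
}
```

The call site would then be something like dialTemporal(meta.endpoint, config.GlobalHTTPTimeout, tlsCfg).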