-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add scaler for temporal #4863
Conversation
Thank you for your contribution! 🙏 We will review your PR as soon as possible.
While you are waiting, make sure to:
Learn more about: |
cbec598
to
020c659
Compare
Can someone please review this? |
} | ||
|
||
// getQueueSize returns the queue size of open workflows. | ||
func (s *temporalWorkflowScaler) getQueueSize(ctx context.Context) (int64, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not get an accurate queue size (this paginates). You can use .CountWorkflow
but that's only for workflows, it doesn't help with activities (and often it's activities that are the reason for needing to scale).
The proper way to scale Temporal workers is to use the temporal_worker_task_slots_available
metric on the workers. See https://docs.temporal.io/dev-guide/worker-performance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would make more sense if we consider multiple activities within a single workflow and deploy workers for each activity. However, the current scaling mechanism relies on pending workflows rather than individual activities. I plan to review the SDK documentation to explore the possibility of integrating activities into the scaling process.
Notably, "temporal_worker_task_slots_available" serves as a Prometheus metric, which could potentially be employed alongside the Prometheus scaler for those interested in scaling based on this particular metric.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would make more sense if we consider multiple activities within a single workflow and deploy workers for each activity.
I don't think it's a reasonable scaler if you don't consider activities. And I don't think the scaler is working that well if it's only for a single workflow type.
However, the current scaling mechanism relies on pending workflows rather than individual activities.
Pending activities matter too (maybe more). Even if you were only doing pending workflows, list open workflows is paginated, you are not getting full counts. Regardless, scaling a worker based on a single workflow is not the best way to write a scaler.
Notably, "temporal_worker_task_slots_available" serves as a Prometheus metric, which could potentially be employed alongside the Prometheus scaler for those interested in scaling based on this particular metric.
This is the metric that should be scaled on and is the one Temporal recommends scaling on (assuming you've configured individual worker resources properly based on your workflows/activites), see https://docs.temporal.io/dev-guide/worker-performance. The current scaler which doesn't include activities, only works for a single workflow type, etc is not sufficient IMO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have transitioned this process to a paginated approach. Unfortunately, I haven't discovered a method to integrate activity counts into the current setup. It seems that further research is necessary to explore potential solutions in this regard.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have transitioned this process to a paginated approach.
You should just use CountWorkflow
, not list every workflow. But regardless, we do not have a way for you to easily get all pending activities from the server for a task queue. The scaler needs to use the slots metric per the worker performance doc. Using list/count is not the best way to write the scaler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cretz Can you please review the recent changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea of a target queue size and listing workflows is not the recommended approach to determining whether to scale up or down (ntm it'd be better to use count with a query checking whether running). We recommend using the temporal_worker_task_slots_available
metric (with check whether worker type is activity or workflow).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cretz is one of the main contributors of temporal.io SDK repo.
I think that we should follow his recommendations at this point @Prajithp . Could you implement it? 🙏
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JorTurFer I am unsure about the feasibility of achieving this through the CountWorkflow method, as it might not provide visibility into whether the activity is presently running or not. As recommended by him, individuals aiming to scale based on Prometheus data can make use of the query specified above.
Can we please leave this pull request open for a while? This would allow for the possibility of additional suggestions from others. In the meantime, we will maintain our own fork and deploy it in our production environment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we please leave this pull request open for a while?
Yeas sure, no problem at all
we will maintain our own fork and deploy it in our production environment
I'd suggest using external scaler/metrics api scaler instead of maintaining your own fork. I mean, KEDA can be extended using those scalers from a 3rd party service that you can develop with the code that you prefer. Using this approach instead of maintaining your own fork can brings you the option to upgrade KEDA without the hard effort of rebase it and adapt the code (as you have develop just a scaler this shouldn't be a drama, but extending is always better than modifying)
I had to abandon this as I did not have time to write the e2e tests #4721 |
@Prajithp can you please fix DCO issues and open a PR for docs please?
Also note the above so please bear with us ☝ |
Signed-off-by: Prajith P <[email protected]> Signed-off-by: Prajith P <[email protected]>
Signed-off-by: Prajith P <[email protected]> Signed-off-by: Prajith P <[email protected]>
Signed-off-by: Prajith P <[email protected]>
Signed-off-by: Prajith P <[email protected]>
fe2bdcf
to
673d9f1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this addition ❤️ and sorry for the slow review 😞 , summer is complicated :/
I have left some comments inline. Apart from them, we use vendor to have reproducible builds, so you have to execute:
go mod tidy
go mod vendor
This will add all the new deps to de vendor folder, please commit and push them too
HostPort: meta.endpoint, | ||
ConnectionOptions: sdk.ConnectionOptions{ | ||
DialOptions: []grpc.DialOption{ | ||
grpc.WithTimeout(time.Duration(temporalClientTimeOut) * time.Second), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As gRPC is HTTP at the of the day, I think that we should use the environment variable that it's in config.GlobalHTTPTimeout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, add the TLS options too. There is a helper you should use that unifies all TLS configs like minVersion or custom CAs.
fmt.Sprintf("temporal-%s-%s", meta.namespace, meta.workflowName), | ||
), | ||
) | ||
meta.scalerIndex = config.ScalerIndex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that this line it's not necessary because you are already generating the metrics name here (and it's the only reason to use the scalerIndex IIRC)
} | ||
|
||
// getQueueSize returns the queue size of open workflows. | ||
func (s *temporalWorkflowScaler) getQueueSize(ctx context.Context) (int64, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TBH, I don't have any knowledge about temporal, so I can't give you any extra insight about the implementation.
@cretz , do you agree with the current implementation? As you (both) are the experts on temporal, I hope to get your consensus on the implementation
for { | ||
listOpenWorkflowExecutionsRequest := &workflowservice.ListOpenWorkflowExecutionsRequest{ | ||
Namespace: s.metadata.namespace, | ||
MaximumPageSize: 1000, | ||
NextPageToken: nextPageToken, | ||
Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_TypeFilter{ | ||
TypeFilter: &tclfilter.WorkflowTypeFilter{ | ||
Name: s.metadata.workflowName, | ||
}, | ||
}, | ||
} | ||
ws, err := s.tcl.ListOpenWorkflow(ctx, listOpenWorkflowExecutionsRequest) | ||
if err != nil { | ||
return 0, fmt.Errorf("failed to get workflows: %w", err) | ||
} | ||
|
||
for _, exec := range ws.GetExecutions() { | ||
execution := executionInfo{ | ||
workflowId: exec.Execution.GetWorkflowId(), | ||
runId: exec.Execution.RunId, | ||
} | ||
executions = append(executions, execution) | ||
} | ||
|
||
if nextPageToken = ws.NextPageToken; len(nextPageToken) == 0 { | ||
break | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm afraid about the performance impact of this. Could we face with any infinite (or almost infinite loop)? If the backend responds slowly and we have to browse idk, 50 pages, what will happen?
Is adding a limit for the pages doable? Maybe just with a parameter that users can modify under their own risk?
WDYT?
} | ||
|
||
// getQueueSize returns the queue size of open workflows. | ||
func (s *temporalWorkflowScaler) getQueueSize(ctx context.Context) (int64, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR's implementation and your implementation are really different: https://github.com/kedacore/keda/pull/4721/files#diff-f59fd700aa9c39c0f77d364730bcf70d05712e8841b100cc6b2d502f5224724bR183-R190
for _, execInfo := range executions { | ||
wg.Add(1) | ||
go func(e executionInfo) { | ||
defer wg.Done() | ||
|
||
workflowId := e.workflowId | ||
runId := e.runId | ||
|
||
if !s.isActivityRunning(ctx, workflowId, runId) { | ||
executionId := workflowId + "__" + runId | ||
pendingCh <- executionId | ||
} | ||
|
||
}(execInfo) | ||
} | ||
wg.Wait() | ||
close(pendingCh) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above, if there are thousand of pending executions, What will happen?
Probably I'm wrong, but I understand that we are listing all the workflows and inside the workflows, we are checking all the executions to decide if it's running or not. This suggests me some questions:
- Are executed activities removed at any moment or will we have this queue growing and growing?
- Doesn't the workflow have any option to give that information during the first requests instead of having to navigate over all the activities?
} | ||
} | ||
|
||
func TestParseTemporalMetadata(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this test duplicated?
spec: | ||
containers: | ||
- name: worker | ||
image: "prajithp/temporal-sample:1.0.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you open a PR to this repo adding the image? We prefer to have all the used images in the org infra to prevent possible issues
spec: | ||
containers: | ||
- name: workerflow | ||
image: "prajithp/temporal-sample:1.0.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this addition ❤️ and sorry for the slow review 😞 , summer is complicated :/
I have left some comments inline. Apart from them, we use vendor to have reproducible builds, so you have to execute:
go mod tidy
go mod vendor
This will add all the new deps to de vendor folder, please commit and push them too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Prajithp any update please?
|
Any update here? |
@Prajithp any updates on this? If we don't hear back soon we'll have to close this PR |
@tomkerkhove, I think we should close this for now since @cretz doesn't seem to be happy with this approach. |
Yes, I am sorry but the current approach is not how we tell users to scale and has many limitations and drawbacks. We at Temporal are considering building and contributing this, but I am afraid I have no details yet. Even without dedicated server support (e.g. task queue backlog counts), this would need to mirror the approach at https://docs.temporal.io/dev-guide/worker-performance to be a reasonable one. |
@cretz thank you for the information. I am gonna close this PR. Feel free to reach out if you have a better proposal. |
#6191 please check the pr based on new apporach by temporal 1.25 Task Queue Statistics @cretz @zroubalik @JorTurFer |
Implement a temporal scaler
Checklist
Relates to #4724