WorkflowTaskTimedOut when submitting a large number of activities within one workflow #6806

Open
mikellxy opened this issue Nov 13, 2024 · 2 comments

@mikellxy

Hi, I apologize for using an issue to ask for help.

Expected Behavior

In my workflow worker, I first start a timer to wait for the business data to be ready. Then I submit 1,000 activities, expecting all of them to be scheduled and run asynchronously after submission. Finally, I wait for the results of the 1,000 Future objects, similar to a wait group.

This is a simplified version of my code:

```go
func (w *Worker) Start(wfCtx workflow.Context, req *Req) (result string, err error) {
	// Wait for the business data to be ready.
	if err := workflow.Sleep(wfCtx, 30*time.Second); err != nil {
		return err.Error(), err
	}

	// Apply the activity options (timeouts, retry policy) shown below.
	actCtx := w.GetActivityContext(wfCtx)

	// Submit 1000 activities without waiting on any of them.
	var futures []workflow.Future
	for i := 0; i < 1000; i++ {
		f := workflow.ExecuteActivity(actCtx, w.Act, req, i)
		futures = append(futures, f)
	}

	// Wait for all activities to finish, wait-group style.
	for _, f := range futures {
		res := new(Result)
		if err := f.Get(wfCtx, res); err != nil {
			return err.Error(), err
		}
		if res.KnownErr != "" {
			workflow.GetLogger(wfCtx).Info("known error, workflow can return directly", "error", res.KnownErr)
			return res.KnownErr, nil
		}
	}
	return "", nil
}
```

The activity options are as follows:

```go
func (w *Worker) GetActivityContext(wfCtx workflow.Context) workflow.Context {
	return workflow.WithActivityOptions(wfCtx, workflow.ActivityOptions{
		ScheduleToCloseTimeout: time.Hour,
		StartToCloseTimeout:    time.Hour,
		WaitForCancellation:    false,
		RetryPolicy: &temporal.RetryPolicy{
			MaximumAttempts:        20,
			NonRetryableErrorTypes: []string{},
		},
	})
}
```

The workflow options:

```
"start_workflow_options": {
  "workflow_run_timeout": "1d",
  "workflow_task_timeout": "90m",
  "retry_policy": {
    "initial_interval": "1s",
    "backoff_coefficient": 2.0,
    "maximum_interval": "1h",
    "maximum_attempts": 3
  }
}
```

The worker options:

```
"options": {
  "max_concurrent_activity_execution_size": 10
}
```

Actual Behavior

It reports WorkflowTaskTimedOut, and none of the activities were scheduled.
The timeline is as follows:

  1. WorkflowExecutionStarted
  2. WorkflowTaskScheduled
  3. WorkflowTaskStarted
  4. WorkflowTaskCompleted
  5. TimerStarted
  6. TimerFired
  7. WorkflowTaskScheduled (happened at time T)
  8. WorkflowTaskStarted
  9. WorkflowTaskTimedOut (happened at time T+2 minutes; the timeout type was StartToClose, but my activities' StartToCloseTimeout option is one hour)

P.S. If I reduce the number of activities to 100, all of them run successfully.

Steps to Reproduce the Problem

Specifications

  • Version:
  • Platform:
@bergundy
Member

There's a hard-coded 4 MB limit on gRPC message size. If the accumulated size of the ScheduleActivityTask commands goes above that limit, the RespondWorkflowTaskCompleted request is rejected, and this manifests as a workflow task timeout.
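To see how this plays out with the numbers in this issue, here is a back-of-the-envelope sketch. It assumes the limit is 4 MiB (gRPC's default maximum message size, 4 × 1024 × 1024 bytes) and a hypothetical 5,000 bytes of serialized input per ScheduleActivityTask command. The issue does not say how large `req` is, so substitute a measured value; `fitsInOneTask` and `maxActivitiesPerTask` are illustrative helpers, not SDK functions.

```go
package main

import "fmt"

// grpcMaxMessageBytes assumes the limit is gRPC's default maximum
// message size of 4 MiB.
const grpcMaxMessageBytes = 4 * 1024 * 1024

// fitsInOneTask reports whether n ScheduleActivityTask commands of about
// bytesPerCommand each stay under the limit in one
// RespondWorkflowTaskCompleted request. It ignores the rest of the
// request, so it underestimates the real size.
func fitsInOneTask(n, bytesPerCommand int) bool {
	return n*bytesPerCommand <= grpcMaxMessageBytes
}

// maxActivitiesPerTask returns the largest n for which fitsInOneTask is true.
func maxActivitiesPerTask(bytesPerCommand int) int {
	return grpcMaxMessageBytes / bytesPerCommand
}

func main() {
	// Hypothetical payload size per command (req plus the index i).
	const perCommand = 5000
	fmt.Println(fitsInOneTask(100, perCommand))   // true: 500,000 bytes
	fmt.Println(fitsInOneTask(1000, perCommand))  // false: 5,000,000 bytes
	fmt.Println(maxActivitiesPerTask(perCommand)) // 838
}
```

With that assumed payload, 100 activities fit comfortably while 1,000 do not, which is consistent with the behavior reported above. Because the estimate ignores the other fields of the request, the real ceiling is somewhat lower.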

This is a known issue without a good mitigation today.
Please confirm this theory; the Go SDK may have emitted some logs about it.
As a workaround, you can add a sleep after scheduling every 100 activities.

@drewhoskins-temporal
Contributor

There's a feature request open for a possible fix to this issue: temporalio/features#363
