Skip to content

Commit

Permalink
fix: extend timeout and make pop query pseudo-random
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 committed Jan 21, 2025
1 parent 602cb84 commit b9d7f45
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 6 deletions.
2 changes: 1 addition & 1 deletion internal/services/controllers/workflows/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func New(fs ...WorkflowsControllerOpt) (*WorkflowsControllerImpl, error) {

w.processWorkflowEventsOps = queueutils.NewOperationPool(w.l, time.Second*5, "process workflow events", w.processWorkflowEvents)
w.unpausedWorkflowRunsOps = queueutils.NewOperationPool(w.l, time.Second*5, "unpause workflow runs", w.unpauseWorkflowRuns)
w.bumpQueueOps = queueutils.NewOperationPool(w.l, time.Second*30, "bump queue", w.runPollActiveQueuesTenant)
w.bumpQueueOps = queueutils.NewOperationPool(w.l, time.Second*5, "bump queue", w.runPollActiveQueuesTenant)

return w, nil
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/repository/prisma/dbsqlc/workflow_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,9 @@ WITH workflow_runs AS (
r2.id,
r2."status",
r2."concurrencyGroupId",
row_number() OVER (PARTITION BY r2."concurrencyGroupId" ORDER BY r2."createdAt", r2."insertOrder", r2.id) AS "rn",
row_number() OVER (ORDER BY r2."createdAt", r2."insertOrder", r2.id) AS "seqnum"
row_number() OVER (PARTITION BY r2."concurrencyGroupId" ORDER BY r2."createdAt", r2.id) AS "rn",
-- we order by r2.id as a second parameter to get a pseudo-random, stable order
row_number() OVER (ORDER BY r2."createdAt", r2.id) AS "seqnum"
FROM
"WorkflowRun" r2
LEFT JOIN
Expand Down
5 changes: 3 additions & 2 deletions pkg/repository/prisma/dbsqlc/workflow_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/repository/prisma/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ func (w *workflowRunEngineRepository) PopWorkflowRunsCancelNewest(ctx context.Co

func (w *workflowRunEngineRepository) PopWorkflowRunsRoundRobin(ctx context.Context, tenantId string, workflowVersionId string, maxRuns int) ([]*dbsqlc.WorkflowRun, []*dbsqlc.GetStepRunForEngineRow, error) {

tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, w.pool, w.l, 25000)
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, w.pool, w.l, 5000)

if err != nil {
return nil, nil, err
Expand Down

0 comments on commit b9d7f45

Please sign in to comment.