Skip to content

Commit

Permalink
fix: two paths for retry
Browse files Browse the repository at this point in the history
  • Loading branch information
grutt committed Feb 5, 2025
1 parent cdc0e45 commit 049df19
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 18 deletions.
95 changes: 86 additions & 9 deletions pkg/repository/prisma/dbsqlc/queue.sql
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,9 @@ WITH retries AS (
s."id" AS "stepId",
s."timeout" AS "stepTimeout",
s."scheduleTimeout" AS "scheduleTimeout",
wr."id" AS "workflowRunId"
sr."jobRunId",
wr."id" AS "workflowRunId",
wr."concurrencyGroupId" AS "workflowConcurrencyGroupId"
FROM
retries
JOIN
Expand All @@ -601,23 +603,98 @@ WITH retries AS (
"WorkflowRun" wr ON jr."workflowRunId" = wr."id"
WHERE
sr."status" NOT IN ('SUCCEEDED', 'FAILED', 'CANCELLED')
), updated_step_runs AS (
),
srs_with_concurrency AS (
SELECT
srs.*,
srs."workflowConcurrencyGroupId"
FROM
srs
WHERE
srs."workflowConcurrencyGroupId" IS NOT NULL
),
srs_without_concurrency AS (
SELECT
srs.*,
NULL AS "workflowConcurrencyGroupId"
FROM
srs
WHERE
srs."workflowConcurrencyGroupId" IS NULL
),
updated_step_runs_with_concurrency AS (
UPDATE "StepRun" sr
SET
"scheduleTimeoutAt" = CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs."scheduleTimeout"), INTERVAL '5 minutes'),
"scheduleTimeoutAt" = CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs_with_concurrency."scheduleTimeout"), INTERVAL '5 minutes'),
"updatedAt" = CURRENT_TIMESTAMP,
"retryCount" = srs."retryCount" + 1
FROM srs
WHERE sr."id" = srs."id"
"retryCount" = srs_with_concurrency."retryCount" + 1
FROM srs_with_concurrency
WHERE sr."id" = srs_with_concurrency."id"
RETURNING sr."id"
), updated_workflow_runs AS (
),
updated_workflow_runs_with_concurrency AS (
UPDATE "WorkflowRun" wr
SET
"status" = 'QUEUED',
"updatedAt" = CURRENT_TIMESTAMP
FROM srs
WHERE wr."id" = srs."workflowRunId"
FROM srs_with_concurrency
WHERE wr."id" = srs_with_concurrency."workflowRunId"
RETURNING wr."id"
),
updated_step_runs_without_concurrency AS (
UPDATE "StepRun" sr
SET
"status" = 'PENDING_ASSIGNMENT',
"scheduleTimeoutAt" = CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs_without_concurrency."scheduleTimeout"), INTERVAL '5 minutes'),
"updatedAt" = CURRENT_TIMESTAMP,
"retryCount" = srs_without_concurrency."retryCount" + 1
FROM srs_without_concurrency
WHERE sr."id" = srs_without_concurrency."id"
RETURNING sr."id"
),
updated_workflow_runs_without_concurrency AS (
UPDATE "WorkflowRun" wr
SET
"status" = 'RUNNING',
"updatedAt" = CURRENT_TIMESTAMP
FROM srs_without_concurrency
WHERE wr."id" = srs_without_concurrency."workflowRunId"
RETURNING wr."id"
),
update_job_runs_without_concurrency AS (
UPDATE "JobRun" jr
SET
"status" = 'RUNNING',
"updatedAt" = CURRENT_TIMESTAMP
FROM srs_without_concurrency
WHERE jr."id" = srs_without_concurrency."jobRunId"
RETURNING jr."id"
), inserted_sqs_without_concurrency AS (
INSERT INTO "QueueItem" (
"stepRunId",
"stepId",
"actionId",
"scheduleTimeoutAt",
"stepTimeout",
"priority",
"isQueued",
"tenantId",
"queue"
)
SELECT
srs_without_concurrency."id",
srs_without_concurrency."stepId",
srs_without_concurrency."actionId",
CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs_without_concurrency."scheduleTimeout"), INTERVAL '5 minutes'),
srs_without_concurrency."stepTimeout",
-- Queue with priority 4 so that retry gets highest priority
4,
true,
srs_without_concurrency."tenantId",
srs_without_concurrency."actionId"
FROM
srs_without_concurrency
RETURNING "stepRunId"
)
SELECT * FROM retries;

Expand Down
95 changes: 86 additions & 9 deletions pkg/repository/prisma/dbsqlc/queue.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 049df19

Please sign in to comment.