Skip to content

Commit

Permalink
wip: backoff state
Browse files Browse the repository at this point in the history
  • Loading branch information
grutt committed Jan 27, 2025
1 parent b53e8bd commit d821a7b
Show file tree
Hide file tree
Showing 22 changed files with 389 additions and 220 deletions.
3 changes: 3 additions & 0 deletions api-contracts/openapi/components/schemas/workflow_run.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ StepRunStatus:
- FAILED
- CANCELLED
- CANCELLING
- BACKOFF

JobRunStatus:
type: string
Expand All @@ -330,6 +331,7 @@ JobRunStatus:
- SUCCEEDED
- FAILED
- CANCELLED
- BACKOFF

WorkflowRunStatus:
type: string
Expand All @@ -340,6 +342,7 @@ WorkflowRunStatus:
- FAILED
- CANCELLED
- QUEUED
- BACKOFF

ScheduledRunStatus:
type: string
Expand Down
400 changes: 201 additions & 199 deletions api/v1/server/oas/gen/openapi.gen.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions frontend/app/src/lib/api/generated/data-contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ export enum WorkflowRunStatus {
FAILED = 'FAILED',
CANCELLED = 'CANCELLED',
QUEUED = 'QUEUED',
BACKOFF = 'BACKOFF',
}

export type WorkflowRunStatusList = WorkflowRunStatus[];
Expand Down Expand Up @@ -930,6 +931,7 @@ export enum StepRunStatus {
FAILED = 'FAILED',
CANCELLED = 'CANCELLED',
CANCELLING = 'CANCELLING',
BACKOFF = 'BACKOFF',
}

export interface StepRun {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ const StepRunOutputCancelling = () => {
return oneLiner('Step run is being cancelled');
};

const StepRunOutputBackoff = () => {
return oneLiner('Step run is in a retry backoff state');
};

const OUTPUT_STATE_MAP: Record<StepRunStatus, React.FC<StepRunOutputProps>> = {
[StepRunStatus.CANCELLED]: StepRunOutputCancelled,
[StepRunStatus.PENDING]: StepRunOutputPending,
Expand All @@ -98,6 +102,7 @@ const OUTPUT_STATE_MAP: Record<StepRunStatus, React.FC<StepRunOutputProps>> = {
[StepRunStatus.SUCCEEDED]: StepRunOutputSucceeded,
[StepRunStatus.FAILED]: StepRunOutputFailed,
[StepRunStatus.CANCELLING]: StepRunOutputCancelling,
[StepRunStatus.BACKOFF]: StepRunOutputBackoff,
};

const StepRunOutput: React.FC<StepRunOutputProps> = (props) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ const RUN_STATUS_VARIANTS: Record<RunStatusType, RunStatusVariant> = {
text: 'Scheduled',
variant: 'outline',
},
BACKOFF: {
text: 'Backoff',
variant: 'outline',
},
};

const RUN_STATUS_REASONS: Record<string, string> = {
Expand Down
2 changes: 1 addition & 1 deletion internal/services/controllers/jobs/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq

// if the step has retry backoff enabled, then we should calculate the backoff time and insert into the retry queue
if retryAfter != nil {
return ec.repo.StepRun().StepRunRetryBackoff(ctx, metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.SRID), *retryAfter)
return ec.repo.StepRun().StepRunRetryBackoff(ctx, metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.SRID), *retryAfter, retryCount) // TODO update backoff after picked up
}

return ec.queueStepRun(ctx, metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.StepId), sqlchelpers.UUIDToStr(stepRun.SRID), true)
Expand Down
2 changes: 2 additions & 0 deletions pkg/client/rest/gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/repository/prisma/dbsqlc/job_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ WITH stepRuns AS (
SELECT
runs."jobRunId",
sum(case when runs."status" IN ('PENDING', 'PENDING_ASSIGNMENT') then 1 else 0 end) AS pendingRuns,
sum(case when runs."status" = 'BACKOFF' then 1 else 0 end) AS backoffRuns,
sum(case when runs."status" IN ('RUNNING', 'ASSIGNED') then 1 else 0 end) AS runningRuns,
sum(case when runs."status" = 'SUCCEEDED' then 1 else 0 end) AS succeededRuns,
sum(case when runs."status" = 'FAILED' then 1 else 0 end) AS failedRuns,
Expand All @@ -27,6 +28,8 @@ SET "status" = CASE
-- Final states are final, cannot be updated
WHEN "status" IN ('SUCCEEDED', 'FAILED', 'CANCELLED') THEN "status"
-- NOTE: Order of the following conditions is important
-- When one step run is backoff AND no other step runs are running, then the job is backoff
WHEN s.backoffRuns > 0 AND s.runningRuns = 0 THEN 'BACKOFF'
-- When one step run is running, then the job is running
WHEN (s.runningRuns > 0 OR s.pendingRuns > 0) THEN 'RUNNING'
-- When one step run has failed, then the job is failed
Expand Down
3 changes: 3 additions & 0 deletions pkg/repository/prisma/dbsqlc/job_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/repository/prisma/dbsqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/repository/prisma/dbsqlc/queue.sql
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ WHERE
AND w."isActive" = true
AND w."isPaused" = false;

-- name: RetryStepRuns :one
-- name: RetryStepRuns :many
WITH retries AS (
SELECT
*
Expand Down Expand Up @@ -633,7 +633,7 @@ WITH retries AS (
srs
RETURNING "stepRunId"
)
SELECT COUNT(*) FROM retries;
SELECT * FROM retries;

-- name: CreateRetryQueueItem :exec
INSERT INTO
Expand Down
41 changes: 34 additions & 7 deletions pkg/repository/prisma/dbsqlc/queue.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions pkg/repository/prisma/dbsqlc/step_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,24 @@ FROM (
WHERE
"StepRun"."id" = input."id";


-- name: BulkBackoffStepRun :exec
UPDATE
"StepRun"
SET
"status" = 'BACKOFF'
WHERE
"id" = ANY(@stepRunIds::uuid[]);


-- name: BulkRetryStepRun :exec
UPDATE
"StepRun"
SET
"status" = 'PENDING_ASSIGNMENT'
WHERE
"id" = ANY(@stepRunIds::uuid[]);

-- name: ResolveLaterStepRuns :many
WITH RECURSIVE currStepRun AS (
SELECT "id", "status", "cancelledReason"
Expand Down
28 changes: 28 additions & 0 deletions pkg/repository/prisma/dbsqlc/step_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/repository/prisma/dbsqlc/workflow_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ RETURNING workflowRun.*;
WITH jobRuns AS (
SELECT
runs."workflowRunId",
sum(case when runs."status" = 'BACKOFF' then 1 else 0 end) AS backoffRuns,
sum(case when runs."status" = 'PENDING' then 1 else 0 end) AS pendingRuns,
sum(case when runs."status" = 'RUNNING' then 1 else 0 end) AS runningRuns,
sum(case when runs."status" = 'SUCCEEDED' then 1 else 0 end) AS succeededRuns,
Expand All @@ -436,6 +437,8 @@ WITH jobRuns AS (
SET "status" = CASE
-- Final states are final, cannot be updated
WHEN "status" IN ('SUCCEEDED', 'FAILED') THEN "status"
-- When one job run is backoff AND no other job runs are running, then the workflow is backoff
WHEN j.backoffRuns > 0 AND j.runningRuns = 0 THEN 'BACKOFF'
-- We check for running first, because if a job run is running, then the workflow is running
WHEN j.runningRuns > 0 THEN 'RUNNING'
-- When at least one job run has failed or been cancelled, then the workflow is failed
Expand Down
3 changes: 3 additions & 0 deletions pkg/repository/prisma/dbsqlc/workflow_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d821a7b

Please sign in to comment.