Skip to content

Commit

Permalink
checkJobConcurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
Zettat123 committed Dec 12, 2024
1 parent b3ddfce commit ed133ff
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 46 deletions.
61 changes: 34 additions & 27 deletions models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,42 +230,49 @@ func CancelPreviousJobsWithOpts(ctx context.Context, opts *FindRunOptions) error
return err
}

// Iterate over each job and attempt to cancel it.
for _, job := range jobs {
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
status := job.Status
if status.IsDone() {
continue
}
if err := CancelJobs(ctx, jobs); err != nil {
return err
}
}

// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
if job.TaskID == 0 {
job.Status = StatusCancelled
job.Stopped = timeutil.TimeStampNow()
// Return nil to indicate successful cancellation of all running and waiting jobs.
return nil
}

// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return err
}
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error {
// Iterate over each job and attempt to cancel it.
for _, job := range jobs {
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
status := job.Status
if status.IsDone() {
continue
}

// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
return fmt.Errorf("job has changed, try again")
}
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
if job.TaskID == 0 {
job.Status = StatusCancelled
job.Stopped = timeutil.TimeStampNow()

// Continue with the next job.
continue
// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return err
}

// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return err
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
return fmt.Errorf("job has changed, try again")
}

// Continue with the next job.
continue
}
}

// Return nil to indicate successful cancellation of all running and waiting jobs.
// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return err
}
}
return nil
}

Expand Down
16 changes: 10 additions & 6 deletions models/actions/run_job_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err

type FindRunJobOptions struct {
db.ListOptions
RunID int64
RepoID int64
OwnerID int64
CommitSHA string
Statuses []Status
UpdatedBefore timeutil.TimeStamp
RunID int64
RepoID int64
OwnerID int64
CommitSHA string
Statuses []Status
UpdatedBefore timeutil.TimeStamp
ConcurrencyGroup string
}

func (opts FindRunJobOptions) ToConds() builder.Cond {
Expand All @@ -76,5 +77,8 @@ func (opts FindRunJobOptions) ToConds() builder.Cond {
if opts.UpdatedBefore > 0 {
cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore})
}
if opts.ConcurrencyGroup != "" {
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
}
return cond
}
125 changes: 114 additions & 11 deletions services/actions/job_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@
package actions

import (
"bytes"
"context"
"errors"
"fmt"

actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/queue"

"github.com/nektos/act/pkg/jobparser"
act_model "github.com/nektos/act/pkg/model"
"xorm.io/builder"
)

Expand Down Expand Up @@ -55,7 +58,7 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {

return db.WithTx(ctx, func(ctx context.Context) error {
// check jobs of the current run
if err := checkJobsOfRun(ctx, runID); err != nil {
if err := checkJobsOfRun(ctx, run); err != nil {
return err
}

Expand All @@ -78,7 +81,7 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
if cRun.NeedApproval {
continue
}
if err := checkJobsOfRun(ctx, cRun.ID); err != nil {
if err := checkJobsOfRun(ctx, cRun); err != nil {
return err
}
break // only run one blocked action run with the same concurrency group
Expand All @@ -87,18 +90,23 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
})
}

func checkJobsOfRun(ctx context.Context, runID int64) error {
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID})
func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) error {
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
if err != nil {
return err
}

vars, err := actions_model.GetVariablesOfRun(ctx, run)
if err != nil {
return fmt.Errorf("get run %d variables: %w", run.ID, err)
}

if err := db.WithTx(ctx, func(ctx context.Context) error {
idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
for _, job := range jobs {
idToJobs[job.JobID] = append(idToJobs[job.JobID], job)
job.Run = run
}

updates := newJobStatusResolver(jobs).Resolve()
updates := newJobStatusResolver(jobs, vars).Resolve(ctx)
for _, job := range jobs {
if status, ok := updates[job.ID]; ok {
job.Status = status
Expand All @@ -121,9 +129,10 @@ type jobStatusResolver struct {
statuses map[int64]actions_model.Status
needs map[int64][]int64
jobMap map[int64]*actions_model.ActionRunJob
vars map[string]string
}

func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver {
func newJobStatusResolver(jobs actions_model.ActionJobList, vars map[string]string) *jobStatusResolver {
idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
jobMap := make(map[int64]*actions_model.ActionRunJob)
for _, job := range jobs {
Expand All @@ -145,13 +154,14 @@ func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver {
statuses: statuses,
needs: needs,
jobMap: jobMap,
vars: vars,
}
}

func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status {
func (r *jobStatusResolver) Resolve(ctx context.Context) map[int64]actions_model.Status {
ret := map[int64]actions_model.Status{}
for i := 0; i < len(r.statuses); i++ {
updated := r.resolve()
updated := r.resolve(ctx)
if len(updated) == 0 {
return ret
}
Expand All @@ -163,7 +173,7 @@ func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status {
return ret
}

func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status {
ret := map[int64]actions_model.Status{}
for id, status := range r.statuses {
if status != actions_model.StatusBlocked {
Expand All @@ -180,6 +190,17 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
}
}
if allDone {
// check concurrency
blockedByJobConcurrency, err := checkJobConcurrency(ctx, r.jobMap[id], r.vars)
if err != nil {
log.Error("Check run %d job %d concurrency: %v. This job will stay blocked.")
continue
}

if blockedByJobConcurrency {
continue
}

if allSucceed {
ret[id] = actions_model.StatusWaiting
} else {
Expand All @@ -203,3 +224,85 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
}
return ret
}

func checkJobConcurrency(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) {
if len(actionRunJob.RawConcurrencyGroup) == 0 {
return false, nil
}

run := actionRunJob.Run

if len(actionRunJob.ConcurrencyGroup) == 0 {
rawConcurrency := &act_model.RawConcurrency{

Check failure on line 236 in services/actions/job_emitter.go

View workflow job for this annotation

GitHub Actions / checks-backend

undefined: act_model.RawConcurrency
Group: actionRunJob.RawConcurrencyGroup,
CancelInProgress: actionRunJob.RawConcurrencyCancel,
}

gitCtx := jobparser.ToGitContext(GenerateGitContext(run, actionRunJob))

Check failure on line 241 in services/actions/job_emitter.go

View workflow job for this annotation

GitHub Actions / checks-backend

undefined: jobparser.ToGitContext

actWorkflow, err := act_model.ReadWorkflow(bytes.NewReader(actionRunJob.WorkflowPayload))
if err != nil {
return false, fmt.Errorf("read workflow: %w", err)
}
actJob := actWorkflow.GetJob(actionRunJob.JobID)

task, err := actions_model.GetTaskByID(ctx, actionRunJob.TaskID)
if err != nil {
return false, fmt.Errorf("get task by id: %w", err)
}
taskNeeds, err := FindTaskNeeds(ctx, task)
if err != nil {
return false, fmt.Errorf("find task needs: %w", err)
}

jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds))
for jobID, taskNeed := range taskNeeds {
jobResult := &jobparser.JobResult{
Result: taskNeed.Result.String(),
Outputs: taskNeed.Outputs,
}
jobResults[jobID] = jobResult
}

actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel = jobparser.InterpolatJobConcurrency(rawConcurrency, actJob, gitCtx, vars, jobResults)

Check failure on line 267 in services/actions/job_emitter.go

View workflow job for this annotation

GitHub Actions / checks-backend

undefined: jobparser.InterpolatJobConcurrency
if _, err := actions_model.UpdateRunJob(ctx, &actions_model.ActionRunJob{
ID: actionRunJob.ID,
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
ConcurrencyCancel: actionRunJob.ConcurrencyCancel,
}, nil); err != nil {
return false, fmt.Errorf("update run job: %w", err)
}
}

if actionRunJob.ConcurrencyCancel {
// cancel previous jobs in the same concurrency group
previousJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
RepoID: actionRunJob.RepoID,
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
Statuses: []actions_model.Status{
actions_model.StatusRunning,
actions_model.StatusWaiting,
actions_model.StatusBlocked,
},
})
if err != nil {
return false, fmt.Errorf("find previous jobs: %w", err)
}
if err := actions_model.CancelJobs(ctx, previousJobs); err != nil {
return false, fmt.Errorf("cancel previous jobs: %w", err)
}
// we have cancelled all previous jobs, so this job does not need to be blocked
return false, nil
}

waitingConcurrentJobsNum, err := db.Count[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
RepoID: actionRunJob.RepoID,
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
Statuses: []actions_model.Status{actions_model.StatusWaiting},
})
if err != nil {
return false, fmt.Errorf("count waiting jobs: %w", err)
}

return waitingConcurrentJobsNum > 0, nil
}
5 changes: 3 additions & 2 deletions services/actions/job_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package actions

import (
"context"
"testing"

actions_model "code.gitea.io/gitea/models/actions"
Expand Down Expand Up @@ -129,8 +130,8 @@ jobs:
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := newJobStatusResolver(tt.jobs)
assert.Equal(t, tt.want, r.Resolve())
r := newJobStatusResolver(tt.jobs, nil)
assert.Equal(t, tt.want, r.Resolve(context.Background()))
})
}
}
48 changes: 48 additions & 0 deletions services/actions/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
package actions

import (
"context"
"fmt"

actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
actions_module "code.gitea.io/gitea/modules/actions"
"code.gitea.io/gitea/modules/container"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/setting"
Expand Down Expand Up @@ -94,3 +97,48 @@ func GenerateGitContext(run *actions_model.ActionRun, job *actions_model.ActionR

return gitContext
}

type TaskNeed struct {
Result actions_model.Status
Outputs map[string]string
}

func FindTaskNeeds(ctx context.Context, task *actions_model.ActionTask) (map[string]*TaskNeed, error) {
if err := task.LoadAttributes(ctx); err != nil {
return nil, fmt.Errorf("LoadAttributes: %w", err)
}
if len(task.Job.Needs) == 0 {
return nil, nil
}
needs := container.SetOf(task.Job.Needs...)

jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: task.Job.RunID})
if err != nil {
return nil, fmt.Errorf("FindRunJobs: %w", err)
}

ret := make(map[string]*TaskNeed, len(needs))
for _, job := range jobs {
if !needs.Contains(job.JobID) {
continue
}
if job.TaskID == 0 || !job.Status.IsDone() {
// it shouldn't happen, or the job has been rerun
continue
}
outputs := make(map[string]string)
got, err := actions_model.FindTaskOutputByTaskID(ctx, job.TaskID)
if err != nil {
return nil, fmt.Errorf("FindTaskOutputByTaskID: %w", err)
}
for _, v := range got {
outputs[v.OutputKey] = v.OutputValue
}
ret[job.JobID] = &TaskNeed{
Outputs: outputs,
Result: job.Status,
}
}

return ret, nil
}

0 comments on commit ed133ff

Please sign in to comment.