diff --git a/models/actions/run.go b/models/actions/run.go index fa6902b008397..f87667a3b3dad 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -230,42 +230,49 @@ func CancelPreviousJobsWithOpts(ctx context.Context, opts *FindRunOptions) error return err } - // Iterate over each job and attempt to cancel it. - for _, job := range jobs { - // Skip jobs that are already in a terminal state (completed, cancelled, etc.). - status := job.Status - if status.IsDone() { - continue - } + if err := CancelJobs(ctx, jobs); err != nil { + return err + } + } - // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it. - if job.TaskID == 0 { - job.Status = StatusCancelled - job.Stopped = timeutil.TimeStampNow() + // Return nil to indicate successful cancellation of all running and waiting jobs. + return nil +} - // Update the job's status and stopped time in the database. - n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") - if err != nil { - return err - } +func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error { + // Iterate over each job and attempt to cancel it. + for _, job := range jobs { + // Skip jobs that are already in a terminal state (completed, cancelled, etc.). + status := job.Status + if status.IsDone() { + continue + } - // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again. - if n == 0 { - return fmt.Errorf("job has changed, try again") - } + // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it. + if job.TaskID == 0 { + job.Status = StatusCancelled + job.Stopped = timeutil.TimeStampNow() - // Continue with the next job. - continue + // Update the job's status and stopped time in the database. + n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") + if err != nil { + return err } - // If the job has an associated task, try to stop the task, effectively cancelling the job. - if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil { - return err + // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again. + if n == 0 { + return fmt.Errorf("job has changed, try again") } + + // Continue with the next job. + continue } - } - // Return nil to indicate successful cancellation of all running and waiting jobs. + // If the job has an associated task, try to stop the task, effectively cancelling the job. + if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil { + return err + } + } return nil } diff --git a/models/actions/run_job_list.go b/models/actions/run_job_list.go index 6c5d3b3252ebf..6b808b5abd888 100644 --- a/models/actions/run_job_list.go +++ b/models/actions/run_job_list.go @@ -48,12 +48,13 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err type FindRunJobOptions struct { db.ListOptions - RunID int64 - RepoID int64 - OwnerID int64 - CommitSHA string - Statuses []Status - UpdatedBefore timeutil.TimeStamp + RunID int64 + RepoID int64 + OwnerID int64 + CommitSHA string + Statuses []Status + UpdatedBefore timeutil.TimeStamp + ConcurrencyGroup string } func (opts FindRunJobOptions) ToConds() builder.Cond { @@ -76,5 +77,8 @@ func (opts FindRunJobOptions) ToConds() builder.Cond { if opts.UpdatedBefore > 0 { cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore}) } + if opts.ConcurrencyGroup != "" { + cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup}) + } return cond } diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index b927b1ebd0553..bfb1edc258e87 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -4,6 +4,7 @@ package actions import ( + "bytes" "context" "errors" "fmt" @@ -11,9 +12,11 @@ import ( actions_model "code.gitea.io/gitea/models/actions" "code.gitea.io/gitea/models/db" "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/queue" "github.com/nektos/act/pkg/jobparser" + act_model "github.com/nektos/act/pkg/model" "xorm.io/builder" ) @@ -55,7 +58,7 @@ func checkJobsByRunID(ctx context.Context, runID int64) error { return db.WithTx(ctx, func(ctx context.Context) error { // check jobs of the current run - if err := checkJobsOfRun(ctx, runID); err != nil { + if err := checkJobsOfRun(ctx, run); err != nil { return err } @@ -78,7 +81,7 @@ func checkJobsByRunID(ctx context.Context, runID int64) error { if cRun.NeedApproval { continue } - if err := checkJobsOfRun(ctx, cRun.ID); err != nil { + if err := checkJobsOfRun(ctx, cRun); err != nil { return err } break // only run one blocked action run with the same concurrency group @@ -87,18 +90,23 @@ func checkJobsByRunID(ctx context.Context, runID int64) error { }) } -func checkJobsOfRun(ctx context.Context, runID int64) error { - jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID}) +func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) error { + jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) if err != nil { return err } + + vars, err := actions_model.GetVariablesOfRun(ctx, run) + if err != nil { + return fmt.Errorf("get run %d variables: %w", run.ID, err) + } + if err := db.WithTx(ctx, func(ctx context.Context) error { - idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs)) for _, job := range jobs { - idToJobs[job.JobID] = append(idToJobs[job.JobID], job) + job.Run = run } - updates := newJobStatusResolver(jobs).Resolve() + updates := newJobStatusResolver(jobs, vars).Resolve(ctx) for _, job := range jobs { if status, ok := updates[job.ID]; ok { job.Status = status @@ -121,9 +129,10 @@ type jobStatusResolver struct { statuses map[int64]actions_model.Status needs map[int64][]int64 jobMap map[int64]*actions_model.ActionRunJob + vars map[string]string } -func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver { +func newJobStatusResolver(jobs actions_model.ActionJobList, vars map[string]string) *jobStatusResolver { idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs)) jobMap := make(map[int64]*actions_model.ActionRunJob) for _, job := range jobs { @@ -145,13 +154,14 @@ func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver { statuses: statuses, needs: needs, jobMap: jobMap, + vars: vars, } } -func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status { +func (r *jobStatusResolver) Resolve(ctx context.Context) map[int64]actions_model.Status { ret := map[int64]actions_model.Status{} for i := 0; i < len(r.statuses); i++ { - updated := r.resolve() + updated := r.resolve(ctx) if len(updated) == 0 { return ret } @@ -163,7 +173,7 @@ func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status { return ret } -func (r *jobStatusResolver) resolve() map[int64]actions_model.Status { +func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status { ret := map[int64]actions_model.Status{} for id, status := range r.statuses { if status != actions_model.StatusBlocked { @@ -180,6 +190,17 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status { } } if allDone { + // check concurrency + blockedByJobConcurrency, err := checkJobConcurrency(ctx, r.jobMap[id], r.vars) + if err != nil { + log.Error("Check run %d job %d concurrency: %v. This job will stay blocked.") + continue + } + + if blockedByJobConcurrency { + continue + } + if allSucceed { ret[id] = actions_model.StatusWaiting } else { @@ -203,3 +224,85 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status { } return ret } + +func checkJobConcurrency(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) { + if len(actionRunJob.RawConcurrencyGroup) == 0 { + return false, nil + } + + run := actionRunJob.Run + + if len(actionRunJob.ConcurrencyGroup) == 0 { + rawConcurrency := &act_model.RawConcurrency{ + Group: actionRunJob.RawConcurrencyGroup, + CancelInProgress: actionRunJob.RawConcurrencyCancel, + } + + gitCtx := jobparser.ToGitContext(GenerateGitContext(run, actionRunJob)) + + actWorkflow, err := act_model.ReadWorkflow(bytes.NewReader(actionRunJob.WorkflowPayload)) + if err != nil { + return false, fmt.Errorf("read workflow: %w", err) + } + actJob := actWorkflow.GetJob(actionRunJob.JobID) + + task, err := actions_model.GetTaskByID(ctx, actionRunJob.TaskID) + if err != nil { + return false, fmt.Errorf("get task by id: %w", err) + } + taskNeeds, err := FindTaskNeeds(ctx, task) + if err != nil { + return false, fmt.Errorf("find task needs: %w", err) + } + + jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds)) + for jobID, taskNeed := range taskNeeds { + jobResult := &jobparser.JobResult{ + Result: taskNeed.Result.String(), + Outputs: taskNeed.Outputs, + } + jobResults[jobID] = jobResult + } + + actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel = jobparser.InterpolatJobConcurrency(rawConcurrency, actJob, gitCtx, vars, jobResults) + if _, err := actions_model.UpdateRunJob(ctx, &actions_model.ActionRunJob{ + ID: actionRunJob.ID, + ConcurrencyGroup: actionRunJob.ConcurrencyGroup, + ConcurrencyCancel: actionRunJob.ConcurrencyCancel, + }, nil); err != nil { + return false, fmt.Errorf("update run job: %w", err) + } + } + + if actionRunJob.ConcurrencyCancel { + // cancel previous jobs in the same concurrency group + previousJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{ + RepoID: actionRunJob.RepoID, + ConcurrencyGroup: actionRunJob.ConcurrencyGroup, + Statuses: []actions_model.Status{ + actions_model.StatusRunning, + actions_model.StatusWaiting, + actions_model.StatusBlocked, + }, + }) + if err != nil { + return false, fmt.Errorf("find previous jobs: %w", err) + } + if err := actions_model.CancelJobs(ctx, previousJobs); err != nil { + return false, fmt.Errorf("cancel previous jobs: %w", err) + } + // we have cancelled all previous jobs, so this job does not need to be blocked + return false, nil + } + + waitingConcurrentJobsNum, err := db.Count[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{ + RepoID: actionRunJob.RepoID, + ConcurrencyGroup: actionRunJob.ConcurrencyGroup, + Statuses: []actions_model.Status{actions_model.StatusWaiting}, + }) + if err != nil { + return false, fmt.Errorf("count waiting jobs: %w", err) + } + + return waitingConcurrentJobsNum > 0, nil +} diff --git a/services/actions/job_emitter_test.go b/services/actions/job_emitter_test.go index 58c2dc3b242bb..5fe9c59dc32d9 100644 --- a/services/actions/job_emitter_test.go +++ b/services/actions/job_emitter_test.go @@ -4,6 +4,7 @@ package actions import ( + "context" "testing" actions_model "code.gitea.io/gitea/models/actions" @@ -129,8 +130,8 @@ jobs: } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := newJobStatusResolver(tt.jobs) - assert.Equal(t, tt.want, r.Resolve()) + r := newJobStatusResolver(tt.jobs, nil) + assert.Equal(t, tt.want, r.Resolve(context.Background())) }) } } diff --git a/services/actions/utils.go b/services/actions/utils.go index 6ad996eba95ef..3c3f61c6043ce 100644 --- a/services/actions/utils.go +++ b/services/actions/utils.go @@ -4,10 +4,13 @@ package actions import ( + "context" "fmt" actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/models/db" actions_module "code.gitea.io/gitea/modules/actions" + "code.gitea.io/gitea/modules/container" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/setting" @@ -94,3 +97,48 @@ func GenerateGitContext(run *actions_model.ActionRun, job *actions_model.ActionR return gitContext } + +type TaskNeed struct { + Result actions_model.Status + Outputs map[string]string +} + +func FindTaskNeeds(ctx context.Context, task *actions_model.ActionTask) (map[string]*TaskNeed, error) { + if err := task.LoadAttributes(ctx); err != nil { + return nil, fmt.Errorf("LoadAttributes: %w", err) + } + if len(task.Job.Needs) == 0 { + return nil, nil + } + needs := container.SetOf(task.Job.Needs...) + + jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: task.Job.RunID}) + if err != nil { + return nil, fmt.Errorf("FindRunJobs: %w", err) + } + + ret := make(map[string]*TaskNeed, len(needs)) + for _, job := range jobs { + if !needs.Contains(job.JobID) { + continue + } + if job.TaskID == 0 || !job.Status.IsDone() { + // it shouldn't happen, or the job has been rerun + continue + } + outputs := make(map[string]string) + got, err := actions_model.FindTaskOutputByTaskID(ctx, job.TaskID) + if err != nil { + return nil, fmt.Errorf("FindTaskOutputByTaskID: %w", err) + } + for _, v := range got { + outputs[v.OutputKey] = v.OutputValue + } + ret[job.JobID] = &TaskNeed{ + Outputs: outputs, + Result: job.Status, + } + } + + return ret, nil +}