Skip to content

Commit

Permalink
We always ignore the pending run if the job is alread queued
Browse files Browse the repository at this point in the history
  • Loading branch information
whywaita committed Jan 20, 2025
1 parent bc59e07 commit b63c915
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 24 deletions.
39 changes: 39 additions & 0 deletions pkg/datastore/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,45 @@ type PendingWorkflowRunWithTarget struct {

// GetPendingWorkflowRunByRecentRepositories get pending workflow runs by recent active repositories
func GetPendingWorkflowRunByRecentRepositories(ctx context.Context, ds Datastore) ([]PendingWorkflowRunWithTarget, error) {
pendingRuns, err := getPendingWorkflowRunByRecentRepositories(ctx, ds)
if err != nil {
return nil, fmt.Errorf("failed to get pending workflow runs: %w", err)
}

queuedJob, err := ds.ListJobs(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get list of jobs: %w", err)
}

var result []PendingWorkflowRunWithTarget
// We ignore the pending run if the job is already queued.
for _, pendingRun := range pendingRuns {
for _, job := range queuedJob {
webhookEvent, err := github.ParseWebHook("workflow_job", []byte(job.CheckEventJSON))
if err != nil {
logger.Logf(false, "failed to parse webhook payload (job id: %s): %+v", job.UUID, err)
continue
}

workflowJob, ok := webhookEvent.(*github.WorkflowJobEvent)
if !ok {
logger.Logf(false, "failed to cast to WorkflowJobEvent (job id: %s)", job.UUID)
continue
}

if pendingRun.WorkflowRun.GetID() == workflowJob.GetWorkflowJob().GetRunID() {
logger.Logf(true, "found job in datastore, So will ignore: (repo: %s, runID: %d)", pendingRun.WorkflowRun.GetRepository().GetFullName(), pendingRun.WorkflowRun.GetID())
continue
}
}

result = append(result, pendingRun)
}

return result, nil
}

func getPendingWorkflowRunByRecentRepositories(ctx context.Context, ds Datastore) ([]PendingWorkflowRunWithTarget, error) {
recentActiveRepositories, err := getRecentRepositories(ctx, ds)
if err != nil {
return nil, fmt.Errorf("failed to get recent repositories: %w", err)
Expand Down
24 changes: 0 additions & 24 deletions pkg/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,30 +405,6 @@ func (s *Starter) reRunWorkflow(ctx context.Context) {
}

func reRunWorkflowByPendingRun(ctx context.Context, ds datastore.Datastore, pendingRun datastore.PendingWorkflowRunWithTarget) error {
queuedJob, err := ds.ListJobs(ctx)
if err != nil {
return fmt.Errorf("failed to get list of jobs: %w", err)
}

for _, job := range queuedJob {
webhookEvent, err := github.ParseWebHook("workflow_job", []byte(job.CheckEventJSON))
if err != nil {
logger.Logf(false, "failed to parse webhook payload (job id: %s): %+v", job.UUID, err)
continue
}

workflowJob, ok := webhookEvent.(*github.WorkflowJobEvent)
if !ok {
logger.Logf(false, "failed to cast to WorkflowJobEvent (job id: %s)", job.UUID)
continue
}

if pendingRun.WorkflowRun.GetID() == workflowJob.GetWorkflowJob().GetRunID() {
logger.Logf(true, "found job in datastore, So will ignore: (repo: %s, runID: %d)", pendingRun.WorkflowRun.GetRepository().GetFullName(), pendingRun.WorkflowRun.GetID())
return nil
}
}

if err := enqueueRescueRun(ctx, pendingRun, ds); err != nil {
return fmt.Errorf("failed to enqueue rescue job: %w", err)
}
Expand Down

0 comments on commit b63c915

Please sign in to comment.