diff --git a/internal/maintenance/job_scheduler.go b/internal/maintenance/job_scheduler.go index e94feab1..b09bc464 100644 --- a/internal/maintenance/job_scheduler.go +++ b/internal/maintenance/job_scheduler.go @@ -151,7 +151,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er now := s.Time.NowUTC() nowWithLookAhead := now.Add(s.config.Interval) - scheduledJobs, err := tx.JobSchedule(ctx, &riverdriver.JobScheduleParams{ + scheduledJobResults, err := tx.JobSchedule(ctx, &riverdriver.JobScheduleParams{ Max: s.config.Limit, Now: nowWithLookAhead, }) @@ -159,7 +159,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er return 0, fmt.Errorf("error scheduling jobs: %w", err) } - queues := make([]string, 0, len(scheduledJobs)) + queues := make([]string, 0, len(scheduledJobResults)) // Notify about scheduled jobs with a scheduled_at in the past, or just // slightly in the future (this loop, the notify, and tx commit will take @@ -167,12 +167,12 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er // is to roughly try to guess when the clients will attempt to fetch jobs. notificationHorizon := s.Time.NowUTC().Add(5 * time.Millisecond) - for _, job := range scheduledJobs { - if job.ScheduledAt.After(notificationHorizon) { + for _, result := range scheduledJobResults { + if result.Job.ScheduledAt.After(notificationHorizon) { continue } - queues = append(queues, job.Queue) + queues = append(queues, result.Job.Queue) } if len(queues) > 0 { @@ -182,7 +182,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er s.TestSignals.NotifiedQueues.Signal(queues) } - return len(scheduledJobs), tx.Commit(ctx) + return len(scheduledJobResults), tx.Commit(ctx) }() if err != nil { return nil, err diff --git a/internal/maintenance/job_scheduler_test.go b/internal/maintenance/job_scheduler_test.go index e5acf3ab..ddc0d366 100644 --- a/internal/maintenance/job_scheduler_test.go +++ b/internal/maintenance/job_scheduler_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/riverqueue/river/internal/dbunique" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" @@ -77,6 +78,14 @@ func TestJobScheduler(t *testing.T) { require.Equal(t, rivertype.JobStateAvailable, newJob.State) return newJob } + requireJobStateDiscarded := func(t *testing.T, exec riverdriver.Executor, job *rivertype.JobRow) *rivertype.JobRow { + t.Helper() + newJob, err := exec.JobGetByID(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, newJob.State) + require.NotNil(t, newJob.FinalizedAt) + return newJob + } t.Run("Defaults", func(t *testing.T) { t.Parallel() @@ -139,6 +148,50 @@ func TestJobScheduler(t *testing.T) { requireJobStateUnchanged(t, bundle.exec, retryableJob3) // still retryable }) + t.Run("MovesUniqueKeyConflictingJobsToDiscarded", func(t *testing.T) { + t.Parallel() + + scheduler, bundle := setupTx(t) + now := time.Now().UTC() + + // The list of default states, but without retryable to allow for dupes in that state: + uniqueStates := []rivertype.JobState{ + rivertype.JobStateAvailable, + rivertype.JobStateCompleted, + rivertype.JobStatePending, + rivertype.JobStateRunning, + rivertype.JobStateScheduled, + } + uniqueMap := dbunique.UniqueStatesToBitmask(uniqueStates) + + retryableJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("1"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) + retryableJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("2"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) + retryableJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("3"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) // dupe + retryableJob4 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("4"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) // dupe + retryableJob5 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("5"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) // dupe + retryableJob6 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("6"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) // dupe + retryableJob7 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("7"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) // dupe + + // Will cause conflicts with above jobs when retried: + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("3"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("4"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("5"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStatePending)}) + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("6"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("7"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + + require.NoError(t, scheduler.Start(ctx)) + + scheduler.TestSignals.ScheduledBatch.WaitOrTimeout() + + requireJobStateAvailable(t, bundle.exec, retryableJob1) + requireJobStateAvailable(t, bundle.exec, retryableJob2) + requireJobStateDiscarded(t, bundle.exec, retryableJob3) + requireJobStateDiscarded(t, bundle.exec, retryableJob4) + requireJobStateDiscarded(t, bundle.exec, retryableJob5) + requireJobStateDiscarded(t, bundle.exec, retryableJob6) + requireJobStateDiscarded(t, bundle.exec, retryableJob7) + }) + t.Run("SchedulesInBatches", func(t *testing.T) { t.Parallel() diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 35e0f75c..2cedb610 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -123,7 +123,7 @@ type Executor interface { JobListFields() string JobRescueMany(ctx context.Context, params *JobRescueManyParams) (*struct{}, error) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow, error) - JobSchedule(ctx context.Context, params *JobScheduleParams) ([]*rivertype.JobRow, error) + JobSchedule(ctx context.Context, params *JobScheduleParams) ([]*JobScheduleResult, error) JobSetCompleteIfRunningMany(ctx context.Context, params *JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) JobSetStateIfRunning(ctx context.Context, params *JobSetStateIfRunningParams) (*rivertype.JobRow, error) JobUpdate(ctx context.Context, params *JobUpdateParams) (*rivertype.JobRow, error) @@ -304,8 +304,8 @@ type JobScheduleParams struct { } type JobScheduleResult struct { - Queue string - ScheduledAt time.Time + Job rivertype.JobRow + ConflictDiscarded bool } // JobSetCompleteIfRunningManyParams are parameters to set many running jobs to diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index a1b16407..cb4900de 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -1013,13 +1013,13 @@ func (q *Queries) JobRetry(ctx context.Context, db DBTX, id int64) (*RiverJob, e const jobSchedule = `-- name: JobSchedule :many WITH jobs_to_schedule AS ( - SELECT id + SELECT id, unique_key, unique_states FROM river_job WHERE state IN ('retryable', 'scheduled') AND queue IS NOT NULL AND priority >= 0 - AND river_job.scheduled_at <= $1::timestamptz + AND scheduled_at <= $1::timestamptz ORDER BY priority, scheduled_at, @@ -1027,16 +1027,35 @@ WITH jobs_to_schedule AS ( LIMIT $2::bigint FOR UPDATE ), -river_job_scheduled AS ( +conflicting_jobs AS ( + SELECT DISTINCT unique_key + FROM river_job + WHERE unique_key IN ( + SELECT unique_key + FROM jobs_to_schedule + WHERE unique_key IS NOT NULL + AND unique_states IS NOT NULL + ) + AND unique_states IS NOT NULL + AND river_job_state_in_bitmask(unique_states, state) +), +updated_jobs AS ( UPDATE river_job - SET state = 'available' - FROM jobs_to_schedule - WHERE river_job.id = jobs_to_schedule.id - RETURNING river_job.id + SET + state = CASE WHEN cj.unique_key IS NULL THEN 'available'::river_job_state + ELSE 'discarded'::river_job_state END, + finalized_at = CASE WHEN cj.unique_key IS NOT NULL THEN $1::timestamptz + ELSE finalized_at END + FROM jobs_to_schedule jts + LEFT JOIN conflicting_jobs cj ON jts.unique_key = cj.unique_key + WHERE river_job.id = jts.id + RETURNING river_job.id, state = 'discarded'::river_job_state AS conflict_discarded ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +SELECT + river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states, + updated_jobs.conflict_discarded FROM river_job -WHERE id IN (SELECT id FROM river_job_scheduled) +JOIN updated_jobs ON river_job.id = updated_jobs.id ` type JobScheduleParams struct { @@ -1044,34 +1063,40 @@ type JobScheduleParams struct { Max int64 } -func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobScheduleParams) ([]*RiverJob, error) { +type JobScheduleRow struct { + RiverJob RiverJob + ConflictDiscarded bool +} + +func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobScheduleParams) ([]*JobScheduleRow, error) { rows, err := db.QueryContext(ctx, jobSchedule, arg.Now, arg.Max) if err != nil { return nil, err } defer rows.Close() - var items []*RiverJob + var items []*JobScheduleRow for rows.Next() { - var i RiverJob + var i JobScheduleRow if err := rows.Scan( - &i.ID, - &i.Args, - &i.Attempt, - &i.AttemptedAt, - pq.Array(&i.AttemptedBy), - &i.CreatedAt, - pq.Array(&i.Errors), - &i.FinalizedAt, - &i.Kind, - &i.MaxAttempts, - &i.Metadata, - &i.Priority, - &i.Queue, - &i.State, - &i.ScheduledAt, - pq.Array(&i.Tags), - &i.UniqueKey, - &i.UniqueStates, + &i.RiverJob.ID, + &i.RiverJob.Args, + &i.RiverJob.Attempt, + &i.RiverJob.AttemptedAt, + pq.Array(&i.RiverJob.AttemptedBy), + &i.RiverJob.CreatedAt, + pq.Array(&i.RiverJob.Errors), + &i.RiverJob.FinalizedAt, + &i.RiverJob.Kind, + &i.RiverJob.MaxAttempts, + &i.RiverJob.Metadata, + &i.RiverJob.Priority, + &i.RiverJob.Queue, + &i.RiverJob.State, + &i.RiverJob.ScheduledAt, + pq.Array(&i.RiverJob.Tags), + &i.RiverJob.UniqueKey, + &i.RiverJob.UniqueStates, + &i.ConflictDiscarded, ); err != nil { return nil, err } diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 7a73e2f6..4e4e894e 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -524,15 +524,21 @@ func (e *Executor) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow, e return jobRowFromInternal(job) } -func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*rivertype.JobRow, error) { - jobs, err := dbsqlc.New().JobSchedule(ctx, e.dbtx, &dbsqlc.JobScheduleParams{ +func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*riverdriver.JobScheduleResult, error) { + scheduleResults, err := dbsqlc.New().JobSchedule(ctx, e.dbtx, &dbsqlc.JobScheduleParams{ Max: int64(params.Max), Now: params.Now, }) if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return mapSliceError(scheduleResults, func(result *dbsqlc.JobScheduleRow) (*riverdriver.JobScheduleResult, error) { + job, err := jobRowFromInternal(&result.RiverJob) + if err != nil { + return nil, err + } + return &riverdriver.JobScheduleResult{ConflictDiscarded: result.ConflictDiscarded, Job: *job}, nil + }) } func (e *Executor) JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) { diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index c9d99c3e..b326e311 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -406,13 +406,13 @@ FROM updated_job; -- name: JobSchedule :many WITH jobs_to_schedule AS ( - SELECT id + SELECT id, unique_key, unique_states FROM river_job WHERE state IN ('retryable', 'scheduled') AND queue IS NOT NULL AND priority >= 0 - AND river_job.scheduled_at <= @now::timestamptz + AND scheduled_at <= @now::timestamptz ORDER BY priority, scheduled_at, @@ -420,16 +420,35 @@ WITH jobs_to_schedule AS ( LIMIT @max::bigint FOR UPDATE ), -river_job_scheduled AS ( +conflicting_jobs AS ( + SELECT DISTINCT unique_key + FROM river_job + WHERE unique_key IN ( + SELECT unique_key + FROM jobs_to_schedule + WHERE unique_key IS NOT NULL + AND unique_states IS NOT NULL + ) + AND unique_states IS NOT NULL + AND river_job_state_in_bitmask(unique_states, state) +), +updated_jobs AS ( UPDATE river_job - SET state = 'available' - FROM jobs_to_schedule - WHERE river_job.id = jobs_to_schedule.id - RETURNING river_job.id + SET + state = CASE WHEN cj.unique_key IS NULL THEN 'available'::river_job_state + ELSE 'discarded'::river_job_state END, + finalized_at = CASE WHEN cj.unique_key IS NOT NULL THEN @now::timestamptz + ELSE finalized_at END + FROM jobs_to_schedule jts + LEFT JOIN conflicting_jobs cj ON jts.unique_key = cj.unique_key + WHERE river_job.id = jts.id + RETURNING river_job.id, state = 'discarded'::river_job_state AS conflict_discarded ) -SELECT * +SELECT + sqlc.embed(river_job), + updated_jobs.conflict_discarded FROM river_job -WHERE id IN (SELECT id FROM river_job_scheduled); +JOIN updated_jobs ON river_job.id = updated_jobs.id; -- name: JobSetCompleteIfRunningMany :many WITH job_to_finalized_at AS ( diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index a362e648..e1ae5071 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -997,13 +997,13 @@ func (q *Queries) JobRetry(ctx context.Context, db DBTX, id int64) (*RiverJob, e const jobSchedule = `-- name: JobSchedule :many WITH jobs_to_schedule AS ( - SELECT id + SELECT id, unique_key, unique_states FROM river_job WHERE state IN ('retryable', 'scheduled') AND queue IS NOT NULL AND priority >= 0 - AND river_job.scheduled_at <= $1::timestamptz + AND scheduled_at <= $1::timestamptz ORDER BY priority, scheduled_at, @@ -1011,16 +1011,35 @@ WITH jobs_to_schedule AS ( LIMIT $2::bigint FOR UPDATE ), -river_job_scheduled AS ( +conflicting_jobs AS ( + SELECT DISTINCT unique_key + FROM river_job + WHERE unique_key IN ( + SELECT unique_key + FROM jobs_to_schedule + WHERE unique_key IS NOT NULL + AND unique_states IS NOT NULL + ) + AND unique_states IS NOT NULL + AND river_job_state_in_bitmask(unique_states, state) +), +updated_jobs AS ( UPDATE river_job - SET state = 'available' - FROM jobs_to_schedule - WHERE river_job.id = jobs_to_schedule.id - RETURNING river_job.id + SET + state = CASE WHEN cj.unique_key IS NULL THEN 'available'::river_job_state + ELSE 'discarded'::river_job_state END, + finalized_at = CASE WHEN cj.unique_key IS NOT NULL THEN $1::timestamptz + ELSE finalized_at END + FROM jobs_to_schedule jts + LEFT JOIN conflicting_jobs cj ON jts.unique_key = cj.unique_key + WHERE river_job.id = jts.id + RETURNING river_job.id, state = 'discarded'::river_job_state AS conflict_discarded ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +SELECT + river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states, + updated_jobs.conflict_discarded FROM river_job -WHERE id IN (SELECT id FROM river_job_scheduled) +JOIN updated_jobs ON river_job.id = updated_jobs.id ` type JobScheduleParams struct { @@ -1028,34 +1047,40 @@ type JobScheduleParams struct { Max int64 } -func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobScheduleParams) ([]*RiverJob, error) { +type JobScheduleRow struct { + RiverJob RiverJob + ConflictDiscarded bool +} + +func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobScheduleParams) ([]*JobScheduleRow, error) { rows, err := db.Query(ctx, jobSchedule, arg.Now, arg.Max) if err != nil { return nil, err } defer rows.Close() - var items []*RiverJob + var items []*JobScheduleRow for rows.Next() { - var i RiverJob + var i JobScheduleRow if err := rows.Scan( - &i.ID, - &i.Args, - &i.Attempt, - &i.AttemptedAt, - &i.AttemptedBy, - &i.CreatedAt, - &i.Errors, - &i.FinalizedAt, - &i.Kind, - &i.MaxAttempts, - &i.Metadata, - &i.Priority, - &i.Queue, - &i.State, - &i.ScheduledAt, - &i.Tags, - &i.UniqueKey, - &i.UniqueStates, + &i.RiverJob.ID, + &i.RiverJob.Args, + &i.RiverJob.Attempt, + &i.RiverJob.AttemptedAt, + &i.RiverJob.AttemptedBy, + &i.RiverJob.CreatedAt, + &i.RiverJob.Errors, + &i.RiverJob.FinalizedAt, + &i.RiverJob.Kind, + &i.RiverJob.MaxAttempts, + &i.RiverJob.Metadata, + &i.RiverJob.Priority, + &i.RiverJob.Queue, + &i.RiverJob.State, + &i.RiverJob.ScheduledAt, + &i.RiverJob.Tags, + &i.RiverJob.UniqueKey, + &i.RiverJob.UniqueStates, + &i.ConflictDiscarded, ); err != nil { return nil, err } diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 2a239f4d..3107f598 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -408,15 +408,21 @@ func (e *Executor) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow, e return jobRowFromInternal(job) } -func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*rivertype.JobRow, error) { - jobs, err := dbsqlc.New().JobSchedule(ctx, e.dbtx, &dbsqlc.JobScheduleParams{ +func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*riverdriver.JobScheduleResult, error) { + scheduleResults, err := dbsqlc.New().JobSchedule(ctx, e.dbtx, &dbsqlc.JobScheduleParams{ Max: int64(params.Max), Now: params.Now, }) if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return mapSliceError(scheduleResults, func(result *dbsqlc.JobScheduleRow) (*riverdriver.JobScheduleResult, error) { + job, err := jobRowFromInternal(&result.RiverJob) + if err != nil { + return nil, err + } + return &riverdriver.JobScheduleResult{ConflictDiscarded: result.ConflictDiscarded, Job: *job}, nil + }) } func (e *Executor) JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) {