From 95d5825bc88028fe9e2f0e9bc4eebc43e7b05b41 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Mon, 23 Sep 2024 16:56:31 -0500 Subject: [PATCH] remove deprecated advisory lock uniqueness, consolidate insert logic This removes the original unique jobs implementation in its entirety. It was already deprecated in the previous release. All known use cases are better supported with the new unique jobs implementation which is also dramatically faster and supports batch insertion. As part of this change, single insertions now inherit the behavior of batch insertions as far as always setting a `scheduled_at` time in the job args prior to hitting the database. This is due to the difficulty of trying to pass an array of nullable timestamps for `scheduled_at` to the database using sqlc. One side effect of this is that some tests needed to be updated because they run in a transaction, which locks in a particular `now()` time used in `JobGetAvailble` by default. Jobs inserted _after_ the start of that transaction would pick up a scheduled timestamp from Go code that is _later_ than the database transaction's timestamp, and so those jobs would never run. This was fixed in some cases by allowing `now` to be overridden by lower level callers of that query, whereas other tests were updated to simply insert jobs with a past `scheduled_at` to ensure their visibility. --- client.go | 121 ++++------- client_test.go | 194 ++++-------------- driver_test.go | 26 +-- insert_opts.go | 49 ++--- insert_opts_test.go | 57 +++-- internal/dbunique/db_unique.go | 163 --------------- internal/maintenance/periodic_job_enqueuer.go | 77 ++----- .../maintenance/periodic_job_enqueuer_test.go | 63 +----- internal/maintenance/queue_maintainer_test.go | 5 +- .../riverdrivertest/riverdrivertest.go | 158 +++----------- job_executor_test.go | 14 +- job_test.go | 2 +- periodic_job.go | 7 +- periodic_job_test.go | 6 +- producer_test.go | 45 ++-- riverdriver/river_driver_interface.go | 2 +- .../internal/dbsqlc/river_job.sql.go | 141 ++----------- .../river_database_sql_driver.go | 30 +-- .../riverpgxv5/internal/dbsqlc/river_job.sql | 62 +----- .../internal/dbsqlc/river_job.sql.go | 141 ++----------- riverdriver/riverpgxv5/river_pgx_v5_driver.go | 30 +-- 21 files changed, 280 insertions(+), 1113 deletions(-) diff --git a/client.go b/client.go index 7b1f5e02..4fd26aee 100644 --- a/client.go +++ b/client.go @@ -346,7 +346,6 @@ type Client[TTx any] struct { stopped <-chan struct{} subscriptionManager *subscriptionManager testSignals clientTestSignals - uniqueInserter *dbunique.UniqueInserter // deprecated fallback path for unique job insertion // workCancel cancels the context used for all work goroutines. Normal Stop // does not cancel that context. @@ -495,10 +494,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client driver: driver, producersByQueueName: make(map[string]*producer), testSignals: clientTestSignals{}, - uniqueInserter: baseservice.Init(archetype, &dbunique.UniqueInserter{ - AdvisoryLockPrefix: config.AdvisoryLockPrefix, - }), - workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up + workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up } client.queues = &QueueBundle{addProducer: client.addProducer} @@ -1162,10 +1158,10 @@ func (c *Client[TTx]) ID() string { return c.config.ID } -func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts, bulk bool) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { +func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, error) { encodedArgs, err := json.Marshal(args) if err != nil { - return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err) + return nil, fmt.Errorf("error marshaling args to JSON: %w", err) } if insertOpts == nil { @@ -1187,7 +1183,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf queue := valutil.FirstNonZero(insertOpts.Queue, jobInsertOpts.Queue, rivercommon.QueueDefault) if err := validateQueueName(queue); err != nil { - return nil, nil, err + return nil, err } tags := insertOpts.Tags @@ -1199,16 +1195,16 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf } else { for _, tag := range tags { if len(tag) > 255 { - return nil, nil, errors.New("tags should be a maximum of 255 characters long") + return nil, errors.New("tags should be a maximum of 255 characters long") } if !tagRE.MatchString(tag) { - return nil, nil, errors.New("tags should match regex " + tagRE.String()) + return nil, errors.New("tags should match regex " + tagRE.String()) } } } if priority > 4 { - return nil, nil, errors.New("priority must be between 1 and 4") + return nil, errors.New("priority must be between 1 and 4") } uniqueOpts := insertOpts.UniqueOpts @@ -1216,7 +1212,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf uniqueOpts = jobInsertOpts.UniqueOpts } if err := uniqueOpts.validate(); err != nil { - return nil, nil, err + return nil, err } metadata := insertOpts.Metadata @@ -1236,21 +1232,13 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf State: rivertype.JobStateAvailable, Tags: tags, } - var returnUniqueOpts *dbunique.UniqueOpts if !uniqueOpts.isEmpty() { - if uniqueOpts.isV1() { - if bulk { - return nil, nil, errors.New("bulk inserts do not support advisory lock uniqueness and cannot remove required states") - } - returnUniqueOpts = (*dbunique.UniqueOpts)(&uniqueOpts) - } else { - internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts) - insertParams.UniqueKey, err = dbunique.UniqueKey(archetype.Time, internalUniqueOpts, insertParams) - if err != nil { - return nil, nil, err - } - insertParams.UniqueStates = internalUniqueOpts.StateBitmask() + internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts) + insertParams.UniqueKey, err = dbunique.UniqueKey(archetype.Time, internalUniqueOpts, insertParams) + if err != nil { + return nil, err } + insertParams.UniqueStates = internalUniqueOpts.StateBitmask() } switch { @@ -1270,7 +1258,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf insertParams.State = rivertype.JobStatePending } - return insertParams, returnUniqueOpts, nil + return insertParams, nil } var errNoDriverDBPool = errors.New("driver must have non-nil database pool to use non-transactional methods like Insert and InsertMany (try InsertTx or InsertManyTx instead") @@ -1290,7 +1278,21 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts return nil, errNoDriverDBPool } - return c.insert(ctx, c.driver.GetExecutor(), args, opts, false) + tx, err := c.driver.GetExecutor().Begin(ctx) + if err != nil { + return nil, err + } + defer tx.Rollback(ctx) + + inserted, err := c.insert(ctx, tx, args, opts) + if err != nil { + return nil, err + } + + if err := tx.Commit(ctx); err != nil { + return nil, err + } + return inserted, nil } // InsertTx inserts a new job with the provided args on the given transaction. @@ -1311,52 +1313,17 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts // transactions, the job will not be worked until the transaction has committed, // and if the transaction rolls back, so too is the inserted job. func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) { - return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts, false) + return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts) } -func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, args JobArgs, opts *InsertOpts, bulk bool) (*rivertype.JobInsertResult, error) { - if err := c.validateJobArgs(args); err != nil { - return nil, err - } - - params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts, bulk) +func (c *Client[TTx]) insert(ctx context.Context, tx riverdriver.ExecutorTx, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) { + params := []InsertManyParams{{Args: args, InsertOpts: opts}} + results, err := c.insertMany(ctx, tx, params) if err != nil { return nil, err } - tx, err := exec.Begin(ctx) - if err != nil { - return nil, err - } - defer tx.Rollback(ctx) - - // TODO: consolidate insertion paths for single + multi, remove deprecated uniqueness design - var jobInsertRes *riverdriver.JobInsertFastResult - if uniqueOpts == nil { - jobInsertRes, err = tx.JobInsertFast(ctx, params) - if err != nil { - return nil, err - } - } else { - if bulk { - return nil, errors.New("bulk inserts do not support advisory lock uniqueness") - } - // Old deprecated advisory lock route - c.baseService.Logger.WarnContext(ctx, "Using deprecated advisory lock uniqueness for job insert") - jobInsertRes, err = c.uniqueInserter.JobInsert(ctx, tx, params, uniqueOpts) - if err != nil { - return nil, err - } - } - - if err := c.maybeNotifyInsert(ctx, tx, params.State, params.Queue); err != nil { - return nil, err - } - if err := tx.Commit(ctx); err != nil { - return nil, err - } - - return (*rivertype.JobInsertResult)(jobInsertRes), nil + return results[0], nil } // InsertManyParams encapsulates a single job combined with insert options for @@ -1455,13 +1422,10 @@ func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, }) } -// The shared code path for all InsertMany methods. It takes a function that -// executes the actual insert operation and allows for different implementations -// of the insert query to be passed in, each mapping their results back to a -// common result type. -// -// TODO(bgentry): this isn't yet used for the single insert path. The only thing -// blocking that is the removal of advisory lock unique inserts. +// The shared code path for all Insert and InsertMany methods. It takes a +// function that executes the actual insert operation and allows for different +// implementations of the insert query to be passed in, each mapping their +// results back to a common result type. func (c *Client[TTx]) insertManyShared( ctx context.Context, tx riverdriver.ExecutorTx, @@ -1503,7 +1467,7 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive return nil, err } - insertParamsItem, _, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts, true) + insertParamsItem, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts) if err != nil { return nil, err } @@ -1598,13 +1562,6 @@ func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.Executo return len(results), nil } -func (c *Client[TTx]) maybeNotifyInsert(ctx context.Context, tx riverdriver.ExecutorTx, state rivertype.JobState, queue string) error { - if state != rivertype.JobStateAvailable { - return nil - } - return c.maybeNotifyInsertForQueues(ctx, tx, []string{queue}) -} - // Notify the given queues that new jobs are available. The queues list will be // deduplicated and each will be checked to see if it is due for an insert // notification from this client. diff --git a/client_test.go b/client_test.go index b7b33d9c..8ee79f01 100644 --- a/client_test.go +++ b/client_test.go @@ -1815,7 +1815,7 @@ func Test_Client_InsertManyFast(t *testing.T) { require.NoError(t, err) }) - t.Run("ErrorsOnInsertOptsWithV1UniqueOpts", func(t *testing.T) { + t.Run("ErrorsOnInsertOptsWithoutRequiredUniqueStates", func(t *testing.T) { t.Parallel() client, _ := setup(t) @@ -1827,7 +1827,7 @@ func Test_Client_InsertManyFast(t *testing.T) { ByState: []rivertype.JobState{rivertype.JobStateAvailable}, }}}, }) - require.EqualError(t, err, "bulk inserts do not support advisory lock uniqueness and cannot remove required states") + require.EqualError(t, err, "UniqueOpts.ByState must contain all required states, missing: pending, running, scheduled") require.Equal(t, 0, count) }) } @@ -1982,7 +1982,7 @@ func Test_Client_InsertManyFastTx(t *testing.T) { ByState: []rivertype.JobState{rivertype.JobStateAvailable}, }}}, }) - require.EqualError(t, err, "bulk inserts do not support advisory lock uniqueness and cannot remove required states") + require.EqualError(t, err, "UniqueOpts.ByState must contain all required states, missing: pending, running, scheduled") require.Equal(t, 0, count) }) } @@ -2248,7 +2248,7 @@ func Test_Client_InsertMany(t *testing.T) { ByState: []rivertype.JobState{rivertype.JobStateAvailable}, }}}, }) - require.EqualError(t, err, "bulk inserts do not support advisory lock uniqueness and cannot remove required states") + require.EqualError(t, err, "UniqueOpts.ByState must contain all required states, missing: pending, running, scheduled") require.Empty(t, results) }) } @@ -2463,7 +2463,7 @@ func Test_Client_InsertManyTx(t *testing.T) { ByState: []rivertype.JobState{rivertype.JobStateAvailable}, }}}, }) - require.EqualError(t, err, "bulk inserts do not support advisory lock uniqueness and cannot remove required states") + require.EqualError(t, err, "UniqueOpts.ByState must contain all required states, missing: pending, running, scheduled") require.Empty(t, results) }) } @@ -2966,9 +2966,9 @@ func Test_Client_ErrorHandler(t *testing.T) { // Bypass the normal Insert function because that will error on an // unknown job. - insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil, false) + insertParams, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil) require.NoError(t, err) - _, err = client.driver.GetExecutor().JobInsertFast(ctx, insertParams) + _, err = client.driver.GetExecutor().JobInsertFastMany(ctx, []*riverdriver.JobInsertFastParams{insertParams}) require.NoError(t, err) riversharedtest.WaitOrTimeout(t, bundle.SubscribeChan) @@ -4322,65 +4322,6 @@ func Test_Client_InsertNotificationsAreDeduplicatedAndDebounced(t *testing.T) { expectImmediateNotification(t, "queue1") } -func Test_Client_CanInsertAndWorkV1UniqueJob(t *testing.T) { - t.Parallel() - - ctx := context.Background() - dbPool := riverinternaltest.TestDB(ctx, t) - startedChan := make(chan int64) - waitChan := make(chan struct{}) - config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { - close(startedChan) - <-waitChan - return nil - }) - client := newTestClient(t, dbPool, config) - - startClient(ctx, t, client) - riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) - - // Insert a job, wait for it to start, then insert the same job again. The - // second insert should be ignored because the job is already running. - insertRes1, err := client.Insert(ctx, callbackArgs{}, &InsertOpts{UniqueOpts: UniqueOpts{ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateRunning}}}) - require.NoError(t, err) - require.False(t, insertRes1.UniqueSkippedAsDuplicate) - - riversharedtest.WaitOrTimeout(t, startedChan) - - insertRes2, err := client.Insert(ctx, callbackArgs{}, &InsertOpts{UniqueOpts: UniqueOpts{ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateRunning}}}) - require.NoError(t, err) - require.True(t, insertRes2.UniqueSkippedAsDuplicate) - require.Equal(t, insertRes1.Job.ID, insertRes2.Job.ID) - - close(waitChan) -} - -func Test_Client_CanWorkPreexistingV2UniqueJob(t *testing.T) { - t.Parallel() - - ctx := context.Background() - dbPool := riverinternaltest.TestDB(ctx, t) - config := newTestConfig(t, nil) - client := newTestClient(t, dbPool, config) - - subscribeChan, cancel := client.Subscribe(EventKindJobCompleted) - t.Cleanup(cancel) - - startClient(ctx, t, client) - riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) - - job := testfactory.Job(ctx, t, client.driver.GetExecutor(), &testfactory.JobOpts{ - Kind: ptrutil.Ptr((&noOpArgs{}).Kind()), - State: ptrutil.Ptr(rivertype.JobStateAvailable), - UniqueKey: []byte("v2_unique_key"), - UniqueStates: 0, // will be inserted as null - }) - - jobWorked := riversharedtest.WaitOrTimeout(t, subscribeChan) - require.Equal(t, job.ID, jobWorked.Job.ID) - require.Equal(t, rivertype.JobStateCompleted, jobWorked.Job.State) -} - func Test_Client_JobCompletion(t *testing.T) { t.Parallel() @@ -4627,11 +4568,13 @@ func Test_Client_UnknownJobKindErrorsTheJob(t *testing.T) { subscribeChan, cancel := client.Subscribe(EventKindJobFailed) t.Cleanup(cancel) - insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil, false) + insertParams, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil) require.NoError(err) - insertedResult, err := client.driver.GetExecutor().JobInsertFast(ctx, insertParams) + insertedResults, err := client.driver.GetExecutor().JobInsertFastMany(ctx, []*riverdriver.JobInsertFastParams{insertParams}) require.NoError(err) + insertedResult := insertedResults[0] + event := riversharedtest.WaitOrTimeout(t, subscribeChan) require.Equal(insertedResult.Job.ID, event.Job.ID) require.Equal("RandomWorkerNameThatIsNeverRegistered", insertedResult.Job.Kind) @@ -4761,7 +4704,7 @@ func Test_NewClient_Defaults(t *testing.T) { }) require.NoError(t, err) - require.Zero(t, client.uniqueInserter.AdvisoryLockPrefix) + require.Zero(t, client.config.AdvisoryLockPrefix) jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer) require.Equal(t, maintenance.CancelledJobRetentionPeriodDefault, jobCleaner.Config.CancelledJobRetentionPeriod) @@ -4815,7 +4758,7 @@ func Test_NewClient_Overrides(t *testing.T) { }) require.NoError(t, err) - require.Equal(t, int32(123_456), client.uniqueInserter.AdvisoryLockPrefix) + require.Equal(t, int32(123_456), client.config.AdvisoryLockPrefix) jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer) require.Equal(t, 1*time.Hour, jobCleaner.Config.CancelledJobRetentionPeriod) @@ -5237,7 +5180,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Run("Defaults", func(t *testing.T) { t.Parallel() - insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, nil, false) + insertParams, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, nil) require.NoError(t, err) require.Equal(t, `{"name":""}`, string(insertParams.EncodedArgs)) require.Equal(t, (noOpArgs{}).Kind(), insertParams.Kind) @@ -5246,8 +5189,6 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { require.Equal(t, QueueDefault, insertParams.Queue) require.Nil(t, insertParams.ScheduledAt) require.Equal(t, []string{}, insertParams.Tags) - - require.Nil(t, uniqueOpts) require.Empty(t, insertParams.UniqueKey) require.Zero(t, insertParams.UniqueStates) }) @@ -5259,7 +5200,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { MaxAttempts: 34, } - insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, overrideConfig, noOpArgs{}, nil, false) + insertParams, err := insertParamsFromConfigArgsAndOptions(archetype, overrideConfig, noOpArgs{}, nil) require.NoError(t, err) require.Equal(t, overrideConfig.MaxAttempts, insertParams.MaxAttempts) }) @@ -5274,7 +5215,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { ScheduledAt: time.Now().Add(time.Hour), Tags: []string{"tag1", "tag2"}, } - insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, opts, false) + insertParams, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, opts) require.NoError(t, err) require.Equal(t, 42, insertParams.MaxAttempts) require.Equal(t, 2, insertParams.Priority) @@ -5288,9 +5229,9 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { nearFuture := time.Now().Add(5 * time.Minute) - insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{ + insertParams, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{ ScheduledAt: nearFuture, - }, nil, false) + }, nil) require.NoError(t, err) // All these come from overrides in customInsertOptsJobArgs's definition: require.Equal(t, 42, insertParams.MaxAttempts) @@ -5304,9 +5245,9 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Run("WorkerInsertOptsScheduledAtNotRespectedIfZero", func(t *testing.T) { t.Parallel() - insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{ + insertParams, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{ ScheduledAt: time.Time{}, - }, nil, false) + }, nil) require.NoError(t, err) require.Nil(t, insertParams.ScheduledAt) }) @@ -5315,16 +5256,16 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Parallel() { - _, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, &InsertOpts{ + _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, &InsertOpts{ Tags: []string{strings.Repeat("h", 256)}, - }, false) + }) require.EqualError(t, err, "tags should be a maximum of 255 characters long") } { - _, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, &InsertOpts{ + _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, &InsertOpts{ Tags: []string{"tag,with,comma"}, - }, false) + }) require.EqualError(t, err, "tags should match regex "+tagRE.String()) } }) @@ -5342,9 +5283,8 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { ExcludeKind: true, } - params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}, false) + params, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}) require.NoError(t, err) - require.Nil(t, resultUniqueOpts) internalUniqueOpts := &dbunique.UniqueOpts{ ByArgs: true, ByPeriod: 10 * time.Second, @@ -5380,9 +5320,8 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { ByState: states, } - params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}, true) + params, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}) require.NoError(t, err) - require.Nil(t, resultUniqueOpts) internalUniqueOpts := &dbunique.UniqueOpts{ ByPeriod: 10 * time.Second, ByQueue: true, @@ -5396,41 +5335,6 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { require.Equal(t, internalUniqueOpts.StateBitmask(), params.UniqueStates) }) - t.Run("UniqueOptsV1", func(t *testing.T) { - t.Parallel() - - archetype := riversharedtest.BaseServiceArchetype(t) - archetype.Time.StubNowUTC(time.Now().UTC()) - - uniqueOpts := UniqueOpts{ - ByArgs: true, - ByPeriod: 10 * time.Second, - ByQueue: true, - // This list of custom states (without pending, scheduled, running, etc.) is only valid for v1 unique opts: - ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted}, - } - - params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}, false) - require.NoError(t, err) - require.NotNil(t, resultUniqueOpts) - internalUniqueOpts := &dbunique.UniqueOpts{ - ByArgs: true, - ByPeriod: 10 * time.Second, - ByQueue: true, - ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted}, - } - require.Equal(t, internalUniqueOpts, resultUniqueOpts) - - require.Nil(t, params.UniqueKey) - require.Zero(t, params.UniqueStates) - - // In a bulk insert, this should be explicitly blocked: - params, resultUniqueOpts, err = insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}, true) - require.ErrorContains(t, err, "bulk inserts do not support advisory lock uniqueness and cannot remove required states") - require.Nil(t, params) - require.Nil(t, resultUniqueOpts) - }) - t.Run("UniqueOptsWithPartialArgs", func(t *testing.T) { t.Parallel() @@ -5448,9 +5352,8 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { Excluded: true, } - params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, &InsertOpts{UniqueOpts: uniqueOpts}, true) + params, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, &InsertOpts{UniqueOpts: uniqueOpts}) require.NoError(t, err) - require.Nil(t, resultUniqueOpts) internalUniqueOpts := &dbunique.UniqueOpts{ByArgs: true} expectedKey, err := dbunique.UniqueKey(archetype.Time, internalUniqueOpts, params) @@ -5464,9 +5367,8 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { Excluded: false, } - params2, resultUniqueOpts2, err := insertParamsFromConfigArgsAndOptions(archetype, config, argsWithExcludedFalse, &InsertOpts{UniqueOpts: uniqueOpts}, true) + params2, err := insertParamsFromConfigArgsAndOptions(archetype, config, argsWithExcludedFalse, &InsertOpts{UniqueOpts: uniqueOpts}) require.NoError(t, err) - require.Nil(t, resultUniqueOpts2) internalUniqueOpts2 := &dbunique.UniqueOpts{ByArgs: true} expectedKey2, err := dbunique.UniqueKey(archetype.Time, internalUniqueOpts2, params2) @@ -5479,7 +5381,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Run("PriorityIsLimitedTo4", func(t *testing.T) { t.Parallel() - insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{Priority: 5}, false) + insertParams, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{Priority: 5}) require.ErrorContains(t, err, "priority must be between 1 and 4") require.Nil(t, insertParams) }) @@ -5488,7 +5390,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Parallel() args := timeoutTestArgs{TimeoutValue: time.Hour} - insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, nil, false) + insertParams, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, nil) require.NoError(t, err) require.Equal(t, `{"timeout_value":3600000000000}`, string(insertParams.EncodedArgs)) }) @@ -5499,19 +5401,15 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { // Ensure that unique opts are validated. No need to be exhaustive here // since we already have tests elsewhere for that. Just make sure validation // is running. - insertParams, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions( + insertParams, err := insertParamsFromConfigArgsAndOptions( archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: UniqueOpts{ByPeriod: 1 * time.Millisecond}}, - false, ) - require.EqualError(t, err, "JobUniqueOpts.ByPeriod should not be less than 1 second") + require.EqualError(t, err, "UniqueOpts.ByPeriod should not be less than 1 second") require.Nil(t, insertParams) - require.Nil(t, resultUniqueOpts) }) - - // TODO NOW NEXT: validate unique opts for v1 unique opts w/ advisory lock and custom states: } func TestID(t *testing.T) { @@ -5761,7 +5659,7 @@ func TestUniqueOpts(t *testing.T) { require.NotEqual(t, insertRes0.Job.ID, insertRes2.Job.ID) }) - t.Run("UniqueV1ByCustomStates", func(t *testing.T) { + t.Run("ErrorsWithUniqueV1CustomStates", func(t *testing.T) { t.Parallel() client, _ := setup(t) @@ -5771,31 +5669,11 @@ func TestUniqueOpts(t *testing.T) { ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted}, } - insertRes0, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{ - UniqueOpts: uniqueOpts, - }) - require.NoError(t, err) - - insertRes1, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{ + insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{ UniqueOpts: uniqueOpts, }) - require.NoError(t, err) - - // Expect the same job to come back because the original is either still - // `available` or `completed`, both which we deduplicate off of. - require.Equal(t, insertRes0.Job.ID, insertRes1.Job.ID) - - insertRes2, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{ - // Use a scheduled time so the job's inserted in state `scheduled` - // instead of `available`. - ScheduledAt: time.Now().Add(1 * time.Hour), - UniqueOpts: uniqueOpts, - }) - require.NoError(t, err) - - // This job however is _not_ the same because it's inserted as - // `scheduled` which is outside the unique constraints. - require.NotEqual(t, insertRes0.Job.ID, insertRes2.Job.ID) + require.EqualError(t, err, "UniqueOpts.ByState must contain all required states, missing: pending, running, scheduled") + require.Nil(t, insertRes) }) } diff --git a/driver_test.go b/driver_test.go index 72a42965..c9e0203a 100644 --- a/driver_test.go +++ b/driver_test.go @@ -98,8 +98,8 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) { return driver.UnwrapExecutor(tx), &testBundle{} } - makeInsertParams := func() *riverdriver.JobInsertFastParams { - return &riverdriver.JobInsertFastParams{ + makeInsertParams := func() []*riverdriver.JobInsertFastParams { + return []*riverdriver.JobInsertFastParams{{ EncodedArgs: []byte(`{}`), Kind: "fake_job", MaxAttempts: rivercommon.MaxAttemptsDefault, @@ -108,7 +108,7 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) { Queue: rivercommon.QueueDefault, ScheduledAt: nil, State: rivertype.JobStateAvailable, - } + }} } b.Run("JobInsert_Sequential", func(b *testing.B) { @@ -118,7 +118,7 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) { exec, _ := setupTx(b) for i := 0; i < b.N; i++ { - if _, err := exec.JobInsertFast(ctx, makeInsertParams()); err != nil { + if _, err := exec.JobInsertFastMany(ctx, makeInsertParams()); err != nil { b.Fatal(err) } } @@ -133,7 +133,7 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) { b.RunParallel(func(pb *testing.PB) { i := 0 for pb.Next() { - if _, err := exec.JobInsertFast(ctx, makeInsertParams()); err != nil { + if _, err := exec.JobInsertFastMany(ctx, makeInsertParams()); err != nil { b.Fatal(err) } i++ @@ -148,7 +148,7 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) { exec, _ := setupTx(b) for i := 0; i < b.N*100; i++ { - if _, err := exec.JobInsertFast(ctx, makeInsertParams()); err != nil { + if _, err := exec.JobInsertFastMany(ctx, makeInsertParams()); err != nil { b.Fatal(err) } } @@ -173,7 +173,7 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) { exec, _ := setupPool(b) for i := 0; i < b.N*100*runtime.NumCPU(); i++ { - if _, err := exec.JobInsertFast(ctx, makeInsertParams()); err != nil { + if _, err := exec.JobInsertFastMany(ctx, makeInsertParams()); err != nil { b.Fatal(err) } } @@ -218,27 +218,27 @@ func BenchmarkDriverRiverPgxV5Insert(b *testing.B) { return driver, bundle } - b.Run("InsertFast", func(b *testing.B) { + b.Run("InsertFastMany", func(b *testing.B) { _, bundle := setup(b) for n := 0; n < b.N; n++ { - _, err := bundle.exec.JobInsertFast(ctx, &riverdriver.JobInsertFastParams{ + _, err := bundle.exec.JobInsertFastMany(ctx, []*riverdriver.JobInsertFastParams{{ EncodedArgs: []byte(`{"encoded": "args"}`), Kind: "test_kind", MaxAttempts: rivercommon.MaxAttemptsDefault, Priority: rivercommon.PriorityDefault, Queue: rivercommon.QueueDefault, State: rivertype.JobStateAvailable, - }) + }}) require.NoError(b, err) } }) - b.Run("InsertFast_WithUnique", func(b *testing.B) { + b.Run("InsertFastMany_WithUnique", func(b *testing.B) { _, bundle := setup(b) for i := 0; i < b.N; i++ { - _, err := bundle.exec.JobInsertFast(ctx, &riverdriver.JobInsertFastParams{ + _, err := bundle.exec.JobInsertFastMany(ctx, []*riverdriver.JobInsertFastParams{{ EncodedArgs: []byte(`{"encoded": "args"}`), Kind: "test_kind", MaxAttempts: rivercommon.MaxAttemptsDefault, @@ -247,7 +247,7 @@ func BenchmarkDriverRiverPgxV5Insert(b *testing.B) { State: rivertype.JobStateAvailable, UniqueKey: []byte("test_unique_key_" + strconv.Itoa(i)), UniqueStates: 0xFB, - }) + }}) require.NoError(b, err) } }) diff --git a/insert_opts.go b/insert_opts.go index ffd1399c..caf73df4 100644 --- a/insert_opts.go +++ b/insert_opts.go @@ -5,6 +5,7 @@ import ( "fmt" "regexp" "slices" + "strings" "time" "github.com/riverqueue/river/rivertype" @@ -170,8 +171,8 @@ type UniqueOpts struct { // isEmpty returns true for an empty, uninitialized options struct. // -// This is required because we can't check against `JobUniqueOpts{}` because -// slices aren't comparable. Unfortunately it makes things a little more brittle +// This is required because we can't check against `UniqueOpts{}` because slices +// aren't comparable. Unfortunately it makes things a little more brittle // comparatively because any new options must also be considered here for things // to work. func (o *UniqueOpts) isEmpty() bool { @@ -181,25 +182,6 @@ func (o *UniqueOpts) isEmpty() bool { o.ByState == nil } -func (o *UniqueOpts) isV1() bool { - requiredV3states := []rivertype.JobState{ - rivertype.JobStatePending, - rivertype.JobStateScheduled, - rivertype.JobStateAvailable, - rivertype.JobStateRunning, - } - if len(o.ByState) == 0 { - return false - } - - for _, state := range requiredV3states { - if !slices.Contains(o.ByState, state) { - return true - } - } - return false -} - var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals func (o *UniqueOpts) validate() error { @@ -208,7 +190,7 @@ func (o *UniqueOpts) validate() error { } if o.ByPeriod != time.Duration(0) && o.ByPeriod < 1*time.Second { - return errors.New("JobUniqueOpts.ByPeriod should not be less than 1 second") + return errors.New("UniqueOpts.ByPeriod should not be less than 1 second") } // Job states are typed, but since the underlying type is a string, users @@ -218,9 +200,30 @@ func (o *UniqueOpts) validate() error { // difference for tiny slice sizes is negligible, and map lookup might // even be slower. if !slices.Contains(jobStateAll, state) { - return fmt.Errorf("JobUniqueOpts.ByState contains invalid state %q", state) + return fmt.Errorf("UniqueOpts.ByState contains invalid state %q", state) } } + // Skip required states validation if no custom states were provided. + if len(o.ByState) == 0 { + return nil + } + + requiredV3states := []rivertype.JobState{ + rivertype.JobStateAvailable, + rivertype.JobStatePending, + rivertype.JobStateRunning, + rivertype.JobStateScheduled, + } + missingStates := []string{} + for _, state := range requiredV3states { + if !slices.Contains(o.ByState, state) { + missingStates = append(missingStates, string(state)) + } + } + if len(missingStates) > 0 { + return fmt.Errorf("UniqueOpts.ByState must contain all required states, missing: %s", strings.Join(missingStates, ", ")) + } + return nil } diff --git a/insert_opts_test.go b/insert_opts_test.go index e41a5642..d15a2223 100644 --- a/insert_opts_test.go +++ b/insert_opts_test.go @@ -32,7 +32,7 @@ func TestTagRE(t *testing.T) { require.NotRegexp(t, tagRE, "commas,never,allowed") } -func TestJobUniqueOpts_validate(t *testing.T) { +func TestUniqueOpts_validate(t *testing.T) { t.Parallel() require.NoError(t, (&UniqueOpts{}).validate()) @@ -40,42 +40,39 @@ func TestJobUniqueOpts_validate(t *testing.T) { ByArgs: true, ByPeriod: 1 * time.Second, ByQueue: true, - ByState: []rivertype.JobState{rivertype.JobStateAvailable}, }).validate()) - require.EqualError(t, (&UniqueOpts{ByPeriod: 1 * time.Millisecond}).validate(), "JobUniqueOpts.ByPeriod should not be less than 1 second") - require.EqualError(t, (&UniqueOpts{ByState: []rivertype.JobState{rivertype.JobState("invalid")}}).validate(), `JobUniqueOpts.ByState contains invalid state "invalid"`) -} + require.EqualError(t, (&UniqueOpts{ByPeriod: 1 * time.Millisecond}).validate(), "UniqueOpts.ByPeriod should not be less than 1 second") + require.EqualError(t, (&UniqueOpts{ByState: []rivertype.JobState{rivertype.JobState("invalid")}}).validate(), `UniqueOpts.ByState contains invalid state "invalid"`) -func TestJobUniqueOpts_isV1(t *testing.T) { - t.Parallel() + requiredStates := []rivertype.JobState{ + rivertype.JobStateAvailable, + rivertype.JobStatePending, + rivertype.JobStateRunning, + rivertype.JobStateScheduled, + } - // Test when ByState is empty - require.False(t, (&UniqueOpts{}).isV1()) + for _, state := range requiredStates { + // Test with each state individually removed from requiredStates to ensure + // it's validated. - // Test when ByState contains none of the required V3 states - require.True(t, (&UniqueOpts{ByState: []rivertype.JobState{rivertype.JobStateCompleted}}).isV1()) + // Create a copy of requiredStates without the current state + var testStates []rivertype.JobState + for _, s := range requiredStates { + if s != state { + testStates = append(testStates, s) + } + } - // Test when ByState contains some but not all required V3 states - require.True(t, (&UniqueOpts{ByState: []rivertype.JobState{rivertype.JobStatePending}}).isV1()) - require.True(t, (&UniqueOpts{ByState: []rivertype.JobState{rivertype.JobStateScheduled}}).isV1()) - require.True(t, (&UniqueOpts{ByState: []rivertype.JobState{rivertype.JobStateAvailable}}).isV1()) - require.True(t, (&UniqueOpts{ByState: []rivertype.JobState{rivertype.JobStateRunning}}).isV1()) + // Test validation + require.EqualError(t, (&UniqueOpts{ByState: testStates}).validate(), "UniqueOpts.ByState must contain all required states, missing: "+string(state)) + } - // Test when ByState contains all required V3 states - require.False(t, (&UniqueOpts{ByState: []rivertype.JobState{ - rivertype.JobStatePending, - rivertype.JobStateScheduled, + // test with more than one required state missing: + require.EqualError(t, (&UniqueOpts{ByState: []rivertype.JobState{ rivertype.JobStateAvailable, - rivertype.JobStateRunning, - }}).isV1()) - - // Test when ByState contains more than the required V3 states - require.False(t, (&UniqueOpts{ByState: []rivertype.JobState{ - rivertype.JobStatePending, rivertype.JobStateScheduled, - rivertype.JobStateAvailable, - rivertype.JobStateRunning, - rivertype.JobStateCompleted, - }}).isV1()) + }}).validate(), "UniqueOpts.ByState must contain all required states, missing: pending, running") + + require.NoError(t, (&UniqueOpts{ByState: rivertype.JobStates()}).validate()) } diff --git a/internal/dbunique/db_unique.go b/internal/dbunique/db_unique.go index a153149a..60256acf 100644 --- a/internal/dbunique/db_unique.go +++ b/internal/dbunique/db_unique.go @@ -1,10 +1,7 @@ package dbunique import ( - "context" "crypto/sha256" - "errors" - "fmt" "slices" "strings" "time" @@ -12,7 +9,6 @@ import ( "github.com/tidwall/gjson" "github.com/tidwall/sjson" - "github.com/riverqueue/river/internal/util/hashutil" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/util/sliceutil" @@ -33,8 +29,6 @@ var defaultUniqueStates = []rivertype.JobState{ //nolint:gochecknoglobals rivertype.JobStateScheduled, } -var defaultUniqueStatesStrings = sliceutil.Map(defaultUniqueStates, func(s rivertype.JobState) string { return string(s) }) //nolint:gochecknoglobals - var jobStateBitPositions = map[rivertype.JobState]uint{ //nolint:gochecknoglobals rivertype.JobStateAvailable: 7, rivertype.JobStateCancelled: 6, @@ -180,160 +174,3 @@ func UniqueBitmaskToStates(mask byte) []rivertype.JobState { slices.Sort(states) return states } - -type UniqueInserter struct { - baseservice.BaseService - AdvisoryLockPrefix int32 -} - -func (i *UniqueInserter) JobInsert(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobInsertFastParams, uniqueOpts *UniqueOpts) (*riverdriver.JobInsertFastResult, error) { - // This should never get called with no unique options going forward. - if uniqueOpts == nil || uniqueOpts.IsEmpty() { - panic("UniqueInserter.JobInsert called with no unique options") - } - - // Build a unique key for use in an advisory lock prefix: - uniqueKey, doUniqueInsert := i.buildUniqueKey(params, uniqueOpts) - if !doUniqueInsert { - return insertNonUnique(ctx, exec, params) - } - - // Sort so we can more easily compare against default state list. - if uniqueOpts.ByState != nil { - slices.Sort(uniqueOpts.ByState) - } - - // Slow path: open a subtransaction, take an advisory lock, check to see if - // a job with the given criteria exists, then either return an existing row - // or insert a new one. - - advisoryLockHash := hashutil.NewAdvisoryLockHash(i.AdvisoryLockPrefix) - advisoryLockHash.Write([]byte("unique_key")) - advisoryLockHash.Write([]byte("kind=" + params.Kind)) - advisoryLockHash.Write([]byte(uniqueKey)) - - getParams := i.buildGetParams(params, uniqueOpts) - - // Begin a subtransaction - subExec, err := exec.Begin(ctx) - if err != nil { - return nil, err - } - defer subExec.Rollback(ctx) - - // The wrapping transaction should maintain snapshot consistency even if we - // were to only have a SELECT + INSERT, but given that a conflict is - // possible, obtain an advisory lock based on the parameters of the unique - // job first, and have contending inserts wait for it. This is a synchronous - // lock so we rely on context timeout in case something goes wrong and it's - // blocking for too long. - if _, err := subExec.PGAdvisoryXactLock(ctx, advisoryLockHash.Key()); err != nil { - return nil, fmt.Errorf("error acquiring unique lock: %w", err) - } - - existing, err := subExec.JobGetByKindAndUniqueProperties(ctx, getParams) - if err != nil { - if !errors.Is(err, rivertype.ErrNotFound) { - return nil, fmt.Errorf("error getting unique job: %w", err) - } - } - - if existing != nil { - // Insert skipped; returns an existing row. - return &riverdriver.JobInsertFastResult{Job: existing, UniqueSkippedAsDuplicate: true}, nil - } - - result, err := subExec.JobInsertFast(ctx, params) - if err != nil { - return nil, err - } - - if err := subExec.Commit(ctx); err != nil { - return nil, err - } - - return result, nil -} - -// Builds a unique key made up of the unique options in place. The key is hashed -// to become a value for `unique_key` in the fast insertion path, or hashed and -// used for an advisory lock on the slow insertion path. -func (i *UniqueInserter) buildUniqueKey(params *riverdriver.JobInsertFastParams, uniqueOpts *UniqueOpts) (string, bool) { - var sb strings.Builder - - if uniqueOpts.ByArgs { - sb.WriteString("&args=") - sb.Write(params.EncodedArgs) - } - - if uniqueOpts.ByPeriod != time.Duration(0) { - lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod) - sb.WriteString("&period=" + lowerPeriodBound.Format(time.RFC3339)) - } - - if uniqueOpts.ByQueue { - sb.WriteString("&queue=" + params.Queue) - } - - { - stateSet := defaultUniqueStatesStrings - if len(uniqueOpts.ByState) > 0 { - stateSet = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) string { return string(s) }) - slices.Sort(stateSet) - } - - sb.WriteString("&state=" + strings.Join(stateSet, ",")) - - if !slices.Contains(stateSet, string(params.State)) { - return "", false - } - } - - return sb.String(), true -} - -// Builds get parameters suitable for looking up a unique job on the slow unique -// insertion path. -func (i *UniqueInserter) buildGetParams(params *riverdriver.JobInsertFastParams, uniqueOpts *UniqueOpts) *riverdriver.JobGetByKindAndUniquePropertiesParams { - getParams := riverdriver.JobGetByKindAndUniquePropertiesParams{ - Kind: params.Kind, - } - - if uniqueOpts.ByArgs { - getParams.Args = params.EncodedArgs - getParams.ByArgs = true - } - - if uniqueOpts.ByPeriod != time.Duration(0) { - lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod) - - getParams.ByCreatedAt = true - getParams.CreatedAtBegin = lowerPeriodBound - getParams.CreatedAtEnd = lowerPeriodBound.Add(uniqueOpts.ByPeriod) - } - - if uniqueOpts.ByQueue { - getParams.ByQueue = true - getParams.Queue = params.Queue - } - - { - stateSet := defaultUniqueStatesStrings - if len(uniqueOpts.ByState) > 0 { - stateSet = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) string { return string(s) }) - } - - getParams.ByState = true - getParams.State = stateSet - } - - return &getParams -} - -func insertNonUnique(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobInsertFastParams) (*riverdriver.JobInsertFastResult, error) { - result, err := exec.JobInsertFast(ctx, params) - if err != nil { - return nil, err - } - return result, nil -} diff --git a/internal/maintenance/periodic_job_enqueuer.go b/internal/maintenance/periodic_job_enqueuer.go index 6fbb31af..fb1c435f 100644 --- a/internal/maintenance/periodic_job_enqueuer.go +++ b/internal/maintenance/periodic_job_enqueuer.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/riverqueue/river/internal/dbunique" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/startstop" @@ -39,7 +38,7 @@ func (ts *PeriodicJobEnqueuerTestSignals) Init() { // river.PeriodicJobArgs, but needs a separate type because the enqueuer is in a // subpackage. type PeriodicJob struct { - ConstructorFunc func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) + ConstructorFunc func() (*riverdriver.JobInsertFastParams, error) RunOnStart bool ScheduleFunc func(time.Time) time.Time @@ -87,7 +86,6 @@ type PeriodicJobEnqueuer struct { nextHandle rivertype.PeriodicJobHandle periodicJobs map[rivertype.PeriodicJobHandle]*PeriodicJob recalculateNextRun chan struct{} - uniqueInserter *dbunique.UniqueInserter } func NewPeriodicJobEnqueuer(archetype *baseservice.Archetype, config *PeriodicJobEnqueuerConfig, exec riverdriver.Executor) *PeriodicJobEnqueuer { @@ -114,7 +112,6 @@ func NewPeriodicJobEnqueuer(archetype *baseservice.Archetype, config *PeriodicJo nextHandle: nextHandle, periodicJobs: periodicJobs, recalculateNextRun: make(chan struct{}, 1), - uniqueInserter: baseservice.Init(archetype, &dbunique.UniqueInserter{AdvisoryLockPrefix: config.AdvisoryLockPrefix}), }) return svc @@ -196,11 +193,6 @@ func (s *PeriodicJobEnqueuer) RemoveMany(periodicJobHandles []rivertype.Periodic } } -type insertParamsAndUniqueOpts struct { - InsertParams *riverdriver.JobInsertFastParams - UniqueOpts *dbunique.UniqueOpts -} - func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { ctx, shouldStart, started, stopped := s.StartInit(ctx) if !shouldStart { @@ -232,9 +224,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { var ( insertParamsMany []*riverdriver.JobInsertFastParams - // only contains jobs using deprecated v1 unique options: - insertParamsUnique []*insertParamsAndUniqueOpts - now = s.Time.NowUTC() + now = s.Time.NowUTC() ) // Handle periodic jobs in sorted order so we can correctly account @@ -257,19 +247,15 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { continue } - if insertParams, uniqueOpts, ok := s.insertParamsFromConstructor(ctx, periodicJob.ConstructorFunc, now); ok { - if uniqueOpts == nil || uniqueOpts.IsEmpty() { - insertParamsMany = append(insertParamsMany, insertParams) - } else { - insertParamsUnique = append(insertParamsUnique, &insertParamsAndUniqueOpts{insertParams, uniqueOpts}) - } + if insertParams, ok := s.insertParamsFromConstructor(ctx, periodicJob.ConstructorFunc, now); ok { + insertParamsMany = append(insertParamsMany, insertParams) } } - s.insertBatch(ctx, insertParamsMany, insertParamsUnique) + s.insertBatch(ctx, insertParamsMany) - if len(insertParamsMany) > 0 || len(insertParamsUnique) > 0 { - s.Logger.DebugContext(ctx, s.Name+": Inserted RunOnStart jobs", "num_jobs", len(insertParamsMany)+len(insertParamsUnique)) + if len(insertParamsMany) > 0 { + s.Logger.DebugContext(ctx, s.Name+": Inserted RunOnStart jobs", "num_jobs", len(insertParamsMany)) } } @@ -283,10 +269,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { for { select { case <-timerUntilNextRun.C: - var ( - insertParamsMany []*riverdriver.JobInsertFastParams - insertParamsUnique []*insertParamsAndUniqueOpts - ) + var insertParamsMany []*riverdriver.JobInsertFastParams now := s.Time.NowUTC() @@ -304,12 +287,8 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { continue } - if insertParams, uniqueOpts, ok := s.insertParamsFromConstructor(ctx, periodicJob.ConstructorFunc, periodicJob.nextRunAt); ok { - if uniqueOpts == nil || uniqueOpts.IsEmpty() { - insertParamsMany = append(insertParamsMany, insertParams) - } else { - insertParamsUnique = append(insertParamsUnique, &insertParamsAndUniqueOpts{insertParams, uniqueOpts}) - } + if insertParams, ok := s.insertParamsFromConstructor(ctx, periodicJob.ConstructorFunc, periodicJob.nextRunAt); ok { + insertParamsMany = append(insertParamsMany, insertParams) } // Although we may have inserted a new job a little @@ -320,7 +299,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { } }() - s.insertBatch(ctx, insertParamsMany, insertParamsUnique) + s.insertBatch(ctx, insertParamsMany) case <-s.recalculateNextRun: if !timerUntilNextRun.Stop() { @@ -350,8 +329,8 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { return nil } -func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany []*riverdriver.JobInsertFastParams, insertParamsUnique []*insertParamsAndUniqueOpts) { - if len(insertParamsMany) == 0 && len(insertParamsUnique) == 0 { +func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany []*riverdriver.JobInsertFastParams) { + if len(insertParamsMany) == 0 { return } @@ -362,7 +341,7 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany } defer tx.Rollback(ctx) - queues := make([]string, 0, len(insertParamsMany)+len(insertParamsUnique)) + queues := make([]string, 0, len(insertParamsMany)) if len(insertParamsMany) > 0 { results, err := tx.JobInsertFastMany(ctx, insertParamsMany) @@ -378,24 +357,6 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany } } - // Unique periodic jobs must be inserted one at a time because bulk insert - // doesn't respect uniqueness. Unique jobs are rare compared to non-unique, - // so we still maintain an insert many fast path above for programs that - // aren't inserting any unique jobs periodically (which we expect is most). - if len(insertParamsUnique) > 0 { - for _, params := range insertParamsUnique { - res, err := s.uniqueInserter.JobInsert(ctx, tx, params.InsertParams, params.UniqueOpts) - if err != nil { - s.Logger.ErrorContext(ctx, s.Name+": Error inserting unique periodic job", - "error", err.Error(), "kind", params.InsertParams.Kind) - continue - } - if !res.UniqueSkippedAsDuplicate { - queues = append(queues, params.InsertParams.Queue) - } - } - } - if len(queues) > 0 { if err := s.Config.NotifyInsert(ctx, tx, queues); err != nil { s.Logger.ErrorContext(ctx, s.Name+": Error notifying insert", "error", err.Error()) @@ -412,23 +373,23 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany s.TestSignals.InsertedJobs.Signal(struct{}{}) } -func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, constructorFunc func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error), scheduledAt time.Time) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, bool) { - insertParams, uniqueOpts, err := constructorFunc() +func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, constructorFunc func() (*riverdriver.JobInsertFastParams, error), scheduledAt time.Time) (*riverdriver.JobInsertFastParams, bool) { + insertParams, err := constructorFunc() if err != nil { if errors.Is(err, ErrNoJobToInsert) { s.Logger.InfoContext(ctx, s.Name+": nil returned from periodic job constructor, skipping") s.TestSignals.SkippedJob.Signal(struct{}{}) - return nil, nil, false + return nil, false } s.Logger.ErrorContext(ctx, s.Name+": Internal error generating periodic job", "error", err.Error()) - return nil, nil, false + return nil, false } if insertParams.ScheduledAt == nil { insertParams.ScheduledAt = &scheduledAt } - return insertParams, uniqueOpts, true + return insertParams, true } const periodicJobEnqueuerVeryLongDuration = 24 * time.Hour diff --git a/internal/maintenance/periodic_job_enqueuer_test.go b/internal/maintenance/periodic_job_enqueuer_test.go index c1cb776a..48677407 100644 --- a/internal/maintenance/periodic_job_enqueuer_test.go +++ b/internal/maintenance/periodic_job_enqueuer_test.go @@ -40,8 +40,8 @@ func TestPeriodicJobEnqueuer(t *testing.T) { stubSvc := &riversharedtest.TimeStub{} stubSvc.StubNowUTC(time.Now().UTC()) - jobConstructorWithQueueFunc := func(name string, unique bool, queue string) func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { - return func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { + jobConstructorWithQueueFunc := func(name string, unique bool, queue string) func() (*riverdriver.JobInsertFastParams, error) { + return func() (*riverdriver.JobInsertFastParams, error) { params := &riverdriver.JobInsertFastParams{ Args: noOpArgs{}, EncodedArgs: []byte("{}"), @@ -56,30 +56,17 @@ func TestPeriodicJobEnqueuer(t *testing.T) { var err error params.UniqueKey, err = dbunique.UniqueKey(stubSvc, uniqueOpts, params) if err != nil { - return nil, nil, err + return nil, err } params.UniqueStates = uniqueOpts.StateBitmask() } - return params, nil, nil + return params, nil } } - jobConstructorUniqueV1Func := func(name string) func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { - return func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { - return &riverdriver.JobInsertFastParams{ - EncodedArgs: []byte("{}"), - Kind: name, - MaxAttempts: rivercommon.MaxAttemptsDefault, - Priority: rivercommon.PriorityDefault, - Queue: rivercommon.QueueDefault, - State: rivertype.JobStateAvailable, - }, &dbunique.UniqueOpts{ByArgs: true}, nil - } - } - - jobConstructorFunc := func(name string, unique bool) func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { + jobConstructorFunc := func(name string, unique bool) func() (*riverdriver.JobInsertFastParams, error) { return jobConstructorWithQueueFunc(name, unique, rivercommon.QueueDefault) } @@ -241,42 +228,6 @@ func TestPeriodicJobEnqueuer(t *testing.T) { } }) - t.Run("RespectsV1JobUniqueness", func(t *testing.T) { - t.Parallel() - - svc, bundle := setup(t) - - svc.AddMany([]*PeriodicJob{ - {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorUniqueV1Func("unique_periodic_job_500ms")}, - }) - - startService(t, svc) - - // Should be no jobs to start. - requireNJobs(t, bundle.exec, "unique_periodic_job_500ms", 0) - - svc.TestSignals.InsertedJobs.WaitOrTimeout() - requireNJobs(t, bundle.exec, "unique_periodic_job_500ms", 1) - // This initial insert should emit a notification: - svc.TestSignals.NotifiedQueues.WaitOrTimeout() - - // Another insert was attempted, but there's still only one job due to - // uniqueness conditions. - svc.TestSignals.InsertedJobs.WaitOrTimeout() - requireNJobs(t, bundle.exec, "unique_periodic_job_500ms", 1) - - svc.TestSignals.InsertedJobs.WaitOrTimeout() - requireNJobs(t, bundle.exec, "unique_periodic_job_500ms", 1) - - // Ensure that no notifications were emitted beyond the first one because no - // additional jobs were inserted: - select { - case queues := <-svc.TestSignals.NotifiedQueues.WaitC(): - t.Fatalf("Expected no notification to be emitted, but got one for queues: %v", queues) - case <-time.After(100 * time.Millisecond): - } - }) - t.Run("RunOnStart", func(t *testing.T) { t.Parallel() @@ -305,8 +256,8 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc.AddMany([]*PeriodicJob{ // skip this insert when it returns nil: - {ScheduleFunc: periodicIntervalSchedule(time.Second), ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { - return nil, nil, ErrNoJobToInsert + {ScheduleFunc: periodicIntervalSchedule(time.Second), ConstructorFunc: func() (*riverdriver.JobInsertFastParams, error) { + return nil, ErrNoJobToInsert }, RunOnStart: true}, }) diff --git a/internal/maintenance/queue_maintainer_test.go b/internal/maintenance/queue_maintainer_test.go index d8f94323..a68e91a8 100644 --- a/internal/maintenance/queue_maintainer_test.go +++ b/internal/maintenance/queue_maintainer_test.go @@ -8,7 +8,6 @@ import ( "github.com/robfig/cron/v3" "github.com/stretchr/testify/require" - "github.com/riverqueue/river/internal/dbunique" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/riverinternaltest/sharedtx" "github.com/riverqueue/river/riverdriver" @@ -108,8 +107,8 @@ func TestQueueMaintainer(t *testing.T) { NewPeriodicJobEnqueuer(archetype, &PeriodicJobEnqueuerConfig{ PeriodicJobs: []*PeriodicJob{ { - ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { - return nil, nil, ErrNoJobToInsert + ConstructorFunc: func() (*riverdriver.JobInsertFastParams, error) { + return nil, ErrNoJobToInsert }, ScheduleFunc: cron.Every(15 * time.Minute).Next, }, diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 42695d33..9386e0a7 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -569,6 +569,32 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.Empty(t, jobRows) }) + t.Run("ConstrainedToScheduledAtBeforeCustomNowTime", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().Add(1 * time.Minute) + // Job 1 is scheduled after now so it's not found: + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: ptrutil.Ptr(now.Add(1 * time.Minute)), + }) + // Job 2 is scheduled just before now so it's found: + job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Microsecond)), + }) + + jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ + AttemptedBy: clientID, + Max: 100, + Now: ptrutil.Ptr(now), + Queue: rivercommon.QueueDefault, + }) + require.NoError(t, err) + require.Len(t, jobRows, 1) + require.Equal(t, job2.ID, jobRows[0].ID) + }) + t.Run("Prioritized", func(t *testing.T) { t.Parallel() @@ -845,84 +871,6 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, sliceutil.Map(stuckJobs, func(j *rivertype.JobRow) int64 { return j.ID })) }) - t.Run("JobInsertFast", func(t *testing.T) { - t.Parallel() - - t.Run("MinimalArgsWithDefaults", func(t *testing.T) { - t.Parallel() - - exec, _ := setup(ctx, t) - - now := time.Now().UTC() - - result, err := exec.JobInsertFast(ctx, &riverdriver.JobInsertFastParams{ - EncodedArgs: []byte(`{"encoded": "args"}`), - Kind: "test_kind", - MaxAttempts: rivercommon.MaxAttemptsDefault, - Priority: rivercommon.PriorityDefault, - Queue: rivercommon.QueueDefault, - State: rivertype.JobStateAvailable, - }) - require.NoError(t, err) - job := result.Job - require.Equal(t, 0, job.Attempt) - require.Nil(t, job.AttemptedAt) - require.WithinDuration(t, now, job.CreatedAt, 2*time.Second) - require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs) - require.Empty(t, job.Errors) - require.Nil(t, job.FinalizedAt) - require.Equal(t, "test_kind", job.Kind) - require.Equal(t, rivercommon.MaxAttemptsDefault, job.MaxAttempts) - require.Equal(t, []byte(`{}`), job.Metadata) - require.Equal(t, rivercommon.PriorityDefault, job.Priority) - require.Equal(t, rivercommon.QueueDefault, job.Queue) - require.WithinDuration(t, now, job.ScheduledAt, 2*time.Second) - require.Equal(t, rivertype.JobStateAvailable, job.State) - require.Equal(t, []string{}, job.Tags) - }) - - t.Run("AllArgs", func(t *testing.T) { - t.Parallel() - - exec, _ := setup(ctx, t) - - targetTime := time.Now().UTC().Add(-15 * time.Minute) - - result, err := exec.JobInsertFast(ctx, &riverdriver.JobInsertFastParams{ - CreatedAt: &targetTime, - EncodedArgs: []byte(`{"encoded": "args"}`), - Kind: "test_kind", - MaxAttempts: 6, - Metadata: []byte(`{"meta": "data"}`), - Priority: 2, - Queue: "queue_name", - ScheduledAt: &targetTime, - State: rivertype.JobStateRunning, - Tags: []string{"tag"}, - UniqueKey: []byte("unique-key"), - UniqueStates: 0xFF, - }) - require.NoError(t, err) - - require.False(t, result.UniqueSkippedAsDuplicate) - job := result.Job - require.Equal(t, 0, job.Attempt) - require.Nil(t, job.AttemptedAt) - requireEqualTime(t, targetTime, job.CreatedAt) - require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs) - require.Empty(t, job.Errors) - require.Nil(t, job.FinalizedAt) - require.Equal(t, "test_kind", job.Kind) - require.Equal(t, 6, job.MaxAttempts) - require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata) - require.Equal(t, 2, job.Priority) - require.Equal(t, "queue_name", job.Queue) - requireEqualTime(t, targetTime, job.ScheduledAt) - require.Equal(t, rivertype.JobStateRunning, job.State) - require.Equal(t, []string{"tag"}, job.Tags) - }) - }) - t.Run("JobInsertFastMany", func(t *testing.T) { t.Parallel() @@ -1998,33 +1946,6 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.Equal(t, rivertype.JobStateCancelled, jobUpdated.State) require.Equal(t, "unique-key", string(jobUpdated.UniqueKey)) }) - - t.Run("CancelsARunningV2UniqueJobAndClearsUniqueKey", func(t *testing.T) { //nolint:dupl - t.Parallel() - - exec, _ := setup(ctx, t) - - now := time.Now().UTC() - // V2 unique jobs (with no UniqueStates) should not have UniqueKey cleared: - job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ - State: ptrutil.Ptr(rivertype.JobStateRunning), - UniqueKey: []byte("unique-key"), - }) - // expclitly null out UniqueStates to simulate an old v2 job: - _, err := exec.Exec(ctx, fmt.Sprintf("UPDATE river_job SET unique_states = NULL WHERE id = %d", job.ID)) - require.NoError(t, err) - - jobAfter, err := exec.JobSetStateIfRunning(ctx, riverdriver.JobSetStateCancelled(job.ID, now, makeErrPayload(t, now))) - require.NoError(t, err) - require.Equal(t, rivertype.JobStateCancelled, jobAfter.State) - require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond) - require.Nil(t, jobAfter.UniqueKey) - - jobUpdated, err := exec.JobGetByID(ctx, job.ID) - require.NoError(t, err) - require.Equal(t, rivertype.JobStateCancelled, jobUpdated.State) - require.Nil(t, jobUpdated.UniqueKey) - }) }) t.Run("JobSetStateIfRunning_JobSetStateDiscarded", func(t *testing.T) { @@ -2054,33 +1975,6 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.NoError(t, err) require.Equal(t, rivertype.JobStateDiscarded, jobUpdated.State) }) - - t.Run("DiscardsARunningV2UniqueJobAndClearsUniqueKey", func(t *testing.T) { //nolint:dupl - t.Parallel() - - exec, _ := setup(ctx, t) - - now := time.Now().UTC() - // V2 unique jobs (with no UniqueStates) should not have UniqueKey cleared: - job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ - State: ptrutil.Ptr(rivertype.JobStateRunning), - UniqueKey: []byte("unique-key"), - }) - // expclitly null out UniqueStates to simulate an old v2 job: - _, err := exec.Exec(ctx, fmt.Sprintf("UPDATE river_job SET unique_states = NULL WHERE id = %d", job.ID)) - require.NoError(t, err) - - jobAfter, err := exec.JobSetStateIfRunning(ctx, riverdriver.JobSetStateDiscarded(job.ID, now, makeErrPayload(t, now))) - require.NoError(t, err) - require.Equal(t, rivertype.JobStateDiscarded, jobAfter.State) - require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond) - require.Nil(t, jobAfter.UniqueKey) - - jobUpdated, err := exec.JobGetByID(ctx, job.ID) - require.NoError(t, err) - require.Equal(t, rivertype.JobStateDiscarded, jobUpdated.State) - require.Nil(t, jobUpdated.UniqueKey) - }) }) setStateManyParams := func(params ...*riverdriver.JobSetStateIfRunningParams) *riverdriver.JobSetStateIfRunningManyParams { diff --git a/job_executor_test.go b/job_executor_test.go index a181b99c..3140463b 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -17,6 +17,7 @@ import ( "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/util/ptrutil" "github.com/riverqueue/river/rivershared/util/timeutil" "github.com/riverqueue/river/rivertype" ) @@ -141,24 +142,31 @@ func TestJobExecutor_Execute(t *testing.T) { workUnitFactory := newWorkUnitFactoryWithCustomRetry(func() error { return nil }, nil) - result, err := exec.JobInsertFast(ctx, &riverdriver.JobInsertFastParams{ + now := time.Now().UTC() + results, err := exec.JobInsertFastMany(ctx, []*riverdriver.JobInsertFastParams{{ EncodedArgs: []byte("{}"), Kind: (callbackArgs{}).Kind(), MaxAttempts: rivercommon.MaxAttemptsDefault, Priority: rivercommon.PriorityDefault, Queue: rivercommon.QueueDefault, + // Needs to be explicitly set to a "now" horizon that's aligned with the + // JobGetAvailable call. InsertMany applies a default scheduled_at in Go + // so it can't pick up the Postgres-level `now()` default. + ScheduledAt: ptrutil.Ptr(now), State: rivertype.JobStateAvailable, - }) + }}) require.NoError(t, err) // Fetch the job to make sure it's marked as running: jobs, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ Max: 1, + Now: ptrutil.Ptr(now), Queue: rivercommon.QueueDefault, }) require.NoError(t, err) + require.Len(t, jobs, 1) - require.Equal(t, result.Job.ID, jobs[0].ID) + require.Equal(t, results[0].Job.ID, jobs[0].ID) job := jobs[0] bundle := &testBundle{ diff --git a/job_test.go b/job_test.go index b7576289..2d9ecb65 100644 --- a/job_test.go +++ b/job_test.go @@ -9,7 +9,7 @@ import ( "github.com/riverqueue/river/rivertype" ) -func TestJobUniqueOpts_isEmpty(t *testing.T) { +func TestUniqueOpts_isEmpty(t *testing.T) { t.Parallel() require.True(t, (&UniqueOpts{}).isEmpty()) diff --git a/periodic_job.go b/periodic_job.go index 5d043a21..c3db82f2 100644 --- a/periodic_job.go +++ b/periodic_job.go @@ -3,7 +3,6 @@ package river import ( "time" - "github.com/riverqueue/river/internal/dbunique" "github.com/riverqueue/river/internal/maintenance" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/util/sliceutil" @@ -181,12 +180,12 @@ func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.Pe opts = periodicJob.opts } return &maintenance.PeriodicJob{ - ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { + ConstructorFunc: func() (*riverdriver.JobInsertFastParams, error) { args, options := periodicJob.constructorFunc() if args == nil { - return nil, nil, maintenance.ErrNoJobToInsert + return nil, maintenance.ErrNoJobToInsert } - return insertParamsFromConfigArgsAndOptions(&b.periodicJobEnqueuer.Archetype, b.clientConfig, args, options, false) + return insertParamsFromConfigArgsAndOptions(&b.periodicJobEnqueuer.Archetype, b.clientConfig, args, options) }, RunOnStart: opts.RunOnStart, ScheduleFunc: periodicJob.scheduleFunc.Next, diff --git a/periodic_job_test.go b/periodic_job_test.go index 745a06d8..8857eee0 100644 --- a/periodic_job_test.go +++ b/periodic_job_test.go @@ -51,11 +51,11 @@ func TestPeriodicJobBundle(t *testing.T) { internalPeriodicJob := periodicJobBundle.toInternal(periodicJob) - insertParams1, _, err := internalPeriodicJob.ConstructorFunc() + insertParams1, err := internalPeriodicJob.ConstructorFunc() require.NoError(t, err) require.Equal(t, 1, mustUnmarshalJSON[TestJobArgs](t, insertParams1.EncodedArgs).JobNum) - insertParams2, _, err := internalPeriodicJob.ConstructorFunc() + insertParams2, err := internalPeriodicJob.ConstructorFunc() require.NoError(t, err) require.Equal(t, 2, mustUnmarshalJSON[TestJobArgs](t, insertParams2.EncodedArgs).JobNum) }) @@ -76,7 +76,7 @@ func TestPeriodicJobBundle(t *testing.T) { internalPeriodicJob := periodicJobBundle.toInternal(periodicJob) - _, _, err := internalPeriodicJob.ConstructorFunc() + _, err := internalPeriodicJob.ConstructorFunc() require.ErrorIs(t, err, maintenance.ErrNoJobToInsert) }) } diff --git a/producer_test.go b/producer_test.go index eec07da0..7c79cb00 100644 --- a/producer_test.go +++ b/producer_test.go @@ -102,7 +102,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { params := make([]*riverdriver.JobInsertFastParams, maxJobCount) for i := range params { - insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, WithJobNumArgs{JobNum: i}, nil, true) + insertParams, err := insertParamsFromConfigArgsAndOptions(archetype, config, WithJobNumArgs{JobNum: i}, nil) require.NoError(err) params[i] = insertParams @@ -240,17 +240,20 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin ctx := context.Background() type testBundle struct { - archetype *baseservice.Archetype - completer jobcompleter.JobCompleter - config *Config - exec riverdriver.Executor - jobUpdates chan jobcompleter.CompleterJobUpdated - workers *Workers + archetype *baseservice.Archetype + completer jobcompleter.JobCompleter + config *Config + exec riverdriver.Executor + jobUpdates chan jobcompleter.CompleterJobUpdated + timeBeforeStart time.Time + workers *Workers } setup := func(t *testing.T) (*producer, *testBundle) { t.Helper() + timeBeforeStart := time.Now().UTC() + producer, jobUpdates := makeProducer(ctx, t) producer.testSignals.Init() config := newTestConfig(t, nil) @@ -265,22 +268,34 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin }() return producer, &testBundle{ - archetype: &producer.Archetype, - completer: producer.completer, - config: config, - exec: producer.exec, - jobUpdates: jobUpdatesFlattened, - workers: producer.workers, + archetype: &producer.Archetype, + completer: producer.completer, + config: config, + exec: producer.exec, + jobUpdates: jobUpdatesFlattened, + timeBeforeStart: timeBeforeStart, + workers: producer.workers, } } mustInsert := func(ctx context.Context, t *testing.T, bundle *testBundle, args JobArgs) { t.Helper() - insertParams, _, err := insertParamsFromConfigArgsAndOptions(bundle.archetype, bundle.config, args, nil, false) + insertParams, err := insertParamsFromConfigArgsAndOptions(bundle.archetype, bundle.config, args, nil) require.NoError(t, err) + if insertParams.ScheduledAt == nil { + // Without this, newly inserted jobs will pick up a scheduled_at time + // that's the current Go time at the time of insertion. If the test is + // using a transaction, this will be after the `now()` time in the + // transaction that gets used by default in `JobGetAvailable`, so new jobs + // won't be visible. + // + // To work around this, set all inserted jobs to a time before the start + // of the test to ensure they're visible. + insertParams.ScheduledAt = &bundle.timeBeforeStart + } - _, err = bundle.exec.JobInsertFast(ctx, insertParams) + _, err = bundle.exec.JobInsertFastMany(ctx, []*riverdriver.JobInsertFastParams{insertParams}) require.NoError(t, err) } diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 52af07f4..b7c4a6f4 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -115,7 +115,6 @@ type Executor interface { JobGetByKindAndUniqueProperties(ctx context.Context, params *JobGetByKindAndUniquePropertiesParams) (*rivertype.JobRow, error) JobGetByKindMany(ctx context.Context, kind []string) ([]*rivertype.JobRow, error) JobGetStuck(ctx context.Context, params *JobGetStuckParams) ([]*rivertype.JobRow, error) - JobInsertFast(ctx context.Context, params *JobInsertFastParams) (*JobInsertFastResult, error) JobInsertFastMany(ctx context.Context, params []*JobInsertFastParams) ([]*JobInsertFastResult, error) JobInsertFastManyNoReturning(ctx context.Context, params []*JobInsertFastParams) (int, error) JobInsertFull(ctx context.Context, params *JobInsertFullParams) (*rivertype.JobRow, error) @@ -227,6 +226,7 @@ type JobDeleteBeforeParams struct { type JobGetAvailableParams struct { AttemptedBy string Max int + Now *time.Time Queue string } diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index cb339c1a..b750dd6f 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -43,15 +43,7 @@ updated_job AS ( finalized_at = CASE WHEN state = 'running' THEN finalized_at ELSE now() END, -- Mark the job as cancelled by query so that the rescuer knows not to -- rescue it, even if it gets stuck in the running state: - metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], $3::jsonb, true), - -- Similarly, zero a ` + "`" + `unique_key` + "`" + ` if the job is transitioning directly - -- to cancelled. Otherwise, it'll be cleared in the job executor. - -- - -- This is transition code to support existing jobs using the old v2 - -- uniqueness design. We specifically avoid clearing this value if the - -- v3 unique_states field is populated, because the v3 design never - -- involves clearing unique_key. - unique_key = CASE WHEN (state = 'running' OR unique_states IS NOT NULL) THEN unique_key ELSE NULL END + metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], $3::jsonb, true) FROM notification WHERE river_job.id = notification.id RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states @@ -208,12 +200,12 @@ WITH locked_jobs AS ( WHERE state = 'available' AND queue = $2::text - AND scheduled_at <= now() + AND scheduled_at <= coalesce($3::timestamptz, now()) ORDER BY priority ASC, scheduled_at ASC, id ASC - LIMIT $3::integer + LIMIT $4::integer FOR UPDATE SKIP LOCKED ) @@ -235,11 +227,17 @@ RETURNING type JobGetAvailableParams struct { AttemptedBy string Queue string + Now *time.Time Max int32 } func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvailableParams) ([]*RiverJob, error) { - rows, err := db.QueryContext(ctx, jobGetAvailable, arg.AttemptedBy, arg.Queue, arg.Max) + rows, err := db.QueryContext(ctx, jobGetAvailable, + arg.AttemptedBy, + arg.Queue, + arg.Now, + arg.Max, + ) if err != nil { return nil, err } @@ -527,107 +525,6 @@ func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckPara return items, nil } -const jobInsertFast = `-- name: JobInsertFast :one -INSERT INTO river_job( - args, - created_at, - finalized_at, - kind, - max_attempts, - metadata, - priority, - queue, - scheduled_at, - state, - tags, - unique_key, - unique_states -) VALUES ( - $1, - coalesce($2::timestamptz, now()), - $3, - $4, - $5, - coalesce($6::jsonb, '{}'), - $7, - $8, - coalesce($9::timestamptz, now()), - $10, - coalesce($11::varchar(255)[], '{}'), - $12, - $13 -) -ON CONFLICT (unique_key) - WHERE unique_key IS NOT NULL - AND unique_states IS NOT NULL - AND river_job_state_in_bitmask(unique_states, state) - -- Something needs to be updated for a row to be returned on a conflict. - DO UPDATE SET kind = EXCLUDED.kind -RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states, (xmax != 0) AS unique_skipped_as_duplicate -` - -type JobInsertFastParams struct { - Args string - CreatedAt *time.Time - FinalizedAt *time.Time - Kind string - MaxAttempts int16 - Metadata string - Priority int16 - Queue string - ScheduledAt *time.Time - State RiverJobState - Tags []string - UniqueKey []byte - UniqueStates pgtypealias.Bits -} - -type JobInsertFastRow struct { - RiverJob RiverJob - UniqueSkippedAsDuplicate bool -} - -func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFastParams) (*JobInsertFastRow, error) { - row := db.QueryRowContext(ctx, jobInsertFast, - arg.Args, - arg.CreatedAt, - arg.FinalizedAt, - arg.Kind, - arg.MaxAttempts, - arg.Metadata, - arg.Priority, - arg.Queue, - arg.ScheduledAt, - arg.State, - pq.Array(arg.Tags), - arg.UniqueKey, - arg.UniqueStates, - ) - var i JobInsertFastRow - err := row.Scan( - &i.RiverJob.ID, - &i.RiverJob.Args, - &i.RiverJob.Attempt, - &i.RiverJob.AttemptedAt, - pq.Array(&i.RiverJob.AttemptedBy), - &i.RiverJob.CreatedAt, - pq.Array(&i.RiverJob.Errors), - &i.RiverJob.FinalizedAt, - &i.RiverJob.Kind, - &i.RiverJob.MaxAttempts, - &i.RiverJob.Metadata, - &i.RiverJob.Priority, - &i.RiverJob.Queue, - &i.RiverJob.State, - &i.RiverJob.ScheduledAt, - pq.Array(&i.RiverJob.Tags), - &i.RiverJob.UniqueKey, - &i.RiverJob.UniqueStates, - &i.UniqueSkippedAsDuplicate, - ) - return &i, err -} - const jobInsertFastMany = `-- name: JobInsertFastMany :many INSERT INTO river_job( args, @@ -1245,12 +1142,7 @@ updated_job AS ( max_attempts = CASE WHEN NOT should_cancel AND $7::boolean THEN $8 ELSE max_attempts END, scheduled_at = CASE WHEN NOT should_cancel AND $9::boolean THEN $10::timestamptz - ELSE scheduled_at END, - -- This is transitional code for the v2 uniqueness design. We specifically - -- avoid clearing this value if the v3 unique_states field is populated, - -- because the v3 design never involves clearing unique_key. - unique_key = CASE WHEN (unique_states IS NULL AND ($1 IN ('cancelled', 'discarded') OR should_cancel)) THEN NULL - ELSE unique_key END + ELSE scheduled_at END FROM job_to_update WHERE river_job.id = job_to_update.id AND river_job.state = 'running' @@ -1449,11 +1341,8 @@ SET attempted_at = CASE WHEN $3::boolean THEN $4 ELSE attempted_at END, errors = CASE WHEN $5::boolean THEN $6::jsonb[] ELSE errors END, finalized_at = CASE WHEN $7::boolean THEN $8 ELSE finalized_at END, - state = CASE WHEN $9::boolean THEN $10 ELSE state END, - -- Transitional code to support tests for v2 uniqueness design. This field - -- is never modified in the v3 design. - unique_key = CASE WHEN $11::boolean THEN $12 ELSE unique_key END -WHERE id = $13 + state = CASE WHEN $9::boolean THEN $10 ELSE state END +WHERE id = $11 RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states ` @@ -1468,8 +1357,6 @@ type JobUpdateParams struct { FinalizedAt *time.Time StateDoUpdate bool State RiverJobState - UniqueKeyDoUpdate bool - UniqueKey []byte ID int64 } @@ -1487,8 +1374,6 @@ func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) arg.FinalizedAt, arg.StateDoUpdate, arg.State, - arg.UniqueKeyDoUpdate, - arg.UniqueKey, arg.ID, ) var i RiverJob diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index ca8f0e1a..ec40f409 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -141,6 +141,7 @@ func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobG jobs, err := dbsqlc.New().JobGetAvailable(ctx, e.dbtx, &dbsqlc.JobGetAvailableParams{ AttemptedBy: params.AttemptedBy, Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec + Now: params.Now, Queue: params.Queue, }) if err != nil { @@ -200,31 +201,6 @@ func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetSt return mapSliceError(jobs, jobRowFromInternal) } -func (e *Executor) JobInsertFast(ctx context.Context, params *riverdriver.JobInsertFastParams) (*riverdriver.JobInsertFastResult, error) { - result, err := dbsqlc.New().JobInsertFast(ctx, e.dbtx, &dbsqlc.JobInsertFastParams{ - Args: string(params.EncodedArgs), - CreatedAt: params.CreatedAt, - Kind: params.Kind, - MaxAttempts: int16(min(params.MaxAttempts, math.MaxInt16)), //nolint:gosec - Metadata: valutil.ValOrDefault(string(params.Metadata), "{}"), - Priority: int16(min(params.Priority, math.MaxInt16)), //nolint:gosec - Queue: params.Queue, - ScheduledAt: params.ScheduledAt, - State: dbsqlc.RiverJobState(params.State), - Tags: params.Tags, - UniqueKey: params.UniqueKey, - UniqueStates: pgtypealias.Bits{Bits: pgtype.Bits{Bytes: []byte{params.UniqueStates}, Len: 8, Valid: params.UniqueStates != 0}}, - }) - if err != nil { - return nil, interpretError(err) - } - externalJob, err := jobRowFromInternal(&result.RiverJob) - if err != nil { - return nil, err - } - return &riverdriver.JobInsertFastResult{Job: externalJob, UniqueSkippedAsDuplicate: result.UniqueSkippedAsDuplicate}, nil -} - func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.JobInsertFastParams) ([]*riverdriver.JobInsertFastResult, error) { insertJobsParams := &dbsqlc.JobInsertFastManyParams{ Args: make([]string, len(params)), @@ -239,7 +215,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver. UniqueKey: make([][]byte, len(params)), UniqueStates: make([]pgtypealias.Bits, len(params)), } - now := time.Now() + now := time.Now().UTC() for i := 0; i < len(params); i++ { params := params[i] @@ -297,7 +273,7 @@ func (e *Executor) JobInsertFastManyNoReturning(ctx context.Context, params []*r UniqueKey: make([][]byte, len(params)), UniqueStates: make([]pgtypealias.Bits, len(params)), } - now := time.Now() + now := time.Now().UTC() for i := 0; i < len(params); i++ { params := params[i] diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index c1ac0bdf..585b1864 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -67,15 +67,7 @@ updated_job AS ( finalized_at = CASE WHEN state = 'running' THEN finalized_at ELSE now() END, -- Mark the job as cancelled by query so that the rescuer knows not to -- rescue it, even if it gets stuck in the running state: - metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], @cancel_attempted_at::jsonb, true), - -- Similarly, zero a `unique_key` if the job is transitioning directly - -- to cancelled. Otherwise, it'll be cleared in the job executor. - -- - -- This is transition code to support existing jobs using the old v2 - -- uniqueness design. We specifically avoid clearing this value if the - -- v3 unique_states field is populated, because the v3 design never - -- involves clearing unique_key. - unique_key = CASE WHEN (state = 'running' OR unique_states IS NOT NULL) THEN unique_key ELSE NULL END + metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], @cancel_attempted_at::jsonb, true) FROM notification WHERE river_job.id = notification.id RETURNING river_job.* @@ -144,7 +136,7 @@ WITH locked_jobs AS ( WHERE state = 'available' AND queue = @queue::text - AND scheduled_at <= now() + AND scheduled_at <= coalesce(sqlc.narg('now')::timestamptz, now()) ORDER BY priority ASC, scheduled_at ASC, @@ -202,44 +194,6 @@ WHERE state = 'running' ORDER BY id LIMIT @max; --- name: JobInsertFast :one -INSERT INTO river_job( - args, - created_at, - finalized_at, - kind, - max_attempts, - metadata, - priority, - queue, - scheduled_at, - state, - tags, - unique_key, - unique_states -) VALUES ( - @args, - coalesce(sqlc.narg('created_at')::timestamptz, now()), - @finalized_at, - @kind, - @max_attempts, - coalesce(@metadata::jsonb, '{}'), - @priority, - @queue, - coalesce(sqlc.narg('scheduled_at')::timestamptz, now()), - @state, - coalesce(@tags::varchar(255)[], '{}'), - @unique_key, - @unique_states -) -ON CONFLICT (unique_key) - WHERE unique_key IS NOT NULL - AND unique_states IS NOT NULL - AND river_job_state_in_bitmask(unique_states, state) - -- Something needs to be updated for a row to be returned on a conflict. - DO UPDATE SET kind = EXCLUDED.kind -RETURNING sqlc.embed(river_job), (xmax != 0) AS unique_skipped_as_duplicate; - -- name: JobInsertFastMany :many INSERT INTO river_job( args, @@ -536,12 +490,7 @@ updated_job AS ( max_attempts = CASE WHEN NOT should_cancel AND @max_attempts_update::boolean THEN @max_attempts ELSE max_attempts END, scheduled_at = CASE WHEN NOT should_cancel AND @scheduled_at_do_update::boolean THEN sqlc.narg('scheduled_at')::timestamptz - ELSE scheduled_at END, - -- This is transitional code for the v2 uniqueness design. We specifically - -- avoid clearing this value if the v3 unique_states field is populated, - -- because the v3 design never involves clearing unique_key. - unique_key = CASE WHEN (unique_states IS NULL AND (@state IN ('cancelled', 'discarded') OR should_cancel)) THEN NULL - ELSE unique_key END + ELSE scheduled_at END FROM job_to_update WHERE river_job.id = job_to_update.id AND river_job.state = 'running' @@ -624,9 +573,6 @@ SET attempted_at = CASE WHEN @attempted_at_do_update::boolean THEN @attempted_at ELSE attempted_at END, errors = CASE WHEN @errors_do_update::boolean THEN @errors::jsonb[] ELSE errors END, finalized_at = CASE WHEN @finalized_at_do_update::boolean THEN @finalized_at ELSE finalized_at END, - state = CASE WHEN @state_do_update::boolean THEN @state ELSE state END, - -- Transitional code to support tests for v2 uniqueness design. This field - -- is never modified in the v3 design. - unique_key = CASE WHEN @unique_key_do_update::boolean THEN @unique_key ELSE unique_key END + state = CASE WHEN @state_do_update::boolean THEN @state ELSE state END WHERE id = @id RETURNING *; diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 1fff4859..b3b0aa65 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -42,15 +42,7 @@ updated_job AS ( finalized_at = CASE WHEN state = 'running' THEN finalized_at ELSE now() END, -- Mark the job as cancelled by query so that the rescuer knows not to -- rescue it, even if it gets stuck in the running state: - metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], $3::jsonb, true), - -- Similarly, zero a ` + "`" + `unique_key` + "`" + ` if the job is transitioning directly - -- to cancelled. Otherwise, it'll be cleared in the job executor. - -- - -- This is transition code to support existing jobs using the old v2 - -- uniqueness design. We specifically avoid clearing this value if the - -- v3 unique_states field is populated, because the v3 design never - -- involves clearing unique_key. - unique_key = CASE WHEN (state = 'running' OR unique_states IS NOT NULL) THEN unique_key ELSE NULL END + metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], $3::jsonb, true) FROM notification WHERE river_job.id = notification.id RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states @@ -207,12 +199,12 @@ WITH locked_jobs AS ( WHERE state = 'available' AND queue = $2::text - AND scheduled_at <= now() + AND scheduled_at <= coalesce($3::timestamptz, now()) ORDER BY priority ASC, scheduled_at ASC, id ASC - LIMIT $3::integer + LIMIT $4::integer FOR UPDATE SKIP LOCKED ) @@ -234,11 +226,17 @@ RETURNING type JobGetAvailableParams struct { AttemptedBy string Queue string + Now *time.Time Max int32 } func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvailableParams) ([]*RiverJob, error) { - rows, err := db.Query(ctx, jobGetAvailable, arg.AttemptedBy, arg.Queue, arg.Max) + rows, err := db.Query(ctx, jobGetAvailable, + arg.AttemptedBy, + arg.Queue, + arg.Now, + arg.Max, + ) if err != nil { return nil, err } @@ -514,107 +512,6 @@ func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckPara return items, nil } -const jobInsertFast = `-- name: JobInsertFast :one -INSERT INTO river_job( - args, - created_at, - finalized_at, - kind, - max_attempts, - metadata, - priority, - queue, - scheduled_at, - state, - tags, - unique_key, - unique_states -) VALUES ( - $1, - coalesce($2::timestamptz, now()), - $3, - $4, - $5, - coalesce($6::jsonb, '{}'), - $7, - $8, - coalesce($9::timestamptz, now()), - $10, - coalesce($11::varchar(255)[], '{}'), - $12, - $13 -) -ON CONFLICT (unique_key) - WHERE unique_key IS NOT NULL - AND unique_states IS NOT NULL - AND river_job_state_in_bitmask(unique_states, state) - -- Something needs to be updated for a row to be returned on a conflict. - DO UPDATE SET kind = EXCLUDED.kind -RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states, (xmax != 0) AS unique_skipped_as_duplicate -` - -type JobInsertFastParams struct { - Args []byte - CreatedAt *time.Time - FinalizedAt *time.Time - Kind string - MaxAttempts int16 - Metadata []byte - Priority int16 - Queue string - ScheduledAt *time.Time - State RiverJobState - Tags []string - UniqueKey []byte - UniqueStates pgtype.Bits -} - -type JobInsertFastRow struct { - RiverJob RiverJob - UniqueSkippedAsDuplicate bool -} - -func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFastParams) (*JobInsertFastRow, error) { - row := db.QueryRow(ctx, jobInsertFast, - arg.Args, - arg.CreatedAt, - arg.FinalizedAt, - arg.Kind, - arg.MaxAttempts, - arg.Metadata, - arg.Priority, - arg.Queue, - arg.ScheduledAt, - arg.State, - arg.Tags, - arg.UniqueKey, - arg.UniqueStates, - ) - var i JobInsertFastRow - err := row.Scan( - &i.RiverJob.ID, - &i.RiverJob.Args, - &i.RiverJob.Attempt, - &i.RiverJob.AttemptedAt, - &i.RiverJob.AttemptedBy, - &i.RiverJob.CreatedAt, - &i.RiverJob.Errors, - &i.RiverJob.FinalizedAt, - &i.RiverJob.Kind, - &i.RiverJob.MaxAttempts, - &i.RiverJob.Metadata, - &i.RiverJob.Priority, - &i.RiverJob.Queue, - &i.RiverJob.State, - &i.RiverJob.ScheduledAt, - &i.RiverJob.Tags, - &i.RiverJob.UniqueKey, - &i.RiverJob.UniqueStates, - &i.UniqueSkippedAsDuplicate, - ) - return &i, err -} - const jobInsertFastMany = `-- name: JobInsertFastMany :many INSERT INTO river_job( args, @@ -1223,12 +1120,7 @@ updated_job AS ( max_attempts = CASE WHEN NOT should_cancel AND $7::boolean THEN $8 ELSE max_attempts END, scheduled_at = CASE WHEN NOT should_cancel AND $9::boolean THEN $10::timestamptz - ELSE scheduled_at END, - -- This is transitional code for the v2 uniqueness design. We specifically - -- avoid clearing this value if the v3 unique_states field is populated, - -- because the v3 design never involves clearing unique_key. - unique_key = CASE WHEN (unique_states IS NULL AND ($1 IN ('cancelled', 'discarded') OR should_cancel)) THEN NULL - ELSE unique_key END + ELSE scheduled_at END FROM job_to_update WHERE river_job.id = job_to_update.id AND river_job.state = 'running' @@ -1424,11 +1316,8 @@ SET attempted_at = CASE WHEN $3::boolean THEN $4 ELSE attempted_at END, errors = CASE WHEN $5::boolean THEN $6::jsonb[] ELSE errors END, finalized_at = CASE WHEN $7::boolean THEN $8 ELSE finalized_at END, - state = CASE WHEN $9::boolean THEN $10 ELSE state END, - -- Transitional code to support tests for v2 uniqueness design. This field - -- is never modified in the v3 design. - unique_key = CASE WHEN $11::boolean THEN $12 ELSE unique_key END -WHERE id = $13 + state = CASE WHEN $9::boolean THEN $10 ELSE state END +WHERE id = $11 RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states ` @@ -1443,8 +1332,6 @@ type JobUpdateParams struct { FinalizedAt *time.Time StateDoUpdate bool State RiverJobState - UniqueKeyDoUpdate bool - UniqueKey []byte ID int64 } @@ -1462,8 +1349,6 @@ func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) arg.FinalizedAt, arg.StateDoUpdate, arg.State, - arg.UniqueKeyDoUpdate, - arg.UniqueKey, arg.ID, ) var i RiverJob diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 8b4e6d0b..8335f717 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -146,6 +146,7 @@ func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobG jobs, err := dbsqlc.New().JobGetAvailable(ctx, e.dbtx, &dbsqlc.JobGetAvailableParams{ AttemptedBy: params.AttemptedBy, Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec + Now: params.Now, Queue: params.Queue, }) if err != nil { @@ -194,31 +195,6 @@ func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetSt return mapSliceError(jobs, jobRowFromInternal) } -func (e *Executor) JobInsertFast(ctx context.Context, params *riverdriver.JobInsertFastParams) (*riverdriver.JobInsertFastResult, error) { - result, err := dbsqlc.New().JobInsertFast(ctx, e.dbtx, &dbsqlc.JobInsertFastParams{ - Args: params.EncodedArgs, - CreatedAt: params.CreatedAt, - Kind: params.Kind, - MaxAttempts: int16(min(params.MaxAttempts, math.MaxInt16)), //nolint:gosec - Metadata: params.Metadata, - Priority: int16(min(params.Priority, math.MaxInt16)), //nolint:gosec - Queue: params.Queue, - ScheduledAt: params.ScheduledAt, - State: dbsqlc.RiverJobState(params.State), - Tags: params.Tags, - UniqueKey: params.UniqueKey, - UniqueStates: pgtype.Bits{Bytes: []byte{params.UniqueStates}, Len: 8, Valid: params.UniqueStates != 0}, - }) - if err != nil { - return nil, interpretError(err) - } - externalJob, err := jobRowFromInternal(&result.RiverJob) - if err != nil { - return nil, err - } - return &riverdriver.JobInsertFastResult{Job: externalJob, UniqueSkippedAsDuplicate: result.UniqueSkippedAsDuplicate}, nil -} - func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.JobInsertFastParams) ([]*riverdriver.JobInsertFastResult, error) { insertJobsParams := &dbsqlc.JobInsertFastManyParams{ Args: make([][]byte, len(params)), @@ -233,7 +209,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver. UniqueKey: make([][]byte, len(params)), UniqueStates: make([]pgtype.Bits, len(params)), } - now := time.Now() + now := time.Now().UTC() for i := 0; i < len(params); i++ { params := params[i] @@ -279,7 +255,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver. func (e *Executor) JobInsertFastManyNoReturning(ctx context.Context, params []*riverdriver.JobInsertFastParams) (int, error) { insertJobsParams := make([]*dbsqlc.JobInsertFastManyCopyFromParams, len(params)) - now := time.Now() + now := time.Now().UTC() for i := 0; i < len(params); i++ { params := params[i]