Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove deprecated advisory lock uniqueness, consolidate insert logic #614

Merged
merged 1 commit into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

⚠️ Version 0.13.0 removes the original advisory lock based unique jobs implementation that was deprecated in v0.12.0. See details in the note below or the v0.12.0 release notes.

### Changed

- **Breaking change:** The advisory lock unique jobs implementation which was deprecated in v0.12.0 has been removed. Users of that feature should first upgrade to v0.12.1 to ensure they don't see any warning logs about using the deprecated advisory lock uniqueness. The new, faster unique implementation will be used automatically as long as the `UniqueOpts.ByState` list hasn't been customized to remove [required states](https://riverqueue.com/docs/unique-jobs#unique-by-state) (`pending`, `scheduled`, `available`, and `running`). As of this release, customizing `ByState` without these required states returns an error. [PR #614](https://github.com/riverqueue/river/pull/614).
- Single job inserts are now unified under the hood to use the `InsertMany` bulk insert query. This should not be noticeable to users, and the unified code path will make it easier to build new features going forward. [PR #614](https://github.com/riverqueue/river/pull/614).

## [0.12.1] - 2024-09-26

### Changed
Expand Down
121 changes: 39 additions & 82 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,6 @@ type Client[TTx any] struct {
stopped <-chan struct{}
subscriptionManager *subscriptionManager
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter // deprecated fallback path for unique job insertion

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
Expand Down Expand Up @@ -495,10 +494,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
driver: driver,
producersByQueueName: make(map[string]*producer),
testSignals: clientTestSignals{},
uniqueInserter: baseservice.Init(archetype, &dbunique.UniqueInserter{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
}),
workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up
workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up
}
client.queues = &QueueBundle{addProducer: client.addProducer}

Expand Down Expand Up @@ -1162,10 +1158,10 @@ func (c *Client[TTx]) ID() string {
return c.config.ID
}

func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts, bulk bool) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, error) {
encodedArgs, err := json.Marshal(args)
if err != nil {
return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err)
return nil, fmt.Errorf("error marshaling args to JSON: %w", err)
}

if insertOpts == nil {
Expand All @@ -1187,7 +1183,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
queue := valutil.FirstNonZero(insertOpts.Queue, jobInsertOpts.Queue, rivercommon.QueueDefault)

if err := validateQueueName(queue); err != nil {
return nil, nil, err
return nil, err
}

tags := insertOpts.Tags
Expand All @@ -1199,24 +1195,24 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
} else {
for _, tag := range tags {
if len(tag) > 255 {
return nil, nil, errors.New("tags should be a maximum of 255 characters long")
return nil, errors.New("tags should be a maximum of 255 characters long")
}
if !tagRE.MatchString(tag) {
return nil, nil, errors.New("tags should match regex " + tagRE.String())
return nil, errors.New("tags should match regex " + tagRE.String())
}
}
}

if priority > 4 {
return nil, nil, errors.New("priority must be between 1 and 4")
return nil, errors.New("priority must be between 1 and 4")
}

uniqueOpts := insertOpts.UniqueOpts
if uniqueOpts.isEmpty() {
uniqueOpts = jobInsertOpts.UniqueOpts
}
if err := uniqueOpts.validate(); err != nil {
return nil, nil, err
return nil, err
}

metadata := insertOpts.Metadata
Expand All @@ -1236,21 +1232,13 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
State: rivertype.JobStateAvailable,
Tags: tags,
}
var returnUniqueOpts *dbunique.UniqueOpts
if !uniqueOpts.isEmpty() {
if uniqueOpts.isV1() {
if bulk {
return nil, nil, errors.New("bulk inserts do not support advisory lock uniqueness and cannot remove required states")
}
returnUniqueOpts = (*dbunique.UniqueOpts)(&uniqueOpts)
} else {
internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts)
insertParams.UniqueKey, err = dbunique.UniqueKey(archetype.Time, internalUniqueOpts, insertParams)
if err != nil {
return nil, nil, err
}
insertParams.UniqueStates = internalUniqueOpts.StateBitmask()
internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts)
insertParams.UniqueKey, err = dbunique.UniqueKey(archetype.Time, internalUniqueOpts, insertParams)
if err != nil {
return nil, err
}
insertParams.UniqueStates = internalUniqueOpts.StateBitmask()
}

switch {
Expand All @@ -1270,7 +1258,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
insertParams.State = rivertype.JobStatePending
}

return insertParams, returnUniqueOpts, nil
return insertParams, nil
}

var errNoDriverDBPool = errors.New("driver must have non-nil database pool to use non-transactional methods like Insert and InsertMany (try InsertTx or InsertManyTx instead")
Expand All @@ -1290,7 +1278,21 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts
return nil, errNoDriverDBPool
}

return c.insert(ctx, c.driver.GetExecutor(), args, opts, false)
tx, err := c.driver.GetExecutor().Begin(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)

inserted, err := c.insert(ctx, tx, args, opts)
if err != nil {
return nil, err
}

if err := tx.Commit(ctx); err != nil {
return nil, err
}
return inserted, nil
}

// InsertTx inserts a new job with the provided args on the given transaction.
Expand All @@ -1311,52 +1313,17 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts
// transactions, the job will not be worked until the transaction has committed,
// and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts, false)
return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts)
}

func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, args JobArgs, opts *InsertOpts, bulk bool) (*rivertype.JobInsertResult, error) {
if err := c.validateJobArgs(args); err != nil {
return nil, err
}

params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts, bulk)
func (c *Client[TTx]) insert(ctx context.Context, tx riverdriver.ExecutorTx, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
params := []InsertManyParams{{Args: args, InsertOpts: opts}}
results, err := c.insertMany(ctx, tx, params)
if err != nil {
return nil, err
}

tx, err := exec.Begin(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)

// TODO: consolidate insertion paths for single + multi, remove deprecated uniqueness design
var jobInsertRes *riverdriver.JobInsertFastResult
if uniqueOpts == nil {
jobInsertRes, err = tx.JobInsertFast(ctx, params)
if err != nil {
return nil, err
}
} else {
if bulk {
return nil, errors.New("bulk inserts do not support advisory lock uniqueness")
}
// Old deprecated advisory lock route
c.baseService.Logger.WarnContext(ctx, "Using deprecated advisory lock uniqueness for job insert")
jobInsertRes, err = c.uniqueInserter.JobInsert(ctx, tx, params, uniqueOpts)
if err != nil {
return nil, err
}
}

if err := c.maybeNotifyInsert(ctx, tx, params.State, params.Queue); err != nil {
return nil, err
}
if err := tx.Commit(ctx); err != nil {
return nil, err
}

return (*rivertype.JobInsertResult)(jobInsertRes), nil
return results[0], nil
}

// InsertManyParams encapsulates a single job combined with insert options for
Expand Down Expand Up @@ -1455,13 +1422,10 @@ func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx,
})
}

// The shared code path for all InsertMany methods. It takes a function that
// executes the actual insert operation and allows for different implementations
// of the insert query to be passed in, each mapping their results back to a
// common result type.
//
// TODO(bgentry): this isn't yet used for the single insert path. The only thing
// blocking that is the removal of advisory lock unique inserts.
// The shared code path for all Insert and InsertMany methods. It takes a
// function that executes the actual insert operation and allows for different
// implementations of the insert query to be passed in, each mapping their
// results back to a common result type.
func (c *Client[TTx]) insertManyShared(
ctx context.Context,
tx riverdriver.ExecutorTx,
Expand Down Expand Up @@ -1503,7 +1467,7 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive
return nil, err
}

insertParamsItem, _, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts, true)
insertParamsItem, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1598,13 +1562,6 @@ func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.Executo
return len(results), nil
}

func (c *Client[TTx]) maybeNotifyInsert(ctx context.Context, tx riverdriver.ExecutorTx, state rivertype.JobState, queue string) error {
if state != rivertype.JobStateAvailable {
return nil
}
return c.maybeNotifyInsertForQueues(ctx, tx, []string{queue})
}

// Notify the given queues that new jobs are available. The queues list will be
// deduplicated and each will be checked to see if it is due for an insert
// notification from this client.
Expand Down
Loading
Loading