Breaking: Rename old InsertMany, add new version with return values (#589)

* add JobInsertManyReturning to drivers

This new query allows many rows to be inserted (up to ~7280 as of now)
while also allowing the new records to be returned. While this is not
quite as fast as the `COPY FROM` option when loading many rows, it
provides a better UX for the vast majority of use cases.

It _does_ require that we ditch sqlc for this one query, because sqlc
does not support the multirow values insert syntax due to the dynamic
nature of the param placeholders.

* rename InsertMany to InsertManyFast

* new InsertMany/InsertManyTx with return values

This adds new implementations of `InsertMany` / `InsertManyTx` that use
the multirow `VALUES` syntax to allow the new rows to be returned upon
insert. The alternative `COPY FROM` implementation has been renamed to
`InsertManyFast` / `InsertManyFastTx`. The expectation is that these
forms will only be needed in cases where an extremely large number of
records is being inserted simultaneously, whereas the new form is more
user-friendly for the vast majority of other cases.

* renames
bgentry authored Sep 13, 2024
1 parent 7d98386 commit 67c8a8a
Showing 13 changed files with 1,046 additions and 52 deletions.
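
The commit message explains that the multirow `VALUES` insert needs placeholders generated at runtime, which is why sqlc could not be used for this query. A minimal sketch of that idea follows. The helper name `buildMultirowInsert`, the table name, and the column list are illustrative assumptions, not River's actual driver code.

```go
package main

import (
	"fmt"
	"strings"
)

// buildMultirowInsert is a hypothetical helper (not River's actual code) that
// renders an INSERT with one placeholder tuple per row, numbering the
// placeholders $1, $2, ... consecutively across all rows.
func buildMultirowInsert(table string, columns []string, numRows int) string {
	var sb strings.Builder
	fmt.Fprintf(&sb, "INSERT INTO %s (%s) VALUES ", table, strings.Join(columns, ", "))

	placeholder := 1
	for row := 0; row < numRows; row++ {
		if row > 0 {
			sb.WriteString(", ")
		}
		sb.WriteString("(")
		for col := range columns {
			if col > 0 {
				sb.WriteString(", ")
			}
			fmt.Fprintf(&sb, "$%d", placeholder)
			placeholder++
		}
		sb.WriteString(")")
	}

	// RETURNING is what lets the inserted rows come back to the caller.
	sb.WriteString(" RETURNING *")
	return sb.String()
}

func main() {
	fmt.Println(buildMultirowInsert("example_job", []string{"kind", "queue"}, 2))
}
```

Because the number of placeholders grows with the number of rows, a statement like this has an upper bound on batch size. The "~7280" figure in the commit message would be consistent with PostgreSQL's limit of 65,535 bind parameters per statement if each row binds nine values, but the per-row count is an inference here and is not stated in the commit.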
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -15,7 +15,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

⚠️ Version 0.12.0 has a small breaking change in `rivermigrate`. As before, we try never to make breaking changes, but this one was deemed worth it because it's quite small and may help avoid panics.
⚠️ Version 0.12.0 has two small breaking changes, one for `InsertMany` and one in `rivermigrate`. As before, we try never to make breaking changes, but these were deemed worth it because of their minimal impact and because they help avoid panics.

- **Breaking change:** `Client.InsertMany` / `InsertManyTx` now return the inserted rows rather than merely returning a count of the inserted rows. The new implementations no longer use Postgres' `COPY FROM` protocol in order to facilitate return values.

Users who relied on the returned count can wrap the returned rows in `len()` to get the same value, or continue using the old APIs under their new names, `InsertManyFast` and `InsertManyFastTx`. [PR #589](https://github.com/riverqueue/river/pull/589).

- **Breaking change:** `rivermigrate.New` now returns a possible error along with a migrator. An error may be returned, for example, when a migration line is configured that doesn't exist. [PR #558](https://github.com/riverqueue/river/pull/558).
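
The `len()` migration described in the `InsertMany` entry above can be sketched as follows. To keep the example self-contained, `JobRow`, `JobInsertResult`, and `insertMany` are simplified stand-ins for the River types and client method; only the return shape (a slice of results instead of a count) is taken from this commit.

```go
package main

import "fmt"

// JobRow and JobInsertResult are simplified stand-ins for the rivertype
// structs of the same names; only the fields used here are modeled.
type JobRow struct {
	ID   int64
	Kind string
}

type JobInsertResult struct {
	Job *JobRow
}

// insertMany is a hypothetical stub with the same return shape as the new
// Client.InsertMany: one result per inserted job instead of a count.
func insertMany(kinds []string) ([]*JobInsertResult, error) {
	results := make([]*JobInsertResult, 0, len(kinds))
	for i, kind := range kinds {
		results = append(results, &JobInsertResult{Job: &JobRow{ID: int64(i + 1), Kind: kind}})
	}
	return results, nil
}

func main() {
	results, err := insertMany([]string{"email", "report"})
	if err != nil {
		panic(err)
	}

	// Callers that only need the old count can take the length of the slice.
	count := len(results)
	fmt.Println("inserted:", count)

	// The returned rows are also available, for example to log new job IDs.
	for _, res := range results {
		fmt.Println(res.Job.ID, res.Job.Kind)
	}
}
```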

151 changes: 141 additions & 10 deletions client.go
@@ -1326,7 +1326,138 @@ type InsertManyParams struct {
	InsertOpts *InsertOpts
}

// InsertMany inserts many jobs at once using Postgres' `COPY FROM` mechanism,
// InsertMany inserts many jobs at once. Each job is inserted as an
// InsertManyParams tuple, which takes job args along with an optional set of
// insert options, which override insert options provided by a
// JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults.
// The provided context is used for the underlying Postgres inserts and can be
// used to cancel the operation or apply a timeout.
//
//	results, err := client.InsertMany(ctx, []river.InsertManyParams{
//		{Args: BatchInsertArgs{}},
//		{Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}},
//	})
//	if err != nil {
//		// handle error
//	}
//
// Job uniqueness is not respected when using InsertMany due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using Insert instead.
func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
	if !c.driver.HasPool() {
		return nil, errNoDriverDBPool
	}

	tx, err := c.driver.GetExecutor().Begin(ctx)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback(ctx)

	inserted, err := c.insertMany(ctx, tx, params)
	if err != nil {
		return nil, err
	}

	if err := tx.Commit(ctx); err != nil {
		return nil, err
	}
	return inserted, nil
}

// InsertManyTx inserts many jobs at once. Each job is inserted as an
// InsertManyParams tuple, which takes job args along with an optional set of
// insert options, which override insert options provided by a
// JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults.
// The provided context is used for the underlying Postgres inserts and can be
// used to cancel the operation or apply a timeout.
//
//	results, err := client.InsertManyTx(ctx, tx, []river.InsertManyParams{
//		{Args: BatchInsertArgs{}},
//		{Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}},
//	})
//	if err != nil {
//		// handle error
//	}
//
// Job uniqueness is not respected when using InsertMany due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using Insert instead.
//
// This variant lets a caller insert jobs atomically alongside other database
// changes. An inserted job isn't visible to be worked until the transaction
// commits, and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
	exec := c.driver.UnwrapExecutor(tx)
	return c.insertMany(ctx, exec, params)
}

func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
	insertParams, err := c.insertManyParams(params)
	if err != nil {
		return nil, err
	}

	jobRows, err := tx.JobInsertFastMany(ctx, insertParams)
	if err != nil {
		return nil, err
	}

	queues := make([]string, 0, 10)
	for _, params := range insertParams {
		if params.State == rivertype.JobStateAvailable {
			queues = append(queues, params.Queue)
		}
	}
	if err := c.maybeNotifyInsertForQueues(ctx, tx, queues); err != nil {
		return nil, err
	}

	return sliceutil.Map(jobRows,
		func(jobRow *rivertype.JobRow) *rivertype.JobInsertResult {
			return &rivertype.JobInsertResult{Job: jobRow}
		},
	), nil
}

// Validates input parameters for a batch insert operation and generates a set
// of batch insert parameters.
func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) {
	if len(params) < 1 {
		return nil, errors.New("no jobs to insert")
	}

	insertParams := make([]*riverdriver.JobInsertFastParams, len(params))
	for i, param := range params {
		if err := c.validateJobArgs(param.Args); err != nil {
			return nil, err
		}

		if param.InsertOpts != nil {
			// UniqueOpts aren't supported for batch inserts because they use PG
			// advisory locks to work, and taking many locks simultaneously could
			// easily lead to contention and deadlocks.
			if !param.InsertOpts.UniqueOpts.isEmpty() {
				return nil, errors.New("UniqueOpts are not supported for batch inserts")
			}
		}

		var err error
		insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
		if err != nil {
			return nil, err
		}
	}

	return insertParams, nil
}

// InsertManyFast inserts many jobs at once using Postgres' `COPY FROM` mechanism,
// making the operation quite fast and memory efficient. Each job is inserted as
// an InsertManyParams tuple, which takes job args along with an optional set of
// insert options, which override insert options provided by an
@@ -1345,12 +1476,12 @@ type InsertManyParams struct {
// Job uniqueness is not respected when using InsertMany due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using Insert instead.
func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) (int, error) {
func (c *Client[TTx]) InsertManyFast(ctx context.Context, params []InsertManyParams) (int, error) {
	if !c.driver.HasPool() {
		return 0, errNoDriverDBPool
	}

	insertParams, err := c.insertManyParams(params)
	insertParams, err := c.insertManyFastParams(params)
	if err != nil {
		return 0, err
	}
@@ -1362,7 +1493,7 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams)
	}
	defer tx.Rollback(ctx)

	inserted, err := c.insertFastMany(ctx, tx, insertParams)
	inserted, err := c.insertManyFast(ctx, tx, insertParams)
	if err != nil {
		return 0, err
	}
@@ -1395,18 +1526,18 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams)
// This variant lets a caller insert jobs atomically alongside other database
// changes. An inserted job isn't visible to be worked until the transaction
// commits, and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) (int, error) {
	insertParams, err := c.insertManyParams(params)
func (c *Client[TTx]) InsertManyFastTx(ctx context.Context, tx TTx, params []InsertManyParams) (int, error) {
	insertParams, err := c.insertManyFastParams(params)
	if err != nil {
		return 0, err
	}

	exec := c.driver.UnwrapExecutor(tx)
	return c.insertFastMany(ctx, exec, insertParams)
	return c.insertManyFast(ctx, exec, insertParams)
}

func (c *Client[TTx]) insertFastMany(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) {
	inserted, err := tx.JobInsertFastMany(ctx, insertParams)
func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) {
	inserted, err := tx.JobInsertFastManyNoReturning(ctx, insertParams)
	if err != nil {
		return inserted, err
	}
@@ -1425,7 +1556,7 @@ func (c *Client[TTx]) insertFastMany(ctx context.Context, tx riverdriver.Executo

// Validates input parameters for a batch insert operation and generates a
// set of batch insert parameters.
func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) {
func (c *Client[TTx]) insertManyFastParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) {
	if len(params) < 1 {
		return nil, errors.New("no jobs to insert")
	}
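
The commit message puts the ceiling for the multirow form at roughly 7,280 rows and points to `InsertManyFast` for very large loads. For callers who still want the inserted rows back, one possible approach is to split a large slice into batches before calling `InsertMany`. The sketch below shows only the batching step. The `chunk` helper and the `maxRowsPerInsert` value are assumptions for illustration and are not part of River's API.

```go
package main

import "fmt"

// maxRowsPerInsert is an assumed, conservative cap chosen below the "~7280"
// figure in the commit message; it is not a constant exported by River.
const maxRowsPerInsert = 7000

// chunk splits items into consecutive slices of at most size elements. The
// returned slices share the backing array of items.
func chunk[T any](items []T, size int) [][]T {
	if size < 1 {
		return nil
	}
	chunks := make([][]T, 0, (len(items)+size-1)/size)
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		chunks = append(chunks, items[start:end])
	}
	return chunks
}

func main() {
	jobs := make([]int, 15000) // placeholder for a []river.InsertManyParams slice
	for i, batch := range chunk(jobs, maxRowsPerInsert) {
		// Each batch would be passed to InsertMany or InsertManyTx in turn.
		fmt.Printf("batch %d: %d jobs\n", i, len(batch))
	}
}
```

Separate `InsertMany` calls each run in their own transaction, so batches inserted this way are not atomic as a group. Passing every batch to `InsertManyTx` within a single caller-managed transaction would keep the all-or-nothing behavior described in that method's doc comment.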