Skip to content

Commit

Permalink
pgx: fast batches: always create an explicit tx
Browse files Browse the repository at this point in the history
  • Loading branch information
mitjat committed Feb 10, 2024
1 parent 7bcde02 commit bc896d3
Showing 1 changed file with 9 additions and 31 deletions.
40 changes: 9 additions & 31 deletions storage/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,57 +100,35 @@ func (c *Client) SendBatch(ctx context.Context, batch *storage.QueryBatch) error
// However, it reports errors poorly: If _any_ query is syntactically
// malformed, called with the wrong number of args, or has a type conversion problem,
// pgx will report the _first_ query as failing.
//
// For efficiency and simplicity, the method does not use explicit transactions
// with BEGIN/COMMIT unless required by the tx options. Even so, the batch is processed
// atomically, because:
// 1) We use pgx in its default QueryExecMode.
// 2) This in turn makes pgx use postgresql in pipeline mode.
// 3) Postgresql pipeline mode implies transactions-like behavior:
// https://www.postgresql.org/docs/15/libpq-pipeline-mode.html#LIBPQ-PIPELINE-ERRORS

func (c *Client) sendBatchWithOptionsFast(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error {
pgxBatch := batch.AsPgxBatch()
var batchResults pgx.BatchResults
var emptyTxOptions pgx.TxOptions
var tx pgx.Tx
var err error

// Begin a transaction.
useExplicitTx := opts != emptyTxOptions // see function docstring for more info
if useExplicitTx {
// set up our own tx with the specified options
tx, err = c.pool.BeginTx(ctx, opts)
if err != nil {
return fmt.Errorf("failed to begin tx: %w", err)
}
batchResults = c.pool.SendBatch(ctx, &pgxBatch)
} else {
// use implicit tx provided by SendBatch; see https://github.com/jackc/pgx/issues/879
batchResults = c.pool.SendBatch(ctx, &pgxBatch)
tx, err = c.pool.BeginTx(ctx, opts)
if err != nil {
return fmt.Errorf("failed to begin tx: %w", err)
}
batchResults = tx.SendBatch(ctx, &pgxBatch)
defer common.CloseOrLog(batchResults, c.logger)

// Read the results of indiviual queries in the batch.
for i := 0; i < pgxBatch.Len(); i++ {
if _, err := batchResults.Exec(); err != nil {
rollbackErr := ""
if useExplicitTx {
err2 := tx.Rollback(ctx)
if err2 != nil {
rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err2.Error())
}
err2 := tx.Rollback(ctx)
if err2 != nil {
rollbackErr = fmt.Sprintf("; also failed to roll back tx: %s", err2.Error())
}
return fmt.Errorf("query %d %v: %w%s", i, batch.Queries()[i], err, rollbackErr)
}
}

// Commit the tx.
if useExplicitTx {
err := tx.Commit(ctx)
if err != nil {
return fmt.Errorf("failed to commit tx: %w", err)
}
if err = tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit tx: %w", err)
}
return nil
}
Expand Down

0 comments on commit bc896d3

Please sign in to comment.