From 931a84f276d416c54672343fc92a13f03fc2498b Mon Sep 17 00:00:00 2001 From: Mitja T Date: Fri, 9 Feb 2024 16:07:14 -0800 Subject: [PATCH] pgx: fast batches: always create an explicit tx --- storage/postgres/client.go | 43 +++++++++----------------------------- 1 file changed, 10 insertions(+), 33 deletions(-) diff --git a/storage/postgres/client.go b/storage/postgres/client.go index 5cec9be00..9393665bc 100644 --- a/storage/postgres/client.go +++ b/storage/postgres/client.go @@ -100,57 +100,34 @@ func (c *Client) SendBatch(ctx context.Context, batch *storage.QueryBatch) error // However, it reports errors poorly: If _any_ query is syntactically // malformed, called with the wrong number of args, or has a type conversion problem, // pgx will report the _first_ query as failing. -// -// For efficiency and simplicity, the method does not use explicit transactions -// with BEGIN/COMMIT unless required by the tx options. Even so, the batch is processed -// atomically, because: -// 1) We use pgx in its default QueryExecMode. -// 2) This in turn makes pgx use postgresql in pipeline mode. -// 3) Postgresql pipeline mode implies transactions-like behavior: -// https://www.postgresql.org/docs/15/libpq-pipeline-mode.html#LIBPQ-PIPELINE-ERRORS - func (c *Client) sendBatchWithOptionsFast(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error { pgxBatch := batch.AsPgxBatch() var batchResults pgx.BatchResults - var emptyTxOptions pgx.TxOptions var tx pgx.Tx var err error // Begin a transaction. - useExplicitTx := opts != emptyTxOptions // see function docstring for more info - if useExplicitTx { - // set up our own tx with the specified options - tx, err = c.pool.BeginTx(ctx, opts) - if err != nil { - return fmt.Errorf("failed to begin tx: %w", err) - } - batchResults = c.pool.SendBatch(ctx, &pgxBatch) - } else { - // use implicit tx provided by SendBatch; see https://github.com/jackc/pgx/issues/879 - batchResults = c.pool.SendBatch(ctx, &pgxBatch) + if tx, err = c.pool.BeginTx(ctx, opts); err != nil { + return fmt.Errorf("failed to begin tx: %w", err) } + batchResults = tx.SendBatch(ctx, &pgxBatch) defer common.CloseOrLog(batchResults, c.logger) // Read the results of indiviual queries in the batch. for i := 0; i < pgxBatch.Len(); i++ { - if _, err := batchResults.Exec(); err != nil { + if _, err2 := batchResults.Exec(); err2 != nil { rollbackErr := "" - if useExplicitTx { - err2 := tx.Rollback(ctx) - if err2 != nil { - rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err2.Error()) - } + err3 := tx.Rollback(ctx) + if err3 != nil { + rollbackErr = fmt.Sprintf("; also failed to roll back tx: %s", err3.Error()) } - return fmt.Errorf("query %d %v: %w%s", i, batch.Queries()[i], err, rollbackErr) + return fmt.Errorf("query %d %v: %w%s", i, batch.Queries()[i], err2, rollbackErr) } } // Commit the tx. - if useExplicitTx { - err := tx.Commit(ctx) - if err != nil { - return fmt.Errorf("failed to commit tx: %w", err) - } + if err = tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit tx: %w", err) } return nil }