From bc896d3ce0e878c44aa8ff2f2d3357b607165b74 Mon Sep 17 00:00:00 2001 From: Mitja T Date: Fri, 9 Feb 2024 16:07:14 -0800 Subject: [PATCH] pgx: fast batches: always create an explicit tx --- storage/postgres/client.go | 40 +++++++++----------------------------- 1 file changed, 9 insertions(+), 31 deletions(-) diff --git a/storage/postgres/client.go b/storage/postgres/client.go index 5cec9be00..4aa36f2f5 100644 --- a/storage/postgres/client.go +++ b/storage/postgres/client.go @@ -100,57 +100,35 @@ func (c *Client) SendBatch(ctx context.Context, batch *storage.QueryBatch) error // However, it reports errors poorly: If _any_ query is syntactically // malformed, called with the wrong number of args, or has a type conversion problem, // pgx will report the _first_ query as failing. -// -// For efficiency and simplicity, the method does not use explicit transactions -// with BEGIN/COMMIT unless required by the tx options. Even so, the batch is processed -// atomically, because: -// 1) We use pgx in its default QueryExecMode. -// 2) This in turn makes pgx use postgresql in pipeline mode. -// 3) Postgresql pipeline mode implies transactions-like behavior: -// https://www.postgresql.org/docs/15/libpq-pipeline-mode.html#LIBPQ-PIPELINE-ERRORS - func (c *Client) sendBatchWithOptionsFast(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error { pgxBatch := batch.AsPgxBatch() var batchResults pgx.BatchResults - var emptyTxOptions pgx.TxOptions var tx pgx.Tx var err error // Begin a transaction. - useExplicitTx := opts != emptyTxOptions // see function docstring for more info - if useExplicitTx { - // set up our own tx with the specified options - tx, err = c.pool.BeginTx(ctx, opts) - if err != nil { - return fmt.Errorf("failed to begin tx: %w", err) - } - batchResults = c.pool.SendBatch(ctx, &pgxBatch) - } else { - // use implicit tx provided by SendBatch; see https://github.com/jackc/pgx/issues/879 - batchResults = c.pool.SendBatch(ctx, &pgxBatch) + tx, err = c.pool.BeginTx(ctx, opts) + if err != nil { + return fmt.Errorf("failed to begin tx: %w", err) } + batchResults = tx.SendBatch(ctx, &pgxBatch) defer common.CloseOrLog(batchResults, c.logger) // Read the results of indiviual queries in the batch. for i := 0; i < pgxBatch.Len(); i++ { if _, err := batchResults.Exec(); err != nil { rollbackErr := "" - if useExplicitTx { - err2 := tx.Rollback(ctx) - if err2 != nil { - rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err2.Error()) - } + err2 := tx.Rollback(ctx) + if err2 != nil { + rollbackErr = fmt.Sprintf("; also failed to roll back tx: %s", err2.Error()) } return fmt.Errorf("query %d %v: %w%s", i, batch.Queries()[i], err, rollbackErr) } } // Commit the tx. - if useExplicitTx { - err := tx.Commit(ctx) - if err != nil { - return fmt.Errorf("failed to commit tx: %w", err) - } + if err = tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit tx: %w", err) } return nil }