From 29c5d9d49423fe715f54ddbefce7441d760f65b5 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Tue, 17 Sep 2024 11:14:55 +0200 Subject: [PATCH] Introduce `DB#ExecTx()` method --- database/db.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/database/db.go b/database/db.go index 99661cca..2fc5e131 100644 --- a/database/db.go +++ b/database/db.go @@ -769,6 +769,31 @@ func (db *DB) Delete( return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...) } +// ExecTx executes the provided function within a database transaction. +// +// Starts a new transaction, executes the provided function, and commits the transaction +// if the function succeeds. If the function returns an error, the transaction is rolled back. +// +// Returns an error if starting the transaction, executing the function, or committing the transaction fails. +func (db *DB) ExecTx(ctx context.Context, fn func(context.Context, *sqlx.Tx) error) error { + tx, err := db.BeginTxx(ctx, nil) + if err != nil { + return errors.Wrap(err, "can't start transaction") + } + // We don't expect meaningful errors from rolling back the tx other than the sql.ErrTxDone, so just ignore it. + defer func() { _ = tx.Rollback() }() + + if err := fn(ctx, tx); err != nil { + return errors.WithStack(err) + } + + if err := tx.Commit(); err != nil { + return errors.Wrap(err, "can't commit transaction") + } + + return nil +} + func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { db.tableSemaphoresMu.Lock() defer db.tableSemaphoresMu.Unlock()