Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for PgBouncer transaction pooling #2447

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions sql/postgres/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,7 @@ func (d *Driver) Lock(ctx context.Context, name string, timeout time.Duration) (
}
return func() error {
defer conn.Close()
rows, err := conn.QueryContext(ctx, "SELECT pg_advisory_unlock($1)", id)
if err != nil {
return err
}
switch released, err := sqlx.ScanNullBool(rows); {
case err != nil:
return err
case !released.Valid || !released.Bool:
return fmt.Errorf("sql/postgres: failed releasing lock %d", id)
}
// Don't need to unlock explicitly cause transactional lock will be automatically released
return nil
}, nil
}
Expand Down Expand Up @@ -337,7 +328,7 @@ func acquire(ctx context.Context, conn schema.ExecQuerier, id uint32, timeout ti
fallthrough
// Infinite timeout.
case timeout < 0:
rows, err := conn.QueryContext(ctx, "SELECT pg_advisory_lock($1)", id)
rows, err := conn.QueryContext(ctx, "SELECT pg_advisory_xact_lock($1)", id)
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
err = schema.ErrLocked
}
Expand All @@ -347,7 +338,7 @@ func acquire(ctx context.Context, conn schema.ExecQuerier, id uint32, timeout ti
return rows.Close()
// No timeout.
default:
rows, err := conn.QueryContext(ctx, "SELECT pg_try_advisory_lock($1)", id)
rows, err := conn.QueryContext(ctx, "SELECT pg_try_advisory_xact_lock($1)", id)
if err != nil {
return err
}
Expand Down
26 changes: 7 additions & 19 deletions sql/postgres/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,10 @@ func TestDriver_LockAcquired(t *testing.T) {
db, m, err := sqlmock.New()
require.NoError(t, err)
name, hash := "name", 797654004
m.ExpectQuery(sqltest.Escape("SELECT pg_try_advisory_lock($1)")).
m.ExpectQuery(sqltest.Escape("SELECT pg_try_advisory_xact_lock($1)")).
WithArgs(hash).
WillReturnRows(sqlmock.NewRows([]string{"pg_advisory_lock"}).AddRow(1)).
RowsWillBeClosed()
m.ExpectQuery(sqltest.Escape("SELECT pg_advisory_unlock($1)")).
WithArgs(hash).
WillReturnRows(sqlmock.NewRows([]string{"pg_advisory_unlock"}).AddRow(1)).
RowsWillBeClosed()

d := &Driver{conn: &conn{ExecQuerier: db}}
unlock, err := d.Lock(context.Background(), name, 0)
Expand All @@ -45,7 +41,7 @@ func TestDriver_LockError(t *testing.T) {
name, hash := "migrate", 979249972

t.Run("Timeout", func(t *testing.T) {
m.ExpectQuery(sqltest.Escape("SELECT pg_advisory_lock($1)")).
m.ExpectQuery(sqltest.Escape("SELECT pg_advisory_xact_lock($1)")).
WithArgs(hash).
WillReturnError(context.DeadlineExceeded).
RowsWillBeClosed()
Expand All @@ -55,7 +51,7 @@ func TestDriver_LockError(t *testing.T) {
})

t.Run("Internal", func(t *testing.T) {
m.ExpectQuery(sqltest.Escape("SELECT pg_advisory_lock($1)")).
m.ExpectQuery(sqltest.Escape("SELECT pg_advisory_xact_lock($1)")).
WithArgs(hash).
WillReturnError(io.EOF).
RowsWillBeClosed()
Expand All @@ -71,32 +67,24 @@ func TestDriver_UnlockError(t *testing.T) {
d := &Driver{conn: &conn{ExecQuerier: db}}
name, hash := "up", 1551306158
acquired := func() {
m.ExpectQuery(sqltest.Escape("SELECT pg_try_advisory_lock($1)")).
m.ExpectQuery(sqltest.Escape("SELECT pg_try_advisory_xact_lock($1)")).
WithArgs(hash).
WillReturnRows(sqlmock.NewRows([]string{"pg_try_advisory_lock"}).AddRow(1)).
WillReturnRows(sqlmock.NewRows([]string{"pg_try_advisory_xact_lock"}).AddRow(1)).
RowsWillBeClosed()
}

t.Run("NotHeld", func(t *testing.T) {
acquired()
unlock, err := d.Lock(context.Background(), name, 0)
require.NoError(t, err)
m.ExpectQuery(sqltest.Escape("SELECT pg_advisory_unlock($1)")).
WithArgs(hash).
WillReturnRows(sqlmock.NewRows([]string{"pg_advisory_unlock"}).AddRow(0)).
RowsWillBeClosed()
require.Error(t, unlock())
require.NoError(t, unlock())
})

t.Run("Internal", func(t *testing.T) {
acquired()
unlock, err := d.Lock(context.Background(), name, 0)
require.NoError(t, err)
m.ExpectQuery(sqltest.Escape("SELECT pg_advisory_unlock($1)")).
WithArgs(hash).
WillReturnRows(sqlmock.NewRows([]string{"pg_advisory_unlock"}).AddRow(nil)).
RowsWillBeClosed()
require.Error(t, unlock())
require.NoError(t, unlock())
})
}

Expand Down
Loading