Skip to content

Commit

Permalink
deprecate MigrateTx, convert tests to use schemas
Browse files Browse the repository at this point in the history
As detailed in #600, there are certain combinations of schema changes
which are not allowed to be run within the same transaction. The example
we encountered with #590 is adding a new enum value, then using it in an
immutable function during a subsequent migration. In Postgres, these
must be separated by a commit.

There are other examples of things which cannot be run in a transaction,
such as `CREATE INDEX CONCURRENTLY`. While that specific one isn't
solved here, moving away from a migrator that bundles migrations into a
single transaction will also allow us to update our migration system to
exclude certain migrations from transactions and i.e. add indexes
concurrently.
  • Loading branch information
bgentry committed Sep 18, 2024
1 parent 01044a2 commit 37fd615
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 142 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
}
```

- **Deprecated**: The `MigrateTx` method of `rivermigrate` has been deprecated. It turns out there are certain combinations of schema changes which cannot be run within a single transaction, and the migrator now prefers to run each migration in its own transaction, one-at-a-time. `MigrateTx` will be removed in future version.

- The migrator now produces a better error in case of a non-existent migration line including suggestions for known migration lines that are similar in name to the invalid one. [PR #558](https://github.com/riverqueue/river/pull/558).

## Fixed
Expand Down
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2 h1:IRJeR9r1pYWsHKTRe/IInb7lYvbBVIqOgsX/u0mbOWY=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
Expand Down
23 changes: 12 additions & 11 deletions rivermigrate/example_migrate_database_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,27 @@ import (
func Example_migrateDatabaseSQL() {
ctx := context.Background()

dbPool, err := sql.Open("pgx", riverinternaltest.DatabaseURL("river_test_example"))
// Use a dedicated Postgres schema for this example so we can migrate and drop it at will:
schemaName := "migration_example_dbsql"
url := riverinternaltest.DatabaseURL("river_test_example") + "&search_path=" + schemaName
dbPool, err := sql.Open("pgx", url)
if err != nil {
panic(err)
}
defer dbPool.Close()

tx, err := dbPool.BeginTx(ctx, nil)
driver := riverdatabasesql.New(dbPool)
migrator, err := rivermigrate.New(driver, nil)
if err != nil {
panic(err)
}
defer tx.Rollback()

migrator, err := rivermigrate.New(riverdatabasesql.New(dbPool), nil)
if err != nil {
// Create the schema used for this example. Drop it when we're done.
// This isn't necessary outside this test.
if _, err := dbPool.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil {
panic(err)
}

// Our test database starts with a full River schema. Drop it so that we can
// demonstrate working migrations. This isn't necessary outside this test.
dropRiverSchema(ctx, migrator, tx)
defer dropRiverSchema(ctx, driver, schemaName)

printVersions := func(res *rivermigrate.MigrateResult) {
for _, version := range res.Versions {
Expand All @@ -47,7 +48,7 @@ func Example_migrateDatabaseSQL() {

// Migrate to version 3. An actual call may want to omit all MigrateOpts,
// which will default to applying all available up migrations.
res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
TargetVersion: 3,
})
if err != nil {
Expand All @@ -57,7 +58,7 @@ func Example_migrateDatabaseSQL() {

// Migrate down by three steps. Down migrating defaults to running only one
// step unless overridden by an option like MaxSteps or TargetVersion.
res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
res, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
MaxSteps: 3,
})
if err != nil {
Expand Down
32 changes: 17 additions & 15 deletions rivermigrate/example_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)
Expand All @@ -17,26 +18,29 @@ import (
func Example_migrate() {
ctx := context.Background()

dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_test_example"))
// Use a dedicated Postgres schema for this example so we can migrate and drop it at will:
schemaName := "migration_example"
poolConfig := riverinternaltest.DatabaseConfig("river_test_example")
poolConfig.ConnConfig.RuntimeParams["search_path"] = schemaName

dbPool, err := pgxpool.NewWithConfig(ctx, poolConfig)
if err != nil {
panic(err)
}
defer dbPool.Close()

tx, err := dbPool.Begin(ctx)
driver := riverpgxv5.New(dbPool)
migrator, err := rivermigrate.New(driver, nil)
if err != nil {
panic(err)
}
defer tx.Rollback(ctx)

migrator, err := rivermigrate.New(riverpgxv5.New(dbPool), nil)
if err != nil {
// Create the schema used for this example. Drop it when we're done.
// This isn't necessary outside this test.
if _, err := dbPool.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil {
panic(err)
}

// Our test database starts with a full River schema. Drop it so that we can
// demonstrate working migrations. This isn't necessary outside this test.
dropRiverSchema(ctx, migrator, tx)
defer dropRiverSchema(ctx, driver, schemaName)

printVersions := func(res *rivermigrate.MigrateResult) {
for _, version := range res.Versions {
Expand All @@ -46,7 +50,7 @@ func Example_migrate() {

// Migrate to version 3. An actual call may want to omit all MigrateOpts,
// which will default to applying all available up migrations.
res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
TargetVersion: 3,
})
if err != nil {
Expand All @@ -56,7 +60,7 @@ func Example_migrate() {

// Migrate down by three steps. Down migrating defaults to running only one
// step unless overridden by an option like MaxSteps or TargetVersion.
res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
res, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
MaxSteps: 3,
})
if err != nil {
Expand All @@ -73,10 +77,8 @@ func Example_migrate() {
// Migrated [DOWN] version 1
}

func dropRiverSchema[TTx any](ctx context.Context, migrator *rivermigrate.Migrator[TTx], tx TTx) {
_, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
TargetVersion: -1,
})
func dropRiverSchema[TTx any](ctx context.Context, driver riverdriver.Driver[TTx], schemaName string) {
_, err := driver.GetExecutor().Exec(ctx, "DROP SCHEMA IF EXISTS "+schemaName+" CASCADE;")
if err != nil {
panic(err)
}
Expand Down
4 changes: 3 additions & 1 deletion rivermigrate/river_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ func (m *Migrator[TTx]) Migrate(ctx context.Context, direction Direction, opts *
// This variant lets a caller run migrations within a transaction. Postgres DDL
// is transactional, so migration changes aren't visible until the transaction
// commits, and are rolled back if the transaction rolls back.
//
// Deprecated: Use Migrate instead. Certain migrations cannot be batched together
// in a single transaction, so this method is not recommended.
func (m *Migrator[TTx]) MigrateTx(ctx context.Context, tx TTx, direction Direction, opts *MigrateOpts) (*MigrateResult, error) {
switch direction {
case DirectionDown:
Expand Down Expand Up @@ -572,7 +575,6 @@ func (m *Migrator[TTx]) applyMigrations(ctx context.Context, exec riverdriver.Ex
}
return nil
})

if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 37fd615

Please sign in to comment.