diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ef0e0dc..e3b28cad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 } ``` +- **Deprecated**: The `MigrateTx` method of `rivermigrate` has been deprecated. It turns out there are certain combinations of schema changes which cannot be run within a single transaction, and the migrator now prefers to run each migration in its own transaction, one-at-a-time. `MigrateTx` will be removed in future version. + - The migrator now produces a better error in case of a non-existent migration line including suggestions for known migration lines that are similar in name to the invalid one. [PR #558](https://github.com/riverqueue/river/pull/558). ## Fixed diff --git a/go.work.sum b/go.work.sum index dde2f1fd..b9b39ddb 100644 --- a/go.work.sum +++ b/go.work.sum @@ -27,11 +27,13 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2 h1:IRJeR9r1pYWsHKTRe/IInb7lYvbBVIqOgsX/u0mbOWY= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= +golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= diff --git a/rivermigrate/example_migrate_database_sql_test.go b/rivermigrate/example_migrate_database_sql_test.go index 9fd04cd7..7751971b 100644 --- a/rivermigrate/example_migrate_database_sql_test.go +++ b/rivermigrate/example_migrate_database_sql_test.go @@ -18,26 +18,27 @@ import ( func Example_migrateDatabaseSQL() { ctx := context.Background() - dbPool, err := sql.Open("pgx", riverinternaltest.DatabaseURL("river_test_example")) + // Use a dedicated Postgres schema for this example so we can migrate and drop it at will: + schemaName := "migration_example_dbsql" + url := riverinternaltest.DatabaseURL("river_test_example") + "&search_path=" + schemaName + dbPool, err := sql.Open("pgx", url) if err != nil { panic(err) } defer dbPool.Close() - tx, err := dbPool.BeginTx(ctx, nil) + driver := riverdatabasesql.New(dbPool) + migrator, err := rivermigrate.New(driver, nil) if err != nil { panic(err) } - defer tx.Rollback() - migrator, err := rivermigrate.New(riverdatabasesql.New(dbPool), nil) - if err != nil { + // Create the schema used for this example. Drop it when we're done. + // This isn't necessary outside this test. + if _, err := dbPool.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil { panic(err) } - - // Our test database starts with a full River schema. Drop it so that we can - // demonstrate working migrations. This isn't necessary outside this test. - dropRiverSchema(ctx, migrator, tx) + defer dropRiverSchema(ctx, driver, schemaName) printVersions := func(res *rivermigrate.MigrateResult) { for _, version := range res.Versions { @@ -47,7 +48,7 @@ func Example_migrateDatabaseSQL() { // Migrate to version 3. An actual call may want to omit all MigrateOpts, // which will default to applying all available up migrations. - res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{ + res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{ TargetVersion: 3, }) if err != nil { @@ -57,7 +58,7 @@ func Example_migrateDatabaseSQL() { // Migrate down by three steps. Down migrating defaults to running only one // step unless overridden by an option like MaxSteps or TargetVersion. - res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ + res, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ MaxSteps: 3, }) if err != nil { diff --git a/rivermigrate/example_migrate_test.go b/rivermigrate/example_migrate_test.go index 47b572bb..b9cdf6b0 100644 --- a/rivermigrate/example_migrate_test.go +++ b/rivermigrate/example_migrate_test.go @@ -8,6 +8,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river/internal/riverinternaltest" + "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivermigrate" ) @@ -17,26 +18,29 @@ import ( func Example_migrate() { ctx := context.Background() - dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_test_example")) + // Use a dedicated Postgres schema for this example so we can migrate and drop it at will: + schemaName := "migration_example" + poolConfig := riverinternaltest.DatabaseConfig("river_test_example") + poolConfig.ConnConfig.RuntimeParams["search_path"] = schemaName + + dbPool, err := pgxpool.NewWithConfig(ctx, poolConfig) if err != nil { panic(err) } defer dbPool.Close() - tx, err := dbPool.Begin(ctx) + driver := riverpgxv5.New(dbPool) + migrator, err := rivermigrate.New(driver, nil) if err != nil { panic(err) } - defer tx.Rollback(ctx) - migrator, err := rivermigrate.New(riverpgxv5.New(dbPool), nil) - if err != nil { + // Create the schema used for this example. Drop it when we're done. + // This isn't necessary outside this test. + if _, err := dbPool.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil { panic(err) } - - // Our test database starts with a full River schema. Drop it so that we can - // demonstrate working migrations. This isn't necessary outside this test. - dropRiverSchema(ctx, migrator, tx) + defer dropRiverSchema(ctx, driver, schemaName) printVersions := func(res *rivermigrate.MigrateResult) { for _, version := range res.Versions { @@ -46,7 +50,7 @@ func Example_migrate() { // Migrate to version 3. An actual call may want to omit all MigrateOpts, // which will default to applying all available up migrations. - res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{ + res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{ TargetVersion: 3, }) if err != nil { @@ -56,7 +60,7 @@ func Example_migrate() { // Migrate down by three steps. Down migrating defaults to running only one // step unless overridden by an option like MaxSteps or TargetVersion. - res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ + res, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ MaxSteps: 3, }) if err != nil { @@ -73,10 +77,8 @@ func Example_migrate() { // Migrated [DOWN] version 1 } -func dropRiverSchema[TTx any](ctx context.Context, migrator *rivermigrate.Migrator[TTx], tx TTx) { - _, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ - TargetVersion: -1, - }) +func dropRiverSchema[TTx any](ctx context.Context, driver riverdriver.Driver[TTx], schemaName string) { + _, err := driver.GetExecutor().Exec(ctx, "DROP SCHEMA IF EXISTS "+schemaName+" CASCADE;") if err != nil { panic(err) } diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 4eb9df8d..e7e03aa5 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -326,6 +326,9 @@ func (m *Migrator[TTx]) Migrate(ctx context.Context, direction Direction, opts * // This variant lets a caller run migrations within a transaction. Postgres DDL // is transactional, so migration changes aren't visible until the transaction // commits, and are rolled back if the transaction rolls back. +// +// Deprecated: Use Migrate instead. Certain migrations cannot be batched together +// in a single transaction, so this method is not recommended. func (m *Migrator[TTx]) MigrateTx(ctx context.Context, tx TTx, direction Direction, opts *MigrateOpts) (*MigrateResult, error) { switch direction { case DirectionDown: @@ -572,7 +575,6 @@ func (m *Migrator[TTx]) applyMigrations(ctx context.Context, exec riverdriver.Ex } return nil }) - if err != nil { return nil, err } diff --git a/rivermigrate/river_migrate_test.go b/rivermigrate/river_migrate_test.go index 3c6c752c..2fb96dac 100644 --- a/rivermigrate/river_migrate_test.go +++ b/rivermigrate/river_migrate_test.go @@ -24,6 +24,7 @@ import ( "github.com/riverqueue/river/riverdriver/riverdatabasesql" "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/util/randutil" "github.com/riverqueue/river/rivershared/util/sliceutil" ) @@ -73,31 +74,37 @@ func TestMigrator(t *testing.T) { dbPool *pgxpool.Pool driver *driverWithAlternateLine logger *slog.Logger - tx pgx.Tx } setup := func(t *testing.T) (*Migrator[pgx.Tx], *testBundle) { t.Helper() - // The test suite largely works fine with test transactions, but due to - // the invasive nature of changing schemas, it's quite easy to have test - // transactions deadlock with each other as they run in parallel. Here - // we use test DBs instead of test transactions, but this could be - // changed to test transactions as long as test cases were made to run - // non-parallel. - dbPool := riverinternaltest.TestDB(ctx, t) + // Not all migrations can be executed together in a single transaction. + // Examples include `CREATE INDEX CONCURRENTLY`, or adding an enum value + // that's used by a later migration. As such, the migrator and its tests + // must use a full database with commits between each migration. + // + // To make this easier to clean up afterward, we create a new, clean schema + // for each test run and then drop it afterward. + baseDBPool := riverinternaltest.TestDB(ctx, t) + schemaName := "river_migrate_test_" + randutil.Hex(8) + _, err := baseDBPool.Exec(ctx, "CREATE SCHEMA "+schemaName) + require.NoError(t, err) + + t.Cleanup(func() { + _, err := baseDBPool.Exec(ctx, fmt.Sprintf("DROP SCHEMA %s CASCADE", schemaName)) + require.NoError(t, err) + }) - // Despite being in an isolated database, we still start a transaction - // because we don't want schema changes we make to persist. - tx, err := dbPool.Begin(ctx) + newSchemaConfig := baseDBPool.Config() + newSchemaConfig.ConnConfig.RuntimeParams["search_path"] = schemaName + newSchemaPool, err := pgxpool.NewWithConfig(ctx, newSchemaConfig) require.NoError(t, err) - t.Cleanup(func() { _ = tx.Rollback(ctx) }) bundle := &testBundle{ - dbPool: dbPool, - driver: &driverWithAlternateLine{Driver: riverpgxv5.New(dbPool)}, + dbPool: newSchemaPool, + driver: &driverWithAlternateLine{Driver: riverpgxv5.New(newSchemaPool)}, logger: riversharedtest.Logger(t), - tx: tx, } migrator, err := New(bundle.driver, &Config{Logger: bundle.logger}) @@ -168,17 +175,37 @@ func TestMigrator(t *testing.T) { migrator, _ := setup(t) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion}) + require.NoError(t, err) + migrations, err := migrator.ExistingVersions(ctx) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, migrationToInt)) }) - t.Run("ExistingMigrationsTxDefault", func(t *testing.T) { + t.Run("ExistingMigrationsEmpty", func(t *testing.T) { + t.Parallel() + + migrator, _ := setup(t) + + migrations, err := migrator.ExistingVersions(ctx) + require.NoError(t, err) + require.Equal(t, []int{}, sliceutil.Map(migrations, migrationToInt)) + }) + + t.Run("ExistingMigrationsTxDefaultLine", func(t *testing.T) { t.Parallel() migrator, bundle := setup(t) - migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion}) + require.NoError(t, err) + + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tx.Rollback(ctx)) }) + + migrations, err := migrator.ExistingVersionsTx(ctx, tx) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, migrationToInt)) }) @@ -188,10 +215,11 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1}) + tx, err := bundle.dbPool.Begin(ctx) require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tx.Rollback(ctx)) }) - migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + migrations, err := migrator.ExistingVersionsTx(ctx, tx) require.NoError(t, err) require.Equal(t, []int{}, sliceutil.Map(migrations, migrationToInt)) }) @@ -199,12 +227,12 @@ func TestMigrator(t *testing.T) { t.Run("ExistingMigrationsTxFullyMigrated", func(t *testing.T) { t.Parallel() - migrator, bundle := setup(t) + migrator, _ := setup(t) - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + migrations, err := migrator.ExistingVersions(ctx) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, migrationToInt)) }) @@ -219,19 +247,17 @@ func TestMigrator(t *testing.T) { // Run two initial times to get to the version before river_job is dropped. // Defaults to only running one step when moving in the down direction. - for i := migrationsBundle.MaxVersion; i > migrateVersionIncludingRiverJob; i-- { - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{}) - require.NoError(t, err) - require.Equal(t, DirectionDown, res.Direction) - require.Equal(t, []int{i}, sliceutil.Map(res.Versions, migrateVersionToInt)) + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrateVersionIncludingRiverJob}) + require.NoError(t, err) + require.Equal(t, DirectionUp, res.Direction) + require.Equal(t, seqOneTo(migrateVersionIncludingRiverJob), sliceutil.Map(res.Versions, migrateVersionToInt)) - err = dbExecError(ctx, bundle.driver.UnwrapExecutor(bundle.tx), "SELECT * FROM river_job") - require.NoError(t, err) - } + err = dbExecError(ctx, bundle.driver.GetExecutor(), "SELECT * FROM river_job") + require.NoError(t, err) // Run once more to go down one more step { - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{}) + res, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{}) require.NoError(t, err) require.Equal(t, DirectionDown, res.Direction) require.Equal(t, []int{migrateVersionIncludingRiverJob}, sliceutil.Map(res.Versions, migrateVersionToInt)) @@ -239,7 +265,7 @@ func TestMigrator(t *testing.T) { version := res.Versions[0] require.Equal(t, "initial schema", version.Name) - err = dbExecError(ctx, bundle.driver.UnwrapExecutor(bundle.tx), "SELECT * FROM river_job") + err = dbExecError(ctx, bundle.driver.GetExecutor(), "SELECT * FROM river_job") require.Error(t, err) } }) @@ -247,12 +273,12 @@ func TestMigrator(t *testing.T) { t.Run("MigrateDownAfterUp", func(t *testing.T) { t.Parallel() - migrator, bundle := setup(t) + migrator, _ := setup(t) - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{}) + res, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{}) require.NoError(t, err) require.Equal(t, []int{migrationsBundle.WithTestVersionsMaxVersion}, sliceutil.Map(res.Versions, migrateVersionToInt)) }) @@ -262,20 +288,20 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{MaxSteps: 2}) + res, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{MaxSteps: 2}) require.NoError(t, err) require.Equal(t, []int{migrationsBundle.WithTestVersionsMaxVersion, migrationsBundle.WithTestVersionsMaxVersion - 1}, sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion-2), sliceutil.Map(migrations, driverMigrationToInt)) - err = dbExecError(ctx, bundle.driver.UnwrapExecutor(bundle.tx), "SELECT name FROM test_table") + err = dbExecError(ctx, bundle.driver.GetExecutor(), "SELECT name FROM test_table") require.Error(t, err) }) @@ -284,6 +310,9 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) + // We don't actually migrate anything (max steps = -1) because doing so // would mess with the test database, but this still runs most code to // check that the function generally works. @@ -291,7 +320,7 @@ func TestMigrator(t *testing.T) { require.NoError(t, err) require.Equal(t, []int{}, sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) @@ -303,7 +332,10 @@ func TestMigrator(t *testing.T) { _, bundle := setup(t) migrator, tx := setupDatabaseSQLMigrator(t, bundle) - res, err := migrator.MigrateTx(ctx, tx, DirectionDown, &MigrateOpts{MaxSteps: 1}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) + + res, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{MaxSteps: 1}) require.NoError(t, err) require.Equal(t, []int{migrationsBundle.MaxVersion}, sliceutil.Map(res.Versions, migrateVersionToInt)) @@ -318,20 +350,20 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: 4}) + res, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{TargetVersion: 4}) require.NoError(t, err) require.Equal(t, seqDownTo(migrationsBundle.WithTestVersionsMaxVersion, 5), sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetAllAssumingMain(ctx) + migrations, err := bundle.driver.GetExecutor().MigrationGetAllAssumingMain(ctx) require.NoError(t, err) require.Equal(t, seqOneTo(4), sliceutil.Map(migrations, driverMigrationToInt)) - err = dbExecError(ctx, bundle.driver.UnwrapExecutor(bundle.tx), "SELECT name FROM test_table") + err = dbExecError(ctx, bundle.driver.GetExecutor(), "SELECT name FROM test_table") require.Error(t, err) }) @@ -340,32 +372,32 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1}) + res, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{TargetVersion: -1}) require.NoError(t, err) require.Equal(t, seqDownTo(migrationsBundle.WithTestVersionsMaxVersion, 1), sliceutil.Map(res.Versions, migrateVersionToInt)) - err = dbExecError(ctx, bundle.driver.UnwrapExecutor(bundle.tx), "SELECT name FROM river_migrate") + err = dbExecError(ctx, bundle.driver.GetExecutor(), "SELECT name FROM river_migrate") require.Error(t, err) }) t.Run("MigrateDownWithTargetVersionInvalid", func(t *testing.T) { t.Parallel() - migrator, bundle := setup(t) + migrator, _ := setup(t) // migration doesn't exist { - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion + 77}) + _, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion + 77}) require.EqualError(t, err, fmt.Sprintf("version %d is not a valid River migration version", migrationsBundle.MaxVersion+77)) } // migration exists but not one that's applied { - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion + 1}) + _, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion + 1}) require.EqualError(t, err, fmt.Sprintf("version %d is not in target list of valid migrations to apply", migrationsBundle.MaxVersion+1)) } }) @@ -375,17 +407,17 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{DryRun: true}) + res, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{DryRun: true}) require.NoError(t, err) require.Equal(t, []int{migrationsBundle.WithTestVersionsMaxVersion}, sliceutil.Map(res.Versions, migrateVersionToInt)) // Migrate down returned a result above for a migration that was // removed, but because we're in a dry run, the database still shows // this version. - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) @@ -412,9 +444,12 @@ func TestMigrator(t *testing.T) { t.Run("MigrateNilOpts", func(t *testing.T) { t.Parallel() - migrator, bundle := setup(t) + migrator, _ := setup(t) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, nil) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) + + res, err := migrator.Migrate(ctx, DirectionUp, nil) require.NoError(t, err) require.Equal(t, []int{migrationsBundle.MaxVersion + 1, migrationsBundle.MaxVersion + 2}, sliceutil.Map(res.Versions, migrateVersionToInt)) }) @@ -426,34 +461,34 @@ func TestMigrator(t *testing.T) { // Run an initial time { - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) require.Equal(t, DirectionUp, res.Direction) - require.Equal(t, []int{migrationsBundle.WithTestVersionsMaxVersion - 1, migrationsBundle.WithTestVersionsMaxVersion}, + require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) - _, err = bundle.tx.Exec(ctx, "SELECT * FROM test_table") + _, err = bundle.dbPool.Exec(ctx, "SELECT * FROM test_table") require.NoError(t, err) } // Run once more to verify idempotency { - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) require.Equal(t, DirectionUp, res.Direction) require.Equal(t, []int{}, sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) - _, err = bundle.tx.Exec(ctx, "SELECT * FROM test_table") + _, err = bundle.dbPool.Exec(ctx, "SELECT * FROM test_table") require.NoError(t, err) } }) @@ -463,18 +498,21 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{MaxSteps: 1}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) + + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: 1}) require.NoError(t, err) require.Equal(t, []int{migrationsBundle.WithTestVersionsMaxVersion - 1}, sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion-1), sliceutil.Map(migrations, driverMigrationToInt)) // Column `name` is only added in the second test version. - err = dbExecError(ctx, bundle.driver.UnwrapExecutor(bundle.tx), "SELECT name FROM test_table") + err = dbExecError(ctx, bundle.driver.GetExecutor(), "SELECT name FROM test_table") require.Error(t, err) var pgErr *pgconn.PgError @@ -487,14 +525,11 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - // We don't actually migrate anything (max steps = -1) because doing so - // would mess with the test database, but this still runs most code to - // check that the function generally works. - res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: -1}) + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) require.NoError(t, err) - require.Equal(t, []int{}, sliceutil.Map(res.Versions, migrateVersionToInt)) + require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) @@ -506,7 +541,10 @@ func TestMigrator(t *testing.T) { _, bundle := setup(t) migrator, tx := setupDatabaseSQLMigrator(t, bundle) - res, err := migrator.MigrateTx(ctx, tx, DirectionUp, &MigrateOpts{MaxSteps: 1}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) + + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: 1}) require.NoError(t, err) require.Equal(t, []int{migrationsBundle.MaxVersion + 1}, sliceutil.Map(res.Versions, migrateVersionToInt)) @@ -521,12 +559,12 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion + 2}) + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion + 2}) require.NoError(t, err) - require.Equal(t, []int{migrationsBundle.MaxVersion + 1, migrationsBundle.MaxVersion + 2}, + require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.MaxVersion+2), sliceutil.Map(migrations, driverMigrationToInt)) }) @@ -534,17 +572,20 @@ func TestMigrator(t *testing.T) { t.Run("MigrateUpWithTargetVersionInvalid", func(t *testing.T) { t.Parallel() - migrator, bundle := setup(t) + migrator, _ := setup(t) + + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) + require.NoError(t, err) // migration doesn't exist { - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{TargetVersion: 77}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: 77}) require.EqualError(t, err, "version 77 is not a valid River migration version") } // migration exists but already applied { - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{TargetVersion: 3}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: 3}) require.EqualError(t, err, "version 3 is not in target list of valid migrations to apply") } }) @@ -554,7 +595,10 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{DryRun: true}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) + + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{DryRun: true}) require.NoError(t, err) require.Equal(t, DirectionUp, res.Direction) require.Equal(t, []int{migrationsBundle.WithTestVersionsMaxVersion - 1, migrationsBundle.WithTestVersionsMaxVersion}, @@ -563,7 +607,7 @@ func TestMigrator(t *testing.T) { // Migrate up returned a result above for migrations that were applied, // but because we're in a dry run, the database still shows the test // migration versions not applied. - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) @@ -572,13 +616,13 @@ func TestMigrator(t *testing.T) { t.Run("ValidateSuccess", func(t *testing.T) { t.Parallel() - migrator, bundle := setup(t) + migrator, _ := setup(t) // Migrate all the way up. - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.ValidateTx(ctx, bundle.tx) + res, err := migrator.Validate(ctx) require.NoError(t, err) require.Equal(t, &ValidateResult{OK: true}, res) }) @@ -586,41 +630,52 @@ func TestMigrator(t *testing.T) { t.Run("ValidateUnappliedMigrations", func(t *testing.T) { t.Parallel() - migrator, bundle := setup(t) + migrator, _ := setup(t) + + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) - res, err := migrator.ValidateTx(ctx, bundle.tx) + res, err := migrator.Validate(ctx) require.NoError(t, err) require.Equal(t, &ValidateResult{ Messages: []string{fmt.Sprintf("Unapplied migrations: [%d %d]", migrationsBundle.MaxVersion+1, migrationsBundle.MaxVersion+2)}, }, res) }) - t.Run("MigrateDownToZeroAndBackUp", func(t *testing.T) { + t.Run("MigrateUpThenDownToZeroAndBackUp", func(t *testing.T) { t.Parallel() migrator, bundle := setup(t) requireMigrationTableExists := func(expectedExists bool) { - migrationExists, err := bundle.driver.UnwrapExecutor(bundle.tx).TableExists(ctx, "river_migration") + migrationExists, err := bundle.driver.GetExecutor().TableExists(ctx, "river_migration") require.NoError(t, err) require.Equal(t, expectedExists, migrationExists) } + // We start off with a clean schema so it has no tables: + requireMigrationTableExists(false) + + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion}) + require.NoError(t, err) + require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), + sliceutil.Map(res.Versions, migrateVersionToInt)) + requireMigrationTableExists(true) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1}) + res, err = migrator.Migrate(ctx, DirectionDown, &MigrateOpts{TargetVersion: -1}) require.NoError(t, err) require.Equal(t, seqDownTo(migrationsBundle.MaxVersion, 1), sliceutil.Map(res.Versions, migrateVersionToInt)) requireMigrationTableExists(false) - res, err = migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + res, err = migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) @@ -629,7 +684,11 @@ func TestMigrator(t *testing.T) { t.Run("AlternateLineUpAndDown", func(t *testing.T) { t.Parallel() - _, bundle := setup(t) + migrator, bundle := setup(t) + + // Run the main migration line all the way up. + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) // We have to reinitialize the alternateMigrator because the migrations bundle is // set in the constructor. @@ -639,23 +698,23 @@ func TestMigrator(t *testing.T) { }) require.NoError(t, err) - res, err := alternateMigrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + res, err := alternateMigrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) require.Equal(t, seqOneTo(migrationLineAlternateMaxVersion), sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, migrationLineAlternate) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, migrationLineAlternate) require.NoError(t, err) require.Equal(t, seqOneTo(migrationLineAlternateMaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) - res, err = alternateMigrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1}) + res, err = alternateMigrator.Migrate(ctx, DirectionDown, &MigrateOpts{TargetVersion: -1}) require.NoError(t, err) require.Equal(t, seqDownTo(migrationLineAlternateMaxVersion, 1), sliceutil.Map(res.Versions, migrateVersionToInt)) // The main migration line should not have been touched. - migrations, err = bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err = bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) @@ -667,7 +726,7 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) // Main line to just before the `line` column was added. - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: 4}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: 4}) require.NoError(t, err) alternateMigrator, err := New(bundle.driver, &Config{ @@ -677,58 +736,55 @@ func TestMigrator(t *testing.T) { require.NoError(t, err) // Alternate line not allowed because `river_job.line` doesn't exist. - _, err = alternateMigrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err = alternateMigrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.EqualError(t, err, "can't add a non-main migration line until `river_migration.line` is raised; fully migrate the main migration line and try again") // Main line to zero. - _, err = migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1}) + _, err = migrator.Migrate(ctx, DirectionDown, &MigrateOpts{TargetVersion: -1}) require.NoError(t, err) // Alternate line not allowed because `river_job` doesn't exist. - _, err = alternateMigrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err = alternateMigrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.EqualError(t, err, "can't add a non-main migration line until `river_migration` is raised; fully migrate the main migration line and try again") }) // Demonstrates that even when not using River's internal migration system, // version 005 is still able to run. + // + // This is special because it's the first time the table's changed since + // version 001. t.Run("Version005ToleratesRiverMigrateNotPresent", func(t *testing.T) { t.Parallel() migrator, bundle := setup(t) - // The migration version in which `line` is added to `river_migration`. - // This is special because it's the first time the table's changed since - // version 001. - const migrateVersionTarget = 5 - // Migrate down to version 004. - for i := migrationsBundle.MaxVersion; i > migrateVersionTarget-1; i-- { - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{}) - require.NoError(t, err) - require.Equal(t, DirectionDown, res.Direction) - require.Equal(t, []int{i}, sliceutil.Map(res.Versions, migrateVersionToInt)) - } + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: 4}) + require.NoError(t, err) + require.Equal(t, DirectionUp, res.Direction) + require.Equal(t, seqOneTo(4), sliceutil.Map(res.Versions, migrateVersionToInt)) // Drop `river_migration` table as if version 001 had never originally run. - _, err := bundle.tx.Exec(ctx, "DROP TABLE river_migration") + _, err = bundle.dbPool.Exec(ctx, "DROP TABLE river_migration") require.NoError(t, err) // Run version 005 to make sure it can tolerate the absence of // `river_migration`. Note that we have to run the version's SQL // directly because using the migrator will try to interact with // `river_migration`, which is no longer present. - _, err = bundle.tx.Exec(ctx, migrationsBundle.WithTestVersionsMap[5].SQLUp) + _, err = bundle.dbPool.Exec(ctx, migrationsBundle.WithTestVersionsMap[5].SQLUp) require.NoError(t, err) // And the version 005 down migration to verify the same. - _, err = bundle.tx.Exec(ctx, migrationsBundle.WithTestVersionsMap[5].SQLDown) + _, err = bundle.dbPool.Exec(ctx, migrationsBundle.WithTestVersionsMap[5].SQLDown) require.NoError(t, err) }) t.Run("MigrationsWithCommitRequired", func(t *testing.T) { t.Parallel() - _, bundle := setup(t) + migrator, bundle := setup(t) + t.Cleanup(func() { tx, err := bundle.dbPool.Begin(ctx) require.NoError(t, err) @@ -747,6 +803,9 @@ func TestMigrator(t *testing.T) { require.NoError(t, tx.Commit(ctx)) }) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) + require.NoError(t, err) + // We have to reinitialize the commitRequiredMigrator because the migrations // bundle is set in the constructor. commitRequiredMigrator, err := New(bundle.driver, &Config{ diff --git a/rivershared/util/randutil/rand_util.go b/rivershared/util/randutil/rand_util.go index 5f4e9836..7ef934a2 100644 --- a/rivershared/util/randutil/rand_util.go +++ b/rivershared/util/randutil/rand_util.go @@ -1,8 +1,10 @@ package randutil import ( + "crypto/rand" cryptorand "crypto/rand" "encoding/binary" + "encoding/hex" mathrand "math/rand" "sync" "time" @@ -28,6 +30,14 @@ func DurationBetween(rand *mathrand.Rand, lowerLimit, upperLimit time.Duration) return time.Duration(IntBetween(rand, int(lowerLimit), int(upperLimit))) } +func Hex(length int) string { + bytes := make([]byte, length) + if _, err := rand.Read(bytes); err != nil { + panic(err) + } + return hex.EncodeToString(bytes) +} + // IntBetween generates a random number in the range of [lowerLimit, upperLimit). // // TODO: When we drop Go 1.21 support, switch to `math/rand/v2` and kill the