Skip to content

Commit

Permalink
Merge pull request #261 from cschleiden/migrations
Browse files Browse the repository at this point in the history
Use golang-migrate for schema migrations for SQL backends
  • Loading branch information
cschleiden authored Nov 3, 2023
2 parents ec972f2 + c6961f7 commit 1a94c1f
Show file tree
Hide file tree
Showing 18 changed files with 271 additions and 56 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ For all backends, for now the initial schema is applied upon first usage. In the

#### Sqlite

The Sqlite backend implementation supports two different modes, in-memory and on-disk.
The Sqlite backend implementation supports two different modes, in-memory and on-disk:

* In-memory:
```go
Expand All @@ -136,12 +136,16 @@ The Sqlite backend implementation supports two different modes, in-memory and on
b := sqlite.NewSqliteBackend("simple.sqlite")
```

By default the schema is automatically created/migrations are automatically applied. Use `WithApplyMigrations(false)` to disable this behavior.

#### MySql

```go
b := mysql.NewMysqlBackend("localhost", 3306, "root", "SqlPassw0rd", "simple")
```

By default the schema is automatically created/migrations are automatically applied. Use `WithApplyMigrations(false)` to disable this behavior.

#### Redis

```go
Expand Down
5 changes: 3 additions & 2 deletions backend/monoprocess/monoprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func Test_MonoprocessBackend(t *testing.T) {
// Disable sticky workflow behavior for the test execution
options = append(options, backend.WithStickyTimeout(0))

return NewMonoprocessBackend(sqlite.NewInMemoryBackend(options...))
return NewMonoprocessBackend(sqlite.NewInMemoryBackend(sqlite.WithBackendOptions(options...)))
}, nil)
}

Expand All @@ -33,7 +33,7 @@ func Test_EndToEndMonoprocessBackend(t *testing.T) {
// Disable sticky workflow behavior for the test execution
options = append(options, backend.WithStickyTimeout(0))

return NewMonoprocessBackend(sqlite.NewInMemoryBackend(options...))
return NewMonoprocessBackend(sqlite.NewInMemoryBackend(sqlite.WithBackendOptions(options...)))
}, nil)
}

Expand All @@ -43,5 +43,6 @@ func (b *monoprocessBackend) GetFutureEvents(ctx context.Context) ([]*history.Ev
if testBackend, ok := b.Backend.(test.TestBackend); ok {
return testBackend.GetFutureEvents(ctx)
}

return nil, errors.New("not implemented")
}
8 changes: 8 additions & 0 deletions backend/mysql/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Sqlite backend

## Adding a migration

1. Install [golang-migrate/migrate](https://www.github.com/golang-migrate/migrate)
1. ```bash
migrate create -ext sql -dir ./db/migrations -seq <name>
```
4 changes: 4 additions & 0 deletions backend/mysql/db/migrations/000001_initial.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DROP TABLE IF EXISTS `instances`;
DROP TABLE IF EXISTS `pending_events`;
DROP TABLE IF EXISTS `history`;
DROP TABLE IF EXISTS `activities`;
File renamed without changes.
82 changes: 64 additions & 18 deletions backend/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mysql
import (
"context"
"database/sql"
"embed"
_ "embed"
"encoding/json"
"errors"
Expand All @@ -22,44 +23,89 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
)

//go:embed schema.sql
var schema string
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/mysql"
"github.com/golang-migrate/migrate/v4/source/iofs"
)

func NewMysqlBackend(host string, port int, user, password, database string, opts ...backend.BackendOption) *mysqlBackend {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true&interpolateParams=true", user, password, host, port, database)
//go:embed db/migrations/*.sql
var migrationsFS embed.FS

schemaDsn := dsn + "&multiStatements=true"
db, err := sql.Open("mysql", schemaDsn)
if err != nil {
panic(err)
func NewMysqlBackend(host string, port int, user, password, database string, opts ...option) *mysqlBackend {
options := &options{
Options: backend.ApplyOptions(),
ApplyMigrations: true,
}

if _, err := db.Exec(schema); err != nil {
panic(fmt.Errorf("initializing database: %w", err))
for _, opt := range opts {
opt(options)
}

if err := db.Close(); err != nil {
panic(err)
}
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true&interpolateParams=true", user, password, host, port, database)

db, err = sql.Open("mysql", dsn)
db, err := sql.Open("mysql", dsn)
if err != nil {
panic(err)
}

return &mysqlBackend{
b := &mysqlBackend{
dsn: dsn,
db: db,
workerName: fmt.Sprintf("worker-%v", uuid.NewString()),
options: backend.ApplyOptions(opts...),
options: options,
}

if options.ApplyMigrations {
if err := b.Migrate(); err != nil {
panic(err)
}
}

return b
}

type mysqlBackend struct {
dsn string
db *sql.DB
workerName string
options backend.Options
options *options
}

// Migrate applies any pending database migrations.
func (sb *mysqlBackend) Migrate() error {
schemaDsn := sb.dsn + "&multiStatements=true"
db, err := sql.Open("mysql", schemaDsn)
if err != nil {
return fmt.Errorf("opening schema database: %w", err)
}

dbi, err := mysql.WithInstance(db, &mysql.Config{})
if err != nil {
return fmt.Errorf("creating migration instance: %w", err)
}

migrations, err := iofs.New(migrationsFS, "db/migrations")
if err != nil {
return fmt.Errorf("creating migration source: %w", err)
}

m, err := migrate.NewWithInstance("iofs", migrations, "mysql", dbi)
if err != nil {
return fmt.Errorf("creating migration: %w", err)
}

if err := m.Up(); err != nil {
if !errors.Is(err, migrate.ErrNoChange) {
return fmt.Errorf("running migrations: %w", err)
}
}

if err := db.Close(); err != nil {
return fmt.Errorf("closing schema database: %w", err)
}

return nil
}

func (b *mysqlBackend) Logger() *slog.Logger {
Expand Down
12 changes: 10 additions & 2 deletions backend/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ func Test_MysqlBackend(t *testing.T) {

options = append(options, backend.WithStickyTimeout(0))

return NewMysqlBackend("localhost", 3306, testUser, testPassword, dbName, options...)
return NewMysqlBackend("localhost", 3306, testUser, testPassword, dbName, WithBackendOptions(options...))
}, func(b test.TestBackend) {
if err := b.(*mysqlBackend).db.Close(); err != nil {
panic(err)
}

db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
if err != nil {
panic(err)
Expand Down Expand Up @@ -84,8 +88,12 @@ func TestMySqlBackendE2E(t *testing.T) {

options = append(options, backend.WithStickyTimeout(0))

return NewMysqlBackend("localhost", 3306, testUser, testPassword, dbName, options...)
return NewMysqlBackend("localhost", 3306, testUser, testPassword, dbName, WithBackendOptions(options...))
}, func(b test.TestBackend) {
if err := b.(*mysqlBackend).db.Close(); err != nil {
panic(err)
}

db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
if err != nil {
panic(err)
Expand Down
30 changes: 30 additions & 0 deletions backend/mysql/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package mysql

import (
"github.com/cschleiden/go-workflows/backend"
)

type options struct {
backend.Options

// ApplyMigrations automatically applies database migrations on startup.
ApplyMigrations bool
}

type option func(*options)

// WithApplyMigrations automatically applies database migrations on startup.
func WithApplyMigrations(applyMigrations bool) option {
return func(o *options) {
o.ApplyMigrations = applyMigrations
}
}

// WithBackendOptions allows to pass generic backend options.
func WithBackendOptions(opts ...backend.BackendOption) option {
return func(o *options) {
for _, opt := range opts {
opt(&o.Options)
}
}
}
8 changes: 8 additions & 0 deletions backend/sqlite/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Sqlite backend

## Adding a migration

1. Install [golang-migrate/migrate](https://www.github.com/golang-migrate/migrate)
1. ```bash
migrate create -ext sql -dir ./db/migrations -seq <name>
```
4 changes: 4 additions & 0 deletions backend/sqlite/db/migrations/000001_initial.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DROP TABLE IF EXISTS `instances`;
DROP TABLE IF EXISTS `pending_events`;
DROP TABLE IF EXISTS `history`;
DROP TABLE IF EXISTS `activities`;
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ CREATE TABLE IF NOT EXISTS `activities` (
`visible_at` DATETIME NULL,
`locked_until` DATETIME NULL,
`worker` TEXT NULL
);
);


CREATE INDEX IF NOT EXISTS `idx_activities_id_worker` ON `activities` (`id`, `worker`);
CREATE INDEX IF NOT EXISTS `idx_activities_locked_until` ON `activities` (`locked_until`);
CREATE INDEX IF NOT EXISTS `idx_activities_instance_id_execution_id_worker` ON `activities` (`instance_id`, `execution_id`, `worker`);
30 changes: 30 additions & 0 deletions backend/sqlite/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package sqlite

import (
"github.com/cschleiden/go-workflows/backend"
)

type options struct {
backend.Options

// ApplyMigrations automatically applies database migrations on startup.
ApplyMigrations bool
}

type option func(*options)

// WithApplyMigrations automatically applies database migrations on startup.
func WithApplyMigrations(applyMigrations bool) option {
return func(o *options) {
o.ApplyMigrations = applyMigrations
}
}

// WithBackendOptions allows to pass generic backend options.
func WithBackendOptions(opts ...backend.BackendOption) option {
return func(o *options) {
for _, opt := range opts {
opt(&o.Options)
}
}
}
Loading

0 comments on commit 1a94c1f

Please sign in to comment.