diff --git a/backend/postgres/db/migrations/000001_initial.down.sql b/backend/postgres/db/migrations/000001_initial.down.sql new file mode 100644 index 00000000..55d8f419 --- /dev/null +++ b/backend/postgres/db/migrations/000001_initial.down.sql @@ -0,0 +1,4 @@ +DROP TABLE IF EXISTS instances; +DROP TABLE IF EXISTS pending_events; +DROP TABLE IF EXISTS history; +DROP TABLE IF EXISTS activities; \ No newline at end of file diff --git a/backend/postgres/db/migrations/000001_initial.up.sql b/backend/postgres/db/migrations/000001_initial.up.sql new file mode 100644 index 00000000..9f0ee323 --- /dev/null +++ b/backend/postgres/db/migrations/000001_initial.up.sql @@ -0,0 +1,73 @@ +DROP TABLE IF EXISTS instances; + +CREATE TABLE instances ( + id BIGSERIAL NOT NULL PRIMARY KEY, + instance_id UUID NOT NULL, + execution_id UUID NOT NULL, + parent_instance_id UUID NULL, + parent_execution_id UUID NULL, + parent_schedule_event_id NUMERIC NULL, + metadata BYTEA NULL, + state INT NOT NULL, + created_at timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP, + completed_at timestamptz NULL, + locked_until timestamptz NULL, + sticky_until timestamptz NULL, + worker VARCHAR(64) NULL +); + +CREATE UNIQUE INDEX idx_instances_instance_id_execution_id on instances (instance_id, execution_id); +CREATE INDEX idx_instances_locked_until_completed_at on instances (completed_at, locked_until, sticky_until, worker); +CREATE INDEX idx_instances_parent_instance_id_parent_execution_id ON instances (parent_instance_id, parent_execution_id); + +DROP TABLE IF EXISTS pending_events; +CREATE TABLE pending_events ( + id BIGSERIAL NOT NULL PRIMARY KEY, + event_id UUID NOT NULL, + sequence_id BIGSERIAL NOT NULL, -- Not used, but keep for now for query compat + instance_id UUID NOT NULL, + execution_id UUID NOT NULL, + event_type INT NOT NULL, + timestamp timestamptz NOT NULL, + schedule_event_id BIGSERIAL NOT NULL, + attributes BYTEA NOT NULL, + visible_at timestamptz NULL +); + +CREATE INDEX idx_pending_events_inid_exid ON pending_events (instance_id, execution_id); +CREATE INDEX idx_pending_events_inid_exid_visible_at_schedule_event_id ON pending_events (instance_id, execution_id, visible_at, schedule_event_id); + +DROP TABLE IF EXISTS history; +CREATE TABLE IF NOT EXISTS history ( + id BIGSERIAL NOT NULL PRIMARY KEY, + event_id UUID NOT NULL, + sequence_id BIGSERIAL NOT NULL, + instance_id UUID NOT NULL, + execution_id UUID NOT NULL, + event_type INT NOT NULL, + timestamp timestamptz NOT NULL, + schedule_event_id BIGSERIAL NOT NULL, + attributes BYTEA NOT NULL, + visible_at timestamptz NULL +); + +CREATE INDEX idx_history_instance_id_execution_id ON history (instance_id, execution_id); +CREATE INDEX idx_history_instance_id_execution_id_sequence_id ON history (instance_id, execution_id, sequence_id); + +DROP TABLE IF EXISTS activities; +CREATE TABLE IF NOT EXISTS activities ( + id BIGSERIAL NOT NULL PRIMARY KEY, + activity_id UUID NOT NULL, + instance_id UUID NOT NULL, + execution_id UUID NOT NULL, + event_type INT NOT NULL, + timestamp timestamptz NOT NULL, + schedule_event_id BIGSERIAL NOT NULL, + attributes BYTEA NOT NULL, + visible_at timestamptz NULL, + locked_until timestamptz NULL, + worker VARCHAR(64) NULL +); + +CREATE UNIQUE INDEX idx_activities_instance_id_execution_id_activity_id_worker ON activities (instance_id, execution_id, activity_id, worker); +CREATE INDEX idx_activities_locked_until on activities (locked_until); diff --git a/backend/postgres/db/migrations/000002_attributes_blob.down.sql b/backend/postgres/db/migrations/000002_attributes_blob.down.sql new file mode 100644 index 00000000..251f09f4 --- /dev/null +++ b/backend/postgres/db/migrations/000002_attributes_blob.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE activities ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; +ALTER TABLE history ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; +ALTER TABLE pending_events ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; \ No newline at end of file diff --git a/backend/postgres/db/migrations/000002_attributes_blob.up.sql b/backend/postgres/db/migrations/000002_attributes_blob.up.sql new file mode 100644 index 00000000..251f09f4 --- /dev/null +++ b/backend/postgres/db/migrations/000002_attributes_blob.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE activities ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; +ALTER TABLE history ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; +ALTER TABLE pending_events ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; \ No newline at end of file diff --git a/backend/postgres/db/migrations/000003_add_attributes_table.down.sql b/backend/postgres/db/migrations/000003_add_attributes_table.down.sql new file mode 100644 index 00000000..df94b031 --- /dev/null +++ b/backend/postgres/db/migrations/000003_add_attributes_table.down.sql @@ -0,0 +1,15 @@ +ALTER TABLE activities ADD COLUMN attributes BYTEA, ALTER COLUMN attributes SET NULL; +UPDATE activities SET attributes = attributes.data FROM attributes WHERE activities.event_id = attributes.event_id AND activities.instance_id = attributes.instance_id AND activities.execution_id = attributes.execution_id; +ALTER TABLE activities ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; + +ALTER TABLE history ADD COLUMN attributes BYTEA, ALTER COLUMN attributes SET NULL; +UPDATE history SET attributes = attributes.data FROM attributes WHERE history.event_id = attributes.event_id AND history.instance_id = attributes.instance_id AND history.execution_id = attributes.execution_id; +ALTER TABLE history ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; + +ALTER TABLE pending_events ADD COLUMN attributes BYTEA, ALTER COLUMN attributes SET NULL; +UPDATE pending_events SET attributes = attributes.data FROM attributes WHERE pending_events.event_id = attributes.event_id AND pending_events.instance_id = attributes.instance_id AND pending_events.execution_id = attributes.execution_id; +ALTER TABLE pending_events ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; + + +-- Drop attributes table +DROP TABLE attributes; \ No newline at end of file diff --git a/backend/postgres/db/migrations/000003_add_attributes_table.up.sql b/backend/postgres/db/migrations/000003_add_attributes_table.up.sql new file mode 100644 index 00000000..dae88d88 --- /dev/null +++ b/backend/postgres/db/migrations/000003_add_attributes_table.up.sql @@ -0,0 +1,24 @@ +DROP TABLE IF EXISTS attributes; + +CREATE TABLE attributes ( + id BIGSERIAL NOT NULL PRIMARY KEY, + event_id UUID NOT NULL, + instance_id UUID NOT NULL, + execution_id UUID NOT NULL, + data BYTEA NOT NULL +); + +CREATE UNIQUE INDEX idx_attributes_instance_id_execution_id_event_id on attributes (instance_id, execution_id, event_id); +CREATE INDEX idx_attributes_event_id on attributes (event_id); + +-- Move activity attributes to attributes table +INSERT INTO attributes (event_id, instance_id, execution_id, data) SELECT activity_id, instance_id, execution_id, attributes FROM activities ON CONFLICT DO NOTHING; +ALTER TABLE activities DROP COLUMN attributes; + +-- Move history attributes to attributes table +INSERT INTO attributes (event_id, instance_id, execution_id, data) SELECT event_id, instance_id, execution_id, attributes FROM history ON CONFLICT DO NOTHING; +ALTER TABLE history DROP COLUMN attributes; + +-- Move pending_events attributes to attributes table +INSERT INTO attributes (event_id, instance_id, execution_id, data) SELECT event_id, instance_id, execution_id, attributes FROM pending_events ON CONFLICT DO NOTHING; +ALTER TABLE pending_events DROP COLUMN attributes; diff --git a/backend/postgres/diagnostics.go b/backend/postgres/diagnostics.go new file mode 100644 index 00000000..676ea9ed --- /dev/null +++ b/backend/postgres/diagnostics.go @@ -0,0 +1,119 @@ +package postgresbackend + +import ( + "context" + "database/sql" + "time" + + "github.com/cschleiden/go-workflows/core" + "github.com/cschleiden/go-workflows/diag" +) + +var _ diag.Backend = (*postgresBackend)(nil) + +func (mb *postgresBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID, afterExecutionID string, count int) ([]*diag.WorkflowInstanceRef, error) { + var err error + tx, err := mb.db.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + + var rows *sql.Rows + if afterInstanceID != "" { + rows, err = tx.QueryContext( + ctx, + SQLReplacer(`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at + FROM instances i + INNER JOIN (SELECT instance_id, created_at FROM instances WHERE id = ? AND execution_id = ?) ii + ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.instance_id < ii.instance_id) + ORDER BY i.created_at DESC, i.instance_id DESC + LIMIT ?`), + afterInstanceID, + afterExecutionID, + count, + ) + } else { + rows, err = tx.QueryContext( + ctx, + SQLReplacer(`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at + FROM instances i + ORDER BY i.created_at DESC, i.instance_id DESC + LIMIT ?`), + count, + ) + } + if err != nil { + return nil, err + } + + defer rows.Close() + + var instances []*diag.WorkflowInstanceRef + + for rows.Next() { + var id, executionID string + var createdAt time.Time + var completedAt *time.Time + err = rows.Scan(&id, &executionID, &createdAt, &completedAt) + if err != nil { + return nil, err + } + + var state core.WorkflowInstanceState + if completedAt != nil { + state = core.WorkflowInstanceStateFinished + } + + instances = append(instances, &diag.WorkflowInstanceRef{ + Instance: core.NewWorkflowInstance(id, executionID), + CreatedAt: createdAt, + CompletedAt: completedAt, + State: state, + }) + } + + return instances, nil +} + +func (mb *postgresBackend) GetWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceRef, error) { + tx, err := mb.db.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + + res := tx.QueryRowContext( + ctx, + SQLReplacer("SELECT instance_id, execution_id, created_at, completed_at FROM instances WHERE instance_id = ? AND execution_id = ?"), instance.InstanceID, instance.ExecutionID) + + var id, executionID string + var createdAt time.Time + var completedAt *time.Time + + err = res.Scan(&id, &executionID, &createdAt, &completedAt) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + + return nil, err + } + + var state core.WorkflowInstanceState + if completedAt != nil { + state = core.WorkflowInstanceStateFinished + } + + return &diag.WorkflowInstanceRef{ + Instance: core.NewWorkflowInstance(id, executionID), + CreatedAt: createdAt, + CompletedAt: completedAt, + State: state, + }, nil +} + +func (mb *postgresBackend) GetWorkflowTree(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceTree, error) { + itb := diag.NewInstanceTreeBuilder(mb) + return itb.BuildWorkflowInstanceTree(ctx, instance) +} diff --git a/backend/postgres/events.go b/backend/postgres/events.go new file mode 100644 index 00000000..b3426324 --- /dev/null +++ b/backend/postgres/events.go @@ -0,0 +1,84 @@ +package postgresbackend + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/cschleiden/go-workflows/backend/history" + "github.com/cschleiden/go-workflows/core" +) + +func insertPendingEvents(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, newEvents []*history.Event) error { + return insertEvents(ctx, tx, "pending_events", instance, newEvents) +} + +func insertHistoryEvents(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, historyEvents []*history.Event) error { + return insertEvents(ctx, tx, "history", instance, historyEvents) +} + +func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instance *core.WorkflowInstance, events []*history.Event) error { + const batchSize = 20 + for batchStart := 0; batchStart < len(events); batchStart += batchSize { + batchEnd := batchStart + batchSize + if batchEnd > len(events) { + batchEnd = len(events) + } + batchEvents := events[batchStart:batchEnd] + + aquery := "INSERT INTO attributes (event_id, instance_id, execution_id, data) VALUES (?, ?, ?, ?)" + strings.Repeat(", (?, ?, ?, ?)", len(batchEvents)-1) + " ON CONFLICT DO NOTHING" + aargs := make([]interface{}, 0, len(batchEvents)*4) + + query := "INSERT INTO " + tableName + + " (event_id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" + + strings.Repeat(", (?, ?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1) + + args := make([]interface{}, 0, len(batchEvents)*8) + + for _, newEvent := range batchEvents { + a, err := history.SerializeAttributes(newEvent.Attributes) + if err != nil { + return err + } + + aargs = append(aargs, newEvent.ID, instance.InstanceID, instance.ExecutionID, a) + + args = append( + args, + newEvent.ID, newEvent.SequenceID, instance.InstanceID, instance.ExecutionID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, newEvent.VisibleAt) + } + + if _, err := tx.ExecContext( + ctx, + SQLReplacer(aquery), + aargs..., + ); err != nil { + return fmt.Errorf("inserting attributes: %w", err) + } + + _, err := tx.ExecContext( + ctx, + SQLReplacer(query), + args..., + ) + if err != nil { + return fmt.Errorf("inserting events: %w", err) + } + } + + return nil +} + +func removeFutureEvent(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, scheduleEventID int64) error { + _, err := tx.ExecContext( + ctx, + SQLReplacer( + "DELETE pending_events, attributes FROM pending_events INNER JOIN attributes ON pending_events.event_id = attributes.event_id WHERE pending_events.instance_id = ? AND pending_events.execution_id = ? AND pending_events.schedule_event_id = ? AND pending_events.visible_at IS NOT NULL"), + instance.InstanceID, + instance.ExecutionID, + scheduleEventID, + ) + + return err +} diff --git a/backend/postgres/options.go b/backend/postgres/options.go new file mode 100644 index 00000000..b60df046 --- /dev/null +++ b/backend/postgres/options.go @@ -0,0 +1,40 @@ +package postgresbackend + +import ( + "database/sql" + + "github.com/cschleiden/go-workflows/backend" +) + +type options struct { + backend.Options + + PostgreSQLOptions func(db *sql.DB) + + // ApplyMigrations automatically applies database migrations on startup. + ApplyMigrations bool +} + +type option func(*options) + +// WithApplyMigrations automatically applies database migrations on startup. +func WithApplyMigrations(applyMigrations bool) option { + return func(o *options) { + o.ApplyMigrations = applyMigrations + } +} + +func WithPostgreSQLOptions(f func(db *sql.DB)) option { + return func(o *options) { + o.PostgreSQLOptions = f + } +} + +// WithBackendOptions allows to pass generic backend options. +func WithBackendOptions(opts ...backend.BackendOption) option { + return func(o *options) { + for _, opt := range opts { + opt(&o.Options) + } + } +} diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go new file mode 100644 index 00000000..303d3f78 --- /dev/null +++ b/backend/postgres/postgres.go @@ -0,0 +1,856 @@ +package postgresbackend + +import ( + "context" + "database/sql" + "embed" + _ "embed" + "encoding/json" + "errors" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/cschleiden/go-workflows/backend" + "github.com/cschleiden/go-workflows/backend/converter" + "github.com/cschleiden/go-workflows/backend/history" + "github.com/cschleiden/go-workflows/backend/metadata" + "github.com/cschleiden/go-workflows/backend/metrics" + "github.com/cschleiden/go-workflows/core" + "github.com/cschleiden/go-workflows/internal/metrickeys" + "github.com/cschleiden/go-workflows/internal/workflowerrors" + "github.com/cschleiden/go-workflows/workflow" + "github.com/golang-migrate/migrate/v4" + "github.com/golang-migrate/migrate/v4/database/postgres" + "github.com/golang-migrate/migrate/v4/source/iofs" + "github.com/google/uuid" + _ "github.com/jackc/pgx/v5/stdlib" + "go.opentelemetry.io/otel/trace" +) + +//go:embed db/migrations/*.sql +var migrationsFS embed.FS + +func SQLReplacer(src string) string { + for nParam := 1; strings.Contains(src, "?"); nParam++ { + src = strings.Replace(src, "?", fmt.Sprintf("$%d", nParam), 1) + } + return src +} + +func NewPostgresBackend(host string, port int, user, password, database string, opts ...option) *postgresBackend { + options := &options{ + Options: backend.ApplyOptions(), + ApplyMigrations: true, + } + + for _, opt := range opts { + opt(options) + } + + dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=require", host, port, user, password, database) + + db, err := sql.Open("pgx", dsn) + if err != nil { + panic(err) + } + + if options.PostgreSQLOptions != nil { + options.PostgreSQLOptions(db) + } + + b := &postgresBackend{ + dsn: dsn, + db: db, + workerName: fmt.Sprintf("worker-%v", uuid.NewString()), + options: options, + } + + if options.ApplyMigrations { + if err := b.Migrate(); err != nil { + panic(err) + } + } + + return b +} + +type postgresBackend struct { + dsn string + db *sql.DB + workerName string + options *options +} + +func (mb *postgresBackend) Close() error { + return mb.db.Close() +} + +// Migrate applies any pending database migrations. +func (mb *postgresBackend) Migrate() error { + schemaDsn := mb.dsn + db, err := sql.Open("pgx", schemaDsn) + if err != nil { + return fmt.Errorf("opening schema database: %w", err) + } + + dbi, err := postgres.WithInstance(db, &postgres.Config{}) + if err != nil { + return fmt.Errorf("creating migration instance: %w", err) + } + + migrations, err := iofs.New(migrationsFS, "db/migrations") + if err != nil { + return fmt.Errorf("creating migration source: %w", err) + } + + m, err := migrate.NewWithInstance("iofs", migrations, "postgres", dbi) + if err != nil { + return fmt.Errorf("creating migration: %w", err) + } + + if err := m.Up(); err != nil { + if !errors.Is(err, migrate.ErrNoChange) { + return fmt.Errorf("running migrations: %w", err) + } + } + + if err := db.Close(); err != nil { + return fmt.Errorf("closing schema database: %w", err) + } + + return nil +} + +func (b *postgresBackend) Logger() *slog.Logger { + return b.options.Logger +} + +func (b *postgresBackend) Tracer() trace.Tracer { + return b.options.TracerProvider.Tracer(backend.TracerName) +} + +func (b *postgresBackend) Metrics() metrics.Client { + return b.options.Metrics.WithTags(metrics.Tags{metrickeys.Backend: "postgres"}) +} + +func (b *postgresBackend) Converter() converter.Converter { + return b.options.Converter +} + +func (b *postgresBackend) ContextPropagators() []workflow.ContextPropagator { + return b.options.ContextPropagators +} + +func (b *postgresBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error { + tx, err := b.db.BeginTx(ctx, &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + }) + if err != nil { + return fmt.Errorf("starting transaction: %w", err) + } + defer tx.Rollback() + + // Create workflow instance + if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata); err != nil { + return err + } + + // Initial history is empty, store only new events + if err := insertPendingEvents(ctx, tx, instance, []*history.Event{event}); err != nil { + return fmt.Errorf("inserting new event: %w", err) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("creating workflow instance: %w", err) + } + + return nil +} + +func (b *postgresBackend) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error { + tx, err := b.db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + row := tx.QueryRowContext(ctx, SQLReplacer("SELECT state FROM instances WHERE instance_id = ? AND execution_id = ? LIMIT 1"), instance.InstanceID, instance.ExecutionID) + var state core.WorkflowInstanceState + if err := row.Scan(&state); err != nil { + if err == sql.ErrNoRows { + return backend.ErrInstanceNotFound + } + } + + if state == core.WorkflowInstanceStateActive { + return backend.ErrInstanceNotFinished + } + + // Delete from instances and history tables + if _, err := tx.ExecContext(ctx, SQLReplacer("DELETE FROM instances WHERE instance_id = ? AND execution_id = ?"), instance.InstanceID, instance.ExecutionID); err != nil { + return err + } + + if _, err := tx.ExecContext(ctx, SQLReplacer("DELETE FROM history WHERE instance_id = ? AND execution_id = ?"), instance.InstanceID, instance.ExecutionID); err != nil { + return err + } + + if _, err := tx.ExecContext(ctx, SQLReplacer("DELETE FROM attributes WHERE instance_id = ? AND execution_id = ?"), instance.InstanceID, instance.ExecutionID); err != nil { + return err + } + + return tx.Commit() +} + +func (b *postgresBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error { + tx, err := b.db.BeginTx(ctx, &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + }) + if err != nil { + return err + } + defer tx.Rollback() + + // Cancel workflow instance + // TODO: Combine this with the event insertion + res := tx.QueryRowContext(ctx, SQLReplacer("SELECT 1 FROM instances WHERE instance_id = ? AND execution_id = ? LIMIT 1"), instance.InstanceID, instance.ExecutionID) + if err := res.Scan(new(int)); err != nil { + if err == sql.ErrNoRows { + return backend.ErrInstanceNotFound + } + + return err + } + + if err := insertPendingEvents(ctx, tx, instance, []*history.Event{event}); err != nil { + return fmt.Errorf("inserting cancellation event: %w", err) + } + + return tx.Commit() +} + +func (b *postgresBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]*history.Event, error) { + tx, err := b.db.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + + var historyEvents *sql.Rows + if lastSequenceID != nil { + historyEvents, err = tx.QueryContext( + ctx, + SQLReplacer("SELECT h.event_id, h.sequence_id, h.event_type, h.timestamp, h.schedule_event_id, a.data, h.visible_at FROM history h JOIN attributes a ON h.event_id = a.event_id AND a.instance_id = h.instance_id AND a.execution_id = h.execution_id WHERE h.instance_id = ? AND h.execution_id = ? AND h.sequence_id > ? ORDER BY h.sequence_id"), + instance.InstanceID, + instance.ExecutionID, + *lastSequenceID, + ) + } else { + historyEvents, err = tx.QueryContext( + ctx, + SQLReplacer("SELECT h.event_id, h.sequence_id, h.event_type, h.timestamp, h.schedule_event_id, a.data, h.visible_at FROM history h JOIN attributes a ON h.event_id = a.event_id AND a.instance_id = h.instance_id AND a.execution_id = h.execution_id WHERE h.instance_id = ? AND h.execution_id = ? ORDER BY h.sequence_id"), + instance.InstanceID, + instance.ExecutionID, + ) + } + if err != nil { + return nil, fmt.Errorf("getting history: %w", err) + } + + defer historyEvents.Close() + + h := make([]*history.Event, 0) + + for historyEvents.Next() { + var attributes []byte + + historyEvent := &history.Event{} + + if err := historyEvents.Scan( + &historyEvent.ID, + &historyEvent.SequenceID, + &historyEvent.Type, + &historyEvent.Timestamp, + &historyEvent.ScheduleEventID, + &attributes, + &historyEvent.VisibleAt, + ); err != nil { + return nil, fmt.Errorf("scanning event: %w", err) + } + + a, err := history.DeserializeAttributes(historyEvent.Type, attributes) + if err != nil { + return nil, fmt.Errorf("deserializing attributes: %w", err) + } + + historyEvent.Attributes = a + + h = append(h, historyEvent) + } + + return h, nil +} + +func (b *postgresBackend) GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error) { + row := b.db.QueryRowContext( + ctx, + SQLReplacer("SELECT state FROM instances WHERE instance_id = ? AND execution_id = ?"), + instance.InstanceID, + instance.ExecutionID, + ) + + var state core.WorkflowInstanceState + if err := row.Scan(&state); err != nil { + if err == sql.ErrNoRows { + return core.WorkflowInstanceStateActive, backend.ErrInstanceNotFound + } + } + + return state, nil +} + +func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error { + // Check for existing instance + if err := tx.QueryRowContext(ctx, SQLReplacer("SELECT 1 FROM instances WHERE instance_id = ? AND state = ? LIMIT 1"), wfi.InstanceID, core.WorkflowInstanceStateActive). + Scan(new(int)); err != sql.ErrNoRows { + return backend.ErrInstanceAlreadyExists + } + + var parentInstanceID, parentExecutionID *string + var parentEventID *int64 + if wfi.SubWorkflow() { + parentInstanceID = &wfi.Parent.InstanceID + parentExecutionID = &wfi.Parent.ExecutionID + parentEventID = &wfi.ParentEventID + } + + metadataJson, err := json.Marshal(metadata) + if err != nil { + return fmt.Errorf("marshaling metadata: %w", err) + } + + _, err = tx.ExecContext( + ctx, + SQLReplacer("INSERT INTO instances (instance_id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, metadata, state) VALUES (?, ?, ?, ?, ?, ?, ?)"), + wfi.InstanceID, + wfi.ExecutionID, + parentInstanceID, + parentExecutionID, + parentEventID, + string(metadataJson), + core.WorkflowInstanceStateActive, + ) + if err != nil { + return fmt.Errorf("inserting workflow instance: %w", err) + } + + return nil +} + +// SignalWorkflow signals a running workflow instance +func (b *postgresBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error { + tx, err := b.db.BeginTx(ctx, &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + }) + if err != nil { + return err + } + defer tx.Rollback() + + // TODO: Combine this with the event insertion + res := tx.QueryRowContext(ctx, SQLReplacer("SELECT execution_id FROM instances WHERE instance_id = ? AND state = ? LIMIT 1"), instanceID, core.WorkflowInstanceStateActive) + var executionID string + if err := res.Scan(&executionID); err == sql.ErrNoRows { + return backend.ErrInstanceNotFound + } + + instance := core.NewWorkflowInstance(instanceID, executionID) + + if err := insertPendingEvents(ctx, tx, instance, []*history.Event{event}); err != nil { + return fmt.Errorf("inserting signal event: %w", err) + } + + return tx.Commit() +} + +// GetWorkflowInstance returns a pending workflow task or nil if there are no pending worflow executions +func (b *postgresBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowTask, error) { + tx, err := b.db.BeginTx(ctx, &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + }) + if err != nil { + return nil, err + } + defer tx.Rollback() + + // Lock next workflow task by finding an unlocked instance with new events to process. + now := time.Now() + row := tx.QueryRowContext( + ctx, + SQLReplacer(`SELECT i.id, i.instance_id, i.execution_id, i.parent_instance_id, i.parent_execution_id, i.parent_schedule_event_id, i.metadata, i.sticky_until + FROM instances i + INNER JOIN pending_events pe ON i.instance_id = pe.instance_id + WHERE + state = ? AND i.completed_at IS NULL + AND (pe.visible_at IS NULL OR pe.visible_at <= ?) + AND (i.locked_until IS NULL OR i.locked_until < ?) + AND (i.sticky_until IS NULL OR i.sticky_until < ? OR i.worker = ?) + LIMIT 1 + FOR UPDATE OF i SKIP LOCKED`), + core.WorkflowInstanceStateActive, // state + now, // event.visible_at + now, // locked_until + now, // sticky_until + b.workerName, // worker + ) + + var id int + var instanceID, executionID string + var parentInstanceID, parentExecutionID *string + var parentEventID *int64 + var metadataJson sql.NullString + var stickyUntil *time.Time + if err := row.Scan(&id, &instanceID, &executionID, &parentInstanceID, &parentExecutionID, &parentEventID, &metadataJson, &stickyUntil); err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + + return nil, fmt.Errorf("scanning workflow instance: %w", err) + } + + res, err := tx.ExecContext( + ctx, + SQLReplacer(`UPDATE instances i + SET locked_until = ?, worker = ? + WHERE id = ?`), + now.Add(b.options.WorkflowLockTimeout), + b.workerName, + id, + ) + if err != nil { + return nil, fmt.Errorf("locking workflow instance: %w", err) + } + + if affectedRows, err := res.RowsAffected(); err != nil { + return nil, fmt.Errorf("locking workflow instance: %w", err) + } else if affectedRows == 0 { + // No instance locked? + return nil, nil + } + + var wfi *workflow.Instance + if parentInstanceID != nil { + wfi = core.NewSubWorkflowInstance(instanceID, executionID, core.NewWorkflowInstance(*parentInstanceID, *parentExecutionID), *parentEventID) + } else { + wfi = core.NewWorkflowInstance(instanceID, executionID) + } + + var metadata *metadata.WorkflowMetadata + if metadataJson.Valid { + if err := json.Unmarshal([]byte(metadataJson.String), &metadata); err != nil { + return nil, fmt.Errorf("parsing workflow metadata: %w", err) + } + } + + t := &backend.WorkflowTask{ + ID: wfi.InstanceID, + WorkflowInstance: wfi, + WorkflowInstanceState: core.WorkflowInstanceStateActive, + Metadata: metadata, + NewEvents: []*history.Event{}, + } + + // Get new events + events, err := tx.QueryContext( + ctx, + SQLReplacer("SELECT pe.event_id, pe.sequence_id, pe.event_type, pe.timestamp, pe.schedule_event_id, a.data, pe.visible_at FROM pending_events pe LEFT JOIN attributes a ON pe.instance_id = a.instance_id AND pe.execution_id = a.execution_id AND pe.event_id = a.event_id WHERE pe.instance_id = ? AND pe.execution_id = ? AND (pe.visible_at IS NULL OR pe.visible_at <= ?) ORDER BY pe.id"), + instanceID, + executionID, + now, + ) + if err != nil { + return nil, fmt.Errorf("getting new events: %w", err) + } + + defer events.Close() + + for events.Next() { + var attributes []byte + + historyEvent := &history.Event{} + + if err := events.Scan( + &historyEvent.ID, + &historyEvent.SequenceID, + &historyEvent.Type, + &historyEvent.Timestamp, + &historyEvent.ScheduleEventID, + &attributes, + &historyEvent.VisibleAt, + ); err != nil { + return nil, fmt.Errorf("scanning event: %w", err) + } + + a, err := history.DeserializeAttributes(historyEvent.Type, attributes) + if err != nil { + return nil, fmt.Errorf("deserializing attributes: %w", err) + } + + historyEvent.Attributes = a + + t.NewEvents = append(t.NewEvents, historyEvent) + } + + // Return if there aren't any new events + if len(t.NewEvents) == 0 { + return nil, nil + } + + // Get most recent sequence id + var lastSequenceID sql.NullInt64 + row = tx.QueryRowContext(ctx, SQLReplacer("SELECT MAX(sequence_id) FROM history WHERE instance_id = ? AND execution_id = ?"), instanceID, executionID) + if err := row.Scan( + &lastSequenceID, + ); err != nil { + if err != sql.ErrNoRows { + return nil, fmt.Errorf("getting most recent sequence id: %w", err) + } + } + + if lastSequenceID.Valid { + t.LastSequenceID = lastSequenceID.Int64 + } + + if err := tx.Commit(); err != nil { + return nil, err + } + + return t, nil +} + +// CompleteWorkflowTask completes a workflow task retrieved using GetWorkflowTask +// +// This checkpoints the execution. events are new events from the last workflow execution +// which will be added to the workflow instance history. workflowEvents are new events for the +// completed or other workflow instances. +func (b *postgresBackend) CompleteWorkflowTask( + ctx context.Context, + task *backend.WorkflowTask, + instance *workflow.Instance, + state core.WorkflowInstanceState, + executedEvents, activityEvents, timerEvents []*history.Event, + workflowEvents []history.WorkflowEvent, +) error { + tx, err := b.db.BeginTx(ctx, &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + }) + if err != nil { + return err + } + defer tx.Rollback() + + // Unlock instance, but keep it sticky to the current worker + var completedAt *time.Time + if state == core.WorkflowInstanceStateContinuedAsNew || state == core.WorkflowInstanceStateFinished { + t := time.Now() + completedAt = &t + } + + res, err := tx.ExecContext( + ctx, + SQLReplacer(`UPDATE instances SET locked_until = NULL, sticky_until = ?, completed_at = ?, state = ? WHERE instance_id = ? AND execution_id = ? AND worker = ?`), + time.Now().Add(b.options.StickyTimeout), + completedAt, + state, + instance.InstanceID, + instance.ExecutionID, + b.workerName, + ) + if err != nil { + return fmt.Errorf("unlocking instance: %w", err) + } + + changedRows, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("checking for unlocked workflow instances: %w", err) + } else if changedRows != 1 { + return errors.New("could not find workflow instance to unlock") + } + + // Remove handled events from task + if len(executedEvents) > 0 { + args := make([]interface{}, 0, len(executedEvents)+1) + args = append(args, instance.InstanceID, instance.ExecutionID) + for _, e := range executedEvents { + args = append(args, e.ID) + } + + if _, err := tx.ExecContext( + ctx, + SQLReplacer(fmt.Sprintf(`DELETE FROM pending_events WHERE instance_id = ? AND execution_id = ? AND event_id IN (?%v)`, strings.Repeat(",?", len(executedEvents)-1))), + args..., + ); err != nil { + return fmt.Errorf("deleting handled new events: %w", err) + } + } + + // Insert new events generated during this workflow execution to the history + if err := insertHistoryEvents(ctx, tx, instance, executedEvents); err != nil { + return fmt.Errorf("inserting new history events: %w", err) + } + + // Schedule activities + for _, e := range activityEvents { + if err := scheduleActivity(ctx, tx, instance, e); err != nil { + return fmt.Errorf("scheduling activity: %w", err) + } + } + + // Timer events + if err := insertPendingEvents(ctx, tx, instance, timerEvents); err != nil { + return fmt.Errorf("scheduling timers: %w", err) + } + + for _, event := range executedEvents { + switch event.Type { + case history.EventType_TimerCanceled: + if err := removeFutureEvent(ctx, tx, instance, event.ScheduleEventID); err != nil { + return fmt.Errorf("removing future event: %w", err) + } + } + } + + // Insert new workflow events + groupedEvents := history.EventsByWorkflowInstance(workflowEvents) + + for targetInstance, events := range groupedEvents { + // Are we creating a new sub-workflow instance? + m := events[0] + if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted { + a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes) + // Create new instance + if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata); err != nil { + if err == backend.ErrInstanceAlreadyExists { + if err := insertPendingEvents(ctx, tx, instance, []*history.Event{ + history.NewPendingEvent(time.Now(), history.EventType_SubWorkflowFailed, &history.SubWorkflowFailedAttributes{ + Error: workflowerrors.FromError(backend.ErrInstanceAlreadyExists), + }, history.ScheduleEventID(m.WorkflowInstance.ParentEventID)), + }); err != nil { + return fmt.Errorf("inserting sub-workflow failed event: %w", err) + } + continue + } + + return fmt.Errorf("creating sub-workflow instance: %w", err) + } + } + + // Insert pending events for target instance + historyEvents := []*history.Event{} + for _, m := range events { + historyEvents = append(historyEvents, m.HistoryEvent) + } + if err := insertPendingEvents(ctx, tx, &targetInstance, historyEvents); err != nil { + return fmt.Errorf("inserting messages: %w", err) + } + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("committing complete workflow transaction: %w", err) + } + + return nil +} + +func (b *postgresBackend) ExtendWorkflowTask(ctx context.Context, taskID string, instance *core.WorkflowInstance) error { + tx, err := b.db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + until := time.Now().Add(b.options.WorkflowLockTimeout) + res, err := tx.ExecContext( + ctx, + SQLReplacer(`UPDATE instances SET locked_until = ? WHERE instance_id = ? AND execution_id = ? AND worker = ?`), + until, + instance.InstanceID, + instance.ExecutionID, + b.workerName, + ) + if err != nil { + return fmt.Errorf("extending workflow task lock: %w", err) + } + + if rowsAffected, err := res.RowsAffected(); err != nil { + return fmt.Errorf("determining if workflow task was extended: %w", err) + } else if rowsAffected == 0 { + return errors.New("could not extend workflow task") + } + + return tx.Commit() +} + +// GetActivityTask returns a pending activity task or nil if there are no pending activities +func (b *postgresBackend) GetActivityTask(ctx context.Context) (*backend.ActivityTask, error) { + tx, err := b.db.BeginTx(ctx, &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + }) + if err != nil { + return nil, err + } + defer tx.Rollback() + + // Lock next activity + now := time.Now() + res := tx.QueryRowContext( + ctx, + SQLReplacer(`SELECT a.id, a.activity_id, a.instance_id, a.execution_id, + a.event_type, a.timestamp, a.schedule_event_id, at.data, a.visible_at + FROM activities a + JOIN attributes at ON at.event_id = a.activity_id AND at.instance_id = a.instance_id AND at.execution_id = a.execution_id + WHERE a.locked_until IS NULL OR a.locked_until < ? + LIMIT 1 + FOR UPDATE SKIP LOCKED`), + now, + ) + + var id int64 + var instanceID, executionID string + var attributes []byte + event := &history.Event{} + + if err := res.Scan( + &id, &event.ID, &instanceID, &executionID, &event.Type, + &event.Timestamp, &event.ScheduleEventID, &attributes, &event.VisibleAt); err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + + return nil, fmt.Errorf("finding activity task to lock: %w", err) + } + + a, err := history.DeserializeAttributes(event.Type, attributes) + if err != nil { + return nil, fmt.Errorf("deserializing attributes: %w", err) + } + + event.Attributes = a + + if _, err := tx.ExecContext( + ctx, + SQLReplacer(`UPDATE activities SET locked_until = ?, worker = ? WHERE id = ?`), + now.Add(b.options.ActivityLockTimeout), + b.workerName, + id, + ); err != nil { + return nil, fmt.Errorf("locking activity: %w", err) + } + + t := &backend.ActivityTask{ + ID: event.ID, + WorkflowInstance: core.NewWorkflowInstance(instanceID, executionID), + Event: event, + } + + if err := tx.Commit(); err != nil { + return nil, err + } + + return t, nil +} + +// CompleteActivityTask completes a activity task retrieved using GetActivityTask +func (b *postgresBackend) CompleteActivityTask(ctx context.Context, instance *workflow.Instance, id string, event *history.Event) error { + tx, err := b.db.BeginTx(ctx, &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + }) + if err != nil { + return err + } + defer tx.Rollback() + + // Remove activity + if res, err := tx.ExecContext( + ctx, + SQLReplacer(`DELETE FROM activities WHERE activity_id = ? AND instance_id = ? AND execution_id = ? AND worker = ?`), + id, + instance.InstanceID, + instance.ExecutionID, + b.workerName, + ); err != nil { + return fmt.Errorf("completing activity: %w", err) + } else { + affected, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("checking for completed activity: %w", err) + } + + if affected == 0 { + return errors.New("could not find locked activity") + } + } + + // Insert new event generated during this workflow execution + if err := insertPendingEvents(ctx, tx, instance, []*history.Event{event}); err != nil { + return fmt.Errorf("inserting new events for completed activity: %w", err) + } + + if err := tx.Commit(); err != nil { + return err + } + + return nil +} + +func (b *postgresBackend) ExtendActivityTask(ctx context.Context, activityID string) error { + tx, err := b.db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + until := time.Now().Add(b.options.ActivityLockTimeout) + _, err = tx.ExecContext( + ctx, + SQLReplacer(`UPDATE activities SET locked_until = ? WHERE activity_id = ? AND worker = ?`), + until, + activityID, + b.workerName, + ) + if err != nil { + return fmt.Errorf("extending activity lock: %w", err) + } + + if err := tx.Commit(); err != nil { + if errors.Is(err, sql.ErrTxDone) { + return nil + } + + return err + } + + return nil +} + +func scheduleActivity(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, event *history.Event) error { + // Attributes are already persisted via the history, we do not need to add them again. + + _, err := tx.ExecContext( + ctx, + SQLReplacer( + `INSERT INTO activities + (activity_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?)`), + event.ID, + instance.InstanceID, + instance.ExecutionID, + event.Type, + event.Timestamp, + event.ScheduleEventID, + event.VisibleAt, + ) + + return err +} diff --git a/backend/postgres/stats.go b/backend/postgres/stats.go new file mode 100644 index 00000000..5ac232b2 --- /dev/null +++ b/backend/postgres/stats.go @@ -0,0 +1,85 @@ +package postgresbackend + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/cschleiden/go-workflows/backend" + "github.com/cschleiden/go-workflows/core" +) + +func (b *postgresBackend) GetStats(ctx context.Context) (*backend.Stats, error) { + s := &backend.Stats{} + + tx, err := b.db.BeginTx(ctx, &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + }) + if err != nil { + return nil, fmt.Errorf("failed to start transaction: %w", err) + } + defer tx.Rollback() + + // Get active instances + row := tx.QueryRowContext( + ctx, + "SELECT COUNT(*) FROM instances i WHERE i.completed_at IS NULL", + ) + if err := row.Err(); err != nil { + return nil, fmt.Errorf("failed to query active instances: %w", err) + } + + var activeInstances int64 + if err := row.Scan(&activeInstances); err != nil { + return nil, fmt.Errorf("failed to scan active instances: %w", err) + } + + s.ActiveWorkflowInstances = activeInstances + + // Get workflow instances ready to be picked up + now := time.Now() + row = tx.QueryRowContext( + ctx, + SQLReplacer( + `SELECT COUNT(*) + FROM instances i + INNER JOIN pending_events pe ON i.instance_id = pe.instance_id + WHERE + state = ? AND i.completed_at IS NULL + AND (pe.visible_at IS NULL OR pe.visible_at <= ?) + AND (i.locked_until IS NULL OR i.locked_until < ?) + LIMIT 1 + FOR UPDATE OF i SKIP LOCKED`), + core.WorkflowInstanceStateActive, + now, // event.visible_at + now, // locked_until + ) + if err := row.Err(); err != nil { + return nil, fmt.Errorf("failed to query active instances: %w", err) + } + + var pendingInstances int64 + if err := row.Scan(&pendingInstances); err != nil { + return nil, fmt.Errorf("failed to scan active instances: %w", err) + } + + s.PendingWorkflowTasks = pendingInstances + + // Get pending activities + row = tx.QueryRowContext( + ctx, + "SELECT COUNT(*) FROM activities") + if err := row.Err(); err != nil { + return nil, fmt.Errorf("failed to query active activities: %w", err) + } + + var pendingActivities int64 + if err := row.Scan(&pendingActivities); err != nil { + return nil, fmt.Errorf("failed to scan active activities: %w", err) + } + + s.PendingActivities = pendingActivities + + return s, nil +}