Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

postgres support #360

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions backend/postgres/db/migrations/000001_initial.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Revert the initial schema: drop every table created by 000001_initial.up.sql.
-- Order is irrelevant — no foreign keys link these tables.
DROP TABLE IF EXISTS instances;
DROP TABLE IF EXISTS pending_events;
DROP TABLE IF EXISTS history;
DROP TABLE IF EXISTS activities;
73 changes: 73 additions & 0 deletions backend/postgres/db/migrations/000001_initial.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
-- Workflow instances: one row per (instance_id, execution_id) pair.
DROP TABLE IF EXISTS instances;

CREATE TABLE instances (
	id BIGSERIAL NOT NULL PRIMARY KEY,
	instance_id UUID NOT NULL,
	execution_id UUID NOT NULL,
	parent_instance_id UUID NULL,
	parent_execution_id UUID NULL,
	-- BIGINT instead of NUMERIC: schedule event IDs are 64-bit integers in the
	-- application, and NUMERIC (arbitrary-precision decimal) is inconsistent
	-- with the BIGINT schedule_event_id columns in the event tables.
	parent_schedule_event_id BIGINT NULL,
	metadata BYTEA NULL,
	state INT NOT NULL,
	created_at timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,
	completed_at timestamptz NULL,
	locked_until timestamptz NULL,
	sticky_until timestamptz NULL,
	worker VARCHAR(64) NULL
);

CREATE UNIQUE INDEX idx_instances_instance_id_execution_id on instances (instance_id, execution_id);
-- presumably used when polling for runnable/locked instances — confirm against backend queries
CREATE INDEX idx_instances_locked_until_completed_at on instances (completed_at, locked_until, sticky_until, worker);
CREATE INDEX idx_instances_parent_instance_id_parent_execution_id ON instances (parent_instance_id, parent_execution_id);

-- Events not yet processed by a worker for an instance.
DROP TABLE IF EXISTS pending_events;
CREATE TABLE pending_events (
	id BIGSERIAL NOT NULL PRIMARY KEY,
	event_id UUID NOT NULL,
	-- BIGINT, not BIGSERIAL: the application always supplies this value, so a
	-- BIGSERIAL would only attach an unused owned sequence and implicit default.
	sequence_id BIGINT NOT NULL, -- Not used, but keep for now for query compat
	instance_id UUID NOT NULL,
	execution_id UUID NOT NULL,
	event_type INT NOT NULL,
	timestamp timestamptz NOT NULL,
	-- BIGINT for the same reason as sequence_id above.
	schedule_event_id BIGINT NOT NULL,
	attributes BYTEA NOT NULL,
	visible_at timestamptz NULL
);

CREATE INDEX idx_pending_events_inid_exid ON pending_events (instance_id, execution_id);
CREATE INDEX idx_pending_events_inid_exid_visible_at_schedule_event_id ON pending_events (instance_id, execution_id, visible_at, schedule_event_id);

-- Processed event history per instance execution.
DROP TABLE IF EXISTS history;
-- IF NOT EXISTS removed: the table was dropped on the previous line, and the
-- other CREATE TABLE statements in this migration do not use it.
CREATE TABLE history (
	id BIGSERIAL NOT NULL PRIMARY KEY,
	event_id UUID NOT NULL,
	-- BIGINT, not BIGSERIAL: the application always supplies this value, so a
	-- BIGSERIAL would only attach an unused owned sequence and implicit default.
	sequence_id BIGINT NOT NULL,
	instance_id UUID NOT NULL,
	execution_id UUID NOT NULL,
	event_type INT NOT NULL,
	timestamp timestamptz NOT NULL,
	-- BIGINT for the same reason as sequence_id above.
	schedule_event_id BIGINT NOT NULL,
	attributes BYTEA NOT NULL,
	visible_at timestamptz NULL
);

CREATE INDEX idx_history_instance_id_execution_id ON history (instance_id, execution_id);
CREATE INDEX idx_history_instance_id_execution_id_sequence_id ON history (instance_id, execution_id, sequence_id);

-- Scheduled activity tasks, locked by workers while executing.
DROP TABLE IF EXISTS activities;
-- IF NOT EXISTS removed: the table was dropped on the previous line.
CREATE TABLE activities (
	id BIGSERIAL NOT NULL PRIMARY KEY,
	activity_id UUID NOT NULL,
	instance_id UUID NOT NULL,
	execution_id UUID NOT NULL,
	event_type INT NOT NULL,
	timestamp timestamptz NOT NULL,
	-- BIGINT, not BIGSERIAL: the application always supplies this value, so a
	-- BIGSERIAL would only attach an unused owned sequence and implicit default.
	schedule_event_id BIGINT NOT NULL,
	attributes BYTEA NOT NULL,
	visible_at timestamptz NULL,
	locked_until timestamptz NULL,
	worker VARCHAR(64) NULL
);

-- NOTE(review): worker is nullable, and PostgreSQL treats NULLs as distinct in
-- unique indexes, so multiple unlocked rows for the same activity are allowed
-- by this index — confirm that is the intended behavior.
CREATE UNIQUE INDEX idx_activities_instance_id_execution_id_activity_id_worker ON activities (instance_id, execution_id, activity_id, worker);
CREATE INDEX idx_activities_locked_until on activities (locked_until);
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Down migration for 000002_attributes_blob.
-- NOTE(review): these columns are already BYTEA NOT NULL in 000001, so these
-- statements are effectively no-ops (likely ported from the MySQL backend,
-- where the column type changed); confirm this is intentional.
ALTER TABLE activities ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL;
ALTER TABLE history ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL;
ALTER TABLE pending_events ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL;
3 changes: 3 additions & 0 deletions backend/postgres/db/migrations/000002_attributes_blob.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Up migration for 000002_attributes_blob.
-- NOTE(review): the columns are already BYTEA NOT NULL in 000001, so these
-- statements are effectively no-ops (likely ported from the MySQL backend,
-- where this migration changed the BLOB column size); confirm intent.
ALTER TABLE activities ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL;
ALTER TABLE history ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL;
ALTER TABLE pending_events ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Down migration for 000003_add_attributes_table: copy event payloads back into
-- an `attributes` column on each event table, then drop the shared table.
--
-- Fixes vs. the previous version:
--  * "ALTER COLUMN attributes SET NULL" is not valid PostgreSQL; a freshly
--    added column is nullable by default, so the clause is simply dropped.
--  * activities has no event_id column — it must join on activity_id, which is
--    what the up migration stored as attributes.event_id.
--  * The redundant "TYPE BYTEA" alter is removed (the column is created BYTEA).
ALTER TABLE activities ADD COLUMN attributes BYTEA;
UPDATE activities SET attributes = a.data FROM attributes a WHERE activities.activity_id = a.event_id AND activities.instance_id = a.instance_id AND activities.execution_id = a.execution_id;
ALTER TABLE activities ALTER COLUMN attributes SET NOT NULL;

ALTER TABLE history ADD COLUMN attributes BYTEA;
UPDATE history SET attributes = a.data FROM attributes a WHERE history.event_id = a.event_id AND history.instance_id = a.instance_id AND history.execution_id = a.execution_id;
ALTER TABLE history ALTER COLUMN attributes SET NOT NULL;

ALTER TABLE pending_events ADD COLUMN attributes BYTEA;
UPDATE pending_events SET attributes = a.data FROM attributes a WHERE pending_events.event_id = a.event_id AND pending_events.instance_id = a.instance_id AND pending_events.execution_id = a.execution_id;
ALTER TABLE pending_events ALTER COLUMN attributes SET NOT NULL;

-- Drop attributes table
DROP TABLE attributes;
24 changes: 24 additions & 0 deletions backend/postgres/db/migrations/000003_add_attributes_table.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- Migration 000003 (up): consolidate serialized event payloads from per-table
-- `attributes` columns into one shared attributes table keyed by
-- (instance_id, execution_id, event_id).
DROP TABLE IF EXISTS attributes;

CREATE TABLE attributes (
	id BIGSERIAL NOT NULL PRIMARY KEY,
	event_id UUID NOT NULL,
	instance_id UUID NOT NULL,
	execution_id UUID NOT NULL,
	data BYTEA NOT NULL
);

CREATE UNIQUE INDEX idx_attributes_instance_id_execution_id_event_id on attributes (instance_id, execution_id, event_id);
CREATE INDEX idx_attributes_event_id on attributes (event_id);

-- Move activity attributes to attributes table
-- NOTE(review): activities have no event_id column, so activity_id is stored as
-- the event id; ON CONFLICT DO NOTHING silently drops rows whose key already
-- exists — confirm duplicate keys always carry identical payloads.
INSERT INTO attributes (event_id, instance_id, execution_id, data) SELECT activity_id, instance_id, execution_id, attributes FROM activities ON CONFLICT DO NOTHING;
ALTER TABLE activities DROP COLUMN attributes;

-- Move history attributes to attributes table
INSERT INTO attributes (event_id, instance_id, execution_id, data) SELECT event_id, instance_id, execution_id, attributes FROM history ON CONFLICT DO NOTHING;
ALTER TABLE history DROP COLUMN attributes;

-- Move pending_events attributes to attributes table
INSERT INTO attributes (event_id, instance_id, execution_id, data) SELECT event_id, instance_id, execution_id, attributes FROM pending_events ON CONFLICT DO NOTHING;
ALTER TABLE pending_events DROP COLUMN attributes;
119 changes: 119 additions & 0 deletions backend/postgres/diagnostics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package postgresbackend

import (
"context"
"database/sql"
"time"

"github.com/cschleiden/go-workflows/core"
"github.com/cschleiden/go-workflows/diag"
)

var _ diag.Backend = (*postgresBackend)(nil)

// GetWorkflowInstances returns up to count workflow instances ordered by
// creation time, newest first. When afterInstanceID/afterExecutionID are
// non-empty, only instances created strictly before that anchor instance are
// returned (keyset pagination).
func (mb *postgresBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID, afterExecutionID string, count int) ([]*diag.WorkflowInstanceRef, error) {
	tx, err := mb.db.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Read-only transaction; unconditionally rolling back is safe.
	defer tx.Rollback()

	var rows *sql.Rows
	if afterInstanceID != "" {
		rows, err = tx.QueryContext(
			ctx,
			// Fixed: the anchor subquery previously filtered on the BIGSERIAL
			// surrogate key ("WHERE id = ?") while being passed the instance
			// UUID, a bigint/uuid type mismatch in PostgreSQL; it must filter
			// on instance_id.
			SQLReplacer(`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at
			FROM instances i
			INNER JOIN (SELECT instance_id, created_at FROM instances WHERE instance_id = ? AND execution_id = ?) ii
				ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.instance_id < ii.instance_id)
			ORDER BY i.created_at DESC, i.instance_id DESC
			LIMIT ?`),
			afterInstanceID,
			afterExecutionID,
			count,
		)
	} else {
		rows, err = tx.QueryContext(
			ctx,
			SQLReplacer(`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at
			FROM instances i
			ORDER BY i.created_at DESC, i.instance_id DESC
			LIMIT ?`),
			count,
		)
	}
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var instances []*diag.WorkflowInstanceRef

	for rows.Next() {
		var id, executionID string
		var createdAt time.Time
		var completedAt *time.Time
		if err := rows.Scan(&id, &executionID, &createdAt, &completedAt); err != nil {
			return nil, err
		}

		// A set completed_at is the only completion marker in the table.
		var state core.WorkflowInstanceState
		if completedAt != nil {
			state = core.WorkflowInstanceStateFinished
		}

		instances = append(instances, &diag.WorkflowInstanceRef{
			Instance:    core.NewWorkflowInstance(id, executionID),
			CreatedAt:   createdAt,
			CompletedAt: completedAt,
			State:       state,
		})
	}
	// Surface any error that terminated iteration (e.g. connection loss).
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return instances, nil
}

// GetWorkflowInstance returns diagnostic information for a single workflow
// instance execution, or nil (with a nil error) when no matching row exists.
func (mb *postgresBackend) GetWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceRef, error) {
	tx, err := mb.db.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Read-only transaction; rolling back is always safe.
	defer tx.Rollback()

	var (
		instanceID  string
		executionID string
		createdAt   time.Time
		completedAt *time.Time
	)

	row := tx.QueryRowContext(
		ctx,
		SQLReplacer("SELECT instance_id, execution_id, created_at, completed_at FROM instances WHERE instance_id = ? AND execution_id = ?"), instance.InstanceID, instance.ExecutionID)

	if err := row.Scan(&instanceID, &executionID, &createdAt, &completedAt); err != nil {
		if err == sql.ErrNoRows {
			// Not found is not an error for this diagnostics API.
			return nil, nil
		}
		return nil, err
	}

	// A set completed_at is the only completion marker in the table.
	var state core.WorkflowInstanceState
	if completedAt != nil {
		state = core.WorkflowInstanceStateFinished
	}

	return &diag.WorkflowInstanceRef{
		Instance:    core.NewWorkflowInstance(instanceID, executionID),
		CreatedAt:   createdAt,
		CompletedAt: completedAt,
		State:       state,
	}, nil
}

// GetWorkflowTree builds the parent/child tree of workflow instances rooted at
// the given instance, delegating to the shared diag tree builder.
func (mb *postgresBackend) GetWorkflowTree(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceTree, error) {
	return diag.NewInstanceTreeBuilder(mb).BuildWorkflowInstanceTree(ctx, instance)
}
84 changes: 84 additions & 0 deletions backend/postgres/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package postgresbackend

import (
"context"
"database/sql"
"fmt"
"strings"

"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/core"
)

// insertPendingEvents stores the given events in the pending_events table for
// the workflow instance.
func insertPendingEvents(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, events []*history.Event) error {
	return insertEvents(ctx, tx, "pending_events", instance, events)
}

// insertHistoryEvents stores the given events in the history table for the
// workflow instance.
func insertHistoryEvents(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, events []*history.Event) error {
	return insertEvents(ctx, tx, "history", instance, events)
}

// insertEvents writes events for the given instance into tableName in batches,
// storing each event's serialized attributes in the shared attributes table
// first. Batching bounds the number of placeholders per statement.
func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instance *core.WorkflowInstance, events []*history.Event) error {
	const batchSize = 20

	for start := 0; start < len(events); start += batchSize {
		end := start + batchSize
		if end > len(events) {
			end = len(events)
		}
		batch := events[start:end]

		// One "(?, ...)" placeholder group per event in the batch.
		attrQuery := "INSERT INTO attributes (event_id, instance_id, execution_id, data) VALUES (?, ?, ?, ?)" + strings.Repeat(", (?, ?, ?, ?)", len(batch)-1) + " ON CONFLICT DO NOTHING"
		eventQuery := "INSERT INTO " + tableName +
			" (event_id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" +
			strings.Repeat(", (?, ?, ?, ?, ?, ?, ?, ?)", len(batch)-1)

		attrArgs := make([]interface{}, 0, len(batch)*4)
		eventArgs := make([]interface{}, 0, len(batch)*8)

		for _, event := range batch {
			data, err := history.SerializeAttributes(event.Attributes)
			if err != nil {
				return err
			}

			attrArgs = append(attrArgs, event.ID, instance.InstanceID, instance.ExecutionID, data)
			eventArgs = append(eventArgs,
				event.ID, event.SequenceID, instance.InstanceID, instance.ExecutionID, event.Type, event.Timestamp, event.ScheduleEventID, event.VisibleAt)
		}

		// Insert attributes first so event rows never reference missing payloads.
		if _, err := tx.ExecContext(ctx, SQLReplacer(attrQuery), attrArgs...); err != nil {
			return fmt.Errorf("inserting attributes: %w", err)
		}

		if _, err := tx.ExecContext(ctx, SQLReplacer(eventQuery), eventArgs...); err != nil {
			return fmt.Errorf("inserting events: %w", err)
		}
	}

	return nil
}

// removeFutureEvent deletes the pending future event (one with visible_at set)
// matching the given schedule event id for the instance, together with its row
// in the attributes table.
//
// Fixed: the previous statement used MySQL's multi-table
// "DELETE t1, t2 FROM ... INNER JOIN ..." syntax, which PostgreSQL does not
// support; a data-modifying CTE deletes from both tables in one statement.
func removeFutureEvent(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, scheduleEventID int64) error {
	_, err := tx.ExecContext(
		ctx,
		SQLReplacer(`WITH deleted AS (
			DELETE FROM pending_events
			WHERE instance_id = ? AND execution_id = ? AND schedule_event_id = ? AND visible_at IS NOT NULL
			RETURNING event_id
		)
		DELETE FROM attributes WHERE event_id IN (SELECT event_id FROM deleted)`),
		instance.InstanceID,
		instance.ExecutionID,
		scheduleEventID,
	)
	if err != nil {
		return fmt.Errorf("removing future event: %w", err)
	}

	return nil
}
40 changes: 40 additions & 0 deletions backend/postgres/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package postgresbackend

import (
"database/sql"

"github.com/cschleiden/go-workflows/backend"
)

// options bundles the generic backend options with PostgreSQL-specific
// settings for this backend.
type options struct {
	backend.Options

	// PostgreSQLOptions, if set, is called with the opened *sql.DB so callers
	// can tune it (e.g. connection-pool settings) before the backend uses it.
	PostgreSQLOptions func(db *sql.DB)

	// ApplyMigrations automatically applies database migrations on startup.
	ApplyMigrations bool
}

// option mutates the backend options; constructed via the With* helpers below.
type option func(*options)

// WithApplyMigrations automatically applies database migrations on startup.
func WithApplyMigrations(applyMigrations bool) option {
	return func(opts *options) {
		opts.ApplyMigrations = applyMigrations
	}
}

// WithPostgreSQLOptions registers a callback that receives the opened *sql.DB,
// allowing callers to configure it before the backend starts using it.
func WithPostgreSQLOptions(f func(db *sql.DB)) option {
	return func(o *options) {
		o.PostgreSQLOptions = f
	}
}

// WithBackendOptions allows to pass generic backend options.
func WithBackendOptions(backendOpts ...backend.BackendOption) option {
	return func(opts *options) {
		// Apply each generic option to the embedded backend.Options in order.
		for _, apply := range backendOpts {
			apply(&opts.Options)
		}
	}
}
Loading