Skip to content

Commit

Permalink
Merge pull request #373 from cschleiden/max-history-size
Browse files Browse the repository at this point in the history
Support max history size
  • Loading branch information
cschleiden authored Nov 11, 2024
2 parents 4973be8 + b99bd5a commit 1f6bb05
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 10 deletions.
5 changes: 5 additions & 0 deletions backend/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ import "time"
type Tags map[string]string

type Client interface {
//Counter records a value at a point in time.
Counter(name string, tags Tags, value int64)

// Distribution records a value at a point in time.
Distribution(name string, tags Tags, value float64)

// Gauge records a value at a point in time.
Gauge(name string, tags Tags, value int64)

// Timing records the duration of an event.
Timing(name string, tags Tags, duration time.Duration)

// WithTags returns a new client with the given tags applied to all metrics.
WithTags(tags Tags) Client
}
11 changes: 11 additions & 0 deletions backend/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type Options struct {
// removed immediately, including their history. If set to false, the instance will be removed after the configured
// retention period or never.
RemoveContinuedAsNewInstances bool

// MaxHistorySize is the maximum size of a workflow history. If a workflow exceeds this size, it will be failed.
MaxHistorySize int64
}

var DefaultOptions Options = Options{
Expand All @@ -58,6 +61,8 @@ var DefaultOptions Options = Options{
ContextPropagators: []workflow.ContextPropagator{&propagators.TracingContextPropagator{}},

RemoveContinuedAsNewInstances: false,

MaxHistorySize: 10_000,
}

type BackendOption func(*Options)
Expand Down Expand Up @@ -104,6 +109,12 @@ func WithRemoveContinuedAsNewInstances() BackendOption {
}
}

func WithMaxHistorySize(size int64) BackendOption {
return func(o *Options) {
o.MaxHistorySize = size
}
}

func ApplyOptions(opts ...BackendOption) *Options {
options := DefaultOptions

Expand Down
28 changes: 28 additions & 0 deletions backend/test/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,34 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti
require.Equal(t, "hello-23", r)
},
},
{
name: "MaxHistorySize",
options: []backend.BackendOption{backend.WithMaxHistorySize(2)},
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
b.Options()

a := func(ctx context.Context) (int, error) {
return 0, nil
}

wf := func(ctx workflow.Context) (int, error) {
for i := 0; i < 10; i++ {
_, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, a).Get(ctx)
if err != nil {
return 0, err
}
}

return 42, nil
}
register(t, ctx, w, []interface{}{wf}, nil)

instance := runWorkflow(t, ctx, c, wf)
_, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*5)
require.Error(t, err)
require.EqualError(t, err, "workflow history size exceeded 2 events")
},
},
}

tests = append(tests, e2eActivityTests...)
Expand Down
1 change: 1 addition & 0 deletions internal/worker/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (wtw *WorkflowTaskWorker) getExecutor(ctx context.Context, t *backend.Workf
t.WorkflowInstance,
t.Metadata,
clock.New(),
wtw.backend.Options().MaxHistorySize,
)
if err != nil {
return nil, fmt.Errorf("creating workflow task executor: %w", err)
Expand Down
17 changes: 12 additions & 5 deletions tester/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
)

type options struct {
TestTimeout time.Duration
Logger *slog.Logger
Converter converter.Converter
Propagators []workflow.ContextPropagator
InitialTime time.Time
TestTimeout time.Duration
Logger *slog.Logger
Converter converter.Converter
Propagators []workflow.ContextPropagator
InitialTime time.Time
MaxHistorySize int64
}

type WorkflowTesterOption func(*options)
Expand Down Expand Up @@ -47,3 +48,9 @@ func WithInitialTime(t time.Time) WorkflowTesterOption {
o.InitialTime = t
}
}

func WithMaxHistorySize(size int64) WorkflowTesterOption {
return func(o *options) {
o.MaxHistorySize = size
}
}
20 changes: 16 additions & 4 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,10 @@ func NewWorkflowTester[TResult any](workflow workflow.Workflow, opts ...Workflow
}

options := &options{
TestTimeout: time.Second * 10,
Logger: slog.Default(),
Converter: converter.DefaultConverter,
TestTimeout: time.Second * 10,
Logger: slog.Default(),
Converter: converter.DefaultConverter,
MaxHistorySize: 10_000,
}

for _, o := range opts {
Expand Down Expand Up @@ -357,7 +358,18 @@ func (wt *workflowTester[TResult]) Execute(ctx context.Context, args ...any) {
tw.pendingEvents = tw.pendingEvents[:0]

// Execute task
e, err := executor.NewExecutor(wt.logger, wt.tracer, wt.registry, wt.converter, wt.propagators, &testHistoryProvider{tw.history}, tw.instance, tw.metadata, wt.clock)
e, err := executor.NewExecutor(
wt.logger,
wt.tracer,
wt.registry,
wt.converter,
wt.propagators,
&testHistoryProvider{tw.history},
tw.instance,
tw.metadata,
wt.clock,
wt.options.MaxHistorySize,
)
if err != nil {
panic(fmt.Errorf("could not create workflow executor: %v", err))
}
Expand Down
4 changes: 4 additions & 0 deletions workflow/executor/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ func Test_Cache_StoreAndGet(t *testing.T) {
e, err := executor.NewExecutor(
slog.Default(), noop.NewTracerProvider().Tracer(backend.TracerName), r, converter.DefaultConverter,
[]workflow.ContextPropagator{}, &testHistoryProvider{}, i, &metadata.WorkflowMetadata{}, clock.New(),
10_000,
)
require.NoError(t, err)

i2 := core.NewWorkflowInstance("instanceID2", "executionID2")
e2, err := executor.NewExecutor(
slog.Default(), noop.NewTracerProvider().Tracer(backend.TracerName), r, converter.DefaultConverter,
[]workflow.ContextPropagator{}, &testHistoryProvider{}, i, &metadata.WorkflowMetadata{}, clock.New(),
10_000,
)
require.NoError(t, err)

Expand Down Expand Up @@ -72,6 +74,7 @@ func Test_Cache_AutoEviction(t *testing.T) {
slog.Default(), noop.NewTracerProvider().Tracer(backend.TracerName), r,
converter.DefaultConverter, []workflow.ContextPropagator{}, &testHistoryProvider{}, i,
&metadata.WorkflowMetadata{}, clock.New(),
10_000,
)
require.NoError(t, err)

Expand Down Expand Up @@ -102,6 +105,7 @@ func Test_Cache_Evict(t *testing.T) {
slog.Default(), noop.NewTracerProvider().Tracer(backend.TracerName), r,
converter.DefaultConverter, []workflow.ContextPropagator{}, &testHistoryProvider{}, i,
&metadata.WorkflowMetadata{}, clock.New(),
10_000,
)
require.NoError(t, err)

Expand Down
9 changes: 9 additions & 0 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type executor struct {

parentSpan trace.Span
workflowSpan trace.Span

maxHistorySize int64
}

func NewExecutor(
Expand All @@ -86,6 +88,7 @@ func NewExecutor(
instance *core.WorkflowInstance,
metadata *metadata.WorkflowMetadata,
clock clock.Clock,
maxHistorySize int64,
) (WorkflowExecutor, error) {
s := workflowstate.NewWorkflowState(instance, logger, tracer, clock)

Expand Down Expand Up @@ -118,6 +121,7 @@ func NewExecutor(
workflowCtxCancel: cancel,
cv: cv,
clock: clock,
maxHistorySize: maxHistorySize,
logger: logger,
tracer: tracer,
}, nil
Expand Down Expand Up @@ -170,6 +174,11 @@ func (e *executor) ExecuteTask(ctx context.Context, t *backend.WorkflowTask) (*E
}
}

// Enforce max history size limit
if e.lastSequenceID+int64(len(executedEvents)) >= e.maxHistorySize {
e.workflowCompleted(nil, fmt.Errorf("workflow history size exceeded %d events", e.maxHistorySize))
}

// Process any commands added while executing new events
state := core.WorkflowInstanceStateActive
newCommandEvents := make([]*history.Event, 0)
Expand Down
2 changes: 1 addition & 1 deletion workflow/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func newExecutor(r *registry.Registry, i *core.WorkflowInstance, historyProvider
logger := slog.Default()
tracer := noop.NewTracerProvider().Tracer("test")

e, err := NewExecutor(logger, tracer, r, converter.DefaultConverter, []wf.ContextPropagator{}, historyProvider, i, &metadata.WorkflowMetadata{}, clock.New())
e, err := NewExecutor(logger, tracer, r, converter.DefaultConverter, []wf.ContextPropagator{}, historyProvider, i, &metadata.WorkflowMetadata{}, clock.New(), 10_000)

return e.(*executor), err
}
Expand Down

0 comments on commit 1f6bb05

Please sign in to comment.