diff --git a/backend/metrics/metrics.go b/backend/metrics/metrics.go index 4bf6b865..b6e37236 100644 --- a/backend/metrics/metrics.go +++ b/backend/metrics/metrics.go @@ -5,13 +5,18 @@ import "time" type Tags map[string]string type Client interface { + // Counter records a value at a point in time. Counter(name string, tags Tags, value int64) + // Distribution records a value at a point in time. Distribution(name string, tags Tags, value float64) + // Gauge records a value at a point in time. Gauge(name string, tags Tags, value int64) + // Timing records the duration of an event. Timing(name string, tags Tags, duration time.Duration) + // WithTags returns a new client with the given tags applied to all metrics. WithTags(tags Tags) Client } diff --git a/backend/options.go b/backend/options.go index 467abefb..972b4d15 100644 --- a/backend/options.go +++ b/backend/options.go @@ -43,6 +43,9 @@ type Options struct { // removed immediately, including their history. If set to false, the instance will be removed after the configured // retention period or never. RemoveContinuedAsNewInstances bool + + // MaxHistorySize is the maximum number of events in a workflow history. A workflow whose history reaches this limit is failed. 
+ MaxHistorySize int64 } var DefaultOptions Options = Options{ @@ -58,6 +61,8 @@ var DefaultOptions Options = Options{ ContextPropagators: []workflow.ContextPropagator{&propagators.TracingContextPropagator{}}, RemoveContinuedAsNewInstances: false, + + MaxHistorySize: 10_000, } type BackendOption func(*Options) @@ -104,6 +109,12 @@ func WithRemoveContinuedAsNewInstances() BackendOption { } } +func WithMaxHistorySize(size int64) BackendOption { + return func(o *Options) { + o.MaxHistorySize = size + } +} + func ApplyOptions(opts ...BackendOption) *Options { options := DefaultOptions diff --git a/backend/test/e2e.go b/backend/test/e2e.go index 642264f5..2d8339dc 100644 --- a/backend/test/e2e.go +++ b/backend/test/e2e.go @@ -613,6 +613,34 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti require.Equal(t, "hello-23", r) }, }, + { + name: "MaxHistorySize", + options: []backend.BackendOption{backend.WithMaxHistorySize(2)}, + f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) { + b.Options() + + a := func(ctx context.Context) (int, error) { + return 0, nil + } + + wf := func(ctx workflow.Context) (int, error) { + for i := 0; i < 10; i++ { + _, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, a).Get(ctx) + if err != nil { + return 0, err + } + } + + return 42, nil + } + register(t, ctx, w, []interface{}{wf}, nil) + + instance := runWorkflow(t, ctx, c, wf) + _, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*5) + require.Error(t, err) + require.EqualError(t, err, "workflow history size exceeded 2 events") + }, + }, } tests = append(tests, e2eActivityTests...) 
diff --git a/internal/worker/workflow.go b/internal/worker/workflow.go index e966c3c5..cfcfd197 100644 --- a/internal/worker/workflow.go +++ b/internal/worker/workflow.go @@ -180,6 +180,7 @@ func (wtw *WorkflowTaskWorker) getExecutor(ctx context.Context, t *backend.Workf t.WorkflowInstance, t.Metadata, clock.New(), + wtw.backend.Options().MaxHistorySize, ) if err != nil { return nil, fmt.Errorf("creating workflow task executor: %w", err) diff --git a/tester/options.go b/tester/options.go index ed7a904c..0abfe4b0 100644 --- a/tester/options.go +++ b/tester/options.go @@ -9,11 +9,12 @@ import ( ) type options struct { - TestTimeout time.Duration - Logger *slog.Logger - Converter converter.Converter - Propagators []workflow.ContextPropagator - InitialTime time.Time + TestTimeout time.Duration + Logger *slog.Logger + Converter converter.Converter + Propagators []workflow.ContextPropagator + InitialTime time.Time + MaxHistorySize int64 } type WorkflowTesterOption func(*options) @@ -47,3 +48,9 @@ func WithInitialTime(t time.Time) WorkflowTesterOption { o.InitialTime = t } } + +func WithMaxHistorySize(size int64) WorkflowTesterOption { + return func(o *options) { + o.MaxHistorySize = size + } +} diff --git a/tester/tester.go b/tester/tester.go index 0fdf04c1..4ab3df32 100644 --- a/tester/tester.go +++ b/tester/tester.go @@ -194,9 +194,10 @@ func NewWorkflowTester[TResult any](workflow workflow.Workflow, opts ...Workflow } options := &options{ - TestTimeout: time.Second * 10, - Logger: slog.Default(), - Converter: converter.DefaultConverter, + TestTimeout: time.Second * 10, + Logger: slog.Default(), + Converter: converter.DefaultConverter, + MaxHistorySize: 10_000, } for _, o := range opts { @@ -357,7 +358,18 @@ func (wt *workflowTester[TResult]) Execute(ctx context.Context, args ...any) { tw.pendingEvents = tw.pendingEvents[:0] // Execute task - e, err := executor.NewExecutor(wt.logger, wt.tracer, wt.registry, wt.converter, wt.propagators, 
&testHistoryProvider{tw.history}, tw.instance, tw.metadata, wt.clock) + e, err := executor.NewExecutor( + wt.logger, + wt.tracer, + wt.registry, + wt.converter, + wt.propagators, + &testHistoryProvider{tw.history}, + tw.instance, + tw.metadata, + wt.clock, + wt.options.MaxHistorySize, + ) if err != nil { panic(fmt.Errorf("could not create workflow executor: %v", err)) } diff --git a/workflow/executor/cache/cache_test.go b/workflow/executor/cache/cache_test.go index 199a3a6a..fc171638 100644 --- a/workflow/executor/cache/cache_test.go +++ b/workflow/executor/cache/cache_test.go @@ -31,6 +31,7 @@ func Test_Cache_StoreAndGet(t *testing.T) { e, err := executor.NewExecutor( slog.Default(), noop.NewTracerProvider().Tracer(backend.TracerName), r, converter.DefaultConverter, []workflow.ContextPropagator{}, &testHistoryProvider{}, i, &metadata.WorkflowMetadata{}, clock.New(), + 10_000, ) require.NoError(t, err) @@ -38,6 +39,7 @@ func Test_Cache_StoreAndGet(t *testing.T) { e2, err := executor.NewExecutor( slog.Default(), noop.NewTracerProvider().Tracer(backend.TracerName), r, converter.DefaultConverter, []workflow.ContextPropagator{}, &testHistoryProvider{}, i, &metadata.WorkflowMetadata{}, clock.New(), + 10_000, ) require.NoError(t, err) @@ -72,6 +74,7 @@ func Test_Cache_AutoEviction(t *testing.T) { slog.Default(), noop.NewTracerProvider().Tracer(backend.TracerName), r, converter.DefaultConverter, []workflow.ContextPropagator{}, &testHistoryProvider{}, i, &metadata.WorkflowMetadata{}, clock.New(), + 10_000, ) require.NoError(t, err) @@ -102,6 +105,7 @@ func Test_Cache_Evict(t *testing.T) { slog.Default(), noop.NewTracerProvider().Tracer(backend.TracerName), r, converter.DefaultConverter, []workflow.ContextPropagator{}, &testHistoryProvider{}, i, &metadata.WorkflowMetadata{}, clock.New(), + 10_000, ) require.NoError(t, err) diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index e2e72455..116d65e0 100644 --- a/workflow/executor/executor.go +++ 
b/workflow/executor/executor.go @@ -74,6 +74,8 @@ type executor struct { parentSpan trace.Span workflowSpan trace.Span + + maxHistorySize int64 } func NewExecutor( @@ -86,6 +88,7 @@ func NewExecutor( instance *core.WorkflowInstance, metadata *metadata.WorkflowMetadata, clock clock.Clock, + maxHistorySize int64, ) (WorkflowExecutor, error) { s := workflowstate.NewWorkflowState(instance, logger, tracer, clock) @@ -118,6 +121,7 @@ func NewExecutor( workflowCtxCancel: cancel, cv: cv, clock: clock, + maxHistorySize: maxHistorySize, logger: logger, tracer: tracer, }, nil @@ -170,6 +174,11 @@ func (e *executor) ExecuteTask(ctx context.Context, t *backend.WorkflowTask) (*E } } + // Enforce the max history size limit (counted in events); a non-positive limit disables the check instead of failing every workflow. + if e.maxHistorySize > 0 && e.lastSequenceID+int64(len(executedEvents)) >= e.maxHistorySize { + e.workflowCompleted(nil, fmt.Errorf("workflow history size exceeded %d events", e.maxHistorySize)) + } + // Process any commands added while executing new events state := core.WorkflowInstanceStateActive newCommandEvents := make([]*history.Event, 0) diff --git a/workflow/executor/executor_test.go b/workflow/executor/executor_test.go index 994d338b..24b5d17b 100644 --- a/workflow/executor/executor_test.go +++ b/workflow/executor/executor_test.go @@ -39,7 +39,7 @@ func newExecutor(r *registry.Registry, i *core.WorkflowInstance, historyProvider logger := slog.Default() tracer := noop.NewTracerProvider().Tracer("test") - e, err := NewExecutor(logger, tracer, r, converter.DefaultConverter, []wf.ContextPropagator{}, historyProvider, i, &metadata.WorkflowMetadata{}, clock.New()) + e, err := NewExecutor(logger, tracer, r, converter.DefaultConverter, []wf.ContextPropagator{}, historyProvider, i, &metadata.WorkflowMetadata{}, clock.New(), 10_000) return e.(*executor), err }