From d118c072017f2c20d084553c026dd3ff7e3645f8 Mon Sep 17 00:00:00 2001 From: Bolek Kulbabinski <1416262+bolekk@users.noreply.github.com> Date: Wed, 13 Nov 2024 07:45:25 -0800 Subject: [PATCH] Prune workflow DB entries --- core/services/chainlink/application.go | 1 + core/services/workflows/delegate.go | 1 + core/services/workflows/store/store_db.go | 69 +++++++++++++++++++++-- 3 files changed, 67 insertions(+), 4 deletions(-) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 0b2352f67d4..abbe9dad9ab 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -400,6 +400,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { streamRegistry = streams.NewRegistry(globalLogger, pipelineRunner) workflowORM = workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock()) ) + srvcs = append(srvcs, workflowORM) promReporter := headreporter.NewPrometheusReporter(opts.DS, legacyEVMChains) chainIDs := make([]*big.Int, legacyEVMChains.Len()) diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 72aff3033d0..21b370dca64 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -75,6 +75,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser if err != nil { return nil, err } + d.logger.Infow("Creating Workflow Engine for workflow spec", "workflowID", spec.WorkflowSpec.WorkflowID, "workflowOwner", spec.WorkflowSpec.WorkflowOwner, "workflowName", spec.WorkflowSpec.WorkflowName) return []job.ServiceCtx{engine}, nil } diff --git a/core/services/workflows/store/store_db.go b/core/services/workflows/store/store_db.go index 926dd06d09b..1511e11aaf1 100644 --- a/core/services/workflows/store/store_db.go +++ b/core/services/workflows/store/store_db.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "google.golang.org/protobuf/proto" @@ -15,17 +16,24 @@ 
import ( "github.com/smartcontractkit/chainlink-common/pkg/values" valuespb "github.com/smartcontractkit/chainlink-common/pkg/values/pb" + commonservices "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services" ) // `DBStore` is a postgres-backed // data store that persists workflow progress. type DBStore struct { - lggr logger.Logger - db sqlutil.DataSource - clock clockwork.Clock + commonservices.StateMachine + lggr logger.Logger + db sqlutil.DataSource + shutdownWaitGroup sync.WaitGroup + chStop commonservices.StopChan + clock clockwork.Clock } +var _ services.ServiceCtx = (*DBStore)(nil) + // `workflowExecutionRow` describes a row // of the `workflow_executions` table type workflowExecutionRow struct { @@ -70,6 +78,59 @@ type workflowExecutionWithStep struct { WEFinishedAt *time.Time `db:"we_finished_at"` } +func (d *DBStore) Start(context.Context) error { + return d.StartOnce("DBStore", func() error { + d.shutdownWaitGroup.Add(1) + go d.pruneDBEntries() + return nil + }) +} + +func (d *DBStore) Close() error { + return d.StopOnce("DBStore", func() error { + close(d.chStop) + d.shutdownWaitGroup.Wait() + return nil + }) +} + +func (d *DBStore) pruneDBEntries() { + defer d.shutdownWaitGroup.Done() + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-d.chStop: + return + case <-ticker.C: + ctx, cancel := d.chStop.CtxWithTimeout(60 * time.Second) + err := sqlutil.TransactDataSource(ctx, d.db, nil, func(tx sqlutil.DataSource) error { + stmt := "DELETE FROM workflow_executions WHERE (id) IN (SELECT id FROM workflow_executions WHERE (created_at < now() - interval '2 hours') LIMIT 5000);" + _, err := tx.ExecContext(ctx, stmt) + return err + }) + if err != nil { + d.lggr.Errorw("Failed to prune workflow_executions", "err", err) + } else { + d.lggr.Infow("Pruned up to 5k entries") + } + cancel() + } + } +} + +func 
(d *DBStore) Ready() error { + return nil +} + +func (d *DBStore) HealthReport() map[string]error { + return map[string]error{d.Name(): d.Healthy()} +} + +func (d *DBStore) Name() string { + return d.lggr.Name() +} + // `UpdateStatus` updates the status of the given workflow execution func (d *DBStore) UpdateStatus(ctx context.Context, executionID string, status string) error { sql := `UPDATE workflow_executions SET status = $1, updated_at = $2 WHERE id = $3` @@ -407,5 +468,5 @@ func (d *DBStore) GetUnfinished(ctx context.Context, workflowID string, offset, } func NewDBStore(ds sqlutil.DataSource, lggr logger.Logger, clock clockwork.Clock) *DBStore { - return &DBStore{db: ds, lggr: lggr.Named("WorkflowDBStore"), clock: clock} + return &DBStore{db: ds, lggr: lggr.Named("WorkflowDBStore"), clock: clock, chStop: make(chan struct{})} }