From 411380a38cf2712c014b122cebf1128a9ff01371 Mon Sep 17 00:00:00 2001
From: Bolek Kulbabinski <1416262+bolekk@users.noreply.github.com>
Date: Wed, 13 Nov 2024 07:45:25 -0800
Subject: [PATCH] Prune workfllw DB entries
---
core/services/chainlink/application.go | 1 +
core/services/workflows/delegate.go | 1 +
core/services/workflows/store/store_db.go | 72 +++++++++++++++++++++--
core/web/testdata/body/health.html | 4 ++
core/web/testdata/body/health.json | 9 +++
testdata/scripts/health/default.txtar | 10 ++++
testdata/scripts/health/multi-chain.txtar | 10 ++++
7 files changed, 103 insertions(+), 4 deletions(-)
diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go
index 0b2352f67d4..abbe9dad9ab 100644
--- a/core/services/chainlink/application.go
+++ b/core/services/chainlink/application.go
@@ -400,6 +400,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
streamRegistry = streams.NewRegistry(globalLogger, pipelineRunner)
workflowORM = workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock())
)
+ srvcs = append(srvcs, workflowORM)
promReporter := headreporter.NewPrometheusReporter(opts.DS, legacyEVMChains)
chainIDs := make([]*big.Int, legacyEVMChains.Len())
diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go
index 72aff3033d0..1db26729ca6 100644
--- a/core/services/workflows/delegate.go
+++ b/core/services/workflows/delegate.go
@@ -75,6 +75,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
if err != nil {
return nil, err
}
+ d.logger.Infow("Creating Workflow Engine for workflow spec", "workflowID", spec.WorkflowSpec.WorkflowID, "workflowOwner", spec.WorkflowSpec.WorkflowOwner, "workflowName", spec.WorkflowSpec.WorkflowName, "jobName", spec.Name)
return []job.ServiceCtx{engine}, nil
}
diff --git a/core/services/workflows/store/store_db.go b/core/services/workflows/store/store_db.go
index 926dd06d09b..f15a6928e7e 100644
--- a/core/services/workflows/store/store_db.go
+++ b/core/services/workflows/store/store_db.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+ "sync"
"time"
"google.golang.org/protobuf/proto"
@@ -15,17 +16,31 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/values"
valuespb "github.com/smartcontractkit/chainlink-common/pkg/values/pb"
+ commonservices "github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/logger"
+ "github.com/smartcontractkit/chainlink/v2/core/services"
+)
+
+const (
+ defaultPruneFrequencySec = 20
+ defaultPruneTimeoutSec = 60
+ defaultPruneRecordAgeHours = 3
+ defaultPruneBatchSize = 3000
)
// `DBStore` is a postgres-backed
// data store that persists workflow progress.
type DBStore struct {
- lggr logger.Logger
- db sqlutil.DataSource
- clock clockwork.Clock
+ commonservices.StateMachine
+ lggr logger.Logger
+ db sqlutil.DataSource
+ shutdownWaitGroup sync.WaitGroup
+ chStop commonservices.StopChan
+ clock clockwork.Clock
}
+var _ services.ServiceCtx = (*DBStore)(nil)
+
// `workflowExecutionRow` describes a row
// of the `workflow_executions` table
type workflowExecutionRow struct {
@@ -70,6 +85,47 @@ type workflowExecutionWithStep struct {
WEFinishedAt *time.Time `db:"we_finished_at"`
}
+func (d *DBStore) Start(context.Context) error {
+ return d.StartOnce("DBStore", func() error {
+ d.shutdownWaitGroup.Add(1)
+ go d.pruneDBEntries()
+ return nil
+ })
+}
+
+func (d *DBStore) Close() error {
+ return d.StopOnce("DBStore", func() error {
+ close(d.chStop)
+ d.shutdownWaitGroup.Wait()
+ return nil
+ })
+}
+
+func (d *DBStore) pruneDBEntries() {
+ defer d.shutdownWaitGroup.Done()
+ ticker := time.NewTicker(defaultPruneFrequencySec * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-d.chStop:
+ return
+ case <-ticker.C:
+ ctx, cancel := d.chStop.CtxWithTimeout(defaultPruneTimeoutSec * time.Second)
+ err := sqlutil.TransactDataSource(ctx, d.db, nil, func(tx sqlutil.DataSource) error {
+ stmt := fmt.Sprintf("DELETE FROM workflow_executions WHERE (id) IN (SELECT id FROM workflow_executions WHERE (created_at < now() - interval '%d hours') LIMIT %d);", defaultPruneRecordAgeHours, defaultPruneBatchSize)
+ _, err := tx.ExecContext(ctx, stmt)
+ return err
+ })
+ if err != nil {
+ d.lggr.Errorw("Failed to prune workflow_executions", "err", err)
+ } else {
+ d.lggr.Infow("Pruned oldest workflow_executions", "batchSize", defaultPruneBatchSize, "ageLimitHours", defaultPruneRecordAgeHours)
+ }
+ cancel()
+ }
+ }
+}
+
// `UpdateStatus` updates the status of the given workflow execution
func (d *DBStore) UpdateStatus(ctx context.Context, executionID string, status string) error {
sql := `UPDATE workflow_executions SET status = $1, updated_at = $2 WHERE id = $3`
@@ -407,5 +463,13 @@ func (d *DBStore) GetUnfinished(ctx context.Context, workflowID string, offset,
}
func NewDBStore(ds sqlutil.DataSource, lggr logger.Logger, clock clockwork.Clock) *DBStore {
- return &DBStore{db: ds, lggr: lggr.Named("WorkflowDBStore"), clock: clock}
+ return &DBStore{db: ds, lggr: lggr.Named("WorkflowDBStore"), clock: clock, chStop: make(chan struct{})}
+}
+
+func (d *DBStore) HealthReport() map[string]error {
+ return map[string]error{d.Name(): d.Healthy()}
+}
+
+func (d *DBStore) Name() string {
+ return d.lggr.Name()
}
diff --git a/core/web/testdata/body/health.html b/core/web/testdata/body/health.html
index 2bf427f5e00..5e9a0433139 100644
--- a/core/web/testdata/body/health.html
+++ b/core/web/testdata/body/health.html
@@ -156,3 +156,7 @@
TelemetryManager
+
+ WorkflowDBStore
+
+
diff --git a/core/web/testdata/body/health.json b/core/web/testdata/body/health.json
index d573e0bd5fc..8c4c3b312de 100644
--- a/core/web/testdata/body/health.json
+++ b/core/web/testdata/body/health.json
@@ -287,6 +287,15 @@
"status": "passing",
"output": ""
}
+ },
+ {
+ "type": "checks",
+ "id": "WorkflowDBStore",
+ "attributes": {
+ "name": "WorkflowDBStore",
+ "status": "passing",
+ "output": ""
+ }
}
]
}
diff --git a/testdata/scripts/health/default.txtar b/testdata/scripts/health/default.txtar
index a7db2308e35..73b82bc7e39 100644
--- a/testdata/scripts/health/default.txtar
+++ b/testdata/scripts/health/default.txtar
@@ -41,6 +41,7 @@ ok PipelineRunner
ok PipelineRunner.BridgeCache
ok RetirementReportCache
ok TelemetryManager
+ok WorkflowDBStore
-- out.json --
{
@@ -134,6 +135,15 @@ ok TelemetryManager
"status": "passing",
"output": ""
}
+ },
+ {
+ "type": "checks",
+ "id": "WorkflowDBStore",
+ "attributes": {
+ "name": "WorkflowDBStore",
+ "status": "passing",
+ "output": ""
+ }
}
]
}
diff --git a/testdata/scripts/health/multi-chain.txtar b/testdata/scripts/health/multi-chain.txtar
index f53bbfebf8c..d3a0caf67b5 100644
--- a/testdata/scripts/health/multi-chain.txtar
+++ b/testdata/scripts/health/multi-chain.txtar
@@ -101,6 +101,7 @@ ok StarkNet.Baz.Chain
ok StarkNet.Baz.Relayer
ok StarkNet.Baz.Txm
ok TelemetryManager
+ok WorkflowDBStore
-- out-unhealthy.txt --
! EVM.1.HeadTracker.HeadListener
@@ -396,6 +397,15 @@ ok TelemetryManager
"status": "passing",
"output": ""
}
+ },
+ {
+ "type": "checks",
+ "id": "WorkflowDBStore",
+ "attributes": {
+ "name": "WorkflowDBStore",
+ "status": "passing",
+ "output": ""
+ }
}
]
}