Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prune workflow DB entries #15226

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
streamRegistry = streams.NewRegistry(globalLogger, pipelineRunner)
workflowORM = workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock())
)
srvcs = append(srvcs, workflowORM)

promReporter := headreporter.NewPrometheusReporter(opts.DS, legacyEVMChains)
chainIDs := make([]*big.Int, legacyEVMChains.Len())
Expand Down
1 change: 1 addition & 0 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
if err != nil {
return nil, err
}
d.logger.Infow("Creating Workflow Engine for workflow spec", "workflowID", spec.WorkflowSpec.WorkflowID, "workflowOwner", spec.WorkflowSpec.WorkflowOwner, "workflowName", spec.WorkflowSpec.WorkflowName, "jobName", spec.Name)
return []job.ServiceCtx{engine}, nil
}

Expand Down
72 changes: 68 additions & 4 deletions core/services/workflows/store/store_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"google.golang.org/protobuf/proto"
Expand All @@ -15,17 +16,31 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/values"
valuespb "github.com/smartcontractkit/chainlink-common/pkg/values/pb"

commonservices "github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"
)

// Defaults for the background pruning of the workflow_executions table.
const (
// defaultPruneFrequencySec is the interval, in seconds, between prune passes.
defaultPruneFrequencySec = 20
// defaultPruneTimeoutSec is the timeout, in seconds, applied to a single prune pass.
defaultPruneTimeoutSec = 60
// defaultPruneRecordAgeHours is the age, in hours (measured from created_at),
// beyond which a workflow execution row is eligible for deletion.
defaultPruneRecordAgeHours = 3
// defaultPruneBatchSize is the maximum number of rows deleted in one prune pass.
defaultPruneBatchSize = 3000
)

// DBStore is a postgres-backed data store that persists workflow progress.
// It also runs as a service: Start launches a background goroutine that
// periodically prunes old workflow_executions rows, and Close stops it.
//
// NOTE(review): this listing appears to be a rendered diff in which the
// pre-change and post-change field lines are interleaved, so lggr, db and
// clock show up twice — confirm against the real file.
type DBStore struct {
lggr logger.Logger
db sqlutil.DataSource
clock clockwork.Clock
// StateMachine provides the StartOnce/StopOnce/Healthy methods used by
// Start, Close and HealthReport.
commonservices.StateMachine
lggr logger.Logger
db sqlutil.DataSource
// shutdownWaitGroup tracks the pruning goroutine so Close can wait for it to exit.
shutdownWaitGroup sync.WaitGroup
// chStop is closed by Close to tell the pruning goroutine to stop.
chStop commonservices.StopChan
clock clockwork.Clock
}

var _ services.ServiceCtx = (*DBStore)(nil)

// `workflowExecutionRow` describes a row
// of the `workflow_executions` table
type workflowExecutionRow struct {
Expand Down Expand Up @@ -70,6 +85,47 @@ type workflowExecutionWithStep struct {
WEFinishedAt *time.Time `db:"we_finished_at"`
}

// Start launches the background goroutine that prunes old workflow
// executions. The work is wrapped in StartOnce under the name "DBStore".
// The supplied context is not used.
func (d *DBStore) Start(context.Context) error {
	launchPruner := func() error {
		// Register the goroutine before spawning it so Close can wait on it.
		d.shutdownWaitGroup.Add(1)
		go d.pruneDBEntries()
		return nil
	}
	return d.StartOnce("DBStore", launchPruner)
}

// Close signals the pruning goroutine to stop by closing chStop and then
// blocks until that goroutine has exited. The work is wrapped in StopOnce
// under the name "DBStore".
func (d *DBStore) Close() error {
	shutdown := func() error {
		close(d.chStop)
		// Wait for pruneDBEntries to observe the closed channel and return.
		d.shutdownWaitGroup.Wait()
		return nil
	}
	return d.StopOnce("DBStore", shutdown)
}

// pruneDBEntries periodically deletes old rows from workflow_executions until
// chStop is closed. It is meant to run in its own goroutine (see Start) and
// marks shutdownWaitGroup as done on exit so that Close can wait for it.
//
// Every defaultPruneFrequencySec seconds it deletes at most
// defaultPruneBatchSize rows whose created_at is more than
// defaultPruneRecordAgeHours hours in the past. Each pass runs under a context
// obtained from chStop with a defaultPruneTimeoutSec-second timeout.
// Failures are logged and the loop carries on with the next tick.
func (d *DBStore) pruneDBEntries() {
	defer d.shutdownWaitGroup.Done()

	// The statement is built only from compile-time constants, so it is
	// assembled once here rather than on every tick.
	stmt := fmt.Sprintf("DELETE FROM workflow_executions WHERE (id) IN (SELECT id FROM workflow_executions WHERE (created_at < now() - interval '%d hours') LIMIT %d);", defaultPruneRecordAgeHours, defaultPruneBatchSize)

	// pruneBatch performs a single bounded delete pass and logs the outcome.
	pruneBatch := func() {
		ctx, cancel := d.chStop.CtxWithTimeout(defaultPruneTimeoutSec * time.Second)
		// Deferred so the context is released on every exit path, including a
		// panic inside the transaction.
		defer cancel()

		var nPruned int64
		err := sqlutil.TransactDataSource(ctx, d.db, nil, func(tx sqlutil.DataSource) error {
			res, err := tx.ExecContext(ctx, stmt)
			if err != nil {
				return err
			}
			n, err := res.RowsAffected()
			if err != nil {
				// The delete itself succeeded; only the count is unavailable,
				// so do not fail (and roll back) the transaction over it.
				d.lggr.Warnw("Failed to get number of pruned workflow_executions", "err", err)
				return nil
			}
			nPruned = n
			return nil
		})
		if err != nil {
			d.lggr.Errorw("Failed to prune workflow_executions", "err", err)
			return
		}
		// Only report a prune when rows were actually removed; otherwise the
		// log would claim work on every tick even when the table is clean.
		if nPruned > 0 {
			d.lggr.Infow("Pruned oldest workflow_executions", "nPruned", nPruned, "batchSize", defaultPruneBatchSize, "ageLimitHours", defaultPruneRecordAgeHours)
		}
	}

	ticker := time.NewTicker(defaultPruneFrequencySec * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-d.chStop:
			return
		case <-ticker.C:
			pruneBatch()
		}
	}
}

// `UpdateStatus` updates the status of the given workflow execution
func (d *DBStore) UpdateStatus(ctx context.Context, executionID string, status string) error {
sql := `UPDATE workflow_executions SET status = $1, updated_at = $2 WHERE id = $3`
Expand Down Expand Up @@ -407,5 +463,13 @@ func (d *DBStore) GetUnfinished(ctx context.Context, workflowID string, offset,
}

// NewDBStore builds a DBStore that uses ds for database access, a logger
// derived from lggr and named "WorkflowDBStore", and the given clock.
//
// NOTE(review): this listing appears to be a rendered diff that shows both the
// pre-change and post-change return lines; the second one, which also
// initialises chStop, looks like the current version — confirm against the
// real file.
func NewDBStore(ds sqlutil.DataSource, lggr logger.Logger, clock clockwork.Clock) *DBStore {
return &DBStore{db: ds, lggr: lggr.Named("WorkflowDBStore"), clock: clock}
// chStop is created here so that Close has a channel to close.
return &DBStore{db: ds, lggr: lggr.Named("WorkflowDBStore"), clock: clock, chStop: make(chan struct{})}
}

// HealthReport returns a single-entry map that associates this service's
// name with the result of its Healthy check.
func (d *DBStore) HealthReport() map[string]error {
	report := make(map[string]error, 1)
	report[d.Name()] = d.Healthy()
	return report
}

// Name returns the name of the store's logger, which is also the key used
// in HealthReport.
func (d *DBStore) Name() string {
	name := d.lggr.Name()
	return name
}
4 changes: 4 additions & 0 deletions core/web/testdata/body/health.html
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,7 @@
<details open>
<summary title="TelemetryManager" class="noexpand"><span class="passing">TelemetryManager</span></summary>
</details>
<details open>
<summary title="WorkflowDBStore" class="noexpand"><span class="passing">WorkflowDBStore</span></summary>
</details>

9 changes: 9 additions & 0 deletions core/web/testdata/body/health.json
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,15 @@
"status": "passing",
"output": ""
}
},
{
"type": "checks",
"id": "WorkflowDBStore",
"attributes": {
"name": "WorkflowDBStore",
"status": "passing",
"output": ""
}
}
]
}
1 change: 1 addition & 0 deletions core/web/testdata/body/health.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ ok StarkNet.Baz.Chain
ok StarkNet.Baz.Relayer
ok StarkNet.Baz.Txm
ok TelemetryManager
ok WorkflowDBStore
10 changes: 10 additions & 0 deletions testdata/scripts/health/default.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ ok PipelineRunner
ok PipelineRunner.BridgeCache
ok RetirementReportCache
ok TelemetryManager
ok WorkflowDBStore

-- out.json --
{
Expand Down Expand Up @@ -134,6 +135,15 @@ ok TelemetryManager
"status": "passing",
"output": ""
}
},
{
"type": "checks",
"id": "WorkflowDBStore",
"attributes": {
"name": "WorkflowDBStore",
"status": "passing",
"output": ""
}
}
]
}
10 changes: 10 additions & 0 deletions testdata/scripts/health/multi-chain.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ ok StarkNet.Baz.Chain
ok StarkNet.Baz.Relayer
ok StarkNet.Baz.Txm
ok TelemetryManager
ok WorkflowDBStore

-- out-unhealthy.txt --
! EVM.1.HeadTracker.HeadListener
Expand Down Expand Up @@ -396,6 +397,15 @@ ok TelemetryManager
"status": "passing",
"output": ""
}
},
{
"type": "checks",
"id": "WorkflowDBStore",
"attributes": {
"name": "WorkflowDBStore",
"status": "passing",
"output": ""
}
}
]
}
Expand Down
Loading