Skip to content

Commit

Permalink
Prune workfllw DB entries
Browse files Browse the repository at this point in the history
  • Loading branch information
bolekk committed Nov 13, 2024
1 parent 61aa9af commit 8cdddd9
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 4 deletions.
1 change: 1 addition & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
streamRegistry = streams.NewRegistry(globalLogger, pipelineRunner)
workflowORM = workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock())
)
srvcs = append(srvcs, workflowORM)

promReporter := headreporter.NewPrometheusReporter(opts.DS, legacyEVMChains)
chainIDs := make([]*big.Int, legacyEVMChains.Len())
Expand Down
1 change: 1 addition & 0 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
if err != nil {
return nil, err
}
d.logger.Infow("Creating Workflow Engine for workflow spec", "workflowID", spec.WorkflowSpec.WorkflowID, "workflowOwner", spec.WorkflowSpec.WorkflowOwner, "workflowName", spec.WorkflowSpec.WorkflowName, "jobName", spec.Name)
return []job.ServiceCtx{engine}, nil
}

Expand Down
69 changes: 65 additions & 4 deletions core/services/workflows/store/store_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"google.golang.org/protobuf/proto"
Expand All @@ -15,17 +16,24 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/values"
valuespb "github.com/smartcontractkit/chainlink-common/pkg/values/pb"

commonservices "github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"
)

// `DBStore` is a postgres-backed
// data store that persists workflow progress.
type DBStore struct {
lggr logger.Logger
db sqlutil.DataSource
clock clockwork.Clock
commonservices.StateMachine
lggr logger.Logger
db sqlutil.DataSource
shutdownWaitGroup sync.WaitGroup
chStop commonservices.StopChan
clock clockwork.Clock
}

var _ services.ServiceCtx = (*DBStore)(nil)

// `workflowExecutionRow` describes a row
// of the `workflow_executions` table
type workflowExecutionRow struct {
Expand Down Expand Up @@ -70,6 +78,59 @@ type workflowExecutionWithStep struct {
WEFinishedAt *time.Time `db:"we_finished_at"`
}

func (d *DBStore) Start(context.Context) error {
return d.StartOnce("DBStore", func() error {
d.shutdownWaitGroup.Add(1)
go d.pruneDBEntries()
return nil
})
}

func (d *DBStore) Close() error {
return d.StopOnce("DBStore", func() error {
close(d.chStop)
d.shutdownWaitGroup.Wait()
return nil
})
}

func (d *DBStore) pruneDBEntries() {
defer d.shutdownWaitGroup.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-d.chStop:
return
case <-ticker.C:
ctx, cancel := d.chStop.CtxWithTimeout(60 * time.Second)
err := sqlutil.TransactDataSource(ctx, d.db, nil, func(tx sqlutil.DataSource) error {
stmt := "DELETE FROM workflow_executions WHERE (id) IN (SELECT id FROM workflow_executions WHERE (created_at < now() - interval '2 hours') LIMIT 5000);"
_, err := tx.ExecContext(ctx, stmt)
return err
})
if err != nil {
d.lggr.Errorw("Failed to prune workflow_executions", "err", err)
} else {
d.lggr.Infow("Pruned up to 5k entries")
}
cancel()
}
}
}

func (d *DBStore) Ready() error {
return nil
}

func (d *DBStore) HealthReport() map[string]error {
return map[string]error{d.Name(): d.Healthy()}
}

func (d *DBStore) Name() string {
return d.lggr.Name()
}

// `UpdateStatus` updates the status of the given workflow execution
func (d *DBStore) UpdateStatus(ctx context.Context, executionID string, status string) error {
sql := `UPDATE workflow_executions SET status = $1, updated_at = $2 WHERE id = $3`
Expand Down Expand Up @@ -407,5 +468,5 @@ func (d *DBStore) GetUnfinished(ctx context.Context, workflowID string, offset,
}

func NewDBStore(ds sqlutil.DataSource, lggr logger.Logger, clock clockwork.Clock) *DBStore {
return &DBStore{db: ds, lggr: lggr.Named("WorkflowDBStore"), clock: clock}
return &DBStore{db: ds, lggr: lggr.Named("WorkflowDBStore"), clock: clock, chStop: make(chan struct{})}
}

0 comments on commit 8cdddd9

Please sign in to comment.