diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index dee5d1d1a5..c456893888 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -658,12 +658,14 @@ func (lp *logPoller) backgroundWorkerRun() { } case <-logPruneTick: logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 2401)) // = 7^5 avoids common factors with 1000 - if allRemoved, err := lp.PruneExpiredLogs(ctx); err != nil { - lp.lggr.Errorw("Unable to prune expired logs", "err", err) - } else if !allRemoved { + allRemoved, err := lp.PruneExpiredLogs(ctx) + if !allRemoved { // Tick faster when cleanup can't keep up with the pace of new logs logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 241)) } + if err != nil { + lp.lggr.Errorw("Unable to prune expired logs", "err", err) + } } } } @@ -1093,7 +1095,11 @@ func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) { // Returns whether all logs eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true. func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) { rowsRemoved, err := lp.orm.DeleteExpiredLogs(ctx, lp.logPrunePageSize) - return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err + if err != nil || rowsRemoved < lp.logPrunePageSize { + return true, err + } + rowsRemoved, err = lp.orm.DeleteExcessLogs(ctx, lp.logPrunePageSize) + return lp.logPrunePageSize == 0 || err != nil || rowsRemoved < lp.logPrunePageSize, err } // Logs returns logs matching topics and address (exactly) in the given block range, diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 0550b524f4..b9bbfe0948 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -15,6 +15,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" @@ -36,6 +37,7 @@ type ORM interface { DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) + DeleteExcessLogs(ctx context.Context, limit int64) (int64, error) GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error) SelectBlockByNumber(ctx context.Context, blockNumber int64) (*LogPollerBlock, error) @@ -350,23 +352,30 @@ func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, erro } // DeleteExcessLogs deletes any logs old enough that MaxLogsKept has been exceeded for every filter they match. -func (o *DSORM) DeleteExcessLogs(ctx context.Context) (int64, error) { - rowIds := struct { - EvmChainId ubig.Big - BlockHash common.Hash - LogIndex uint64 - }{} - err := o.ds.GetContext(ctx, &rowIds, ` +func (o *DSORM) DeleteExcessLogs(ctx context.Context, limit int64) (int64, error) { + var rowIds []struct { + BlockNumber uint64 + LogIndex uint64 + } + + var limitClause string + if limit > 0 { + limitClause = fmt.Sprintf(" LIMIT %d", limit) + } + + query := ` SELECT block_number, log_index FROM ( SELECT max_logs_kept != 0 AND ROW_NUMBER() OVER(PARTITION BY f.id ORDER BY block_number, log_index DESC) > max_logs_kept AS old, block_number, log_index FROM evm.log_poller_filters f JOIN evm.logs l ON f.evm_chain_id = l.evm_chain_id AND f.address = l.address AND f.event = l.event_sig WHERE f.evm_chain_id=$1 - ) x GROUP BY block_number, log_index HAVING BOOL_AND(old)`, ubig.New(o.chainID)) + ) x GROUP BY block_number, log_index HAVING BOOL_AND(old)` + limitClause + + err := o.ds.SelectContext(ctx, &rowIds, query, ubig.New(o.chainID)) if err != nil { return 0, err } - result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE block_hash=:block_hash AND log_index=:log_index AND evm_chain_id=5`, rowIds) + result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE evm_chain_id = :evm_chain_id AND (block_number, log_index) IN rowIds`, rowIds) if err != nil { return 0, err diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 90e3e19102..5829a0e57e 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -2113,7 +2113,7 @@ func Benchmark_LogPruning(b *testing.B) { return o.DeleteExpiredLogs(ctx, 1000) }) runBenchmarking("DeleteExcessLogs", func(ctx context.Context) (int64, error) { - return o.DeleteExcessLogs(ctx) + return o.DeleteExcessLogs(ctx, 0) }) }