From 2299320d6b5dd569354ef2614107fb68d190d2ab Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Thu, 7 Nov 2024 18:32:35 +0100 Subject: [PATCH] fix(pruning): memory leak pruning tx indexer iterator (#1206) --- indexers/blockindexer/kv/kv.go | 31 ++++++++++++++++--------------- indexers/txindex/kv/kv.go | 28 +++++++++++++++------------- store/pruning.go | 21 ++++++++++----------- 3 files changed, 41 insertions(+), 39 deletions(-) diff --git a/indexers/blockindexer/kv/kv.go b/indexers/blockindexer/kv/kv.go index 736bd7895..d03e32940 100644 --- a/indexers/blockindexer/kv/kv.go +++ b/indexers/blockindexer/kv/kv.go @@ -546,6 +546,19 @@ func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint } for h := int64(from); h < int64(to); h++ { + + // flush every 1000 blocks to avoid batches becoming too large + if toFlush > 1000 { + err := flush(batch, h) + if err != nil { + return 0, err + } + batch.Discard() + batch = idx.store.NewBatch() + + toFlush = 0 + } + ok, err := idx.Has(h) if err != nil { logger.Debug("pruning block indexer checking height", "height", h, "err", err) @@ -565,6 +578,7 @@ func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint } pruned++ + toFlush++ prunedEvents, err := idx.pruneEvents(h, logger, batch) if err != nil { @@ -572,20 +586,8 @@ func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint continue } pruned += prunedEvents + toFlush += prunedEvents - toFlush += pruned - - // flush every 1000 blocks to avoid batches becoming too large - if toFlush > 1000 { - err := flush(batch, h) - if err != nil { - return 0, err - } - batch.Discard() - batch = idx.store.NewBatch() - - toFlush = 0 - } } err := flush(batch, int64(to)) @@ -613,13 +615,12 @@ func (idx *BlockerIndexer) pruneEvents(height int64, logger log.Logger, batch st return pruned, err } for _, key := range eventKeys.Keys { + pruned++ err := batch.Delete(key) if err != nil { logger.Error("pruning block indexer iterate events", "height", height, "err", err) continue } - pruned++ - } return pruned, nil } diff --git a/indexers/txindex/kv/kv.go b/indexers/txindex/kv/kv.go index 1dd3cbf96..aeca26fc6 100644 --- a/indexers/txindex/kv/kv.go +++ b/indexers/txindex/kv/kv.go @@ -594,6 +594,17 @@ func (txi *TxIndex) pruneTxsAndEvents(from, to uint64, logger log.Logger) (uint6 for h := from; h < to; h++ { + // flush every 1000 txs to avoid batches becoming too large + if toFlush > 1000 { + err := flush(batch, int64(h)) + if err != nil { + return 0, err + } + batch.Discard() + batch = txi.store.NewBatch() + toFlush = 0 + } + // first all events are pruned associated to the same height prunedEvents, err := txi.pruneEvents(h, batch) if err != nil { @@ -601,13 +612,14 @@ func (txi *TxIndex) pruneTxsAndEvents(from, to uint64, logger log.Logger) (uint6 continue } pruned += prunedEvents + toFlush += prunedEvents // then all txs indexed are iterated by height it := txi.store.PrefixIterator(prefixForHeight(int64(h))) - defer it.Discard() // and deleted all indexed (by hash and by keyheight) for ; it.Valid(); it.Next() { + toFlush++ if err := batch.Delete(it.Key()); err != nil { logger.Error("pruning txs indexer event key", "height", h, "error", err) continue @@ -620,17 +632,7 @@ func (txi *TxIndex) pruneTxsAndEvents(from, to uint64, logger log.Logger) (uint6 pruned++ } - toFlush += pruned - // flush every 1000 txs to avoid batches becoming too large - if toFlush > 1000 { - err := flush(batch, int64(h)) - if err != nil { - return 0, err - } - batch.Discard() - batch = txi.store.NewBatch() - toFlush = 0 - } + it.Discard() } @@ -658,11 +660,11 @@ func (txi *TxIndex) pruneEvents(height uint64, batch store.KVBatch) (uint64, err return pruned, err } for _, key := range eventKeys.Keys { + pruned++ err := batch.Delete(key) if err != nil { return pruned, err } - pruned++ } return pruned, nil } diff --git a/store/pruning.go b/store/pruning.go index e1cccc827..5940f8ae9 100644 --- a/store/pruning.go +++ b/store/pruning.go @@ -32,17 +32,6 @@ func (s *DefaultStore) PruneStore(to uint64, logger types.Logger) (uint64, error // pruneHeights prunes all store entries that are stored along blocks (blocks,commit,proposer, etc) func (s *DefaultStore) pruneHeights(from, to uint64, logger types.Logger) (uint64, error) { pruneBlocks := func(batch KVBatch, height uint64) error { - - if err := batch.Delete(getResponsesKey(height)); err != nil { - logger.Error("delete responses", "error", err) - } - if err := batch.Delete(getDRSVersionKey(height)); err != nil { - logger.Error("delete drs", "error", err) - } - if err := batch.Delete(getProposerKey(height)); err != nil { - logger.Error("delete proposer", "error", err) - } - hash, err := s.loadHashFromIndex(height) if err != nil { return err @@ -58,6 +47,16 @@ func (s *DefaultStore) pruneHeights(from, to uint64, logger types.Logger) (uint6 logger.Error("delete hash index", "error", err) } + if err := batch.Delete(getResponsesKey(height)); err != nil { + logger.Error("delete responses", "error", err) + } + if err := batch.Delete(getDRSVersionKey(height)); err != nil { + logger.Error("delete drs", "error", err) + } + if err := batch.Delete(getProposerKey(height)); err != nil { + logger.Error("delete proposer", "error", err) + } + return nil }