Skip to content

Commit

Permalink
fix(store): improve pruning to avoid skipped heights (#1058)
Browse files Browse the repository at this point in the history
  • Loading branch information
srene authored Sep 19, 2024
1 parent d51b961 commit 5972ffe
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 80 deletions.
2 changes: 1 addition & 1 deletion block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

// Prune old heights, if requested by ABCI app.
if 0 < retainHeight {
err = m.PruneBlocks(uint64(retainHeight))
_, err := m.PruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
}
Expand Down
2 changes: 1 addition & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (m *Manager) Start(ctx context.Context) error {
}

/* ----------------------------- sequencer mode ----------------------------- */
// Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settelement and by the time we query the last batch, this batch wasn't accepted yet.
// Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settlement and by the time we query the last batch, this batch wasn't accepted yet.
go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger)

// Sequencer must wait till DA is synced to start submitting blobs
Expand Down
23 changes: 11 additions & 12 deletions block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,34 @@ package block
import (
"context"
"fmt"

"github.com/dymensionxyz/gerr-cosmos/gerrc"
)

func (m *Manager) PruneBlocks(retainHeight uint64) error {
if m.IsProposer() && m.NextHeightToSubmit() < retainHeight { // do not delete anything that we might submit in future
return fmt.Errorf("cannot prune blocks before they have been submitted: retain height %d: next height to submit: %d: %w",
retainHeight,
m.NextHeightToSubmit(),
gerrc.ErrInvalidArgument)
// PruneBlocks prune all block related data from dymint store up to (but not including) retainHeight. It returns the number of blocks pruned, used for testing.
func (m *Manager) PruneBlocks(retainHeight uint64) (uint64, error) {
nextSubmissionHeight := m.NextHeightToSubmit()
if m.IsProposer() && nextSubmissionHeight < retainHeight { // do not delete anything that we might submit in future
m.logger.Debug("cannot prune blocks before they have been submitted. using height last submitted height for pruning", "retain_height", retainHeight, "height_to_submit", m.NextHeightToSubmit())
retainHeight = nextSubmissionHeight
}

err := m.P2PClient.RemoveBlocks(context.Background(), m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning blocksync store", "retain_height", retainHeight, "err", err)
}
pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight)
pruned, err := m.Store.PruneStore(m.State.BaseHeight, retainHeight, m.logger)
if err != nil {
return fmt.Errorf("prune block store: %w", err)
return 0, fmt.Errorf("prune block store: %w", err)
}

// TODO: prune state/indexer and state/txindexer??

m.State.BaseHeight = retainHeight
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
return fmt.Errorf("save state: %w", err)
return 0, fmt.Errorf("save state: %w", err)
}

m.logger.Info("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
return nil

return pruned, nil
}
19 changes: 11 additions & 8 deletions block/pruning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/testutil"
"github.com/dymensionxyz/dymint/version"
"github.com/dymensionxyz/gerr-cosmos/gerrc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -64,13 +65,15 @@ func TestPruningRetainHeight(t *testing.T) {
_, _, err = manager.ProduceApplyGossipBlock(ctx, true)
require.NoError(err)
}

validRetainHeight := lastSubmitted + 1 // the max possible valid retain height
for i := validRetainHeight + 1; i < manager.State.Height(); i++ {
err = manager.PruneBlocks(i)
require.Error(err) // cannot prune blocks before they have been submitted
validRetainHeight := manager.NextHeightToSubmit() // the max possible valid retain height
for i := validRetainHeight; i < manager.State.Height(); i++ {
expectedPruned := validRetainHeight - manager.State.BaseHeight
pruned, err := manager.PruneBlocks(i)
if i <= validRetainHeight {
require.NoError(err)
assert.Equal(t, expectedPruned, pruned)
} else {
require.Error(gerrc.ErrInvalidArgument)
}
}

err = manager.PruneBlocks(validRetainHeight)
require.NoError(err)
}
6 changes: 3 additions & 3 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func SubmitLoopInner(
logger.Error("Create and submit batch", "err", err, "pending", pending)
panic(err)
}
// this could happen if we timed-out waiting for acceptance in the previous iteration, but the batch was indeed submitted.
// we panic here cause restarting may reset the last batch submitted counter and the sequencer can potentially resume submitting batches.
// this could happen if we timed-out waiting for acceptance in the previous iteration, but the batch was indeed submitted.
// we panic here cause restarting may reset the last batch submitted counter and the sequencer can potentially resume submitting batches.
if errors.Is(err, gerrc.ErrAlreadyExists) {
logger.Debug("Batch already accepted", "err", err, "pending", pending)
panic(err)
Expand Down Expand Up @@ -273,7 +273,7 @@ func (m *Manager) GetUnsubmittedBlocks() uint64 {
return m.State.Height() - m.LastSubmittedHeight.Load()
}

// UpdateLastSubmittedHeight will update last height submitted height upon events.
// UpdateLastSubmittedHeight will update last height submitted height upon events.
// This may be necessary in case we crashed/restarted before getting response for our submission to the settlement layer.
func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) {
eventData, ok := event.Data().(*settlement.EventDataNewBatchAccepted)
Expand Down
10 changes: 7 additions & 3 deletions p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package p2p
import (
"context"
"encoding/hex"
"errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -230,14 +231,17 @@ func (c *Client) RemoveBlocks(ctx context.Context, from, to uint64) error {
}

for h := from; h < to; h++ {

cid, err := c.store.LoadBlockCid(h)
if errors.Is(err, gerrc.ErrNotFound) {
continue
}
if err != nil {
return fmt.Errorf("load block id from store %d: %w", h, err)
c.logger.Error("load blocksync block id from store", "height", h, "error", err)
continue
}
err = c.blocksync.DeleteBlock(ctx, cid)
if err != nil {
return fmt.Errorf("remove block height %d: %w", h, err)
c.logger.Error("remove blocksync block", "height", h, "err", err)
}
}
return nil
Expand Down
107 changes: 84 additions & 23 deletions store/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package store
import (
"fmt"

"github.com/dymensionxyz/dymint/types"
"github.com/dymensionxyz/gerr-cosmos/gerrc"
)

// PruneBlocks removes block up to (but not including) a height. It returns number of blocks pruned.
func (s *DefaultStore) PruneBlocks(from, to uint64) (uint64, error) {
// PruneStore removes blocks up to (but not including) a height. It returns number of blocks pruned.
func (s *DefaultStore) PruneStore(from, to uint64, logger types.Logger) (uint64, error) {
if from <= 0 {
return 0, fmt.Errorf("from height must be greater than 0: %w", gerrc.ErrInvalidArgument)
}
Expand All @@ -16,7 +17,84 @@ func (s *DefaultStore) PruneBlocks(from, to uint64) (uint64, error) {
return 0, fmt.Errorf("to height must be greater than from height: to: %d: from: %d: %w", to, from, gerrc.ErrInvalidArgument)
}

prunedBlocks, err := s.pruneBlocks(from, to, logger)
if err != nil {
logger.Error("pruning blocks", "from", from, "to", to, "blocks pruned", prunedBlocks, "err", err)
}

prunedResponses, err := s.pruneResponses(from, to, logger)
if err != nil {
logger.Error("pruning responses", "from", from, "to", to, "responses pruned", prunedResponses, "err", err)
}

prunedSequencers, err := s.pruneSequencers(from, to, logger)
if err != nil {
logger.Error("pruning sequencers", "from", from, "to", to, "sequencers pruned", prunedSequencers, "err", err)
}

prunedCids, err := s.pruneCids(from, to, logger)
if err != nil {
logger.Error("pruning block sync identifiers", "from", from, "to", to, "cids pruned", prunedCids, "err", err)
}

return prunedBlocks, nil
}

// pruneBlocks prunes all store entries that are stored along blocks (blocks,commit and block hash)
func (s *DefaultStore) pruneBlocks(from, to uint64, logger types.Logger) (uint64, error) {
pruneBlocks := func(batch KVBatch, height uint64) error {
hash, err := s.loadHashFromIndex(height)
if err != nil {
return err
}
if err := batch.Delete(getBlockKey(hash)); err != nil {
return err
}
if err := batch.Delete(getCommitKey(hash)); err != nil {
return err
}
if err := batch.Delete(getIndexKey(height)); err != nil {
return err
}
return nil
}

prunedBlocks, err := s.pruneHeights(from, to, pruneBlocks, logger)
return prunedBlocks, err
}

// pruneResponses prunes block execution responses from store
func (s *DefaultStore) pruneResponses(from, to uint64, logger types.Logger) (uint64, error) {
pruneResponses := func(batch KVBatch, height uint64) error {
return batch.Delete(getResponsesKey(height))
}

prunedResponses, err := s.pruneHeights(from, to, pruneResponses, logger)
return prunedResponses, err
}

// pruneSequencers prunes sequencers from store
func (s *DefaultStore) pruneSequencers(from, to uint64, logger types.Logger) (uint64, error) {
pruneSequencers := func(batch KVBatch, height uint64) error {
return batch.Delete(getSequencersKey(height))
}
prunedSequencers, err := s.pruneHeights(from, to, pruneSequencers, logger)
return prunedSequencers, err
}

// pruneCids prunes content identifiers from store
func (s *DefaultStore) pruneCids(from, to uint64, logger types.Logger) (uint64, error) {
pruneCids := func(batch KVBatch, height uint64) error {
return batch.Delete(getCidKey(height))
}
prunedCids, err := s.pruneHeights(from, to, pruneCids, logger)
return prunedCids, err
}

// pruneHeights is the common function for all pruning that iterates through all heights and prunes according to the pruning function set
func (s *DefaultStore) pruneHeights(from, to uint64, prune func(batch KVBatch, height uint64) error, logger types.Logger) (uint64, error) {
pruned := uint64(0)

batch := s.db.NewBatch()
defer batch.Discard()

Expand All @@ -29,29 +107,11 @@ func (s *DefaultStore) PruneBlocks(from, to uint64) (uint64, error) {
}

for h := from; h < to; h++ {
hash, err := s.loadHashFromIndex(h)
err := prune(batch, h)
if err != nil {
logger.Debug("unable to prune", "height", h, "err", err)
continue
}
if err := batch.Delete(getBlockKey(hash)); err != nil {
return 0, err
}
if err := batch.Delete(getCommitKey(hash)); err != nil {
return 0, err
}
if err := batch.Delete(getIndexKey(h)); err != nil {
return 0, err
}
if err := batch.Delete(getResponsesKey(h)); err != nil {
return 0, err
}
if err := batch.Delete(getSequencersKey(h)); err != nil {
return 0, err
}
if err := batch.Delete(getCidKey(h)); err != nil {
return 0, err
}

pruned++

// flush every 1000 blocks to avoid batches becoming too large
Expand All @@ -63,11 +123,12 @@ func (s *DefaultStore) PruneBlocks(from, to uint64) (uint64, error) {
batch.Discard()
batch = s.db.NewBatch()
}
}

}
err := flush(batch, to)
if err != nil {
return 0, err
}

return pruned, nil
}
Loading

0 comments on commit 5972ffe

Please sign in to comment.