Skip to content

Commit

Permalink
feat(blockManager): refactor and use state as single source of truth …
Browse files Browse the repository at this point in the history
…for height (#847)

Co-authored-by: danwt <[email protected]>
  • Loading branch information
mtsitrin and danwt authored May 15, 2024
1 parent fcc2b15 commit 73aae62
Show file tree
Hide file tree
Showing 52 changed files with 632 additions and 817 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ proto/pb
.go-version
build

vendor/
vendor/
da/grpc/mockserv/db/
141 changes: 40 additions & 101 deletions block/block.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
package block

import (
"context"
"fmt"

errorsmod "cosmossdk.io/errors"

"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/types"
tmtypes "github.com/tendermint/tendermint/types"
)

// applyBlock applies the block to the store and the abci app.
// Contract: block and commit must be validated before calling this function!
// steps: save block -> execute block with app -> update state -> commit block to app -> update store height and state hash.
// steps: save block -> execute block with app -> update state -> commit block to app -> update state's height and commit result.
// As the entire process can't be atomic we need to make sure the following condition apply before
// - block height is the expected block height on the store (height + 1).
// - block height is the expected block height on the app (last block height + 1).
func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMetaData blockMetaData) error {
// TODO (#330): allow genesis block with height > 0 to be applied.
// TODO: add switch case to have defined behavior for each case.
// validate block height
if block.Header.Height != m.Store.NextHeight() {
if block.Header.Height != m.State.NextHeight() {
return types.ErrInvalidBlockHeight
}

Expand All @@ -35,6 +31,8 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
// In case the following true, it means we crashed after the commit and before updating the store height.
// In that case we'll want to align the store with the app state and continue to the next block.
if isBlockAlreadyApplied {
// In this case, where the app was committed, but the state wasn't updated
// it will update the state from appInfo, saved responses and validators.
err := m.UpdateStateFromApp()
if err != nil {
return fmt.Errorf("update state from app: %w", err)
Expand All @@ -48,83 +46,82 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return fmt.Errorf("save block: %w", err)
}

responses, err := m.Executor.ExecuteBlock(m.LastState, block)
responses, err := m.Executor.ExecuteBlock(m.State, block)
if err != nil {
return fmt.Errorf("execute block: %w", err)
}

newState, err := m.Executor.UpdateStateFromResponses(responses, m.LastState, block)
dbBatch := m.Store.NewBatch()
dbBatch, err = m.Store.SaveBlockResponses(block.Header.Height, responses, dbBatch)
if err != nil {
return fmt.Errorf("update state from responses: %w", err)
}

batch := m.Store.NewBatch()

batch, err = m.Store.SaveBlockResponses(block.Header.Height, responses, batch)
if err != nil {
batch.Discard()
dbBatch.Discard()
return fmt.Errorf("save block responses: %w", err)
}

m.LastState = newState
batch, err = m.Store.UpdateState(m.LastState, batch)
// Get the validator changes from the app
validators, err := m.Executor.NextValSetFromResponses(m.State, responses, block)
if err != nil {
batch.Discard()
return fmt.Errorf("update state: %w", err)
return fmt.Errorf("update state from responses: %w", err)
}
batch, err = m.Store.SaveValidators(block.Header.Height, m.LastState.Validators, batch)

dbBatch, err = m.Store.SaveValidators(block.Header.Height, validators, dbBatch)
if err != nil {
batch.Discard()
dbBatch.Discard()
return fmt.Errorf("save validators: %w", err)
}

err = batch.Commit()
err = dbBatch.Commit()
if err != nil {
return fmt.Errorf("commit batch to disk: %w", err)
}

// Commit block to app
retainHeight, err := m.Executor.Commit(&newState, block, responses)
appHash, retainHeight, err := m.Executor.Commit(m.State, block, responses)
if err != nil {
return fmt.Errorf("commit block: %w", err)
}

// If failed here, after the app committed, but before the state is updated, we'll update the state on
// UpdateStateFromApp using the saved responses and validators.

// Update the state with the new app hash, last validators and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height, validators)
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
return fmt.Errorf("update state: %w", err)
}

// Prune old heights, if requested by ABCI app.
if retainHeight > 0 {
pruned, err := m.pruneBlocks(retainHeight)
_, err := m.pruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
} else {
m.logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
}
}
return nil
}

// Update the state with the new app hash, last validators and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
newState.LastValidators = m.LastState.Validators.Copy()
newState.LastStoreHeight = block.Header.Height
newState.BaseHeight = m.Store.Base()

_, err = m.Store.UpdateState(newState, nil)
// isHeightAlreadyApplied checks if the block height is already applied to the app.
func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {
proxyAppInfo, err := m.Executor.GetAppInfo()
if err != nil {
return fmt.Errorf("final update state: %w", err)
return false, errorsmod.Wrap(err, "get app info")
}
m.LastState = newState

if ok := m.Store.SetHeight(block.Header.Height); !ok {
return fmt.Errorf("store set height: %d", block.Header.Height)
}
isBlockAlreadyApplied := uint64(proxyAppInfo.LastBlockHeight) == blockHeight

return nil
// TODO: add switch case to validate better the current app state

return isBlockAlreadyApplied, nil
}

// TODO: move to gossip.go
func (m *Manager) attemptApplyCachedBlocks() error {
m.retrieverMutex.Lock()
defer m.retrieverMutex.Unlock()

for {
expectedHeight := m.Store.NextHeight()
expectedHeight := m.State.NextHeight()

cachedBlock, blockExists := m.blockCache[expectedHeight]
if !blockExists {
Expand All @@ -148,68 +145,10 @@ func (m *Manager) attemptApplyCachedBlocks() error {
return nil
}

// isHeightAlreadyApplied checks if the block height is already applied to the app.
func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {
proxyAppInfo, err := m.Executor.GetAppInfo()
if err != nil {
return false, errorsmod.Wrap(err, "get app info")
}

isBlockAlreadyApplied := uint64(proxyAppInfo.LastBlockHeight) == blockHeight

// TODO: add switch case to validate better the current app state

return isBlockAlreadyApplied, nil
}

// UpdateStateFromApp is responsible for aligning the state of the store from the abci app
func (m *Manager) UpdateStateFromApp() error {
proxyAppInfo, err := m.Executor.GetAppInfo()
if err != nil {
return errorsmod.Wrap(err, "get app info")
}

appHeight := uint64(proxyAppInfo.LastBlockHeight)

// update the state with the hash, last store height and last validators.
m.LastState.AppHash = *(*[32]byte)(proxyAppInfo.LastBlockAppHash)
m.LastState.LastStoreHeight = appHeight
m.LastState.LastValidators = m.LastState.Validators.Copy()

resp, err := m.Store.LoadBlockResponses(appHeight)
if err != nil {
return errorsmod.Wrap(err, "load block responses")
}
copy(m.LastState.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())

_, err = m.Store.UpdateState(m.LastState, nil)
if err != nil {
return errorsmod.Wrap(err, "update state")
}
if ok := m.Store.SetHeight(appHeight); !ok {
return fmt.Errorf("store set height: %d", appHeight)
}
return nil
}

func (m *Manager) validateBlock(block *types.Block, commit *types.Commit) error {
// Currently we're assuming proposer is never nil as it's a pre-condition for
// dymint to start
proposer := m.SLClient.GetProposer()

return types.ValidateProposedTransition(m.LastState, block, commit, proposer)
}

func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit types.Commit) error {
gossipedBlock := p2p.GossipedBlock{Block: block, Commit: commit}
gossipedBlockBytes, err := gossipedBlock.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal binary: %w: %w", err, ErrNonRecoverable)
}
if err := m.p2pClient.GossipBlock(ctx, gossipedBlockBytes); err != nil {
// Although this boils down to publishing on a topic, we don't want to speculate too much on what
// could cause that to fail, so we assume recoverable.
return fmt.Errorf("p2p gossip block: %w: %w", err, ErrRecoverable)
}
return nil
return types.ValidateProposedTransition(m.State, block, commit, proposer)
}
20 changes: 8 additions & 12 deletions block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (e *Executor) InitChain(genesis *tmtypes.GenesisDoc, validators []*tmtypes.
}

// CreateBlock reaps transactions from mempool and builds a block.
func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHeaderHash [32]byte, state types.State, maxBytes uint64) *types.Block {
func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHeaderHash [32]byte, state *types.State, maxBytes uint64) *types.Block {
if state.ConsensusParams.Block.MaxBytes > 0 {
maxBytes = min(maxBytes, uint64(state.ConsensusParams.Block.MaxBytes))
}
Expand Down Expand Up @@ -134,21 +134,18 @@ func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHead
}

// Commit commits the block
func (e *Executor) Commit(state *types.State, block *types.Block, resp *tmstate.ABCIResponses) (int64, error) {
func (e *Executor) Commit(state *types.State, block *types.Block, resp *tmstate.ABCIResponses) ([]byte, int64, error) {
appHash, retainHeight, err := e.commit(state, block, resp.DeliverTxs)
if err != nil {
return 0, err
return nil, 0, err
}

copy(state.AppHash[:], appHash[:])
copy(state.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())

err = e.publishEvents(resp, block, *state)
err = e.publishEvents(resp, block)
if err != nil {
e.logger.Error("fire block events", "error", err)
return 0, err
return nil, 0, err
}
return retainHeight, nil
return appHash, retainHeight, nil
}

// GetAppInfo returns the latest AppInfo from the proxyApp.
Expand Down Expand Up @@ -183,7 +180,7 @@ func (e *Executor) commit(state *types.State, block *types.Block, deliverTxs []*
}

// ExecuteBlock executes the block and returns the ABCIResponses. Block should be valid (passed validation checks).
func (e *Executor) ExecuteBlock(state types.State, block *types.Block) (*tmstate.ABCIResponses, error) {
func (e *Executor) ExecuteBlock(state *types.State, block *types.Block) (*tmstate.ABCIResponses, error) {
abciResponses := new(tmstate.ABCIResponses)
abciResponses.DeliverTxs = make([]*abci.ResponseDeliverTx, len(block.Data.Txs))

Expand Down Expand Up @@ -252,13 +249,12 @@ func (e *Executor) getDataHash(block *types.Block) []byte {
return abciData.Hash()
}

func (e *Executor) publishEvents(resp *tmstate.ABCIResponses, block *types.Block, state types.State) error {
func (e *Executor) publishEvents(resp *tmstate.ABCIResponses, block *types.Block) error {
if e.eventBus == nil {
return nil
}

abciBlock, err := types.ToABCIBlock(block)
abciBlock.Header.ValidatorsHash = state.Validators.Hash()
if err != nil {
return err
}
Expand Down
32 changes: 14 additions & 18 deletions block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestCreateBlock(t *testing.T) {

maxBytes := uint64(100)

state := types.State{}
state := &types.State{}
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000
state.Validators = tmtypes.NewValidatorSet(nil)
Expand Down Expand Up @@ -140,13 +140,12 @@ func TestApplyBlock(t *testing.T) {
require.NotNil(headerSub)

// Init state
state := types.State{
state := &types.State{
NextValidators: tmtypes.NewValidatorSet(nil),
Validators: tmtypes.NewValidatorSet(nil),
LastValidators: tmtypes.NewValidatorSet(nil),
}
state.InitialHeight = 1
state.LastBlockHeight = 0
state.LastBlockHeight.Store(0)
maxBytes := uint64(100)
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000
Expand Down Expand Up @@ -182,21 +181,18 @@ func TestApplyBlock(t *testing.T) {
resp, err := executor.ExecuteBlock(state, block)
require.NoError(err)
require.NotNil(resp)
newState, err := executor.UpdateStateFromResponses(resp, state, block)
appHash, _, err := executor.Commit(state, block, resp)
require.NoError(err)
require.NotNil(newState)
assert.Equal(int64(1), newState.LastBlockHeight)
_, err = executor.Commit(&newState, block, resp)
require.NoError(err)
assert.Equal(mockAppHash, newState.AppHash)
newState.LastStoreHeight = uint64(newState.LastBlockHeight)
executor.UpdateStateAfterCommit(state, resp, appHash, block.Header.Height, state.Validators)
assert.Equal(uint64(1), state.Height())
assert.Equal(mockAppHash, state.AppHash)

// Create another block with multiple Tx from mempool
require.NoError(mpool.CheckTx([]byte{0, 1, 2, 3, 4}, func(r *abci.Response) {}, mempool.TxInfo{}))
require.NoError(mpool.CheckTx([]byte{5, 6, 7, 8, 9}, func(r *abci.Response) {}, mempool.TxInfo{}))
require.NoError(mpool.CheckTx([]byte{1, 2, 3, 4, 5}, func(r *abci.Response) {}, mempool.TxInfo{}))
require.NoError(mpool.CheckTx(make([]byte, 90), func(r *abci.Response) {}, mempool.TxInfo{}))
block = executor.CreateBlock(2, commit, [32]byte{}, newState, maxBytes)
block = executor.CreateBlock(2, commit, [32]byte{}, state, maxBytes)
require.NotNil(block)
assert.Equal(uint64(2), block.Header.Height)
assert.Len(block.Data.Txs, 3)
Expand All @@ -217,7 +213,7 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block with an invalid commit
err = types.ValidateProposedTransition(newState, block, invalidCommit, proposer)
err = types.ValidateProposedTransition(state, block, invalidCommit, proposer)

require.ErrorIs(err, types.ErrInvalidSignature)

Expand All @@ -231,17 +227,17 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block
err = types.ValidateProposedTransition(newState, block, commit, proposer)
err = types.ValidateProposedTransition(state, block, commit, proposer)
require.NoError(err)
resp, err = executor.ExecuteBlock(state, block)
require.NoError(err)
require.NotNil(resp)
newState, err = executor.UpdateStateFromResponses(resp, state, block)
vals, err := executor.NextValSetFromResponses(state, resp, block)
require.NoError(err)
require.NotNil(newState)
assert.Equal(int64(2), newState.LastBlockHeight)
_, err = executor.Commit(&newState, block, resp)
_, _, err = executor.Commit(state, block, resp)
require.NoError(err)
executor.UpdateStateAfterCommit(state, resp, appHash, block.Header.Height, vals)
assert.Equal(uint64(2), state.Height())

// wait for at least 4 Tx events, for up to 3 second.
// 3 seconds is a fail-scenario only
Expand Down
Loading

0 comments on commit 73aae62

Please sign in to comment.