Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move executer to block package #638

Merged
merged 23 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 30 additions & 17 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,27 @@ import (
// applyBlock applies the block to the store and the abci app.
// steps: save block -> execute block with app -> update state -> commit block to app -> update store height and state hash.
// As the entire process can't be atomic we need to make sure the following condition apply before
// we're applying the block in the happy path: block height - 1 == abci app last block height.
// In case the following doesn't hold true, it means we crashed after the commit and before updating the store height.
// In that case we'll want to align the store with the app state and continue to the next block.
// - block height is the expected block height on the store (height + 1).
// - block height is the expected block height on the app (last block height + 1).
func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *types.Commit, blockMetaData blockMetaData) error {
if block.Header.Height != m.store.Height()+1 {
// We crashed after the commit and before updating the store height.
m.logger.Error("Block not applied. Wrong height", "block height", block.Header.Height, "store height", m.store.Height())
return nil
Copy link
Contributor Author

@mtsitrin mtsitrin Apr 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed bug when applying cache, as no error is returned but expected

// TODO (#330): allow genesis block with height > 0 to be applied.
// TODO: add switch case to have defined behavior for each case.
//validate block height
if block.Header.Height != m.store.NextHeight() {
m.logger.Error("Block not applied. wrong height", "block height", block.Header.Height, "expected height", m.store.NextHeight())
return types.ErrInvalidBlockHeight
}

m.logger.Debug("Applying block", "height", block.Header.Height, "source", blockMetaData.source)

// Check if alignment is needed due to incosistencies between the store and the app.
isAlignRequired, err := m.alignStoreWithApp(ctx, block)
// In the happy path we expect block height - 1 == abci app last block height.
isAlignRequired, err := m.alignStoreWithAppIfNeeded(block)
if err != nil {
return err
}
// In case the following true, it means we crashed after the commit and before updating the store height.
// In that case we'll want to align the store with the app state and continue to the next block.
if isAlignRequired {
danwt marked this conversation as resolved.
Show resolved Hide resolved
m.logger.Debug("Aligned with app state required. Skipping to next block", "height", block.Header.Height)
return nil
Expand Down Expand Up @@ -117,17 +121,23 @@ func (m *Manager) attemptApplyCachedBlocks(ctx context.Context) error {
m.applyCachedBlockMutex.Lock()
defer m.applyCachedBlockMutex.Unlock()

prevCachedBlock, exists := m.prevBlock[m.store.Height()+1]
expectedHeight := m.store.NextHeight()

for exists {
m.logger.Debug("Applying cached block", "height", m.store.Height()+1)
prevCachedBlock, blockExists := m.prevBlock[expectedHeight]
prevCachedCommit, commitExists := m.prevCommit[expectedHeight]

err := m.applyBlock(ctx, prevCachedBlock, m.prevCommit[m.store.Height()+1], blockMetaData{source: gossipedBlock})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bug fixed as commit cache not checked

for blockExists && commitExists {
m.logger.Debug("Applying cached block", "height", expectedHeight)
err := m.applyBlock(ctx, prevCachedBlock, prevCachedCommit, blockMetaData{source: gossipedBlock})
if err != nil {
m.logger.Debug("Failed to apply previously cached block", "err", err)
return err
}
prevCachedBlock, exists = m.prevBlock[m.store.Height()+1]

expectedHeight := m.store.NextHeight()
mtsitrin marked this conversation as resolved.
Show resolved Hide resolved

prevCachedBlock, blockExists = m.prevBlock[expectedHeight]
prevCachedCommit, commitExists = m.prevCommit[expectedHeight]
}
danwt marked this conversation as resolved.
Show resolved Hide resolved

for k := range m.prevBlock {
Expand All @@ -139,16 +149,19 @@ func (m *Manager) attemptApplyCachedBlocks(ctx context.Context) error {
return nil
}

// alignStoreWithApp is responsible for aligning the state of the store and the abci app if necessary.
func (m *Manager) alignStoreWithApp(ctx context.Context, block *types.Block) (bool, error) {
// alignStoreWithAppIfNeeded is responsible for aligning the state of the store and the abci app if necessary.
mtsitrin marked this conversation as resolved.
Show resolved Hide resolved
func (m *Manager) alignStoreWithAppIfNeeded(block *types.Block) (bool, error) {
isRequired := false
// Validate incosistency in height wasn't caused by a crash and if so handle it.
mtsitrin marked this conversation as resolved.
Show resolved Hide resolved
proxyAppInfo, err := m.executor.GetAppInfo()
if err != nil {
return isRequired, errors.Wrap(err, "failed to get app info")
return false, errors.Wrap(err, "failed to get app info")
mtsitrin marked this conversation as resolved.
Show resolved Hide resolved
}

// no alignment is required if the last block height is less than the current block height.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment says 'less than' but the equation is !=

// TODO: add switch case to have defined behavior for each case.
if uint64(proxyAppInfo.LastBlockHeight) != block.Header.Height {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this definitely correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is our long lasting logic, didn't want to change major parts
added TODO to validate and enforce more checks on this part

return isRequired, nil
return false, nil
}

isRequired = true
Expand Down
153 changes: 8 additions & 145 deletions state/executor.go → block/executor.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package state
package block

import (
"bytes"
Expand All @@ -7,9 +7,7 @@ import (
"errors"
"time"

"github.com/cometbft/cometbft/crypto/merkle"
abci "github.com/tendermint/tendermint/abci/types"
abcitypes "github.com/tendermint/tendermint/abci/types"
tmcrypto "github.com/tendermint/tendermint/crypto/encoding"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
Expand All @@ -18,7 +16,6 @@ import (
"go.uber.org/multierr"

abciconv "github.com/dymensionxyz/dymint/conv/abci"
"github.com/dymensionxyz/dymint/log"
"github.com/dymensionxyz/dymint/mempool"
"github.com/dymensionxyz/dymint/types"
)
Expand All @@ -34,12 +31,12 @@ type BlockExecutor struct {

eventBus *tmtypes.EventBus

logger log.Logger
logger types.Logger
}

// NewBlockExecutor creates new instance of BlockExecutor.
// Proposer address and namespace ID will be used in all newly created blocks.
func NewBlockExecutor(proposerAddress []byte, namespaceID string, chainID string, mempool mempool.Mempool, proxyApp proxy.AppConns, eventBus *tmtypes.EventBus, logger log.Logger) (*BlockExecutor, error) {
func NewBlockExecutor(proposerAddress []byte, namespaceID string, chainID string, mempool mempool.Mempool, proxyApp proxy.AppConns, eventBus *tmtypes.EventBus, logger types.Logger) (*BlockExecutor, error) {
bytes, err := hex.DecodeString(namespaceID)
if err != nil {
return nil, err
Expand All @@ -61,15 +58,15 @@ func NewBlockExecutor(proposerAddress []byte, namespaceID string, chainID string
// InitChain calls InitChainSync using consensus connection to app.
func (e *BlockExecutor) InitChain(genesis *tmtypes.GenesisDoc, validators []*tmtypes.Validator) (*abci.ResponseInitChain, error) {
params := genesis.ConsensusParams
valUpates := abcitypes.ValidatorUpdates{}
valUpates := abci.ValidatorUpdates{}

for _, validator := range validators {
tmkey, err := tmcrypto.PubKeyToProto(validator.PubKey)
if err != nil {
return nil, err
}

valUpates = append(valUpates, abcitypes.ValidatorUpdate{
valUpates = append(valUpates, abci.ValidatorUpdate{
PubKey: tmkey,
Power: validator.VotingPower,
})
Expand Down Expand Up @@ -101,49 +98,6 @@ func (e *BlockExecutor) InitChain(genesis *tmtypes.GenesisDoc, validators []*tmt
})
}

func (e *BlockExecutor) UpdateStateAfterInitChain(s *types.State, res *abci.ResponseInitChain, validators []*tmtypes.Validator) {
// If the app did not return an app hash, we keep the one set from the genesis doc in
// the state. We don't set appHash since we don't want the genesis doc app hash
// recorded in the genesis block. We should probably just remove GenesisDoc.AppHash.
if len(res.AppHash) > 0 {
copy(s.AppHash[:], res.AppHash)
}

//The validators after initChain must be greater than zero, otherwise this state is not loadable
if len(validators) <= 0 {
panic("Validators must be greater than zero")
}

if res.ConsensusParams != nil {
params := res.ConsensusParams
if params.Block != nil {
s.ConsensusParams.Block.MaxBytes = params.Block.MaxBytes
s.ConsensusParams.Block.MaxGas = params.Block.MaxGas
}
if params.Evidence != nil {
s.ConsensusParams.Evidence.MaxAgeNumBlocks = params.Evidence.MaxAgeNumBlocks
s.ConsensusParams.Evidence.MaxAgeDuration = params.Evidence.MaxAgeDuration
s.ConsensusParams.Evidence.MaxBytes = params.Evidence.MaxBytes
}
if params.Validator != nil {
// Copy params.Validator.PubkeyTypes, and set result's value to the copy.
// This avoids having to initialize the slice to 0 values, and then write to it again.
s.ConsensusParams.Validator.PubKeyTypes = append([]string{}, params.Validator.PubKeyTypes...)
}
if params.Version != nil {
s.ConsensusParams.Version.AppVersion = params.Version.AppVersion
}
s.Version.Consensus.App = s.ConsensusParams.Version.AppVersion
}
// We update the last results hash with the empty hash, to conform with RFC-6962.
copy(s.LastResultsHash[:], merkle.HashFromByteSlices(nil))

// Set the validators in the state
s.Validators = tmtypes.NewValidatorSet(validators).CopyIncrementProposerPriority(1)
s.NextValidators = s.Validators.Copy()
s.LastValidators = s.Validators.Copy()
}

// CreateBlock reaps transactions from mempool and builds a block.
func (e *BlockExecutor) CreateBlock(height uint64, lastCommit *types.Commit, lastHeaderHash [32]byte, state types.State) *types.Block {
maxBytes := state.ConsensusParams.Block.MaxBytes
Expand All @@ -158,7 +112,7 @@ func (e *BlockExecutor) CreateBlock(height uint64, lastCommit *types.Commit, las
App: state.Version.Consensus.App,
},
ChainID: e.chainID,
NamespaceID: e.namespaceID,
NamespaceID: e.namespaceID, //TODO: used?????
Height: height,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

Time: uint64(time.Now().UTC().UnixNano()),
LastHeaderHash: lastHeaderHash,
Expand Down Expand Up @@ -193,24 +147,6 @@ func (e *BlockExecutor) Validate(state types.State, block *types.Block, commit *
return nil
}

// UpdateStateFromResponses updates state based on the ABCIResponses.
func (e *BlockExecutor) UpdateStateFromResponses(resp *tmstate.ABCIResponses, state types.State, block *types.Block) (types.State, error) {
//Dymint ignores any setValidator responses from the app, as it is manages the validator set based on the settlement consensus
//TODO: this will be changed when supporting multiple sequencers from the hub
validatorUpdates := []*tmtypes.Validator{}

if state.ConsensusParams.Block.MaxBytes == 0 {
e.logger.Error("maxBytes=0", "state.ConsensusParams.Block", state.ConsensusParams.Block)
}

state, err := e.updateState(state, block, resp, validatorUpdates)
if err != nil {
return types.State{}, err
}

return state, nil
}

// Commit commits the block
func (e *BlockExecutor) Commit(ctx context.Context, state *types.State, block *types.Block, resp *tmstate.ABCIResponses) (int64, error) {
appHash, retainHeight, err := e.commit(ctx, state, block, resp.DeliverTxs)
Expand All @@ -229,57 +165,9 @@ func (e *BlockExecutor) Commit(ctx context.Context, state *types.State, block *t
return retainHeight, nil
}

func (e *BlockExecutor) updateState(state types.State, block *types.Block, abciResponses *tmstate.ABCIResponses, validatorUpdates []*tmtypes.Validator) (types.State, error) {
nValSet := state.NextValidators.Copy()
lastHeightValSetChanged := state.LastHeightValidatorsChanged
// Dymint can work without validators
if len(nValSet.Validators) > 0 {
if len(validatorUpdates) > 0 {
err := nValSet.UpdateWithChangeSet(validatorUpdates)
if err != nil {
return state, nil
}
// Change results from this height but only applies to the next next height.
lastHeightValSetChanged = int64(block.Header.Height + 1 + 1)
}

// TODO(tzdybal): right now, it's for backward compatibility, may need to change this
nValSet.IncrementProposerPriority(1)
}

hash := block.Header.Hash()
//TODO: we can probably pass the state as a pointer and update it directly
s := types.State{
Version: state.Version,
ChainID: state.ChainID,
InitialHeight: state.InitialHeight,
SLStateIndex: state.SLStateIndex,
LastBlockHeight: int64(block.Header.Height),
LastBlockTime: time.Unix(0, int64(block.Header.Time)),
LastBlockID: tmtypes.BlockID{
Hash: hash[:],
// for now, we don't care about part set headers
},
NextValidators: nValSet,
Validators: state.NextValidators.Copy(),
LastHeightValidatorsChanged: lastHeightValSetChanged,
ConsensusParams: state.ConsensusParams,
LastHeightConsensusParamsChanged: state.LastHeightConsensusParamsChanged,
// We're gonna update those fields only after we commit the blocks
AppHash: state.AppHash,
LastValidators: state.LastValidators.Copy(),
LastStoreHeight: state.LastStoreHeight,

LastResultsHash: state.LastResultsHash,
BaseHeight: state.BaseHeight,
}

return s, nil
}

// GetAppInfo returns the latest AppInfo from the proxyApp.
func (e *BlockExecutor) GetAppInfo() (*abcitypes.ResponseInfo, error) {
return e.proxyAppQueryConn.InfoSync(abcitypes.RequestInfo{})
func (e *BlockExecutor) GetAppInfo() (*abci.ResponseInfo, error) {
return e.proxyAppQueryConn.InfoSync(abci.RequestInfo{})
}

func (e *BlockExecutor) commit(ctx context.Context, state *types.State, block *types.Block, deliverTxs []*abci.ResponseDeliverTx) ([]byte, int64, error) {
Expand Down Expand Up @@ -469,28 +357,3 @@ func fromDymintTxs(optiTxs types.Txs) tmtypes.Txs {
}
return txs
}

// func validateValidatorUpdates(abciUpdates []abci.ValidatorUpdate,
// params tmproto.ValidatorParams) error {
// for _, valUpdate := range abciUpdates {
// if valUpdate.GetPower() < 0 {
// return fmt.Errorf("voting power can't be negative %v", valUpdate)
// } else if valUpdate.GetPower() == 0 {
// // continue, since this is deleting the validator, and thus there is no
mtsitrin marked this conversation as resolved.
Show resolved Hide resolved
// // pubkey to check
// continue
// }

// // Check if validator's pubkey matches an ABCI type in the consensus params
// pk, err := cryptoenc.PubKeyFromProto(valUpdate.PubKey)
// if err != nil {
// return err
// }

// if !tmtypes.IsValidPubkeyType(params, pk.Type()) {
// return fmt.Errorf("validator %v is using pubkey %s, which is unsupported for consensus",
// valUpdate, pk.Type())
// }
// }
// return nil
// }
8 changes: 5 additions & 3 deletions state/executor_test.go → block/executor_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package state
package block_test

import (
"context"
"crypto/rand"
"testing"
"time"

"github.com/dymensionxyz/dymint/block"

"github.com/cosmos/cosmos-sdk/crypto/keys/ed25519"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -43,7 +45,7 @@ func TestCreateBlock(t *testing.T) {
nsID := "0102030405060708"

mpool := mempoolv1.NewTxMempool(logger, cfg.DefaultMempoolConfig(), proxy.NewAppConnMempool(abciClient), 0)
executor, err := NewBlockExecutor([]byte("test address"), nsID, "test", mpool, proxy.NewAppConns(clientCreator), nil, logger)
executor, err := block.NewBlockExecutor([]byte("test address"), nsID, "test", mpool, proxy.NewAppConns(clientCreator), nil, logger)
assert.NoError(err)

state := types.State{}
Expand Down Expand Up @@ -117,7 +119,7 @@ func TestApplyBlock(t *testing.T) {
appConns := &mocks.AppConns{}
appConns.On("Consensus").Return(abciClient)
appConns.On("Query").Return(abciClient)
executor, err := NewBlockExecutor([]byte("test address"), nsID, chainID, mpool, appConns, eventBus, logger)
executor, err := block.NewBlockExecutor([]byte("test address"), nsID, chainID, mpool, appConns, eventBus, logger)
assert.NoError(err)

// Subscribe to tx events
Expand Down
Loading
Loading