Skip to content

Commit

Permalink
feat: sequencer rotation (#992)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsitrin authored Aug 22, 2024
1 parent 25c069c commit 88ba1fe
Show file tree
Hide file tree
Showing 98 changed files with 26,960 additions and 1,283 deletions.
9 changes: 7 additions & 2 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ packages:
github.com/dymensionxyz/dymint/settlement/dymension:
interfaces:
CosmosClient:
github.com/dymensionxyz/dymension/v3/x/sequencer/types:
github.com/dymensionxyz/dymint/third_party/dymension/sequencer/types:
interfaces:
QueryClient:
github.com/dymensionxyz/dymension/v3/x/rollapp/types:
github.com/dymensionxyz/dymint/third_party/dymension/rollapp/types:
interfaces:
QueryClient:
github.com/tendermint/tendermint/abci/types:
Expand All @@ -32,5 +32,10 @@ packages:
github.com/dymensionxyz/dymint/da:
interfaces:
DataAvailabilityLayerClient:
github.com/dymensionxyz/dymint/p2p:
interfaces:
GetProposerI:




98 changes: 51 additions & 47 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
// - block height is the expected block height on the store (height + 1).
// - block height is the expected block height on the app (last block height + 1).
func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMetaData types.BlockMetaData) error {
var retainHeight int64

// TODO: add switch case to have defined behavior for each case.
// validate block height
if block.Header.Height != m.State.NextHeight() {
Expand All @@ -30,73 +32,78 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
if err != nil {
return fmt.Errorf("check if block is already applied: %w", err)
}
// In case the following true, it means we crashed after the commit and before updating the store height.
// In that case we'll want to align the store with the app state and continue to the next block.
// In case the following true, it means we crashed after the app commit but before updating the state
// In that case we'll want to align the state with the app commit result, as if the block was applied.
if isBlockAlreadyApplied {
// In this case, where the app was committed, but the state wasn't updated
// it will update the state from appInfo, saved responses and validators.
err := m.UpdateStateFromApp()
if err != nil {
return fmt.Errorf("update state from app: %w", err)
}
m.logger.Debug("Aligned with app state required. Skipping to next block", "height", block.Header.Height)
return nil
}
// Start applying the block assuming no inconsistency was found.
_, err = m.Store.SaveBlock(block, commit, nil)
if err != nil {
return fmt.Errorf("save block: %w", err)
}
m.logger.Info("updated state from app commit", "height", block.Header.Height)
} else {
var appHash []byte
// Start applying the block assuming no inconsistency was found.
_, err = m.Store.SaveBlock(block, commit, nil)
if err != nil {
return fmt.Errorf("save block: %w", err)
}

responses, err := m.Executor.ExecuteBlock(m.State, block)
if err != nil {
return fmt.Errorf("execute block: %w", err)
}
responses, err := m.Executor.ExecuteBlock(m.State, block)
if err != nil {
return fmt.Errorf("execute block: %w", err)
}

dbBatch := m.Store.NewBatch()
dbBatch, err = m.Store.SaveBlockResponses(block.Header.Height, responses, dbBatch)
if err != nil {
dbBatch.Discard()
return fmt.Errorf("save block responses: %w", err)
_, err = m.Store.SaveBlockResponses(block.Header.Height, responses, nil)
if err != nil {
return fmt.Errorf("save block responses: %w", err)
}

// Commit block to app
appHash, retainHeight, err = m.Executor.Commit(m.State, block, responses)
if err != nil {
return fmt.Errorf("commit block: %w", err)
}

// Update the state with the new app hash, and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height)
}

// Get the validator changes from the app
validators := m.State.NextValidators.Copy() // TODO: this will be changed when supporting multiple sequencers from the hub
// check if the proposer needs to be changed
switchRole := m.Executor.UpdateProposerFromBlock(m.State, block)

dbBatch, err = m.Store.SaveValidators(block.Header.Height, validators, dbBatch)
// save sequencers to store to be queried over RPC
batch := m.Store.NewBatch()
batch, err = m.Store.SaveSequencers(block.Header.Height, &m.State.Sequencers, batch)
if err != nil {
dbBatch.Discard()
return fmt.Errorf("save validators: %w", err)
return fmt.Errorf("save sequencers: %w", err)
}

err = dbBatch.Commit()
batch, err = m.Store.SaveState(m.State, batch)
if err != nil {
return fmt.Errorf("commit batch to disk: %w", err)
return fmt.Errorf("update state: %w", err)
}

// Commit block to app
appHash, retainHeight, err := m.Executor.Commit(m.State, block, responses)
err = batch.Commit()
if err != nil {
return fmt.Errorf("commit block: %w", err)
return fmt.Errorf("commit state: %w", err)
}

// If failed here, after the app committed, but before the state is updated, we'll update the state on
// UpdateStateFromApp using the saved responses and validators.
types.RollappHeightGauge.Set(float64(block.Header.Height))

// Update the state with the new app hash, last validators and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height, validators)
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
return fmt.Errorf("update state: %w", err)
}
// Prune old heights, if requested by ABCI app.
if 0 < retainHeight {
err = m.PruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
}
}

if switchRole {
// TODO: graceful role change (https://github.com/dymensionxyz/dymint/issues/1008)
m.logger.Info("Node changing to proposer role")
panic("sequencer is no longer the proposer")
}
return nil
}

Expand Down Expand Up @@ -125,7 +132,7 @@ func (m *Manager) attemptApplyCachedBlocks() error {
if !blockExists {
break
}
if err := m.validateBlock(cachedBlock.Block, cachedBlock.Commit); err != nil {
if err := m.validateBlockBeforeApply(cachedBlock.Block, cachedBlock.Commit); err != nil {
m.blockCache.Delete(cachedBlock.Block.Header.Height)
// TODO: can we take an action here such as dropping the peer / reducing their reputation?
return fmt.Errorf("block not valid at height %d, dropping it: err:%w", cachedBlock.Block.Header.Height, err)
Expand All @@ -143,10 +150,7 @@ func (m *Manager) attemptApplyCachedBlocks() error {
return nil
}

func (m *Manager) validateBlock(block *types.Block, commit *types.Commit) error {
// Currently we're assuming proposer is never nil as it's a pre-condition for
// dymint to start
proposer := m.SLClient.GetProposer()

return types.ValidateProposedTransition(m.State, block, commit, proposer)
// This function validates the block and commit against the state before applying it.
func (m *Manager) validateBlockBeforeApply(block *types.Block, commit *types.Commit) error {
return types.ValidateProposedTransition(m.State, block, commit, m.GetProposerPubKey())
}
28 changes: 8 additions & 20 deletions block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ func NewExecutor(localAddress []byte, chainID string, mempool mempool.Mempool, p
}

// InitChain calls InitChainSync using consensus connection to app.
func (e *Executor) InitChain(genesis *tmtypes.GenesisDoc, validators []*tmtypes.Validator) (*abci.ResponseInitChain, error) {
func (e *Executor) InitChain(genesis *tmtypes.GenesisDoc, valset []*tmtypes.Validator) (*abci.ResponseInitChain, error) {
params := genesis.ConsensusParams
valUpates := abci.ValidatorUpdates{}

for _, validator := range validators {
// prepare the validator updates as expected by the ABCI app
for _, validator := range valset {
tmkey, err := tmcrypto.PubKeyToProto(validator.PubKey)
if err != nil {
return nil, err
Expand Down Expand Up @@ -88,7 +89,7 @@ func (e *Executor) InitChain(genesis *tmtypes.GenesisDoc, validators []*tmtypes.
}

// CreateBlock reaps transactions from mempool and builds a block.
func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHeaderHash [32]byte, state *types.State, maxBlockDataSizeBytes uint64) *types.Block {
func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHeaderHash, nextSeqHash [32]byte, state *types.State, maxBlockDataSizeBytes uint64) *types.Block {
if state.ConsensusParams.Block.MaxBytes > 0 {
maxBlockDataSizeBytes = min(maxBlockDataSizeBytes, uint64(state.ConsensusParams.Block.MaxBytes))
}
Expand Down Expand Up @@ -117,9 +118,10 @@ func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHead
},
LastCommit: *lastCommit,
}
copy(block.Header.LastCommitHash[:], e.getLastCommitHash(lastCommit, &block.Header))
copy(block.Header.DataHash[:], e.getDataHash(block))
copy(block.Header.SequencersHash[:], state.Validators.Hash())
copy(block.Header.LastCommitHash[:], types.GetLastCommitHash(lastCommit, &block.Header))
copy(block.Header.DataHash[:], types.GetDataHash(block))
copy(block.Header.SequencerHash[:], state.Sequencers.ProposerHash())
copy(block.Header.NextSequencersHash[:], nextSeqHash[:])

return block
}
Expand Down Expand Up @@ -197,8 +199,6 @@ func (e *Executor) ExecuteBlock(state *types.State, block *types.Block) (*tmstat

hash := block.Hash()
abciHeader := types.ToABCIHeaderPB(&block.Header)
abciHeader.ChainID = e.chainID
abciHeader.ValidatorsHash = state.Validators.Hash()
abciResponses.BeginBlock, err = e.proxyAppConsensusConn.BeginBlockSync(
abci.RequestBeginBlock{
Hash: hash[:],
Expand Down Expand Up @@ -228,18 +228,6 @@ func (e *Executor) ExecuteBlock(state *types.State, block *types.Block) (*tmstat
return abciResponses, nil
}

func (e *Executor) getLastCommitHash(lastCommit *types.Commit, header *types.Header) []byte {
lastABCICommit := types.ToABCICommit(lastCommit, header)
return lastABCICommit.Hash()
}

func (e *Executor) getDataHash(block *types.Block) []byte {
abciData := tmtypes.Data{
Txs: types.ToABCIBlockDataTxs(&block.Data),
}
return abciData.Hash()
}

func (e *Executor) publishEvents(resp *tmstate.ABCIResponses, block *types.Block) error {
if e.eventBus == nil {
return nil
Expand Down
50 changes: 28 additions & 22 deletions block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (

"github.com/dymensionxyz/dymint/block"

cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec"
"github.com/cosmos/cosmos-sdk/crypto/keys/ed25519"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand All @@ -28,6 +30,7 @@ import (
"github.com/dymensionxyz/dymint/types"
)

// TODO: test UpdateProposerFromBlock
func TestCreateBlock(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
Expand All @@ -49,21 +52,27 @@ func TestCreateBlock(t *testing.T) {

maxBytes := uint64(100)

// Create a valid proposer for the block
proposerKey := ed25519.GenPrivKey()
tmPubKey, err := cryptocodec.ToTmPubKeyInterface(proposerKey.PubKey())
require.NoError(err)

// Init state
state := &types.State{}
state.Sequencers.SetProposer(types.NewSequencerFromValidator(*tmtypes.NewValidator(tmPubKey, 1)))
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000
state.Validators = tmtypes.NewValidatorSet(nil)

// empty block
block := executor.CreateBlock(1, &types.Commit{}, [32]byte{}, state, maxBytes)
block := executor.CreateBlock(1, &types.Commit{}, [32]byte{}, [32]byte(state.Sequencers.ProposerHash()[:]), state, maxBytes)
require.NotNil(block)
assert.Empty(block.Data.Txs)
assert.Equal(uint64(1), block.Header.Height)

// one small Tx
err = mpool.CheckTx([]byte{1, 2, 3, 4}, func(r *abci.Response) {}, mempool.TxInfo{})
require.NoError(err)
block = executor.CreateBlock(2, &types.Commit{}, [32]byte{}, state, maxBytes)
block = executor.CreateBlock(2, &types.Commit{}, [32]byte{}, [32]byte(state.Sequencers.ProposerHash()), state, maxBytes)
require.NotNil(block)
assert.Equal(uint64(2), block.Header.Height)
assert.Len(block.Data.Txs, 1)
Expand All @@ -73,7 +82,7 @@ func TestCreateBlock(t *testing.T) {
require.NoError(err)
err = mpool.CheckTx(make([]byte, 100), func(r *abci.Response) {}, mempool.TxInfo{})
require.NoError(err)
block = executor.CreateBlock(3, &types.Commit{}, [32]byte{}, state, maxBytes)
block = executor.CreateBlock(3, &types.Commit{}, [32]byte{}, [32]byte(state.Sequencers.ProposerHash()), state, maxBytes)
require.NotNil(block)
assert.Len(block.Data.Txs, 2)
}
Expand Down Expand Up @@ -136,11 +145,14 @@ func TestApplyBlock(t *testing.T) {
require.NoError(err)
require.NotNil(headerSub)

// Create a valid proposer for the block
proposerKey := ed25519.GenPrivKey()
tmPubKey, err := cryptocodec.ToTmPubKeyInterface(proposerKey.PubKey())
require.NoError(err)

// Init state
state := &types.State{
NextValidators: tmtypes.NewValidatorSet(nil),
Validators: tmtypes.NewValidatorSet(nil),
}
state := &types.State{}
state.Sequencers.SetProposer(types.NewSequencerFromValidator(*tmtypes.NewValidator(tmPubKey, 1)))
state.InitialHeight = 1
state.SetHeight(0)
maxBytes := uint64(100)
Expand All @@ -150,16 +162,11 @@ func TestApplyBlock(t *testing.T) {
// Create first block with one Tx from mempool
_ = mpool.CheckTx([]byte{1, 2, 3, 4}, func(r *abci.Response) {}, mempool.TxInfo{})
require.NoError(err)
block := executor.CreateBlock(1, &types.Commit{Height: 0}, [32]byte{}, state, maxBytes)
block := executor.CreateBlock(1, &types.Commit{Height: 0}, [32]byte{}, [32]byte(state.Sequencers.ProposerHash()), state, maxBytes)
require.NotNil(block)
assert.Equal(uint64(1), block.Header.Height)
assert.Len(block.Data.Txs, 1)

// Create proposer for the block
proposerKey := ed25519.GenPrivKey()
proposer := &types.Sequencer{
PublicKey: proposerKey.PubKey(),
}
// Create commit for the block
abciHeaderPb := types.ToABCIHeaderPB(&block.Header)
abciHeaderBytes, err := abciHeaderPb.Marshal()
Expand All @@ -173,14 +180,15 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block
err = types.ValidateProposedTransition(state, block, commit, proposer)
err = types.ValidateProposedTransition(state, block, commit, state.Sequencers.GetProposerPubKey())
require.NoError(err)

resp, err := executor.ExecuteBlock(state, block)
require.NoError(err)
require.NotNil(resp)
appHash, _, err := executor.Commit(state, block, resp)
require.NoError(err)
executor.UpdateStateAfterCommit(state, resp, appHash, block.Header.Height, state.Validators)
executor.UpdateStateAfterCommit(state, resp, appHash, block.Header.Height)
assert.Equal(uint64(1), state.Height())
assert.Equal(mockAppHash, state.AppHash)

Expand All @@ -189,7 +197,7 @@ func TestApplyBlock(t *testing.T) {
require.NoError(mpool.CheckTx([]byte{5, 6, 7, 8, 9}, func(r *abci.Response) {}, mempool.TxInfo{}))
require.NoError(mpool.CheckTx([]byte{1, 2, 3, 4, 5}, func(r *abci.Response) {}, mempool.TxInfo{}))
require.NoError(mpool.CheckTx(make([]byte, 90), func(r *abci.Response) {}, mempool.TxInfo{}))
block = executor.CreateBlock(2, commit, [32]byte{}, state, maxBytes)
block = executor.CreateBlock(2, commit, [32]byte{}, [32]byte(state.Sequencers.ProposerHash()), state, maxBytes)
require.NotNil(block)
assert.Equal(uint64(2), block.Header.Height)
assert.Len(block.Data.Txs, 3)
Expand All @@ -210,8 +218,7 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block with an invalid commit
err = types.ValidateProposedTransition(state, block, invalidCommit, proposer)

err = types.ValidateProposedTransition(state, block, invalidCommit, state.Sequencers.GetProposerPubKey())
require.ErrorIs(err, types.ErrInvalidSignature)

// Create a valid commit for the block
Expand All @@ -224,15 +231,14 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block
err = types.ValidateProposedTransition(state, block, commit, proposer)
err = types.ValidateProposedTransition(state, block, commit, state.Sequencers.GetProposerPubKey())
require.NoError(err)
resp, err = executor.ExecuteBlock(state, block)
require.NoError(err)
require.NotNil(resp)
vals := state.NextValidators.Copy() // TODO: this will be changed when supporting multiple sequencers from the hub
_, _, err = executor.Commit(state, block, resp)
require.NoError(err)
executor.UpdateStateAfterCommit(state, resp, appHash, block.Header.Height, vals)
executor.UpdateStateAfterCommit(state, resp, appHash, block.Header.Height)
assert.Equal(uint64(2), state.Height())

// wait for at least 4 Tx events, for up to 3 second.
Expand Down
Loading

0 comments on commit 88ba1fe

Please sign in to comment.